A Look at scalecube-cluster's FailureDetector
Published: 2019-06-24


This post takes a look at the FailureDetector in scalecube-cluster.

FailureDetector

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetector.java

/**
 * Failure Detector component responsible for monitoring availability of other members in the
 * cluster. This interface is supposed to be used internally as part cluster membership protocol. It
 * doesn't specify that particular node is failed, but just provide information that either it is
 * suspected or trusted at current moment of time. So it is up to cluster membership or other top
 * level component to define when suspected member is actually failed.
 */
public interface FailureDetector {

  /**
   * Starts running failure detection algorithm. After started it begins to receive and send ping
   * messages.
   */
  void start();

  /** Stops running failure detection algorithm and releases occupied resources. */
  void stop();

  /** Listens for results of ping checks (ALIVE/SUSPECT) done periodically by failure detector. */
  Flux<FailureDetectorEvent> listen();
}
  • FailureDetector defines three methods: start, stop, and listen.

FailureDetectorImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {

  private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);

  // Qualifiers

  public static final String PING = "sc/fdetector/ping";
  public static final String PING_REQ = "sc/fdetector/pingReq";
  public static final String PING_ACK = "sc/fdetector/pingAck";

  // Injected

  private final Member localMember;
  private final Transport transport;
  private final FailureDetectorConfig config;
  private final CorrelationIdGenerator cidGenerator;

  // State

  private long currentPeriod = 0;
  private List<Member> pingMembers = new ArrayList<>();
  private int pingMemberIndex = 0; // index for sequential ping member selection

  // Disposables

  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Subject

  private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject =
      DirectProcessor.<FailureDetectorEvent>create().serialize();

  private final FluxSink<FailureDetectorEvent> sink = subject.sink();

  // Scheduled

  private final Scheduler scheduler;

  /**
   * Creates new instance of failure detector with given transport and settings.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param membershipProcessor membership event processor
   * @param config failure detector settings
   * @param scheduler scheduler
   * @param cidGenerator correlationId generator
   */
  public FailureDetectorImpl(
      Member localMember,
      Transport transport,
      Flux<MembershipEvent> membershipProcessor,
      FailureDetectorConfig config,
      Scheduler scheduler,
      CorrelationIdGenerator cidGenerator) {

    this.localMember = Objects.requireNonNull(localMember);
    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.scheduler = Objects.requireNonNull(scheduler);
    this.cidGenerator = Objects.requireNonNull(cidGenerator);

    // Subscribe
    actionsDisposables.addAll(
        Arrays.asList(
            membershipProcessor //
                .publishOn(scheduler)
                .subscribe(this::onMemberEvent, this::onError),
            transport
                .listen() //
                .publishOn(scheduler)
                .subscribe(this::onMessage, this::onError)));
  }

  @Override
  public void start() {
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doPing,
            config.getPingInterval(),
            config.getPingInterval(),
            TimeUnit.MILLISECONDS));
  }

  @Override
  public void stop() {
    // Stop accepting requests and sending pings
    actionsDisposables.dispose();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Flux<FailureDetectorEvent> listen() {
    return subject.onBackpressureBuffer();
  }

  //......
}
  • FailureDetectorImpl implements the FailureDetector interface. It defines three message qualifiers (PING, PING_REQ, and PING_ACK) and maintains the pingMembers list.
  • The constructor subscribes to membershipProcessor, which triggers onMemberEvent, and to transport.listen(), which triggers onMessage.
  • start registers the doPing task through scheduler.schedulePeriodically so that it runs once every pingInterval (5000 ms by default); stop calls actionsDisposables.dispose() and then sink.complete(); listen() returns subject.onBackpressureBuffer().
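
The start/stop lifecycle can be tried out without Reactor. The following is only a rough sketch: it swaps the Reactor Scheduler for a JDK ScheduledExecutorService, and the class name PeriodicPingSketch, the runFor helper, and the 20 ms interval are all made up for illustration. The doPing body is a placeholder that only counts periods.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the start()/stop() lifecycle, using only the JDK. */
final class PeriodicPingSketch {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicLong currentPeriod = new AtomicLong();
  private final long pingIntervalMs;
  private ScheduledFuture<?> pingTask;

  PeriodicPingSketch(long pingIntervalMs) {
    this.pingIntervalMs = pingIntervalMs;
  }

  /** First run after one interval, then once per interval (same delay and period). */
  void start() {
    pingTask =
        scheduler.scheduleAtFixedRate(
            this::doPing, pingIntervalMs, pingIntervalMs, TimeUnit.MILLISECONDS);
  }

  /** Cancels the periodic task and releases the worker thread. */
  void stop() {
    if (pingTask != null) {
      pingTask.cancel(false);
    }
    scheduler.shutdown();
  }

  /** Placeholder: the real doPing sends a PING; here we only count periods. */
  private void doPing() {
    currentPeriod.incrementAndGet();
  }

  long periods() {
    return currentPeriod.get();
  }

  /** Illustration helper: runs the loop for a while and reports how many periods elapsed. */
  static long runFor(long pingIntervalMs, long durationMs) {
    PeriodicPingSketch sketch = new PeriodicPingSketch(pingIntervalMs);
    sketch.start();
    try {
      Thread.sleep(durationMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      sketch.stop();
    }
    return sketch.periods();
  }

  public static void main(String[] args) {
    System.out.println("periods elapsed: " + runFor(20, 200));
  }
}
```

Passing the interval as both the initial delay and the period mirrors the two config.getPingInterval() arguments in start(): the first ping fires one full interval after start, not immediately.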

onMemberEvent

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {

  //......

  private void onMemberEvent(MembershipEvent event) {
    Member member = event.member();
    if (event.isRemoved()) {
      pingMembers.remove(member);
    }
    if (event.isAdded()) {
      // insert member into random positions
      int size = pingMembers.size();
      int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
      pingMembers.add(index, member);
    }
  }

  //......
}
  • onMemberEvent updates pingMembers according to the MembershipEvent: a removed member is deleted from the list, and an added member is inserted at a random index.
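
To see the random insertion in isolation, here is a minimal stand-alone sketch. It assumes members can be represented by plain String ids, and the class PingListSketch with its onAdded/onRemoved/snapshot methods is hypothetical; only the index computation is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-alone version of the ping-list bookkeeping; ids are plain strings. */
final class PingListSketch {
  private final List<String> pingMembers = new ArrayList<>();

  /** Inserts a newly added member at a random index in [0, size), or at 0 if the list is empty. */
  void onAdded(String memberId) {
    int size = pingMembers.size();
    int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
    pingMembers.add(index, memberId);
  }

  /** Drops a removed member; a no-op if it was never in the list. */
  void onRemoved(String memberId) {
    pingMembers.remove(memberId);
  }

  /** Returns a copy so callers cannot disturb the internal order. */
  List<String> snapshot() {
    return new ArrayList<>(pingMembers);
  }

  public static void main(String[] args) {
    PingListSketch sketch = new PingListSketch();
    for (String id : new String[] {"a", "b", "c", "d"}) {
      sketch.onAdded(id);
    }
    sketch.onRemoved("b");
    System.out.println(sketch.snapshot()); // order of the first entries varies between runs
  }
}
```

One detail worth noticing: nextInt(size) never returns size itself, so once the list is non-empty a new member is never placed in the last slot, and the first member added stays last until it is removed or the list is shuffled.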

onMessage

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {

  //......

  private void onMessage(Message message) {
    if (isPing(message)) {
      onPing(message);
    } else if (isPingReq(message)) {
      onPingReq(message);
    } else if (isTransitPingAck(message)) {
      onTransitPingAck(message);
    }
  }

  private boolean isPing(Message message) {
    return PING.equals(message.qualifier());
  }

  private boolean isPingReq(Message message) {
    return PING_REQ.equals(message.qualifier());
  }

  private boolean isTransitPingAck(Message message) {
    return PING_ACK.equals(message.qualifier())
        && message.<PingData>data().getOriginalIssuer() != null;
  }

  /** Listens to PING message and answers with ACK. */
  private void onPing(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received Ping[{}]", period);
    PingData data = message.data();
    if (!data.getTo().id().equals(localMember.id())) {
      LOGGER.warn(
          "Received Ping[{}] to {}, but local member is {}", period, data.getTo(), localMember);
      return;
    }
    String correlationId = message.correlationId();
    Message ackMessage =
        Message.withData(data)
            .qualifier(PING_ACK)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = data.getFrom().address();
    LOGGER.trace("Send PingAck[{}] to {}", period, address);
    transport
        .send(address, ackMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send PingAck[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  /** Listens to PING_REQ message and sends PING to requested cluster member. */
  private void onPingReq(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received PingReq[{}]", period);
    PingData data = message.data();
    Member target = data.getTo();
    Member originalIssuer = data.getFrom();
    String correlationId = message.correlationId();
    PingData pingReqData = new PingData(localMember, target, originalIssuer);
    Message pingMessage =
        Message.withData(pingReqData)
            .qualifier(PING)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = target.address();
    LOGGER.trace("Send transit Ping[{}] to {}", period, address);
    transport
        .send(address, pingMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send transit Ping[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  /**
   * Listens to ACK with message containing ORIGINAL_ISSUER then converts message to plain ACK and
   * sends it to ORIGINAL_ISSUER.
   */
  private void onTransitPingAck(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received transit PingAck[{}]", period);
    PingData data = message.data();
    Member target = data.getOriginalIssuer();
    String correlationId = message.correlationId();
    PingData originalAckData = new PingData(target, data.getTo());
    Message originalAckMessage =
        Message.withData(originalAckData)
            .qualifier(PING_ACK)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = target.address();
    LOGGER.trace("Resend transit PingAck[{}] to {}", period, address);
    transport
        .send(address, originalAckMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to resend transit PingAck[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  //......
}
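  • onMessage dispatches on the message qualifier and on whether originalIssuer is set, calling onPing, onPingReq, or onTransitPingAck. onPing replies with a PING_ACK message to the member that sent the ping; onPingReq sends a PING to the member named in the ping request; onTransitPingAck forwards the ack produced by a ping request back to the originalIssuer.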
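
The indirect path (issuer A asks relay B to ping target C) is easier to follow as a pure function from an incoming message to the outgoing one. The sketch below is a simplification and not the library's API: Msg, Outgoing, and handle are hypothetical names, members are reduced to String ids, and no transport is involved. Only the qualifier strings and the from/to/originalIssuer rewriting follow the code above.

```java
import java.util.Optional;

/** Hypothetical, transport-free model of the PING / PING_REQ / transit PING_ACK dispatch. */
final class PingRelaySketch {
  static final String PING = "sc/fdetector/ping";
  static final String PING_REQ = "sc/fdetector/pingReq";
  static final String PING_ACK = "sc/fdetector/pingAck";

  /** Simplified message: qualifier, correlation id, and the three PingData fields as ids. */
  static final class Msg {
    final String qualifier;
    final String cid;
    final String from;
    final String to;
    final String originalIssuer; // null unless the ping travels through a relay

    Msg(String qualifier, String cid, String from, String to, String originalIssuer) {
      this.qualifier = qualifier;
      this.cid = cid;
      this.from = from;
      this.to = to;
      this.originalIssuer = originalIssuer;
    }
  }

  /** An outgoing message plus the id of the member it should be delivered to. */
  static final class Outgoing {
    final String destination;
    final Msg msg;

    Outgoing(String destination, Msg msg) {
      this.destination = destination;
      this.msg = msg;
    }
  }

  /** Returns what the local member would send in response to m, if anything. */
  static Optional<Outgoing> handle(String local, Msg m) {
    if (PING.equals(m.qualifier)) {
      if (!m.to.equals(local)) {
        return Optional.empty(); // ping addressed to someone else: ignored
      }
      // Echo the same data back to whoever sent the ping.
      Msg ack = new Msg(PING_ACK, m.cid, m.from, m.to, m.originalIssuer);
      return Optional.of(new Outgoing(m.from, ack));
    }
    if (PING_REQ.equals(m.qualifier)) {
      // Ping the target on behalf of the issuer and remember who asked.
      Msg ping = new Msg(PING, m.cid, local, m.to, m.from);
      return Optional.of(new Outgoing(m.to, ping));
    }
    if (PING_ACK.equals(m.qualifier) && m.originalIssuer != null) {
      // Drop the relay information and hand a plain ack to the original issuer.
      Msg plainAck = new Msg(PING_ACK, m.cid, m.originalIssuer, m.to, null);
      return Optional.of(new Outgoing(m.originalIssuer, plainAck));
    }
    return Optional.empty(); // a plain ack is matched by correlation id elsewhere
  }

  public static void main(String[] args) {
    Msg pingReq = new Msg(PING_REQ, "cid-1", "A", "C", null); // A asks B to probe C
    Outgoing ping = handle("B", pingReq).get();
    Outgoing ack = handle("C", ping.msg).get();
    Outgoing relayed = handle("B", ack.msg).get();
    System.out.println(ping.destination + " " + ack.destination + " " + relayed.destination);
  }
}
```

The correlation id is carried unchanged through all three hops, which is what lets the issuer's pending request be matched with the relayed ack. The plain ack that finally reaches A falls through the dispatch, just as a PING_ACK without an originalIssuer matches none of the three branches in onMessage.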

doPing

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {

  //......

  private void doPing() {
    // Increment period counter
    long period = currentPeriod++;

    // Select ping member
    Member pingMember = selectPingMember();
    if (pingMember == null) {
      return;
    }

    // Send ping
    String cid = cidGenerator.nextCid();
    PingData pingData = new PingData(localMember, pingMember);
    Message pingMsg =
        Message.withData(pingData)
            .qualifier(PING)
            .correlationId(cid)
            .sender(localMember.address())
            .build();

    LOGGER.trace("Send Ping[{}] to {}", period, pingMember);
    Address address = pingMember.address();
    transport
        .requestResponse(pingMsg, address)
        .timeout(Duration.ofMillis(config.getPingTimeout()), scheduler)
        .publishOn(scheduler)
        .subscribe(
            message -> {
              LOGGER.trace("Received PingAck[{}] from {}", period, pingMember);
              publishPingResult(period, pingMember, MemberStatus.ALIVE);
            },
            ex -> {
              LOGGER.debug(
                  "Failed to get PingAck[{}] from {} within {} ms",
                  period,
                  pingMember,
                  config.getPingTimeout());

              final int timeLeft = config.getPingInterval() - config.getPingTimeout();
              final List<Member> pingReqMembers = selectPingReqMembers(pingMember);

              if (timeLeft <= 0 || pingReqMembers.isEmpty()) {
                LOGGER.trace("No PingReq[{}] occurred", period);
                publishPingResult(period, pingMember, MemberStatus.SUSPECT);
              } else {
                doPingReq(currentPeriod, pingMember, pingReqMembers, cid);
              }
            });
  }

  private Member selectPingMember() {
    if (pingMembers.isEmpty()) {
      return null;
    }
    if (pingMemberIndex >= pingMembers.size()) {
      pingMemberIndex = 0;
      Collections.shuffle(pingMembers);
    }
    return pingMembers.get(pingMemberIndex++);
  }

  private List<Member> selectPingReqMembers(Member pingMember) {
    if (config.getPingReqMembers() <= 0) {
      return Collections.emptyList();
    }
    List<Member> candidates = new ArrayList<>(pingMembers);
    candidates.remove(pingMember);
    if (candidates.isEmpty()) {
      return Collections.emptyList();
    }
    Collections.shuffle(candidates);
    boolean selectAll = candidates.size() < config.getPingReqMembers();
    return selectAll ? candidates : candidates.subList(0, config.getPingReqMembers());
  }

  private void doPingReq(
      long period, final Member pingMember, final List<Member> pingReqMembers, String cid) {
    Message pingReqMsg =
        Message.withData(new PingData(localMember, pingMember))
            .qualifier(PING_REQ)
            .correlationId(cid)
            .sender(localMember.address())
            .build();
    LOGGER.trace("Send PingReq[{}] to {} for {}", period, pingReqMembers, pingMember);

    Duration timeout = Duration.ofMillis(config.getPingInterval() - config.getPingTimeout());
    pingReqMembers.forEach(
        member ->
            transport
                .requestResponse(pingReqMsg, member.address())
                .timeout(timeout, scheduler)
                .publishOn(scheduler)
                .subscribe(
                    message -> {
                      LOGGER.trace(
                          "Received transit PingAck[{}] from {} to {}",
                          period,
                          message.sender(),
                          pingMember);
                      publishPingResult(period, pingMember, MemberStatus.ALIVE);
                    },
                    throwable -> {
                      LOGGER.trace(
                          "Timeout getting transit PingAck[{}] from {} to {} within {} ms",
                          period,
                          pingReqMembers,
                          pingMember,
                          timeout);
                      publishPingResult(period, pingMember, MemberStatus.SUSPECT);
                    }));
  }

  private void publishPingResult(long period, Member member, MemberStatus status) {
    LOGGER.debug("Member {} detected as {} at [{}]", member, status, period);
    sink.next(new FailureDetectorEvent(member, status));
  }

  //......
}
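  • doPing first increments currentPeriod and picks a pingMember through selectPingMember. It then builds the pingData and sends the request with transport.requestResponse. On success it calls publishPingResult with MemberStatus.ALIVE. On error it picks random pingReqMembers through selectPingReqMembers; if config.getPingInterval() - config.getPingTimeout() is less than or equal to 0, or pingReqMembers is empty, it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT); otherwise it calls doPingReq.
  • selectPingMember walks pingMembers in order rather than drawing at random each time: when pingMemberIndex reaches or exceeds pingMembers.size(), it resets the index to 0 and calls Collections.shuffle(pingMembers); it then returns the member at pingMemberIndex and increments the index. selectPingReqMembers copies pingMembers into a new list, removes pingMember to get candidates, shuffles candidates, and returns the first config.getPingReqMembers() entries via subList (or all candidates if there are fewer than that).
  • doPingReq iterates over pingReqMembers and sends pingReqMsg to each of them. When a transit PingAck arrives it calls publishPingResult(period, pingMember, MemberStatus.ALIVE); on error it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT). publishPingResult emits a FailureDetectorEvent into sink.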
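
The two selection routines can be exercised on their own. This is a small sketch under stated assumptions: members are String ids, the pingReqMembers field stands in for config.getPingReqMembers(), and the class name PingSelectionSketch is made up. The method bodies follow selectPingMember and selectPingReqMembers from the listing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-alone version of the two member-selection routines. */
final class PingSelectionSketch {
  private final List<String> pingMembers;
  private final int pingReqMembers; // stands in for config.getPingReqMembers()
  private int pingMemberIndex = 0;

  PingSelectionSketch(List<String> members, int pingReqMembers) {
    this.pingMembers = new ArrayList<>(members);
    this.pingReqMembers = pingReqMembers;
  }

  /** Walks the list in order and reshuffles once every member has been visited. */
  String selectPingMember() {
    if (pingMembers.isEmpty()) {
      return null;
    }
    if (pingMemberIndex >= pingMembers.size()) {
      pingMemberIndex = 0;
      Collections.shuffle(pingMembers);
    }
    return pingMembers.get(pingMemberIndex++);
  }

  /** Picks up to pingReqMembers random helpers, never the member being probed. */
  List<String> selectPingReqMembers(String pingMember) {
    if (pingReqMembers <= 0) {
      return Collections.emptyList();
    }
    List<String> candidates = new ArrayList<>(pingMembers);
    candidates.remove(pingMember);
    if (candidates.isEmpty()) {
      return Collections.emptyList();
    }
    Collections.shuffle(candidates);
    boolean selectAll = candidates.size() < pingReqMembers;
    return selectAll ? candidates : candidates.subList(0, pingReqMembers);
  }

  public static void main(String[] args) {
    PingSelectionSketch sketch = new PingSelectionSketch(Arrays.asList("a", "b", "c", "d"), 2);
    for (int i = 0; i < 8; i++) {
      System.out.print(sketch.selectPingMember() + " "); // two full rounds over the members
    }
    System.out.println();
    System.out.println(sketch.selectPingReqMembers("a")); // two random helpers, never "a"
  }
}
```

As long as membership does not change in the middle of a round, every member is probed exactly once per round of pingMembers.size() periods. That bounds how long a member can go unprobed, which drawing independently at random each period would not.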

Summary

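  • FailureDetector defines the start, stop, and listen methods, and FailureDetectorImpl implements it. The implementation defines three message qualifiers (PING, PING_REQ, and PING_ACK) and maintains the pingMembers list. Its constructor subscribes to membershipProcessor, which triggers onMemberEvent, and to transport.listen(), which triggers onMessage. start registers the doPing task through scheduler.schedulePeriodically so that it runs once every pingInterval (5000 ms by default); stop calls actionsDisposables.dispose() and sink.complete(); listen() returns subject.onBackpressureBuffer().
  • onMemberEvent removes a member from pingMembers, or inserts it at a random index, according to the MembershipEvent. onMessage dispatches on the message qualifier and on whether originalIssuer is set, calling onPing, onPingReq, or onTransitPingAck. onPing replies with a PING_ACK message to the member that sent the ping; onPingReq sends a PING to the member named in the ping request; onTransitPingAck forwards the ack produced by a ping request back to the originalIssuer.
  • doPing first increments currentPeriod and picks a pingMember through selectPingMember. It then builds the pingData and sends the request with transport.requestResponse. On success it calls publishPingResult with MemberStatus.ALIVE. On error it picks random pingReqMembers through selectPingReqMembers; if config.getPingInterval() - config.getPingTimeout() is less than or equal to 0, or pingReqMembers is empty, it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT); otherwise it calls doPingReq.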
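
The branch taken after the direct ping can be condensed into one small function. This is a sketch only: the Outcome enum and the afterDirectPing method are hypothetical (the library publishes MemberStatus.ALIVE or MemberStatus.SUSPECT, and PING_REQ here merely means "doPingReq is called, no verdict yet"), and the 1000 ms and 500 ms values in main are illustrative, not defaults taken from the library.

```java
/** Hypothetical condensation of the decision doPing makes once the direct ping completes. */
final class PingOutcomeSketch {
  enum Outcome {
    ALIVE, // ack arrived in time
    SUSPECT, // no ack and no room (or nobody) for an indirect probe
    PING_REQ // no ack yet, fall back to indirect probing via helpers
  }

  static Outcome afterDirectPing(
      boolean ackReceived, int pingIntervalMs, int pingTimeoutMs, int helperCount) {
    if (ackReceived) {
      return Outcome.ALIVE;
    }
    // Whatever remains of the period after the direct-ping timeout is the ping-req budget.
    int timeLeft = pingIntervalMs - pingTimeoutMs;
    if (timeLeft <= 0 || helperCount == 0) {
      return Outcome.SUSPECT;
    }
    return Outcome.PING_REQ;
  }

  public static void main(String[] args) {
    // Illustrative numbers only: a 1000 ms period with a 500 ms direct-ping timeout.
    System.out.println(afterDirectPing(true, 1000, 500, 3));
    System.out.println(afterDirectPing(false, 1000, 500, 3));
    System.out.println(afterDirectPing(false, 1000, 500, 0));
    System.out.println(afterDirectPing(false, 500, 500, 3));
  }
}
```

The same difference, pingInterval minus pingTimeout, is what doPingReq uses as the timeout for each indirect request, so the direct and indirect phases together fit inside one ping period.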

Reposted from: https://my.oschina.net/go4it/blog/3045379