Preface
This article takes a look at the FailureDetector in scalecube-cluster.
FailureDetector
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetector.java
/**
 * Failure Detector component responsible for monitoring availability of other members in the
 * cluster. This interface is supposed to be used internally as part cluster membership protocol. It
 * doesn't specify that particular node is failed, but just provide information that either it is
 * suspected or trusted at current moment of time. So it is up to cluster membership or other top
 * level component to define when suspected member is actually failed.
 */
public interface FailureDetector {

  /**
   * Starts running failure detection algorithm. After started it begins to receive and send ping
   * messages.
   */
  void start();

  /** Stops running failure detection algorithm and releases occupied resources. */
  void stop();

  /** Listens for results of ping checks (ALIVE/SUSPECT) done periodically by failure detector. */
  Flux<FailureDetectorEvent> listen();
}
- FailureDetector defines three methods: start, stop, and listen.
FailureDetectorImpl
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
public final class FailureDetectorImpl implements FailureDetector {

  private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);

  // Qualifiers

  public static final String PING = "sc/fdetector/ping";
  public static final String PING_REQ = "sc/fdetector/pingReq";
  public static final String PING_ACK = "sc/fdetector/pingAck";

  // Injected

  private final Member localMember;
  private final Transport transport;
  private final FailureDetectorConfig config;
  private final CorrelationIdGenerator cidGenerator;

  // State

  private long currentPeriod = 0;
  private List<Member> pingMembers = new ArrayList<>();
  private int pingMemberIndex = 0; // index for sequential ping member selection

  // Disposables

  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Subject

  private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject =
      DirectProcessor.<FailureDetectorEvent>create().serialize();

  private final FluxSink<FailureDetectorEvent> sink = subject.sink();

  // Scheduled

  private final Scheduler scheduler;

  /**
   * Creates new instance of failure detector with given transport and settings.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param membershipProcessor membership event processor
   * @param config failure detector settings
   * @param scheduler scheduler
   * @param cidGenerator correlationId generator
   */
  public FailureDetectorImpl(
      Member localMember,
      Transport transport,
      Flux<MembershipEvent> membershipProcessor,
      FailureDetectorConfig config,
      Scheduler scheduler,
      CorrelationIdGenerator cidGenerator) {

    this.localMember = Objects.requireNonNull(localMember);
    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.scheduler = Objects.requireNonNull(scheduler);
    this.cidGenerator = Objects.requireNonNull(cidGenerator);

    // Subscribe
    actionsDisposables.addAll(
        Arrays.asList(
            membershipProcessor //
                .publishOn(scheduler)
                .subscribe(this::onMemberEvent, this::onError),
            transport
                .listen() //
                .publishOn(scheduler)
                .subscribe(this::onMessage, this::onError)));
  }

  @Override
  public void start() {
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doPing,
            config.getPingInterval(),
            config.getPingInterval(),
            TimeUnit.MILLISECONDS));
  }

  @Override
  public void stop() {
    // Stop accepting requests and sending pings
    actionsDisposables.dispose();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Flux<FailureDetectorEvent> listen() {
    return subject.onBackpressureBuffer();
  }

  //......
}
- FailureDetectorImpl implements the FailureDetector interface. It defines three message qualifiers (PING, PING_REQ, and PING_ACK) and keeps the list of members to ping in pingMembers.
- The constructor subscribes to membershipProcessor, which drives onMemberEvent, and to transport.listen(), which drives onMessage.
- start registers the doPing task through scheduler.schedulePeriodically, so that it runs once every pingInterval (5000 ms by default); stop calls actionsDisposables.dispose() and then sink.complete(); listen() returns subject.onBackpressureBuffer().
onMemberEvent
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
public final class FailureDetectorImpl implements FailureDetector {

  //......

  private void onMemberEvent(MembershipEvent event) {
    Member member = event.member();
    if (event.isRemoved()) {
      pingMembers.remove(member);
    }
    if (event.isAdded()) {
      // insert member into random positions
      int size = pingMembers.size();
      int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
      pingMembers.add(index, member);
    }
  }

  //......
}
- onMemberEvent updates pingMembers according to the MembershipEvent: a removed member is deleted from the list, and an added member is inserted at a random index.
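The random insertion is easy to try in isolation. The following is a minimal sketch, not the library's code: PingListSketch and its method names are hypothetical, and members are represented as plain strings. It mirrors the two branches of onMemberEvent shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for the detector's ping list; members are plain strings here.
final class PingListSketch {
  private final List<String> pingMembers = new ArrayList<>();

  // Mirrors the "added" branch: insert at a random index in [0, size).
  void onAdded(String member) {
    int size = pingMembers.size();
    int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
    pingMembers.add(index, member);
  }

  // Mirrors the "removed" branch: drop the member if it is present.
  void onRemoved(String member) {
    pingMembers.remove(member);
  }

  // Defensive copy so callers cannot modify the internal list.
  List<String> snapshot() {
    return new ArrayList<>(pingMembers);
  }
}
```

One detail worth noticing: because nextInt(size) excludes size itself, a newly added member is never placed at the very end of a non-empty list.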
onMessage
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
public final class FailureDetectorImpl implements FailureDetector {

  //......

  private void onMessage(Message message) {
    if (isPing(message)) {
      onPing(message);
    } else if (isPingReq(message)) {
      onPingReq(message);
    } else if (isTransitPingAck(message)) {
      onTransitPingAck(message);
    }
  }

  private boolean isPing(Message message) {
    return PING.equals(message.qualifier());
  }

  private boolean isPingReq(Message message) {
    return PING_REQ.equals(message.qualifier());
  }

  private boolean isTransitPingAck(Message message) {
    return PING_ACK.equals(message.qualifier())
        && message.<PingData>data().getOriginalIssuer() != null;
  }

  /** Listens to PING message and answers with ACK. */
  private void onPing(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received Ping[{}]", period);
    PingData data = message.data();
    if (!data.getTo().id().equals(localMember.id())) {
      LOGGER.warn(
          "Received Ping[{}] to {}, but local member is {}", period, data.getTo(), localMember);
      return;
    }
    String correlationId = message.correlationId();
    Message ackMessage =
        Message.withData(data)
            .qualifier(PING_ACK)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = data.getFrom().address();
    LOGGER.trace("Send PingAck[{}] to {}", period, address);
    transport
        .send(address, ackMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send PingAck[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  /** Listens to PING_REQ message and sends PING to requested cluster member. */
  private void onPingReq(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received PingReq[{}]", period);
    PingData data = message.data();
    Member target = data.getTo();
    Member originalIssuer = data.getFrom();
    String correlationId = message.correlationId();
    PingData pingReqData = new PingData(localMember, target, originalIssuer);
    Message pingMessage =
        Message.withData(pingReqData)
            .qualifier(PING)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = target.address();
    LOGGER.trace("Send transit Ping[{}] to {}", period, address);
    transport
        .send(address, pingMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send transit Ping[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  /**
   * Listens to ACK with message containing ORIGINAL_ISSUER then converts message to plain ACK and
   * sends it to ORIGINAL_ISSUER.
   */
  private void onTransitPingAck(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received transit PingAck[{}]", period);
    PingData data = message.data();
    Member target = data.getOriginalIssuer();
    String correlationId = message.correlationId();
    PingData originalAckData = new PingData(target, data.getTo());
    Message originalAckMessage =
        Message.withData(originalAckData)
            .qualifier(PING_ACK)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = target.address();
    LOGGER.trace("Resend transit PingAck[{}] to {}", period, address);
    transport
        .send(address, originalAckMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to resend transit PingAck[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  //......
}
- onMessage dispatches on the message qualifier and on whether the payload carries an originalIssuer, calling onPing, onPingReq, or onTransitPingAck. onPing replies with a PING_ACK to the member in data.getFrom(), after checking that the ping was really addressed to the local member; onPingReq sends a PING to the member named in the ping request, recording the requester as originalIssuer; onTransitPingAck takes the ack that came back for such a relayed ping and forwards it to the originalIssuer as a plain PING_ACK.
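The routing rule can be condensed into a small stand-alone sketch. MessageSketch and DispatchSketch are hypothetical names introduced only for illustration; the real code works on the library's Message and PingData types, and only the three qualifier strings are taken from the listing above.

```java
// Hypothetical, simplified message: only the two fields the dispatch looks at.
final class MessageSketch {
  final String qualifier;
  final String originalIssuer; // null for a direct (non-relayed) exchange

  MessageSketch(String qualifier, String originalIssuer) {
    this.qualifier = qualifier;
    this.originalIssuer = originalIssuer;
  }
}

final class DispatchSketch {
  static final String PING = "sc/fdetector/ping";
  static final String PING_REQ = "sc/fdetector/pingReq";
  static final String PING_ACK = "sc/fdetector/pingAck";

  // Returns the name of the handler that the dispatch would pick, or "ignored".
  static String route(MessageSketch m) {
    if (PING.equals(m.qualifier)) {
      return "onPing";
    } else if (PING_REQ.equals(m.qualifier)) {
      return "onPingReq";
    } else if (PING_ACK.equals(m.qualifier) && m.originalIssuer != null) {
      return "onTransitPingAck";
    }
    return "ignored";
  }
}
```

A PING_ACK without an originalIssuer falls through this dispatch. In the real class such an ack is presumably picked up by the pending transport.requestResponse call instead, which is an assumption based on how doPing consumes its response rather than something the listing states.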
doPing
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
public final class FailureDetectorImpl implements FailureDetector {

  //......

  private void doPing() {
    // Increment period counter
    long period = currentPeriod++;

    // Select ping member
    Member pingMember = selectPingMember();
    if (pingMember == null) {
      return;
    }

    // Send ping
    String cid = cidGenerator.nextCid();
    PingData pingData = new PingData(localMember, pingMember);
    Message pingMsg =
        Message.withData(pingData)
            .qualifier(PING)
            .correlationId(cid)
            .sender(localMember.address())
            .build();

    LOGGER.trace("Send Ping[{}] to {}", period, pingMember);
    Address address = pingMember.address();
    transport
        .requestResponse(pingMsg, address)
        .timeout(Duration.ofMillis(config.getPingTimeout()), scheduler)
        .publishOn(scheduler)
        .subscribe(
            message -> {
              LOGGER.trace("Received PingAck[{}] from {}", period, pingMember);
              publishPingResult(period, pingMember, MemberStatus.ALIVE);
            },
            ex -> {
              LOGGER.debug(
                  "Failed to get PingAck[{}] from {} within {} ms",
                  period,
                  pingMember,
                  config.getPingTimeout());

              final int timeLeft = config.getPingInterval() - config.getPingTimeout();
              final List<Member> pingReqMembers = selectPingReqMembers(pingMember);

              if (timeLeft <= 0 || pingReqMembers.isEmpty()) {
                LOGGER.trace("No PingReq[{}] occurred", period);
                publishPingResult(period, pingMember, MemberStatus.SUSPECT);
              } else {
                doPingReq(currentPeriod, pingMember, pingReqMembers, cid);
              }
            });
  }

  private Member selectPingMember() {
    if (pingMembers.isEmpty()) {
      return null;
    }
    if (pingMemberIndex >= pingMembers.size()) {
      pingMemberIndex = 0;
      Collections.shuffle(pingMembers);
    }
    return pingMembers.get(pingMemberIndex++);
  }

  private List<Member> selectPingReqMembers(Member pingMember) {
    if (config.getPingReqMembers() <= 0) {
      return Collections.emptyList();
    }
    List<Member> candidates = new ArrayList<>(pingMembers);
    candidates.remove(pingMember);
    if (candidates.isEmpty()) {
      return Collections.emptyList();
    }
    Collections.shuffle(candidates);
    boolean selectAll = candidates.size() < config.getPingReqMembers();
    return selectAll ? candidates : candidates.subList(0, config.getPingReqMembers());
  }

  private void doPingReq(
      long period, final Member pingMember, final List<Member> pingReqMembers, String cid) {
    Message pingReqMsg =
        Message.withData(new PingData(localMember, pingMember))
            .qualifier(PING_REQ)
            .correlationId(cid)
            .sender(localMember.address())
            .build();
    LOGGER.trace("Send PingReq[{}] to {} for {}", period, pingReqMembers, pingMember);

    Duration timeout = Duration.ofMillis(config.getPingInterval() - config.getPingTimeout());
    pingReqMembers.forEach(
        member ->
            transport
                .requestResponse(pingReqMsg, member.address())
                .timeout(timeout, scheduler)
                .publishOn(scheduler)
                .subscribe(
                    message -> {
                      LOGGER.trace(
                          "Received transit PingAck[{}] from {} to {}",
                          period,
                          message.sender(),
                          pingMember);
                      publishPingResult(period, pingMember, MemberStatus.ALIVE);
                    },
                    throwable -> {
                      LOGGER.trace(
                          "Timeout getting transit PingAck[{}] from {} to {} within {} ms",
                          period,
                          pingReqMembers,
                          pingMember,
                          timeout);
                      publishPingResult(period, pingMember, MemberStatus.SUSPECT);
                    }));
  }

  private void publishPingResult(long period, Member member, MemberStatus status) {
    LOGGER.debug("Member {} detected as {} at [{}]", member, status, period);
    sink.next(new FailureDetectorEvent(member, status));
  }

  //......
}
- doPing first increments currentPeriod, then picks the next pingMember via selectPingMember, builds the PingData, and sends the request through transport.requestResponse with a timeout of pingTimeout. On success it calls publishPingResult with MemberStatus.ALIVE. If the request fails or times out, it selects pingReqMembers via selectPingReqMembers; when config.getPingInterval() - config.getPingTimeout() is less than or equal to 0, or pingReqMembers is empty, it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT), otherwise it calls doPingReq.
- selectPingMember walks pingMembers sequentially: once pingMemberIndex reaches pingMembers.size(), it resets the index to 0 and runs Collections.shuffle(pingMembers), and in every case it returns the member at the current index and then increments pingMemberIndex. selectPingReqMembers copies pingMembers into a new list, removes pingMember to obtain the candidates, shuffles them with Collections.shuffle(candidates), and returns at most config.getPingReqMembers() of them via subList.
- doPingReq iterates over pingReqMembers and sends pingReqMsg to each of them with a timeout of pingInterval - pingTimeout. When a transit PingAck arrives it calls publishPingResult(period, pingMember, MemberStatus.ALIVE); on error or timeout it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT). Each helper request is subscribed independently, so a single period can produce more than one event for the same member. publishPingResult pushes a FailureDetectorEvent into the sink.
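The two selection routines described above can be exercised without the rest of the detector. The sketch below is an illustration under assumptions, not the library's code: SelectionSketch is a hypothetical class, members are plain strings, and maxHelpers stands in for config.getPingReqMembers().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder for the ping list and the sequential selection index.
final class SelectionSketch {
  private final List<String> pingMembers;
  private int pingMemberIndex = 0;

  SelectionSketch(List<String> members) {
    this.pingMembers = new ArrayList<>(members);
  }

  // Sequential walk over the list; reshuffle whenever a full pass has completed.
  String selectPingMember() {
    if (pingMembers.isEmpty()) {
      return null;
    }
    if (pingMemberIndex >= pingMembers.size()) {
      pingMemberIndex = 0;
      Collections.shuffle(pingMembers);
    }
    return pingMembers.get(pingMemberIndex++);
  }

  // Up to maxHelpers randomly chosen members, never including the ping target.
  List<String> selectPingReqMembers(String pingMember, int maxHelpers) {
    if (maxHelpers <= 0) {
      return Collections.emptyList();
    }
    List<String> candidates = new ArrayList<>(pingMembers);
    candidates.remove(pingMember);
    if (candidates.isEmpty()) {
      return Collections.emptyList();
    }
    Collections.shuffle(candidates);
    boolean selectAll = candidates.size() < maxHelpers;
    return selectAll ? candidates : candidates.subList(0, maxHelpers);
  }
}
```

Shuffling only at the end of a pass means every member is pinged exactly once per pass as long as membership does not change, while the order differs from pass to pass.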
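Summary
- FailureDetector defines three methods: start, stop, and listen. FailureDetectorImpl implements the interface, defines three message qualifiers (PING, PING_REQ, and PING_ACK), and keeps the list of members to ping in pingMembers. Its constructor subscribes to membershipProcessor, which drives onMemberEvent, and to transport.listen(), which drives onMessage. start registers the doPing task through scheduler.schedulePeriodically, so that it runs once every pingInterval (5000 ms by default); stop calls actionsDisposables.dispose() and then sink.complete(); listen() returns subject.onBackpressureBuffer().
- onMemberEvent updates pingMembers according to the MembershipEvent, removing a member or inserting it at a random index. onMessage dispatches on the message qualifier and on whether the payload carries an originalIssuer, calling onPing, onPingReq, or onTransitPingAck. onPing replies with a PING_ACK to the member that sent the ping; onPingReq sends a PING to the member named in the ping request; onTransitPingAck forwards the ack of such a relayed ping back to the originalIssuer.
- doPing first increments currentPeriod, then picks the next pingMember via selectPingMember, builds the PingData, and sends the request through transport.requestResponse. On success it calls publishPingResult with MemberStatus.ALIVE. If the request fails or times out, it selects pingReqMembers via selectPingReqMembers; when config.getPingInterval() - config.getPingTimeout() is less than or equal to 0, or pingReqMembers is empty, it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT), otherwise it calls doPingReq.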
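The decision doPing makes after the direct ping can be written as a small pure function. This is a sketch under assumptions: PingOutcomeSketch, Outcome, and afterDirectPing are hypothetical names, and the PING_REQ value only signals that the indirect probe would be started; it is not a status the library publishes.

```java
// Hypothetical condensation of the branch taken once the direct ping has completed.
final class PingOutcomeSketch {
  enum Outcome { ALIVE, SUSPECT, PING_REQ }

  static Outcome afterDirectPing(
      boolean ackReceived, int pingIntervalMs, int pingTimeoutMs, int helperCount) {
    if (ackReceived) {
      return Outcome.ALIVE; // direct ack arrived within pingTimeout
    }
    // Time remaining in the current period for an indirect probe.
    int timeLeft = pingIntervalMs - pingTimeoutMs;
    if (timeLeft <= 0 || helperCount == 0) {
      return Outcome.SUSPECT; // no time budget or no helper to ask
    }
    return Outcome.PING_REQ; // ask the helpers to ping the target on our behalf
  }
}
```

For example, with an interval of 1000 ms and a timeout of 500 ms (illustrative values, not the library's defaults), a missed ack leaves 500 ms for the helpers; with the timeout equal to the interval, the member is suspected immediately.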