XComp commented on code in PR #22661: URL: https://github.com/apache/flink/pull/22661#discussion_r1245351965
########## flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java: ########## @@ -19,147 +19,347 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriConsumer; -import javax.annotation.Nullable; - +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; /** - * {@link LeaderElectionDriver} implementation which provides some convenience functions for testing - * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually control the - * leadership. + * {@code TestingLeaderElectionDriver} is a generic test implementation of {@link + * MultipleComponentLeaderElectionDriver} which can be used in test cases. */ -public class TestingLeaderElectionDriver implements LeaderElectionDriver { +public class TestingLeaderElectionDriver implements MultipleComponentLeaderElectionDriver { - private final Object lock = new Object(); + private final Function<ReentrantLock, Boolean> hasLeadershipFunction; + private final TriConsumer<ReentrantLock, String, LeaderInformation> + publishLeaderInformationConsumer; + private final BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer; - private final AtomicBoolean isLeader = new AtomicBoolean(false); - private final LeaderElectionEventHandler leaderElectionEventHandler; - private final FatalErrorHandler fatalErrorHandler; + private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer; - private final ThrowingConsumer<LeaderElectionEventHandler, Exception> closeRunnable; - private final ThrowingConsumer<LeaderElectionEventHandler, Exception> beforeLockCloseRunnable; + private final TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer; + private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer; - private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable; + private final ReentrantLock lock = new ReentrantLock(); - // Leader information on external storage - private LeaderInformation leaderInformation = LeaderInformation.empty(); + private Optional<Listener> listener = Optional.empty(); + private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty(); - private TestingLeaderElectionDriver( - LeaderElectionEventHandler leaderElectionEventHandler, - FatalErrorHandler fatalErrorHandler, - ThrowingConsumer<LeaderElectionEventHandler, Exception> closeRunnable, - ThrowingConsumer<LeaderElectionEventHandler, Exception> beforeLockCloseRunnable, - Consumer<LeaderElectionEventHandler> beforeGrantRunnable) { - this.leaderElectionEventHandler = leaderElectionEventHandler; - this.fatalErrorHandler = fatalErrorHandler; - this.closeRunnable = closeRunnable; - this.beforeLockCloseRunnable = beforeLockCloseRunnable; - this.beforeGrantRunnable = beforeGrantRunnable; + public TestingLeaderElectionDriver( + Function<ReentrantLock, Boolean> hasLeadershipFunction, + TriConsumer<ReentrantLock, String, LeaderInformation> publishLeaderInformationConsumer, + BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer, + ThrowingConsumer<ReentrantLock, Exception> closeConsumer, + TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer, + BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) { + this.hasLeadershipFunction = hasLeadershipFunction; + this.publishLeaderInformationConsumer = publishLeaderInformationConsumer; + this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer; + this.closeConsumer = closeConsumer; + this.grantConsumer = grantConsumer; + this.revokeConsumer = revokeConsumer; } - @Override - public void writeLeaderInformation(LeaderInformation leaderInformation) { - this.leaderInformation = leaderInformation; + private void initialize(Listener listener, FatalErrorHandler fatalErrorHandler) { + Preconditions.checkState( + !this.listener.isPresent(), "The driver should be only initialized once."); + + this.listener = Optional.of(listener); + this.fatalErrorHandler = Optional.of(fatalErrorHandler); + } + + public void grantLeadership() { + grantLeadership(UUID.randomUUID()); + } + + public void grantLeadership(UUID leaderSessionID) { + grantConsumer.accept(lock, listener, leaderSessionID); + } + + public void revokeLeadership() { + revokeConsumer.accept(lock, listener); + } + + public void triggerErrorHandling(Throwable throwable) { + runInLockIfPresent(fatalErrorHandler, f -> f.onFatalError(throwable)); + } + + public void triggerLeaderInformationChangeEvent( + String contenderID, LeaderInformation leaderInformation) { + runInLockIfPresent( + listener, + listener -> listener.notifyLeaderInformationChange(contenderID, leaderInformation)); + } + + public void triggerAllLeaderInformationChangeEvent( + LeaderInformationRegister newLeaderInformation) { + runInLockIfPresent( + listener, + listener -> listener.notifyAllKnownLeaderInformation(newLeaderInformation)); + } + + private <T> void runInLockIfPresent(Optional<T> optional, Consumer<T> callback) { + try { + lock.lock(); + optional.ifPresent(callback); + } finally { + lock.unlock(); + } } @Override public boolean hasLeadership() { - return isLeader.get(); + return hasLeadershipFunction.apply(lock); } @Override - public void close() throws Exception { - beforeLockCloseRunnable.accept(leaderElectionEventHandler); - synchronized (lock) { - closeRunnable.accept(leaderElectionEventHandler); - } + public void publishLeaderInformation(String contenderID, LeaderInformation leaderInformation) { + publishLeaderInformationConsumer.accept(lock, contenderID, leaderInformation); } - public LeaderInformation getLeaderInformation() { - return leaderInformation; + @Override + public void deleteLeaderInformation(String contenderID) { + deleteLeaderInformationConsumer.accept(lock, contenderID); } - public void isLeader(UUID newSessionID) { - synchronized (lock) { - isLeader.set(true); - beforeGrantRunnable.accept(leaderElectionEventHandler); - leaderElectionEventHandler.onGrantLeadership(newSessionID); - } + @Override + public void close() throws Exception { + closeConsumer.accept(lock); } - public void isLeader() { - isLeader(UUID.randomUUID()); + public static Builder newNoOpBuilder() { + return new Builder(); } - public void notLeader() { - synchronized (lock) { - isLeader.set(false); - leaderElectionEventHandler.onRevokeLeadership(); - } + public static Builder newBuilder() { + return newBuilder(new AtomicBoolean(), new AtomicReference<>(), new AtomicBoolean()); } - public void leaderInformationChanged(LeaderInformation newLeader) { - leaderInformation = newLeader; - leaderElectionEventHandler.onLeaderInformationChange(newLeader); + /** + * Returns a {@code Builder} that comes with a basic default implementation of the {@link + * MultipleComponentLeaderElectionDriver} contract using the passed parameters for information + * storage. + * + * @param hasLeadership saves the current leadership state of the instance that is created from + * the {@code Builder}. + * @param storedLeaderInformation saves the leader information that would be otherwise stored in + * some external storage. + * @param isClosed saves the running state of the driver. + */ + public static Builder newBuilder( + AtomicBoolean hasLeadership, + AtomicReference<LeaderInformationRegister> storedLeaderInformation, + AtomicBoolean isClosed) { + Preconditions.checkState( + !hasLeadership.get(), "Initial state check for hasLeadership failed."); + Preconditions.checkState( + storedLeaderInformation.get() == null + || !storedLeaderInformation + .get() + .getRegisteredContenderIDs() + .iterator() + .hasNext(), + "Initial state check for storedLeaderInformation failed."); + Preconditions.checkState(!isClosed.get(), "Initial state check for isClosed failed."); + return newNoOpBuilder() + .setHasLeadershipFunction( + lock -> { + try { + lock.lock(); + return hasLeadership.get(); + } finally { + lock.unlock(); + } + }) + .setPublishLeaderInformationConsumer( + (lock, contenderID, leaderInformation) -> { + try { + lock.lock(); + if (hasLeadership.get()) { + storedLeaderInformation.getAndUpdate( + oldData -> { + if (oldData != null) { + if (leaderInformation.isEmpty()) { + return LeaderInformationRegister.clear( + oldData, contenderID); + } else { + return LeaderInformationRegister.merge( + oldData, + contenderID, + leaderInformation); + } + } else if (!leaderInformation.isEmpty()) { + return LeaderInformationRegister.of( + contenderID, leaderInformation); + } + + return new LeaderInformationRegister(); + }); + } + } finally { + lock.unlock(); + } + }) + .setDeleteLeaderInformationConsumer( + (lock, contenderID) -> { + try { + lock.lock(); + if (hasLeadership.get()) { + storedLeaderInformation.getAndUpdate( + oldData -> + oldData != null + ? LeaderInformationRegister.clear( + oldData, contenderID) + : new LeaderInformationRegister()); + } + } finally { + lock.unlock(); + } + }) + .setGrantConsumer( + (lock, listener, leaderSessionID) -> { + try { + lock.lock(); + Preconditions.checkState( + !hasLeadership.get(), + "Granting the leadership shouldn't happen repeatedly."); + + hasLeadership.set(true); + listener.ifPresent(l -> l.isLeader(leaderSessionID)); + } finally { + lock.unlock(); + } + }) + .setRevokeConsumer( + (lock, listener) -> { + try { + lock.lock(); + Preconditions.checkState( + hasLeadership.get(), + "Revoking the leadership shouldn't happen repeatedly."); + hasLeadership.set(false); + listener.ifPresent(Listener::notLeader); + } finally { + lock.unlock(); + } + }) + .setCloseConsumer( + lock -> { + try { + lock.lock(); + isClosed.set(true); + } finally { + lock.unlock(); + } + }); } - public void onFatalError(Throwable throwable) { - fatalErrorHandler.onFatalError(throwable); + /** + * {@code Factory} implements {@link MultipleComponentLeaderElectionDriverFactory} for the + * {@code TestingLeaderElectionDriver}. + */ + public static class Factory implements MultipleComponentLeaderElectionDriverFactory { + + private final TestingLeaderElectionDriver driver; + + public static Factory createFactoryWithNoOpDriver() { + return new Factory(TestingLeaderElectionDriver.newNoOpBuilder().build()); + } + + public static Factory defaultDriverFactory( + AtomicBoolean hasLeadership, + AtomicReference<LeaderInformationRegister> storedLeaderInformation, + AtomicBoolean isClosed) { + return new Factory( + TestingLeaderElectionDriver.newBuilder( + hasLeadership, storedLeaderInformation, isClosed) + .build()); + } + + public Factory(TestingLeaderElectionDriver driver) { + this.driver = driver; + } + + @Override + public MultipleComponentLeaderElectionDriver create( + Listener leaderElectionListener, FatalErrorHandler fatalErrorHandler) + throws Exception { + driver.initialize(leaderElectionListener, fatalErrorHandler); Review Comment: > How is this related to the error handling? I guess, you're right - FLINK-32381 wouldn't have fixed it. The `FatalErrorHandler` would be removed with FLINK-32381 only. I committed your recommendation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org