XComp commented on code in PR #22661:
URL: https://github.com/apache/flink/pull/22661#discussion_r1245351965


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -19,147 +19,347 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
 
-import javax.annotation.Nullable;
-
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
- * {@link LeaderElectionDriver} implementation which provides some convenience 
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually 
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of 
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
  */
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
+public class TestingLeaderElectionDriver implements 
MultipleComponentLeaderElectionDriver {
 
-    private final Object lock = new Object();
+    private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+    private final TriConsumer<ReentrantLock, String, LeaderInformation>
+            publishLeaderInformationConsumer;
+    private final BiConsumer<ReentrantLock, String> 
deleteLeaderInformationConsumer;
 
-    private final AtomicBoolean isLeader = new AtomicBoolean(false);
-    private final LeaderElectionEventHandler leaderElectionEventHandler;
-    private final FatalErrorHandler fatalErrorHandler;
+    private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
 
-    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable;
-    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable;
+    private final TriConsumer<ReentrantLock, Optional<Listener>, UUID> 
grantConsumer;
+    private final BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer;
 
-    private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+    private final ReentrantLock lock = new ReentrantLock();
 
-    // Leader information on external storage
-    private LeaderInformation leaderInformation = LeaderInformation.empty();
+    private Optional<Listener> listener = Optional.empty();
+    private Optional<FatalErrorHandler> fatalErrorHandler = Optional.empty();
 
-    private TestingLeaderElectionDriver(
-            LeaderElectionEventHandler leaderElectionEventHandler,
-            FatalErrorHandler fatalErrorHandler,
-            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable,
-            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable,
-            Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
-        this.leaderElectionEventHandler = leaderElectionEventHandler;
-        this.fatalErrorHandler = fatalErrorHandler;
-        this.closeRunnable = closeRunnable;
-        this.beforeLockCloseRunnable = beforeLockCloseRunnable;
-        this.beforeGrantRunnable = beforeGrantRunnable;
+    public TestingLeaderElectionDriver(
+            Function<ReentrantLock, Boolean> hasLeadershipFunction,
+            TriConsumer<ReentrantLock, String, LeaderInformation> 
publishLeaderInformationConsumer,
+            BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+            ThrowingConsumer<ReentrantLock, Exception> closeConsumer,
+            TriConsumer<ReentrantLock, Optional<Listener>, UUID> grantConsumer,
+            BiConsumer<ReentrantLock, Optional<Listener>> revokeConsumer) {
+        this.hasLeadershipFunction = hasLeadershipFunction;
+        this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
+        this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+        this.closeConsumer = closeConsumer;
+        this.grantConsumer = grantConsumer;
+        this.revokeConsumer = revokeConsumer;
     }
 
-    @Override
-    public void writeLeaderInformation(LeaderInformation leaderInformation) {
-        this.leaderInformation = leaderInformation;
+    private void initialize(Listener listener, FatalErrorHandler 
fatalErrorHandler) {
+        Preconditions.checkState(
+                !this.listener.isPresent(), "The driver should be only 
initialized once.");
+
+        this.listener = Optional.of(listener);
+        this.fatalErrorHandler = Optional.of(fatalErrorHandler);
+    }
+
+    public void grantLeadership() {
+        grantLeadership(UUID.randomUUID());
+    }
+
+    public void grantLeadership(UUID leaderSessionID) {
+        grantConsumer.accept(lock, listener, leaderSessionID);
+    }
+
+    public void revokeLeadership() {
+        revokeConsumer.accept(lock, listener);
+    }
+
+    public void triggerErrorHandling(Throwable throwable) {
+        runInLockIfPresent(fatalErrorHandler, f -> f.onFatalError(throwable));
+    }
+
+    public void triggerLeaderInformationChangeEvent(
+            String contenderID, LeaderInformation leaderInformation) {
+        runInLockIfPresent(
+                listener,
+                listener -> 
listener.notifyLeaderInformationChange(contenderID, leaderInformation));
+    }
+
+    public void triggerAllLeaderInformationChangeEvent(
+            LeaderInformationRegister newLeaderInformation) {
+        runInLockIfPresent(
+                listener,
+                listener -> 
listener.notifyAllKnownLeaderInformation(newLeaderInformation));
+    }
+
+    private <T> void runInLockIfPresent(Optional<T> optional, Consumer<T> 
callback) {
+        try {
+            lock.lock();
+            optional.ifPresent(callback);
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
     public boolean hasLeadership() {
-        return isLeader.get();
+        return hasLeadershipFunction.apply(lock);
     }
 
     @Override
-    public void close() throws Exception {
-        beforeLockCloseRunnable.accept(leaderElectionEventHandler);
-        synchronized (lock) {
-            closeRunnable.accept(leaderElectionEventHandler);
-        }
+    public void publishLeaderInformation(String contenderID, LeaderInformation 
leaderInformation) {
+        publishLeaderInformationConsumer.accept(lock, contenderID, 
leaderInformation);
     }
 
-    public LeaderInformation getLeaderInformation() {
-        return leaderInformation;
+    @Override
+    public void deleteLeaderInformation(String contenderID) {
+        deleteLeaderInformationConsumer.accept(lock, contenderID);
     }
 
-    public void isLeader(UUID newSessionID) {
-        synchronized (lock) {
-            isLeader.set(true);
-            beforeGrantRunnable.accept(leaderElectionEventHandler);
-            leaderElectionEventHandler.onGrantLeadership(newSessionID);
-        }
+    @Override
+    public void close() throws Exception {
+        closeConsumer.accept(lock);
     }
 
-    public void isLeader() {
-        isLeader(UUID.randomUUID());
+    public static Builder newNoOpBuilder() {
+        return new Builder();
     }
 
-    public void notLeader() {
-        synchronized (lock) {
-            isLeader.set(false);
-            leaderElectionEventHandler.onRevokeLeadership();
-        }
+    public static Builder newBuilder() {
+        return newBuilder(new AtomicBoolean(), new AtomicReference<>(), new 
AtomicBoolean());
     }
 
-    public void leaderInformationChanged(LeaderInformation newLeader) {
-        leaderInformation = newLeader;
-        leaderElectionEventHandler.onLeaderInformationChange(newLeader);
+    /**
+     * Returns a {@code Builder} that comes with a basic default 
implementation of the {@link
+     * MultipleComponentLeaderElectionDriver} contract using the passed 
parameters for information
+     * storage.
+     *
+     * @param hasLeadership saves the current leadership state of the instance 
that is created from
+     *     the {@code Builder}.
+     * @param storedLeaderInformation saves the leader information that would 
be otherwise stored in
+     *     some external storage.
+     * @param isClosed saves the running state of the driver.
+     */
+    public static Builder newBuilder(
+            AtomicBoolean hasLeadership,
+            AtomicReference<LeaderInformationRegister> storedLeaderInformation,
+            AtomicBoolean isClosed) {
+        Preconditions.checkState(
+                !hasLeadership.get(), "Initial state check for hasLeadership 
failed.");
+        Preconditions.checkState(
+                storedLeaderInformation.get() == null
+                        || !storedLeaderInformation
+                                .get()
+                                .getRegisteredContenderIDs()
+                                .iterator()
+                                .hasNext(),
+                "Initial state check for storedLeaderInformation failed.");
+        Preconditions.checkState(!isClosed.get(), "Initial state check for 
isClosed failed.");
+        return newNoOpBuilder()
+                .setHasLeadershipFunction(
+                        lock -> {
+                            try {
+                                lock.lock();
+                                return hasLeadership.get();
+                            } finally {
+                                lock.unlock();
+                            }
+                        })
+                .setPublishLeaderInformationConsumer(
+                        (lock, contenderID, leaderInformation) -> {
+                            try {
+                                lock.lock();
+                                if (hasLeadership.get()) {
+                                    storedLeaderInformation.getAndUpdate(
+                                            oldData -> {
+                                                if (oldData != null) {
+                                                    if 
(leaderInformation.isEmpty()) {
+                                                        return 
LeaderInformationRegister.clear(
+                                                                oldData, 
contenderID);
+                                                    } else {
+                                                        return 
LeaderInformationRegister.merge(
+                                                                oldData,
+                                                                contenderID,
+                                                                
leaderInformation);
+                                                    }
+                                                } else if 
(!leaderInformation.isEmpty()) {
+                                                    return 
LeaderInformationRegister.of(
+                                                            contenderID, 
leaderInformation);
+                                                }
+
+                                                return new 
LeaderInformationRegister();
+                                            });
+                                }
+                            } finally {
+                                lock.unlock();
+                            }
+                        })
+                .setDeleteLeaderInformationConsumer(
+                        (lock, contenderID) -> {
+                            try {
+                                lock.lock();
+                                if (hasLeadership.get()) {
+                                    storedLeaderInformation.getAndUpdate(
+                                            oldData ->
+                                                    oldData != null
+                                                            ? 
LeaderInformationRegister.clear(
+                                                                    oldData, 
contenderID)
+                                                            : new 
LeaderInformationRegister());
+                                }
+                            } finally {
+                                lock.unlock();
+                            }
+                        })
+                .setGrantConsumer(
+                        (lock, listener, leaderSessionID) -> {
+                            try {
+                                lock.lock();
+                                Preconditions.checkState(
+                                        !hasLeadership.get(),
+                                        "Granting the leadership shouldn't 
happen repeatedly.");
+
+                                hasLeadership.set(true);
+                                listener.ifPresent(l -> 
l.isLeader(leaderSessionID));
+                            } finally {
+                                lock.unlock();
+                            }
+                        })
+                .setRevokeConsumer(
+                        (lock, listener) -> {
+                            try {
+                                lock.lock();
+                                Preconditions.checkState(
+                                        hasLeadership.get(),
+                                        "Revoking the leadership shouldn't 
happen repeatedly.");
+                                hasLeadership.set(false);
+                                listener.ifPresent(Listener::notLeader);
+                            } finally {
+                                lock.unlock();
+                            }
+                        })
+                .setCloseConsumer(
+                        lock -> {
+                            try {
+                                lock.lock();
+                                isClosed.set(true);
+                            } finally {
+                                lock.unlock();
+                            }
+                        });
     }
 
-    public void onFatalError(Throwable throwable) {
-        fatalErrorHandler.onFatalError(throwable);
+    /**
+     * {@code Factory} implements {@link 
MultipleComponentLeaderElectionDriverFactory} for the
+     * {@code TestingLeaderElectionDriver}.
+     */
+    public static class Factory implements 
MultipleComponentLeaderElectionDriverFactory {
+
+        private final TestingLeaderElectionDriver driver;
+
+        public static Factory createFactoryWithNoOpDriver() {
+            return new 
Factory(TestingLeaderElectionDriver.newNoOpBuilder().build());
+        }
+
+        public static Factory defaultDriverFactory(
+                AtomicBoolean hasLeadership,
+                AtomicReference<LeaderInformationRegister> 
storedLeaderInformation,
+                AtomicBoolean isClosed) {
+            return new Factory(
+                    TestingLeaderElectionDriver.newBuilder(
+                                    hasLeadership, storedLeaderInformation, 
isClosed)
+                            .build());
+        }
+
+        public Factory(TestingLeaderElectionDriver driver) {
+            this.driver = driver;
+        }
+
+        @Override
+        public MultipleComponentLeaderElectionDriver create(
+                Listener leaderElectionListener, FatalErrorHandler 
fatalErrorHandler)
+                throws Exception {
+            driver.initialize(leaderElectionListener, fatalErrorHandler);

Review Comment:
   > How is this related to the error handling?
   
   I guess, you're right - FLINK-32381 wouldn't have fixed it. The 
`FatalErrorHandler` would be removed with FLINK-32381 only. I committed your 
recommendation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to