frankgh commented on code in PR #160:
URL: https://github.com/apache/cassandra-sidecar/pull/160#discussion_r1884640535


##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/BestEffortSingleInstanceExecutor.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.exceptions.CASWriteUnknownException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.CoordinationConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_GLOBAL_LEASE_CLAIMED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_GLOBAL_LEASE_LOST;
+
+/**
+ * A best-effort process where 0 or more instance executors can be chosen from 
the electorate.
+ * The electorate is expected to be a small subset of the entirety of Sidecar 
instances.
+ *
+ * <p>There will be situations where multiple members of the electorate will
+ * be acting as executors, for example in cases where we have:
+ * <ul>
+ *     <li>Network partitions
+ *     <li>Binary protocol is disabled for a member of the electorate
+ * </ul>
+ *
+ * <p>The chosen instance executor(s) must keep in mind that there might be 
other executors in the
+ * cluster, so operations that they perform must be safe to be performed by 
one or more
+ * Sidecar instances.
+ */
+public class BestEffortSingleInstanceExecutor implements 
SingleInstanceExecutor, PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BestEffortSingleInstanceExecutor.class);
+    private final ElectorateMembership electorateMembership;
+    private final SidecarLeaseDatabaseAccessor accessor;
+    private final ServiceConfiguration config;
+    private final CoordinationConfiguration coordinationConfiguration;
+    private final Vertx vertx;
+    private final TaskExecutorPool internalPool;
+    private volatile boolean isLocalSidecarSingleInstanceExecutor = false;
+
+    /**
+     * Creates the executor and stores the collaborators it works with.
+     *
+     * @param vertx                the Vert.x instance whose event bus is used to publish lease notifications
+     * @param executorPools        the executor pools; the internal pool is taken from here
+     * @param serviceConfiguration the service configuration; the coordination configuration is read from it
+     * @param electorateMembership decides whether the local Sidecar takes part in the selection
+     * @param accessor             the accessor used to claim and extend the lease
+     */
+    public BestEffortSingleInstanceExecutor(Vertx vertx,
+                                            ExecutorPools executorPools,
+                                            ServiceConfiguration serviceConfiguration,
+                                            ElectorateMembership electorateMembership,
+                                            SidecarLeaseDatabaseAccessor accessor)
+    {
+        // collaborators that are stored as-is
+        this.vertx = vertx;
+        this.accessor = accessor;
+        this.electorateMembership = electorateMembership;
+
+        // values derived from the pools and the service configuration
+        this.internalPool = executorPools.internal();
+        this.config = serviceConfiguration;
+        this.coordinationConfiguration = serviceConfiguration.coordinationConfiguration();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Reports the in-memory flag kept by this class; calling this method does not query the lease store.
+     */
+    @Override
+    public boolean isLocalSidecarSingleInstanceExecutor()
+    {
+        return this.isLocalSidecarSingleInstanceExecutor;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldSkip()
+    {
+        if (!config.schemaKeyspaceConfiguration().isEnabled())
+        {
+            // The Sidecar schema feature is required for this implementation
+            // so skip when the feature is not enabled
+            return true;
+        }
+        return 
!coordinationConfiguration.singleInstanceExecutorProcessEnabled();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The value comes from the coordination configuration
+     * ({@code singleInstanceExecutorInitialDelayMillis()}).
+     */
+    @Override
+    public long initialDelay()
+    {
+        return this.coordinationConfiguration.singleInstanceExecutorInitialDelayMillis();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The value comes from the coordination configuration
+     * ({@code singleInstanceExecutorFrequencyMillis()}).
+     */
+    @Override
+    public long delay()
+    {
+        return this.coordinationConfiguration.singleInstanceExecutorFrequencyMillis();
+    }
+
+    /**
+     * Completes the supplied promise immediately and then submits the selection to the internal pool
+     * through {@code runBlocking}.
+     *
+     * @param promise the promise for this run; it is completed before the selection is submitted
+     */
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        // Signal completion up front so that the next iteration can be scheduled
+        promise.complete();
+
+        // The election itself is driven by the electorate membership
+        internalPool.runBlocking(() -> {
+            determineSingleInstanceExecutor(electorateMembership);
+        });
+    }
+
+    /**
+     * Runs one round of the lease selection for the local Sidecar instance.
+     *
+     * <p>When {@code electorateMembership.shouldParticipate()} returns {@code false} the method returns
+     * without touching the lease. Otherwise:
+     * <ol>
+     *     <li>if the in-memory flag says this instance is the current executor, it tries to extend the lease;
+     *     <li>if the instance was not the executor, or the extension did not confirm ownership, it tries
+     *         to claim the lease;
+     *     <li>if neither attempt produced a result, the in-memory flag is left untouched;
+     *     <li>otherwise the flag is updated on a transition and the matching event
+     *         ({@code ON_SIDECAR_GLOBAL_LEASE_LOST} or {@code ON_SIDECAR_GLOBAL_LEASE_CLAIMED}) is published
+     *         on the Vert.x event bus with the owner as the message.
+     * </ol>
+     *
+     * <p>Exceptions thrown by the accessor are logged and not propagated.
+     *
+     * @param electorateMembership decides whether the local Sidecar takes part in the selection
+     */
+    @Override
+    public void determineSingleInstanceExecutor(ElectorateMembership electorateMembership)
+    {
+        boolean shouldParticipate = electorateMembership.shouldParticipate();
+        LOGGER.debug("Sidecar instance shouldParticipate={} in the selection", shouldParticipate);
+        if (!shouldParticipate)
+        {
+            return;
+        }
+
+        boolean wasCurrentExecutor = isLocalSidecarSingleInstanceExecutor;
+        // null means "unknown": no attempt has produced a result yet
+        Boolean isCurrentExecutor = null;
+
+        String owner = owner();
+        LOGGER.debug("Starting selection for owner={}", owner);
+        if (wasCurrentExecutor)
+        {
+            SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null;
+            try
+            {
+                LOGGER.debug("Attempting to extend lease for owner={}", owner);
+                result = accessor.extendLease(owner);
+            }
+            catch (CASWriteUnknownException | NoHostAvailableException e)
+            {
+                // logged at debug level, unlike other failures which are logged as errors
+                LOGGER.debug("Unable to extend lease for owner={}", owner, e);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Unable to extend lease for owner={}", owner, e);
+            }
+
+            isCurrentExecutor = determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner);
+            LOGGER.debug("Extend lease for owner={} result={}", owner, isCurrentExecutor);
+        }
+
+        // Fall back to a claim when the extension was not attempted, failed, or did not confirm ownership
+        if (isCurrentExecutor == null || !isCurrentExecutor)
+        {
+            SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null;
+            try
+            {
+                LOGGER.debug("Attempting to claim lease for owner={}", owner);
+                result = accessor.claimLease(owner);
+            }
+            catch (CASWriteUnknownException | NoHostAvailableException e)
+            {
+                LOGGER.debug("Unable to claim lease for owner={}", owner, e);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Unable to claim lease for owner={}", owner, e);
+            }
+
+            isCurrentExecutor = determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner);
+            LOGGER.debug("Claim lease for owner={} result={}", owner, isCurrentExecutor);
+        }
+
+        if (isCurrentExecutor == null)
+        {
+            // Outcome unknown: keep the previous in-memory state as it is
+            LOGGER.debug("Unable to perform lease election for owner={}", owner);
+            // TODO : do we give up lease after some period of time? ie TTL which is 10 minutes?
+            return;
+        }
+
+        if (wasCurrentExecutor && !isCurrentExecutor)
+        {
+            LOGGER.info("Cluster-wide lease has been lost by owner={}", owner);
+            isLocalSidecarSingleInstanceExecutor = isCurrentExecutor;
+            // notify lease has been lost
+            vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_LOST.address(), owner);
+        }
+        else if (!wasCurrentExecutor && isCurrentExecutor)
+        {
+            LOGGER.info("Cluster-wide lease has been claimed by owner={}", owner);
+            isLocalSidecarSingleInstanceExecutor = isCurrentExecutor;
+            // notify lease has been gained
+            vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_CLAIMED.address(), owner);
+        }
+        else if (wasCurrentExecutor)
+        {
+            // was and still is the executor: nothing to update or publish
+            LOGGER.debug("Cluster-wide lease has been extended by owner={}", owner);
+        }
+    }
+
+    /**
+     * Works out whether {@code owner} holds the lease, given the outcome of a claim or extend attempt.
+     *
+     * @param isCurrentLeaseHolder the value to return unchanged when no result is available
+     * @param result               the outcome of the attempt, or {@code null} when there is none
+     * @param owner                the identifier of the local Sidecar
+     * @return {@code true} when the lease was acquired or the existing owner in the result equals
+     * {@code owner}; {@code false} when the result names a different owner; {@code isCurrentLeaseHolder}
+     * when {@code result} is {@code null}
+     */
+    protected Boolean determineIfIsCurrentLeaseHolder(Boolean isCurrentLeaseHolder,
+                                                      SidecarLeaseDatabaseAccessor.LeaseClaimResult result,
+                                                      String owner)
+    {
+        if (result == null)
+        {
+            return isCurrentLeaseHolder;
+        }
+
+        // The owner comparison covers the case where the current Sidecar was a lease-holder but the
+        // information was lost from the in-memory process (i.e. Sidecar restarted) while it is still
+        // persisted in the database, so the state is recovered from the database
+        return result.leaseAcquired || owner.equals(result.existingOwner);
+    }
+
+    /**
+     * Returns the configured host ID for Sidecar.
+     *
+     * @return the configured host ID for Sidecar
+     */
+    protected String owner()
+    {
+        return config.hostId();

Review Comment:
   I chose randomness to keep things simple; maybe we can think about a 
follow-up for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to