frankgh commented on code in PR #160: URL: https://github.com/apache/cassandra-sidecar/pull/160#discussion_r1884640535
########## server/src/main/java/org/apache/cassandra/sidecar/coordination/BestEffortSingleInstanceExecutor.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.coordination; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.exceptions.CASWriteUnknownException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.config.CoordinationConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.jetbrains.annotations.VisibleForTesting; + +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_GLOBAL_LEASE_CLAIMED; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_GLOBAL_LEASE_LOST; + +/** + * A best-effort process where 0 or more instance executors can be chosen from the electorate. + * The electorate is expected to be a small subset of the entirety of Sidecar instances. + * + * <p>There will be situations where multiple members of the electorate will + * be acting as executors, for example in cases where we have: + * <ul> + * <li>Network partitions + * <li>Binary protocol is disabled for a member of the electorate + * </ul> + * + * <p>The chosen instance executor(s) must keep in mind that there might be other executors in the + * cluster, so operations that they perform must be safe to be performed by one or more + * Sidecar instances. + */ +public class BestEffortSingleInstanceExecutor implements SingleInstanceExecutor, PeriodicTask +{ + private static final Logger LOGGER = LoggerFactory.getLogger(BestEffortSingleInstanceExecutor.class); + private final ElectorateMembership electorateMembership; + private final SidecarLeaseDatabaseAccessor accessor; + private final ServiceConfiguration config; + private final CoordinationConfiguration coordinationConfiguration; + private final Vertx vertx; + private final TaskExecutorPool internalPool; + private volatile boolean isLocalSidecarSingleInstanceExecutor = false; + + public BestEffortSingleInstanceExecutor(Vertx vertx, + ExecutorPools executorPools, + ServiceConfiguration serviceConfiguration, + ElectorateMembership electorateMembership, + SidecarLeaseDatabaseAccessor accessor) + { + this.vertx = vertx; + this.internalPool = executorPools.internal(); + this.config = serviceConfiguration; + this.coordinationConfiguration = serviceConfiguration.coordinationConfiguration(); + this.electorateMembership = electorateMembership; + this.accessor = accessor; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLocalSidecarSingleInstanceExecutor() + { + return isLocalSidecarSingleInstanceExecutor; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean shouldSkip() + { + if (!config.schemaKeyspaceConfiguration().isEnabled()) + { + // The Sidecar schema feature is required for this implementation + // so skip when the feature is not enabled + return true; + } + return !coordinationConfiguration.singleInstanceExecutorProcessEnabled(); + } + + /** + * {@inheritDoc} + */ + @Override + public long initialDelay() + { + return coordinationConfiguration.singleInstanceExecutorInitialDelayMillis(); + } + + /** + * {@inheritDoc} + */ + @Override + public long delay() + { + return coordinationConfiguration.singleInstanceExecutorFrequencyMillis(); + } + + @Override + public void execute(Promise<Void> promise) + { + // Complete early so we can be scheduled for the next iteration + promise.complete(); + + // runs the election process based on the electorate membership + internalPool.runBlocking(() -> determineSingleInstanceExecutor(electorateMembership)); + } + + @Override + public void determineSingleInstanceExecutor(ElectorateMembership electorateMembership) + { + boolean shouldParticipate = electorateMembership.shouldParticipate(); + LOGGER.debug("Sidecar instance shouldParticipate={} in the selection", shouldParticipate); + if (!shouldParticipate) + { + return; + } + + boolean wasCurrentExecutor = isLocalSidecarSingleInstanceExecutor; + Boolean isCurrentExecutor = null; + + String owner = owner(); + LOGGER.debug("Starting selection for owner={}", owner); + if (wasCurrentExecutor) + { + SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null; + try + { + LOGGER.debug("Attempting to extend lease for owner={}", owner); + result = accessor.extendLease(owner); + } + catch (CASWriteUnknownException | NoHostAvailableException e) + { + LOGGER.debug("Unable to claim lease for owner={}", owner, e); + } + catch (Exception e) + { + LOGGER.error("Unable to extend lease for owner={}", owner, e); + } + + isCurrentExecutor = determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner); + LOGGER.debug("Extend lease for owner={} result={}", owner, isCurrentExecutor); + } + + if (isCurrentExecutor == null || !isCurrentExecutor) + { + SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null; + try + { + LOGGER.debug("Attempting to claim lease for owner={}", owner); + result = accessor.claimLease(owner); + } + catch (CASWriteUnknownException | NoHostAvailableException e) + { + LOGGER.debug("Unable to claim lease for owner={}", owner, e); + } + catch (Exception e) + { + LOGGER.error("Unable to claim lease for owner={}", owner, e); + } + + isCurrentExecutor = determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner); + LOGGER.debug("Claim lease for owner={} result={}", owner, isCurrentExecutor); + } + + if (isCurrentExecutor == null) + { + LOGGER.debug("Unable to perform lease election for owner={}", owner); + +// if (wasCurrentExecutor) +// { + // TODO : do we give up lease after some period of time? ie TTL which is 10 minutes? +// } + } + else + { + if (wasCurrentExecutor && !isCurrentExecutor) + { + LOGGER.info("Cluster-wide lease has been lost by owner={}", owner); + isLocalSidecarSingleInstanceExecutor = isCurrentExecutor; + // notify lease has been lost + vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_LOST.address(), owner); + } + + if (!wasCurrentExecutor && isCurrentExecutor) + { + LOGGER.info("Cluster-wide lease has been claimed by owner={}", owner); + isLocalSidecarSingleInstanceExecutor = isCurrentExecutor; + // notify lease has been gained + vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_CLAIMED.address(), owner); + } + + if (LOGGER.isDebugEnabled() && wasCurrentExecutor && isCurrentExecutor) + { + LOGGER.debug("Cluster-wide lease has been extended by owner={}", owner); + } + } + } + + protected Boolean determineIfIsCurrentLeaseHolder(Boolean isCurrentLeaseHolder, + SidecarLeaseDatabaseAccessor.LeaseClaimResult result, + String owner) + { + if (result != null) + { + if (result.leaseAcquired) + { + return true; + } + else + { + // For the case where the current Sidecar was a lease-holder but the information was lost from + // the in-memory process (i.e. Sidecar restarted) but the information is still persisted + // in the database, so we recover the state from the database + return owner.equals(result.existingOwner); + } + } + return isCurrentLeaseHolder; + } + + /** + * Returns the configured host ID for Sidecar. + * + * @return the configured host ID for Sidecar + */ + protected String owner() + { + return config.hostId(); Review Comment: I chose randomness to simplify, maybe we can think about a follow up for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

