JeetKunDoug commented on code in PR #160:
URL: https://github.com/apache/cassandra-sidecar/pull/160#discussion_r1882726641
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -210,7 +207,12 @@ public void outOfRangeDataCleanup(@NotNull String
keyspace, @NotNull String tabl
{
requireNonNull(keyspace, "keyspace must be non-null");
requireNonNull(table, "table must be non-null");
- jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
- .forceKeyspaceCleanup(concurrency, keyspace, table);
+ initializeStorageOps().forceKeyspaceCleanup(concurrency, keyspace,
table);
+ }
+
+ protected StorageJmxOperations initializeStorageOps()
+ {
+ return new
GossipDependentStorageJmxOperations(jmxClient.proxy(StorageJmxOperations.class,
Review Comment:
Does _every operation_ in here really need Gossip to be enabled? It doesn't
seem like things like taking local snapshots would require gossip, for example.
##########
scripts/build-dtest-jars.sh:
##########
@@ -19,9 +19,9 @@
set -xe
CANDIDATE_BRANCHES=(
- "cassandra-4.0:1f79c8492528f01bcc5f88951a1cc9e0d7265c54"
- "cassandra-4.1:725655dda2776fef35567496a6e331102eb7610d"
- "cassandra-5.0:f19dd0bb1309c35535876e8f0f996ad2b76adda5"
+ "cassandra-4.0:659558c980c67a80287ca7ccdfc8a70b1a56b7e2"
+ "cassandra-4.1:c44008e9e4ac06ea25bb63384a6777c0c1b8b41a"
+ "cassandra-5.0:1655578ae8f9de00393d923edbefe6bab0414153"
# note the trunk hash cannot be advanced beyond
ae0842372ff6dd1437d026f82968a3749f555ff4 (TCM), which breaks integration test
Review Comment:
Does TCM still break integration tests? Has anyone had a chance to look into
why?
##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/SingleInstanceExecutor.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+/**
+ * Defines an interface to choose a single Sidecar instance that will run
certain types of operations that need to
+ * run on a limited subset of Sidecar instances. In most cases there will be a
single Sidecar instance chosen as
+ * the executor.
+ */
+public interface SingleInstanceExecutor
+{
+ SingleInstanceExecutor ALWAYS_SCHEDULE_EXECUTOR = new
SingleInstanceExecutor()
Review Comment:
Can we move this into a class in the tests somewhere as it should really
never be used in production code - maybe put this and the
NEVER_SCHEDULE_EXECUTOR into a class called `TestSingleInstanceExecutors`?
##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/BestEffortSingleInstanceExecutor.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.exceptions.CASWriteUnknownException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.CoordinationConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_GLOBAL_LEASE_CLAIMED;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_GLOBAL_LEASE_LOST;
+
+/**
+ * A best-effort process where 0 or more instance executors can be chosen from
the electorate.
+ * The electorate is expected to be a small subset of the entirety of Sidecar
instances.
+ *
+ * <p>There will be situations where multiple members of the electorate will
+ * be acting as executors, for example in cases where we have:
+ * <ul>
+ * <li>Network partitions
+ * <li>Binary protocol is disabled for a member of the electorate
+ * </ul>
+ *
+ * <p>The chosen instance executor(s) must keep in mind that there might be
other executors in the
+ * cluster, so operations that they perform must be safe to be performed by
one or more
+ * Sidecar instances.
+ */
+public class BestEffortSingleInstanceExecutor implements
SingleInstanceExecutor, PeriodicTask
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BestEffortSingleInstanceExecutor.class);
+ private final ElectorateMembership electorateMembership;
+ private final SidecarLeaseDatabaseAccessor accessor;
+ private final ServiceConfiguration config;
+ private final CoordinationConfiguration coordinationConfiguration;
+ private final Vertx vertx;
+ private final TaskExecutorPool internalPool;
+ private volatile boolean isLocalSidecarSingleInstanceExecutor = false;
+
+ public BestEffortSingleInstanceExecutor(Vertx vertx,
+ ExecutorPools executorPools,
+ ServiceConfiguration
serviceConfiguration,
+ ElectorateMembership
electorateMembership,
+ SidecarLeaseDatabaseAccessor
accessor)
+ {
+ this.vertx = vertx;
+ this.internalPool = executorPools.internal();
+ this.config = serviceConfiguration;
+ this.coordinationConfiguration =
serviceConfiguration.coordinationConfiguration();
+ this.electorateMembership = electorateMembership;
+ this.accessor = accessor;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isLocalSidecarSingleInstanceExecutor()
+ {
+ return isLocalSidecarSingleInstanceExecutor;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean shouldSkip()
+ {
+ if (!config.schemaKeyspaceConfiguration().isEnabled())
+ {
+ // The Sidecar schema feature is required for this implementation
+ // so skip when the feature is not enabled
+ return true;
+ }
+ return
!coordinationConfiguration.singleInstanceExecutorProcessEnabled();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long initialDelay()
+ {
+ return
coordinationConfiguration.singleInstanceExecutorInitialDelayMillis();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long delay()
+ {
+ return
coordinationConfiguration.singleInstanceExecutorFrequencyMillis();
+ }
+
+ @Override
+ public void execute(Promise<Void> promise)
+ {
+ // Complete early so we can be scheduled for the next iteration
+ promise.complete();
+
+ // runs the election process based on the electorate membership
+ internalPool.runBlocking(() ->
determineSingleInstanceExecutor(electorateMembership));
+ }
+
+ @Override
+ public void determineSingleInstanceExecutor(ElectorateMembership
electorateMembership)
+ {
+ boolean shouldParticipate = electorateMembership.shouldParticipate();
+ LOGGER.debug("Sidecar instance shouldParticipate={} in the selection",
shouldParticipate);
+ if (!shouldParticipate)
+ {
+ return;
+ }
+
+ boolean wasCurrentExecutor = isLocalSidecarSingleInstanceExecutor;
+ Boolean isCurrentExecutor = null;
+
+ String owner = owner();
+ LOGGER.debug("Starting selection for owner={}", owner);
+ if (wasCurrentExecutor)
+ {
+ SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null;
+ try
+ {
+ LOGGER.debug("Attempting to extend lease for owner={}", owner);
+ result = accessor.extendLease(owner);
+ }
+ catch (CASWriteUnknownException | NoHostAvailableException e)
+ {
+ LOGGER.debug("Unable to claim lease for owner={}", owner, e);
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Unable to extend lease for owner={}", owner, e);
+ }
+
+ isCurrentExecutor =
determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner);
+ LOGGER.debug("Extend lease for owner={} result={}", owner,
isCurrentExecutor);
+ }
+
+ if (isCurrentExecutor == null || !isCurrentExecutor)
+ {
+ SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null;
+ try
+ {
+ LOGGER.debug("Attempting to claim lease for owner={}", owner);
+ result = accessor.claimLease(owner);
+ }
+ catch (CASWriteUnknownException | NoHostAvailableException e)
+ {
+ LOGGER.debug("Unable to claim lease for owner={}", owner, e);
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Unable to claim lease for owner={}", owner, e);
+ }
+
+ isCurrentExecutor =
determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner);
+ LOGGER.debug("Claim lease for owner={} result={}", owner,
isCurrentExecutor);
+ }
+
+ if (isCurrentExecutor == null)
+ {
+ LOGGER.debug("Unable to perform lease election for owner={}",
owner);
+
+// if (wasCurrentExecutor)
+// {
+ // TODO : do we give up lease after some period of time? ie TTL
which is 10 minutes?
+// }
+ }
+ else
+ {
+ if (wasCurrentExecutor && !isCurrentExecutor)
+ {
+ LOGGER.info("Cluster-wide lease has been lost by owner={}",
owner);
+ isLocalSidecarSingleInstanceExecutor = isCurrentExecutor;
+ // notify lease has been lost
+
vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_LOST.address(), owner);
+ }
+
+ if (!wasCurrentExecutor && isCurrentExecutor)
+ {
+ LOGGER.info("Cluster-wide lease has been claimed by owner={}",
owner);
+ isLocalSidecarSingleInstanceExecutor = isCurrentExecutor;
+ // notify lease has been gained
+
vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_CLAIMED.address(), owner);
+ }
+
+ if (LOGGER.isDebugEnabled() && wasCurrentExecutor &&
isCurrentExecutor)
+ {
+ LOGGER.debug("Cluster-wide lease has been extended by
owner={}", owner);
+ }
+ }
+ }
+
+ protected Boolean determineIfIsCurrentLeaseHolder(Boolean
isCurrentLeaseHolder,
+
SidecarLeaseDatabaseAccessor.LeaseClaimResult result,
+ String owner)
+ {
+ if (result != null)
+ {
+ if (result.leaseAcquired)
+ {
+ return true;
+ }
+ else
+ {
+ // For the case where the current Sidecar was a lease-holder
but the information was lost from
+ // the in-memory process (i.e. Sidecar restarted) but the
information is still persisted
+ // in the database, so we recover the state from the database
+ return owner.equals(result.existingOwner);
+ }
+ }
+ return isCurrentLeaseHolder;
+ }
Review Comment:
There was quite a bit or repetition in `determineSingleInstanceExecutor` and
it was somewhat hard to follow. Did some refactoring to make sure I understood
what was going on and I think it's easier to follow. Sorry for the very long
suggestion.
```suggestion
@Override
public void determineSingleInstanceExecutor(ElectorateMembership
electorateMembership)
{
boolean shouldParticipate = electorateMembership.shouldParticipate();
LOGGER.debug("Sidecar instance shouldParticipate={} in the
selection", shouldParticipate);
if (!shouldParticipate)
{
return;
}
boolean wasCurrentExecutor = isLocalSidecarSingleInstanceExecutor;
Boolean isCurrentExecutor = null;
String hostId = hostId();
LOGGER.debug("Starting selection for hostId={}", hostId);
if (wasCurrentExecutor)
{
isCurrentExecutor = executeLeaseAction("extend", hostId,
accessor::extendLease, null);
}
if (isCurrentExecutor == null || !isCurrentExecutor)
{
isCurrentExecutor = executeLeaseAction("claim", hostId,
accessor::claimLease, isCurrentExecutor);
LOGGER.debug("Claim lease for hostId={} result={}", hostId,
isCurrentExecutor);
}
maybeNotifyResults(isCurrentExecutor, hostId, wasCurrentExecutor);
}
private void maybeNotifyResults(Boolean isCurrentExecutor, String owner,
boolean wasCurrentExecutor)
{
if (isCurrentExecutor == null)
{
LOGGER.debug("Unable to perform lease election for owner={}",
owner);
// if (wasCurrentExecutor)
// {
// TODO : do we give up lease after some period of time? ie TTL
which is 10 minutes?
// }
return;
}
if (wasCurrentExecutor && !isCurrentExecutor)
{
LOGGER.info("Cluster-wide lease has been lost by owner={}",
owner);
isLocalSidecarSingleInstanceExecutor = false;
// notify lease has been lost
vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_LOST.address(),
owner);
}
if (!wasCurrentExecutor && isCurrentExecutor)
{
LOGGER.info("Cluster-wide lease has been claimed by owner={}",
owner);
isLocalSidecarSingleInstanceExecutor = true;
// notify lease has been gained
vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_CLAIMED.address(), owner);
}
}
private Boolean executeLeaseAction(String actionName,
String owner,
Function<String,
SidecarLeaseDatabaseAccessor.LeaseClaimResult> claimFunction,
Boolean isCurrentExecutor)
{
SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null;
try
{
LOGGER.debug("Attempting to " + actionName + " lease for
owner={}", owner);
result = claimFunction.apply(owner);
}
catch (CASWriteUnknownException | NoHostAvailableException e)
{
LOGGER.debug("Unable to claim lease for owner={}", owner, e);
}
catch (Exception e)
{
LOGGER.error("Unable to " + actionName + " lease for owner={}",
owner, e);
}
isCurrentExecutor =
determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner);
LOGGER.debug("{} lease for owner={} result={}",
capitalize(actionName), owner, isCurrentExecutor);
return isCurrentExecutor;
}
protected Boolean determineIfIsCurrentLeaseHolder(Boolean
isCurrentLeaseHolder,
SidecarLeaseDatabaseAccessor.LeaseClaimResult result,
String hostId)
{
if (result != null && (result.leaseAcquired ||
hostId.equals(result.existingOwner)))
{
return true;
}
return isCurrentLeaseHolder;
}
```
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -210,7 +207,12 @@ public void outOfRangeDataCleanup(@NotNull String
keyspace, @NotNull String tabl
{
requireNonNull(keyspace, "keyspace must be non-null");
requireNonNull(table, "table must be non-null");
- jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
- .forceKeyspaceCleanup(concurrency, keyspace, table);
+ initializeStorageOps().forceKeyspaceCleanup(concurrency, keyspace,
table);
+ }
+
+ protected StorageJmxOperations initializeStorageOps()
Review Comment:
This is really `createStorageOps` since it returns a new one every time.
##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/BestEffortSingleInstanceExecutor.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.exceptions.CASWriteUnknownException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.CoordinationConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_GLOBAL_LEASE_CLAIMED;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_GLOBAL_LEASE_LOST;
+
+/**
+ * A best-effort process where 0 or more instance executors can be chosen from
the electorate.
+ * The electorate is expected to be a small subset of the entirety of Sidecar
instances.
+ *
+ * <p>There will be situations where multiple members of the electorate will
+ * be acting as executors, for example in cases where we have:
+ * <ul>
+ * <li>Network partitions
+ * <li>Binary protocol is disabled for a member of the electorate
+ * </ul>
+ *
+ * <p>The chosen instance executor(s) must keep in mind that there might be
other executors in the
+ * cluster, so operations that they perform must be safe to be performed by
one or more
+ * Sidecar instances.
+ */
+public class BestEffortSingleInstanceExecutor implements
SingleInstanceExecutor, PeriodicTask
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BestEffortSingleInstanceExecutor.class);
+ private final ElectorateMembership electorateMembership;
+ private final SidecarLeaseDatabaseAccessor accessor;
+ private final ServiceConfiguration config;
+ private final CoordinationConfiguration coordinationConfiguration;
+ private final Vertx vertx;
+ private final TaskExecutorPool internalPool;
+ private volatile boolean isLocalSidecarSingleInstanceExecutor = false;
+
+ public BestEffortSingleInstanceExecutor(Vertx vertx,
+ ExecutorPools executorPools,
+ ServiceConfiguration
serviceConfiguration,
+ ElectorateMembership
electorateMembership,
+ SidecarLeaseDatabaseAccessor
accessor)
+ {
+ this.vertx = vertx;
+ this.internalPool = executorPools.internal();
+ this.config = serviceConfiguration;
+ this.coordinationConfiguration =
serviceConfiguration.coordinationConfiguration();
+ this.electorateMembership = electorateMembership;
+ this.accessor = accessor;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isLocalSidecarSingleInstanceExecutor()
+ {
+ return isLocalSidecarSingleInstanceExecutor;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean shouldSkip()
+ {
+ if (!config.schemaKeyspaceConfiguration().isEnabled())
+ {
+ // The Sidecar schema feature is required for this implementation
+ // so skip when the feature is not enabled
+ return true;
+ }
+ return
!coordinationConfiguration.singleInstanceExecutorProcessEnabled();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long initialDelay()
+ {
+ return
coordinationConfiguration.singleInstanceExecutorInitialDelayMillis();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long delay()
+ {
+ return
coordinationConfiguration.singleInstanceExecutorFrequencyMillis();
+ }
+
+ @Override
+ public void execute(Promise<Void> promise)
+ {
+ // Complete early so we can be scheduled for the next iteration
+ promise.complete();
+
+ // runs the election process based on the electorate membership
+ internalPool.runBlocking(() ->
determineSingleInstanceExecutor(electorateMembership));
+ }
+
+ @Override
+ public void determineSingleInstanceExecutor(ElectorateMembership
electorateMembership)
+ {
+ boolean shouldParticipate = electorateMembership.shouldParticipate();
+ LOGGER.debug("Sidecar instance shouldParticipate={} in the selection",
shouldParticipate);
+ if (!shouldParticipate)
+ {
+ return;
+ }
+
+ boolean wasCurrentExecutor = isLocalSidecarSingleInstanceExecutor;
+ Boolean isCurrentExecutor = null;
+
+ String owner = owner();
+ LOGGER.debug("Starting selection for owner={}", owner);
+ if (wasCurrentExecutor)
+ {
+ SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null;
+ try
+ {
+ LOGGER.debug("Attempting to extend lease for owner={}", owner);
+ result = accessor.extendLease(owner);
+ }
+ catch (CASWriteUnknownException | NoHostAvailableException e)
+ {
+ LOGGER.debug("Unable to claim lease for owner={}", owner, e);
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Unable to extend lease for owner={}", owner, e);
+ }
+
+ isCurrentExecutor =
determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner);
+ LOGGER.debug("Extend lease for owner={} result={}", owner,
isCurrentExecutor);
+ }
+
+ if (isCurrentExecutor == null || !isCurrentExecutor)
+ {
+ SidecarLeaseDatabaseAccessor.LeaseClaimResult result = null;
+ try
+ {
+ LOGGER.debug("Attempting to claim lease for owner={}", owner);
+ result = accessor.claimLease(owner);
+ }
+ catch (CASWriteUnknownException | NoHostAvailableException e)
+ {
+ LOGGER.debug("Unable to claim lease for owner={}", owner, e);
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Unable to claim lease for owner={}", owner, e);
+ }
+
+ isCurrentExecutor =
determineIfIsCurrentLeaseHolder(isCurrentExecutor, result, owner);
+ LOGGER.debug("Claim lease for owner={} result={}", owner,
isCurrentExecutor);
+ }
+
+ if (isCurrentExecutor == null)
+ {
+ LOGGER.debug("Unable to perform lease election for owner={}",
owner);
+
+// if (wasCurrentExecutor)
+// {
+ // TODO : do we give up lease after some period of time? ie TTL
which is 10 minutes?
+// }
+ }
+ else
+ {
+ if (wasCurrentExecutor && !isCurrentExecutor)
+ {
+ LOGGER.info("Cluster-wide lease has been lost by owner={}",
owner);
+ isLocalSidecarSingleInstanceExecutor = isCurrentExecutor;
+ // notify lease has been lost
+
vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_LOST.address(), owner);
+ }
+
+ if (!wasCurrentExecutor && isCurrentExecutor)
+ {
+ LOGGER.info("Cluster-wide lease has been claimed by owner={}",
owner);
+ isLocalSidecarSingleInstanceExecutor = isCurrentExecutor;
+ // notify lease has been gained
+
vertx.eventBus().publish(ON_SIDECAR_GLOBAL_LEASE_CLAIMED.address(), owner);
+ }
+
+ if (LOGGER.isDebugEnabled() && wasCurrentExecutor &&
isCurrentExecutor)
+ {
+ LOGGER.debug("Cluster-wide lease has been extended by
owner={}", owner);
+ }
+ }
+ }
+
+ protected Boolean determineIfIsCurrentLeaseHolder(Boolean
isCurrentLeaseHolder,
+
SidecarLeaseDatabaseAccessor.LeaseClaimResult result,
+ String owner)
+ {
+ if (result != null)
+ {
+ if (result.leaseAcquired)
+ {
+ return true;
+ }
+ else
+ {
+ // For the case where the current Sidecar was a lease-holder
but the information was lost from
+ // the in-memory process (i.e. Sidecar restarted) but the
information is still persisted
+ // in the database, so we recover the state from the database
+ return owner.equals(result.existingOwner);
+ }
+ }
+ return isCurrentLeaseHolder;
+ }
+
+ /**
+ * Returns the configured host ID for Sidecar.
+ *
+ * @return the configured host ID for Sidecar
+ */
+ protected String owner()
+ {
+ return config.hostId();
Review Comment:
Given this class can't operate if it can't talk to Cassandra, what do you
think about using `StorageService#getLocalHostId` via JMX from the first
managed instance for this in stead of configuration? We can populate it in
`config` still or via some other mechanism, but it should be constant and not
just a random UUID on each startup?
##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/ElectorateMembership.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+/**
+ * An interface that defines if the local Sidecar instance should participate
during the choosing of a single
+ * instance executor process.
+ */
+public interface ElectorateMembership
+{
+ /**
+ * @return {@code true} if the local Sidecar instance should participate
in the process,
+ * {@code false} otherwise
+ */
+ boolean shouldParticipate();
Review Comment:
I was struggling with the name of this class/method for some reason... I
think the method should be called `isMember` rather than `shouldParticipate`
and it all makes much more sense. Comments should also reflect that this (and
the TokenZeroElectorateMembership class) are just about membership in a
particular electorate (which I'm still not sure of name-wise, but I haven't
come up with anything better yet either).
##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/utils/StringUtils.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.utils;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Common utilities for String manipulations
+ */
+public class StringUtils
+{
+ /**
+ * The string representation of the address with the same signature as
represented by
+ * the Cassandra server. This is useful when we consume data representing
an Inet address
+ * and port from JMX that has been stringyfied, and we need to perform
string matching
+ * against those results from JMX.
+ *
+ * @param address the {@link InetSocketAddress address}
+ * @return the string representation of the address with the same
signature as represented by
+ * the Cassandra server
+ */
+ @SuppressWarnings("UnstableApiUsage")
+ public static String hostAddress(InetSocketAddress address)
Review Comment:
NIT: maybe `cassandraFormattedHostAddress`? Since it's in a fairly
generically-named StringUtils class, probably good to be more specific here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]