This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 10034a4e CASSANDRASC-134: Detect out of range data and cleanup using nodetool (#125) 10034a4e is described below commit 10034a4e9c9913127e28546f2263f2ce818f6a37 Author: Yifan Cai <y...@apache.org> AuthorDate: Tue Jun 4 14:54:13 2024 -0700 CASSANDRASC-134: Detect out of range data and cleanup using nodetool (#125) The patch adds a step to check the data ownership before importing sstable. When fully out of range sstables are found, the sstables are removed before importing. When partially out of range sstables are found, running nodetool cleanup is requested on job completion, including both success and failure cases. Patch by Yifan Cai; Reviewed by Bernardo Botella and Francisco Guerrero for CASSANDRASC-134 --- CHANGES.txt | 1 + .../adapters/base/CassandraStorageOperations.java | 12 ++ .../base/GossipDependentStorageJmxOperations.java | 7 + .../adapters/base/StorageJmxOperations.java | 13 ++ .../adapters/base/TokenRangeReplicaProvider.java | 1 + .../sidecar/adapters/base/TokenRangeReplicas.java | 1 + .../base/TokenRangeReplicaProviderTest.java | 1 + .../adapters/base/TokenRangeReplicasTest.java | 2 + .../sidecar/common/server/StorageOperations.java | 25 +++ .../server/cluster/locator}/Partitioner.java | 4 +- .../common/server/cluster/locator/Token.java | 122 +++++++++++++ .../common/server/cluster/locator/TokenRange.java | 200 +++++++++++++++++++++ .../server/cluster/locator/TokenRangeTest.java | 179 ++++++++++++++++++ .../common/server/cluster/locator/TokenTest.java | 55 ++++++ .../apache/cassandra/sidecar/db/RestoreSlice.java | 11 ++ .../sidecar/db/RestoreSliceDatabaseAccessor.java | 9 +- .../sidecar/locator/CachedLocalTokenRanges.java | 7 +- .../sidecar/locator/LocalTokenRangesProvider.java | 11 +- .../cassandra/sidecar/locator/TokenRange.java | 114 ------------ .../sidecar/restore/RestoreJobDiscoverer.java | 4 +- .../sidecar/restore/RestoreJobManager.java | 132 +++++++------- 
.../sidecar/restore/RestoreProcessor.java | 6 + .../sidecar/restore/RestoreSliceTask.java | 174 +++++++++++++----- .../sidecar/restore/RestoreSliceTracker.java | 44 ++++- .../cassandra/sidecar/restore/StorageClient.java | 21 ++- .../sidecar/common/JmxClientIntegrationTest.java | 89 ++++++--- .../tokenrange/BaseTokenRangeIntegrationTest.java | 2 +- .../sidecar/routes/tokenrange/MovingBaseTest.java | 2 +- .../sidecar/concurrent/ExecutorPoolsTest.java | 65 +++++-- .../cassandra/sidecar/locator/TokenRangeTest.java | 89 --------- .../sidecar/restore/RestoreJobManagerTest.java | 6 +- .../sidecar/restore/RestoreProcessorTest.java | 2 +- .../sidecar/restore/RestoreSliceTaskTest.java | 125 ++++++++++++- .../routes/restore/BaseRestoreJobTests.java | 12 +- 34 files changed, 1134 insertions(+), 414 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 43de8233..d8813da9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Detect out of range data and cleanup using nodetool (CASSANDRASC-134) * Allow optional reason to abort restore jobs (CASSANDRASC-133) * Fix SidecarLoadBalancingPolicy unexpectedly removing local node and improve CI stability (CASSANDRASC-131) * Reduce implementations accessible from client (CASSANDRASC-127) diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java index 3a87e473..9903a0dc 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ import 
org.apache.cassandra.sidecar.common.response.RingResponse; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.sidecar.common.server.JmxClient; import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.exceptions.NodeBootstrappingException; @@ -201,4 +203,14 @@ public class CassandraStorageOperations implements StorageOperations } return dataFileLocations; } + + @Override + public void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String table, int concurrency) + throws IOException, ExecutionException, InterruptedException + { + requireNonNull(keyspace, "keyspace must be non-null"); + requireNonNull(table, "table must be non-null"); + jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) + .forceKeyspaceCleanup(concurrency, keyspace, table); + } } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java index d0305141..8982c1b9 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,6 +140,12 @@ public class GossipDependentStorageJmxOperations implements StorageJmxOperations return delegate.getAllDataFileLocations(); } + @Override 
+ public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException + { + return delegate.forceKeyspaceCleanup(jobs, keyspaceName, tables); + } + /** * Ensures that gossip is running on the Cassandra instance * diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java index eba3f2f8..d8f94125 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java @@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.adapters.base; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; /** * An interface that pulls methods from the Cassandra Storage Service Proxy @@ -146,4 +147,16 @@ public interface StorageJmxOperations * @return String array of all locations */ String[] getAllDataFileLocations(); + + /** + * Force cleanup of the data in the tables of the keyspace. All partitions that are out of the node's range are removed + * @param jobs job concurrency + * @param keyspaceName keyspace of the table to clean + * @param tables tables to clean + * @return status code. 0: success; 1: aborted; 2: unable to cancel + * @throws IOException i/o exception during cleanup + * @throws ExecutionException not actually thrown, but declared in the MBean + * @throws InterruptedException not actually thrown, but declared in the MBean + */ + int forceKeyspaceCleanup(int jobs, String keyspaceName, String...
tables) throws IOException, ExecutionException, InterruptedException; } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java index ae1e6ce5..dd1d497f 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java @@ -41,6 +41,7 @@ import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse.ReplicaInfo; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse.ReplicaMetadata; import org.apache.cassandra.sidecar.common.server.JmxClient; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.utils.GossipInfoParser; diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java index a46437ca..fdef5599 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; import org.jetbrains.annotations.NotNull; diff --git a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java 
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java index 91eb8099..69915b5c 100644 --- a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java +++ b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.sidecar.common.server.JmxClient; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.assertj.core.api.InstanceOfAssertFactories; diff --git a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java index 2ebcbeb3..3e965949 100644 --- a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java +++ b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java @@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; + import static org.assertj.core.api.Assertions.assertThat; /** diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java index 1c00932b..d2935297 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java @@ -18,9 +18,11 @@ package 
org.apache.cassandra.sidecar.common.server; +import java.io.IOException; import java.net.UnknownHostException; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import org.apache.cassandra.sidecar.common.response.RingResponse; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; @@ -76,4 +78,27 @@ public interface StorageOperations * @return the list of all data file locations for the Cassandra instance */ List<String> dataFileLocations(); + + /** + * Clean up the data of the specified table and remove the keys that no longer belong to the Cassandra node. + * + * @param keyspace keyspace of the table to clean + * @param table table to clean + * @param concurrency concurrency of the cleanup (compaction) job. + * Note that it cannot exceed the configured `concurrent_compactors` in Cassandra + * @throws IOException i/o exception during cleanup + * @throws ExecutionException not actually thrown, but declared in the MBean + * @throws InterruptedException not actually thrown, but declared in the MBean + */ + void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String table, int concurrency) + throws IOException, ExecutionException, InterruptedException; + + /** + * Similar to {@link #outOfRangeDataCleanup(String, String, int)}, but uses 1 for concurrency + */ + default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String table) + throws IOException, ExecutionException, InterruptedException + { + outOfRangeDataCleanup(keyspace, table, 1); + } } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java similarity index 96% rename from adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java rename to server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java index
2f4687b3..39595930 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java @@ -6,9 +6,7 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * * http://www.apache.org/licenses/LICENSE-2.0 - * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +14,7 @@ * limitations under the License. */ -package org.apache.cassandra.sidecar.adapters.base; +package org.apache.cassandra.sidecar.common.server.cluster.locator; import java.math.BigInteger; diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java new file mode 100644 index 00000000..f8464acf --- /dev/null +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.server.cluster.locator; + +import java.math.BigInteger; +import java.util.Comparator; +import java.util.Objects; + +import org.jetbrains.annotations.NotNull; + +/** + * Token, i.e. hashed partition key, in Cassandra + */ +public final class Token implements Comparable<Token> +{ + private static final Comparator<Token> TOKEN_COMPARATOR = Comparator.comparing(Token::toBigInteger); + + private final BigInteger value; + + /** + * Create token from {@code BigInteger} value + * @param value token value + * @return token + */ + public static Token from(BigInteger value) + { + return new Token(value); + } + + /** + * Create token from its string literal + * @param valueStr token value + * @throws NumberFormatException {@code valueStr} is not a valid representation + * of a BigInteger. + * @return token + */ + public static Token from(String valueStr) + { + return new Token(new BigInteger(valueStr)); + } + + /** + * Create token from long value + * @param value token value + * @return token + */ + public static Token from(long value) + { + return new Token(BigInteger.valueOf(value)); + } + + private Token(BigInteger value) + { + this.value = value; + } + + /** + * @return the {@code BigInteger} value + */ + public BigInteger toBigInteger() + { + return value; + } + + /** + * @return a new instance of token whose value is {@code (this + 1)} + */ + public Token increment() + { + return new Token(value.add(BigInteger.ONE)); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (o == null || getClass() != o.getClass()) + { + return false; + } + Token token = (Token) o; + return Objects.equals(value, token.value); + } + + @Override + public int hashCode() + { + return value.hashCode(); + } + + @Override + public int compareTo(@NotNull Token other) + { + return 
Objects.compare(this, Objects.requireNonNull(other), TOKEN_COMPARATOR); + } + + @Override + public String toString() + { + return "Token(" + value + ')'; + } +} diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java new file mode 100644 index 00000000..5abc1048 --- /dev/null +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.server.cluster.locator; + +import java.math.BigInteger; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.collect.Range; + +import com.datastax.driver.core.DataType; +import org.jetbrains.annotations.Nullable; + +/** + * Range: (start, end] - start exclusive and end inclusive + */ +public class TokenRange +{ + private final Range<Token> range; + private volatile Token firstToken = null; + + /** + * Unwrap the java driver's token range if necessary and convert the unwrapped ranges list. 
+ * Only the token ranges from Murmur3Partitioner and RandomPartitioner are supported. + * + * @param dsTokenRange TokenRange implementation in Cassandra java driver + * @return list of token ranges. If the input token range wraps around, the size of the list is 2; + * otherwise, the list has only one range + */ + public static List<TokenRange> from(com.datastax.driver.core.TokenRange dsTokenRange) + { + DataType tokenDataType = dsTokenRange.getStart().getType(); + if (tokenDataType == DataType.varint()) // BigInteger - RandomPartitioner + { + return dsTokenRange.unwrap() + .stream() + .map(range -> { + BigInteger start = (BigInteger) range.getStart().getValue(); + BigInteger end = (BigInteger) range.getEnd().getValue(); + if (end.compareTo(Partitioner.Random.minToken) == 0) + { + end = Partitioner.Random.maxToken; + } + return new TokenRange(start, end); + }) + .collect(Collectors.toList()); + } + else if (tokenDataType == DataType.bigint()) // Long - Murmur3Partitioner + { + return dsTokenRange.unwrap() + .stream() + .map(range -> { + BigInteger start = BigInteger.valueOf((Long) range.getStart().getValue()); + BigInteger end = BigInteger.valueOf((Long) range.getEnd().getValue()); + if (end.compareTo(Partitioner.Murmur3.minToken) == 0) + { + end = Partitioner.Murmur3.maxToken; + } + return new TokenRange(start, end); + }) + .collect(Collectors.toList()); + } + else + { + throw new IllegalArgumentException( + "Unsupported token type: " + tokenDataType + + ". Only tokens of Murmur3Partitioner and RandomPartitioner are supported."); + } + } + + public TokenRange(long start, long end) + { + this(BigInteger.valueOf(start), BigInteger.valueOf(end)); + } + + public TokenRange(BigInteger start, BigInteger end) + { + this(Token.from(start), Token.from(end)); + } + + public TokenRange(Token start, Token end) + { + this.range = Range.openClosed(start, end); + } + + /** + * @return start token. It is not enclosed in the range. 
+ */ + public Token start() + { + return range.lowerEndpoint(); + } + + /** + * @return end token. It is the last token enclosed in the range. + */ + public Token end() + { + return range.upperEndpoint(); + } + + /** + * @return the first token enclosed in the range + */ + @Nullable + public Token firstToken() + { + if (range.isEmpty()) + { + return null; + } + + // benign race: concurrent callers compute equal tokens + if (firstToken == null) + { + firstToken = range.lowerEndpoint().increment(); + } + return firstToken; + } + + /** + * Test whether this range encloses the other range. + * It simply delegates to {@link Range#encloses(Range)} + */ + public boolean encloses(TokenRange other) + { + return this.range.encloses(other.range); + } + + /** + * Two ranges are overlapping when their intersection is non-empty. For example, + * + * Ranges (0, 3] and (1, 4] are overlapping. The intersection is (1, 3] + * Ranges (0, 3] and (5, 7] are not overlapping, as there is no intersection + * Ranges (0, 3] and (3, 5] are not overlapping, as the intersection (3, 3] is empty + * + * Note that these semantics differ from {@link Range#isConnected(Range)} + * + * @return true if this range overlaps with the other range; otherwise, false + */ + public boolean overlaps(TokenRange other) + { + return this.range.lowerEndpoint().compareTo(other.range.upperEndpoint()) < 0 + && other.range.lowerEndpoint().compareTo(this.range.upperEndpoint()) < 0; + } + + public TokenRange intersection(TokenRange overlapping) + { + Range<Token> overlap = this.range.intersection(overlapping.range); + return new TokenRange(overlap.lowerEndpoint(), overlap.upperEndpoint()); + } + + /** + * Determine whether all tokens in this range are larger than the ones in the other token range + * @param other token range + * @return true if the start token of this range is larger than or equal to the other range's end token; otherwise, false + */ + public boolean largerThan(TokenRange other) + { + return this.start().compareTo(other.end()) >= 0; + } + +
@Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (o == null || getClass() != o.getClass()) + { + return false; + } + + TokenRange that = (TokenRange) o; + return Objects.equals(range, that.range); + } + + @Override + public int hashCode() + { + return range.hashCode(); + } +} diff --git a/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java new file mode 100644 index 00000000..103ff934 --- /dev/null +++ b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.server.cluster.locator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class TokenRangeTest +{ + @Test + void testEquals() + { + TokenRange r1 = new TokenRange(1, 100); + TokenRange r2 = new TokenRange(1, 100); + TokenRange r3 = new TokenRange(-10, 10); + assertThat(r1).isEqualTo(r2); + assertThat(r3).isNotEqualTo(r1) + .isNotEqualTo(r2); + } + + @Test + void testFirstToken() + { + TokenRange range = new TokenRange(1, 100); + assertThat(range.firstToken()).isEqualTo(Token.from(2)); + // repeated calls return the same cached instance + assertThat(range.firstToken()).isSameAs(range.firstToken()); + + TokenRange emptyRange = new TokenRange(1, 1); + assertThat(emptyRange.firstToken()).isNull(); + } + + @Test + void testCreateRangeWithInvalidParams() + { + assertThatThrownBy(() -> new TokenRange(1, -1)) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid range: (Token(1)‥Token(-1)]"); + } + + @Test + void testCreateFromJavaDriverTokenRange() + { + com.datastax.driver.core.TokenRange ordinaryRange = mockRange(1L, 100L); + when(ordinaryRange.isWrappedAround()).thenReturn(false); + when(ordinaryRange.unwrap()).thenCallRealMethod(); + List<TokenRange> ranges = TokenRange.from(ordinaryRange); + assertThat(ranges).hasSize(1) + .isEqualTo(Collections.singletonList(new TokenRange(1, 100))); + } + + @Test + void testCreateFromWraparoundJavaDriverTokenRange() + { + com.datastax.driver.core.TokenRange range = mockRange(10L, -10L); + List<com.datastax.driver.core.TokenRange> unwrapped = Arrays.asList(mockRange(10L, Long.MAX_VALUE), + mockRange(Long.MIN_VALUE, -10L)); + when(range.unwrap()).thenReturn(unwrapped);
+ List<TokenRange> ranges = TokenRange.from(range); + assertThat(ranges).hasSize(2) + .isEqualTo(Arrays.asList(new TokenRange(10, Long.MAX_VALUE), + new TokenRange(Long.MIN_VALUE, -10L))); + } + + @Test + void testCreateFromWraparoundJavaDriverTokenRangeEndingInMinToken() + { + com.datastax.driver.core.TokenRange range = mockRange(10L, Long.MIN_VALUE); + // The Java driver does not consider a range to wrap around if its end is the minimum token + when(range.unwrap()).thenReturn(Collections.singletonList(range)); + List<TokenRange> ranges = TokenRange.from(range); + assertThat(ranges).hasSize(1) + .isEqualTo(Collections.singletonList(new TokenRange(10L, Long.MAX_VALUE))); + } + + @Test + void testRangeEnclose() + { + TokenRange r1 = new TokenRange(3, 5); + TokenRange r2 = new TokenRange(1, 10); + TokenRange r3 = new TokenRange(10, 11); + TokenRange r4 = new TokenRange(4, 11); + assertThat(r2.encloses(r1)).isTrue(); + assertThat(r4.encloses(r3)).isTrue(); + assertThat(r1.encloses(r2)).isFalse(); + assertThat(r3.encloses(r1)).isFalse(); + assertThat(r2.encloses(r3)).isFalse(); + assertThat(r1.encloses(r4)).isFalse(); + assertThat(r4.encloses(r1)).isFalse(); + } + + @Test + void testOverlaps() + { + TokenRange r1 = new TokenRange(3, 5); + TokenRange r2 = new TokenRange(1, 10); + TokenRange r3 = new TokenRange(10, 11); + TokenRange r4 = new TokenRange(4, 11); + assertThat(r1.overlaps(r2)).isTrue(); + assertThat(r2.overlaps(r1)).isTrue(); + assertThat(r3.overlaps(r4)).isTrue(); + assertThat(r4.overlaps(r3)).isTrue(); + assertThat(r2.overlaps(r4)).isTrue(); + assertThat(r4.overlaps(r2)).isTrue(); + assertThat(r2.overlaps(r3)).isFalse(); + assertThat(r3.overlaps(r2)).isFalse(); + } + + @Test + void testLargerThan() + { + TokenRange r1 = new TokenRange(3, 5); + TokenRange r2 = new TokenRange(1, 10); + TokenRange r3 = new TokenRange(10, 11); + assertThat(r1.largerThan(r2)).isFalse(); + assertThat(r2.largerThan(r1)).isFalse(); + assertThat(r3.largerThan(r1)).isTrue();
+ assertThat(r1.largerThan(r3)).isFalse(); + assertThat(r3.largerThan(r2)).isTrue(); + assertThat(r2.largerThan(r3)).isFalse(); + } + + @Test + void testIntersection() + { + TokenRange r1 = new TokenRange(3, 5); + TokenRange r2 = new TokenRange(1, 10); + TokenRange r3 = new TokenRange(4, 6); + TokenRange r4 = new TokenRange(10, 11); + assertThat(r1.intersection(r2)).isEqualTo(r1); + assertThat(r2.intersection(r1)).isEqualTo(r1); + assertThat(r1.intersection(r3)).isEqualTo(new TokenRange(4, 5)); + assertThat(r3.intersection(r1)).isEqualTo(new TokenRange(4, 5)); + assertThat(r2.intersection(r4)).isEqualTo(new TokenRange(10, 10)); // empty range + assertThat(r2.intersection(r4)).isNotEqualTo(new TokenRange(5, 5)); // but not any empty range + } + + private com.datastax.driver.core.TokenRange mockRange(long start, long end) + { + com.datastax.driver.core.TokenRange range = mock(com.datastax.driver.core.TokenRange.class); + com.datastax.driver.core.Token startToken = mockToken(start); + when(range.getStart()).thenReturn(startToken); + com.datastax.driver.core.Token endToken = mockToken(end); + when(range.getEnd()).thenReturn(endToken); + return range; + } + + private com.datastax.driver.core.Token mockToken(long value) + { + com.datastax.driver.core.Token token = mock(com.datastax.driver.core.Token.class); + when(token.getType()).thenReturn(com.datastax.driver.core.DataType.bigint()); + when(token.getValue()).thenReturn(value); + return token; + } +} diff --git a/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java new file mode 100644 index 00000000..a11010fa --- /dev/null +++ b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.server.cluster.locator; + +import java.math.BigInteger; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class TokenTest +{ + @Test + void testCreateToken() + { + Token t1 = Token.from(1); + Token t2 = Token.from(BigInteger.ONE); + Token t3 = Token.from("1"); + Token t4 = Token.from(1L); + assertThat(t1).isEqualTo(t2).isEqualTo(t3).isEqualTo(t4); + } + + @Test + void testIncrement() + { + Token t1 = Token.from(1); + Token t2 = t1.increment(); + assertThat(t2).isEqualTo(Token.from(2)); + } + + @Test + void testCompare() + { + Token t1 = Token.from(1); + Token t2 = Token.from(2); + Token t3 = Token.from(1); + assertThat(t1).isLessThan(t2) + .isEqualByComparingTo(t3); + assertThat(t2).isGreaterThan(t1); + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java index 57b1bcba..8f8bc8d8 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java @@ -37,6 +37,7 @@ import org.apache.cassandra.sidecar.common.server.data.RestoreSliceStatus; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import 
org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; +import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.restore.RestoreJobUtil; import org.apache.cassandra.sidecar.restore.RestoreSliceHandler; @@ -207,6 +208,14 @@ public class RestoreSlice failAtInstance(owner().id()); } + /** + * Request to clean up out of range data. It is requested when detecting the slice contains out of range data + */ + public void requestOutOfRangeDataCleanup() + { + tracker.requestOutOfRangeDataCleanup(); + } + public void setExistsOnS3() { this.existsOnS3 = true; @@ -234,6 +243,7 @@ public class RestoreSlice double requiredUsableSpacePercentage, RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobUtil restoreJobUtil, + LocalTokenRangesProvider localTokenRangesProvider, SidecarMetrics metrics) { if (isCancelled) @@ -248,6 +258,7 @@ public class RestoreSlice requiredUsableSpacePercentage, sliceDatabaseAccessor, restoreJobUtil, + localTokenRangesProvider, metrics); } catch (IllegalStateException illegalState) diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java index d0173bd3..1e23e2b0 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java @@ -18,7 +18,6 @@ package org.apache.cassandra.sidecar.db; -import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -29,6 +28,7 @@ import com.datastax.driver.core.Row; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import 
org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.db.schema.RestoreSlicesSchema; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; @@ -88,16 +88,15 @@ public class RestoreSliceDatabaseAccessor extends DatabaseAccessor return slice; } - public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, short bucketId, - BigInteger startToken, BigInteger endToken) + public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, short bucketId, TokenRange range) { sidecarSchema.ensureInitialized(); BoundStatement statement = restoreSlicesSchema.findAllByTokenRange() .bind(jobId, bucketId, - startToken, - endToken); + range.start().toBigInteger(), + range.end().toBigInteger()); ResultSet result = execute(statement); List<RestoreSlice> slices = new ArrayList<>(); for (Row row : result) diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java index 654544ae..196ba7b3 100644 --- a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java +++ b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java @@ -47,6 +47,7 @@ import com.google.inject.Singleton; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesConfig; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.jetbrains.annotations.NotNull; @@ -82,7 +83,6 @@ public class CachedLocalTokenRanges implements LocalTokenRangesProvider } @Override - @Nullable public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace) { List<InstanceMetadata> localInstances = instancesConfig.instances(); @@ -153,7 +153,6 @@ public class 
CachedLocalTokenRanges implements LocalTokenRangesProvider /** * Reload the locally cached token ranges when needed */ - @Nullable private synchronized Map<Integer, Set<TokenRange>> getCacheOrReload(Metadata metadata, String keyspace, Set<Integer> localInstanceIds, @@ -167,7 +166,7 @@ public class CachedLocalTokenRanges implements LocalTokenRangesProvider && localTokenRangesCache.containsKey(keyspace) && isClusterTheSame) { - return localTokenRangesCache.get(keyspace); + return localTokenRangesCache.getOrDefault(keyspace, Collections.emptyMap()); } // otherwise, reload the token ranges @@ -226,7 +225,7 @@ public class CachedLocalTokenRanges implements LocalTokenRangesProvider { LOGGER.warn("Unable to determine local instances from client meta-data!"); } - return localTokenRangesCache.get(keyspace); + return localTokenRangesCache.getOrDefault(keyspace, Collections.emptyMap()); } private static class IpAddressAndPort diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java index cef04f6e..e4920131 100644 --- a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java +++ b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java @@ -20,7 +20,8 @@ package org.apache.cassandra.sidecar.locator; import java.util.Map; import java.util.Set; -import javax.annotation.Nullable; + +import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; /** * Provides the token ranges of the local Cassandra instance(s) @@ -30,12 +31,12 @@ public interface LocalTokenRangesProvider /** * Calculate the token ranges owned and replicated to the local Cassandra instance(s). * When Sidecar is paired with multiple Cassandra instance, the ranges of each Cassandra instance is captured - * in the form of map, where the key is the instance id and the value is the ranges of the instance. 
When Sidecar - * is paired with a single Cassandra instance, the result map has a single entry. + * in the form of map, where the key is the instance id and the value is the ranges of the Cassandra instance. + * When Cassandra is not running with VNode, the set of ranges has a single value. + * When Sidecar is paired with a single Cassandra instance, the result map has a single entry. * * @param keyspace keyspace to determine replication - * @return token ranges of the local Cassandra instances + * @return token ranges of the local Cassandra instances or an empty map of nothing is found */ - @Nullable Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace); } diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java b/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java deleted file mode 100644 index c0564db4..00000000 --- a/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.sidecar.locator; - -import java.math.BigInteger; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -import com.datastax.driver.core.DataType; - -/** - * Range: (start, end] - start exclusive and end inclusive - */ -public class TokenRange -{ - public final BigInteger start; - public final BigInteger end; - - /** - * Unwrap the java driver's token range if necessary and convert the unwrapped ranges list. - * Only the token ranges from Murmur3Partitioner and RandomPartitioner are supported. - * - * @param dsTokenRange TokenRange implementation in Cassandra java driver - * @return list of token ranges. If the input token range wraps around, the size of the list is 2; - * otherwise, the list has only one range - */ - public static List<TokenRange> from(com.datastax.driver.core.TokenRange dsTokenRange) - { - DataType tokenDataType = dsTokenRange.getStart().getType(); - if (tokenDataType == DataType.varint()) // BigInteger - RandomPartitioner - { - return dsTokenRange.unwrap() - .stream() - .map(range -> new TokenRange((BigInteger) range.getStart().getValue(), - (BigInteger) range.getEnd().getValue())) - .collect(Collectors.toList()); - } - else if (tokenDataType == DataType.bigint()) // Long - Murmur3Partitioner - { - return dsTokenRange.unwrap() - .stream() - .map(range -> new TokenRange((Long) range.getStart().getValue(), - (Long) range.getEnd().getValue())) - .collect(Collectors.toList()); - } - else - { - throw new IllegalArgumentException( - "Unsupported token type: " + tokenDataType + - ". 
Only tokens of Murmur3Partitioner and RandomPartitioner are supported."); - } - } - - public TokenRange(long start, long end) - { - this(BigInteger.valueOf(start), BigInteger.valueOf(end)); - } - - public TokenRange(BigInteger start, BigInteger end) - { - this.start = start; - this.end = end; - } - - public BigInteger start() - { - return this.start; - } - - public BigInteger end() - { - return this.end; - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - - if (o == null || getClass() != o.getClass()) - { - return false; - } - - TokenRange that = (TokenRange) o; - return Objects.equals(start, that.start) && Objects.equals(end, that.end); - } - - @Override - public int hashCode() - { - return Objects.hash(start, end); - } -} diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java index c5d007a8..46218ad5 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java @@ -37,6 +37,7 @@ import com.google.inject.Singleton; import io.vertx.core.Promise; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.config.RestoreJobConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.db.RestoreJob; @@ -46,7 +47,6 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges; import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider; -import org.apache.cassandra.sidecar.locator.TokenRange; import 
org.apache.cassandra.sidecar.metrics.RestoreMetrics; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.tasks.PeriodicTask; @@ -264,7 +264,7 @@ public class RestoreJobDiscoverer implements PeriodicTask { short bucketId = 0; // TODO: update the implementation to pick proper bucketId restoreSliceDatabaseAccessor - .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, range.start, range.end) + .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, range) .forEach(slice -> { // set the owner instance, which is not read from database slice = slice.unbuild().ownerInstance(instance).build(); diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java index cb68a8a1..b47f9f05 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java @@ -24,18 +24,17 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.vertx.core.CompositeFuture; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import io.vertx.core.Future; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; @@ -54,8 +53,10 @@ import org.jetbrains.annotations.VisibleForTesting; public class RestoreJobManager { private static final Logger LOGGER = LoggerFactory.getLogger(RestoreJobManager.class); + private static final Object PRESENT = new Object(); private final Map<UUID, 
RestoreSliceTracker> jobs = new ConcurrentHashMap<>(); + private final Cache<UUID, Object> deletedJobs; private final RestoreProcessor processor; private final ExecutorPools executorPools; private final InstanceMetadata instanceMetadata; @@ -80,7 +81,8 @@ public class RestoreJobManager this.instanceMetadata = instanceMetadata; this.executorPools = executorPools; this.processor = restoreProcessor; - // delete obsolete on start up. Once instance is started, the jobDiscoverer will find the jobs to cleanup + this.deletedJobs = Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build(); + // delete obsolete on start up. Once instance is started, the jobDiscoverer will find the jobs to clean up if (deleteOnStart) { deleteObsoleteDataAsync(); @@ -99,7 +101,7 @@ public class RestoreJobManager throws RestoreJobFatalException { RestoreSliceTracker tracker = jobs.computeIfAbsent(slice.jobId(), - id -> new RestoreSliceTracker(restoreJob, processor)); + id -> new RestoreSliceTracker(restoreJob, processor, instanceMetadata)); return tracker.trySubmit(slice); } @@ -107,17 +109,17 @@ public class RestoreJobManager * Update the restore job reference in tracker, in order for pending restore slices to read the latest * restore job, especially the credentials to download from cloud storage. * - * @param job restore job to update + * @param restoreJob restore job to update */ - void updateRestoreJob(RestoreJob job) + void updateRestoreJob(RestoreJob restoreJob) { - RestoreSliceTracker tracker = jobs.computeIfAbsent(job.jobId, - id -> new RestoreSliceTracker(job, processor)); - tracker.updateRestoreJob(job); + RestoreSliceTracker tracker = jobs.computeIfAbsent(restoreJob.jobId, + id -> new RestoreSliceTracker(restoreJob, processor, instanceMetadata)); + tracker.updateRestoreJob(restoreJob); } /** - * Remove the tracker of the job when it is completed and delete its data on disk. The method internal. 
+ * Remove the tracker of the job when it is completed and delete its data on disk. The method runs async and it for internal use only. * It should only be called by the background task, when it discovers the job is * in the final {@link org.apache.cassandra.sidecar.common.data.RestoreJobStatus}, i.e. SUCCEEDED or FAILED. * @@ -125,46 +127,52 @@ public class RestoreJobManager */ void removeJobInternal(UUID jobId) { - RestoreSliceTracker tracker = jobs.remove(jobId); - if (tracker != null) + if (deletedJobs.getIfPresent(jobId) == PRESENT) { - tracker.cleanupInternal(); + LOGGER.debug("The job is already removed. Skipping. jobId={}", jobId); + return; } - // There might be no tracker, but the job has data on disk. - deleteDataOfJobAsync(jobId); + + executorPools + .internal() + .runBlocking(() -> { + RestoreSliceTracker tracker = jobs.remove(jobId); + if (tracker != null) + { + tracker.cleanupInternal(); + } + }) + .recover(cause -> { + // There might be no tracker, but the job has data on disk. + LOGGER.warn("Failed to clean up restore job. Recover and proceed to delete the on-disk files. jobId={}", jobId, cause); + return Future.succeededFuture(); + }) + .compose(v -> deleteDataOfJobAsync(jobId)) + .onSuccess(v -> deletedJobs.put(jobId, PRESENT)); } /** * Find obsolete job data on disk and delete them * The obsoleteness is determined by {@link RestoreJobConfiguration#jobDiscoveryRecencyDays} */ - Future<Void> deleteObsoleteDataAsync() + void deleteObsoleteDataAsync() { - return findObsoleteJobDataDirs() - .compose(pathStream -> { - try (Stream<Path> stream = pathStream) - { - // use 'join' to complete the other deletes, when there is an error - List<Future> deletes = stream.map(this::deleteDataAsync) - .collect(Collectors.toList()); - return CompositeFuture.join(deletes) - .compose(compositeFuture -> { - // None of them should fail. 
Having the branch here for logic completeness - if (compositeFuture.failed()) - { - LOGGER.warn("Unexpected error while deleting files.", - compositeFuture.cause()); - } - return Future.<Void>succeededFuture(); - }); - } - }) - .recover(any -> Future.succeededFuture()); + findObsoleteJobDataDirs() + .compose(pathStream -> executorPools + .internal() + .runBlocking(() -> { + try (Stream<Path> stream = pathStream) + { + stream.forEach(this::deleteRecursively); + } + })) + .onFailure(cause -> LOGGER.warn("Unexpected error while deleting files.", cause)); } /** * Find the restore job directories that are older than {@link RestoreJobConfiguration#jobDiscoveryRecencyDays} - * @return a future of stream of path that should be closed on success. When failed to list, no stream is created. + * Note that the returned Stream should be closed by the caller. + * @return a future of stream of path. When failed to list, return a failed failure. */ Future<Stream<Path>> findObsoleteJobDataDirs() { @@ -172,23 +180,9 @@ public class RestoreJobManager if (!Files.exists(rootDir)) return Future.succeededFuture(Stream.empty()); - return executorPools.internal().executeBlocking(promise -> { - try - { - Stream<Path> obsoleteDirs = Files.walk(rootDir, 1) - .filter(this::isObsoleteRestoreJobDir); - promise.complete(obsoleteDirs); - } - catch (IOException ioe) - { - LOGGER.warn("Error on listing restore job data directories.", ioe); - } - finally - { - // Ensure the promise is complete. It is a no-op, if the promise is already completed. 
- promise.tryComplete(Stream.empty()); - } - }); + return executorPools.internal() + .executeBlocking(() -> Files.walk(rootDir, 1) + .filter(this::isObsoleteRestoreJobDir)); } // Deletes quietly w/o returning failed futures @@ -204,7 +198,7 @@ public class RestoreJobManager { rootDirs .filter(path -> Files.isDirectory(path) && path.startsWith(prefixedJobId)) - .forEach(this::deleteDataAsync); + .forEach(this::deleteRecursively); } catch (IOException ioe) // thrown from Files.walk. { @@ -213,21 +207,19 @@ public class RestoreJobManager }); } - // Deletes quietly w/o returning failed futures - private Future<Void> deleteDataAsync(Path root) + // Delete files from the root recursively and quietly w/o throwing any exception + private void deleteRecursively(Path root) { - return executorPools.internal().runBlocking(() -> { - try (Stream<Path> pathStream = Files.walk(root)) - { - pathStream - .sorted(Comparator.reverseOrder()) - .forEach(path -> ThrowableUtils.propagate(() -> Files.delete(path))); - } - catch (Exception exception) - { - LOGGER.warn("Error on deleting data. Path={}", root, exception); - } - }); + try (Stream<Path> pathStream = Files.walk(root)) + { + pathStream + .sorted(Comparator.reverseOrder()) + .forEach(path -> ThrowableUtils.propagate(() -> Files.delete(path))); + } + catch (Exception exception) + { + LOGGER.warn("Error on deleting data. 
Path={}", root, exception); + } } // returns true only when all conditions are met diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java index f765b9c6..36a5d770 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java @@ -43,6 +43,8 @@ import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; import org.apache.cassandra.sidecar.exceptions.RestoreJobException; import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; +import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges; +import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.tasks.PeriodicTask; import org.apache.cassandra.sidecar.utils.SSTableImporter; @@ -66,6 +68,7 @@ public class RestoreProcessor implements PeriodicTask private final RestoreJobUtil restoreJobUtil; private final Set<RestoreSliceHandler> activeTasks = ConcurrentHashMap.newKeySet(); private final long longRunningHandlerThresholdInSeconds; + private final LocalTokenRangesProvider localTokenRangesProvider; private final SidecarMetrics metrics; private volatile boolean isClosed = false; // OK to run close twice, so relax the control to volatile @@ -78,6 +81,7 @@ public class RestoreProcessor implements PeriodicTask SSTableImporter importer, RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobUtil restoreJobUtil, + CachedLocalTokenRanges localTokenRangesProvider, SidecarMetrics metrics) { this.pool = executorPools.internal(); @@ -92,6 +96,7 @@ public class RestoreProcessor implements PeriodicTask this.importer = importer; this.sliceDatabaseAccessor = sliceDatabaseAccessor; this.restoreJobUtil = restoreJobUtil; + 
this.localTokenRangesProvider = localTokenRangesProvider; this.metrics = metrics; } @@ -144,6 +149,7 @@ public class RestoreProcessor implements PeriodicTask requiredUsableSpacePercentage, sliceDatabaseAccessor, restoreJobUtil, + localTokenRangesProvider, metrics); activeTasks.add(task); pool.executeBlocking(task, false) // unordered; run in parallel diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java index cddd950d..697ad070 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java @@ -19,8 +19,13 @@ package org.apache.cassandra.sidecar.restore; import java.io.File; +import java.io.IOException; +import java.math.BigInteger; import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; @@ -32,6 +37,7 @@ import io.vertx.core.Promise; import io.vertx.ext.web.handler.HttpException; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.common.data.SSTableImportOptions; +import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.db.RestoreJob; @@ -41,6 +47,7 @@ import org.apache.cassandra.sidecar.exceptions.RestoreJobException; import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; import org.apache.cassandra.sidecar.exceptions.ThrowableUtils; +import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider; import org.apache.cassandra.sidecar.metrics.RestoreMetrics; import 
org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.metrics.StopWatch; @@ -72,6 +79,7 @@ public class RestoreSliceTask implements RestoreSliceHandler private final double requiredUsableSpacePercentage; private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor; private final RestoreJobUtil restoreJobUtil; + private final LocalTokenRangesProvider localTokenRangesProvider; private final RestoreMetrics metrics; private final InstanceMetrics instanceMetrics; private long taskStartTimeNanos = -1; @@ -83,6 +91,7 @@ public class RestoreSliceTask implements RestoreSliceHandler double requiredUsableSpacePercentage, RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobUtil restoreJobUtil, + LocalTokenRangesProvider localTokenRangesProvider, SidecarMetrics metrics) { Preconditions.checkArgument(!slice.job().isManagedBySidecar() @@ -95,6 +104,7 @@ public class RestoreSliceTask implements RestoreSliceHandler this.requiredUsableSpacePercentage = requiredUsableSpacePercentage; this.sliceDatabaseAccessor = sliceDatabaseAccessor; this.restoreJobUtil = restoreJobUtil; + this.localTokenRangesProvider = localTokenRangesProvider; this.metrics = metrics.server().restore(); this.instanceMetrics = metrics.instance(slice.owner().id()); } @@ -104,6 +114,19 @@ public class RestoreSliceTask implements RestoreSliceHandler return new Failed(cause, slice); } + @Override + public long elapsedInNanos() + { + return taskStartTimeNanos == -1 ? -1 : + currentTimeInNanos() - taskStartTimeNanos; + } + + @Override + public RestoreSlice slice() + { + return slice; + } + @Override public void handle(Promise<RestoreSlice> event) { @@ -137,10 +160,10 @@ public class RestoreSliceTask implements RestoreSliceHandler // 1. 
check object existence and validate eTag / checksum return checkObjectExistence(event) .compose(headObject -> downloadSlice(event)) - .<Void>compose(file -> { + .compose(file -> { slice.completeStagePhase(); sliceDatabaseAccessor.updateStatus(slice); - return Future.succeededFuture(); + return Future.<Void>succeededFuture(); }) // completed staging. A new task is produced when it comes to import .onSuccess(_v -> event.tryComplete(slice)) @@ -326,10 +349,11 @@ public class RestoreSliceTask implements RestoreSliceHandler private Future<File> unzip(File zipFile) { - Future<File> future = executorPool.executeBlocking(promise -> { - if (failOnCancelled(promise)) - return; + if (slice.isCancelled()) + return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", + slice, null)); + Future<File> future = executorPool.executeBlocking(() -> { // targetPathInStaging points to the directory named after uploadId // SSTableImporter expects the file system structure to be uploadId/keyspace/table/sstables File targetDir = slice.stageDirectory() @@ -345,14 +369,13 @@ public class RestoreSliceTask implements RestoreSliceHandler { LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task? " + "jobId={} sliceKey={}", slice.jobId(), slice.key()); - promise.complete(targetDir); + // return early + return targetDir; } else { - promise.tryFail(new RestoreJobException("Object not found from disk. File: " + zipFile)); + throw new RestoreJobException("Object not found from disk. File: " + zipFile); } - // return early - return; } try @@ -362,51 +385,49 @@ public class RestoreSliceTask implements RestoreSliceHandler // The validation step later expects only the files registered in the manifest. 
RestoreJobUtil.cleanDirectory(targetDir.toPath()); RestoreJobUtil.unzip(zipFile, targetDir); - // Notify the next step that unzip is complete - promise.complete(targetDir); // Then, delete the downloaded zip file if (!zipFile.delete()) { LOGGER.warn("File deletion attempt failed. jobId={} sliceKey={} file={}", slice.jobId(), slice.key(), zipFile.getAbsolutePath()); } + // Notify the next step that unzip is complete + return targetDir; } catch (Exception cause) { - promise.tryFail(RestoreJobExceptions.propagate("Failed to unzip. File: " + zipFile, cause)); + throw RestoreJobExceptions.propagate("Failed to unzip. File: " + zipFile, cause); } }, false); // unordered return StopWatch.measureTimeTaken(future, d -> instanceMetrics.restore().sliceUnzipTime.metric.update(d, TimeUnit.NANOSECONDS)); } - // Validate integrity of the files from the zip. The failures from any step is fatal and not retryable. + // Validate integrity of the files from the zip. If the SSTables that are fully out of the owning range of the node is removed private Future<File> validateFiles(File directory) { - Future<File> future = executorPool.executeBlocking(promise -> { - if (failOnCancelled(promise)) - return; + if (slice.isCancelled()) + return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", + slice, null)); - Map<String, String> checksums; - try - { - File manifestFile = new File(directory, RestoreSliceManifest.MANIFEST_FILE_NAME); - RestoreSliceManifest manifest = RestoreSliceManifest.read(manifestFile); - checksums = manifest.mergeAllChecksums(); - } - catch (RestoreJobFatalException e) + Future<File> future = executorPool.executeBlocking(() -> { + File manifestFile = new File(directory, RestoreSliceManifest.MANIFEST_FILE_NAME); + RestoreSliceManifest manifest = RestoreSliceManifest.read(manifestFile); + + if (manifest.isEmpty()) { - promise.tryFail(e); - return; + throw new RestoreJobFatalException("The downloaded slice has no data. 
" + + "Directory: " + directory); } - if (checksums.isEmpty()) + // validate the SSTable ranges with the owning range of the node and remove the out-of-range sstables + if (slice.job().isManagedBySidecar()) { - promise.tryFail(new RestoreJobFatalException("The downloaded slice has no data. " + - "Directory: " + directory)); - return; + removeOutOfRangeSSTables(directory, manifest); } + Map<String, String> checksums = manifest.mergeAllChecksums(); + // exclude the manifest file File[] files = directory.listFiles((dir, name) -> !name.equals(RestoreSliceManifest.MANIFEST_FILE_NAME)); if (files == null || files.length != checksums.size()) @@ -414,11 +435,10 @@ public class RestoreSliceTask implements RestoreSliceHandler String msg = "Number of files does not match. Expected: " + checksums.size() + "; Actual: " + (files == null ? 0 : files.length) + "; Directory: " + directory; - promise.tryFail(new RestoreJobFatalException(msg)); - return; + throw new RestoreJobFatalException(msg); } - compareChecksums(checksums, files, promise); + compareChecksums(checksums, files); // capture the data component size of sstables for (File file : files) @@ -430,13 +450,71 @@ public class RestoreSliceTask implements RestoreSliceHandler } // all files match with the provided checksums - promise.tryComplete(directory); + return directory; }, false); // unordered return StopWatch.measureTimeTaken(future, d -> instanceMetrics.restore().sliceValidationTime.metric.update(d, TimeUnit.NANOSECONDS)); } - private void compareChecksums(Map<String, String> expectedChecksums, File[] files, Promise<?> promise) + // Remove all the SSTables that does not belong this node + // The method modifies the input manifest and delete files under directory, if out of range sstables are found + private void removeOutOfRangeSSTables(File directory, RestoreSliceManifest manifest) throws RestoreJobException, IOException + { + Set<TokenRange> ranges = 
localTokenRangesProvider.localTokenRanges(slice.keyspace()).get(slice.owner().id()); + if (ranges == null || ranges.isEmpty()) + { + // Note: retry is allowed for the failure + throw new RestoreJobException("Unable to fetch local range, retry later"); + } + + // 1. remove the sstables that are fully out of range + // 2. detect if there is any range that partially overlaps. In that case, signal that this node is required to run nodetool cleanup on job completion + Iterator<Map.Entry<String, RestoreSliceManifest.ManifestEntry>> it = manifest.entrySet().iterator(); + while (it.hasNext()) + { + RestoreSliceManifest.ManifestEntry entry = it.next().getValue(); + // TokenRange is open-closed, hence subtracting one from the rangeStart read from manifest + TokenRange sstableRange = new TokenRange(entry.startToken().subtract(BigInteger.ONE), + entry.endToken()); + + boolean hasOverlap = false; + boolean fullyEnclosed = false; + for (TokenRange owningRange : ranges) + { + if (hasOverlap) + { + break; + } + + hasOverlap = owningRange.overlaps(sstableRange); + + if (hasOverlap) + { + fullyEnclosed = owningRange.encloses(sstableRange); + } + } + + // fully out of range + if (!hasOverlap) + { + // remove the entry from manifest + it.remove(); + // delete the files + for (String fileName : entry.componentsChecksum().keySet()) + { + Path path = directory.toPath().resolve(fileName); + Files.deleteIfExists(path); + } + } + // overlaps, but is not fully enclosed; we need to run cleanup on this node + else if (!fullyEnclosed) + { + slice.requestOutOfRangeDataCleanup(); + } + } + } + + private void compareChecksums(Map<String, String> expectedChecksums, File[] files) throws RestoreJobFatalException { for (File file : files) { @@ -444,8 +522,7 @@ public class RestoreSliceTask implements RestoreSliceHandler String expectedChecksum = expectedChecksums.get(name); if (expectedChecksum == null) { - promise.tryFail(new RestoreJobFatalException("File not found in manifest. 
File: " + name)); - return; + throw new RestoreJobFatalException("File not found in manifest. File: " + name); } try @@ -455,14 +532,12 @@ public class RestoreSliceTask implements RestoreSliceHandler { String msg = "Checksum does not match. Expected: " + expectedChecksum + "; actual: " + actualChecksum + "; file: " + file; - promise.tryFail(new RestoreJobFatalException(msg)); - return; + throw new RestoreJobFatalException(msg); } } - catch (Exception cause) + catch (IOException cause) { - promise.tryFail(new RestoreJobFatalException("Failed to calculate checksum. File: " + file)); - return; + throw new RestoreJobFatalException("Failed to calculate checksum. File: " + file, cause); } } } @@ -520,17 +595,18 @@ public class RestoreSliceTask implements RestoreSliceHandler httpException.getPayload(), httpException); } - @Override - public long elapsedInNanos() + // For testing only. Unsafe to call in production code. + @VisibleForTesting + void removeOutOfRangeSSTablesUnsafe(File directory, RestoreSliceManifest manifest) throws RestoreJobException, IOException { - return taskStartTimeNanos == -1 ? -1 : - currentTimeInNanos() - taskStartTimeNanos; + removeOutOfRangeSSTables(directory, manifest); } - @Override - public RestoreSlice slice() + // For testing only. Unsafe to call in production code. 
+ @VisibleForTesting + void compareChecksumsUnsafe(Map<String, String> expectedChecksums, File[] files) throws RestoreJobFatalException { - return slice; + compareChecksums(expectedChecksums, files); } /** diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java index 5905980e..c3e042b9 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java @@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreSlice; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; @@ -39,14 +42,17 @@ public class RestoreSliceTracker private static final Logger LOGGER = LoggerFactory.getLogger(RestoreSliceTracker.class); private volatile RestoreJob restoreJob; + private volatile boolean cleanupOutOfRangeRequested = false; + private final InstanceMetadata instanceMetadata; private final Map<RestoreSlice, Status> slices = new ConcurrentHashMap<>(); private final RestoreProcessor processor; private final AtomicReference<RestoreJobFatalException> failureRef = new AtomicReference<>(); - public RestoreSliceTracker(RestoreJob restoreJob, RestoreProcessor restoreProcessor) + public RestoreSliceTracker(RestoreJob restoreJob, RestoreProcessor restoreProcessor, InstanceMetadata instanceMetadata) { this.restoreJob = restoreJob; this.processor = restoreProcessor; + this.instanceMetadata = instanceMetadata; } /** @@ -102,6 +108,11 @@ public class RestoreSliceTracker cleanupInternal(); } + 
public void requestOutOfRangeDataCleanup() + { + cleanupOutOfRangeRequested = true; + } + /** * Internal method to clean up the {@link RestoreSlice}. * It validates the slices and log warnings if they are not in a final state, @@ -109,8 +120,9 @@ public class RestoreSliceTracker */ void cleanupInternal() { + boolean succeeded = failureRef.get() == null; slices.forEach((slice, status) -> { - if (failureRef.get() == null && status != Status.COMPLETED) + if (succeeded && status != Status.COMPLETED) { LOGGER.warn("Clean up pending restore slice when the job has not failed. jobId={}, sliceId={}", restoreJob.jobId, slice.sliceId()); @@ -118,6 +130,34 @@ public class RestoreSliceTracker slice.cancel(); }); slices.clear(); + + runOnCompletion(); + } + + /** + * Run operations on restore job completion, including success and failure cases + */ + private void runOnCompletion() + { + if (cleanupOutOfRangeRequested) + { + CassandraAdapterDelegate delegate = instanceMetadata.delegate(); + StorageOperations operations = delegate == null ? null : delegate.storageOperations(); + if (operations == null) + { + LOGGER.warn("Out of range data cleanup for the restore job is requested. It failed to start the operation. 
jobId={}", restoreJob.jobId); + return; + } + + try + { + operations.outOfRangeDataCleanup(restoreJob.keyspaceName, restoreJob.tableName); + } + catch (Throwable cause) + { + LOGGER.warn("Clean up out of range data has failed", cause); + } + } } /** diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java index 2795b7b5..fd31c7cb 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java @@ -148,13 +148,6 @@ public class StorageClient return failedFuture; } - // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html - GetObjectRequest request = - GetObjectRequest.builder() - .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider())) - .bucket(slice.bucket()) - .key(slice.key()) - .build(); Path objectPath = slice.stagedObjectPath(); File object = objectPath.toFile(); if (object.exists()) @@ -169,12 +162,22 @@ public class StorageClient // For now, we just skip download, assuming the scenario is rare and no maliciousness return CompletableFuture.completedFuture(object); } + if (!object.getParentFile().mkdirs()) { - LOGGER.warn("Error occurred while creating directory. jobId={} s3_object={}", + LOGGER.warn("Error occurred while creating directory. jobId={} s3Object={}", slice.jobId(), slice.stagedObjectPath()); + } - LOGGER.info("Downloading object. jobId={} s3_object={}", slice.jobId(), slice.stagedObjectPath()); + + LOGGER.info("Downloading object. 
jobId={} s3Object={}", slice.jobId(), slice.stagedObjectPath()); + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html + GetObjectRequest request = + GetObjectRequest.builder() + .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider())) + .bucket(slice.bucket()) + .key(slice.key()) + .build(); return rateLimitedGetObject(slice, client, request, objectPath) .whenComplete(logCredentialOnRequestFailure(slice, credentials)) .thenApply(res -> object); diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java index 59f04e7d..5325b2ad 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.common; import java.io.IOException; import java.util.Map; +import org.apache.cassandra.distributed.UpgradeableCluster; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IUpgradeableInstance; import org.apache.cassandra.sidecar.common.server.JmxClient; @@ -29,6 +30,8 @@ import org.apache.cassandra.testing.CassandraIntegrationTest; import org.apache.cassandra.testing.CassandraTestContext; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** * Test to ensure connectivity with the JMX client @@ -37,46 +40,77 @@ public class JmxClientIntegrationTest { private static final String SS_OBJ_NAME = "org.apache.cassandra.db:type=StorageService"; + /** + * Test jmx connectivity with various operations + */ @CassandraIntegrationTest void testJmxConnectivity(CassandraTestContext context) throws IOException { try (JmxClient jmxClient = 
createJmxClient(context)) { - String opMode = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME) - .getOperationMode(); - assertThat(opMode).isNotNull(); - assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", "DECOMMISSIONED", "CLIENT"); - - IUpgradeableInstance instance = context.cluster().getFirstRunningInstance(); - IInstanceConfig config = instance.config(); - assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress()); - assertThat(jmxClient.port()).isEqualTo(config.jmxPort()); + testGetOperationMode(jmxClient, context.cluster()); + + testGossipInfo(jmxClient); + + testCorrectVersion(jmxClient, String.valueOf(context.version.major)); + + testTableCleanup(jmxClient, context.cluster()); } } - @CassandraIntegrationTest - void testGossipInfo(CassandraTestContext context) throws IOException + private void testGetOperationMode(JmxClient jmxClient, UpgradeableCluster cluster) { - try (JmxClient jmxClient = createJmxClient(context)) - { - FailureDetector proxy = jmxClient.proxy(FailureDetector.class, - "org.apache.cassandra.net:type=FailureDetector"); - String rawGossipInfo = proxy.getAllEndpointStates(); - assertThat(rawGossipInfo).isNotEmpty(); - Map<String, ?> gossipInfoMap = GossipInfoParser.parse(rawGossipInfo); - assertThat(gossipInfoMap).isNotEmpty(); - gossipInfoMap.forEach((key, value) -> GossipInfoParser.isGossipInfoHostHeader(key)); - } + String opMode = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME) + .getOperationMode(); + assertThat(opMode).isNotNull(); + assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", "DECOMMISSIONED", "CLIENT"); + + IUpgradeableInstance instance = cluster.getFirstRunningInstance(); + IInstanceConfig config = instance.config(); + assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress()); + assertThat(jmxClient.port()).isEqualTo(config.jmxPort()); } - @CassandraIntegrationTest - void testCorrectVersion(CassandraTestContext context) throws IOException + private 
void testGossipInfo(JmxClient jmxClient) { - try (JmxClient jmxClient = createJmxClient(context)) + FailureDetector proxy = jmxClient.proxy(FailureDetector.class, + "org.apache.cassandra.net:type=FailureDetector"); + String rawGossipInfo = proxy.getAllEndpointStates(); + assertThat(rawGossipInfo).isNotEmpty(); + Map<String, ?> gossipInfoMap = GossipInfoParser.parse(rawGossipInfo); + assertThat(gossipInfoMap).isNotEmpty(); + gossipInfoMap.forEach((key, value) -> GossipInfoParser.isGossipInfoHostHeader(key)); + } + + private void testCorrectVersion(JmxClient jmxClient, String majorVersion) + { + String releaseVersion = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME) + .getReleaseVersion(); + assertThat(releaseVersion).startsWith(majorVersion); + } + + // a test to ensure the jmx client can invoke the MBean method + private void testTableCleanup(JmxClient jmxClient, UpgradeableCluster cluster) + { + cluster.schemaChange("CREATE KEYSPACE jmx_client_test WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}"); + cluster.schemaChange("CREATE TABLE jmx_client_test.table_cleanup ( a int PRIMARY KEY, b int)"); + cluster.get(1).executeInternal("INSERT INTO jmx_client_test.table_cleanup (a, b) VALUES (1, 1)"); + cluster.get(1).flush("jmx_client_test"); + int status = -1; + try + { + status = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME) + .forceKeyspaceCleanup(1, "jmx_client_test", "table_cleanup"); + } + catch (Exception e) { - jmxClient.proxy(SSProxy.class, SS_OBJ_NAME) - .refreshSizeEstimates(); + fail("Unexpected exception ", e); } + assertThat(status).isZero(); + + assertThatThrownBy(() -> jmxClient.proxy(SSProxy.class, SS_OBJ_NAME) + .forceKeyspaceCleanup(1, "jmx_client_test", "table_not_exist")) + .hasMessageContaining("Unknown keyspace/cf pair"); } /** @@ -89,6 +123,8 @@ public class JmxClientIntegrationTest void refreshSizeEstimates(); String getReleaseVersion(); + + int forceKeyspaceCleanup(int jobs, String keyspaceName, String... 
tables); } /** @@ -99,7 +135,6 @@ public class JmxClientIntegrationTest String getAllEndpointStates(); } - private static JmxClient createJmxClient(CassandraTestContext context) { IUpgradeableInstance instance = context.cluster().getFirstRunningInstance(); diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java index d08131de..604532b5 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java @@ -40,8 +40,8 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.UpgradeableCluster; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.TokenSupplier; -import org.apache.cassandra.sidecar.adapters.base.Partitioner; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; import org.apache.cassandra.sidecar.testing.IntegrationTestBase; import org.apache.cassandra.testing.AbstractCassandraTestContext; import org.apache.cassandra.testing.CassandraIntegrationTest; diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java index aff80a53..deb84afc 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java @@ -42,8 +42,8 @@ import org.apache.cassandra.distributed.UpgradeableCluster; import org.apache.cassandra.distributed.api.IUpgradeableInstance; import 
org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.ClusterUtils; -import org.apache.cassandra.sidecar.adapters.base.Partitioner; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; import org.apache.cassandra.testing.CassandraIntegrationTest; import static org.assertj.core.api.Assertions.assertThat; diff --git a/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java b/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java index d3f065b4..6085525e 100644 --- a/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java @@ -18,8 +18,6 @@ package org.apache.cassandra.sidecar.concurrent; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -35,6 +33,7 @@ import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -44,7 +43,7 @@ import static org.mockito.Mockito.when; /** * Test {@link ExecutorPools} */ -public class ExecutorPoolsTest +class ExecutorPoolsTest { private ExecutorPools pools; private SidecarMetrics metrics; @@ -69,7 +68,7 @@ public class ExecutorPoolsTest } @Test - public void testClosingExecutorPoolShouldThrow() + void testClosingExecutorPoolShouldThrow() { assertThatThrownBy(() -> pools.service().close()) .hasMessage("Closing TaskExecutorPool is not supported!") @@ -81,9 +80,37 @@ 
public class ExecutorPoolsTest } @Test - public void testOrdered() + void testExecutionOrder() { - // not thread-safe + testExecutionOrder(true, true); + testExecutionOrder(false, true); + testExecutionOrder(true, false); + testExecutionOrder(false, false); + } + + @Test + void testMetricCapture() + { + TaskExecutorPool pool = pools.internal(); + int total = 100; + CountDownLatch stop = new CountDownLatch(total); + for (int i = 0; i < total; i++) + { + pool.runBlocking(() -> stop.countDown()); + } + + assertThat(Uninterruptibles.awaitUninterruptibly(stop, 10, TimeUnit.SECONDS)) + .describedAs("Test should finish in 10 seconds") + .isTrue(); + + // there could be some delay to read the metric that reflects the last task. If so, retry the assertion for at most 2 seconds + loopAssert(2, + () -> assertThat(metrics.server().resource().internalTaskTime.metric.getCount()).isEqualTo(total)); + } + + private void testExecutionOrder(boolean orderedSubmission, boolean orderedExecution) + { + // not thread-safe deliberated class IntWrapper { int i = 0; @@ -97,25 +124,33 @@ public class ExecutorPoolsTest TaskExecutorPool pool = pools.internal(); IntWrapper v = new IntWrapper(); int total = 100; + CountDownLatch ready = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(total); - Set<String> threadNames = new HashSet<>(); for (int i = 0; i < total; i++) { - // Start 100 parallel executions that each submits the ordered execution - pool.executeBlocking(promise -> { - pool.executeBlocking(p -> { + // Start 100 executions that each submits the ordered execution + pool.runBlocking(() -> { + Uninterruptibles.awaitUninterruptibly(ready); + pool.runBlocking(() -> { v.increment(); - threadNames.add(Thread.currentThread().getName()); stop.countDown(); - assertThat(metrics.server().resource().internalTaskTime.metric.getCount()).isEqualTo(200); - }, true); - }, false); + }, orderedExecution); + }, orderedSubmission); } + ready.countDown(); 
assertThat(Uninterruptibles.awaitUninterruptibly(stop, 10, TimeUnit.SECONDS)) .describedAs("Test should finish in 10 seconds") .isTrue(); + // Although IntWrapper is not thread safe, the serial execution (ordered) prevents any race condition. - assertThat(v.i).isEqualTo(total); + if (orderedExecution) + { + assertThat(v.i).isEqualTo(total); + } + else // if execution is unordered, the output is likely less than total due to race + { + assertThat(v.i).isLessThanOrEqualTo(total); + } } } diff --git a/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java b/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java deleted file mode 100644 index 4671f187..00000000 --- a/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.sidecar.locator; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import org.junit.jupiter.api.Test; - -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.Token; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -class TokenRangeTest -{ - @Test - void testEquals() - { - TokenRange r1 = new TokenRange(1, 100); - TokenRange r2 = new TokenRange(1, 100); - TokenRange r3 = new TokenRange(-10, 10); - assertThat(r1).isEqualTo(r2); - assertThat(r1).isEqualTo(r1); - assertThat(r1).isNotEqualTo(r3); - assertThat(r3).isNotEqualTo(r1); - } - - @Test - void testCreateFromJavaDriverTokenRange() - { - com.datastax.driver.core.TokenRange ordinaryRange = mockRange(1L, 100L); - when(ordinaryRange.isWrappedAround()).thenReturn(false); - when(ordinaryRange.unwrap()).thenCallRealMethod(); - List<TokenRange> ranges = TokenRange.from(ordinaryRange); - assertThat(ranges).hasSize(1) - .isEqualTo(Collections.singletonList(new TokenRange(1, 100))); - } - - @Test - void testCreateFromWraparoundJavaDriverTokenRange() - { - com.datastax.driver.core.TokenRange range = mockRange(10L, -10L); - List<com.datastax.driver.core.TokenRange> unwrapped = Arrays.asList(mockRange(10L, Long.MIN_VALUE), - mockRange(Long.MIN_VALUE, -10L)); - when(range.unwrap()).thenReturn(unwrapped); - List<TokenRange> ranges = TokenRange.from(range); - assertThat(ranges).hasSize(2) - .isEqualTo(Arrays.asList(new TokenRange(10, Long.MIN_VALUE), - new TokenRange(Long.MIN_VALUE, -10L))); - } - - private com.datastax.driver.core.TokenRange mockRange(long start, long end) - { - com.datastax.driver.core.TokenRange range = mock(com.datastax.driver.core.TokenRange.class); - Token startToken = mockToken(start); - when(range.getStart()).thenReturn(startToken); - Token endToken = mockToken(end); - when(range.getEnd()).thenReturn(endToken); - 
return range; - } - - private Token mockToken(long value) - { - Token token = mock(Token.class); - when(token.getType()).thenReturn(DataType.bigint()); - when(token.getValue()).thenReturn(value); - return token; - } -} diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java index 51ad81e8..89729a31 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java @@ -148,7 +148,8 @@ class RestoreJobManagerTest manager.removeJobInternal(slice.jobId()); // it cancels the non-completed slices - assertThat(slice.isCancelled()).isTrue(); + // removeJobInternal runs async. Wait for at most 2 seconds for the slice to be cancelled + loopAssert(2, () -> assertThat(slice.isCancelled()).isTrue()); } @Test @@ -252,6 +253,7 @@ class RestoreJobManagerTest RestoreSlice slice = RestoreSlice .builder() .jobId(job.jobId) + .sliceId("testSliceId") .bucketId((short) 0) .stageDirectory(testDir, "uploadId") .storageKey("storageKey") @@ -260,7 +262,7 @@ class RestoreJobManagerTest .replicaStatus(Collections.emptyMap()) .replicas(Collections.emptySet()) .build(); - RestoreSliceTracker tracker = new RestoreSliceTracker(job, mock(RestoreProcessor.class)); + RestoreSliceTracker tracker = new RestoreSliceTracker(job, mock(RestoreProcessor.class), owner); slice.registerTracker(tracker); return slice; } diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java index d5812dd0..e878fa22 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java @@ -215,7 +215,7 @@ class RestoreProcessorTest .jobStatus(RestoreJobStatus.CREATED) .build(); 
when(slice.job()).thenReturn(job); - when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any())).thenReturn( + when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any(), any())).thenReturn( new RestoreSliceHandler() { private Long startTime = timeInNanosSupplier.get(); diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java index 0c7dfc92..0d2a0c41 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java @@ -20,14 +20,22 @@ package org.apache.cassandra.sidecar.restore; import java.io.File; import java.io.IOException; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.IntStream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -40,6 +48,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; @@ -49,14 +58,18 @@ import org.apache.cassandra.sidecar.db.RestoreSlice; import 
org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import org.apache.cassandra.sidecar.exceptions.RestoreJobException; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; +import org.apache.cassandra.sidecar.exceptions.ThrowableUtils; +import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider; import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl; import org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics; import org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl; import org.apache.cassandra.sidecar.metrics.instance.InstanceRestoreMetrics; +import org.apache.cassandra.sidecar.restore.RestoreSliceManifest.ManifestEntry; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.sidecar.utils.SSTableImporter; +import org.apache.cassandra.sidecar.utils.XXHash32Provider; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; @@ -64,7 +77,9 @@ import static org.apache.cassandra.sidecar.AssertionUtils.getBlocking; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -77,6 +92,7 @@ class RestoreSliceTaskTest private TaskExecutorPool executorPool; private SidecarMetrics metrics; private TestRestoreSliceAccessor sliceDatabaseAccessor; + private LocalTokenRangesProvider localTokenRangesProvider; private RestoreJobUtil util; @BeforeEach @@ -99,7 +115,8 @@ class 
RestoreSliceTaskTest when(mockRegistryFactory.getOrCreate()).thenReturn(registry()); when(mockRegistryFactory.getOrCreate(1)).thenReturn(registry(1)); metrics = new SidecarMetricsImpl(mockRegistryFactory, mockInstanceMetadataFetcher); - util = mock(RestoreJobUtil.class); + localTokenRangesProvider = mock(LocalTokenRangesProvider.class); + util = new RestoreJobUtil(new XXHash32Provider()); sliceDatabaseAccessor = new TestRestoreSliceAccessor(); } @@ -340,6 +357,95 @@ class RestoreSliceTaskTest assertThat(task.elapsedInNanos()).isEqualTo(123L); } + @Test + void testRemoveOutOfRangeSSTables(@TempDir Path tempDir) throws RestoreJobException, IOException + { + // TODO: update test to use replica ranges implementation + RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED, "QUORUM"); + RestoreSliceTask task = createTask(mockSlice, job); + + // the mocked localTokenRangesProvider returns null, so retry later + RestoreSliceManifest manifest = new RestoreSliceManifest(); + assertThatThrownBy(() -> task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest)) + .isExactlyInstanceOf(RestoreJobException.class) + .hasMessageContaining("Unable to fetch local range, retry later"); + + // enclosed in the node's owning range: [1, 10] is fully enclosed in (0, 100] + // it should not remove the manifest entry, and no cleanup is needed + Map<Integer, Set<TokenRange>> localRanges = new HashMap<>(1); + Set<TokenRange> nodeRanges = new HashSet<>(); + nodeRanges.add(new TokenRange(0, 100)); // not using vnode, so a single range + localRanges.put(1, nodeRanges); // instance id is 1. 
See setup() + when(localTokenRangesProvider.localTokenRanges(any())).thenReturn(localRanges); + ManifestEntry rangeEnclosed = new ManifestEntry(Collections.emptyMap(), + BigInteger.valueOf(1), // start + BigInteger.valueOf(10)); // end + manifest.put("foo-", rangeEnclosed); + task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest); + assertThat(manifest).hasSize(1); + verify(mockSlice, times(0)).requestOutOfRangeDataCleanup(); + + // fully out of range: [-10, 0] is fully out of range of (0, 100] + // it should remove the manifest entry entirely; no clean up required + manifest.clear(); + ManifestEntry outOfRange = new ManifestEntry(Collections.emptyMap(), + BigInteger.valueOf(-10), // start + BigInteger.valueOf(0)); // end + manifest.put("foo-", outOfRange); + task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest); + assertThat(manifest).isEmpty(); + verify(mockSlice, times(0)).requestOutOfRangeDataCleanup(); + + // partially out of range: [-10, 10] is partially out of range of (0, 100] + // it should not remove the manifest entry, but it should signal to request out of range data cleanup + manifest.clear(); + ManifestEntry partiallyOutOfRange = new ManifestEntry(Collections.emptyMap(), + BigInteger.valueOf(-10), // start + BigInteger.valueOf(10)); // end + manifest.put("foo-", partiallyOutOfRange); + task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest); + assertThat(manifest).hasSize(1); + verify(mockSlice, times(1)).requestOutOfRangeDataCleanup(); + } + + @Test + void testCompareChecksum(@TempDir Path tempDir) throws RestoreJobFatalException, IOException + { + RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED); + RestoreSliceTask task = createTask(mockSlice, job); + + byte[] bytes = "Hello".getBytes(StandardCharsets.UTF_8); + File[] testFiles = IntStream.range(0, 10).mapToObj(i -> new File(tempDir.toFile(), "f" + i)) + .map(f -> ThrowableUtils.propagate(() -> Files.write(f.toPath(), 
bytes)).toFile()) + .toArray(File[]::new); + Map<String, String> expectedChecksums = new HashMap<>(10); + for (File f : testFiles) + { + expectedChecksums.put(f.getName(), util.checksum(f)); + } + + assertThat(expectedChecksums) + .hasSize(10) + .containsEntry("f0", "f206d28f"); // hash value for "Hello" + + // it should not throw + task.compareChecksumsUnsafe(expectedChecksums, testFiles); + + // test check with file that does not exist + Map<String, String> nonexistFileChecksums = new HashMap<>(10); + nonexistFileChecksums.put("non-exist-file", "hash"); + assertThatThrownBy(() -> task.compareChecksumsUnsafe(nonexistFileChecksums, testFiles)) + .isInstanceOf(RestoreJobFatalException.class) + .hasMessageContaining("File not found in manifest"); + + // test check with invalid checksum value + Map<String, String> invalidChecksums = new HashMap<>(expectedChecksums); + invalidChecksums.put("f0", "invalid_hash"); // modify the hash of the file + assertThatThrownBy(() -> task.compareChecksumsUnsafe(invalidChecksums, testFiles)) + .isInstanceOf(RestoreJobFatalException.class) + .hasMessageContaining("Checksum does not match. 
Expected: invalid_hash; actual: f206d28f"); + } + private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job) { return createTask(slice, job, System::nanoTime); @@ -351,10 +457,10 @@ class RestoreSliceTaskTest assertThat(slice.job()).isSameAs(job); assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar()); assertThat(slice.job().status).isEqualTo(job.status); - RestoreJobUtil util = mock(RestoreJobUtil.class); - when(util.currentTimeNanos()).thenAnswer(invok -> currentNanoTimeSupplier.get()); + RestoreJobUtil spiedUtil = spy(util); + when(spiedUtil.currentTimeNanos()).thenAnswer(invok -> currentNanoTimeSupplier.get()); return new TestRestoreSliceTask(slice, mockStorageClient, executorPool, mockSSTableImporter, - 0, sliceDatabaseAccessor, util, metrics); + 0, sliceDatabaseAccessor, spiedUtil, localTokenRangesProvider, metrics); } private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, RestoreJob job) @@ -365,7 +471,7 @@ class RestoreSliceTaskTest assertThat(slice.job().status).isEqualTo(job.status); return new TestUnexpectedExceptionInRestoreSliceTask(slice, mockStorageClient, executorPool, mockSSTableImporter, 0, sliceDatabaseAccessor, - util, metrics); + util, localTokenRangesProvider, metrics); } static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor @@ -393,10 +499,11 @@ class RestoreSliceTaskTest public TestRestoreSliceTask(RestoreSlice slice, StorageClient s3Client, TaskExecutorPool executorPool, SSTableImporter importer, double requiredUsableSpacePercentage, RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobUtil restoreJobUtil, + LocalTokenRangesProvider localTokenRangesProvider, SidecarMetrics metrics) { super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, - sliceDatabaseAccessor, restoreJobUtil, metrics); + sliceDatabaseAccessor, restoreJobUtil, localTokenRangesProvider, metrics); this.slice = slice; this.instanceMetrics = 
metrics.instance(slice.owner().id()); } @@ -429,10 +536,12 @@ class RestoreSliceTaskTest TaskExecutorPool executorPool, SSTableImporter importer, double requiredUsableSpacePercentage, RestoreSliceDatabaseAccessor sliceDatabaseAccessor, - RestoreJobUtil util, SidecarMetrics metrics) + RestoreJobUtil util, + LocalTokenRangesProvider localTokenRangesProvider, + SidecarMetrics metrics) { super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, - sliceDatabaseAccessor, util, metrics); + sliceDatabaseAccessor, util, localTokenRangesProvider, metrics); } @Override diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java index 3eb94a0b..5a78fc20 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java @@ -18,14 +18,12 @@ package org.apache.cassandra.sidecar.routes.restore; -import java.math.BigInteger; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; -import com.google.common.collect.Range; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @@ -45,6 +43,7 @@ import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets; import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload; +import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import 
org.apache.cassandra.sidecar.config.SidecarConfiguration; @@ -153,7 +152,7 @@ public abstract class BaseRestoreJobTests testRestoreSlices.updateStatusFunc = func; } - protected void mockLookupRestoreSlices(BiFunction<UUID, Range<BigInteger>, List<RestoreSlice>> func) + protected void mockLookupRestoreSlices(BiFunction<UUID, TokenRange, List<RestoreSlice>> func) { testRestoreSlices.selectByJobByRangeFunc = func; } @@ -206,7 +205,7 @@ public abstract class BaseRestoreJobTests { Function<RestoreSlice, RestoreSlice> createFunc; Function<RestoreSlice, RestoreSlice> updateStatusFunc; - BiFunction<UUID, Range<BigInteger>, List<RestoreSlice>> selectByJobByRangeFunc; + BiFunction<UUID, TokenRange, List<RestoreSlice>> selectByJobByRangeFunc; TestRestoreSliceDatabaseAccessor(SidecarSchema sidecarSchema) { @@ -226,10 +225,9 @@ public abstract class BaseRestoreJobTests } @Override - public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, short bucketId, - BigInteger startToken, BigInteger endToken) + public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, short bucketId, TokenRange range) { - return selectByJobByRangeFunc.apply(jobId, Range.closed(startToken, endToken)); + return selectByJobByRangeFunc.apply(jobId, range); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org