This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 10034a4e CASSANDRASC-134: Detect out of range data and cleanup using nodetool (#125)
10034a4e is described below

commit 10034a4e9c9913127e28546f2263f2ce818f6a37
Author: Yifan Cai <y...@apache.org>
AuthorDate: Tue Jun 4 14:54:13 2024 -0700

    CASSANDRASC-134: Detect out of range data and cleanup using nodetool (#125)
    
    The patch adds a step to check data ownership before importing sstables.
    When fully out-of-range sstables are found, they are removed before
    importing. When partially out-of-range sstables are found, a nodetool
    cleanup run is requested on job completion, in both the success and
    failure cases.
    
    Patch by Yifan Cai; Reviewed by Bernardo Botella and Francisco Guerrero for CASSANDRASC-134
---
 CHANGES.txt                                        |   1 +
 .../adapters/base/CassandraStorageOperations.java  |  12 ++
 .../base/GossipDependentStorageJmxOperations.java  |   7 +
 .../adapters/base/StorageJmxOperations.java        |  13 ++
 .../adapters/base/TokenRangeReplicaProvider.java   |   1 +
 .../sidecar/adapters/base/TokenRangeReplicas.java  |   1 +
 .../base/TokenRangeReplicaProviderTest.java        |   1 +
 .../adapters/base/TokenRangeReplicasTest.java      |   2 +
 .../sidecar/common/server/StorageOperations.java   |  25 +++
 .../server/cluster/locator}/Partitioner.java       |   4 +-
 .../common/server/cluster/locator/Token.java       | 122 +++++++++++++
 .../common/server/cluster/locator/TokenRange.java  | 200 +++++++++++++++++++++
 .../server/cluster/locator/TokenRangeTest.java     | 179 ++++++++++++++++++
 .../common/server/cluster/locator/TokenTest.java   |  55 ++++++
 .../apache/cassandra/sidecar/db/RestoreSlice.java  |  11 ++
 .../sidecar/db/RestoreSliceDatabaseAccessor.java   |   9 +-
 .../sidecar/locator/CachedLocalTokenRanges.java    |   7 +-
 .../sidecar/locator/LocalTokenRangesProvider.java  |  11 +-
 .../cassandra/sidecar/locator/TokenRange.java      | 114 ------------
 .../sidecar/restore/RestoreJobDiscoverer.java      |   4 +-
 .../sidecar/restore/RestoreJobManager.java         | 132 +++++++-------
 .../sidecar/restore/RestoreProcessor.java          |   6 +
 .../sidecar/restore/RestoreSliceTask.java          | 174 +++++++++++++-----
 .../sidecar/restore/RestoreSliceTracker.java       |  44 ++++-
 .../cassandra/sidecar/restore/StorageClient.java   |  21 ++-
 .../sidecar/common/JmxClientIntegrationTest.java   |  89 ++++++---
 .../tokenrange/BaseTokenRangeIntegrationTest.java  |   2 +-
 .../sidecar/routes/tokenrange/MovingBaseTest.java  |   2 +-
 .../sidecar/concurrent/ExecutorPoolsTest.java      |  65 +++++--
 .../cassandra/sidecar/locator/TokenRangeTest.java  |  89 ---------
 .../sidecar/restore/RestoreJobManagerTest.java     |   6 +-
 .../sidecar/restore/RestoreProcessorTest.java      |   2 +-
 .../sidecar/restore/RestoreSliceTaskTest.java      | 125 ++++++++++++-
 .../routes/restore/BaseRestoreJobTests.java        |  12 +-
 34 files changed, 1134 insertions(+), 414 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 43de8233..d8813da9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Detect out of range data and cleanup using nodetool (CASSANDRASC-134)
  * Allow optional reason to abort restore jobs (CASSANDRASC-133)
  * Fix SidecarLoadBalancingPolicy unexpectedly removing local node and improve 
CI stability (CASSANDRASC-131)
  * Reduce implementations accessible from client (CASSANDRASC-127)
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
index 3a87e473..9903a0dc 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@ import 
org.apache.cassandra.sidecar.common.response.RingResponse;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
 import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.sidecar.common.server.data.Name;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import 
org.apache.cassandra.sidecar.common.server.exceptions.NodeBootstrappingException;
@@ -201,4 +203,14 @@ public class CassandraStorageOperations implements 
StorageOperations
         }
         return dataFileLocations;
     }
+
+    @Override
+    public void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull 
String table, int concurrency)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        requireNonNull(keyspace, "keyspace must be non-null");
+        requireNonNull(table, "table must be non-null");
+        jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
+                 .forceKeyspaceCleanup(concurrency, keyspace, table);
+    }
 }
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
index d0305141..8982c1b9 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,6 +140,12 @@ public class GossipDependentStorageJmxOperations 
implements StorageJmxOperations
         return delegate.getAllDataFileLocations();
     }
 
+    @Override
+    public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... 
tables) throws IOException, ExecutionException, InterruptedException
+    {
+        return delegate.forceKeyspaceCleanup(jobs, keyspaceName, tables);
+    }
+
     /**
      * Ensures that gossip is running on the Cassandra instance
      *
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
index eba3f2f8..d8f94125 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.adapters.base;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 /**
  * An interface that pulls methods from the Cassandra Storage Service Proxy
@@ -146,4 +147,16 @@ public interface StorageJmxOperations
      * @return String array of all locations
      */
     String[] getAllDataFileLocations();
+
+    /**
+     * Force cleanup of the data of the tables in the keyspace. All partitions 
that are out of range are removed
+     * @param jobs job concurrency
+     * @param keyspaceName keyspace of the table to clean
+     * @param tables tables to clean
+     * @return status code. 0: success; 1: aborted; 2: unable to cancel
+     * @throws IOException i/o exception during cleanup
+     * @throws ExecutionException not actually thrown, but declared in the 
MBean
+     * @throws InterruptedException not actually thrown, but declared in the 
MBean
+     */
+    int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) 
throws IOException, ExecutionException, InterruptedException;
 }
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
index ae1e6ce5..dd1d497f 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
@@ -41,6 +41,7 @@ import 
org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
 import 
org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse.ReplicaInfo;
 import 
org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.sidecar.common.server.data.Name;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.common.server.utils.GossipInfoParser;
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java
index a46437ca..fdef5599 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java
@@ -36,6 +36,7 @@ import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.jetbrains.annotations.NotNull;
 
 
diff --git 
a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
 
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
index 91eb8099..69915b5c 100644
--- 
a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
+++ 
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.sidecar.common.server.data.Name;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.assertj.core.api.InstanceOfAssertFactories;
diff --git 
a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java
 
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java
index 2ebcbeb3..3e965949 100644
--- 
a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java
+++ 
b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicasTest.java
@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
index 1c00932b..d2935297 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
@@ -18,9 +18,11 @@
 
 package org.apache.cassandra.sidecar.common.server;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.sidecar.common.response.RingResponse;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
@@ -76,4 +78,27 @@ public interface StorageOperations
      * @return the list of all data file locations for the Cassandra instance
      */
     List<String> dataFileLocations();
+
+    /**
+     * Clean up the data of the specified table and remove the keys that no longer 
belong to the Cassandra node.
+     *
+     * @param keyspace keyspace of the table to clean
+     * @param table table to clean
+     * @param concurrency concurrency of the cleanup (compaction) job.
+     *                    Note that it cannot exceed the configured 
`concurrent_compactors` in Cassandra
+     * @throws IOException i/o exception during cleanup
+     * @throws ExecutionException not actually thrown, but declared in the 
MBean
+     * @throws InterruptedException not actually thrown, but declared in the 
MBean
+     */
+    void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String 
table, int concurrency)
+    throws IOException, ExecutionException, InterruptedException;
+
+    /**
+     * Similar to {@link #outOfRangeDataCleanup(String, String, int)}, but uses 
1 for concurrency
+     */
+    default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull 
String table)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        outOfRangeDataCleanup(keyspace, table, 1);
+    }
 }
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java
similarity index 96%
rename from 
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java
rename to 
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java
index 2f4687b3..39595930 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/Partitioner.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Partitioner.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.adapters.base;
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
 
 import java.math.BigInteger;
 
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java
new file mode 100644
index 00000000..f8464acf
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/Token.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
+
+import java.math.BigInteger;
+import java.util.Comparator;
+import java.util.Objects;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Token, i.e. hashed partition key, in Cassandra
+ */
+public final class Token implements Comparable<Token>
+{
+    private static final Comparator<Token> TOKEN_COMPARATOR = 
Comparator.comparing(Token::toBigInteger);
+
+    private final BigInteger value;
+
+    /**
+     * Create token from {@code BigInteger} value
+     * @param value token value
+     * @return token
+     */
+    public static Token from(BigInteger value)
+    {
+        return new Token(value);
+    }
+
+    /**
+     * Create token from its string literal
+     * @param valueStr token value
+     * @throws NumberFormatException {@code valueStr} is not a valid 
representation
+     *         of a BigInteger.
+     * @return token
+     */
+    public static Token from(String valueStr)
+    {
+        return new Token(new BigInteger(valueStr));
+    }
+
+    /**
+     * Create token from long value
+     * @param value token value
+     * @return token
+     */
+    public static Token from(long value)
+    {
+        return new Token(BigInteger.valueOf(value));
+    }
+
+    private Token(BigInteger value)
+    {
+        this.value = value;
+    }
+
+    /**
+     * @return the {@code BigInteger} value
+     */
+    public BigInteger toBigInteger()
+    {
+        return value;
+    }
+
+    /**
+     * @return a new instance of token whose value is {@code (this + 1)}
+     */
+    public Token increment()
+    {
+        return new Token(value.add(BigInteger.ONE));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        Token token = (Token) o;
+        return Objects.equals(value, token.value);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return value.hashCode();
+    }
+
+    @Override
+    public int compareTo(@NotNull Token other)
+    {
+        return Objects.compare(this, Objects.requireNonNull(other), 
TOKEN_COMPARATOR);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Token(" + value + ')';
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java
new file mode 100644
index 00000000..5abc1048
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Range;
+
+import com.datastax.driver.core.DataType;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Range: (start, end] - start exclusive and end inclusive
+ */
+public class TokenRange
+{
+    private final Range<Token> range;
+    private volatile Token firstToken = null;
+
+    /**
+     * Unwrap the java driver's token range if necessary and convert the 
unwrapped ranges into a list.
+     * Only the token ranges from Murmur3Partitioner and RandomPartitioner are 
supported.
+     *
+     * @param dsTokenRange TokenRange implementation in Cassandra java driver
+     * @return list of token ranges. If the input token range wraps around, 
the size of the list is 2;
+     * otherwise, the list has only one range
+     */
+    public static List<TokenRange> from(com.datastax.driver.core.TokenRange 
dsTokenRange)
+    {
+        DataType tokenDataType = dsTokenRange.getStart().getType();
+        if (tokenDataType == DataType.varint()) // BigInteger - 
RandomPartitioner
+        {
+            return dsTokenRange.unwrap()
+                               .stream()
+                               .map(range -> {
+                                   BigInteger start = (BigInteger) 
range.getStart().getValue();
+                                   BigInteger end = (BigInteger) 
range.getEnd().getValue();
+                                   if 
(end.compareTo(Partitioner.Random.minToken) == 0)
+                                   {
+                                       end = Partitioner.Random.maxToken;
+                                   }
+                                   return new TokenRange(start, end);
+                               })
+                               .collect(Collectors.toList());
+        }
+        else if (tokenDataType == DataType.bigint()) // Long - 
Murmur3Partitioner
+        {
+            return dsTokenRange.unwrap()
+                               .stream()
+                               .map(range -> {
+                                   BigInteger start = 
BigInteger.valueOf((Long) range.getStart().getValue());
+                                   BigInteger end = BigInteger.valueOf((Long) 
range.getEnd().getValue());
+                                   if 
(end.compareTo(Partitioner.Murmur3.minToken) == 0)
+                                   {
+                                       end = Partitioner.Murmur3.maxToken;
+                                   }
+                                   return new TokenRange(start, end);
+                               })
+                               .collect(Collectors.toList());
+        }
+        else
+        {
+            throw new IllegalArgumentException(
+            "Unsupported token type: " + tokenDataType +
+            ". Only tokens of Murmur3Partitioner and RandomPartitioner are 
supported.");
+        }
+    }
+
+    public TokenRange(long start, long end)
+    {
+        this(BigInteger.valueOf(start), BigInteger.valueOf(end));
+    }
+
+    public TokenRange(BigInteger start, BigInteger end)
+    {
+        this(Token.from(start), Token.from(end));
+    }
+
+    public TokenRange(Token start, Token end)
+    {
+        this.range = Range.openClosed(start, end);
+    }
+
+    /**
+     * @return start token. It is not enclosed in the range.
+     */
+    public Token start()
+    {
+        return range.lowerEndpoint();
+    }
+
+    /**
+     * @return end token. It is the last token enclosed in the range.
+     */
+    public Token end()
+    {
+        return range.upperEndpoint();
+    }
+
+    /**
+     * @return the first token enclosed in the range
+     */
+    @Nullable
+    public Token firstToken()
+    {
+        if (range.isEmpty())
+        {
+            return null;
+        }
+
+        // it is ok to race
+        if (firstToken == null)
+        {
+            firstToken = range.lowerEndpoint().increment();
+        }
+        return firstToken;
+    }
+
+    /**
+     * Test if this range encloses the other range.
+     * It simply delegates to {@link Range#encloses(Range)}
+     */
+    public boolean encloses(TokenRange other)
+    {
+        return this.range.encloses(other.range);
+    }
+
+    /**
+     * Two ranges are overlapping when their intersection is non-empty. For 
example,
+     *
+     * Ranges (0, 3] and (1, 4] are overlapping. The intersection is (1, 3]
+     * Ranges (0, 3] and (5, 7] are not overlapping, as there is no 
intersection
+     * Ranges (0, 3] and (3, 5] are not overlapping, as the intersection (3, 
3] is empty
+     *
+     * Note that the semantics is different from {@link 
Range#isConnected(Range)}
+     *
+     * @return true if this range overlaps with the other range; otherwise, 
false
+     */
+    public boolean overlaps(TokenRange other)
+    {
+        return 
this.range.lowerEndpoint().compareTo(other.range.upperEndpoint()) < 0
+               && 
other.range.lowerEndpoint().compareTo(this.range.upperEndpoint()) < 0;
+    }
+
+    public TokenRange intersection(TokenRange overlapping)
+    {
+        Range<Token> overlap = this.range.intersection(overlapping.range);
+        return new TokenRange(overlap.lowerEndpoint(), 
overlap.upperEndpoint());
+    }
+
+    /**
+     * Determine whether all tokens in this range are larger than the ones in 
the other token range
+     * @param other token range
+     * @return true if the start token of this range is larger than or equal to 
the other range's end token; otherwise, false
+     */
+    public boolean largerThan(TokenRange other)
+    {
+        return this.start().compareTo(other.end()) >= 0;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        TokenRange that = (TokenRange) o;
+        return Objects.equals(range, that.range);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return range.hashCode();
+    }
+}
diff --git 
a/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java
 
b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java
new file mode 100644
index 00000000..103ff934
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TokenRangeTest
+{
+    @Test
+    void testEquals()
+    {
+        TokenRange r1 = new TokenRange(1, 100);
+        TokenRange r2 = new TokenRange(1, 100);
+        TokenRange r3 = new TokenRange(-10, 10);
+        assertThat(r1).isEqualTo(r2);
+        assertThat(r3).isNotEqualTo(r1)
+                      .isNotEqualTo(r2);
+    }
+
+    @Test
+    void testFirstToken()
+    {
+        TokenRange range = new TokenRange(1, 100);
+        assertThat(range.firstToken()).isEqualTo(Token.from(2));
+        // test that the same first token reference is returned on every call
+        assertThat(range.firstToken()).isSameAs(range.firstToken());
+
+        TokenRange emptyRange = new TokenRange(1, 1);
+        assertThat(emptyRange.firstToken()).isNull();
+    }
+
+    @Test
+    void testCreateRangeWithInvalidParams()
+    {
+        assertThatThrownBy(() -> new TokenRange(1, -1))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid range: (Token(1)‥Token(-1)]");
+    }
+
+    @Test
+    void testCreateFromJavaDriverTokenRange()
+    {
+        com.datastax.driver.core.TokenRange ordinaryRange = mockRange(1L, 
100L);
+        when(ordinaryRange.isWrappedAround()).thenReturn(false);
+        when(ordinaryRange.unwrap()).thenCallRealMethod();
+        List<TokenRange> ranges = TokenRange.from(ordinaryRange);
+        assertThat(ranges).hasSize(1)
+                          .isEqualTo(Collections.singletonList(new 
TokenRange(1, 100)));
+    }
+
+    @Test
+    void testCreateFromWraparoundJavaDriverTokenRange()
+    {
+        com.datastax.driver.core.TokenRange range = mockRange(10L, -10L);
+        List<com.datastax.driver.core.TokenRange> unwrapped = 
Arrays.asList(mockRange(10L, Long.MAX_VALUE),
+                                                                            
mockRange(Long.MIN_VALUE, -10L));
+        when(range.unwrap()).thenReturn(unwrapped);
+        List<TokenRange> ranges = TokenRange.from(range);
+        assertThat(ranges).hasSize(2)
+                          .isEqualTo(Arrays.asList(new TokenRange(10, 
Long.MAX_VALUE),
+                                                   new 
TokenRange(Long.MIN_VALUE, -10L)));
+    }
+
+    @Test
+    void testCreateFromWraparoundJavaDriverTokenRangeEndingInMinToken()
+    {
+        com.datastax.driver.core.TokenRange range = mockRange(10L, 
Long.MIN_VALUE);
+        // Java driver's token range considers the range not a wraparound, 
if the end is the minimum token
+        when(range.unwrap()).thenReturn(Collections.singletonList(range));
+        List<TokenRange> ranges = TokenRange.from(range);
+        assertThat(ranges).hasSize(1)
+                          .isEqualTo(Collections.singletonList(new 
TokenRange(10L, Long.MAX_VALUE)));
+    }
+
+    @Test
+    void testRangeEnclose()
+    {
+        TokenRange r1 = new TokenRange(3, 5);
+        TokenRange r2 = new TokenRange(1, 10);
+        TokenRange r3 = new TokenRange(10, 11);
+        TokenRange r4 = new TokenRange(4, 11);
+        assertThat(r2.encloses(r1)).isTrue();
+        assertThat(r4.encloses(r3)).isTrue();
+        assertThat(r1.encloses(r2)).isFalse();
+        assertThat(r3.encloses(r1)).isFalse();
+        assertThat(r2.encloses(r3)).isFalse();
+        assertThat(r1.encloses(r4)).isFalse();
+        assertThat(r4.encloses(r1)).isFalse();
+    }
+
+    @Test
+    void testOverlaps()
+    {
+        TokenRange r1 = new TokenRange(3, 5);
+        TokenRange r2 = new TokenRange(1, 10);
+        TokenRange r3 = new TokenRange(10, 11);
+        TokenRange r4 = new TokenRange(4, 11);
+        assertThat(r1.overlaps(r2)).isTrue();
+        assertThat(r2.overlaps(r1)).isTrue();
+        assertThat(r3.overlaps(r4)).isTrue();
+        assertThat(r4.overlaps(r3)).isTrue();
+        assertThat(r2.overlaps(r4)).isTrue();
+        assertThat(r4.overlaps(r2)).isTrue();
+        assertThat(r2.overlaps(r3)).isFalse();
+        assertThat(r3.overlaps(r2)).isFalse();
+    }
+
+    @Test
+    void testLargerThan()
+    {
+        TokenRange r1 = new TokenRange(3, 5);
+        TokenRange r2 = new TokenRange(1, 10);
+        TokenRange r3 = new TokenRange(10, 11);
+        assertThat(r1.largerThan(r2)).isFalse();
+        assertThat(r2.largerThan(r1)).isFalse();
+        assertThat(r3.largerThan(r1)).isTrue();
+        assertThat(r1.largerThan(r3)).isFalse();
+        assertThat(r3.largerThan(r2)).isTrue();
+        assertThat(r2.largerThan(r3)).isFalse();
+    }
+
+    @Test
+    void testIntersection()
+    {
+        TokenRange r1 = new TokenRange(3, 5);
+        TokenRange r2 = new TokenRange(1, 10);
+        TokenRange r3 = new TokenRange(4, 6);
+        TokenRange r4 = new TokenRange(10, 11);
+        assertThat(r1.intersection(r2)).isEqualTo(r1);
+        assertThat(r2.intersection(r1)).isEqualTo(r1);
+        assertThat(r1.intersection(r3)).isEqualTo(new TokenRange(4, 5));
+        assertThat(r3.intersection(r1)).isEqualTo(new TokenRange(4, 5));
+        assertThat(r2.intersection(r4)).isEqualTo(new TokenRange(10, 10)); // 
empty range
+        assertThat(r2.intersection(r4)).isNotEqualTo(new TokenRange(5, 5)); // 
but not any empty range
+    }
+
+    private com.datastax.driver.core.TokenRange mockRange(long start, long end)
+    {
+        com.datastax.driver.core.TokenRange range = 
mock(com.datastax.driver.core.TokenRange.class);
+        com.datastax.driver.core.Token startToken = mockToken(start);
+        when(range.getStart()).thenReturn(startToken);
+        com.datastax.driver.core.Token endToken = mockToken(end);
+        when(range.getEnd()).thenReturn(endToken);
+        return range;
+    }
+
+    private com.datastax.driver.core.Token mockToken(long value)
+    {
+        com.datastax.driver.core.Token token = 
mock(com.datastax.driver.core.Token.class);
+        
when(token.getType()).thenReturn(com.datastax.driver.core.DataType.bigint());
+        when(token.getValue()).thenReturn(value);
+        return token;
+    }
+}
diff --git 
a/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java
 
b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java
new file mode 100644
index 00000000..a11010fa
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.cluster.locator;
+
+import java.math.BigInteger;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class TokenTest
+{
+    @Test
+    void testCreateToken()
+    {
+        Token t1 = Token.from(1);
+        Token t2 = Token.from(BigInteger.ONE);
+        Token t3 = Token.from("1");
+        Token t4 = Token.from(1L);
+        assertThat(t1).isEqualTo(t2).isEqualTo(t3).isEqualTo(t4);
+    }
+
+    @Test
+    void testIncrement()
+    {
+        Token t1 = Token.from(1);
+        Token t2 = t1.increment();
+        assertThat(t2).isEqualTo(Token.from(2));
+    }
+
+    @Test
+    void testCompare()
+    {
+        Token t1 = Token.from(1);
+        Token t2 = Token.from(2);
+        Token t3 = Token.from(1);
+        assertThat(t1).isLessThan(t2)
+                      .isEqualByComparingTo(t3);
+        assertThat(t2).isGreaterThan(t1);
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
index 57b1bcba..8f8bc8d8 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
@@ -37,6 +37,7 @@ import 
org.apache.cassandra.sidecar.common.server.data.RestoreSliceStatus;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.restore.RestoreJobUtil;
 import org.apache.cassandra.sidecar.restore.RestoreSliceHandler;
@@ -207,6 +208,14 @@ public class RestoreSlice
         failAtInstance(owner().id());
     }
 
+    /**
+     * Request to clean up out of range data. It is requested when detecting 
the slice contains out of range data
+     */
+    public void requestOutOfRangeDataCleanup()
+    {
+        tracker.requestOutOfRangeDataCleanup();
+    }
+
     public void setExistsOnS3()
     {
         this.existsOnS3 = true;
@@ -234,6 +243,7 @@ public class RestoreSlice
                                            double 
requiredUsableSpacePercentage,
                                            RestoreSliceDatabaseAccessor 
sliceDatabaseAccessor,
                                            RestoreJobUtil restoreJobUtil,
+                                           LocalTokenRangesProvider 
localTokenRangesProvider,
                                            SidecarMetrics metrics)
     {
         if (isCancelled)
@@ -248,6 +258,7 @@ public class RestoreSlice
                                         requiredUsableSpacePercentage,
                                         sliceDatabaseAccessor,
                                         restoreJobUtil,
+                                        localTokenRangesProvider,
                                         metrics);
         }
         catch (IllegalStateException illegalState)
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
index d0173bd3..1e23e2b0 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.sidecar.db;
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -29,6 +28,7 @@ import com.datastax.driver.core.Row;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.db.schema.RestoreSlicesSchema;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 
@@ -88,16 +88,15 @@ public class RestoreSliceDatabaseAccessor extends 
DatabaseAccessor
         return slice;
     }
 
-    public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, 
short bucketId,
-                                                              BigInteger 
startToken, BigInteger endToken)
+    public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID jobId, 
short bucketId, TokenRange range)
     {
         sidecarSchema.ensureInitialized();
 
         BoundStatement statement = restoreSlicesSchema.findAllByTokenRange()
                                                       .bind(jobId,
                                                             bucketId,
-                                                            startToken,
-                                                            endToken);
+                                                            
range.start().toBigInteger(),
+                                                            
range.end().toBigInteger());
         ResultSet result = execute(statement);
         List<RestoreSlice> slices = new ArrayList<>();
         for (Row row : result)
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
 
b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
index 654544ae..196ba7b3 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
@@ -47,6 +47,7 @@ import com.google.inject.Singleton;
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.jetbrains.annotations.NotNull;
 
@@ -82,7 +83,6 @@ public class CachedLocalTokenRanges implements 
LocalTokenRangesProvider
     }
 
     @Override
-    @Nullable
     public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace)
     {
         List<InstanceMetadata> localInstances = instancesConfig.instances();
@@ -153,7 +153,6 @@ public class CachedLocalTokenRanges implements 
LocalTokenRangesProvider
     /**
      * Reload the locally cached token ranges when needed
      */
-    @Nullable
     private synchronized Map<Integer, Set<TokenRange>> 
getCacheOrReload(Metadata metadata,
                                                                         String 
keyspace,
                                                                         
Set<Integer> localInstanceIds,
@@ -167,7 +166,7 @@ public class CachedLocalTokenRanges implements 
LocalTokenRangesProvider
             && localTokenRangesCache.containsKey(keyspace)
             && isClusterTheSame)
         {
-            return localTokenRangesCache.get(keyspace);
+            return localTokenRangesCache.getOrDefault(keyspace, 
Collections.emptyMap());
         }
 
         // otherwise, reload the token ranges
@@ -226,7 +225,7 @@ public class CachedLocalTokenRanges implements 
LocalTokenRangesProvider
         {
             LOGGER.warn("Unable to determine local instances from client 
meta-data!");
         }
-        return localTokenRangesCache.get(keyspace);
+        return localTokenRangesCache.getOrDefault(keyspace, 
Collections.emptyMap());
     }
 
     private static class IpAddressAndPort
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
 
b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
index cef04f6e..e4920131 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
@@ -20,7 +20,8 @@ package org.apache.cassandra.sidecar.locator;
 
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
+
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 
 /**
  * Provides the token ranges of the local Cassandra instance(s)
@@ -30,12 +31,12 @@ public interface LocalTokenRangesProvider
     /**
      * Calculate the token ranges owned and replicated to the local Cassandra 
instance(s).
      * When Sidecar is paired with multiple Cassandra instance, the ranges of 
each Cassandra instance is captured
-     * in the form of map, where the key is the instance id and the value is 
the ranges of the instance. When Sidecar
-     * is paired with a single Cassandra instance, the result map has a single 
entry.
+     * in the form of map, where the key is the instance id and the value is 
the ranges of the Cassandra instance.
+     * When Cassandra is not running with VNode, the set of ranges has a 
single value.
+     * When Sidecar is paired with a single Cassandra instance, the result map 
has a single entry.
      *
      * @param keyspace keyspace to determine replication
-     * @return token ranges of the local Cassandra instances
+     * @return token ranges of the local Cassandra instances or an empty map 
if nothing is found
      */
-    @Nullable
     Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace);
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java 
b/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java
deleted file mode 100644
index c0564db4..00000000
--- a/src/main/java/org/apache/cassandra/sidecar/locator/TokenRange.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.locator;
-
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import com.datastax.driver.core.DataType;
-
-/**
- * Range: (start, end] - start exclusive and end inclusive
- */
-public class TokenRange
-{
-    public final BigInteger start;
-    public final BigInteger end;
-
-    /**
-     * Unwrap the java driver's token range if necessary and convert the 
unwrapped ranges list.
-     * Only the token ranges from Murmur3Partitioner and RandomPartitioner are 
supported.
-     *
-     * @param dsTokenRange TokenRange implementation in Cassandra java driver
-     * @return list of token ranges. If the input token range wraps around, 
the size of the list is 2;
-     * otherwise, the list has only one range
-     */
-    public static List<TokenRange> from(com.datastax.driver.core.TokenRange 
dsTokenRange)
-    {
-        DataType tokenDataType = dsTokenRange.getStart().getType();
-        if (tokenDataType == DataType.varint()) // BigInteger - 
RandomPartitioner
-        {
-            return dsTokenRange.unwrap()
-                               .stream()
-                               .map(range -> new TokenRange((BigInteger) 
range.getStart().getValue(),
-                                                            (BigInteger) 
range.getEnd().getValue()))
-                               .collect(Collectors.toList());
-        }
-        else if (tokenDataType == DataType.bigint()) // Long - 
Murmur3Partitioner
-        {
-            return dsTokenRange.unwrap()
-                               .stream()
-                               .map(range -> new TokenRange((Long) 
range.getStart().getValue(),
-                                                            (Long) 
range.getEnd().getValue()))
-                               .collect(Collectors.toList());
-        }
-        else
-        {
-            throw new IllegalArgumentException(
-            "Unsupported token type: " + tokenDataType +
-            ". Only tokens of Murmur3Partitioner and RandomPartitioner are 
supported.");
-        }
-    }
-
-    public TokenRange(long start, long end)
-    {
-        this(BigInteger.valueOf(start), BigInteger.valueOf(end));
-    }
-
-    public TokenRange(BigInteger start, BigInteger end)
-    {
-        this.start = start;
-        this.end = end;
-    }
-
-    public BigInteger start()
-    {
-        return this.start;
-    }
-
-    public BigInteger end()
-    {
-        return this.end;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-        {
-            return true;
-        }
-
-        if (o == null || getClass() != o.getClass())
-        {
-            return false;
-        }
-
-        TokenRange that = (TokenRange) o;
-        return Objects.equals(start, that.start) && Objects.equals(end, 
that.end);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(start, end);
-    }
-}
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index c5d007a8..46218ad5 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -37,6 +37,7 @@ import com.google.inject.Singleton;
 import io.vertx.core.Promise;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.db.RestoreJob;
@@ -46,7 +47,6 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges;
 import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
-import org.apache.cassandra.sidecar.locator.TokenRange;
 import org.apache.cassandra.sidecar.metrics.RestoreMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.tasks.PeriodicTask;
@@ -264,7 +264,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
     {
         short bucketId = 0; // TODO: update the implementation to pick proper 
bucketId
         restoreSliceDatabaseAccessor
-        .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, 
range.start, range.end)
+        .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, range)
         .forEach(slice -> {
             // set the owner instance, which is not read from database
             slice = slice.unbuild().ownerInstance(instance).build();
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java
index cb68a8a1..b47f9f05 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManager.java
@@ -24,18 +24,17 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.vertx.core.CompositeFuture;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import io.vertx.core.Future;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
@@ -54,8 +53,10 @@ import org.jetbrains.annotations.VisibleForTesting;
 public class RestoreJobManager
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RestoreJobManager.class);
+    private static final Object PRESENT = new Object();
 
     private final Map<UUID, RestoreSliceTracker> jobs = new 
ConcurrentHashMap<>();
+    private final Cache<UUID, Object> deletedJobs;
     private final RestoreProcessor processor;
     private final ExecutorPools executorPools;
     private final InstanceMetadata instanceMetadata;
@@ -80,7 +81,8 @@ public class RestoreJobManager
         this.instanceMetadata = instanceMetadata;
         this.executorPools = executorPools;
         this.processor = restoreProcessor;
-        // delete obsolete on start up. Once instance is started, the 
jobDiscoverer will find the jobs to cleanup
+        this.deletedJobs = Caffeine.newBuilder().expireAfterAccess(1, 
TimeUnit.DAYS).build();
+        // delete obsolete on start up. Once instance is started, the 
jobDiscoverer will find the jobs to clean up
         if (deleteOnStart)
         {
             deleteObsoleteDataAsync();
@@ -99,7 +101,7 @@ public class RestoreJobManager
     throws RestoreJobFatalException
     {
         RestoreSliceTracker tracker = jobs.computeIfAbsent(slice.jobId(),
-                                                           id -> new 
RestoreSliceTracker(restoreJob, processor));
+                                                           id -> new 
RestoreSliceTracker(restoreJob, processor, instanceMetadata));
         return tracker.trySubmit(slice);
     }
 
@@ -107,17 +109,17 @@ public class RestoreJobManager
      * Update the restore job reference in tracker, in order for pending 
restore slices to read the latest
      * restore job, especially the credentials to download from cloud storage.
      *
-     * @param job restore job to update
+     * @param restoreJob restore job to update
      */
-    void updateRestoreJob(RestoreJob job)
+    void updateRestoreJob(RestoreJob restoreJob)
     {
-        RestoreSliceTracker tracker = jobs.computeIfAbsent(job.jobId,
-                                                           id -> new 
RestoreSliceTracker(job, processor));
-        tracker.updateRestoreJob(job);
+        RestoreSliceTracker tracker = jobs.computeIfAbsent(restoreJob.jobId,
+                                                           id -> new 
RestoreSliceTracker(restoreJob, processor, instanceMetadata));
+        tracker.updateRestoreJob(restoreJob);
     }
 
     /**
-     * Remove the tracker of the job when it is completed and delete its data 
on disk. The method internal.
+     * Remove the tracker of the job when it is completed and delete its data 
on disk. The method runs async and is for internal use only.
      * It should only be called by the background task, when it discovers the 
job is
      * in the final {@link 
org.apache.cassandra.sidecar.common.data.RestoreJobStatus}, i.e. SUCCEEDED or 
FAILED.
      *
@@ -125,46 +127,52 @@ public class RestoreJobManager
      */
     void removeJobInternal(UUID jobId)
     {
-        RestoreSliceTracker tracker = jobs.remove(jobId);
-        if (tracker != null)
+        if (deletedJobs.getIfPresent(jobId) == PRESENT)
         {
-            tracker.cleanupInternal();
+            LOGGER.debug("The job is already removed. Skipping. jobId={}", 
jobId);
+            return;
         }
-        // There might be no tracker, but the job has data on disk.
-        deleteDataOfJobAsync(jobId);
+
+        executorPools
+        .internal()
+        .runBlocking(() -> {
+            RestoreSliceTracker tracker = jobs.remove(jobId);
+            if (tracker != null)
+            {
+                tracker.cleanupInternal();
+            }
+        })
+        .recover(cause -> {
+            // There might be no tracker, but the job has data on disk.
+            LOGGER.warn("Failed to clean up restore job. Recover and proceed 
to delete the on-disk files. jobId={}", jobId, cause);
+            return Future.succeededFuture();
+        })
+        .compose(v -> deleteDataOfJobAsync(jobId))
+        .onSuccess(v -> deletedJobs.put(jobId, PRESENT));
     }
 
     /**
      * Find obsolete job data on disk and delete them
      * The obsoleteness is determined by {@link 
RestoreJobConfiguration#jobDiscoveryRecencyDays}
      */
-    Future<Void> deleteObsoleteDataAsync()
+    void deleteObsoleteDataAsync()
     {
-        return findObsoleteJobDataDirs()
-               .compose(pathStream -> {
-                   try (Stream<Path> stream = pathStream)
-                   {
-                       // use 'join' to complete the other deletes, when there 
is an error
-                       List<Future> deletes = stream.map(this::deleteDataAsync)
-                                                    
.collect(Collectors.toList());
-                       return CompositeFuture.join(deletes)
-                              .compose(compositeFuture -> {
-                                  // None of them should fail. Having the 
branch here for logic completeness
-                                  if (compositeFuture.failed())
-                                  {
-                                      LOGGER.warn("Unexpected error while 
deleting files.",
-                                                  compositeFuture.cause());
-                                  }
-                                  return Future.<Void>succeededFuture();
-                              });
-                   }
-               })
-               .recover(any -> Future.succeededFuture());
+        findObsoleteJobDataDirs()
+        .compose(pathStream -> executorPools
+                               .internal()
+                               .runBlocking(() -> {
+                                   try (Stream<Path> stream = pathStream)
+                                   {
+                                       stream.forEach(this::deleteRecursively);
+                                   }
+                               }))
+        .onFailure(cause -> LOGGER.warn("Unexpected error while deleting 
files.", cause));
     }
 
     /**
      * Find the restore job directories that are older than {@link 
RestoreJobConfiguration#jobDiscoveryRecencyDays}
-     * @return a future of stream of path that should be closed on success. 
When failed to list, no stream is created.
+     * Note that the returned Stream should be closed by the caller.
+     * @return a future of stream of path. When failed to list, return a 
failed future.
      */
     Future<Stream<Path>> findObsoleteJobDataDirs()
     {
@@ -172,23 +180,9 @@ public class RestoreJobManager
         if (!Files.exists(rootDir))
             return Future.succeededFuture(Stream.empty());
 
-        return executorPools.internal().executeBlocking(promise -> {
-            try
-            {
-                Stream<Path> obsoleteDirs = Files.walk(rootDir, 1)
-                                                 
.filter(this::isObsoleteRestoreJobDir);
-                promise.complete(obsoleteDirs);
-            }
-            catch (IOException ioe)
-            {
-                LOGGER.warn("Error on listing restore job data directories.", 
ioe);
-            }
-            finally
-            {
-                // Ensure the promise is complete. It is a no-op, if the 
promise is already completed.
-                promise.tryComplete(Stream.empty());
-            }
-        });
+        return executorPools.internal()
+                            .executeBlocking(() -> Files.walk(rootDir, 1)
+                                                        
.filter(this::isObsoleteRestoreJobDir));
     }
 
     // Deletes quietly w/o returning failed futures
@@ -204,7 +198,7 @@ public class RestoreJobManager
             {
                 rootDirs
                 .filter(path -> Files.isDirectory(path) && 
path.startsWith(prefixedJobId))
-                .forEach(this::deleteDataAsync);
+                .forEach(this::deleteRecursively);
             }
             catch (IOException ioe) // thrown from Files.walk.
             {
@@ -213,21 +207,19 @@ public class RestoreJobManager
         });
     }
 
-    // Deletes quietly w/o returning failed futures
-    private Future<Void> deleteDataAsync(Path root)
+    // Delete files from the root recursively and quietly w/o throwing any 
exception
+    private void deleteRecursively(Path root)
     {
-        return executorPools.internal().runBlocking(() -> {
-            try (Stream<Path> pathStream = Files.walk(root))
-            {
-                pathStream
-                .sorted(Comparator.reverseOrder())
-                .forEach(path -> ThrowableUtils.propagate(() -> 
Files.delete(path)));
-            }
-            catch (Exception exception)
-            {
-                LOGGER.warn("Error on deleting data. Path={}", root, 
exception);
-            }
-        });
+        try (Stream<Path> pathStream = Files.walk(root))
+        {
+            pathStream
+            .sorted(Comparator.reverseOrder())
+            .forEach(path -> ThrowableUtils.propagate(() -> 
Files.delete(path)));
+        }
+        catch (Exception exception)
+        {
+            LOGGER.warn("Error on deleting data. Path={}", root, exception);
+        }
     }
 
     // returns true only when all conditions are met
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index f765b9c6..36a5d770 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -43,6 +43,8 @@ import 
org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
+import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.tasks.PeriodicTask;
 import org.apache.cassandra.sidecar.utils.SSTableImporter;
@@ -66,6 +68,7 @@ public class RestoreProcessor implements PeriodicTask
     private final RestoreJobUtil restoreJobUtil;
     private final Set<RestoreSliceHandler> activeTasks = 
ConcurrentHashMap.newKeySet();
     private final long longRunningHandlerThresholdInSeconds;
+    private final LocalTokenRangesProvider localTokenRangesProvider;
     private final SidecarMetrics metrics;
 
     private volatile boolean isClosed = false; // OK to run close twice, so 
relax the control to volatile
@@ -78,6 +81,7 @@ public class RestoreProcessor implements PeriodicTask
                             SSTableImporter importer,
                             RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
                             RestoreJobUtil restoreJobUtil,
+                            CachedLocalTokenRanges localTokenRangesProvider,
                             SidecarMetrics metrics)
     {
         this.pool = executorPools.internal();
@@ -92,6 +96,7 @@ public class RestoreProcessor implements PeriodicTask
         this.importer = importer;
         this.sliceDatabaseAccessor = sliceDatabaseAccessor;
         this.restoreJobUtil = restoreJobUtil;
+        this.localTokenRangesProvider = localTokenRangesProvider;
         this.metrics = metrics;
     }
 
@@ -144,6 +149,7 @@ public class RestoreProcessor implements PeriodicTask
                                                          
requiredUsableSpacePercentage,
                                                          sliceDatabaseAccessor,
                                                          restoreJobUtil,
+                                                         
localTokenRangesProvider,
                                                          metrics);
             activeTasks.add(task);
             pool.executeBlocking(task, false) // unordered; run in parallel
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index cddd950d..697ad070 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -19,8 +19,13 @@
 package org.apache.cassandra.sidecar.restore;
 
 import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -32,6 +37,7 @@ import io.vertx.core.Promise;
 import io.vertx.ext.web.handler.HttpException;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.utils.Preconditions;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.db.RestoreJob;
@@ -41,6 +47,7 @@ import 
org.apache.cassandra.sidecar.exceptions.RestoreJobException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.metrics.RestoreMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.StopWatch;
@@ -72,6 +79,7 @@ public class RestoreSliceTask implements RestoreSliceHandler
     private final double requiredUsableSpacePercentage;
     private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
     private final RestoreJobUtil restoreJobUtil;
+    private final LocalTokenRangesProvider localTokenRangesProvider;
     private final RestoreMetrics metrics;
     private final InstanceMetrics instanceMetrics;
     private long taskStartTimeNanos = -1;
@@ -83,6 +91,7 @@ public class RestoreSliceTask implements RestoreSliceHandler
                             double requiredUsableSpacePercentage,
                             RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
                             RestoreJobUtil restoreJobUtil,
+                            LocalTokenRangesProvider localTokenRangesProvider,
                             SidecarMetrics metrics)
     {
         Preconditions.checkArgument(!slice.job().isManagedBySidecar()
@@ -95,6 +104,7 @@ public class RestoreSliceTask implements RestoreSliceHandler
         this.requiredUsableSpacePercentage = requiredUsableSpacePercentage;
         this.sliceDatabaseAccessor = sliceDatabaseAccessor;
         this.restoreJobUtil = restoreJobUtil;
+        this.localTokenRangesProvider = localTokenRangesProvider;
         this.metrics = metrics.server().restore();
         this.instanceMetrics = metrics.instance(slice.owner().id());
     }
@@ -104,6 +114,19 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
         return new Failed(cause, slice);
     }
 
+    @Override
+    public long elapsedInNanos()
+    {
+        return taskStartTimeNanos == -1 ? -1 :
+               currentTimeInNanos() - taskStartTimeNanos;
+    }
+
+    @Override
+    public RestoreSlice slice()
+    {
+        return slice;
+    }
+
     @Override
     public void handle(Promise<RestoreSlice> event)
     {
@@ -137,10 +160,10 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
                     // 1. check object existence and validate eTag / checksum
                     return checkObjectExistence(event)
                            .compose(headObject -> downloadSlice(event))
-                           .<Void>compose(file -> {
+                           .compose(file -> {
                                slice.completeStagePhase();
                                sliceDatabaseAccessor.updateStatus(slice);
-                               return Future.succeededFuture();
+                               return Future.<Void>succeededFuture();
                            })
                            // completed staging. A new task is produced when 
it comes to import
                            .onSuccess(_v -> event.tryComplete(slice))
@@ -326,10 +349,11 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
 
     private Future<File> unzip(File zipFile)
     {
-        Future<File> future = executorPool.executeBlocking(promise -> {
-            if (failOnCancelled(promise))
-                return;
+        if (slice.isCancelled())
+            return 
Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is 
cancelled",
+                                                                         
slice, null));
 
+        Future<File> future = executorPool.executeBlocking(() -> {
             // targetPathInStaging points to the directory named after uploadId
             // SSTableImporter expects the file system structure to be 
uploadId/keyspace/table/sstables
             File targetDir = slice.stageDirectory()
@@ -345,14 +369,13 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
                 {
                     LOGGER.debug("The files in slice are already extracted. 
Maybe it is a retried task? " +
                                  "jobId={} sliceKey={}", slice.jobId(), 
slice.key());
-                    promise.complete(targetDir);
+                    // return early
+                    return targetDir;
                 }
                 else
                 {
-                    promise.tryFail(new RestoreJobException("Object not found 
from disk. File: " + zipFile));
+                    throw new RestoreJobException("Object not found from disk. 
File: " + zipFile);
                 }
-                // return early
-                return;
             }
 
             try
@@ -362,51 +385,49 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
                 // The validation step later expects only the files registered 
in the manifest.
                 RestoreJobUtil.cleanDirectory(targetDir.toPath());
                 RestoreJobUtil.unzip(zipFile, targetDir);
-                // Notify the next step that unzip is complete
-                promise.complete(targetDir);
                 // Then, delete the downloaded zip file
                 if (!zipFile.delete())
                 {
                     LOGGER.warn("File deletion attempt failed. jobId={} 
sliceKey={} file={}",
                                 slice.jobId(), slice.key(), 
zipFile.getAbsolutePath());
                 }
+                // Notify the next step that unzip is complete
+                return targetDir;
             }
             catch (Exception cause)
             {
-                promise.tryFail(RestoreJobExceptions.propagate("Failed to 
unzip. File: " + zipFile, cause));
+                throw RestoreJobExceptions.propagate("Failed to unzip. File: " 
+ zipFile, cause);
             }
         }, false); // unordered
 
         return StopWatch.measureTimeTaken(future, d -> 
instanceMetrics.restore().sliceUnzipTime.metric.update(d, 
TimeUnit.NANOSECONDS));
     }
 
-    // Validate integrity of the files from the zip. The failures from any 
step is fatal and not retryable.
+    // Validate integrity of the files from the zip. The SSTables that are 
fully out of the owning range of the node are removed
     private Future<File> validateFiles(File directory)
     {
-        Future<File> future = executorPool.executeBlocking(promise -> {
-            if (failOnCancelled(promise))
-                return;
+        if (slice.isCancelled())
+            return 
Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is 
cancelled",
+                                                                         
slice, null));
 
-            Map<String, String> checksums;
-            try
-            {
-                File manifestFile = new File(directory, 
RestoreSliceManifest.MANIFEST_FILE_NAME);
-                RestoreSliceManifest manifest = 
RestoreSliceManifest.read(manifestFile);
-                checksums = manifest.mergeAllChecksums();
-            }
-            catch (RestoreJobFatalException e)
+        Future<File> future = executorPool.executeBlocking(() -> {
+            File manifestFile = new File(directory, 
RestoreSliceManifest.MANIFEST_FILE_NAME);
+            RestoreSliceManifest manifest = 
RestoreSliceManifest.read(manifestFile);
+
+            if (manifest.isEmpty())
             {
-                promise.tryFail(e);
-                return;
+                throw new RestoreJobFatalException("The downloaded slice has 
no data. " +
+                                                   "Directory: " + directory);
             }
 
-            if (checksums.isEmpty())
+            // validate the SSTable ranges with the owning range of the node 
and remove the out-of-range sstables
+            if (slice.job().isManagedBySidecar())
             {
-                promise.tryFail(new RestoreJobFatalException("The downloaded 
slice has no data. " +
-                                                             "Directory: " + 
directory));
-                return;
+                removeOutOfRangeSSTables(directory, manifest);
             }
 
+            Map<String, String> checksums = manifest.mergeAllChecksums();
+
             // exclude the manifest file
             File[] files = directory.listFiles((dir, name) -> 
!name.equals(RestoreSliceManifest.MANIFEST_FILE_NAME));
             if (files == null || files.length != checksums.size())
@@ -414,11 +435,10 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
                 String msg = "Number of files does not match. Expected: " + 
checksums.size() +
                              "; Actual: " + (files == null ? 0 : files.length) 
+
                              "; Directory: " + directory;
-                promise.tryFail(new RestoreJobFatalException(msg));
-                return;
+                throw new RestoreJobFatalException(msg);
             }
 
-            compareChecksums(checksums, files, promise);
+            compareChecksums(checksums, files);
 
             // capture the data component size of sstables
             for (File file : files)
@@ -430,13 +450,71 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
             }
 
             // all files match with the provided checksums
-            promise.tryComplete(directory);
+            return directory;
         }, false); // unordered
 
         return StopWatch.measureTimeTaken(future, d -> 
instanceMetrics.restore().sliceValidationTime.metric.update(d, 
TimeUnit.NANOSECONDS));
     }
 
-    private void compareChecksums(Map<String, String> expectedChecksums, 
File[] files, Promise<?> promise)
+    // Remove all the SSTables that do not belong to this node
+    // The method modifies the input manifest and deletes files under 
directory, if out of range sstables are found
+    private void removeOutOfRangeSSTables(File directory, RestoreSliceManifest 
manifest) throws RestoreJobException, IOException
+    {
+        Set<TokenRange> ranges = 
localTokenRangesProvider.localTokenRanges(slice.keyspace()).get(slice.owner().id());
+        if (ranges == null || ranges.isEmpty())
+        {
+            // Note: retry is allowed for the failure
+            throw new RestoreJobException("Unable to fetch local range, retry 
later");
+        }
+
+        // 1. remove the sstables that are fully out of range
+        // 2. detect if there is any range that partially overlaps. In that 
case, signal that this node is required to run nodetool cleanup on job 
completion
+        Iterator<Map.Entry<String, RestoreSliceManifest.ManifestEntry>> it = 
manifest.entrySet().iterator();
+        while (it.hasNext())
+        {
+            RestoreSliceManifest.ManifestEntry entry = it.next().getValue();
+            // TokenRange is open-closed, hence subtracting one from the 
rangeStart read from manifest
+            TokenRange sstableRange = new 
TokenRange(entry.startToken().subtract(BigInteger.ONE),
+                                                     entry.endToken());
+
+            boolean hasOverlap = false;
+            boolean fullyEnclosed = false;
+            for (TokenRange owningRange : ranges)
+            {
+                if (hasOverlap)
+                {
+                    break;
+                }
+
+                hasOverlap = owningRange.overlaps(sstableRange);
+
+                if (hasOverlap)
+                {
+                    fullyEnclosed = owningRange.encloses(sstableRange);
+                }
+            }
+
+            // fully out of range
+            if (!hasOverlap)
+            {
+                // remove the entry from manifest
+                it.remove();
+                // delete the files
+                for (String fileName : entry.componentsChecksum().keySet())
+                {
+                    Path path = directory.toPath().resolve(fileName);
+                    Files.deleteIfExists(path);
+                }
+            }
+            // overlaps, but is not fully enclosed; we need to run cleanup on 
this node
+            else if (!fullyEnclosed)
+            {
+                slice.requestOutOfRangeDataCleanup();
+            }
+        }
+    }
+
+    private void compareChecksums(Map<String, String> expectedChecksums, 
File[] files) throws RestoreJobFatalException
     {
         for (File file : files)
         {
@@ -444,8 +522,7 @@ public class RestoreSliceTask implements RestoreSliceHandler
             String expectedChecksum = expectedChecksums.get(name);
             if (expectedChecksum == null)
             {
-                promise.tryFail(new RestoreJobFatalException("File not found 
in manifest. File: " + name));
-                return;
+                throw new RestoreJobFatalException("File not found in 
manifest. File: " + name);
             }
 
             try
@@ -455,14 +532,12 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
                 {
                     String msg = "Checksum does not match. Expected: " + 
expectedChecksum +
                                  "; actual: " + actualChecksum + "; file: " + 
file;
-                    promise.tryFail(new RestoreJobFatalException(msg));
-                    return;
+                    throw new RestoreJobFatalException(msg);
                 }
             }
-            catch (Exception cause)
+            catch (IOException cause)
             {
-                promise.tryFail(new RestoreJobFatalException("Failed to 
calculate checksum. File: " + file));
-                return;
+                throw new RestoreJobFatalException("Failed to calculate 
checksum. File: " + file, cause);
             }
         }
     }
@@ -520,17 +595,18 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
                     httpException.getPayload(), httpException);
     }
 
-    @Override
-    public long elapsedInNanos()
+    // For testing only. Unsafe to call in production code.
+    @VisibleForTesting
+    void removeOutOfRangeSSTablesUnsafe(File directory, RestoreSliceManifest 
manifest) throws RestoreJobException, IOException
     {
-        return taskStartTimeNanos == -1 ? -1 :
-               currentTimeInNanos() - taskStartTimeNanos;
+        removeOutOfRangeSSTables(directory, manifest);
     }
 
-    @Override
-    public RestoreSlice slice()
+    // For testing only. Unsafe to call in production code.
+    @VisibleForTesting
+    void compareChecksumsUnsafe(Map<String, String> expectedChecksums, File[] 
files) throws RestoreJobFatalException
     {
-        return slice;
+        compareChecksums(expectedChecksums, files);
     }
 
     /**
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java
index 5905980e..c3e042b9 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTracker.java
@@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
@@ -39,14 +42,17 @@ public class RestoreSliceTracker
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RestoreSliceTracker.class);
 
     private volatile RestoreJob restoreJob;
+    private volatile boolean cleanupOutOfRangeRequested = false;
+    private final InstanceMetadata instanceMetadata;
     private final Map<RestoreSlice, Status> slices = new ConcurrentHashMap<>();
     private final RestoreProcessor processor;
     private final AtomicReference<RestoreJobFatalException> failureRef = new 
AtomicReference<>();
 
-    public RestoreSliceTracker(RestoreJob restoreJob, RestoreProcessor 
restoreProcessor)
+    public RestoreSliceTracker(RestoreJob restoreJob, RestoreProcessor 
restoreProcessor, InstanceMetadata instanceMetadata)
     {
         this.restoreJob = restoreJob;
         this.processor = restoreProcessor;
+        this.instanceMetadata = instanceMetadata;
     }
 
     /**
@@ -102,6 +108,11 @@ public class RestoreSliceTracker
         cleanupInternal();
     }
 
+    public void requestOutOfRangeDataCleanup()
+    {
+        cleanupOutOfRangeRequested = true;
+    }
+
     /**
      * Internal method to clean up the {@link RestoreSlice}.
      * It validates the slices and log warnings if they are not in a final 
state,
@@ -109,8 +120,9 @@ public class RestoreSliceTracker
      */
     void cleanupInternal()
     {
+        boolean succeeded = failureRef.get() == null;
         slices.forEach((slice, status) -> {
-            if (failureRef.get() == null && status != Status.COMPLETED)
+            if (succeeded && status != Status.COMPLETED)
             {
                 LOGGER.warn("Clean up pending restore slice when the job has 
not failed. jobId={}, sliceId={}",
                             restoreJob.jobId, slice.sliceId());
@@ -118,6 +130,34 @@ public class RestoreSliceTracker
             slice.cancel();
         });
         slices.clear();
+
+        runOnCompletion();
+    }
+
+    /**
+     * Run operations on restore job completion, including success and failure 
cases
+     */
+    private void runOnCompletion()
+    {
+        if (cleanupOutOfRangeRequested)
+        {
+            CassandraAdapterDelegate delegate = instanceMetadata.delegate();
+            StorageOperations operations = delegate == null ? null : 
delegate.storageOperations();
+            if (operations == null)
+            {
+                LOGGER.warn("Out of range data cleanup for the restore job is 
requested. It failed to start the operation. jobId={}", restoreJob.jobId);
+                return;
+            }
+
+            try
+            {
+                operations.outOfRangeDataCleanup(restoreJob.keyspaceName, 
restoreJob.tableName);
+            }
+            catch (Throwable cause)
+            {
+                LOGGER.warn("Clean up out of range data has failed", cause);
+            }
+        }
     }
 
     /**
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index 2795b7b5..fd31c7cb 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -148,13 +148,6 @@ public class StorageClient
             return failedFuture;
         }
 
-        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
-        GetObjectRequest request =
-        GetObjectRequest.builder()
-                        .overrideConfiguration(b -> 
b.credentialsProvider(credentials.awsCredentialsProvider()))
-                        .bucket(slice.bucket())
-                        .key(slice.key())
-                        .build();
         Path objectPath = slice.stagedObjectPath();
         File object = objectPath.toFile();
         if (object.exists())
@@ -169,12 +162,22 @@ public class StorageClient
             // For now, we just skip download, assuming the scenario is rare 
and no maliciousness
             return CompletableFuture.completedFuture(object);
         }
+
         if (!object.getParentFile().mkdirs())
         {
-            LOGGER.warn("Error occurred while creating directory. jobId={} 
s3_object={}",
+            LOGGER.warn("Error occurred while creating directory. jobId={} 
s3Object={}",
                         slice.jobId(), slice.stagedObjectPath());
+
         }
-        LOGGER.info("Downloading object. jobId={} s3_object={}", 
slice.jobId(), slice.stagedObjectPath());
+
+        LOGGER.info("Downloading object. jobId={} s3Object={}", slice.jobId(), 
slice.stagedObjectPath());
+        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+        GetObjectRequest request =
+        GetObjectRequest.builder()
+                        .overrideConfiguration(b -> 
b.credentialsProvider(credentials.awsCredentialsProvider()))
+                        .bucket(slice.bucket())
+                        .key(slice.key())
+                        .build();
         return rateLimitedGetObject(slice, client, request, objectPath)
                .whenComplete(logCredentialOnRequestFailure(slice, credentials))
                .thenApply(res -> object);
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java
index 59f04e7d..5325b2ad 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientIntegrationTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.common;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
@@ -29,6 +30,8 @@ import org.apache.cassandra.testing.CassandraIntegrationTest;
 import org.apache.cassandra.testing.CassandraTestContext;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Test to ensure connectivity with the JMX client
@@ -37,46 +40,77 @@ public class JmxClientIntegrationTest
 {
     private static final String SS_OBJ_NAME = 
"org.apache.cassandra.db:type=StorageService";
 
+    /**
+     * Test jmx connectivity with various operations
+     */
     @CassandraIntegrationTest
     void testJmxConnectivity(CassandraTestContext context) throws IOException
     {
         try (JmxClient jmxClient = createJmxClient(context))
         {
-            String opMode = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
-                                     .getOperationMode();
-            assertThat(opMode).isNotNull();
-            assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", 
"DECOMMISSIONED", "CLIENT");
-
-            IUpgradeableInstance instance = 
context.cluster().getFirstRunningInstance();
-            IInstanceConfig config = instance.config();
-            
assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress());
-            assertThat(jmxClient.port()).isEqualTo(config.jmxPort());
+            testGetOperationMode(jmxClient, context.cluster());
+
+            testGossipInfo(jmxClient);
+
+            testCorrectVersion(jmxClient, 
String.valueOf(context.version.major));
+
+            testTableCleanup(jmxClient, context.cluster());
         }
     }
 
-    @CassandraIntegrationTest
-    void testGossipInfo(CassandraTestContext context) throws IOException
+    private void testGetOperationMode(JmxClient jmxClient, UpgradeableCluster 
cluster)
     {
-        try (JmxClient jmxClient = createJmxClient(context))
-        {
-            FailureDetector proxy = jmxClient.proxy(FailureDetector.class,
-                                                    
"org.apache.cassandra.net:type=FailureDetector");
-            String rawGossipInfo = proxy.getAllEndpointStates();
-            assertThat(rawGossipInfo).isNotEmpty();
-            Map<String, ?> gossipInfoMap = 
GossipInfoParser.parse(rawGossipInfo);
-            assertThat(gossipInfoMap).isNotEmpty();
-            gossipInfoMap.forEach((key, value) -> 
GossipInfoParser.isGossipInfoHostHeader(key));
-        }
+        String opMode = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                                 .getOperationMode();
+        assertThat(opMode).isNotNull();
+        assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", 
"DECOMMISSIONED", "CLIENT");
+
+        IUpgradeableInstance instance = cluster.getFirstRunningInstance();
+        IInstanceConfig config = instance.config();
+        
assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress());
+        assertThat(jmxClient.port()).isEqualTo(config.jmxPort());
     }
 
-    @CassandraIntegrationTest
-    void testCorrectVersion(CassandraTestContext context) throws IOException
+    private void testGossipInfo(JmxClient jmxClient)
     {
-        try (JmxClient jmxClient = createJmxClient(context))
+        FailureDetector proxy = jmxClient.proxy(FailureDetector.class,
+                                                
"org.apache.cassandra.net:type=FailureDetector");
+        String rawGossipInfo = proxy.getAllEndpointStates();
+        assertThat(rawGossipInfo).isNotEmpty();
+        Map<String, ?> gossipInfoMap = GossipInfoParser.parse(rawGossipInfo);
+        assertThat(gossipInfoMap).isNotEmpty();
+        gossipInfoMap.forEach((key, value) -> 
GossipInfoParser.isGossipInfoHostHeader(key));
+    }
+
+    private void testCorrectVersion(JmxClient jmxClient, String majorVersion)
+    {
+        String releaseVersion = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                                         .getReleaseVersion();
+        assertThat(releaseVersion).startsWith(majorVersion);
+    }
+
+    // a test to ensure the jmx client can invoke the MBean method
+    private void testTableCleanup(JmxClient jmxClient, UpgradeableCluster 
cluster)
+    {
+        cluster.schemaChange("CREATE KEYSPACE jmx_client_test WITH REPLICATION 
= {'class' : 'SimpleStrategy', 'replication_factor' : 1}");
+        cluster.schemaChange("CREATE TABLE jmx_client_test.table_cleanup ( a 
int PRIMARY KEY, b int)");
+        cluster.get(1).executeInternal("INSERT INTO 
jmx_client_test.table_cleanup (a, b) VALUES (1, 1)");
+        cluster.get(1).flush("jmx_client_test");
+        int status = -1;
+        try
+        {
+            status = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                              .forceKeyspaceCleanup(1, "jmx_client_test", 
"table_cleanup");
+        }
+        catch (Exception e)
         {
-            jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
-                     .refreshSizeEstimates();
+            fail("Unexpected exception ", e);
         }
+        assertThat(status).isZero();
+
+        assertThatThrownBy(() -> jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                                          .forceKeyspaceCleanup(1, 
"jmx_client_test", "table_not_exist"))
+        .hasMessageContaining("Unknown keyspace/cf pair");
     }
 
     /**
@@ -89,6 +123,8 @@ public class JmxClientIntegrationTest
         void refreshSizeEstimates();
 
         String getReleaseVersion();
+
+        int forceKeyspaceCleanup(int jobs, String keyspaceName, String... 
tables);
     }
 
     /**
@@ -99,7 +135,6 @@ public class JmxClientIntegrationTest
         String getAllEndpointStates();
     }
 
-
     private static JmxClient createJmxClient(CassandraTestContext context)
     {
         IUpgradeableInstance instance = 
context.cluster().getFirstRunningInstance();
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
index d08131de..604532b5 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
@@ -40,8 +40,8 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.TokenSupplier;
-import org.apache.cassandra.sidecar.adapters.base.Partitioner;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.AbstractCassandraTestContext;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java
index aff80a53..deb84afc 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingBaseTest.java
@@ -42,8 +42,8 @@ import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
-import org.apache.cassandra.sidecar.adapters.base.Partitioner;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 
 import static org.assertj.core.api.Assertions.assertThat;
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java 
b/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java
index d3f065b4..6085525e 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/concurrent/ExecutorPoolsTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.sidecar.concurrent;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -35,6 +33,7 @@ import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
 import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -44,7 +43,7 @@ import static org.mockito.Mockito.when;
 /**
  * Test {@link ExecutorPools}
  */
-public class ExecutorPoolsTest
+class ExecutorPoolsTest
 {
     private ExecutorPools pools;
     private SidecarMetrics metrics;
@@ -69,7 +68,7 @@ public class ExecutorPoolsTest
     }
 
     @Test
-    public void testClosingExecutorPoolShouldThrow()
+    void testClosingExecutorPoolShouldThrow()
     {
         assertThatThrownBy(() -> pools.service().close())
         .hasMessage("Closing TaskExecutorPool is not supported!")
@@ -81,9 +80,37 @@ public class ExecutorPoolsTest
     }
 
     @Test
-    public void testOrdered()
+    void testExecutionOrder()
     {
-        // not thread-safe
+        testExecutionOrder(true, true);
+        testExecutionOrder(false, true);
+        testExecutionOrder(true, false);
+        testExecutionOrder(false, false);
+    }
+
+    @Test
+    void testMetricCapture()
+    {
+        TaskExecutorPool pool = pools.internal();
+        int total = 100;
+        CountDownLatch stop = new CountDownLatch(total);
+        for (int i = 0; i < total; i++)
+        {
+            pool.runBlocking(() -> stop.countDown());
+        }
+
+        assertThat(Uninterruptibles.awaitUninterruptibly(stop, 10, 
TimeUnit.SECONDS))
+        .describedAs("Test should finish in 10 seconds")
+        .isTrue();
+
+        // there could be some delay to read the metric that reflects the last 
task. If so, retry the assertion for at most 2 seconds
+        loopAssert(2,
+                   () -> 
assertThat(metrics.server().resource().internalTaskTime.metric.getCount()).isEqualTo(total));
+    }
+
+    private void testExecutionOrder(boolean orderedSubmission, boolean 
orderedExecution)
+    {
+        // deliberately not thread-safe
         class IntWrapper
         {
             int i = 0;
@@ -97,25 +124,33 @@ public class ExecutorPoolsTest
         TaskExecutorPool pool = pools.internal();
         IntWrapper v = new IntWrapper();
         int total = 100;
+        CountDownLatch ready = new CountDownLatch(1);
         CountDownLatch stop = new CountDownLatch(total);
-        Set<String> threadNames = new HashSet<>();
         for (int i = 0; i < total; i++)
         {
-            // Start 100 parallel executions that each submits the ordered 
execution
-            pool.executeBlocking(promise -> {
-                pool.executeBlocking(p -> {
+            // Start 100 executions that each submit a nested (ordered or unordered) execution
+            pool.runBlocking(() -> {
+                Uninterruptibles.awaitUninterruptibly(ready);
+                pool.runBlocking(() -> {
                     v.increment();
-                    threadNames.add(Thread.currentThread().getName());
                     stop.countDown();
-                    
assertThat(metrics.server().resource().internalTaskTime.metric.getCount()).isEqualTo(200);
-                }, true);
-            }, false);
+                }, orderedExecution);
+            }, orderedSubmission);
         }
+        ready.countDown();
 
         assertThat(Uninterruptibles.awaitUninterruptibly(stop, 10, 
TimeUnit.SECONDS))
         .describedAs("Test should finish in 10 seconds")
         .isTrue();
+
         // Although IntWrapper is not thread safe, the serial execution 
(ordered) prevents any race condition.
-        assertThat(v.i).isEqualTo(total);
+        if (orderedExecution)
+        {
+            assertThat(v.i).isEqualTo(total);
+        }
+        else // if execution is unordered, the output is likely less than 
total due to race
+        {
+            assertThat(v.i).isLessThanOrEqualTo(total);
+        }
     }
 }
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java 
b/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java
deleted file mode 100644
index 4671f187..00000000
--- a/src/test/java/org/apache/cassandra/sidecar/locator/TokenRangeTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.locator;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.Token;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-class TokenRangeTest
-{
-    @Test
-    void testEquals()
-    {
-        TokenRange r1 = new TokenRange(1, 100);
-        TokenRange r2 = new TokenRange(1, 100);
-        TokenRange r3 = new TokenRange(-10, 10);
-        assertThat(r1).isEqualTo(r2);
-        assertThat(r1).isEqualTo(r1);
-        assertThat(r1).isNotEqualTo(r3);
-        assertThat(r3).isNotEqualTo(r1);
-    }
-
-    @Test
-    void testCreateFromJavaDriverTokenRange()
-    {
-        com.datastax.driver.core.TokenRange ordinaryRange = mockRange(1L, 
100L);
-        when(ordinaryRange.isWrappedAround()).thenReturn(false);
-        when(ordinaryRange.unwrap()).thenCallRealMethod();
-        List<TokenRange> ranges = TokenRange.from(ordinaryRange);
-        assertThat(ranges).hasSize(1)
-                          .isEqualTo(Collections.singletonList(new 
TokenRange(1, 100)));
-    }
-
-    @Test
-    void testCreateFromWraparoundJavaDriverTokenRange()
-    {
-        com.datastax.driver.core.TokenRange range = mockRange(10L, -10L);
-        List<com.datastax.driver.core.TokenRange> unwrapped = 
Arrays.asList(mockRange(10L, Long.MIN_VALUE),
-                                                                            
mockRange(Long.MIN_VALUE, -10L));
-        when(range.unwrap()).thenReturn(unwrapped);
-        List<TokenRange> ranges = TokenRange.from(range);
-        assertThat(ranges).hasSize(2)
-                          .isEqualTo(Arrays.asList(new TokenRange(10, 
Long.MIN_VALUE),
-                                                   new 
TokenRange(Long.MIN_VALUE, -10L)));
-    }
-
-    private com.datastax.driver.core.TokenRange mockRange(long start, long end)
-    {
-        com.datastax.driver.core.TokenRange range = 
mock(com.datastax.driver.core.TokenRange.class);
-        Token startToken = mockToken(start);
-        when(range.getStart()).thenReturn(startToken);
-        Token endToken = mockToken(end);
-        when(range.getEnd()).thenReturn(endToken);
-        return range;
-    }
-
-    private Token mockToken(long value)
-    {
-        Token token = mock(Token.class);
-        when(token.getType()).thenReturn(DataType.bigint());
-        when(token.getValue()).thenReturn(value);
-        return token;
-    }
-}
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
index 51ad81e8..89729a31 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
@@ -148,7 +148,8 @@ class RestoreJobManagerTest
 
         manager.removeJobInternal(slice.jobId()); // it cancels the 
non-completed slices
 
-        assertThat(slice.isCancelled()).isTrue();
+        // removeJobInternal runs async. Wait for at most 2 seconds for the 
slice to be cancelled
+        loopAssert(2, () -> assertThat(slice.isCancelled()).isTrue());
     }
 
     @Test
@@ -252,6 +253,7 @@ class RestoreJobManagerTest
         RestoreSlice slice = RestoreSlice
                              .builder()
                              .jobId(job.jobId)
+                             .sliceId("testSliceId")
                              .bucketId((short) 0)
                              .stageDirectory(testDir, "uploadId")
                              .storageKey("storageKey")
@@ -260,7 +262,7 @@ class RestoreJobManagerTest
                              .replicaStatus(Collections.emptyMap())
                              .replicas(Collections.emptySet())
                              .build();
-        RestoreSliceTracker tracker = new RestoreSliceTracker(job, 
mock(RestoreProcessor.class));
+        RestoreSliceTracker tracker = new RestoreSliceTracker(job, 
mock(RestoreProcessor.class), owner);
         slice.registerTracker(tracker);
         return slice;
     }
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index d5812dd0..e878fa22 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -215,7 +215,7 @@ class RestoreProcessorTest
                                    .jobStatus(RestoreJobStatus.CREATED)
                                    .build();
         when(slice.job()).thenReturn(job);
-        when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), 
any())).thenReturn(
+        when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), 
any(), any())).thenReturn(
         new RestoreSliceHandler()
         {
             private Long startTime = timeInNanosSupplier.get();
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 0c7dfc92..0d2a0c41 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -20,14 +20,22 @@ package org.apache.cassandra.sidecar.restore;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
+import java.util.stream.IntStream;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -40,6 +48,7 @@ import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
@@ -49,14 +58,18 @@ import org.apache.cassandra.sidecar.db.RestoreSlice;
 import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
+import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceRestoreMetrics;
+import org.apache.cassandra.sidecar.restore.RestoreSliceManifest.ManifestEntry;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.SSTableImporter;
+import org.apache.cassandra.sidecar.utils.XXHash32Provider;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 
@@ -64,7 +77,9 @@ import static 
org.apache.cassandra.sidecar.AssertionUtils.getBlocking;
 import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -77,6 +92,7 @@ class RestoreSliceTaskTest
     private TaskExecutorPool executorPool;
     private SidecarMetrics metrics;
     private TestRestoreSliceAccessor sliceDatabaseAccessor;
+    private LocalTokenRangesProvider localTokenRangesProvider;
     private RestoreJobUtil util;
 
     @BeforeEach
@@ -99,7 +115,8 @@ class RestoreSliceTaskTest
         when(mockRegistryFactory.getOrCreate()).thenReturn(registry());
         when(mockRegistryFactory.getOrCreate(1)).thenReturn(registry(1));
         metrics = new SidecarMetricsImpl(mockRegistryFactory, 
mockInstanceMetadataFetcher);
-        util = mock(RestoreJobUtil.class);
+        localTokenRangesProvider = mock(LocalTokenRangesProvider.class);
+        util = new RestoreJobUtil(new XXHash32Provider());
         sliceDatabaseAccessor = new TestRestoreSliceAccessor();
     }
 
@@ -340,6 +357,95 @@ class RestoreSliceTaskTest
         assertThat(task.elapsedInNanos()).isEqualTo(123L);
     }
 
+    @Test
+    void testRemoveOutOfRangeSSTables(@TempDir Path tempDir) throws 
RestoreJobException, IOException
+    {
+        // TODO: update test to use replica ranges implementation
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.STAGED, "QUORUM");
+        RestoreSliceTask task = createTask(mockSlice, job);
+
+        // the mocked localTokenRangesProvider returns null, so retry later
+        RestoreSliceManifest manifest = new RestoreSliceManifest();
+        assertThatThrownBy(() -> 
task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest))
+        .isExactlyInstanceOf(RestoreJobException.class)
+        .hasMessageContaining("Unable to fetch local range, retry later");
+
+        // enclosed in the node's owning range: [1, 10] is fully enclosed in 
(0, 100]
+        // it should not remove the manifest entry, and no cleanup is needed
+        Map<Integer, Set<TokenRange>> localRanges = new HashMap<>(1);
+        Set<TokenRange> nodeRanges = new HashSet<>();
+        nodeRanges.add(new TokenRange(0, 100)); // not using vnode, so a 
single range
+        localRanges.put(1, nodeRanges); // instance id is 1. See setup()
+        
when(localTokenRangesProvider.localTokenRanges(any())).thenReturn(localRanges);
+        ManifestEntry rangeEnclosed = new ManifestEntry(Collections.emptyMap(),
+                                                        BigInteger.valueOf(1), 
// start
+                                                        
BigInteger.valueOf(10)); // end
+        manifest.put("foo-", rangeEnclosed);
+        task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest);
+        assertThat(manifest).hasSize(1);
+        verify(mockSlice, times(0)).requestOutOfRangeDataCleanup();
+
+        // fully out of range: [-10, 0] is fully out of range of (0, 100]
+        // it should remove the manifest entry entirely; no clean up required
+        manifest.clear();
+        ManifestEntry outOfRange = new ManifestEntry(Collections.emptyMap(),
+                                                     BigInteger.valueOf(-10), 
// start
+                                                     BigInteger.valueOf(0)); 
// end
+        manifest.put("foo-", outOfRange);
+        task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest);
+        assertThat(manifest).isEmpty();
+        verify(mockSlice, times(0)).requestOutOfRangeDataCleanup();
+
+        // partially out of range: [-10, 10] is partially out of range of (0, 
100]
+        // it should not remove the manifest entry, but it should signal to 
request out of range data cleanup
+        manifest.clear();
+        ManifestEntry partiallyOutOfRange = new 
ManifestEntry(Collections.emptyMap(),
+                                                              
BigInteger.valueOf(-10), // start
+                                                              
BigInteger.valueOf(10)); // end
+        manifest.put("foo-", partiallyOutOfRange);
+        task.removeOutOfRangeSSTablesUnsafe(tempDir.toFile(), manifest);
+        assertThat(manifest).hasSize(1);
+        verify(mockSlice, times(1)).requestOutOfRangeDataCleanup();
+    }
+
+    @Test
+    void testCompareChecksum(@TempDir Path tempDir) throws 
RestoreJobFatalException, IOException
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.CREATED);
+        RestoreSliceTask task = createTask(mockSlice, job);
+
+        byte[] bytes = "Hello".getBytes(StandardCharsets.UTF_8);
+        File[] testFiles = IntStream.range(0, 10).mapToObj(i -> new 
File(tempDir.toFile(), "f" + i))
+                                    .map(f -> ThrowableUtils.propagate(() -> 
Files.write(f.toPath(), bytes)).toFile())
+                                    .toArray(File[]::new);
+        Map<String, String> expectedChecksums = new HashMap<>(10);
+        for (File f : testFiles)
+        {
+            expectedChecksums.put(f.getName(), util.checksum(f));
+        }
+
+        assertThat(expectedChecksums)
+        .hasSize(10)
+        .containsEntry("f0", "f206d28f"); // hash value for "Hello"
+
+        // it should not throw
+        task.compareChecksumsUnsafe(expectedChecksums, testFiles);
+
+        // test check with file that does not exist
+        Map<String, String> nonexistFileChecksums = new HashMap<>(10);
+        nonexistFileChecksums.put("non-exist-file", "hash");
+        assertThatThrownBy(() -> 
task.compareChecksumsUnsafe(nonexistFileChecksums, testFiles))
+        .isInstanceOf(RestoreJobFatalException.class)
+        .hasMessageContaining("File not found in manifest");
+
+        // test check with invalid checksum value
+        Map<String, String> invalidChecksums = new 
HashMap<>(expectedChecksums);
+        invalidChecksums.put("f0", "invalid_hash"); // modify the hash of the 
file
+        assertThatThrownBy(() -> task.compareChecksumsUnsafe(invalidChecksums, 
testFiles))
+        .isInstanceOf(RestoreJobFatalException.class)
+        .hasMessageContaining("Checksum does not match. Expected: 
invalid_hash; actual: f206d28f");
+    }
+
     private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job)
     {
         return createTask(slice, job, System::nanoTime);
@@ -351,10 +457,10 @@ class RestoreSliceTaskTest
         assertThat(slice.job()).isSameAs(job);
         
assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
         assertThat(slice.job().status).isEqualTo(job.status);
-        RestoreJobUtil util = mock(RestoreJobUtil.class);
-        when(util.currentTimeNanos()).thenAnswer(invok -> 
currentNanoTimeSupplier.get());
+        RestoreJobUtil spiedUtil = spy(util);
+        when(spiedUtil.currentTimeNanos()).thenAnswer(invok -> 
currentNanoTimeSupplier.get());
         return new TestRestoreSliceTask(slice, mockStorageClient, 
executorPool, mockSSTableImporter,
-                                        0, sliceDatabaseAccessor, util, 
metrics);
+                                        0, sliceDatabaseAccessor, spiedUtil, 
localTokenRangesProvider, metrics);
     }
 
     private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, 
RestoreJob job)
@@ -365,7 +471,7 @@ class RestoreSliceTaskTest
         assertThat(slice.job().status).isEqualTo(job.status);
         return new TestUnexpectedExceptionInRestoreSliceTask(slice, 
mockStorageClient, executorPool,
                                                              
mockSSTableImporter, 0, sliceDatabaseAccessor,
-                                                             util, metrics);
+                                                             util, 
localTokenRangesProvider, metrics);
     }
 
     static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor
@@ -393,10 +499,11 @@ class RestoreSliceTaskTest
         public TestRestoreSliceTask(RestoreSlice slice, StorageClient 
s3Client, TaskExecutorPool executorPool,
                                     SSTableImporter importer, double 
requiredUsableSpacePercentage,
                                     RestoreSliceDatabaseAccessor 
sliceDatabaseAccessor, RestoreJobUtil restoreJobUtil,
+                                    LocalTokenRangesProvider 
localTokenRangesProvider,
                                     SidecarMetrics metrics)
         {
             super(slice, s3Client, executorPool, importer, 
requiredUsableSpacePercentage,
-                  sliceDatabaseAccessor, restoreJobUtil, metrics);
+                  sliceDatabaseAccessor, restoreJobUtil, 
localTokenRangesProvider, metrics);
             this.slice = slice;
             this.instanceMetrics = metrics.instance(slice.owner().id());
         }
@@ -429,10 +536,12 @@ class RestoreSliceTaskTest
                                                          TaskExecutorPool 
executorPool, SSTableImporter importer,
                                                          double 
requiredUsableSpacePercentage,
                                                          
RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
-                                                         RestoreJobUtil util, 
SidecarMetrics metrics)
+                                                         RestoreJobUtil util,
+                                                         
LocalTokenRangesProvider localTokenRangesProvider,
+                                                         SidecarMetrics 
metrics)
         {
             super(slice, s3Client, executorPool, importer, 
requiredUsableSpacePercentage,
-                  sliceDatabaseAccessor, util, metrics);
+                  sliceDatabaseAccessor, util, localTokenRangesProvider, 
metrics);
         }
 
         @Override
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
 
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
index 3eb94a0b..5a78fc20 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
@@ -18,14 +18,12 @@
 
 package org.apache.cassandra.sidecar.routes.restore;
 
-import java.math.BigInteger;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
-import com.google.common.collect.Range;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -45,6 +43,7 @@ import 
org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import 
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
 import 
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
@@ -153,7 +152,7 @@ public abstract class BaseRestoreJobTests
         testRestoreSlices.updateStatusFunc = func;
     }
 
-    protected void mockLookupRestoreSlices(BiFunction<UUID, Range<BigInteger>, 
List<RestoreSlice>> func)
+    protected void mockLookupRestoreSlices(BiFunction<UUID, TokenRange, 
List<RestoreSlice>> func)
     {
         testRestoreSlices.selectByJobByRangeFunc = func;
     }
@@ -206,7 +205,7 @@ public abstract class BaseRestoreJobTests
         {
             Function<RestoreSlice, RestoreSlice> createFunc;
             Function<RestoreSlice, RestoreSlice> updateStatusFunc;
-            BiFunction<UUID, Range<BigInteger>, List<RestoreSlice>> 
selectByJobByRangeFunc;
+            BiFunction<UUID, TokenRange, List<RestoreSlice>> 
selectByJobByRangeFunc;
 
             TestRestoreSliceDatabaseAccessor(SidecarSchema sidecarSchema)
             {
@@ -226,10 +225,9 @@ public abstract class BaseRestoreJobTests
             }
 
             @Override
-            public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID 
jobId, short bucketId,
-                                                                      
BigInteger startToken, BigInteger endToken)
+            public List<RestoreSlice> selectByJobByBucketByTokenRange(UUID 
jobId, short bucketId, TokenRange range)
             {
-                return selectByJobByRangeFunc.apply(jobId, 
Range.closed(startToken, endToken));
+                return selectByJobByRangeFunc.apply(jobId, range);
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to