This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee45474  CASSANDRASC-99 Break restore job into stage and import phases 
and persist restore slice status on phase completion
ee45474 is described below

commit ee454741363f3f693726af242c5ec37ad1480d60
Author: Yifan Cai <y...@apache.org>
AuthorDate: Mon Jan 29 16:09:25 2024 -0800

    CASSANDRASC-99 Break restore job into stage and import phases and persist 
restore slice status on phase completion
    
    patch by Yifan Cai; reviewed by Doug Rohrer, Francisco Guerrero for 
CASSANDRASC-99
---
 .../data/CreateRestoreJobRequestPayload.java       |  28 ++-
 .../sidecar/common/data/RestoreJobConstants.java   |   1 +
 .../sidecar/common/data/RestoreJobStatus.java      |   1 +
 .../sidecar/common/data/RestoreSliceStatus.java    |  37 ++-
 .../data/CreateRestoreJobRequestPayloadTest.java   |   6 +-
 .../common/data/RestoreSliceStatusTest.java        |  83 +++++++
 spotbugs-exclude.xml                               |   1 +
 .../config/yaml/RestoreJobConfigurationImpl.java   |  16 +-
 .../apache/cassandra/sidecar/db/RestoreJob.java    |  85 ++++---
 .../sidecar/db/RestoreJobDatabaseAccessor.java     |  21 +-
 .../apache/cassandra/sidecar/db/RestoreSlice.java  |  97 ++++++--
 .../sidecar/db/RestoreSliceDatabaseAccessor.java   |  47 ++--
 .../sidecar/db/schema/RestoreJobsSchema.java       |   5 +-
 .../sidecar/db/schema/RestoreSlicesSchema.java     |   2 +-
 .../sidecar/locator/CachedLocalTokenRanges.java    | 276 +++++++++++++++++++++
 .../sidecar/locator/LocalTokenRangesProvider.java  |  41 +++
 .../sidecar/restore/RestoreJobDiscoverer.java      |  55 +++-
 .../cassandra/sidecar/restore/RestoreJobUtil.java  |   2 +-
 .../sidecar/restore/RestoreProcessor.java          |  36 ++-
 .../sidecar/restore/RestoreSliceTask.java          | 118 +++++++--
 .../cassandra/sidecar/restore/StorageClient.java   |   2 +-
 .../routes/restore/AbortRestoreJobHandler.java     |   6 +-
 .../routes/restore/CreateRestoreJobHandler.java    |   2 +-
 .../routes/restore/CreateRestoreSliceHandler.java  |   2 +-
 .../routes/restore/UpdateRestoreJobHandler.java    |  17 +-
 .../db/RestoreJobsDatabaseAccessorIntTest.java     |  12 +-
 .../testing/ConfigurableCassandraTestContext.java  |  43 +++-
 .../cassandra/sidecar/db/RestoreJobTest.java       |  16 ++
 .../cassandra/sidecar/db/SidecarSchemaTest.java    |  53 +++-
 .../sidecar/restore/RestoreJobDiscovererTest.java  |  84 ++++---
 .../sidecar/restore/RestoreJobManagerTest.java     |   7 +-
 .../sidecar/restore/RestoreProcessorTest.java      |   3 +-
 .../sidecar/restore/RestoreSliceTaskTest.java      | 113 +++++++--
 .../sidecar/restore/RestoreSliceTest.java          |   2 +-
 .../routes/restore/BaseRestoreJobTests.java        |   1 -
 .../restore/RestoreJobSummaryHandlerTest.java      |  29 ++-
 .../restore/UpdateRestoreJobHandlerTest.java       |  10 +-
 .../sidecar/utils/AsyncFileSystemUtilsTest.java    | 111 +++++++++
 38 files changed, 1255 insertions(+), 216 deletions(-)

diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java
 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java
index 12858d8..0e5a9a0 100644
--- 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java
@@ -26,8 +26,10 @@ import java.util.function.Consumer;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_AGENT;
+import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_CONSISTENCY_LEVEL;
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_EXPIRE_AT;
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_ID;
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_IMPORT_OPTIONS;
@@ -43,6 +45,8 @@ public class CreateRestoreJobRequestPayload
     private final RestoreJobSecrets secrets;
     private final SSTableImportOptions importOptions;
     private final long expireAtInMillis;
+    @Nullable
+    private final String consistencyLevel; // optional field
 
     /**
      * Builder to build a CreateRestoreJobRequest
@@ -65,13 +69,15 @@ public class CreateRestoreJobRequestPayload
      * @param secrets          secrets to be used by restore job to download 
data
      * @param importOptions    the configured options for SSTable import
      * @param expireAtInMillis a timestamp in the future when the job is 
considered expired
+     * @param consistencyLevel consistency level a job should satisfy
      */
     @JsonCreator
     public CreateRestoreJobRequestPayload(@JsonProperty(JOB_ID) UUID jobId,
                                           @JsonProperty(JOB_AGENT) String 
jobAgent,
                                           @JsonProperty(JOB_SECRETS) 
RestoreJobSecrets secrets,
                                           @JsonProperty(JOB_IMPORT_OPTIONS) 
SSTableImportOptions importOptions,
-                                          @JsonProperty(JOB_EXPIRE_AT) long 
expireAtInMillis)
+                                          @JsonProperty(JOB_EXPIRE_AT) long 
expireAtInMillis,
+                                          @JsonProperty(JOB_CONSISTENCY_LEVEL) 
String consistencyLevel)
     {
         Preconditions.checkArgument(jobId == null || jobId.version() == 1,
                                     "Only time based UUIDs allowed for jobId");
@@ -85,6 +91,7 @@ public class CreateRestoreJobRequestPayload
                              ? SSTableImportOptions.defaults()
                              : importOptions;
         this.expireAtInMillis = expireAtInMillis;
+        this.consistencyLevel = consistencyLevel;
     }
 
     private CreateRestoreJobRequestPayload(Builder builder)
@@ -94,6 +101,7 @@ public class CreateRestoreJobRequestPayload
         this.secrets = builder.secrets;
         this.importOptions = builder.importOptions;
         this.expireAtInMillis = builder.expireAtInMillis;
+        this.consistencyLevel = builder.consistencyLevel;
     }
 
     /**
@@ -151,6 +159,16 @@ public class CreateRestoreJobRequestPayload
         return new Date(expireAtInMillis);
     }
 
+    /**
+     * @return the consistency level a job should satisfy
+     */
+    @JsonProperty(JOB_CONSISTENCY_LEVEL)
+    @Nullable
+    public String consistencyLevel()
+    {
+        return consistencyLevel;
+    }
+
     @Override
     public String toString()
     {
@@ -159,6 +177,7 @@ public class CreateRestoreJobRequestPayload
                JOB_AGENT + "='" + jobAgent + "', " +
                JOB_SECRETS + "='" + secrets + "', " +
                JOB_EXPIRE_AT + "='" + expireAtInMillis + "', " +
+               JOB_CONSISTENCY_LEVEL + "='" + consistencyLevel + "', " +
                JOB_IMPORT_OPTIONS + "='" + importOptions + "'}";
     }
 
@@ -173,6 +192,7 @@ public class CreateRestoreJobRequestPayload
 
         private UUID jobId = null;
         private String jobAgent = null;
+        private String consistencyLevel = null;
 
         Builder(RestoreJobSecrets secrets, long expireAtInMillis)
         {
@@ -198,6 +218,12 @@ public class CreateRestoreJobRequestPayload
             return this;
         }
 
+        public Builder consistencyLevel(String consistencyLevel)
+        {
+            this.consistencyLevel = consistencyLevel;
+            return this;
+        }
+
         public CreateRestoreJobRequestPayload build()
         {
             return new CreateRestoreJobRequestPayload(this);
diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
index 78707ae..d18b0b3 100644
--- 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
@@ -32,6 +32,7 @@ public class RestoreJobConstants
     public static final String JOB_CREATED_AT = "createdAt";
     public static final String JOB_KEYSPACE = "keyspace";
     public static final String JOB_TABLE = "table";
+    public static final String JOB_CONSISTENCY_LEVEL = "consistencyLevel";
     public static final String SLICE_ID = "sliceId";
     public static final String BUCKET_ID = "bucketId";
     public static final String SLICE_START_TOKEN = "startToken";
diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java
 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java
index 1d691da..4694510 100644
--- 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java
@@ -24,6 +24,7 @@ package org.apache.cassandra.sidecar.common.data;
 public enum RestoreJobStatus
 {
     CREATED,
+    STAGED,
     @Deprecated // replaced by ABORTED
     FAILED,
     ABORTED,
diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java
 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java
index fa874bb..379be71 100644
--- 
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java
@@ -18,15 +18,44 @@
 
 package org.apache.cassandra.sidecar.common.data;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+
 /**
  * Holds all possible restore slice statues
  */
 public enum RestoreSliceStatus
 {
-    EMPTY,
-    PROCESSING,
-    COMMITTING,
     SUCCEEDED,
     FAILED,
-    ABORTED
+    ABORTED,
+    COMMITTING(SUCCEEDED, FAILED, ABORTED),
+    STAGED(COMMITTING, FAILED, ABORTED),
+    PROCESSING(STAGED, FAILED, ABORTED),
+    EMPTY(PROCESSING, FAILED, ABORTED);
+
+    // Do not use EnumSet, since validTargetStatuses is assigned on 
constructing and enums are not available yet.
+    private final Set<RestoreSliceStatus> validTargetStatusSet;
+
+    RestoreSliceStatus(RestoreSliceStatus... targetStatuses)
+    {
+        this.validTargetStatusSet = new HashSet<>();
+        Collections.addAll(validTargetStatusSet, targetStatuses);
+    }
+
+    /**
+     * Advance the status with validation
+     * @param targetStatus target status to advance to
+     * @return new status
+     */
+    public RestoreSliceStatus advanceTo(RestoreSliceStatus targetStatus)
+    {
+        
Preconditions.checkArgument(validTargetStatusSet.contains(targetStatus),
+                                    name() + " status can only advance to one 
of the follow statuses: " +
+                                    validTargetStatusSet);
+        return targetStatus;
+    }
 }
diff --git 
a/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java
 
b/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java
index e7f47b9..08ca103 100644
--- 
a/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java
+++ 
b/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java
@@ -46,6 +46,7 @@ class CreateRestoreJobRequestPayloadTest
         Date date = Date.from(Instant.ofEpochMilli(time));
         CreateRestoreJobRequestPayload req = 
CreateRestoreJobRequestPayload.builder(secrets, time)
                                                                            
.jobId(UUID.fromString(id))
+                                                                           
.consistencyLevel("QUORUM")
                                                                            
.jobAgent("agent")
                                                                            
.build();
         String json = MAPPER.writeValueAsString(req);
@@ -56,6 +57,7 @@ class CreateRestoreJobRequestPayloadTest
         assertThat(test.expireAtInMillis()).isEqualTo(time);
         assertThat(test.expireAtAsDate()).isEqualTo(date);
         
assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults());
+        assertThat(test.consistencyLevel()).isEqualTo("QUORUM");
     }
 
     @Test
@@ -157,9 +159,9 @@ class CreateRestoreJobRequestPayloadTest
         assertThat(test.expireAtInMillis()).isEqualTo(time);
         assertThat(test.expireAtAsDate()).isEqualTo(date);
         
assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults());
+        assertThat(test.consistencyLevel()).isNull();
     }
 
-
     @Test
     void testBuilder()
     {
@@ -172,11 +174,13 @@ class CreateRestoreJobRequestPayloadTest
                                                  .resetLevel(false)
                                                  .clearRepaired(false);
                                              })
+                                             .consistencyLevel("QUORUM")
                                              .build();
         assertThat(req.secrets()).isEqualTo(secrets);
         assertThat(req.jobAgent()).isEqualTo("agent");
         
assertThat(req.importOptions()).isEqualTo(SSTableImportOptions.defaults()
                                                                       
.resetLevel(false)
                                                                       
.clearRepaired(false));
+        assertThat(req.consistencyLevel()).isEqualTo("QUORUM");
     }
 }
diff --git 
a/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java
 
b/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java
new file mode 100644
index 0000000..27607cf
--- /dev/null
+++ 
b/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.ABORTED;
+import static 
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.COMMITTING;
+import static 
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.EMPTY;
+import static 
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.PROCESSING;
+import static 
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.STAGED;
+import static 
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class RestoreSliceStatusTest
+{
+    @Test
+    void testStatusAdvancing()
+    {
+        assertAdvanceTo(EMPTY, PROCESSING);
+        assertAdvanceTo(EMPTY, FAILED);
+        assertAdvanceTo(EMPTY, ABORTED);
+        assertAdvanceTo(PROCESSING, STAGED);
+        assertAdvanceTo(PROCESSING, FAILED);
+        assertAdvanceTo(PROCESSING, ABORTED);
+        assertAdvanceTo(STAGED, COMMITTING);
+        assertAdvanceTo(STAGED, FAILED);
+        assertAdvanceTo(STAGED, ABORTED);
+        assertAdvanceTo(COMMITTING, SUCCEEDED);
+        assertAdvanceTo(COMMITTING, FAILED);
+        assertAdvanceTo(COMMITTING, ABORTED);
+    }
+
+    @Test
+    void testInvalidStatusAdvancing()
+    {
+        String commonErrorMsg = "status can only advance to one of the follow 
statuses";
+
+        Stream
+        .of(new RestoreSliceStatus[][]
+            { // define test cases of invalid status advancing, e.g. it is 
invalid to advance from EMPTY to STAGED
+              { EMPTY, STAGED },
+              { STAGED, EMPTY },
+              { EMPTY, COMMITTING },
+              { STAGED, SUCCEEDED },
+              { COMMITTING, STAGED },
+              { STAGED, STAGED },
+              { SUCCEEDED, FAILED },
+              { FAILED, SUCCEEDED }
+            })
+        .forEach(testCase -> {
+            assertThatThrownBy(() -> testCase[0].advanceTo(testCase[1]))
+            .isExactlyInstanceOf(IllegalArgumentException.class)
+            .hasNoCause()
+            .hasMessageContaining(commonErrorMsg);
+        });
+    }
+
+    private void assertAdvanceTo(RestoreSliceStatus from, RestoreSliceStatus 
to)
+    {
+        assertThat(from.advanceTo(to)).isEqualTo(to);
+    }
+}
diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml
index 2439930..03be65c 100644
--- a/spotbugs-exclude.xml
+++ b/spotbugs-exclude.xml
@@ -44,6 +44,7 @@
             <Class name="org.apache.cassandra.sidecar.CassandraSidecarDaemon" 
/>
             <Class name="org.apache.cassandra.sidecar.utils.SSTableImporter" />
             <Class 
name="org.apache.cassandra.sidecar.tasks.HealthCheckPeriodicTask" />
+            <Class 
name="org.apache.cassandra.sidecar.restore.RestoreSliceTaskTest" />
         </Or>
     </Match>
 
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
 
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
index 6048d26..9ce5efb 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
@@ -167,8 +167,7 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
          */
         public Builder jobDiscoveryActiveLoopDelayMillis(long 
jobDiscoveryActiveLoopDelayMillis)
         {
-            this.jobDiscoveryActiveLoopDelayMillis = 
jobDiscoveryActiveLoopDelayMillis;
-            return this;
+            return update(b -> b.jobDiscoveryActiveLoopDelayMillis = 
jobDiscoveryActiveLoopDelayMillis);
         }
 
         /**
@@ -180,8 +179,7 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
          */
         public Builder jobDiscoveryIdleLoopDelayMillis(long 
jobDiscoveryIdleLoopDelayMillis)
         {
-            this.jobDiscoveryIdleLoopDelayMillis = 
jobDiscoveryIdleLoopDelayMillis;
-            return this;
+            return update(b -> b.jobDiscoveryIdleLoopDelayMillis = 
jobDiscoveryIdleLoopDelayMillis);
         }
 
         /**
@@ -193,8 +191,7 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
          */
         public Builder jobDiscoveryRecencyDays(int jobDiscoveryRecencyDays)
         {
-            this.jobDiscoveryRecencyDays = jobDiscoveryRecencyDays;
-            return this;
+            return update(b -> b.jobDiscoveryRecencyDays = 
jobDiscoveryRecencyDays);
         }
 
         /**
@@ -206,8 +203,7 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
          */
         public Builder processMaxConcurrency(int processMaxConcurrency)
         {
-            this.processMaxConcurrency = processMaxConcurrency;
-            return this;
+            return update(b -> b.processMaxConcurrency = 
processMaxConcurrency);
         }
 
         /**
@@ -219,11 +215,9 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
          */
         public Builder restoreJobTablesTtlSeconds(long 
restoreJobTablesTtlSeconds)
         {
-            this.restoreJobTablesTtlSeconds = restoreJobTablesTtlSeconds;
-            return this;
+            return update(b -> b.restoreJobTablesTtlSeconds = 
restoreJobTablesTtlSeconds);
         }
 
-
         @Override
         public RestoreJobConfigurationImpl build()
         {
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
index a7cee35..475f957 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.Date;
 import java.util.UUID;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import com.datastax.driver.core.LocalDate;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.utils.Bytes;
@@ -54,6 +52,7 @@ public class RestoreJob
     public final Date expireAt;
     public final short bucketCount;
     public final String consistencyLevel;
+    public final Manager restoreJobManager;
 
     public static Builder builder()
     {
@@ -75,46 +74,12 @@ public class RestoreJob
                .jobStatus(decodeJobStatus(row.getString("status")))
                .jobSecrets(decodeJobSecrets(row.getBytes("blob_secrets")))
                .expireAt(row.getTimestamp("expire_at"))
-               
.sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options")));
+               
.sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options")))
+               .consistencyLevel(row.getString("consistency_level"));
+
         // todo: Yifan, add them back when the cql statement is updated to 
reflect the new columns.
         //  Add new fields to CreateRestoreJobRequestPayload too
 //               .bucketCount(row.getShort("bucket_count"))
-//               .consistencyLevel(row.getString("consistency_level"));
-        return builder.build();
-    }
-
-    // todo: candidate to be removed
-    public static RestoreJob forUpdates(UUID jobId, String jobAgent,
-                                        RestoreJobStatus status,
-                                        RestoreJobSecrets secrets,
-                                        Date expireAt)
-    throws DataObjectMappingException
-    {
-        Builder builder = new Builder();
-        builder.createdAt(toLocalDate(jobId))
-               .jobId(jobId).jobAgent(jobAgent)
-               .jobStatus(status)
-               .jobSecrets(secrets)
-               .expireAt(expireAt);
-        return builder.build();
-    }
-
-    // todo: candidate to be removed
-    @VisibleForTesting
-    public static RestoreJob create(LocalDate createdAt,
-                                    UUID jobId,
-                                    String keyspaceName,
-                                    String tableName,
-                                    String jobAgent,
-                                    RestoreJobStatus status,
-                                    RestoreJobSecrets secrets,
-                                    SSTableImportOptions importOptions)
-    {
-        Builder builder = new Builder();
-        builder.createdAt(createdAt)
-               .jobId(jobId).jobAgent(jobAgent)
-               .keyspace(keyspaceName).table(tableName)
-               
.jobStatus(status).jobSecrets(secrets).sstableImportOptions(importOptions);
         return builder.build();
     }
 
@@ -156,6 +121,7 @@ public class RestoreJob
         this.expireAt = builder.expireAt;
         this.bucketCount = builder.bucketCount;
         this.consistencyLevel = builder.consistencyLevel;
+        this.restoreJobManager = builder.manager;
     }
 
     public Builder unbuild()
@@ -163,6 +129,11 @@ public class RestoreJob
         return new Builder(this);
     }
 
+    public boolean isManagedBySidecar()
+    {
+        return restoreJobManager == Manager.SIDECAR;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -211,6 +182,7 @@ public class RestoreJob
         private Date expireAt;
         private short bucketCount;
         private String consistencyLevel;
+        private Manager manager;
 
         private Builder()
         {
@@ -284,7 +256,10 @@ public class RestoreJob
 
         public Builder consistencyLevel(String consistencyLevel)
         {
-            return update(b -> b.consistencyLevel = consistencyLevel);
+            return update(b -> {
+                b.consistencyLevel = consistencyLevel;
+                b.manager = resolveManager(consistencyLevel);
+            });
         }
 
         @Override
@@ -298,5 +273,35 @@ public class RestoreJob
         {
             return new RestoreJob(this);
         }
+
+        /**
+         * Resolve the manager of the restore job based on the existence of 
consistencyLevel
+         * @return the resolved Manager
+         */
+        private Manager resolveManager(String consistencyLevel)
+        {
+            // If spark is the manager, the restore job is created w/o 
specifying consistency level
+            // If the manager of the restore job is sidecar, consistency level 
must present
+            return consistencyLevel == null ? Manager.SPARK : Manager.SIDECAR;
+        }
+    }
+
+    /**
+     * The manager of the restore job. The variant could change the code path 
a restore job runs.
+     * It is a feature switch essentially.
+     */
+    public enum Manager
+    {
+        /**
+         * The restore job is managed by Spark. Sidecar instances are just 
simple workers. They rely on client/Spark
+         * for decision-making.
+         */
+        SPARK,
+
+        /**
+         * The restore job is managed by Sidecar. Sidecar instances should 
assign slices to sidecar instances
+         * and check whether the job has met the consistency level to complete 
the job.
+         */
+        SIDECAR,
     }
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
index 3a06e08..99ba533 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
@@ -82,6 +82,7 @@ public class RestoreJobDatabaseAccessor extends 
DatabaseAccessor
                                    .jobSecrets(payload.secrets())
                                    
.sstableImportOptions(payload.importOptions())
                                    .expireAt(payload.expireAtAsDate())
+                                   
.consistencyLevel(payload.consistencyLevel())
                                    .build();
         ByteBuffer secrets = serializeValue(job.secrets, "secrets");
         ByteBuffer importOptions = serializeValue(job.importOptions, "sstable 
import options");
@@ -94,17 +95,29 @@ public class RestoreJobDatabaseAccessor extends 
DatabaseAccessor
                                                           
job.status.toString(),
                                                           secrets,
                                                           importOptions,
+                                                          job.consistencyLevel,
                                                           job.expireAt);
 
         execute(statement);
         return job;
     }
 
-    public RestoreJob update(UpdateRestoreJobRequestPayload payload, 
QualifiedTableName qualifiedTableName, UUID jobId)
+    /**
+     * Update fields in the restore job and persist
+     *
+     * @param payload fields to be updated
+     * @param jobId job ID
+     * @return the restore job object with only the updated fields
+     * @throws DataObjectMappingException when secrets json cannot be 
serialized
+     */
+    public RestoreJob update(UpdateRestoreJobRequestPayload payload, UUID 
jobId)
     throws DataObjectMappingException
     {
         sidecarSchema.ensureInitialized();
+        RestoreJob.Builder updateBuilder = RestoreJob.builder();
         LocalDate createdAt = RestoreJob.toLocalDate(jobId);
+        updateBuilder.createdAt(createdAt)
+                     .jobId(jobId);
 
         RestoreJobSecrets secrets = payload.secrets();
         RestoreJobStatus status = payload.status();
@@ -127,22 +140,26 @@ public class RestoreJobDatabaseAccessor extends 
DatabaseAccessor
             {
                 throw new DataObjectMappingException("Failed to serialize 
secrets", e);
             }
+            updateBuilder.jobSecrets(secrets);
         }
         if (status != null)
         {
             
batchStatement.add(restoreJobsSchema.updateStatus().bind(createdAt, jobId, 
status.name()));
+            updateBuilder.jobStatus(status);
         }
         if (jobAgent != null)
         {
             
batchStatement.add(restoreJobsSchema.updateJobAgent().bind(createdAt, jobId, 
jobAgent));
+            updateBuilder.jobAgent(jobAgent);
         }
         if (expireAt != null)
         {
             
batchStatement.add(restoreJobsSchema.updateExpireAt().bind(createdAt, jobId, 
expireAt));
+            updateBuilder.expireAt(expireAt);
         }
 
         execute(batchStatement);
-        return RestoreJob.forUpdates(jobId, jobAgent, status, secrets, 
expireAt);
+        return updateBuilder.build();
     }
 
     public void abort(UUID jobId)
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
index fdf45c5..6353894 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.sidecar.db;
 import java.math.BigInteger;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -46,7 +48,14 @@ import org.apache.cassandra.sidecar.utils.SSTableImporter;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * Data object that contains all values that matter to the restore job slice
+ * <p>Data object that contains all values that matter to the restore job 
slice.</p>
+ *
+ * <p>How the staged files are organized on disk? For each slice,</p>
+ * <ol>
+ * <li>the S3 object is downloaded to the path at "stageDirectory/key". It is 
a zip file.</li>
+ * <li>the zip is then extracted to the directory at 
"stageDirectory/keyspace/table/".
+ *    The extracted sstables are imported into Cassandra.</li>
+ * </ol>
  */
 public class RestoreSlice
 {
@@ -58,7 +67,12 @@ public class RestoreSlice
     private final String bucket;
     private final String key;
     private final String checksum; // etag
-    private final Path targetPathInStaging; // the path to store the s3 object 
of the slice
+    // The path to the directory that stores the s3 object of the slice and 
the sstables after unzipping.
+    // Its value is "baseStageDirectory/uploadId"
+    private final Path stageDirectory;
+    // The path to the staged s3 object (file). The path is inside 
stageDirectory.
+    // Its value is "stageDirectory/key"
+    private final Path stagedObjectPath;
     private final String uploadId;
     private final InstanceMetadata owner;
     private final BigInteger startToken;
@@ -69,7 +83,11 @@ public class RestoreSlice
     private final long compressedSize;
     private final long uncompressedSize;
     private RestoreSliceTracker tracker;
+
+    // mutable states
     private boolean existsOnS3 = false;
+    private boolean hasStaged = false;
+    private boolean hasImported = false;
     private int downloadAttempt = 0;
     private volatile boolean isCancelled = false;
 
@@ -88,7 +106,8 @@ public class RestoreSlice
         this.bucket = builder.bucket;
         this.key = builder.key;
         this.checksum = builder.checksum;
-        this.targetPathInStaging = builder.targetPathInStaging;
+        this.stageDirectory = builder.stageDirectory;
+        this.stagedObjectPath = builder.stagedObjectPath;
         this.uploadId = builder.uploadId;
         this.owner = builder.owner;
         this.startToken = builder.startToken;
@@ -151,13 +170,29 @@ public class RestoreSlice
     }
 
     /**
-     * Make the slice as completed
+     * Mark the slice as completed
      */
     public void complete()
     {
         tracker.completeSlice(this);
     }
 
+    /**
+     * Mark that the slice has completed the stage phase
+     */
+    public void completeStagePhase()
+    {
+        this.hasStaged = true;
+    }
+
+    /**
+     * Mark that the slice has completed the import phase
+     */
+    public void completeImportPhase()
+    {
+        this.hasImported = true;
+    }
+
     public void failAtInstance(int instanceId)
     {
         statusByReplica.put(String.valueOf(instanceId), 
RestoreSliceStatus.FAILED);
@@ -169,6 +204,7 @@ public class RestoreSlice
     public void fail(RestoreJobFatalException exception)
     {
         tracker.fail(exception);
+        failAtInstance(owner().id());
     }
 
     public void setExistsOnS3()
@@ -196,6 +232,7 @@ public class RestoreSlice
                                                       
ExecutorPools.TaskExecutorPool executorPool,
                                                       SSTableImporter importer,
                                                       double 
requiredUsableSpacePercentage,
+                                                      
RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
                                                       RestoreJobStats stats)
     {
         if (isCancelled)
@@ -204,10 +241,12 @@ public class RestoreSlice
 
         try
         {
-            RestoreJob restoreJob = job();
-            StorageClient s3Client = s3ClientPool.storageClient(restoreJob);
-            return new RestoreSliceTask(restoreJob, this, s3Client,
-                                        executorPool, importer, 
requiredUsableSpacePercentage, stats);
+            StorageClient s3Client = s3ClientPool.storageClient(job());
+            return new RestoreSliceTask(this, s3Client,
+                                        executorPool, importer,
+                                        requiredUsableSpacePercentage,
+                                        sliceDatabaseAccessor,
+                                        stats);
         }
         catch (IllegalStateException illegalState)
         {
@@ -300,9 +339,21 @@ public class RestoreSlice
         return this.replicas;
     }
 
-    public Path targetPathInStaging()
+    /**
+     * @return the path to the directory that stores the s3 object of the slice
+     *         and the sstables after unzipping
+     */
+    public Path stageDirectory()
+    {
+        return stageDirectory;
+    }
+
+    /**
+     * @return the path to the staged s3 object
+     */
+    public Path stagedObjectPath()
     {
-        return targetPathInStaging;
+        return stagedObjectPath;
     }
 
     public long compressedSize()
@@ -330,6 +381,16 @@ public class RestoreSlice
         return existsOnS3;
     }
 
+    public boolean hasStaged()
+    {
+        return hasStaged;
+    }
+
+    public boolean hasImported()
+    {
+        return hasImported;
+    }
+
     public int downloadAttempt()
     {
         return downloadAttempt;
@@ -350,6 +411,7 @@ public class RestoreSlice
     public static RestoreSlice from(Row row)
     {
         Builder builder = new Builder();
+        builder.jobId(row.getUUID("job_id"));
         builder.sliceId(row.getString("slice_id"));
         builder.bucketId(row.getShort("bucket_id"));
         builder.storageBucket(row.getString("bucket"));
@@ -377,7 +439,8 @@ public class RestoreSlice
         private String bucket;
         private String key;
         private String checksum; // etag
-        private Path targetPathInStaging; // the path to store the s3 object 
of the slice
+        private Path stageDirectory;
+        private Path stagedObjectPath;
         private String uploadId;
         private InstanceMetadata owner;
         private BigInteger startToken;
@@ -401,7 +464,7 @@ public class RestoreSlice
             this.bucket = slice.bucket;
             this.key = slice.key;
             this.checksum = slice.checksum;
-            this.targetPathInStaging = slice.targetPathInStaging;
+            this.stageDirectory = slice.stageDirectory;
             this.uploadId = slice.uploadId;
             this.owner = slice.owner;
             this.startToken = slice.startToken;
@@ -450,10 +513,10 @@ public class RestoreSlice
             return update(b -> b.checksum = checksum);
         }
 
-        public Builder targetPathInStaging(Path basePath, String uploadId)
+        public Builder stageDirectory(Path basePath, String uploadId)
         {
             return update(b -> {
-                b.targetPathInStaging = basePath.resolve(uploadId);
+                b.stageDirectory = basePath.resolve(uploadId);
                 b.uploadId = uploadId;
             });
         }
@@ -485,12 +548,12 @@ public class RestoreSlice
 
         public Builder replicaStatus(Map<String, RestoreSliceStatus> 
statusByReplica)
         {
-            return update(b -> b.statusByReplica = 
Collections.unmodifiableMap(statusByReplica));
+            return update(b -> b.statusByReplica = new 
HashMap<>(statusByReplica));
         }
 
         public Builder replicas(Set<String> replicas)
         {
-            return update(b -> b.replicas = 
Collections.unmodifiableSet(replicas));
+            return update(b -> b.replicas = new HashSet<>(replicas));
         }
 
         /**
@@ -525,6 +588,8 @@ public class RestoreSlice
         @Override
         public RestoreSlice build()
         {
+            // precompute the path to the to-be-staged object on disk
+            stagedObjectPath = stageDirectory.resolve(key);
             return new RestoreSlice(this);
         }
 
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
index c3624a9..6416ce0 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
@@ -51,18 +51,19 @@ public class RestoreSliceDatabaseAccessor extends 
DatabaseAccessor
 
     public RestoreSlice create(RestoreSlice slice)
     {
-        BoundStatement statement = 
restoreSlicesSchema.insertSlice().bind(slice.jobId(),
-                                                                          
slice.bucketId(),
-                                                                          
slice.sliceId(),
-                                                                          
slice.bucket(),
-                                                                          
slice.key(),
-                                                                          
slice.checksum(),
-                                                                          
slice.startToken(),
-                                                                          
slice.endToken(),
-                                                                          
slice.compressedSize(),
-                                                                          
slice.uncompressedSize(),
-                                                                          
slice.statusByReplica(),
-                                                                          
slice.replicas());
+        BoundStatement statement = restoreSlicesSchema.insertSlice()
+                                                      .bind(slice.jobId(),
+                                                            slice.bucketId(),
+                                                            slice.sliceId(),
+                                                            slice.bucket(),
+                                                            slice.key(),
+                                                            slice.checksum(),
+                                                            slice.startToken(),
+                                                            slice.endToken(),
+                                                            
slice.compressedSize(),
+                                                            
slice.uncompressedSize(),
+                                                            
slice.statusByReplica(),
+                                                            slice.replicas());
         execute(statement);
         return slice;
     }
@@ -71,12 +72,13 @@ public class RestoreSliceDatabaseAccessor extends 
DatabaseAccessor
     {
         sidecarSchema.ensureInitialized();
 
-        BoundStatement statement = 
restoreSlicesSchema.updateStatus().bind(slice.statusByReplica(),
-                                                                           
slice.replicas(),
-                                                                           
slice.jobId(),
-                                                                           
slice.bucketId(),
-                                                                           
slice.startToken(),
-                                                                           
slice.sliceId());
+        BoundStatement statement = restoreSlicesSchema.updateStatus()
+                                                      
.bind(slice.statusByReplica(),
+                                                            slice.replicas(),
+                                                            slice.jobId(),
+                                                            slice.bucketId(),
+                                                            slice.startToken(),
+                                                            slice.sliceId());
         Row row = execute(statement).one();
         if (row == null)
         {
@@ -91,10 +93,11 @@ public class RestoreSliceDatabaseAccessor extends 
DatabaseAccessor
     {
         sidecarSchema.ensureInitialized();
 
-        BoundStatement statement = 
restoreSlicesSchema.findAllByTokenRange().bind(jobId,
-                                                                               
                          bucketId,
-                                                                               
                          startToken,
-                                                                               
                          endToken);
+        BoundStatement statement = restoreSlicesSchema.findAllByTokenRange()
+                                                      .bind(jobId,
+                                                            bucketId,
+                                                            startToken,
+                                                            endToken);
         ResultSet result = execute(statement);
         List<RestoreSlice> slices = new ArrayList<>();
         for (Row row : result)
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java 
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
index 4476a61..8df27bf 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
@@ -141,8 +141,9 @@ public class RestoreJobsSchema extends 
AbstractSchema.TableSchema
                              "  status," +
                              "  blob_secrets," +
                              "  import_options," +
+                             "  consistency_level," +
                              "  expire_at" +
-                             ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", config);
+                             ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 
config);
         }
 
         static String updateBlobSecrets(SchemaKeyspaceConfiguration config)
@@ -192,6 +193,7 @@ public class RestoreJobsSchema extends 
AbstractSchema.TableSchema
                              "status, " +
                              "blob_secrets, " +
                              "import_options, " +
+                             "consistency_level, " +
                              "expire_at " +
                              "FROM %s.%s " +
                              "WHERE created_at = ? AND job_id = ?", config);
@@ -207,6 +209,7 @@ public class RestoreJobsSchema extends 
AbstractSchema.TableSchema
                              "status, " +
                              "blob_secrets, " +
                              "import_options, " +
+                             "consistency_level, " +
                              "expire_at " +
                              "FROM %s.%s " +
                              "WHERE created_at = ?", config);
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java 
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
index c016d2b..0602a9e 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
@@ -78,7 +78,7 @@ public class RestoreSlicesSchema extends 
AbstractSchema.TableSchema
                              "  end_token varint," +
                              "  compressed_size bigint," +
                              "  uncompressed_size bigint," +
-                             "  status_by_replica map<text, text>," +
+                             "  status_by_replica map<text, text>," + // key 
is instance ID; value is RestoreSliceStatus
                              "  all_replicas set<text>," +
                              "  PRIMARY KEY ((job_id, bucket_id), start_token, 
slice_id)" +
                              ") WITH default_time_to_live = %s",
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
 
b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
new file mode 100644
index 0000000..a171594
--- /dev/null
+++ 
b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.locator;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Get token ranges owned and replicated to the local Cassandra instance(s) by 
keyspace
+ * The results are cached and get invalidated when the local instances or cluster topology change
+ */
+@Singleton
+public class CachedLocalTokenRanges implements LocalTokenRangesProvider
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachedLocalTokenRanges.class);
+    private final InstancesConfig instancesConfig;
+    private final DnsResolver dnsResolver;
+
+    @GuardedBy("this")
+    private Set<Integer> localInstanceIdsCache;
+    @GuardedBy("this")
+    private Set<Host> allInstancesCache;
+    @GuardedBy("this")
+    private Set<Host> localInstancesCache;
+    @GuardedBy("this")
+    private ImmutableMap<String, Map<Integer, Set<TokenRange>>> 
localTokenRangesCache;
+
+    @Inject
+    public CachedLocalTokenRanges(InstancesConfig instancesConfig, DnsResolver 
dnsResolver)
+    {
+        this.instancesConfig = instancesConfig;
+        this.dnsResolver = dnsResolver;
+        this.localTokenRangesCache = null;
+        this.localInstanceIdsCache = null;
+        this.allInstancesCache = null;
+        this.localInstancesCache = null;
+    }
+
+    @Override
+    @Nullable
+    public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace)
+    {
+        List<InstanceMetadata> localInstances = instancesConfig.instances();
+
+        if (localInstances.isEmpty())
+        {
+            LOGGER.warn("No local instances found");
+            return Collections.emptyMap();
+        }
+
+        CassandraAdapterDelegate delegate = localInstances.get(0).delegate();
+        Metadata metadata = delegate == null ? null : delegate.metadata();
+        if (metadata == null)
+        {
+            LOGGER.debug("Not yet connect to Cassandra cluster");
+            return Collections.emptyMap();
+        }
+
+        if (metadata.getKeyspace(keyspace) == null)
+        {
+            throw new NoSuchElementException("Keyspace does not exist. 
keyspace: " + keyspace);
+        }
+
+        Set<Integer> localInstanceIds = localInstances.stream()
+                                                      
.map(InstanceMetadata::id)
+                                                      
.collect(Collectors.toSet());
+        Set<Host> allInstances = metadata.getAllHosts();
+        return getCacheOrReload(metadata, keyspace, localInstanceIds, 
localInstances, allInstances);
+    }
+
+    /**
+     * Return the token ranges owned and replicated to the host according to 
the replication strategy of the keyspace
+     * The result set is unmodifiable.
+     */
+    @Nullable
+    private Pair<Host, Set<TokenRange>> tokenRangesOfHost(Metadata metadata,
+                                                          String keyspace,
+                                                          InstanceMetadata 
instance,
+                                                          
Map<IpAddressAndPort, Host> allHosts)
+    {
+        Host host;
+        try
+        {
+            final IpAddressAndPort ip = 
IpAddressAndPort.of(dnsResolver.resolve(instance.host()), instance.port());
+            host = allHosts.get(ip);
+            if (host == null)
+            {
+                LOGGER.warn("Could not map InstanceMetadata to Host host={} 
port={} ip={}",
+                            instance.host(), instance.port(), ip.ipAddress);
+                return null;
+            }
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException("Failed to resolve hostname to ip. 
hostname: " + instance.host(), e);
+        }
+        return Pair.of(host, tokenRangesOfHost(metadata, keyspace, host));
+    }
+
+    public Set<TokenRange> tokenRangesOfHost(Metadata metadata, String 
keyspace, Host host)
+    {
+        return metadata.getTokenRanges(keyspace, host)
+                       .stream()
+                       .flatMap(range -> TokenRange.from(range).stream())
+                       .collect(Collectors.toSet());
+    }
+
+    /**
+     * Reload the locally cached token ranges when needed
+     */
+    @Nullable
+    private synchronized Map<Integer, Set<TokenRange>> 
getCacheOrReload(Metadata metadata,
+                                                                        String 
keyspace,
+                                                                        
Set<Integer> localInstanceIds,
+                                                                        
List<InstanceMetadata> localInstances,
+                                                                        
Set<Host> allInstances)
+    {
+        // exit early if no change is found
+        boolean isClusterTheSame = allInstances.equals(allInstancesCache)
+                                   && 
localInstanceIds.equals(localInstanceIdsCache);
+        if (localTokenRangesCache != null
+            && localTokenRangesCache.containsKey(keyspace)
+            && isClusterTheSame)
+        {
+            return localTokenRangesCache.get(keyspace);
+        }
+
+        // otherwise, reload the token ranges
+        localInstanceIdsCache = localInstanceIds;
+        allInstancesCache = allInstances;
+        if (allInstances.isEmpty())
+        {
+            LOGGER.warn("No instances found in client session");
+        }
+        Map<IpAddressAndPort, Host> allHosts = new 
HashMap<>(allInstancesCache.size());
+        BiConsumer<InetSocketAddress, Host> putNullSafe = (endpoint, host) -> {
+            if (endpoint != null)
+            {
+                allHosts.put(IpAddressAndPort.of(endpoint), host);
+            }
+        };
+        for (Host host : allInstancesCache)
+        {
+            putNullSafe.accept(host.getSocketAddress(), host);
+            putNullSafe.accept(host.getListenSocketAddress(), host);
+            putNullSafe.accept(host.getBroadcastSocketAddress(), host);
+        }
+
+        ImmutableMap.Builder<String, Map<Integer, Set<TokenRange>>> 
perKeyspaceBuilder = ImmutableMap.builder();
+        ImmutableSet.Builder<Host> hostBuilder = ImmutableSet.builder();
+        if (isClusterTheSame && localInstancesCache != null)
+        {
+            hostBuilder.addAll(localInstancesCache);
+        }
+
+        for (KeyspaceMetadata ks : metadata.getKeyspaces())
+        {
+            if (isClusterTheSame && localTokenRangesCache != null && 
localTokenRangesCache.containsKey(ks.getName()))
+            {
+                // we don't need to rebuild if already cached
+                perKeyspaceBuilder.put(ks.getName(), 
localTokenRangesCache.get(ks.getName()));
+            }
+            else
+            {
+                ImmutableMap.Builder<Integer, Set<TokenRange>> resultBuilder = 
ImmutableMap.builder();
+                for (InstanceMetadata instance : localInstances)
+                {
+                    Pair<Host, Set<TokenRange>> pair = 
tokenRangesOfHost(metadata, keyspace, instance, allHosts);
+                    if (pair != null)
+                    {
+                        hostBuilder.add(pair.getKey());
+                        resultBuilder.put(instance.id(), 
Collections.unmodifiableSet(pair.getValue()));
+                    }
+                }
+                perKeyspaceBuilder.put(ks.getName(), resultBuilder.build());
+            }
+        }
+        localTokenRangesCache = perKeyspaceBuilder.build();
+        localInstancesCache = hostBuilder.build();
+        if (localInstancesCache.isEmpty())
+        {
+            LOGGER.warn("Unable to determine local instances from client 
meta-data!");
+        }
+        return localTokenRangesCache.get(keyspace);
+    }
+
+    private static class IpAddressAndPort
+    {
+        final String ipAddress;
+        final int port;
+
+        static IpAddressAndPort of(@NotNull InetSocketAddress endpoint)
+        {
+            return IpAddressAndPort.of(endpoint.getAddress().getHostAddress(),
+                                       endpoint.getPort());
+        }
+
+        static IpAddressAndPort of(String ipAddress, int port)
+        {
+            return new IpAddressAndPort(ipAddress, port);
+        }
+
+        IpAddressAndPort(String ipAddress, int port)
+        {
+            this.ipAddress = ipAddress;
+            this.port = port;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            IpAddressAndPort that = (IpAddressAndPort) o;
+            return port == that.port && Objects.equals(ipAddress, 
that.ipAddress);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(ipAddress, port);
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
 
b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
new file mode 100644
index 0000000..cef04f6
--- /dev/null
+++ 
b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.locator;
+
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * Provides the token ranges of the local Cassandra instance(s)
+ */
+public interface LocalTokenRangesProvider
+{
+    /**
+     * Calculate the token ranges owned and replicated to the local Cassandra 
instance(s).
+     * When Sidecar is paired with multiple Cassandra instances, the ranges of each Cassandra instance are captured
+     * in the form of a map, where the key is the instance id and the value is the ranges of the instance. When Sidecar
+     * is paired with a single Cassandra instance, the result map has a single 
entry.
+     *
+     * @param keyspace keyspace to determine replication
+     * @return token ranges of the local Cassandra instances
+     */
+    @Nullable
+    Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace);
+}
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index 98f496b..e805df3 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -30,12 +30,17 @@ import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
+import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
+import org.apache.cassandra.sidecar.locator.TokenRange;
 import org.apache.cassandra.sidecar.stats.RestoreJobStats;
 import org.apache.cassandra.sidecar.tasks.PeriodicTask;
 import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;
@@ -54,6 +59,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
     private final RestoreJobDatabaseAccessor restoreJobDatabaseAccessor;
     private final RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor;
     private final Provider<RestoreJobManagerGroup> 
restoreJobManagerGroupSingleton;
+    private final LocalTokenRangesProvider localTokenRangesProvider;
     private final InstanceMetadataFetcher instanceMetadataFetcher;
     private final RestoreJobStats stats;
     private volatile boolean refreshSignaled = true;
@@ -67,6 +73,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
                                 RestoreJobDatabaseAccessor 
restoreJobDatabaseAccessor,
                                 RestoreSliceDatabaseAccessor 
restoreSliceDatabaseAccessor,
                                 Provider<RestoreJobManagerGroup> 
restoreJobManagerGroupProvider,
+                                CachedLocalTokenRanges cachedLocalTokenRanges,
                                 InstanceMetadataFetcher 
instanceMetadataFetcher,
                                 RestoreJobStats stats)
     {
@@ -75,6 +82,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
              restoreJobDatabaseAccessor,
              restoreSliceDatabaseAccessor,
              restoreJobManagerGroupProvider,
+             cachedLocalTokenRanges,
              instanceMetadataFetcher,
              stats);
     }
@@ -85,6 +93,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
                          RestoreJobDatabaseAccessor restoreJobDatabaseAccessor,
                          RestoreSliceDatabaseAccessor 
restoreSliceDatabaseAccessor,
                          Provider<RestoreJobManagerGroup> 
restoreJobManagerGroupProvider,
+                         LocalTokenRangesProvider cachedLocalTokenRanges,
                          InstanceMetadataFetcher instanceMetadataFetcher,
                          RestoreJobStats stats)
     {
@@ -94,6 +103,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
         this.restoreSliceDatabaseAccessor = restoreSliceDatabaseAccessor;
         this.jobDiscoveryRecencyDays = 
restoreJobConfig.jobDiscoveryRecencyDays();
         this.restoreJobManagerGroupSingleton = restoreJobManagerGroupProvider;
+        this.localTokenRangesProvider = cachedLocalTokenRanges;
         this.instanceMetadataFetcher = instanceMetadataFetcher;
         this.stats = stats;
     }
@@ -153,6 +163,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
                 switch (job.status)
                 {
                     case CREATED:
+                    case STAGED:
                         if (job.expireAt == null // abort all old jobs that 
have no expireAt value
                             || job.expireAt.getTime() < nowMillis)
                         {
@@ -164,6 +175,11 @@ public class RestoreJobDiscoverer implements PeriodicTask
                         // find the oldest non-completed job
                         days = Math.max(days, delta(today, job.createdAt));
                         restoreJobManagers.updateRestoreJob(job);
+                        if (job.isManagedBySidecar())
+                        {
+                            // todo: potentially excessive number of queries
+                            findSlicesAndSubmit(job);
+                        }
                         inflightJobsCount += 1;
                         break;
                     case FAILED:
@@ -208,7 +224,6 @@ public class RestoreJobDiscoverer implements PeriodicTask
     }
 
     /**
-     * TODO: remove the method on phase 2 completion
      * Signal the job discovery loop to refresh in the next execution
      */
     public void signalRefresh()
@@ -216,6 +231,44 @@ public class RestoreJobDiscoverer implements PeriodicTask
         refreshSignaled = true;
     }
 
+    // find all slices of the job that should be downloaded to the local 
instances,
+    // according to the cluster token ownership
+    private void findSlicesAndSubmit(RestoreJob restoreJob)
+    {
+        localTokenRangesProvider.localTokenRanges(restoreJob.keyspaceName)
+                                .forEach((key, ranges) -> {
+                                    int instanceId = key;
+                                    InstanceMetadata instance = 
instanceMetadataFetcher.instance(instanceId);
+                                    ranges.forEach(range -> 
findSlicesOfRangeAndSubmit(instance, restoreJob, range));
+                                });
+    }
+
+    // try to submit the slice.
+    // If it already exists, it is a no-op.
+    // If the submission fails, the slice status of the instance is updated.
+    private void findSlicesOfRangeAndSubmit(InstanceMetadata instance, 
RestoreJob restoreJob, TokenRange range)
+    {
+        short bucketId = 0; // TODO: update the implementation to pick proper 
bucketId
+        restoreSliceDatabaseAccessor
+        .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, 
range.start, range.end)
+        .forEach(slice -> {
+            // set the owner instance, which is not read from database
+            slice = slice.unbuild().ownerInstance(instance).build();
+            try
+            {
+                // todo: do not re-submit for download if the slice is staged 
(when job status is before staged)
+                //       or imported (when job status is staged) on the 
instance already
+                restoreJobManagerGroupSingleton.get().trySubmit(instance, 
slice, restoreJob);
+            }
+            catch (RestoreJobFatalException e)
+            {
+                slice.fail(e); // TODO: is it still needed? no, remove it 
later.
+                slice.failAtInstance(instance.id());
+                restoreSliceDatabaseAccessor.updateStatus(slice);
+            }
+        });
+    }
+
     private boolean abortJob(RestoreJob job)
     {
         LOGGER.info("Abort expired job. jobId={} job={}", job.jobId, job);
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 8828c8f..d33ff37 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -61,7 +61,7 @@ public class RestoreJobUtil
      */
     public static void unzip(File zipFile, File targetDir) throws IOException, 
RestoreJobException
     {
-        try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(zipFile)))
+        try (ZipInputStream zis = new 
ZipInputStream(Files.newInputStream(zipFile.toPath())))
         {
             ZipEntry zipEntry = zis.getNextEntry();
 
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index d48b705..af6332f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -36,6 +36,7 @@ import 
org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
+import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
@@ -58,6 +59,7 @@ public class RestoreProcessor implements PeriodicTask
     private final ConcurrencyLimiter processMaxConcurrency;
     private final SliceQueue sliceQueue = new SliceQueue();
     private final double requiredUsableSpacePercentage; // value range: [0.0, 
1.0)
+    private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
     private final RestoreJobStats stats;
     private volatile boolean isClosed = false; // OK to run close twice, so 
relax the control to volatile
 
@@ -67,6 +69,7 @@ public class RestoreProcessor implements PeriodicTask
                             SidecarSchema sidecarSchema,
                             StorageClientPool s3ClientPool,
                             SSTableImporter importer,
+                            RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
                             RestoreJobStats stats)
     {
         this.pool = executorPools.internal();
@@ -77,6 +80,7 @@ public class RestoreProcessor implements PeriodicTask
         this.requiredUsableSpacePercentage
         = 
config.serviceConfiguration().ssTableUploadConfiguration().minimumSpacePercentageRequired()
 / 100.0;
         this.importer = importer;
+        this.sliceDatabaseAccessor = sliceDatabaseAccessor;
         this.stats = stats;
     }
 
@@ -126,12 +130,29 @@ public class RestoreProcessor implements PeriodicTask
             // capture the new queue length after polling
             sliceQueue.captureImportQueueLength();
             pool.executeBlocking(slice.toAsyncTask(s3ClientPool, pool, 
importer,
-                                                   
requiredUsableSpacePercentage, stats),
+                                                   
requiredUsableSpacePercentage,
+                                                   sliceDatabaseAccessor, 
stats),
                                  false) // unordered
             .onSuccess(restoreSlice -> {
-                stats.captureSliceCompletionTime(slice.owner().id(), 
System.nanoTime() - slice.creationTimeNanos());
-                LOGGER.info("Slice completes successfully. sliceKey={}", 
restoreSlice.key());
-                restoreSlice.complete();
+                if (slice.hasImported())
+                {
+                    stats.captureSliceCompletionTime(slice.owner().id(), 
System.nanoTime() - slice.creationTimeNanos());
+                    LOGGER.info("Slice completes successfully. sliceKey={}", 
slice.key());
+                    slice.complete();
+                }
+                else if (slice.hasStaged())
+                {
+                    // todo: report stat of time taken to stage
+                    LOGGER.info("Slice has been staged successfully. 
sliceKey={}", slice.key());
+                    // the slice is not fully complete yet. Re-enqueue the 
slice.
+                    sliceQueue.offer(slice);
+                }
+                else // log a warning and retry. It should not reach here.
+                {
+                    LOGGER.warn("Unexpected state of slice. It is neither 
staged nor imported. sliceKey={}",
+                                slice.key());
+                    sliceQueue.offer(slice);
+                }
             })
             .onFailure(cause -> {
                 if (cause instanceof RestoreJobException && 
((RestoreJobException) cause).retryable())
@@ -143,8 +164,13 @@ public class RestoreProcessor implements PeriodicTask
                 else
                 {
                     LOGGER.error("Slice failed with unrecoverable failure. 
sliceKey={}", slice.key(), cause);
-                    // fail the slice. In the current implementation, all 
slices of the job get aborted
+                    // fail the slice and mark the slice as failed on its 
owning instance.
+                    // In the phase 1 implementation, all slices of the job 
get aborted
                     slice.fail(RestoreJobExceptions.toFatal(cause));
+                    if (slice.job().isManagedBySidecar())
+                    {
+                        sliceDatabaseAccessor.updateStatus(slice);
+                    }
                     // revoke the s3 credentials of the job too
                     s3ClientPool.revokeCredentials(slice.jobId());
                 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index b0214d9..0cec6f2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.restore;
 
 import java.io.File;
+import java.nio.file.Files;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -30,10 +31,13 @@ import io.vertx.core.Future;
 import io.vertx.core.Handler;
 import io.vertx.core.Promise;
 import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
+import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
@@ -59,27 +63,31 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RestoreSliceTask.class);
 
-    private final RestoreJob job;
     private final RestoreSlice slice;
     private final StorageClient s3Client;
     private final ExecutorPools.TaskExecutorPool executorPool;
     private final SSTableImporter importer;
     private final double requiredUsableSpacePercentage;
+    private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
     private final RestoreJobStats stats;
 
-    public RestoreSliceTask(RestoreJob job, RestoreSlice slice,
+    public RestoreSliceTask(RestoreSlice slice,
                             StorageClient s3Client,
                             ExecutorPools.TaskExecutorPool executorPool,
                             SSTableImporter importer,
                             double requiredUsableSpacePercentage,
+                            RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
                             RestoreJobStats stats)
     {
-        this.job = job;
+        Preconditions.checkArgument(!slice.job().isManagedBySidecar()
+                                    || sliceDatabaseAccessor != null,
+                                    "sliceDatabaseAccessor cannot be null");
         this.slice = slice;
         this.s3Client = s3Client;
         this.executorPool = executorPool;
         this.importer = importer;
         this.requiredUsableSpacePercentage = requiredUsableSpacePercentage;
+        this.sliceDatabaseAccessor = sliceDatabaseAccessor;
         this.stats = stats;
     }
 
@@ -92,11 +100,58 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
         // The slice, when being process, requires a total of slice size 
(download) + uncompressed (unzip) to use.
         // The protection below guards the slice being process, if the usable 
disk space falls below the threshold
         // after considering the slice
-        ensureSufficientStorage(slice.targetPathInStaging().toString(),
+        ensureSufficientStorage(slice.stageDirectory().toString(),
                                 slice.compressedSize() + 
slice.uncompressedSize(),
                                 requiredUsableSpacePercentage,
                                 executorPool)
-        .onSuccess(ignored -> downloadSliceAndImport(event))
+        .onSuccess(ignored -> {
+            RestoreJob job = slice.job();
+            if (job.isManagedBySidecar())
+            {
+                if (job.status == RestoreJobStatus.CREATED)
+                {
+                    if (Files.exists(slice.stagedObjectPath()))
+                    {
+                        LOGGER.debug("The slice has been staged already. 
sliceKey={} stagedFilePath={}",
+                                     slice.key(), slice.stagedObjectPath());
+                        slice.completeStagePhase(); // update the flag if 
missed
+                        sliceDatabaseAccessor.updateStatus(slice);
+                        event.tryComplete(slice);
+                        return;
+                    }
+
+                    // 1. check object existence and validate eTag / checksum
+                    checkObjectExistence(event)
+                    // 2. download slice/object when the remote object exists
+                    .thenCompose(headObject -> downloadSlice(event))
+                    // 3. persist status
+                    .thenAccept(x -> {
+                        slice.completeStagePhase();
+                        sliceDatabaseAccessor.updateStatus(slice);
+                        // completed staging. A new task is produced when it 
comes to import
+                        event.tryComplete(slice);
+                    });
+                }
+                else if (job.status == RestoreJobStatus.STAGED)
+                {
+                    unzipAndImport(event, slice.stagedObjectPath().toFile(),
+                                   // persist status
+                                   () -> 
sliceDatabaseAccessor.updateStatus(slice));
+                }
+                else
+                {
+                    String msg = "Unexpected restore job status. Expected only 
CREATED or STAGED when " +
+                                 "processing active slices. Found status: " + 
job.status;
+                    Exception unexpectedState = new IllegalStateException(msg);
+                    
event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status",
+                                                                    slice, 
unexpectedState));
+                }
+            }
+            else
+            {
+                downloadSliceAndImport(event);
+            }
+        })
         .onFailure(cause -> {
             String msg = "Unable to ensure enough space for the slice. Retry 
later";
             event.tryFail(RestoreJobExceptions.ofSlice(msg, slice, cause));
@@ -184,6 +239,7 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
         {
             RestoreJobFatalException ex = 
RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
                                                                             
slice, null);
+            event.tryFail(ex);
             CompletableFuture<File> failedFuture = new CompletableFuture<>();
             failedFuture.completeExceptionally(ex);
             return failedFuture;
@@ -223,6 +279,11 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
 
     @VisibleForTesting
     void unzipAndImport(Promise<RestoreSlice> event, File file)
+    {
+        unzipAndImport(event, file, null);
+    }
+
+    void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable 
onSuccessCommit)
     {
         if (file == null) // the condition should never happen. Having it here 
for logic completeness
         {
@@ -234,7 +295,21 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
         unzip(file)
         .compose(this::validateFiles)
         .compose(this::commit)
-        .onSuccess(x -> event.tryComplete(slice))
+        .compose(x -> {
+            if (onSuccessCommit == null)
+            {
+                return Future.succeededFuture();
+            }
+
+            return executorPool.executeBlocking(promise -> {
+                onSuccessCommit.run();
+                promise.tryComplete();
+            });
+        })
+        .onSuccess(x -> {
+            slice.completeImportPhase();
+            event.tryComplete(slice);
+        })
         .onFailure(failure -> {
             logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
             event.tryFail(RestoreJobExceptions.propagate("Fail to commit 
slice. "
@@ -248,24 +323,33 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
             if (failOnCancelled(promise))
                 return;
 
-            if (!zipFile.exists())
-            {
-                promise.tryFail(new RestoreJobException("Object not found from 
disk. File: " + zipFile));
-                return;
-            }
-
             // targetPathInStaging points to the directory named after uploadId
             // SSTableImporter expects the file system structure to be 
uploadId/keyspace/table/sstables
-            File targetDir = slice.targetPathInStaging()
+            File targetDir = slice.stageDirectory()
                                   .resolve(slice.keyspace())
                                   .resolve(slice.table())
                                   .toFile();
-            if (!targetDir.mkdirs())
+
+            boolean targetDirExist = targetDir.isDirectory();
+
+            if (!zipFile.exists())
             {
-                LOGGER.warn("Error occurred while creating directory for 
holding SSTables for SSTableImporter");
+                if (targetDirExist)
+                {
+                    LOGGER.debug("The files in slice are already extracted. 
Maybe it is a retried task?");
+                    promise.complete(targetDir);
+                }
+                else
+                {
+                    promise.tryFail(new RestoreJobException("Object not found 
from disk. File: " + zipFile));
+                }
+                // return early
+                return;
             }
+
             try
             {
+                Files.createDirectories(targetDir.toPath());
                 // Remove all existing files under the target directory
                 // The validation step later expects only the files registered 
in the manifest.
                 RestoreJobUtil.cleanDirectory(targetDir.toPath());
@@ -275,7 +359,7 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
                 // Then, delete the downloaded zip file
                 if (!zipFile.delete())
                 {
-                    LOGGER.warn("Error while deleting file {}, please note for 
space wastage",
+                    LOGGER.warn("File deletion attempt failed. file={}",
                                 zipFile.getAbsolutePath());
                 }
             }
@@ -383,7 +467,7 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
 
         LOGGER.info("Begin committing SSTables. sliceKey={}", slice.key());
 
-        SSTableImportOptions options = job.importOptions;
+        SSTableImportOptions options = slice.job().importOptions;
         SSTableImporter.ImportOptions importOptions = new 
SSTableImporter.ImportOptions.Builder()
                                                       
.host(slice.owner().host())
                                                       
.keyspace(slice.keyspace())
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index 621c1dc..f7825f6 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -152,7 +152,7 @@ public class StorageClient
                         .bucket(slice.bucket())
                         .key(slice.key())
                         .build();
-        Path objectPath = slice.targetPathInStaging().resolve(slice.key());
+        Path objectPath = slice.stagedObjectPath();
         File object = objectPath.toFile();
         if (object.exists())
         {
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
index 1c190d9..28ecd8b 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
@@ -81,9 +81,11 @@ public class AbortRestoreJobHandler extends 
AbstractHandler<String>
 
             restoreJobDatabaseAccessor.abort(job.jobId);
             restoreJobManagerGroup.signalRefreshRestoreJob();
-            return Future.succeededFuture();
+            return Future.succeededFuture(job);
         })
-        .onSuccess(ignored -> {
+        .onSuccess(job -> {
+            logger.info("Successfully aborted restore job. job={}, 
remoteAddress={}, instance={}",
+                        job, remoteAddress, host);
             stats.captureFailedJob();
             
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
         })
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java
index d6a36fb..5820a9b 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java
@@ -86,7 +86,7 @@ public class CreateRestoreJobHandler extends 
AbstractHandler<CreateRestoreJobReq
     @Override
     protected CreateRestoreJobRequestPayload 
extractParamsOrThrow(RoutingContext context)
     {
-        String bodyString = context.getBodyAsString();
+        String bodyString = context.body().asString();
         if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json 
encoder writes null as "null"
         {
             logger.warn("Bad request to create restore job. Received null 
payload.");
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java
index e7c005a..7269ef6 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java
@@ -98,7 +98,7 @@ public class CreateRestoreSliceHandler extends 
AbstractHandler<CreateSliceReques
                                  .qualifiedTableName(tableName)
                                  .createSliceRequestPayload(request)
                                  .ownerInstance(instance)
-                                 
.targetPathInStaging(Paths.get(instance.stagingDir()), uploadId)
+                                 
.stageDirectory(Paths.get(instance.stagingDir()), uploadId)
                                  
.replicaStatus(Collections.singletonMap(String.valueOf(instance.id()),
                                                                          
RestoreSliceStatus.COMMITTING))
                                  
.replicas(Collections.singleton(String.valueOf(instance.id())))
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java
index 6f8add6..9e2a5e7 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java
@@ -30,7 +30,6 @@ import io.vertx.core.json.DecodeException;
 import io.vertx.core.json.Json;
 import io.vertx.core.net.SocketAddress;
 import io.vertx.ext.web.RoutingContext;
-import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.common.data.UpdateRestoreJobRequestPayload;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
@@ -77,10 +76,9 @@ public class UpdateRestoreJobHandler extends 
AbstractHandler<UpdateRestoreJobReq
                                   SocketAddress remoteAddress,
                                   UpdateRestoreJobRequestPayload 
requestPayload)
     {
-        QualifiedTableName qualifiedTableName = qualifiedTableName(context);
-
-        RoutingContextUtils.getAsFuture(context, SC_RESTORE_JOB)
-                           .compose(job -> {
+        RoutingContextUtils
+        .getAsFuture(context, SC_RESTORE_JOB)
+        .compose(job -> {
             if (RestoreJobStatus.isFinalState(job.status))
             {
                 // skip the update, since the job is in the final state already
@@ -91,12 +89,11 @@ public class UpdateRestoreJobHandler extends 
AbstractHandler<UpdateRestoreJobReq
 
             return executorPools.service().<RestoreJob>executeBlocking(promise 
-> {
                 
promise.complete(restoreJobDatabaseAccessor.update(requestPayload,
-                                                                   
qualifiedTableName,
                                                                    job.jobId));
             });
         })
-                           .onSuccess(job -> {
-            logger.info("Successfully update restore job. job={}, request={}, 
remoteAddress={}, instance={}",
+        .onSuccess(job -> {
+            logger.info("Successfully updated restore job. job={}, request={}, 
remoteAddress={}, instance={}",
                         job, requestPayload, remoteAddress, host);
             if (job.status == RestoreJobStatus.SUCCEEDED)
             {
@@ -115,13 +112,13 @@ public class UpdateRestoreJobHandler extends 
AbstractHandler<UpdateRestoreJobReq
             restoreJobManagerGroup.signalRefreshRestoreJob();
             
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
         })
-                           .onFailure(cause -> processFailure(cause, context, 
host, remoteAddress, requestPayload));
+        .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, requestPayload));
     }
 
     @Override
     protected UpdateRestoreJobRequestPayload 
extractParamsOrThrow(RoutingContext context)
     {
-        String bodyString = context.getBodyAsString();
+        String bodyString = context.body().asString();
         if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json 
encoder writes null as "null"
         {
             logger.warn("Bad request to update restore job. Received null 
payload.");
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
index abe6673..a1b4442 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
@@ -58,11 +58,11 @@ class RestoreJobsDatabaseAccessorIntTest extends 
IntegrationTestBase
         RestoreJobSecrets secrets = 
RestoreJobSecretsGen.genRestoreJobSecrets();
         long expiresAtMillis = System.currentTimeMillis() + 
TimeUnit.HOURS.toMillis(1);
         UUID jobId = UUIDs.timeBased();
-        accessor.create(CreateRestoreJobRequestPayload.builder(secrets, 
expiresAtMillis)
-                                                      .jobId(jobId)
-                                                      .jobAgent("agent")
-                                                      .build(),
-                        qualifiedTableName);
+        CreateRestoreJobRequestPayload payload = 
CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis)
+                                                                               
.jobId(jobId)
+                                                                               
.jobAgent("agent")
+                                                                               
.build();
+        accessor.create(payload, qualifiedTableName);
 
         List<RestoreJob> foundJobs = accessor.findAllRecent(3);
         assertThat(foundJobs).hasSize(1);
@@ -70,7 +70,7 @@ class RestoreJobsDatabaseAccessorIntTest extends 
IntegrationTestBase
         assertJob(accessor.find(jobId), jobId, RestoreJobStatus.CREATED, 
expiresAtMillis, secrets);
         UpdateRestoreJobRequestPayload markSucceeded
         = new UpdateRestoreJobRequestPayload(null, null, 
RestoreJobStatus.SUCCEEDED, null);
-        accessor.update(markSucceeded, qualifiedTableName, jobId);
+        accessor.update(markSucceeded, jobId);
         assertJob(accessor.find(jobId), jobId, RestoreJobStatus.SUCCEEDED, 
expiresAtMillis, secrets);
     }
 
diff --git 
a/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
 
b/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
index 9d4e0d5..c1c0187 100644
--- 
a/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
+++ 
b/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
@@ -19,9 +19,16 @@
 package org.apache.cassandra.testing;
 
 import java.io.IOException;
+import java.net.BindException;
 import java.util.function.Consumer;
 
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.utils.Throwables;
 
 /**
  * A Cassandra Test Context implementation that allows advanced cluster 
configuration before cluster creation
@@ -29,6 +36,8 @@ import org.apache.cassandra.distributed.UpgradeableCluster;
  */
 public class ConfigurableCassandraTestContext extends 
AbstractCassandraTestContext
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigurableCassandraTestContext.class);
+
     public static final String BUILT_CLUSTER_CANNOT_BE_CONFIGURED_ERROR =
     "Cannot configure a cluster after it is built. Please set the buildCluster 
annotation attribute to false, "
     + "and do not call `getCluster` before calling this method.";
@@ -57,9 +66,31 @@ public class ConfigurableCassandraTestContext extends 
AbstractCassandraTestConte
     public UpgradeableCluster 
configureAndStartCluster(Consumer<UpgradeableCluster.Builder> configurator)
     throws IOException
     {
-        cluster = configureCluster(configurator);
-        cluster.startup();
-        return cluster;
+        RuntimeException exception = null;
+        for (int i = 0; i < 3; i++)
+        {
+            try
+            {
+                cluster = configureCluster(configurator);
+                cluster.startup();
+                return cluster;
+            }
+            catch (RuntimeException ret)
+            {
+                exception = ret;
+                boolean addressAlreadyInUse = 
Throwables.anyCauseMatches(exception, this::portNotAvailableToBind);
+                if (addressAlreadyInUse)
+                {
+                    LOGGER.warn("Failed to provision cluster after {} 
retries", i, exception);
+                }
+                else
+                {
+                    throw exception;
+                }
+
+            }
+        }
+        throw exception;
     }
 
     @Override
@@ -70,4 +101,10 @@ public class ConfigurableCassandraTestContext extends 
AbstractCassandraTestConte
                + ", builder=" + builder
                + '}';
     }
+
+    private boolean portNotAvailableToBind(Throwable ex)
+    {
+        return ex instanceof BindException &&
+               StringUtils.contains(ex.getMessage(), "Address already in use");
+    }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java 
b/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java
index b5d4940..eb20157 100644
--- a/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import org.junit.jupiter.api.Test;
 
 import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
 
@@ -51,6 +52,21 @@ public class RestoreJobTest
         return builder.build();
     }
 
+    public static RestoreJob createUpdatedJob(UUID jobId, String jobAgent,
+                                              RestoreJobStatus status,
+                                              RestoreJobSecrets secrets,
+                                              Date expireAt)
+    throws DataObjectMappingException
+    {
+        RestoreJob.Builder builder = RestoreJob.builder();
+        builder.createdAt(RestoreJob.toLocalDate(jobId))
+               .jobId(jobId).jobAgent(jobAgent)
+               .jobStatus(status)
+               .jobSecrets(secrets)
+               .expireAt(expireAt);
+        return builder.build();
+    }
+
     @Test
     void testDefaultImportOptionsWhenNotSetInDb()
     {
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java 
b/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
index 86db504..b5c93de 100644
--- a/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
@@ -19,11 +19,15 @@
 package org.apache.cassandra.sidecar.db;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -116,7 +120,9 @@ public class SidecarSchemaTest
         sidecarSchema.startSidecarSchemaInitializer();
         context.verify(() -> {
             int maxWaitTime = 20; // about 10 seconds
-            while (interceptedExecStmts.size() < 1 || 
!sidecarSchema.isInitialized())
+            while (interceptedPrepStmts.size() < 10
+                   || interceptedExecStmts.size() < 3
+                   || !sidecarSchema.isInitialized())
             {
                 if (maxWaitTime-- <= 0)
                 {
@@ -129,6 +135,51 @@ public class SidecarSchemaTest
             assertEquals(3, interceptedExecStmts.size());
             assertTrue(interceptedExecStmts.get(0).contains("CREATE KEYSPACE 
IF NOT EXISTS sidecar_internal"),
                        "Create keyspace should be executed the first");
+            assertTrue(hasElementContains(interceptedExecStmts,
+                                          "CREATE TABLE IF NOT EXISTS 
sidecar_internal.restore_job_v2"),
+                       "Create table should be executed the next for job 
table");
+            assertTrue(hasElementContains(interceptedExecStmts,
+                                          "CREATE TABLE IF NOT EXISTS 
sidecar_internal.restore_slice_v2"),
+                       "Create table should be executed the next for slice 
table");
+
+            List<String> expectedPrepStatements = Arrays.asList(
+            "INSERT INTO sidecar_internal.restore_job_v2 (  created_at,  
job_id,  keyspace_name,  table_name,  " +
+            "job_agent,  status,  blob_secrets,  import_options,  
consistency_level,  expire_at) " +
+            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+
+            "INSERT INTO sidecar_internal.restore_job_v2 (  created_at,  
job_id,  blob_secrets) VALUES (?, ? ,?)",
+
+            "INSERT INTO sidecar_internal.restore_job_v2 (  created_at,  
job_id,  status) VALUES (?, ?, ?)",
+
+            "INSERT INTO sidecar_internal.restore_job_v2 (  created_at,  
job_id,  job_agent) VALUES (?, ?, ?)",
+
+            "INSERT INTO sidecar_internal.restore_job_v2 (  created_at,  
job_id,  expire_at) VALUES (?, ?, ?)",
+
+            "SELECT created_at, job_id, keyspace_name, table_name, job_agent, 
status, blob_secrets, import_options, " +
+            "consistency_level, expire_at FROM sidecar_internal.restore_job_v2 
WHERE created_at = ? AND job_id = ?",
+
+            "SELECT created_at, job_id, keyspace_name, table_name, job_agent, 
status, blob_secrets, import_options, " +
+            "consistency_level, expire_at FROM sidecar_internal.restore_job_v2 
WHERE created_at = ?",
+
+            "INSERT INTO sidecar_internal.restore_slice_v2 (  job_id,  
bucket_id,  slice_id,  bucket,  key,  " +
+            "checksum,  start_token,  end_token,  compressed_size,  
uncompressed_size,  status_by_replica,  " +
+            "all_replicas) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+
+            "SELECT job_id, bucket_id, slice_id, bucket, key, checksum, 
start_token, end_token, compressed_size, " +
+            "uncompressed_size, status_by_replica, all_replicas FROM 
sidecar_internal.restore_slice_v2 " +
+            "WHERE job_id = ? AND bucket_id = ? AND end_token >= ? AND 
start_token < ? ALLOW FILTERING",
+
+            "UPDATE sidecar_internal.restore_slice_v2 SET status_by_replica = 
status_by_replica + ?, " +
+            "all_replicas = all_replicas + ? WHERE job_id = ? AND bucket_id = 
? AND start_token = ? AND slice_id = ?"
+            );
+
+            Set<String> expected = new HashSet<>(expectedPrepStatements);
+            Set<String> actual = new HashSet<>(interceptedPrepStmts);
+            Set<String> notInExpected = Sets.difference(actual, expected);
+            assertEquals(expected.size(), actual.size(), "Number of prepared 
statements should match");
+            assertTrue(notInExpected.isEmpty(),
+                       "Found the following statements that not in expected: " 
+ notInExpected);
+
             assertTrue(sidecarSchema.isInitialized());
             context.completeNow();
         });
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
index 8d5abee..6efc192 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
@@ -38,13 +38,15 @@ import 
org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
-import org.apache.cassandra.sidecar.db.RestoreJobTest;
 import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.stats.TestRestoreJobStats;
 import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;
 import org.mockito.ArgumentCaptor;
 
+import static 
org.apache.cassandra.sidecar.db.RestoreJobTest.createNewTestingJob;
+import static org.apache.cassandra.sidecar.db.RestoreJobTest.createUpdatedJob;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doNothing;
@@ -57,18 +59,20 @@ class RestoreJobDiscovererTest
     private static final long idleLoopDelay = 2000;
     private static final int recencyDays = 10;
     private final TestRestoreJobStats stats = new TestRestoreJobStats();
-    private RestoreJobDatabaseAccessor mockJobAccessor = 
mock(RestoreJobDatabaseAccessor.class);
-    private RestoreSliceDatabaseAccessor mockSliceAccessor = 
mock(RestoreSliceDatabaseAccessor.class);
-    private RestoreJobManagerGroup mockManagers = 
mock(RestoreJobManagerGroup.class);
-    private PeriodicTaskExecutor executor = mock(PeriodicTaskExecutor.class);
-    private SidecarSchema sidecarSchema = mock(SidecarSchema.class);
-    private RestoreJobDiscoverer loop = new RestoreJobDiscoverer(testConfig(),
-                                                                 sidecarSchema,
-                                                                 
mockJobAccessor,
-                                                                 
mockSliceAccessor,
-                                                                 () -> 
mockManagers,
-                                                                 null,
-                                                                 stats);
+    private final RestoreJobDatabaseAccessor mockJobAccessor = 
mock(RestoreJobDatabaseAccessor.class);
+    private final RestoreSliceDatabaseAccessor mockSliceAccessor = 
mock(RestoreSliceDatabaseAccessor.class);
+    private final RestoreJobManagerGroup mockManagers = 
mock(RestoreJobManagerGroup.class);
+    private final LocalTokenRangesProvider rangesProvider = 
mock(LocalTokenRangesProvider.class);
+    private final PeriodicTaskExecutor executor = 
mock(PeriodicTaskExecutor.class);
+    private final SidecarSchema sidecarSchema = mock(SidecarSchema.class);
+    private final RestoreJobDiscoverer loop = new 
RestoreJobDiscoverer(testConfig(),
+                                                                       
sidecarSchema,
+                                                                       
mockJobAccessor,
+                                                                       
mockSliceAccessor,
+                                                                       () -> 
mockManagers,
+                                                                       
rangesProvider,
+                                                                       null,
+                                                                       stats);
 
     @Test
     void testGetDelay()
@@ -78,9 +82,13 @@ class RestoreJobDiscovererTest
         // when there is active restore job (status: CREATED)
         UUID jobId = UUIDs.timeBased();
         when(mockJobAccessor.findAllRecent(anyInt()))
-        .thenReturn(Collections.singletonList(RestoreJob.forUpdates(jobId, 
"agent",
-                                                                    
RestoreJobStatus.CREATED, null,
-                                                                    new 
Date(System.currentTimeMillis() + 10000L))));
+        .thenReturn(Collections.singletonList(RestoreJob.builder()
+                                                        
.createdAt(RestoreJob.toLocalDate(jobId))
+                                                        .jobId(jobId)
+                                                        .jobAgent("agent")
+                                                        
.jobStatus(RestoreJobStatus.CREATED)
+                                                        .expireAt(new 
Date(System.currentTimeMillis() + 10000L))
+                                                        .build()));
         loop.registerPeriodicTaskExecutor(executor);
         executeBlocking();
         assertThat(stats.activeJobCount).describedAs("active jobs count is 
updated")
@@ -88,9 +96,13 @@ class RestoreJobDiscovererTest
         assertThat(loop.delay()).isEqualTo(activeLoopDelay);
         // when no more jobs are active, the delay is reset back to idle loop 
delay accordingly.
         when(mockJobAccessor.findAllRecent(anyInt()))
-        .thenReturn(Collections.singletonList(RestoreJob.forUpdates(jobId, 
"agent",
-                                                                    
RestoreJobStatus.SUCCEEDED, null,
-                                                                    new 
Date(System.currentTimeMillis() + 10000L))));
+        .thenReturn(Collections.singletonList(RestoreJob.builder()
+                                                        
.createdAt(RestoreJob.toLocalDate(jobId))
+                                                        .jobId(jobId)
+                                                        .jobAgent("agent")
+                                                        
.jobStatus(RestoreJobStatus.SUCCEEDED)
+                                                        .expireAt(new 
Date(System.currentTimeMillis() + 10000L))
+                                                        .build()));
         executeBlocking();
         assertThat(stats.activeJobCount).describedAs("active jobs count is 
updated")
                                         .isZero();
@@ -112,11 +124,11 @@ class RestoreJobDiscovererTest
         UUID newJobId = UUIDs.timeBased();
         UUID failedJobId = UUIDs.timeBased();
         UUID succeededJobId = UUIDs.timeBased();
-        mockResult.add(RestoreJobTest.createNewTestingJob(newJobId));
-        mockResult.add(RestoreJob.forUpdates(failedJobId, "agent", 
RestoreJobStatus.ABORTED, null,
-                                             new 
Date(System.currentTimeMillis() + 10000L)));
-        mockResult.add(RestoreJob.forUpdates(succeededJobId, "agent", 
RestoreJobStatus.SUCCEEDED, null,
-                                             new 
Date(System.currentTimeMillis() + 10000L)));
+        mockResult.add(createNewTestingJob(newJobId));
+        mockResult.add(createUpdatedJob(failedJobId, "agent", 
RestoreJobStatus.ABORTED, null,
+                                        new Date(System.currentTimeMillis() + 
10000L)));
+        mockResult.add(createUpdatedJob(succeededJobId, "agent", 
RestoreJobStatus.SUCCEEDED, null,
+                                        new Date(System.currentTimeMillis() + 
10000L)));
         ArgumentCaptor<UUID> jobIdCapture = 
ArgumentCaptor.forClass(UUID.class);
         
doNothing().when(mockManagers).removeJobInternal(jobIdCapture.capture());
         when(mockJobAccessor.findAllRecent(anyInt())).thenReturn(mockResult);
@@ -144,8 +156,8 @@ class RestoreJobDiscovererTest
 
         // Execution 2
         when(mockJobAccessor.findAllRecent(anyInt()))
-        .thenReturn(Collections.singletonList(RestoreJob.forUpdates(newJobId, 
"agent",
-                                                  RestoreJobStatus.SUCCEEDED, 
null, new Date())));
+        .thenReturn(Collections.singletonList(createUpdatedJob(newJobId, 
"agent",
+                                                               
RestoreJobStatus.SUCCEEDED, null, new Date())));
         executeBlocking();
 
         assertThat(stats.activeJobCount).isZero();
@@ -164,7 +176,7 @@ class RestoreJobDiscovererTest
         loop.signalRefresh();
         UUID newJobId2 = UUIDs.timeBased();
         when(mockJobAccessor.findAllRecent(anyInt()))
-        
.thenReturn(Collections.singletonList(RestoreJobTest.createNewTestingJob(newJobId2)));
+        .thenReturn(Collections.singletonList(createNewTestingJob(newJobId2)));
         assertThat(loop.isRefreshSignaled()).isTrue();
 
         executeBlocking();
@@ -179,8 +191,8 @@ class RestoreJobDiscovererTest
 
         // Execution 5
         when(mockJobAccessor.findAllRecent(anyInt()))
-        .thenReturn(Collections.singletonList(RestoreJob.forUpdates(newJobId2, 
"agent",
-                                                                    
RestoreJobStatus.ABORTED, null, new Date())));
+        .thenReturn(Collections.singletonList(createUpdatedJob(newJobId2, 
"agent",
+                                                               
RestoreJobStatus.ABORTED, null, new Date())));
         executeBlocking();
 
         assertThat(stats.activeJobCount).isZero();
@@ -200,16 +212,16 @@ class RestoreJobDiscovererTest
     }
 
     @Test
-    void testExecuteWithExpiredJobs() throws Exception
+    void testExecuteWithExpiredJobs()
     {
         // setup: all 3 jobs are expired. All of them should be aborted via 
mockJobAccessor
         when(sidecarSchema.isInitialized()).thenReturn(true);
-        List<RestoreJob> mockResult
-        = IntStream.range(0, 3)
-                   .boxed()
-                   .map(x -> RestoreJob.forUpdates(UUIDs.timeBased(), "agent", 
RestoreJobStatus.CREATED, null,
-                                                   new 
Date(System.currentTimeMillis() - 1000L)))
-                   .collect(Collectors.toList());
+        List<RestoreJob> mockResult = IntStream.range(0, 3)
+                                               .boxed()
+                                               .map(x -> 
createUpdatedJob(UUIDs.timeBased(), "agent",
+                                                                          
RestoreJobStatus.CREATED, null,
+                                                                          new 
Date(System.currentTimeMillis() - 1000L)))
+                                               .collect(Collectors.toList());
         ArgumentCaptor<UUID> abortedJobs = ArgumentCaptor.forClass(UUID.class);
         doNothing().when(mockJobAccessor).abort(abortedJobs.capture());
         when(mockJobAccessor.findAllRecent(anyInt())).thenReturn(mockResult);
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
index 36556a2..3d20810 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
@@ -245,11 +245,16 @@ class RestoreJobManagerTest
 
     private RestoreSlice getTestSlice(RestoreJob job)
     {
+        InstanceMetadata owner = mock(InstanceMetadata.class);
+        when(owner.id()).thenReturn(1);
         RestoreSlice slice = RestoreSlice
                              .builder()
                              .jobId(job.jobId)
                              .bucketId((short) 0)
-                             .targetPathInStaging(testDir, "uploadId")
+                             .stageDirectory(testDir, "uploadId")
+                             .storageKey("storageKey")
+                             .storageBucket("storageBucket")
+                             .ownerInstance(owner)
                              .replicaStatus(Collections.emptyMap())
                              .replicas(Collections.emptySet())
                              .build();
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index f7761c5..83ddc70 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -175,10 +175,11 @@ class RestoreProcessorTest
         RestoreSlice slice = mock(RestoreSlice.class, 
Mockito.RETURNS_DEEP_STUBS);
         when(slice.jobId()).thenReturn(UUIDs.timeBased());
         when(slice.owner().id()).thenReturn(1);
-        when(slice.toAsyncTask(any(), any(), any(), anyDouble(), 
any())).thenReturn(promise -> {
+        when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), 
any())).thenReturn(promise -> {
             Uninterruptibles.awaitUninterruptibly(latch);
             promise.complete(slice);
         });
+        when(slice.hasImported()).thenReturn(true);
         return slice;
     }
 }
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 8353f4f..9598ad2 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -19,12 +19,15 @@
 package org.apache.cassandra.sidecar.restore;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import com.datastax.driver.core.utils.UUIDs;
 import com.google.inject.Guice;
@@ -36,7 +39,9 @@ import 
org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools.TaskExecutorPool;
 import org.apache.cassandra.sidecar.db.RestoreJob;
+import org.apache.cassandra.sidecar.db.RestoreJobTest;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
+import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.apache.cassandra.sidecar.server.MainModule;
@@ -48,33 +53,30 @@ import 
software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 
 import static org.apache.cassandra.sidecar.AssertionUtils.getBlocking;
-import static org.apache.cassandra.sidecar.db.RestoreJob.toLocalDate;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class RestoreSliceTaskTest
 {
-    private RestoreJob restoreJob;
     private RestoreSlice restoreSlice;
     private StorageClient storageClient;
     private TaskExecutorPool executorPool;
     private SSTableImporter importer;
     private TestRestoreJobStats stats;
     private RestoreSliceTask task;
+    private RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
 
     @BeforeEach
     void setup()
     {
-        UUID jobId = UUIDs.timeBased();
-        restoreJob = RestoreJob.builder()
-                               .jobId(jobId)
-                               .createdAt(toLocalDate(jobId))
-                               .jobStatus(RestoreJobStatus.CREATED)
-                               .build();
         restoreSlice = mock(RestoreSlice.class, Mockito.RETURNS_DEEP_STUBS);
-        when(restoreSlice.targetPathInStaging()).thenReturn(Paths.get("."));
+        when(restoreSlice.stageDirectory()).thenReturn(Paths.get("."));
         when(restoreSlice.sliceId()).thenReturn("testing-slice");
         when(restoreSlice.key()).thenReturn("storage-key");
         when(restoreSlice.owner().id()).thenReturn(1);
@@ -83,8 +85,10 @@ class RestoreSliceTaskTest
         Injector injector = Guice.createInjector(Modules.override(new 
MainModule()).with(new TestModule()));
         executorPool = injector.getInstance(ExecutorPools.class).internal();
         stats = new TestRestoreJobStats();
-        task = new TestRestoreSliceTask(restoreJob, restoreSlice, 
storageClient,
-                                        executorPool, importer, 0, stats);
+        sliceDatabaseAccessor = mock(RestoreSliceDatabaseAccessor.class);
+        task = new TestRestoreSliceTask(restoreSlice, storageClient,
+                                        executorPool, importer, 0,
+                                        sliceDatabaseAccessor, stats);
     }
 
     @Test
@@ -155,26 +159,101 @@ class RestoreSliceTaskTest
         .hasMessageContaining("Object not found");
     }
 
+    @Test
+    void testSliceStaging()
+    {
+        // test specific setup
+        RestoreJob job = 
spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.CREATED));
+        doReturn(true).when(job).isManagedBySidecar();
+        when(restoreSlice.job()).thenReturn(job);
+        
when(restoreSlice.stagedObjectPath()).thenReturn(Paths.get("nonexist"));
+        
when(storageClient.objectExists(restoreSlice)).thenReturn(CompletableFuture.completedFuture(null));
+        when(storageClient.downloadObjectIfAbsent(restoreSlice))
+        .thenReturn(CompletableFuture.completedFuture(new File(".")));
+
+        Promise<RestoreSlice> promise = Promise.promise();
+        task.handle(promise);
+        getBlocking(promise.future()); // no error is thrown
+
+        verify(restoreSlice, times(1)).completeStagePhase();
+        verify(restoreSlice, times(0)).completeImportPhase(); // should not be 
called in this phase
+        verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
+    }
+
+    @Test
+    void testSliceStagingWithExistingObject(@TempDir Path testFolder) throws 
IOException
+    {
+        // test specific setup
+        RestoreJob job = 
spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.CREATED));
+        doReturn(true).when(job).isManagedBySidecar();
+        when(restoreSlice.job()).thenReturn(job);
+        Path stagedPath = testFolder.resolve("slice.zip");
+        Files.createFile(stagedPath);
+        when(restoreSlice.stagedObjectPath()).thenReturn(stagedPath);
+        when(storageClient.objectExists(restoreSlice))
+        .thenThrow(new RuntimeException("Should not call this method"));
+        when(storageClient.downloadObjectIfAbsent(restoreSlice))
+        .thenThrow(new RuntimeException("Should not call this method"));
+
+        Promise<RestoreSlice> promise = Promise.promise();
+        task.handle(promise);
+        getBlocking(promise.future()); // no error is thrown
+
+        verify(restoreSlice, times(1)).completeStagePhase();
+        verify(restoreSlice, times(0)).completeImportPhase(); // should not be 
called in this phase
+        verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
+    }
+
+    @Test
+    void testSliceImport()
+    {
+        // test specific setup
+        RestoreJob job = 
spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.STAGED));
+        doReturn(true).when(job).isManagedBySidecar();
+        when(restoreSlice.job()).thenReturn(job);
+
+        Promise<RestoreSlice> promise = Promise.promise();
+        task.handle(promise);
+        getBlocking(promise.future()); // no error is thrown
+
+        verify(restoreSlice, times(0)).completeStagePhase(); // should not be 
called in the phase
+        verify(restoreSlice, times(1)).completeImportPhase();
+        verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
+    }
+
+
     static class TestRestoreSliceTask extends RestoreSliceTask
     {
         private final RestoreSlice slice;
         private final RestoreJobStats stats;
 
-        public TestRestoreSliceTask(RestoreJob job, RestoreSlice slice, 
StorageClient s3Client,
-                                    TaskExecutorPool executorPool, 
SSTableImporter importer,
-                                    double requiredUsableSpacePercentage, 
RestoreJobStats stats)
+        public TestRestoreSliceTask(RestoreSlice slice, StorageClient 
s3Client, TaskExecutorPool executorPool,
+                                    SSTableImporter importer, double 
requiredUsableSpacePercentage,
+                                    RestoreSliceDatabaseAccessor 
sliceDatabaseAccessor, RestoreJobStats stats)
         {
-            super(job, slice, s3Client, executorPool, importer, 
requiredUsableSpacePercentage, stats);
+            super(slice, s3Client, executorPool, importer, 
requiredUsableSpacePercentage, sliceDatabaseAccessor, stats);
             this.slice = slice;
             this.stats = stats;
         }
 
-        void unzipAndImport(Promise<RestoreSlice> event, File file)
+        @Override
+        void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable 
onSuccessCommit)
         {
             stats.captureSliceUnzipTime(1, 123L);
             stats.captureSliceValidationTime(1, 123L);
             stats.captureSliceImportTime(1, 123L);
+            slice.completeImportPhase();
             event.tryComplete(slice);
+            if (onSuccessCommit != null)
+            {
+                onSuccessCommit.run();
+            }
+        }
+
+        @Override
+        void unzipAndImport(Promise<RestoreSlice> event, File file)
+        {
+            unzipAndImport(event, file, null);
         }
     }
 }
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java
index 7ef7e48..52f19fd 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java
@@ -45,7 +45,7 @@ class RestoreSliceTest
                       
.startToken(BigInteger.ONE).endToken(BigInteger.valueOf(2))
                       .replicaStatus(Collections.singletonMap("replica1", 
RestoreSliceStatus.COMMITTING))
                       .replicas(Collections.singleton("replica1"))
-                      .targetPathInStaging(path, "uploadId")
+                      .stageDirectory(path, "uploadId")
                       .build();
         RestoreSlice slice2 = slice1.unbuild().build();
         assertThat(slice1).isEqualTo(slice2);
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
 
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
index 2453a96..7047122 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
@@ -184,7 +184,6 @@ public abstract class BaseRestoreJobTests
 
             @Override
             public RestoreJob update(UpdateRestoreJobRequestPayload payload,
-                                     QualifiedTableName qualifiedTableName,
                                      UUID jobId)
             {
                 return updateFunc.apply(payload);
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java
index c62d4ae..24fd12f 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java
@@ -52,10 +52,14 @@ class RestoreJobSummaryHandlerTest extends 
BaseRestoreJobTests
         mockLookupRestoreJob(x -> {
             UUID id = UUID.fromString(jobId);
             // keyspace name is different
-
-            return 
RestoreJob.create(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(id)), id,
-                                     "ks", "table", "job agent",
-                                     RestoreJobStatus.CREATED, SECRETS, 
SSTableImportOptions.defaults());
+            return RestoreJob.builder()
+                             
.createdAt(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(id)))
+                             .jobId(id).jobAgent("job agent")
+                             .keyspace("ks").table("table")
+                             .jobStatus(RestoreJobStatus.CREATED)
+                             .jobSecrets(SECRETS)
+                             
.sstableImportOptions(SSTableImportOptions.defaults())
+                             .build();
         });
         sendGetRestoreJobSummaryRequestAndVerify("ks", "table", jobId, 
context, HttpResponseStatus.OK.code());
     }
@@ -80,8 +84,12 @@ class RestoreJobSummaryHandlerTest extends 
BaseRestoreJobTests
         String jobId = "7cd82ff9-d276-11ed-93e5-7fce0df1306f";
         mockLookupRestoreJob(x -> {
             // keyspace name is different
-            return RestoreJob.create(null, UUID.fromString(jobId), "ks",
-                                     "table", null, RestoreJobStatus.CREATED, 
null, null);
+            return RestoreJob.builder()
+                             .createdAt(null)
+                             .jobId(UUID.fromString(jobId))
+                             .keyspace("ks").table("table")
+                             .jobStatus(RestoreJobStatus.CREATED)
+                             .build();
         });
         sendGetRestoreJobSummaryRequestAndVerify("ks1", "table", 
"7cd82ff9-d276-11ed-93e5-7fce0df1306f",
                                                  context,  
HttpResponseStatus.NOT_FOUND.code());
@@ -111,9 +119,12 @@ class RestoreJobSummaryHandlerTest extends 
BaseRestoreJobTests
     {
         mockLookupRestoreJob(x -> {
             UUID jobId = 
UUID.fromString("7cd82ff9-d276-11ed-93e5-7fce0df1306f");
-            return 
RestoreJob.create(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(jobId)), 
jobId,
-                                     "ks", "table", "job agent",
-                                     RestoreJobStatus.CREATED, null, null);
+            return RestoreJob.builder()
+                             
.createdAt(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(jobId)))
+                             .jobId(jobId).jobAgent("job agent")
+                             .keyspace("ks").table("table")
+                             .jobStatus(RestoreJobStatus.CREATED)
+                             .build();
         });
         sendGetRestoreJobSummaryRequestAndVerify("ks", "table", 
"7cd82ff9-d276-11ed-93e5-7fce0df1306f",
                                                  context, 
HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java
index c0bbe26..0a3cf93 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java
@@ -143,8 +143,14 @@ class UpdateRestoreJobHandlerTest extends 
BaseRestoreJobTests
 
     private RestoreJob createTestNewJob(String jobId)
     {
-        return RestoreJob.create(null, UUID.fromString(jobId), "ks", "table",
-                                 "agent", RestoreJobStatus.SUCCEEDED, SECRETS, 
SSTableImportOptions.defaults());
+        return RestoreJob.builder()
+                         .jobId(UUID.fromString(jobId))
+                         .keyspace("ks").table("table")
+                         .jobAgent("agent")
+                         .jobStatus(RestoreJobStatus.SUCCEEDED)
+                         .jobSecrets(SECRETS)
+                         .sstableImportOptions(SSTableImportOptions.defaults())
+                         .build();
     }
 
     private JsonObject getRequestPayload()
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java
new file mode 100644
index 0000000..fc4d354
--- /dev/null
+++ 
b/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.util.UUID;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.AssertionUtils;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.exceptions.InsufficientStorageException;
+import org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.FileStoreProps;
+
+import static 
org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage;
+import static 
org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.fileStoreProps;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class AsyncFileSystemUtilsTest
+{
+    private ExecutorPools executorPools;
+
+    @BeforeEach
+    void setup()
+    {
+        executorPools = new ExecutorPools(Vertx.vertx(), new 
ServiceConfigurationImpl());
+    }
+
+    @AfterEach
+    void teardown()
+    {
+        executorPools.close();
+    }
+
+    @Test
+    void testReadFileStoreProps()
+    {
+        FileStoreProps props = AssertionUtils.getBlocking(fileStoreProps(".", 
executorPools.internal()));
+        assertThat(props.name).isNotBlank();
+
+        long total = props.totalSpace;
+        long usable = props.usableSpace;
+        long unallocated = props.unallocatedSpace;
+        assertThat(total)
+        .isGreaterThan(usable)
+        .isGreaterThan(unallocated)
+        .isGreaterThan(0L);
+
+        assertThat(unallocated)
+        .isGreaterThanOrEqualTo(usable)
+        .isGreaterThan(0L);
+
+        assertThat(usable).isGreaterThan(0L);
+    }
+
+    @Test
+    void testEnsureSufficientStorage() throws Exception
+    {
+        // this check should pass (hopefully), as the required usable 
percentage is 0.0001
+        AssertionUtils.getBlocking(ensureSufficientStorage(".", 0L, 0.0001, 
executorPools.internal()));
+
+        // requesting half of the usable space should pass
+        FileStoreProps props = AssertionUtils.getBlocking(fileStoreProps(".", 
executorPools.internal()));
+        AssertionUtils.getBlocking(ensureSufficientStorage(".", 
props.usableSpace / 2,
+                                                           0, 
executorPools.internal()));
+
+        assertThatThrownBy(() -> 
AssertionUtils.getBlocking(ensureSufficientStorage(".", Long.MAX_VALUE,
+                                                                               
     0.0001,
+                                                                               
     executorPools.internal())))
+        .describedAs("Request Long.MAX_VALUE on the local file store should 
fail")
+        .hasRootCauseExactlyInstanceOf(InsufficientStorageException.class)
+        .hasMessageContaining("FileStore has insufficient space");
+
+        assertThatThrownBy(() -> 
AssertionUtils.getBlocking(ensureSufficientStorage(".", 123L,
+                                                                               
     1.0, executorPools.internal())))
+        .describedAs("Require 100% usable disk of the local file store should 
fail")
+        .hasRootCauseExactlyInstanceOf(InsufficientStorageException.class)
+        .hasMessageContaining("FileStore has insufficient space");
+    }
+
+    @Test
+    void testEnsureSufficientStorageWithNonexistingFilePath()
+    {
+        // The input path `./non-existing + uuid` does not exist.
+        // `ensureSufficientStorage` should navigate to parent paths until 
finding an existing path
+        // to be used for checking
+        // The test expects no exception is thrown
+        AssertionUtils.getBlocking(ensureSufficientStorage("./non-existing" + 
UUID.randomUUID(), 0L,
+                                                           0.0001, 
executorPools.internal()));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to