This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff9ac41  CASSANDRA-19923: Add transport extension for coordinated write (#83)
ff9ac41 is described below

commit ff9ac41b4695c1df59f5293f69e0d3a1ce0da9f4
Author: Yifan Cai <y...@apache.org>
AuthorDate: Wed Sep 18 12:54:47 2024 -0700

    CASSANDRA-19923: Add transport extension for coordinated write (#83)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRA-19923
---
 CHANGES.txt                                        |  1 +
 .../example/ExampleStorageTransportExtension.java  | 33 ++++++++
 .../example/LocalStorageTransportExtension.java    | 33 ++++++++
 .../bulkwriter/blobupload/BlobDataTransferApi.java |  4 +-
 .../bulkwriter/blobupload/BlobStreamSession.java   |  2 +-
 .../extensions/CoordinatedTransportExtension.java  | 96 ++++++++++++++++++++++
 ...stener.java => CoordinationSignalListener.java} | 27 +++---
 .../extensions/CredentialChangeListener.java       |  2 +-
 .../DriverStorageTransportExtension.java           |  2 +-
 .../extensions/StorageTransportHandler.java        | 34 +++++++-
 .../blobupload/BlobStreamSessionTest.java          |  2 +-
 .../testing/SharedClusterIntegrationTestBase.java  | 30 ++++++-
 .../BulkWriteS3CompatModeSimpleTest.java           | 13 ++-
 .../LocalStorageTransportExtension.java            | 31 +++++++
 cassandra-analytics-spark-converter/build.gradle   | 12 +++
 gradle.properties                                  |  2 +-
 scripts/build-sidecar.sh                           |  2 +-
 17 files changed, 299 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9d7288a..a45595e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Add transport extension for coordinated write (CASSANDRA-19923)
  * Support data partitioning for multiple clusters coordinated write (CASSANDRA-19910)
  * Add writer option COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters (CASSANDRA-19909)
  * Decouple Cassandra types from Spark types so Cassandra types can be used independently from Spark (CASSANDRA-19815)
diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/ExampleStorageTransportExtension.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/ExampleStorageTransportExtension.java
index 1b71b1a..3d16485 100644
--- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/ExampleStorageTransportExtension.java
+++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/ExampleStorageTransportExtension.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.util.ThreadUtil;
 import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
 import org.apache.cassandra.spark.transports.storage.StorageCredentials;
+import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportExtension;
 import org.apache.cassandra.spark.transports.storage.extensions.ObjectFailureListener;
@@ -47,6 +48,7 @@ public class ExampleStorageTransportExtension implements StorageTransportExtensi
     private long tokenCount = 0;
     private CredentialChangeListener credentialChangeListener;
     private ObjectFailureListener objectFailureListener;
+    private CoordinationSignalListener coordinationSignalListener;
     private boolean shouldFail;
 
     @Override
@@ -151,4 +153,35 @@ public class ExampleStorageTransportExtension implements StorageTransportExtensi
                                          "readSecretKey-" + tokenCount,
                                          "readSessionToken-" + tokenCount));
     }
+
+    @Override
+    public void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis)
+    {
+        LOGGER.info("Job {} has all objects staged at cluster {} after {}ms", 
jobId, clusterId, elapsedMillis);
+    }
+
+    @Override
+    public void onStageFailed(String clusterId, Throwable cause)
+    {
+        LOGGER.error("Cluster {} failed to stage objects", clusterId, cause);
+    }
+
+    @Override
+    public void onApplySucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis)
+    {
+        LOGGER.info("Job {} has all objects applied at cluster {} after {}ms", 
jobId, clusterId, elapsedMillis);
+    }
+
+    @Override
+    public void onApplyFailed(String clusterId, Throwable cause)
+    {
+        LOGGER.error("Cluster {} failed to apply objects", clusterId, cause);
+    }
+
+    @Override
+    public void setCoordinationSignalListener(CoordinationSignalListener listener)
+    {
+        this.coordinationSignalListener = listener;
+        LOGGER.info("CoordinationSignalListener initialized. listener={}", 
listener);
+    }
 }
diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalStorageTransportExtension.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalStorageTransportExtension.java
index 95d5a96..58e7d03 100644
--- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalStorageTransportExtension.java
+++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalStorageTransportExtension.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
 import org.apache.cassandra.spark.transports.storage.StorageCredentials;
+import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener;
 import org.apache.cassandra.spark.transports.storage.extensions.CredentialChangeListener;
 import org.apache.cassandra.spark.transports.storage.extensions.ObjectFailureListener;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
@@ -38,6 +39,7 @@ public class LocalStorageTransportExtension implements StorageTransportExtension
     private static final Logger LOGGER = LoggerFactory.getLogger(LocalStorageTransportExtension.class);
 
     private String jobId;
+    private CoordinationSignalListener coordinationSignalListener;
 
     @Override
     public void initialize(String jobId, SparkConf conf, boolean isOnDriver)
@@ -114,4 +116,35 @@ public class LocalStorageTransportExtension implements StorageTransportExtension
                                                                 "readSecret",
                                                                 
"readSessionToken"));
     }
+
+    @Override
+    public void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis)
+    {
+        LOGGER.info("Job {} has all objects staged at cluster {} after {}ms", 
jobId, clusterId, elapsedMillis);
+    }
+
+    @Override
+    public void onStageFailed(String clusterId, Throwable cause)
+    {
+        LOGGER.error("Job {} failed to stage objects at cluster {}", jobId, 
clusterId, cause);
+    }
+
+    @Override
+    public void onApplySucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis)
+    {
+        LOGGER.info("Job {} has all objects applied at cluster {} after {}ms", 
jobId, clusterId, elapsedMillis);
+    }
+
+    @Override
+    public void onApplyFailed(String clusterId, Throwable cause)
+    {
+        LOGGER.error("Cluster {} failed to apply objects", clusterId, cause);
+    }
+
+    @Override
+    public void setCoordinationSignalListener(CoordinationSignalListener listener)
+    {
+        this.coordinationSignalListener = listener;
+        LOGGER.info("CoordinationSignalListener initialized. listener={}", 
listener);
+    }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobDataTransferApi.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobDataTransferApi.java
index 8791bb9..a76ddf2 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobDataTransferApi.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobDataTransferApi.java
@@ -30,10 +30,10 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import o.a.c.sidecar.client.shaded.common.request.CreateRestoreJobSliceRequest;
 import o.a.c.sidecar.client.shaded.common.request.Request;
 import o.a.c.sidecar.client.shaded.common.request.data.CreateRestoreJobRequestPayload;
-import o.a.c.sidecar.client.shaded.common.request.data.CreateRestoreJobResponsePayload;
 import o.a.c.sidecar.client.shaded.common.request.data.CreateSliceRequestPayload;
-import o.a.c.sidecar.client.shaded.common.request.data.RestoreJobSummaryResponsePayload;
 import o.a.c.sidecar.client.shaded.common.request.data.UpdateRestoreJobRequestPayload;
+import o.a.c.sidecar.client.shaded.common.response.data.CreateRestoreJobResponsePayload;
+import o.a.c.sidecar.client.shaded.common.response.data.RestoreJobSummaryResponsePayload;
 import org.apache.cassandra.sidecar.client.HttpResponse;
 import org.apache.cassandra.sidecar.client.HttpResponseImpl;
 import org.apache.cassandra.sidecar.client.SidecarClient;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
index 681b6e0..c912e7a 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import o.a.c.sidecar.client.shaded.common.request.data.CreateSliceRequestPayload;
-import o.a.c.sidecar.client.shaded.common.request.data.RestoreJobSummaryResponsePayload;
+import o.a.c.sidecar.client.shaded.common.response.data.RestoreJobSummaryResponsePayload;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.SSTableDescriptor;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinatedTransportExtension.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinatedTransportExtension.java
new file mode 100644
index 0000000..3c5cf98
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinatedTransportExtension.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.transports.storage.extensions;
+
+import org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation;
+
+/**
+ * Extension methods that enable coordinated write to multiple target clusters
+ * Package-private interface only to be extended by {@link StorageTransportExtension}
+ * <p>
+ * Note that the methods defined in this extension run in Spark Driver only
+ * <p>
+ * The coordinated write has 2 phases, i.e. staging phase and importing phase. In the happy path, the steps of a run are the following:
+ * <ol>
+ *     <li>Extension sets the {@link CoordinationSignalListener} on initialization.</li>
+ *     <li>Extension invokes {@link CoordinationSignalListener#onStageReady(String)},
+ *     when it decides it is time to stage SSTables on all clusters.</li>
+ *     <li>Cassandra Analytics calls Sidecars to stage data.
+ *     {@link #onStageSucceeded(String, long, long, long)} is called per cluster to notify the extension.</li>
+ *     <li>Extension invokes {@link CoordinationSignalListener#onApplyReady(String)},
+ *     when it decides it is time to apply/import SSTables on all clusters.</li>
+ *     <li>Cassandra Analytics calls Sidecars to import data.
+ *     {@link #onApplySucceeded(String, long, long, long)} is called per cluster to notify the extension.</li>
+ *     <li>{@link DriverStorageTransportExtension#onAllObjectsPersisted(long, long, long)}
+ *     is called to indicate the completion.</li>
+ * </ol>
+ */
+interface CoordinatedTransportExtension
+{
+    /**
+     * Notifies the {@link CoordinatedTransportExtension} implementation that all objects have been staged on the cluster.
+     * The callback should only be invoked once per cluster
+     *
+     * @param clusterId identifies a Cassandra cluster
+     * @param objectsCount the total count of the objects
+     * @param rowsCount the total count of the rows
+     * @param elapsedMillis the elapsed time from the start of the bulk write run in milliseconds
+     */
+    void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis);
+
+    /**
+     * Notifies the {@link CoordinatedTransportExtension} implementation that it failed to stage objects on the cluster.
+     * The callback should only be invoked once per cluster
+     *
+     * @param clusterId identifies a Cassandra cluster
+     * @param cause failure
+     */
+    void onStageFailed(String clusterId, Throwable cause);
+
+    /**
+     * Notifies the {@link CoordinatedTransportExtension} implementation that all objects have been applied on the cluster.
+     * The callback should only be invoked once per cluster
+     *
+     * @param clusterId identifies a Cassandra cluster
+     * @param objectsCount the total count of the objects
+     * @param rowsCount the total count of the rows
+     * @param elapsedMillis the elapsed time from the start of the bulk write run in milliseconds
+     */
+    void onApplySucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis);
+
+    /**
+     * Notifies the {@link CoordinatedTransportExtension} implementation that it failed to apply objects on the cluster.
+     * The callback should only be invoked once per cluster
+     *
+     * @param clusterId identifies a Cassandra cluster
+     * @param cause failure
+     */
+    void onApplyFailed(String clusterId, Throwable cause);
+
+    /**
+     * Set the {@link CoordinationSignalListener} to receive coordination signals from {@link CoordinatedTransportExtension} implementation
+     * <p>
+     * Note to {@link CoordinatedTransportExtension} implementor:
+     * this method is called during setup of {@link CassandraBulkSourceRelation}, and a {@link CoordinationSignalListener} instance is provided
+     *
+     * @param listener receives coordination signals
+     */
+    void setCoordinationSignalListener(CoordinationSignalListener listener);
+}
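
For illustration, below is a minimal sketch of how an extension might drive the happy path described in the javadoc above: an external trigger signals stage readiness, per-cluster staging callbacks are counted, and apply readiness is signaled once every participating cluster has reported success. The class name, the expectedClusters count, the triggerStaging() method, and the jobId handling are assumptions for the sketch and are not part of this patch.

// Hypothetical sketch only; class, field, and trigger-method names are illustrative, not part of this patch.
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener;

public class SketchCoordinatedExtension // a real extension would also implement StorageTransportExtension
{
    private final int expectedClusters;                  // assumption: the number of participating clusters is known up front
    private final AtomicInteger stagedClusters = new AtomicInteger();
    private volatile CoordinationSignalListener listener;
    private volatile String jobId;                       // assumed to be captured in initialize(jobId, conf, isOnDriver)

    public SketchCoordinatedExtension(int expectedClusters)
    {
        this.expectedClusters = expectedClusters;
    }

    // Step 1 of the happy path: Cassandra Analytics provides the listener during setup
    public void setCoordinationSignalListener(CoordinationSignalListener listener)
    {
        this.listener = listener;
    }

    // Hypothetical external trigger (step 2): the coordinator decides all clusters may stage their SSTables
    public void triggerStaging()
    {
        listener.onStageReady(jobId);
    }

    // Step 3: invoked once per cluster; when every cluster has staged, signal that importing may begin (step 4)
    public void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis)
    {
        if (stagedClusters.incrementAndGet() == expectedClusters)
        {
            listener.onApplyReady(jobId);
        }
    }

    public void onStageFailed(String clusterId, Throwable cause)
    {
        // A real implementation would abort or retry the coordinated write here
    }
}
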
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java
similarity index 50%
copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java
copy to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java
index e725c8c..9e50bf3 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java
@@ -19,22 +19,25 @@
 
 package org.apache.cassandra.spark.transports.storage.extensions;
 
-import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
-
 /**
- * A listener interface that is notified on access token changes
+ * A listener interface that receives coordination signals.
+ * It works in cooperation with {@link CoordinatedTransportExtension} to enable coordinated write
  */
-public interface CredentialChangeListener
+public interface CoordinationSignalListener
 {
     /**
-     * Method called when new access tokens are available for the job with ID {@code jobId}.
-     * The previous set of credentials and the newly-provided set must both be valid simultaneously
-     * for the Spark job to have time to rotate credentials without interruption.
-     * These tokens should be provided with plenty of time for the job to distribute them to
-     * the consumers of the storage transport endpoint to update their tokens before expiration.
+     * The method is called when the write coordinator, i.e. implementation of {@link CoordinatedTransportExtension}, decides that
+     * it is ready to stage SSTable bundles on all participating clusters.
+     *
+     * @param jobId the unique identifier for the job
+     */
+    void onStageReady(String jobId);
+
+    /**
+     * The method is called when the write coordinator, i.e. implementation of {@link CoordinatedTransportExtension}, decides that
+     * it is ready to apply/import SSTables on all participating clusters.
      *
-     * @param jobId     the unique identifier for the job
-     * @param newTokens a map of access tokens used to authenticate to the storage transport
+     * @param jobId the unique identifier for the job
      */
-    void onCredentialsChanged(String jobId, StorageCredentialPair newTokens);
+    void onApplyReady(String jobId);
 }
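
As a rough illustration of the listener contract (the in-tree consumer added by this patch is StorageTransportHandler, shown further below), a test double might simply record which coordination signals it has received. The class below is hypothetical and not part of this patch.

// Hypothetical test double, not part of this patch: records coordination signals for assertions.
import java.util.concurrent.CountDownLatch;

import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener;

public class RecordingCoordinationSignalListener implements CoordinationSignalListener
{
    public final CountDownLatch stageReady = new CountDownLatch(1);
    public final CountDownLatch applyReady = new CountDownLatch(1);

    @Override
    public void onStageReady(String jobId)
    {
        stageReady.countDown();
    }

    @Override
    public void onApplyReady(String jobId)
    {
        applyReady.countDown();
    }
}
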
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java
index e725c8c..5a35c7f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
 public interface CredentialChangeListener
 {
     /**
-     * Method called when new access tokens are available for the job with ID {@code jobId}.
+     * The method is called when new access tokens are available for the job with ID {@code jobId}.
      * The previous set of credentials and the newly-provided set must both be valid simultaneously
      * for the Spark job to have time to rotate credentials without interruption.
      * These tokens should be provided with plenty of time for the job to distribute them to
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/DriverStorageTransportExtension.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/DriverStorageTransportExtension.java
index 7c9fb99..94baef1 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/DriverStorageTransportExtension.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/DriverStorageTransportExtension.java
@@ -23,7 +23,7 @@ package org.apache.cassandra.spark.transports.storage.extensions;
  * Extension methods that are invoked in Spark driver only
  * Package-private interface only to be extended by {@link StorageTransportExtension}
  */
-interface DriverStorageTransportExtension
+interface DriverStorageTransportExtension extends CoordinatedTransportExtension
 {
     /**
      * Notifies the extension that data transport has been started. This method will be called from the driver.
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java
index f0a422e..f852027 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import o.a.c.sidecar.client.shaded.common.data.RestoreJobSecrets;
+import o.a.c.sidecar.client.shaded.common.data.RestoreJobStatus;
 import o.a.c.sidecar.client.shaded.common.request.data.UpdateRestoreJobRequestPayload;
 import org.apache.cassandra.spark.bulkwriter.CancelJobEvent;
 import org.apache.cassandra.spark.bulkwriter.JobInfo;
@@ -34,7 +35,7 @@ import org.apache.cassandra.spark.bulkwriter.TransportContext;
 import org.apache.cassandra.spark.common.client.ClientException;
 import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
 
-public class StorageTransportHandler implements CredentialChangeListener, ObjectFailureListener
+public class StorageTransportHandler implements CredentialChangeListener, ObjectFailureListener, CoordinationSignalListener
 {
     private final TransportContext.CloudStorageTransportContext transportContext;
     private final Consumer<CancelJobEvent> cancelConsumer;
@@ -74,6 +75,22 @@ public class StorageTransportHandler implements CredentialChangeListener, Object
         cancelConsumer.accept(new CancelJobEvent(errorMessage));
     }
 
+    @Override
+    public void onStageReady(String jobId)
+    {
+        validateReceivedJobId(jobId);
+        LOGGER.info("Received stage ready signal for coordinated write. 
jobId={}", jobId);
+        sendCoordinationSignal(jobInfo.getRestoreJobId(), 
RestoreJobStatus.STAGE_READY);
+    }
+
+    @Override
+    public void onApplyReady(String jobId)
+    {
+        validateReceivedJobId(jobId);
+        LOGGER.info("Received apply ready signal for coordinated write. 
jobId={}", jobId);
+        sendCoordinationSignal(jobInfo.getRestoreJobId(), 
RestoreJobStatus.IMPORT_READY);
+    }
+
     private void updateCredentials(UUID jobId, StorageCredentialPair credentialPair)
     {
         StorageTransportConfiguration conf = transportContext.transportConfiguration();
@@ -85,7 +102,20 @@ public class StorageTransportHandler implements CredentialChangeListener, Object
         }
         catch (ClientException e)
         {
-            throw new RuntimeException("Failed to update secretes for restore 
job. restoreJobId: " + jobId, e);
+            throw new RuntimeException("Failed to update secrets for restore 
job. restoreJobId: " + jobId, e);
+        }
+    }
+
+    private void sendCoordinationSignal(UUID jobId, RestoreJobStatus status)
+    {
+        UpdateRestoreJobRequestPayload requestPayload = new UpdateRestoreJobRequestPayload(null, null, status, null);
+        try
+        {
+            transportContext.dataTransferApi().updateRestoreJob(requestPayload);
+        }
+        catch (ClientException e)
+        {
+            throw new RuntimeException("Failed to send coordination signal for 
restore job. restoreJobId: " + jobId + ", status=" + status, e);
         }
     }
 
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java
index cbba94d..0957258 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java
@@ -40,7 +40,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import o.a.c.sidecar.client.shaded.common.data.RestoreJobSecrets;
 import o.a.c.sidecar.client.shaded.common.request.data.CreateSliceRequestPayload;
-import o.a.c.sidecar.client.shaded.common.request.data.RestoreJobSummaryResponsePayload;
+import o.a.c.sidecar.client.shaded.common.response.data.RestoreJobSummaryResponsePayload;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.SSTableSummary;
 import org.apache.cassandra.sidecar.client.SidecarClient;
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index a67591d..0330f1e 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -37,6 +37,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestInstance;
@@ -75,6 +76,7 @@ import org.apache.cassandra.sidecar.common.server.JmxClient;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
 import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider;
+import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
 import org.apache.cassandra.sidecar.config.JmxConfiguration;
 import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
 import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
@@ -88,10 +90,10 @@ import org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
-import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
 import org.apache.cassandra.sidecar.server.MainModule;
 import org.apache.cassandra.sidecar.server.Server;
+import org.apache.cassandra.sidecar.server.SidecarServerEvents;
 import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 import org.apache.cassandra.testing.IClusterExtension;
@@ -100,6 +102,7 @@ import org.apache.cassandra.testing.TestUtils;
 import org.apache.cassandra.testing.TestVersion;
 import org.apache.cassandra.testing.TestVersionSupplier;
 
+import static org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT_MILLIS;
 import static org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -154,6 +157,7 @@ public abstract class SharedClusterIntegrationTestBase
     protected TestVersion testVersion;
     protected MtlsTestHelper mtlsTestHelper;
     private IsolatedDTestClassLoaderWrapper classLoaderWrapper;
+    private Injector sidecarServerInjector;
 
     static
     {
@@ -345,8 +349,8 @@ public abstract class SharedClusterIntegrationTestBase
         VertxTestContext context = new VertxTestContext();
         AbstractModule testModule = new IntegrationTestModule(instances, classLoaderWrapper, mtlsTestHelper,
                                                               dnsResolver, configurationOverrides());
-        Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(testModule));
-        Server sidecarServer = injector.getInstance(Server.class);
+        sidecarServerInjector = Guice.createInjector(Modules.override(new MainModule()).with(testModule));
+        Server sidecarServer = sidecarServerInjector.getInstance(Server.class);
         sidecarServer.start()
                      .onSuccess(s -> context.completeNow())
                      .onFailure(context::failNow);
@@ -355,6 +359,22 @@ public abstract class SharedClusterIntegrationTestBase
         return sidecarServer;
     }
 
+    protected void waitForSchemaReady(long timeout, TimeUnit timeUnit)
+    {
+        assertThat(sidecarServerInjector)
+        .describedAs("Sidecar is started")
+        .isNotNull();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
+        vertx.eventBus()
+             .localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg -> latch.countDown());
+
+        assertThat(Uninterruptibles.awaitUninterruptibly(latch, timeout, timeUnit))
+        .describedAs("Sidecar schema is not initialized after " + timeout + ' ' + timeUnit)
+        .isTrue();
+    }
+
     /**
      * Stops the Sidecar service
      *
@@ -588,7 +608,9 @@ public abstract class SharedClusterIntegrationTestBase
                 LOGGER.info("Not enabling mTLS for testing purposes. Set '{}' 
to 'true' if you would " +
                             "like mTLS enabled.", 
CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
             }
-            S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, buildTestS3ProxyConfig());
+            S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L,
+                                                                                 5242880, DEFAULT_API_CALL_TIMEOUT_MILLIS,
+                                                                                 buildTestS3ProxyConfig());
             SidecarConfigurationImpl.Builder builder = SidecarConfigurationImpl.builder()
                                                                                .serviceConfiguration(conf)
                                                                                .sslConfiguration(sslConfiguration)
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java
index 125ddae..dd91674 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.analytics.testcontainer;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import com.google.common.collect.ImmutableMap;
@@ -39,6 +40,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import static org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT_MILLIS;
 import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
 import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
@@ -86,12 +88,21 @@ class BulkWriteS3CompatModeSimpleTest extends SharedClusterSparkIntegrationTestB
     protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides()
     {
         return builder -> {
-            S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, buildTestS3ProxyConfig());
+            S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L,
+                                                                                 5242880, DEFAULT_API_CALL_TIMEOUT_MILLIS,
+                                                                                 buildTestS3ProxyConfig());
             builder.s3ClientConfiguration(s3ClientConfig);
             return builder;
         };
     }
 
+    @Override
+    protected void beforeTestStart()
+    {
+        super.beforeTestStart();
+        waitForSchemaReady(10, TimeUnit.SECONDS);
+    }
+
     private S3ProxyConfiguration buildTestS3ProxyConfig()
     {
         return new S3MockProxyConfigurationImpl(s3Mock.getHttpEndpoint());
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/LocalStorageTransportExtension.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/LocalStorageTransportExtension.java
index 9ab92d9..33c0029 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/LocalStorageTransportExtension.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/LocalStorageTransportExtension.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
 import org.apache.cassandra.spark.transports.storage.StorageCredentials;
+import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener;
 import org.apache.cassandra.spark.transports.storage.extensions.CredentialChangeListener;
 import org.apache.cassandra.spark.transports.storage.extensions.ObjectFailureListener;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
@@ -99,4 +100,34 @@ public class LocalStorageTransportExtension implements StorageTransportExtension
                                                                 "readSecret",
                                                                 
"readSessionToken"));
     }
+
+    @Override
+    public void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis)
+    {
+
+    }
+
+    @Override
+    public void onStageFailed(String clusterId, Throwable cause)
+    {
+
+    }
+
+    @Override
+    public void onApplySucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis)
+    {
+
+    }
+
+    @Override
+    public void onApplyFailed(String clusterId, Throwable cause)
+    {
+
+    }
+
+    @Override
+    public void setCoordinationSignalListener(CoordinationSignalListener listener)
+    {
+
+    }
 }
diff --git a/cassandra-analytics-spark-converter/build.gradle b/cassandra-analytics-spark-converter/build.gradle
index f023efa..cf4ca38 100644
--- a/cassandra-analytics-spark-converter/build.gradle
+++ b/cassandra-analytics-spark-converter/build.gradle
@@ -19,6 +19,7 @@
 
 plugins {
     id('java-library')
+    id('maven-publish')
 }
 
 configurations {
@@ -26,6 +27,17 @@ configurations {
     all*.exclude(group: 'log4j', module: 'log4j')
 }
 
+publishing {
+    publications {
+        maven(MavenPublication) {
+            from components.java
+            groupId project.group
+            artifactId "${archivesBaseName}"
+            version System.getenv("CODE_VERSION") ?: "${version}"
+        }
+    }
+}
+
 dependencies {
     compileOnly project(":cassandra-analytics-common")
     compileOnly(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
diff --git a/gradle.properties b/gradle.properties
index aa7c785..5f6d4a1 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -35,6 +35,6 @@ guavaVersion=16.0.1
 # force version 4.5.1 of vertx to prevent issues initializing io.vertx.core.json.jackson.JacksonCodec,
 # which requires a newer version of jackson, which is not available in spark 2
 vertxVersion=4.5.1
-aswSdkVersion=2.25.31
+aswSdkVersion=2.26.12
 
 org.gradle.jvmargs=-Xmx4096m
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index 8375dbc..1955041 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@ else
  SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
  SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}";
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-55a9efee30555d3645680c6524043a6c9bc1194b}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-f07e248d0ce8303a06daf93b462190ef7be7304d}"
   SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
   SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
   SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"

