This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git

The following commit(s) were added to refs/heads/trunk by this push: new ff9ac41 CASSANDRA-19923: Add transport extension for coordinated write (#83) ff9ac41 is described below commit ff9ac41b4695c1df59f5293f69e0d3a1ce0da9f4 Author: Yifan Cai <y...@apache.org> AuthorDate: Wed Sep 18 12:54:47 2024 -0700 CASSANDRA-19923: Add transport extension for coordinated write (#83) Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRA-19923 --- CHANGES.txt | 1 + .../example/ExampleStorageTransportExtension.java | 33 ++++++++ .../example/LocalStorageTransportExtension.java | 33 ++++++++ .../bulkwriter/blobupload/BlobDataTransferApi.java | 4 +- .../bulkwriter/blobupload/BlobStreamSession.java | 2 +- .../extensions/CoordinatedTransportExtension.java | 96 ++++++++++++++++++++++ ...stener.java => CoordinationSignalListener.java} | 27 +++--- .../extensions/CredentialChangeListener.java | 2 +- .../DriverStorageTransportExtension.java | 2 +- .../extensions/StorageTransportHandler.java | 34 +++++++- .../blobupload/BlobStreamSessionTest.java | 2 +- .../testing/SharedClusterIntegrationTestBase.java | 30 ++++++- .../BulkWriteS3CompatModeSimpleTest.java | 13 ++- .../LocalStorageTransportExtension.java | 31 +++++++ cassandra-analytics-spark-converter/build.gradle | 12 +++ gradle.properties | 2 +- scripts/build-sidecar.sh | 2 +- 17 files changed, 299 insertions(+), 27 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 9d7288a..a45595e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Add transport extension for coordinated write (CASSANDRA-19923) * Support data partitioning for multiple clusters coordinated write (CASSANDRA-19910) * Add writer option COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters (CASSANDRA-19909) * Decouple Cassandra types from Spark types so Cassandra types can be used independently from Spark (CASSANDRA-19815) diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/ExampleStorageTransportExtension.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/ExampleStorageTransportExtension.java index 1b71b1a..3d16485 100644 --- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/ExampleStorageTransportExtension.java +++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/ExampleStorageTransportExtension.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.util.ThreadUtil; import org.apache.cassandra.spark.transports.storage.StorageCredentialPair; import org.apache.cassandra.spark.transports.storage.StorageCredentials; +import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener; import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration; import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportExtension; import org.apache.cassandra.spark.transports.storage.extensions.ObjectFailureListener; @@ -47,6 +48,7 @@ public class ExampleStorageTransportExtension implements StorageTransportExtensi private long tokenCount = 0; private CredentialChangeListener credentialChangeListener; private ObjectFailureListener objectFailureListener; + private CoordinationSignalListener coordinationSignalListener; private boolean shouldFail; @Override @@ -151,4 +153,35 @@ public class ExampleStorageTransportExtension implements StorageTransportExtensi "readSecretKey-" + tokenCount, "readSessionToken-" + 
tokenCount)); } + + @Override + public void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis) + { + LOGGER.info("Job {} has all objects staged at cluster {} after {}ms", jobId, clusterId, elapsedMillis); + } + + @Override + public void onStageFailed(String clusterId, Throwable cause) + { + LOGGER.error("Cluster {} failed to stage objects", clusterId, cause); + } + + @Override + public void onApplySucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis) + { + LOGGER.info("Job {} has all objects applied at cluster {} after {}ms", jobId, clusterId, elapsedMillis); + } + + @Override + public void onApplyFailed(String clusterId, Throwable cause) + { + LOGGER.error("Cluster {} failed to apply objects", clusterId, cause); + } + + @Override + public void setCoordinationSignalListener(CoordinationSignalListener listener) + { + this.coordinationSignalListener = listener; + LOGGER.info("CoordinationSignalListener initialized. listener={}", listener); + } } diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalStorageTransportExtension.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalStorageTransportExtension.java index 95d5a96..58e7d03 100644 --- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalStorageTransportExtension.java +++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalStorageTransportExtension.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.spark.transports.storage.StorageCredentialPair; import org.apache.cassandra.spark.transports.storage.StorageCredentials; +import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener; import org.apache.cassandra.spark.transports.storage.extensions.CredentialChangeListener; import org.apache.cassandra.spark.transports.storage.extensions.ObjectFailureListener; import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration; @@ -38,6 +39,7 @@ public class LocalStorageTransportExtension implements StorageTransportExtension private static final Logger LOGGER = LoggerFactory.getLogger(LocalStorageTransportExtension.class); private String jobId; + private CoordinationSignalListener coordinationSignalListener; @Override public void initialize(String jobId, SparkConf conf, boolean isOnDriver) @@ -114,4 +116,35 @@ public class LocalStorageTransportExtension implements StorageTransportExtension "readSecret", "readSessionToken")); } + + @Override + public void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis) + { + LOGGER.info("Job {} has all objects staged at cluster {} after {}ms", jobId, clusterId, elapsedMillis); + } + + @Override + public void onStageFailed(String clusterId, Throwable cause) + { + LOGGER.error("Job {} failed to stage objects at cluster {}", jobId, clusterId, cause); + } + + @Override + public void onApplySucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis) + { + LOGGER.info("Job {} has all objects applied at cluster {} after {}ms", jobId, clusterId, elapsedMillis); + } + + @Override + public void onApplyFailed(String clusterId, Throwable cause) + { + LOGGER.error("Cluster {} failed to apply objects", clusterId, cause); + } + + @Override + public void setCoordinationSignalListener(CoordinationSignalListener listener) + { + 
this.coordinationSignalListener = listener; + LOGGER.info("CoordinationSignalListener initialized. listener={}", listener); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobDataTransferApi.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobDataTransferApi.java index 8791bb9..a76ddf2 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobDataTransferApi.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobDataTransferApi.java @@ -30,10 +30,10 @@ import io.netty.handler.codec.http.HttpResponseStatus; import o.a.c.sidecar.client.shaded.common.request.CreateRestoreJobSliceRequest; import o.a.c.sidecar.client.shaded.common.request.Request; import o.a.c.sidecar.client.shaded.common.request.data.CreateRestoreJobRequestPayload; -import o.a.c.sidecar.client.shaded.common.request.data.CreateRestoreJobResponsePayload; import o.a.c.sidecar.client.shaded.common.request.data.CreateSliceRequestPayload; -import o.a.c.sidecar.client.shaded.common.request.data.RestoreJobSummaryResponsePayload; import o.a.c.sidecar.client.shaded.common.request.data.UpdateRestoreJobRequestPayload; +import o.a.c.sidecar.client.shaded.common.response.data.CreateRestoreJobResponsePayload; +import o.a.c.sidecar.client.shaded.common.response.data.RestoreJobSummaryResponsePayload; import org.apache.cassandra.sidecar.client.HttpResponse; import org.apache.cassandra.sidecar.client.HttpResponseImpl; import org.apache.cassandra.sidecar.client.SidecarClient; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java index 681b6e0..c912e7a 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import o.a.c.sidecar.client.shaded.common.request.data.CreateSliceRequestPayload; -import o.a.c.sidecar.client.shaded.common.request.data.RestoreJobSummaryResponsePayload; +import o.a.c.sidecar.client.shaded.common.response.data.RestoreJobSummaryResponsePayload; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.SSTableDescriptor; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinatedTransportExtension.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinatedTransportExtension.java new file mode 100644 index 0000000..3c5cf98 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinatedTransportExtension.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.transports.storage.extensions; + +import org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation; + +/** + * Extension methods that enables coordinated write to multiple target clusters + * Package-private interface only to be extended by {@link StorageTransportExtension} + * <p> + * Note that the methods defined in this extension run in Spark Driver only + * <p> + * The coordinated write has 2 phases, i.e. staging phase and importing phase. In the happy path, the steps of a run are the following: + * <ol> + * <li>Extension sets the {@link CoordinationSignalListener} on initialization.</li> + * <li>Extension invokes {@link CoordinationSignalListener#onStageReady(String)}, + * when it decides it is time to stage SSTables on all clusters.</li> + * <li>Cassandra Analytics calls Sidecars to stage data. + * {@link #onStageSucceeded(String, long, long, long)} is called per cluster to notify the extension.</li> + * <li>Extension invokes {@link CoordinationSignalListener#onApplyReady(String)}, + * when it decides it is time to apply/import SSTables on all clusters.</li> + * <li>Cassandra Analytics calls Sidecars to import data. + * {@link #onApplySucceeded(String, long, long, long)} is called per cluster to notify the extension.</li> + * <li>{@link DriverStorageTransportExtension#onAllObjectsPersisted(long, long, long)} + * is called to indicate the completion.</li> + * </ol> + */ +interface CoordinatedTransportExtension +{ + /** + * Notifies the {@link CoordinatedTransportExtension} implementation that all objects have been staged on the cluster. + * The callback should only be invoked once per cluster + * + * @param clusterId identifies a Cassandra cluster + * @param objectsCount the total count of the objects + * @param rowsCount the total count of the rows + * @param elapsedMillis the elapsed time from the start of the bulk write run in milliseconds + */ + void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis); + + /** + * Notifies the {@link CoordinatedTransportExtension} implementation that it fails to stage objects on the cluster. + * The callback should only be invoked once per cluster + * + * @param clusterId identifies a Cassandra cluster + * @param cause failure + */ + void onStageFailed(String clusterId, Throwable cause); + + /** + * Notifies the {@link CoordinatedTransportExtension} implementation that all objects have been applied on the cluster. + * The callback should only be invoked once per cluster + * + * @param clusterId identifies a Cassandra cluster + * @param objectsCount the total count of the objects + * @param rowsCount the total count of the rows + * @param elapsedMillis the elapsed time from the start of the bulk write run in milliseconds + */ + void onApplySucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis); + + /** + * Notifies the {@link CoordinatedTransportExtension} implementation that it fails to apply objects on the cluster. 
+ * The callback should only be invoked once per cluster + * + * @param clusterId identifies a Cassandra cluster + * @param cause failure + */ + void onApplyFailed(String clusterId, Throwable cause); + + /** + * Set the {@link CoordinationSignalListener} to receive coordination signals from {@link CoordinatedTransportExtension} implementation + * <p> + * Note to {@link CoordinatedTransportExtension} implementor: + * this method is called during setup of {@link CassandraBulkSourceRelation}, and a {@link CoordinationSignalListener} instance is provided + * + * @param listener receives coordination signals + */ + void setCoordinationSignalListener(CoordinationSignalListener listener); +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java similarity index 50% copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java copy to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java index e725c8c..9e50bf3 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java @@ -19,22 +19,25 @@ package org.apache.cassandra.spark.transports.storage.extensions; -import org.apache.cassandra.spark.transports.storage.StorageCredentialPair; - /** - * A listener interface that is notified on access token changes + * A listener interface that receives coordination signals. + * It works in cooperation with {@link CoordinatedTransportExtension} to enable coordinated write */ -public interface CredentialChangeListener +public interface CoordinationSignalListener { /** - * Method called when new access tokens are available for the job with ID {@code jobId}. - * The previous set of credentials and the newly-provided set must both be valid simultaneously - * for the Spark job to have time to rotate credentials without interruption. - * These tokens should be provided with plenty of time for the job to distribute them to - * the consumers of the storage transport endpoint to update their tokens before expiration. + * The method is called when the write coordinator, i.e. implementation of {@link CoordinatedTransportExtension}, decides that + * it is ready to stage SSTable bundles on all participating clusters. + * + * @param jobId the unique identifier for the job + */ + void onStageReady(String jobId); + + /** + * The method is called when the write coordinator, i.e. implementation of {@link CoordinatedTransportExtension}, decides that + * it is ready to apply/import SSTables on all participating clusters. 
* - * @param jobId the unique identifier for the job - * @param newTokens a map of access tokens used to authenticate to the storage transport + * @param jobId the unique identifier for the job */ - void onCredentialsChanged(String jobId, StorageCredentialPair newTokens); + void onApplyReady(String jobId); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java index e725c8c..5a35c7f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CredentialChangeListener.java @@ -27,7 +27,7 @@ import org.apache.cassandra.spark.transports.storage.StorageCredentialPair; public interface CredentialChangeListener { /** - * Method called when new access tokens are available for the job with ID {@code jobId}. + * The method is called when new access tokens are available for the job with ID {@code jobId}. * The previous set of credentials and the newly-provided set must both be valid simultaneously * for the Spark job to have time to rotate credentials without interruption. * These tokens should be provided with plenty of time for the job to distribute them to diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/DriverStorageTransportExtension.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/DriverStorageTransportExtension.java index 7c9fb99..94baef1 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/DriverStorageTransportExtension.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/DriverStorageTransportExtension.java @@ -23,7 +23,7 @@ package org.apache.cassandra.spark.transports.storage.extensions; * Extension methods that are invoked in Spark driver only * Package-private interface only to be extended by {@link StorageTransportExtension} */ -interface DriverStorageTransportExtension +interface DriverStorageTransportExtension extends CoordinatedTransportExtension { /** * Notifies the extension that data transport has been started. This method will be called from the driver. 
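
The coordination contract above is small enough that an extension can drive it with a little bookkeeping. The sketch below is illustrative only and not part of this patch: it assumes a hypothetical CoordinationTracker helper that an extension implementation could delegate its CoordinatedTransportExtension callbacks to, using one possible policy of firing CoordinationSignalListener#onApplyReady as soon as every participating cluster has finished staging. The count of participating clusters is an input the implementation would have to supply itself.

import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener;

/**
 * Hypothetical helper (illustration only, not part of this patch): tracks staging
 * progress across clusters for a coordinated write and emits coordination signals.
 */
public final class CoordinationTracker
{
    private static final Logger LOGGER = LoggerFactory.getLogger(CoordinationTracker.class);

    private final String jobId;
    private final AtomicInteger pendingStage; // clusters that have not yet reported staging success
    private final CoordinationSignalListener listener; // received via setCoordinationSignalListener

    public CoordinationTracker(String jobId, int participatingClusters, CoordinationSignalListener listener)
    {
        this.jobId = jobId;
        this.pendingStage = new AtomicInteger(participatingClusters);
        this.listener = listener;
    }

    // Called when the extension decides it is time to stage SSTables on all clusters (step 2 of the happy path)
    public void startStaging()
    {
        listener.onStageReady(jobId);
    }

    // Delegate to this from the extension's onStageSucceeded(clusterId, ...) callback;
    // that callback is invoked once per cluster, so a simple countdown suffices
    public void clusterStaged(String clusterId)
    {
        LOGGER.info("Cluster {} finished staging for job {}", clusterId, jobId);
        if (pendingStage.decrementAndGet() == 0)
        {
            // Every cluster has staged its objects; signal that the apply/import phase may begin (step 4)
            listener.onApplyReady(jobId);
        }
    }
}

Wired up this way, the extension's setCoordinationSignalListener override would construct the tracker with the number of clusters it targets (how that is determined is left to the implementation), and its onStageSucceeded override would simply forward the clusterId. What to do in onStageFailed or onApplyFailed, such as aborting or retrying the run, is likewise left to the implementation.
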
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java index f0a422e..f852027 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import o.a.c.sidecar.client.shaded.common.data.RestoreJobSecrets; +import o.a.c.sidecar.client.shaded.common.data.RestoreJobStatus; import o.a.c.sidecar.client.shaded.common.request.data.UpdateRestoreJobRequestPayload; import org.apache.cassandra.spark.bulkwriter.CancelJobEvent; import org.apache.cassandra.spark.bulkwriter.JobInfo; @@ -34,7 +35,7 @@ import org.apache.cassandra.spark.bulkwriter.TransportContext; import org.apache.cassandra.spark.common.client.ClientException; import org.apache.cassandra.spark.transports.storage.StorageCredentialPair; -public class StorageTransportHandler implements CredentialChangeListener, ObjectFailureListener +public class StorageTransportHandler implements CredentialChangeListener, ObjectFailureListener, CoordinationSignalListener { private final TransportContext.CloudStorageTransportContext transportContext; private final Consumer<CancelJobEvent> cancelConsumer; @@ -74,6 +75,22 @@ public class StorageTransportHandler implements CredentialChangeListener, Object cancelConsumer.accept(new CancelJobEvent(errorMessage)); } + @Override + public void onStageReady(String jobId) + { + validateReceivedJobId(jobId); + LOGGER.info("Received stage ready signal for coordinated write. jobId={}", jobId); + sendCoordinationSignal(jobInfo.getRestoreJobId(), RestoreJobStatus.STAGE_READY); + } + + @Override + public void onApplyReady(String jobId) + { + validateReceivedJobId(jobId); + LOGGER.info("Received apply ready signal for coordinated write. jobId={}", jobId); + sendCoordinationSignal(jobInfo.getRestoreJobId(), RestoreJobStatus.IMPORT_READY); + } + private void updateCredentials(UUID jobId, StorageCredentialPair credentialPair) { StorageTransportConfiguration conf = transportContext.transportConfiguration(); @@ -85,7 +102,20 @@ public class StorageTransportHandler implements CredentialChangeListener, Object } catch (ClientException e) { - throw new RuntimeException("Failed to update secretes for restore job. restoreJobId: " + jobId, e); + throw new RuntimeException("Failed to update secrets for restore job. restoreJobId: " + jobId, e); + } + } + + private void sendCoordinationSignal(UUID jobId, RestoreJobStatus status) + { + UpdateRestoreJobRequestPayload requestPayload = new UpdateRestoreJobRequestPayload(null, null, status, null); + try + { + transportContext.dataTransferApi().updateRestoreJob(requestPayload); + } + catch (ClientException e) + { + throw new RuntimeException("Failed to send coordination signal for restore job. 
restoreJobId: " + jobId + ", status=" + status, e); } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java index cbba94d..0957258 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java @@ -40,7 +40,7 @@ import org.junit.jupiter.api.io.TempDir; import o.a.c.sidecar.client.shaded.common.data.RestoreJobSecrets; import o.a.c.sidecar.client.shaded.common.request.data.CreateSliceRequestPayload; -import o.a.c.sidecar.client.shaded.common.request.data.RestoreJobSummaryResponsePayload; +import o.a.c.sidecar.client.shaded.common.response.data.RestoreJobSummaryResponsePayload; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.SSTableSummary; import org.apache.cassandra.sidecar.client.SidecarClient; diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java index a67591d..0330f1e 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java @@ -37,6 +37,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; @@ -75,6 +76,7 @@ import org.apache.cassandra.sidecar.common.server.JmxClient; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider; +import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils; import org.apache.cassandra.sidecar.config.JmxConfiguration; import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; import org.apache.cassandra.sidecar.config.S3ClientConfiguration; @@ -88,10 +90,10 @@ import org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl; -import org.apache.cassandra.sidecar.exceptions.ThrowableUtils; import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics; import org.apache.cassandra.sidecar.server.MainModule; import org.apache.cassandra.sidecar.server.Server; +import org.apache.cassandra.sidecar.server.SidecarServerEvents; import org.apache.cassandra.sidecar.utils.CassandraVersionProvider; import org.apache.cassandra.testing.ClusterBuilderConfiguration; import org.apache.cassandra.testing.IClusterExtension; @@ -100,6 +102,7 @@ import org.apache.cassandra.testing.TestUtils; import org.apache.cassandra.testing.TestVersion; import 
org.apache.cassandra.testing.TestVersionSupplier; +import static org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT_MILLIS; import static org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS; import static org.assertj.core.api.Assertions.assertThat; @@ -154,6 +157,7 @@ public abstract class SharedClusterIntegrationTestBase protected TestVersion testVersion; protected MtlsTestHelper mtlsTestHelper; private IsolatedDTestClassLoaderWrapper classLoaderWrapper; + private Injector sidecarServerInjector; static { @@ -345,8 +349,8 @@ public abstract class SharedClusterIntegrationTestBase VertxTestContext context = new VertxTestContext(); AbstractModule testModule = new IntegrationTestModule(instances, classLoaderWrapper, mtlsTestHelper, dnsResolver, configurationOverrides()); - Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(testModule)); - Server sidecarServer = injector.getInstance(Server.class); + sidecarServerInjector = Guice.createInjector(Modules.override(new MainModule()).with(testModule)); + Server sidecarServer = sidecarServerInjector.getInstance(Server.class); sidecarServer.start() .onSuccess(s -> context.completeNow()) .onFailure(context::failNow); @@ -355,6 +359,22 @@ public abstract class SharedClusterIntegrationTestBase return sidecarServer; } + protected void waitForSchemaReady(long timeout, TimeUnit timeUnit) + { + assertThat(sidecarServerInjector) + .describedAs("Sidecar is started") + .isNotNull(); + + CountDownLatch latch = new CountDownLatch(1); + Vertx vertx = sidecarServerInjector.getInstance(Vertx.class); + vertx.eventBus() + .localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg -> latch.countDown()); + + assertThat(Uninterruptibles.awaitUninterruptibly(latch, timeout, timeUnit)) + .describedAs("Sidecar schema is not initialized after " + timeout + ' ' + timeUnit) + .isTrue(); + } + /** * Stops the Sidecar service * @@ -588,7 +608,9 @@ public abstract class SharedClusterIntegrationTestBase LOGGER.info("Not enabling mTLS for testing purposes. 
Set '{}' to 'true' if you would " + "like mTLS enabled.", CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS); } - S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, buildTestS3ProxyConfig()); + S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, + 5242880, DEFAULT_API_CALL_TIMEOUT_MILLIS, + buildTestS3ProxyConfig()); SidecarConfigurationImpl.Builder builder = SidecarConfigurationImpl.builder() .serviceConfiguration(conf) .sslConfiguration(sslConfiguration) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java index 125ddae..dd91674 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.analytics.testcontainer; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import com.google.common.collect.ImmutableMap; @@ -39,6 +40,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import static org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT_MILLIS; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; import static org.apache.cassandra.testing.TestUtils.DC1_RF3; import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; @@ -86,12 +88,21 @@ class BulkWriteS3CompatModeSimpleTest extends SharedClusterSparkIntegrationTestB protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides() { return builder -> { - S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, buildTestS3ProxyConfig()); + S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, + 5242880, DEFAULT_API_CALL_TIMEOUT_MILLIS, + buildTestS3ProxyConfig()); builder.s3ClientConfiguration(s3ClientConfig); return builder; }; } + @Override + protected void beforeTestStart() + { + super.beforeTestStart(); + waitForSchemaReady(10, TimeUnit.SECONDS); + } + private S3ProxyConfiguration buildTestS3ProxyConfig() { return new S3MockProxyConfigurationImpl(s3Mock.getHttpEndpoint()); diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/LocalStorageTransportExtension.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/LocalStorageTransportExtension.java index 9ab92d9..33c0029 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/LocalStorageTransportExtension.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/LocalStorageTransportExtension.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.cassandra.spark.transports.storage.StorageCredentialPair; import org.apache.cassandra.spark.transports.storage.StorageCredentials; +import org.apache.cassandra.spark.transports.storage.extensions.CoordinationSignalListener; import 
org.apache.cassandra.spark.transports.storage.extensions.CredentialChangeListener; import org.apache.cassandra.spark.transports.storage.extensions.ObjectFailureListener; import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration; @@ -99,4 +100,34 @@ public class LocalStorageTransportExtension implements StorageTransportExtension "readSecret", "readSessionToken")); } + + @Override + public void onStageSucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis) + { + + } + + @Override + public void onStageFailed(String clusterId, Throwable cause) + { + + } + + @Override + public void onApplySucceeded(String clusterId, long objectsCount, long rowsCount, long elapsedMillis) + { + + } + + @Override + public void onApplyFailed(String clusterId, Throwable cause) + { + + } + + @Override + public void setCoordinationSignalListener(CoordinationSignalListener listener) + { + + } } diff --git a/cassandra-analytics-spark-converter/build.gradle b/cassandra-analytics-spark-converter/build.gradle index f023efa..cf4ca38 100644 --- a/cassandra-analytics-spark-converter/build.gradle +++ b/cassandra-analytics-spark-converter/build.gradle @@ -19,6 +19,7 @@ plugins { id('java-library') + id('maven-publish') } configurations { @@ -26,6 +27,17 @@ configurations { all*.exclude(group: 'log4j', module: 'log4j') } +publishing { + publications { + maven(MavenPublication) { + from components.java + groupId project.group + artifactId "${archivesBaseName}" + version System.getenv("CODE_VERSION") ?: "${version}" + } + } +} + dependencies { compileOnly project(":cassandra-analytics-common") compileOnly(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") diff --git a/gradle.properties b/gradle.properties index aa7c785..5f6d4a1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -35,6 +35,6 @@ guavaVersion=16.0.1 # force version 4.5.1 of vertx to prevent issues initializing io.vertx.core.json.jackson.JacksonCodec, # which requires a newer version of jackson, which is not available in spark 2 vertxVersion=4.5.1 -aswSdkVersion=2.25.31 +aswSdkVersion=2.26.12 org.gradle.jvmargs=-Xmx4096m diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh index 8375dbc..1955041 100755 --- a/scripts/build-sidecar.sh +++ b/scripts/build-sidecar.sh @@ -24,7 +24,7 @@ else SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; ) SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}" SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}" - SIDECAR_COMMIT="${SIDECAR_COMMIT:-55a9efee30555d3645680c6524043a6c9bc1194b}" + SIDECAR_COMMIT="${SIDECAR_COMMIT:-f07e248d0ce8303a06daf93b462190ef7be7304d}" SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies" SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR} SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org