This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new 86420f9 CASSANDRA-19582: Consume new Sidecar client API to stream SSTables (#54) 86420f9 is described below commit 86420f9d52991fb148b322031df55494669532d3 Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Tue Apr 23 12:51:53 2024 -0700 CASSANDRA-19582: Consume new Sidecar client API to stream SSTables (#54) Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19582 --- .../cassandra/spark/bulkwriter/BulkSparkConf.java | 7 +++-- .../spark/data/SidecarProvisionedSSTable.java | 32 +++++++++------------- .../spark/data/SidecarProvisionedSSTableTest.java | 1 + scripts/build-sidecar.sh | 2 +- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index 2db19d5..8bc9a7f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -29,6 +29,7 @@ import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -155,7 +156,7 @@ public class BulkSparkConf implements Serializable Optional<Integer> sidecarPortFromOptions = MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar port"); this.userProvidedSidecarPort = sidecarPortFromOptions.isPresent() ? sidecarPortFromOptions.get() : getOptionalInt(SIDECAR_PORT).orElse(-1); this.effectiveSidecarPort = this.userProvidedSidecarPort == -1 ? DEFAULT_SIDECAR_PORT : this.userProvidedSidecarPort; - this.sidecarInstancesValue = MapUtils.getOrThrow(options, WriterOptions.SIDECAR_INSTANCES.name(), "sidecar_instances"); + this.sidecarInstancesValue = MapUtils.getOrDefault(options, WriterOptions.SIDECAR_INSTANCES.name(), null); this.sidecarInstances = sidecarInstances(); this.keyspace = MapUtils.getOrThrow(options, WriterOptions.KEYSPACE.name()); this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name()); @@ -264,7 +265,9 @@ public class BulkSparkConf implements Serializable protected Set<? extends SidecarInstance> buildSidecarInstances() { - return Arrays.stream(sidecarInstancesValue.split(",")) + String[] split = Objects.requireNonNull(sidecarInstancesValue, "Unable to build sidecar instances from null value") + .split(","); + return Arrays.stream(split) .map(hostname -> new SidecarInstanceImpl(hostname, effectiveSidecarPort)) .collect(Collectors.toSet()); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java index 6e4ff0f..db9e2fd 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java @@ -124,7 +124,7 @@ public class SidecarProvisionedSSTable extends SSTable { return null; } - return openStream(snapshotFile.fileName, snapshotFile.size, fileType); + return openStream(snapshotFile, fileType); } public long length(FileType fileType) @@ -144,20 +144,20 @@ public class SidecarProvisionedSSTable extends SSTable } @Nullable - private InputStream openStream(String component, long size, FileType fileType) + private InputStream openStream(ListSnapshotFilesResponse.FileInfo snapshotFile, FileType fileType) { - if (component == null) + if (snapshotFile == null) { return null; } if (fileType == FileType.COMPRESSION_INFO) { - String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, component); + String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, snapshotFile.fileName); byte[] bytes; try { - bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(component, fileType, size))); + bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(snapshotFile, fileType))); } catch (ExecutionException exception) { @@ -166,24 +166,23 @@ public class SidecarProvisionedSSTable extends SSTable return new ByteArrayInputStream(bytes); } - return open(component, fileType, size); + return open(snapshotFile, fileType); } - public InputStream open(String component, FileType fileType, long size) + public InputStream open(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType) { - SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(component, fileType, size); + SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(fileInfo, fileType); return new SSTableInputStream<>(ssTableSource, stats); } /** * Build an SSTableSource to async provide the bytes * - * @param componentName the SSTable component to stream - * @param fileType SSTable file type - * @param size file size in bytes + * @param fileInfo contains information about the file to stream + * @param fileType SSTable file type * @return an SSTableSource implementation that uses Sidecar client to request bytes */ - private SSTableSource<SidecarProvisionedSSTable> source(String componentName, FileType fileType, long size) + private SSTableSource<SidecarProvisionedSSTable> source(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType) { SidecarProvisionedSSTable thisSSTable = this; return new SSTableSource<SidecarProvisionedSSTable>() @@ -191,12 +190,7 @@ public class SidecarProvisionedSSTable extends SSTable @Override public void request(long start, long end, StreamConsumer consumer) { - sidecar.streamSSTableComponent(instance, - keyspace, - table, - snapshotName, - componentName, - HttpRange.of(start, end), + sidecar.streamSSTableComponent(instance, fileInfo, HttpRange.of(start, end), new SidecarStreamConsumerAdapter(consumer)); } @@ -235,7 +229,7 @@ public class SidecarProvisionedSSTable extends SSTable @Override public long size() { - return size; + return fileInfo.size; } }; } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java index 0ce0e4f..c24c5e8 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java @@ -156,6 +156,7 @@ class SidecarProvisionedSSTableTest snapshot, keyspace, table, + "abc1234", dataFileName); return new SidecarProvisionedSSTable(mockSidecarClient, sidecarClientConfig, diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh index 09a59ec..a639701 100755 --- a/scripts/build-sidecar.sh +++ b/scripts/build-sidecar.sh @@ -24,7 +24,7 @@ else SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; ) SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}" SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}" - SIDECAR_COMMIT="${SIDECAR_COMMIT:-fd6f7ac5f9f19dbbeeb9e7f80ca1fcbf60d5a4c6}" + SIDECAR_COMMIT="${SIDECAR_COMMIT:-4a6b8c9cfe0c6286d12c7d561941a24c25a206ef}" SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies" SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR} SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org