This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86420f9  CASSANDRA-19582: Consume new Sidecar client API to stream SSTables (#54)
86420f9 is described below

commit 86420f9d52991fb148b322031df55494669532d3
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Tue Apr 23 12:51:53 2024 -0700

    CASSANDRA-19582: Consume new Sidecar client API to stream SSTables (#54)
    
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19582
---
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  7 +++--
 .../spark/data/SidecarProvisionedSSTable.java      | 32 +++++++++-------------
 .../spark/data/SidecarProvisionedSSTableTest.java  |  1 +
 scripts/build-sidecar.sh                           |  2 +-
 4 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 2db19d5..8bc9a7f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -29,6 +29,7 @@ import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -155,7 +156,7 @@ public class BulkSparkConf implements Serializable
         Optional<Integer> sidecarPortFromOptions = MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar port");
         this.userProvidedSidecarPort = sidecarPortFromOptions.isPresent() ? sidecarPortFromOptions.get() : getOptionalInt(SIDECAR_PORT).orElse(-1);
         this.effectiveSidecarPort = this.userProvidedSidecarPort == -1 ? DEFAULT_SIDECAR_PORT : this.userProvidedSidecarPort;
-        this.sidecarInstancesValue = MapUtils.getOrThrow(options, WriterOptions.SIDECAR_INSTANCES.name(), "sidecar_instances");
+        this.sidecarInstancesValue = MapUtils.getOrDefault(options, WriterOptions.SIDECAR_INSTANCES.name(), null);
         this.sidecarInstances = sidecarInstances();
         this.keyspace = MapUtils.getOrThrow(options, WriterOptions.KEYSPACE.name());
         this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
@@ -264,7 +265,9 @@ public class BulkSparkConf implements Serializable
 
     protected Set<? extends SidecarInstance> buildSidecarInstances()
     {
-        return Arrays.stream(sidecarInstancesValue.split(","))
+        String[] split = Objects.requireNonNull(sidecarInstancesValue, "Unable to build sidecar instances from null value")
+                                .split(",");
+        return Arrays.stream(split)
                      .map(hostname -> new SidecarInstanceImpl(hostname, effectiveSidecarPort))
                      .collect(Collectors.toSet());
     }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index 6e4ff0f..db9e2fd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -124,7 +124,7 @@ public class SidecarProvisionedSSTable extends SSTable
         {
             return null;
         }
-        return openStream(snapshotFile.fileName, snapshotFile.size, fileType);
+        return openStream(snapshotFile, fileType);
     }
 
     public long length(FileType fileType)
@@ -144,20 +144,20 @@ public class SidecarProvisionedSSTable extends SSTable
     }
 
     @Nullable
-    private InputStream openStream(String component, long size, FileType fileType)
+    private InputStream openStream(ListSnapshotFilesResponse.FileInfo snapshotFile, FileType fileType)
     {
-        if (component == null)
+        if (snapshotFile == null)
         {
             return null;
         }
 
         if (fileType == FileType.COMPRESSION_INFO)
         {
-            String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, component);
+            String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, snapshotFile.fileName);
             byte[] bytes;
             try
             {
-                bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(component, fileType, size)));
+                bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(snapshotFile, fileType)));
             }
             catch (ExecutionException exception)
             {
@@ -166,24 +166,23 @@ public class SidecarProvisionedSSTable extends SSTable
             return new ByteArrayInputStream(bytes);
         }
 
-        return open(component, fileType, size);
+        return open(snapshotFile, fileType);
     }
 
-    public InputStream open(String component, FileType fileType, long size)
+    public InputStream open(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType)
     {
-        SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(component, fileType, size);
+        SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(fileInfo, fileType);
         return new SSTableInputStream<>(ssTableSource, stats);
     }
 
     /**
      * Build an SSTableSource to async provide the bytes
      *
-     * @param componentName the SSTable component to stream
-     * @param fileType      SSTable file type
-     * @param size          file size in bytes
+     * @param fileInfo contains information about the file to stream
+     * @param fileType SSTable file type
      * @return an SSTableSource implementation that uses Sidecar client to request bytes
      */
-    private SSTableSource<SidecarProvisionedSSTable> source(String componentName, FileType fileType, long size)
+    private SSTableSource<SidecarProvisionedSSTable> source(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType)
     {
         SidecarProvisionedSSTable thisSSTable = this;
         return new SSTableSource<SidecarProvisionedSSTable>()
@@ -191,12 +190,7 @@ public class SidecarProvisionedSSTable extends SSTable
             @Override
             public void request(long start, long end, StreamConsumer consumer)
             {
-                sidecar.streamSSTableComponent(instance,
-                                               keyspace,
-                                               table,
-                                               snapshotName,
-                                               componentName,
-                                               HttpRange.of(start, end),
+                sidecar.streamSSTableComponent(instance, fileInfo, HttpRange.of(start, end),
                                                new SidecarStreamConsumerAdapter(consumer));
             }
 
@@ -235,7 +229,7 @@ public class SidecarProvisionedSSTable extends SSTable
             @Override
             public long size()
             {
-                return size;
+                return fileInfo.size;
             }
         };
     }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index 0ce0e4f..c24c5e8 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -156,6 +156,7 @@ class SidecarProvisionedSSTableTest
                                                                                
              snapshot,
                                                                                
              keyspace,
                                                                                
              table,
+                                                                               
              "abc1234",
                                                                                
              dataFileName);
         return new SidecarProvisionedSSTable(mockSidecarClient,
                                              sidecarClientConfig,
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index 09a59ec..a639701 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@ else
   SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
   
SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}";
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-fd6f7ac5f9f19dbbeeb9e7f80ca1fcbf60d5a4c6}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-4a6b8c9cfe0c6286d12c7d561941a24c25a206ef}"
   SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
   SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
   SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to