This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 458a363  CASSANDRA-19778: Split out BufferingInputStream stats into 
separate i… (#66)
458a363 is described below

commit 458a3630f882ae2b2a9cee272cf85ca7ff42f5cd
Author: jberragan <jberra...@gmail.com>
AuthorDate: Wed Jul 17 14:29:21 2024 -0700

    CASSANDRA-19778: Split out BufferingInputStream stats into separate i… (#66)
    
    Split BufferingInputStream stats into separate interface so class level 
generics are not required for the Stats interface
    
    Patch by James Berragan; Reviewed by Bernardo Botella, Francisco Guerrero, 
Yifan Cai for CASSANDRA-19778
---
 CHANGES.txt                                        |   1 +
 .../cassandra/spark/data/FileSystemSSTable.java    |   6 +-
 .../spark/stats/BufferingInputStreamStats.java     | 131 +++++++++++++++++++++
 .../org/apache/cassandra/spark/stats/IStats.java   |  59 ----------
 .../utils/streaming/BufferingInputStream.java      |   8 +-
 .../spark/bulkwriter/blobupload/SSTableLister.java |   2 +-
 .../cassandra/spark/data/LocalDataLayer.java       |   2 +-
 .../spark/data/SidecarProvisionedSSTable.java      |   2 +-
 .../org/apache/cassandra/spark/EndToEndTests.java  |  21 ++--
 .../spark/utils/BufferingInputStreamHttpTest.java  |   2 +-
 .../spark/utils/BufferingInputStreamTests.java     |  10 +-
 .../org/apache/cassandra/spark/stats/Stats.java    | 104 ++--------------
 .../cassandra/spark/reader/SSTableReaderTests.java |   4 +-
 13 files changed, 171 insertions(+), 181 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 01181ec..d9231b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Split out BufferingInputStream stats into separate interface 
(CASSANDRA-19778)
  * Bump Sidecar version to 55a9efee (CASSANDRA-19774)
  * Add new module cassandra-analytics-common to store common code with minimal 
dependencies (CASSANDRA-19748)
  * Bulk writer fails validation stage when writing to a cluster using 
RandomPartitioner (CASSANDRA-19727)
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
index ef41dc2..cffbc9a 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
@@ -30,7 +30,7 @@ import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.spark.stats.IStats;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
 import org.apache.cassandra.spark.utils.IOUtils;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
@@ -44,9 +44,9 @@ public class FileSystemSSTable extends SSTable
 
     private final transient Path dataFilePath;
     private final transient boolean useBufferingInputStream;
-    private final transient Supplier<IStats<SSTable>> stats;
+    private final transient Supplier<BufferingInputStreamStats<SSTable>> stats;
 
-    public FileSystemSSTable(@NotNull Path dataFilePath, boolean 
useBufferingInputStream, @NotNull Supplier<IStats<SSTable>> stats)
+    public FileSystemSSTable(@NotNull Path dataFilePath, boolean 
useBufferingInputStream, @NotNull Supplier<BufferingInputStreamStats<SSTable>> 
stats)
     {
         this.dataFilePath = dataFilePath;
         this.useBufferingInputStream = useBufferingInputStream;
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java
new file mode 100644
index 0000000..dbd5019
--- /dev/null
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.stats;
+
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.utils.streaming.CassandraFile;
+import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
+
+/**
+ * Stats for {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream}.
+ * @param <T>
+ */
+public interface BufferingInputStreamStats<T extends CassandraFile>
+{
+    static <T extends CassandraFile> BufferingInputStreamStats<T> 
doNothingStats()
+    {
+        return new BufferingInputStreamStats<T>()
+        {
+        };
+    }
+
+    /**
+     * When {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream} queue is full, 
usually indicating
+     * job is CPU-bound and blocked on the CompactionIterator
+     *
+     * @param ssTable the SSTable source for this input stream
+     */
+    default void inputStreamQueueFull(CassandraFileSource<? extends SSTable> 
ssTable)
+    {
+    }
+
+    /**
+     * Failure occurred in the {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
+     *
+     * @param ssTable   the SSTable source for this input stream
+     * @param throwable throwable
+     */
+    default void inputStreamFailure(CassandraFileSource<T> ssTable, Throwable 
throwable)
+    {
+    }
+
+    /**
+     * Time the {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream} spent blocking 
on queue
+     * waiting for bytes. High time spent blocking indicates the job is 
network-bound, or blocked on the
+     * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} 
to supply the bytes.
+     *
+     * @param ssTable the SSTable source for this input stream
+     * @param nanos   time in nanoseconds
+     */
+    default void inputStreamTimeBlocked(CassandraFileSource<T> ssTable, long 
nanos)
+    {
+    }
+
+    /**
+     * Bytes written to {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
+     * by the {@link 
org.apache.cassandra.spark.utils.streaming.CassandraFileSource}
+     *
+     * @param ssTable the SSTable source for this input stream
+     * @param length  number of bytes written
+     */
+    default void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int 
length)
+    {
+    }
+
+    /**
+     * Bytes read from {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
+     *
+     * @param ssTable         the SSTable source for this input stream
+     * @param length          number of bytes read
+     * @param queueSize       current queue size
+     * @param percentComplete % completion
+     */
+    default void inputStreamByteRead(CassandraFileSource<T> ssTable,
+                                     int length,
+                                     int queueSize,
+                                     int percentComplete)
+    {
+    }
+
+    /**
+     * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} 
has finished writing
+     * to {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream} after reaching 
expected file length
+     *
+     * @param ssTable the SSTable source for this input stream
+     */
+    default void inputStreamEndBuffer(CassandraFileSource<T> ssTable)
+    {
+    }
+
+    /**
+     * {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} 
finished and closed
+     *
+     * @param ssTable           the SSTable source for this input stream
+     * @param runTimeNanos      total time open in nanoseconds
+     * @param totalNanosBlocked total time blocked on queue waiting for bytes 
in nanoseconds
+     */
+    default void inputStreamEnd(CassandraFileSource<T> ssTable, long 
runTimeNanos, long totalNanosBlocked)
+    {
+    }
+
+    /**
+     * Called when the InputStream skips bytes
+     *
+     * @param ssTable         the SSTable source for this input stream
+     * @param bufferedSkipped the number of bytes already buffered in memory 
skipped
+     * @param rangeSkipped    the number of bytes skipped
+     *                        by efficiently incrementing the start range for 
the next request
+     */
+    default void inputStreamBytesSkipped(CassandraFileSource<T> ssTable,
+                                         long bufferedSkipped,
+                                         long rangeSkipped)
+    {
+    }
+}
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java
deleted file mode 100644
index 2d783f5..0000000
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.spark.stats;
-
-import org.apache.cassandra.spark.utils.streaming.CassandraFile;
-import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
-
-/**
- * Generic Stats interface that works across all CassandraFile FileTypes.
- *
- * @param <T>
- */
-public interface IStats<T extends CassandraFile>
-{
-    default void inputStreamEnd(CassandraFileSource<T> source, long 
runTimeNanos, long totalNanosBlocked)
-    {
-    }
-
-    default void inputStreamEndBuffer(CassandraFileSource<T> ssTable)
-    {
-    }
-
-    default void inputStreamTimeBlocked(CassandraFileSource<T> source, long 
nanos)
-    {
-    }
-
-    default void inputStreamByteRead(CassandraFileSource<T> source, int len, 
int queueSize, int percentComplete)
-    {
-    }
-
-    default void inputStreamFailure(CassandraFileSource<T> source, Throwable t)
-    {
-    }
-
-    default void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int 
len)
-    {
-    }
-
-    default void inputStreamBytesSkipped(CassandraFileSource<T> source, long 
bufferedSkipped, long rangeSkipped)
-    {
-    }
-}
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java
index 20f54c3..2e5f075 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.cassandra.spark.stats.IStats;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.jetbrains.annotations.NotNull;
 
@@ -68,7 +68,7 @@ public class BufferingInputStream<T extends CassandraFile> 
extends InputStream i
 
     private final BlockingQueue<StreamBuffer> queue;
     private final CassandraFileSource<T> source;
-    private final IStats<T> stats;
+    private final BufferingInputStreamStats<T> stats;
     private final long startTimeNanos;
 
     // Variables accessed by both producer, consumer & timeout thread so must 
be volatile or atomic
@@ -90,9 +90,9 @@ public class BufferingInputStream<T extends CassandraFile> 
extends InputStream i
     /**
      * @param source CassandraFileSource to async provide the bytes after 
{@link CassandraFileSource#request(long, long, StreamConsumer)} is called
      *
-     * @param stats {@link IStats} implementation for recording instrumentation
+     * @param stats {@link BufferingInputStreamStats} implementation for 
recording instrumentation
      */
-    public BufferingInputStream(CassandraFileSource<T> source, IStats<T> stats)
+    public BufferingInputStream(CassandraFileSource<T> source, 
BufferingInputStreamStats<T> stats)
     {
         this.source = source;
         this.queue = new LinkedBlockingQueue<>();
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java
index 8b7a227..2f3b546 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java
@@ -193,6 +193,6 @@ public class SSTableLister implements SSTableCollector
         {
             throw new IllegalArgumentException("SSTable should have only one 
data component");
         }
-        return new FileSystemSSTable(dataComponents.get(0), true, () -> 
Stats.DoNothingStats.INSTANCE);
+        return new FileSystemSSTable(dataComponents.get(0), true, 
Stats.DoNothingStats.INSTANCE::bufferingInputStreamStats);
     }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
index e12c004..b0e6161 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
@@ -324,7 +324,7 @@ public class LocalDataLayer extends DataLayer implements 
Serializable
                 .map(Paths::get)
                 .flatMap(Throwing.function(Files::list))
                 .filter(path -> path.getFileName().toString().endsWith("-" + 
FileType.DATA.getFileSuffix()))
-                .map(path -> new FileSystemSSTable(path, 
useBufferingInputStream, this::stats))
+                .map(path -> new FileSystemSSTable(path, 
useBufferingInputStream, () -> this.stats.bufferingInputStreamStats()))
                 .collect(Collectors.toSet()));
     }
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index f1b249c..418604f 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -172,7 +172,7 @@ public class SidecarProvisionedSSTable extends SSTable
     public InputStream open(ListSnapshotFilesResponse.FileInfo fileInfo, 
FileType fileType)
     {
         CassandraFileSource<SidecarProvisionedSSTable> ssTableSource = 
source(fileInfo, fileType);
-        return new BufferingInputStream<>(ssTableSource, stats);
+        return new BufferingInputStream<>(ssTableSource, 
stats.bufferingInputStreamStats());
     }
 
     /**
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
index c0e01e2..acf439e 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
 import org.apache.cassandra.spark.stats.Stats;
 import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
@@ -1948,7 +1949,7 @@ public class EndToEndTests
     }
 
     @SuppressWarnings("unused")  // Actually used via reflection in 
testLargeBlobExclude()
-    public static final Stats<SSTable> STATS = new Stats<SSTable>()
+    public static final Stats STATS = new Stats()
     {
         @Override
         public void skippedBytes(long length)
@@ -1956,13 +1957,19 @@ public class EndToEndTests
             skippedRawBytes.addAndGet(length);
         }
 
-        @Override
-        public void inputStreamBytesSkipped(CassandraFileSource<SSTable> 
ssTable,
-                                            long bufferedSkipped,
-                                            long rangeSkipped)
+        public BufferingInputStreamStats<SSTable> bufferingInputStreamStats()
         {
-            skippedInputStreamBytes.addAndGet(bufferedSkipped);
-            skippedRangeBytes.addAndGet(rangeSkipped);
+            return new BufferingInputStreamStats<SSTable>()
+            {
+                @Override
+                public void 
inputStreamBytesSkipped(CassandraFileSource<SSTable> ssTable,
+                                                    long bufferedSkipped,
+                                                    long rangeSkipped)
+                {
+                    skippedInputStreamBytes.addAndGet(bufferedSkipped);
+                    skippedRangeBytes.addAndGet(rangeSkipped);
+                }
+            };
         }
     };
 
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java
index c40a808..4c6505b 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java
@@ -290,7 +290,7 @@ public class BufferingInputStreamHttpTest
                                                                   
Files.size(path),
                                                                   
maxBufferSize,
                                                                   
chunkBufferSize);
-            try (BufferingInputStream<SSTable> is = new 
BufferingInputStream<>(source, BufferingInputStreamTests.STATS))
+            try (BufferingInputStream<SSTable> is = new 
BufferingInputStream<>(source, 
BufferingInputStreamTests.STATS.bufferingInputStreamStats()))
             {
                 actualMD5 = DigestUtils.md5(is);
                 blockingTimeMillis = 
TimeUnit.MILLISECONDS.convert(is.timeBlockedNanos(), TimeUnit.NANOSECONDS);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java
index 6cdefda..2652e3f 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java
@@ -140,7 +140,7 @@ public class BufferingInputStreamTests
             requestCount.incrementAndGet();
             writeBuffers(consumer, randomBuffers(chunksPerRequest));
         }, null);
-        BufferingInputStream<SSTable> is = new 
BufferingInputStream<>(mockedClient, STATS);
+        BufferingInputStream<SSTable> is = new 
BufferingInputStream<>(mockedClient, STATS.bufferingInputStreamStats());
         readStreamFully(is);
         assertEquals(numRequests, requestCount.get());
         assertEquals(0L, is.bytesBuffered());
@@ -170,7 +170,7 @@ public class BufferingInputStreamTests
             }
         }, null);
         assertThrows(IOException.class,
-                     () -> readStreamFully(new BufferingInputStream<>(source, 
STATS))
+                     () -> readStreamFully(new BufferingInputStream<>(source, 
STATS.bufferingInputStreamStats()))
         );
     }
 
@@ -220,7 +220,7 @@ public class BufferingInputStreamTests
                 });
             }
         }, timeout);
-        BufferingInputStream<SSTable> inputStream = new 
BufferingInputStream<>(source, STATS);
+        BufferingInputStream<SSTable> inputStream = new 
BufferingInputStream<>(source, STATS.bufferingInputStreamStats());
         try
         {
             readStreamFully(inputStream);
@@ -292,7 +292,7 @@ public class BufferingInputStreamTests
 
         int bytesToRead = chunkSize * numChunks;
         long skipAhead = size - bytesToRead + 1;
-        try (BufferingInputStream<SSTable> stream = new 
BufferingInputStream<>(source, STATS))
+        try (BufferingInputStream<SSTable> stream = new 
BufferingInputStream<>(source, STATS.bufferingInputStreamStats()))
         {
             // Skip ahead so we only read the final chunks
             ByteBufferUtils.skipFully(stream, skipAhead);
@@ -341,7 +341,7 @@ public class BufferingInputStreamTests
             }
         };
 
-        try (BufferingInputStream<SSTable> stream = new 
BufferingInputStream<>(source, STATS))
+        try (BufferingInputStream<SSTable> stream = new 
BufferingInputStream<>(source, STATS.bufferingInputStreamStats()))
         {
             ByteBufferUtils.skipFully(stream, 20971520);
             readStreamFully(stream);
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
index e280a74..eca2fcb 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
@@ -30,11 +30,11 @@ import org.apache.cassandra.spark.data.SSTablesSupplier;
 import org.apache.cassandra.spark.reader.IndexEntry;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
-import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
+import org.apache.cassandra.spark.utils.streaming.CassandraFile;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-public abstract class Stats<T extends SSTable> implements IStats<T>
+public abstract class Stats
 {
 
     public static class DoNothingStats extends Stats
@@ -42,6 +42,11 @@ public abstract class Stats<T extends SSTable> implements 
IStats<T>
         public static final DoNothingStats INSTANCE = new DoNothingStats();
     }
 
+    public <T extends CassandraFile> BufferingInputStreamStats<T> 
bufferingInputStreamStats()
+    {
+        return BufferingInputStreamStats.doNothingStats();
+    }
+
     // Spark Row Iterator
 
     /**
@@ -357,101 +362,6 @@ public abstract class Stats<T extends SSTable> implements 
IStats<T>
     {
     }
 
-    // SSTable Input Stream
-
-    /**
-     * When {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream} queue is full, 
usually indicating
-     * job is CPU-bound and blocked on the CompactionIterator
-     *
-     * @param ssTable the SSTable source for this input stream
-     */
-    public void inputStreamQueueFull(CassandraFileSource<? extends SSTable> 
ssTable)
-    {
-    }
-
-    /**
-     * Failure occurred in the {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
-     *
-     * @param ssTable   the SSTable source for this input stream
-     * @param throwable throwable
-     */
-    public void inputStreamFailure(CassandraFileSource<T> ssTable, Throwable 
throwable)
-    {
-    }
-
-    /**
-     * Time the {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream} spent blocking 
on queue
-     * waiting for bytes. High time spent blocking indicates the job is 
network-bound, or blocked on the
-     * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} 
to supply the bytes.
-     *
-     * @param ssTable the SSTable source for this input stream
-     * @param nanos   time in nanoseconds
-     */
-    public void inputStreamTimeBlocked(CassandraFileSource<T> ssTable, long 
nanos)
-    {
-    }
-
-    /**
-     * Bytes written to {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
-     * by the {@link 
org.apache.cassandra.spark.utils.streaming.CassandraFileSource}
-     *
-     * @param ssTable the SSTable source for this input stream
-     * @param length  number of bytes written
-     */
-    public void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int 
length)
-    {
-    }
-
-    /**
-     * Bytes read from {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
-     *
-     * @param ssTable         the SSTable source for this input stream
-     * @param length          number of bytes read
-     * @param queueSize       current queue size
-     * @param percentComplete % completion
-     */
-    public void inputStreamByteRead(CassandraFileSource<T> ssTable,
-                                    int length,
-                                    int queueSize,
-                                    int percentComplete)
-    {
-    }
-
-    /**
-     * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} 
has finished writing
-     * to {@link 
org.apache.cassandra.spark.utils.streaming.BufferingInputStream} after reaching 
expected file length
-     *
-     * @param ssTable the SSTable source for this input stream
-     */
-    public void inputStreamEndBuffer(CassandraFileSource<T> ssTable)
-    {
-    }
-
-    /**
-     * {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} 
finished and closed
-     *
-     * @param ssTable           the SSTable source for this input stream
-     * @param runTimeNanos      total time open in nanoseconds
-     * @param totalNanosBlocked total time blocked on queue waiting for bytes 
in nanoseconds
-     */
-    public void inputStreamEnd(CassandraFileSource<T> ssTable, long 
runTimeNanos, long totalNanosBlocked)
-    {
-    }
-
-    /**
-     * Called when the InputStream skips bytes
-     *
-     * @param ssTable         the SSTable source for this input stream
-     * @param bufferedSkipped the number of bytes already buffered in memory 
skipped
-     * @param rangeSkipped    the number of bytes skipped
-     *                        by efficiently incrementing the start range for 
the next request
-     */
-    public void inputStreamBytesSkipped(CassandraFileSource<T> ssTable,
-                                        long bufferedSkipped,
-                                        long rangeSkipped)
-    {
-    }
-
     // PartitionSizeIterator stats
 
     /**
diff --git 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
index 04cbc24..d10ccbc 100644
--- 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
+++ 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
@@ -592,7 +592,7 @@ public class SSTableReaderTests
 
                     AtomicBoolean pass = new AtomicBoolean(true);
                     AtomicInteger skipCount = new AtomicInteger(0);
-                    Stats<SSTable> stats = new Stats<SSTable>()
+                    Stats stats = new Stats()
                     {
                         @Override
                         public void skippedSSTable(@Nullable SparkRangeFilter 
sparkRangeFilter,
@@ -652,7 +652,7 @@ public class SSTableReaderTests
 
                     AtomicBoolean pass = new AtomicBoolean(true);
                     AtomicInteger skipCount = new AtomicInteger(0);
-                    Stats<SSTable> stats = new Stats<SSTable>()
+                    Stats stats = new Stats()
                     {
                         @Override
                         public void skippedSSTable(@Nullable SparkRangeFilter 
sparkRangeFilter,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to