This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new 458a363 CASSANDRA-19778: Split out BufferingInputStream stats into separate i… (#66) 458a363 is described below commit 458a3630f882ae2b2a9cee272cf85ca7ff42f5cd Author: jberragan <jberra...@gmail.com> AuthorDate: Wed Jul 17 14:29:21 2024 -0700 CASSANDRA-19778: Split out BufferingInputStream stats into separate i… (#66) Split BufferingInputStream stats into separate interface so class level generics are not required for the Stats interface Patch by James Berragan; Reviewed by Bernardo Botella, Francisco Guerrero, Yifan Cai for CASSANDRA-19778 --- CHANGES.txt | 1 + .../cassandra/spark/data/FileSystemSSTable.java | 6 +- .../spark/stats/BufferingInputStreamStats.java | 131 +++++++++++++++++++++ .../org/apache/cassandra/spark/stats/IStats.java | 59 ---------- .../utils/streaming/BufferingInputStream.java | 8 +- .../spark/bulkwriter/blobupload/SSTableLister.java | 2 +- .../cassandra/spark/data/LocalDataLayer.java | 2 +- .../spark/data/SidecarProvisionedSSTable.java | 2 +- .../org/apache/cassandra/spark/EndToEndTests.java | 21 ++-- .../spark/utils/BufferingInputStreamHttpTest.java | 2 +- .../spark/utils/BufferingInputStreamTests.java | 10 +- .../org/apache/cassandra/spark/stats/Stats.java | 104 ++-------------- .../cassandra/spark/reader/SSTableReaderTests.java | 4 +- 13 files changed, 171 insertions(+), 181 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 01181ec..d9231b6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Split out BufferingInputStream stats into separate interface (CASSANDRA-19778) * Bump Sidecar version to 55a9efee (CASSANDRA-19774) * Add new module cassandra-analytics-common to store common code with minimal dependencies (CASSANDRA-19748) * Bulk writer fails validation stage when writing to a cluster using RandomPartitioner (CASSANDRA-19727) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java index ef41dc2..cffbc9a 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java @@ -30,7 +30,7 @@ import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.spark.stats.IStats; +import org.apache.cassandra.spark.stats.BufferingInputStreamStats; import org.apache.cassandra.spark.utils.IOUtils; import org.apache.cassandra.spark.utils.ThrowableUtils; import org.apache.cassandra.spark.utils.streaming.BufferingInputStream; @@ -44,9 +44,9 @@ public class FileSystemSSTable extends SSTable private final transient Path dataFilePath; private final transient boolean useBufferingInputStream; - private final transient Supplier<IStats<SSTable>> stats; + private final transient Supplier<BufferingInputStreamStats<SSTable>> stats; - public FileSystemSSTable(@NotNull Path dataFilePath, boolean useBufferingInputStream, @NotNull Supplier<IStats<SSTable>> stats) + public FileSystemSSTable(@NotNull Path dataFilePath, boolean useBufferingInputStream, @NotNull Supplier<BufferingInputStreamStats<SSTable>> stats) { this.dataFilePath = dataFilePath; this.useBufferingInputStream = useBufferingInputStream; diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java new file mode 100644 index 0000000..dbd5019 --- /dev/null +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.stats; + +import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.utils.streaming.CassandraFile; +import org.apache.cassandra.spark.utils.streaming.CassandraFileSource; + +/** + * Stats for {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream}. + * @param <T> + */ +public interface BufferingInputStreamStats<T extends CassandraFile> +{ + static <T extends CassandraFile> BufferingInputStreamStats<T> doNothingStats() + { + return new BufferingInputStreamStats<T>() + { + }; + } + + /** + * When {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} queue is full, usually indicating + * job is CPU-bound and blocked on the CompactionIterator + * + * @param ssTable the SSTable source for this input stream + */ + default void inputStreamQueueFull(CassandraFileSource<? extends SSTable> ssTable) + { + } + + /** + * Failure occurred in the {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} + * + * @param ssTable the SSTable source for this input stream + * @param throwable throwable + */ + default void inputStreamFailure(CassandraFileSource<T> ssTable, Throwable throwable) + { + } + + /** + * Time the {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} spent blocking on queue + * waiting for bytes. High time spent blocking indicates the job is network-bound, or blocked on the + * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} to supply the bytes. + * + * @param ssTable the SSTable source for this input stream + * @param nanos time in nanoseconds + */ + default void inputStreamTimeBlocked(CassandraFileSource<T> ssTable, long nanos) + { + } + + /** + * Bytes written to {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} + * by the {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} + * + * @param ssTable the SSTable source for this input stream + * @param length number of bytes written + */ + default void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int length) + { + } + + /** + * Bytes read from {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} + * + * @param ssTable the SSTable source for this input stream + * @param length number of bytes read + * @param queueSize current queue size + * @param percentComplete % completion + */ + default void inputStreamByteRead(CassandraFileSource<T> ssTable, + int length, + int queueSize, + int percentComplete) + { + } + + /** + * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} has finished writing + * to {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} after reaching expected file length + * + * @param ssTable the SSTable source for this input stream + */ + default void inputStreamEndBuffer(CassandraFileSource<T> ssTable) + { + } + + /** + * {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} finished and closed + * + * @param ssTable the SSTable source for this input stream + * @param runTimeNanos total time open in nanoseconds + * @param totalNanosBlocked total time blocked on queue waiting for bytes in nanoseconds + */ + default void inputStreamEnd(CassandraFileSource<T> ssTable, long runTimeNanos, long totalNanosBlocked) + { + } + + /** + * Called when the InputStream skips bytes + * + * @param ssTable the SSTable source for this input stream + * @param bufferedSkipped the number of bytes already buffered in memory skipped + * @param rangeSkipped the number of bytes skipped + * by efficiently incrementing the start range for the next request + */ + default void inputStreamBytesSkipped(CassandraFileSource<T> ssTable, + long bufferedSkipped, + long rangeSkipped) + { + } +} diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java deleted file mode 100644 index 2d783f5..0000000 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.spark.stats; - -import org.apache.cassandra.spark.utils.streaming.CassandraFile; -import org.apache.cassandra.spark.utils.streaming.CassandraFileSource; - -/** - * Generic Stats interface that works across all CassandraFile FileTypes. - * - * @param <T> - */ -public interface IStats<T extends CassandraFile> -{ - default void inputStreamEnd(CassandraFileSource<T> source, long runTimeNanos, long totalNanosBlocked) - { - } - - default void inputStreamEndBuffer(CassandraFileSource<T> ssTable) - { - } - - default void inputStreamTimeBlocked(CassandraFileSource<T> source, long nanos) - { - } - - default void inputStreamByteRead(CassandraFileSource<T> source, int len, int queueSize, int percentComplete) - { - } - - default void inputStreamFailure(CassandraFileSource<T> source, Throwable t) - { - } - - default void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int len) - { - } - - default void inputStreamBytesSkipped(CassandraFileSource<T> source, long bufferedSkipped, long rangeSkipped) - { - } -} diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java index 20f54c3..2e5f075 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java @@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import org.apache.cassandra.spark.stats.IStats; +import org.apache.cassandra.spark.stats.BufferingInputStreamStats; import org.apache.cassandra.spark.utils.ThrowableUtils; import org.jetbrains.annotations.NotNull; @@ -68,7 +68,7 @@ public class BufferingInputStream<T extends CassandraFile> extends InputStream i private final BlockingQueue<StreamBuffer> queue; private final CassandraFileSource<T> source; - private final IStats<T> stats; + private final BufferingInputStreamStats<T> stats; private final long startTimeNanos; // Variables accessed by both producer, consumer & timeout thread so must be volatile or atomic @@ -90,9 +90,9 @@ public class BufferingInputStream<T extends CassandraFile> extends InputStream i /** * @param source CassandraFileSource to async provide the bytes after {@link CassandraFileSource#request(long, long, StreamConsumer)} is called * - * @param stats {@link IStats} implementation for recording instrumentation + * @param stats {@link BufferingInputStreamStats} implementation for recording instrumentation */ - public BufferingInputStream(CassandraFileSource<T> source, IStats<T> stats) + public BufferingInputStream(CassandraFileSource<T> source, BufferingInputStreamStats<T> stats) { this.source = source; this.queue = new LinkedBlockingQueue<>(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java index 8b7a227..2f3b546 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java @@ -193,6 +193,6 @@ public class SSTableLister implements SSTableCollector { throw new IllegalArgumentException("SSTable should have only one data component"); } - return new FileSystemSSTable(dataComponents.get(0), true, () -> Stats.DoNothingStats.INSTANCE); + return new FileSystemSSTable(dataComponents.get(0), true, Stats.DoNothingStats.INSTANCE::bufferingInputStreamStats); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java index e12c004..b0e6161 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java @@ -324,7 +324,7 @@ public class LocalDataLayer extends DataLayer implements Serializable .map(Paths::get) .flatMap(Throwing.function(Files::list)) .filter(path -> path.getFileName().toString().endsWith("-" + FileType.DATA.getFileSuffix())) - .map(path -> new FileSystemSSTable(path, useBufferingInputStream, this::stats)) + .map(path -> new FileSystemSSTable(path, useBufferingInputStream, () -> this.stats.bufferingInputStreamStats())) .collect(Collectors.toSet())); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java index f1b249c..418604f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java @@ -172,7 +172,7 @@ public class SidecarProvisionedSSTable extends SSTable public InputStream open(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType) { CassandraFileSource<SidecarProvisionedSSTable> ssTableSource = source(fileInfo, fileType); - return new BufferingInputStream<>(ssTableSource, stats); + return new BufferingInputStream<>(ssTableSource, stats.bufferingInputStreamStats()); } /** diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java index c0e01e2..acf439e 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java @@ -49,6 +49,7 @@ import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.stats.BufferingInputStreamStats; import org.apache.cassandra.spark.stats.Stats; import org.apache.cassandra.spark.utils.RandomUtils; import org.apache.cassandra.spark.utils.streaming.CassandraFileSource; @@ -1948,7 +1949,7 @@ public class EndToEndTests } @SuppressWarnings("unused") // Actually used via reflection in testLargeBlobExclude() - public static final Stats<SSTable> STATS = new Stats<SSTable>() + public static final Stats STATS = new Stats() { @Override public void skippedBytes(long length) @@ -1956,13 +1957,19 @@ public class EndToEndTests skippedRawBytes.addAndGet(length); } - @Override - public void inputStreamBytesSkipped(CassandraFileSource<SSTable> ssTable, - long bufferedSkipped, - long rangeSkipped) + public BufferingInputStreamStats<SSTable> bufferingInputStreamStats() { - skippedInputStreamBytes.addAndGet(bufferedSkipped); - skippedRangeBytes.addAndGet(rangeSkipped); + return new BufferingInputStreamStats<SSTable>() + { + @Override + public void inputStreamBytesSkipped(CassandraFileSource<SSTable> ssTable, + long bufferedSkipped, + long rangeSkipped) + { + skippedInputStreamBytes.addAndGet(bufferedSkipped); + skippedRangeBytes.addAndGet(rangeSkipped); + } + }; } }; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java index c40a808..4c6505b 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java @@ -290,7 +290,7 @@ public class BufferingInputStreamHttpTest Files.size(path), maxBufferSize, chunkBufferSize); - try (BufferingInputStream<SSTable> is = new BufferingInputStream<>(source, BufferingInputStreamTests.STATS)) + try (BufferingInputStream<SSTable> is = new BufferingInputStream<>(source, BufferingInputStreamTests.STATS.bufferingInputStreamStats())) { actualMD5 = DigestUtils.md5(is); blockingTimeMillis = TimeUnit.MILLISECONDS.convert(is.timeBlockedNanos(), TimeUnit.NANOSECONDS); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java index 6cdefda..2652e3f 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java @@ -140,7 +140,7 @@ public class BufferingInputStreamTests requestCount.incrementAndGet(); writeBuffers(consumer, randomBuffers(chunksPerRequest)); }, null); - BufferingInputStream<SSTable> is = new BufferingInputStream<>(mockedClient, STATS); + BufferingInputStream<SSTable> is = new BufferingInputStream<>(mockedClient, STATS.bufferingInputStreamStats()); readStreamFully(is); assertEquals(numRequests, requestCount.get()); assertEquals(0L, is.bytesBuffered()); @@ -170,7 +170,7 @@ public class BufferingInputStreamTests } }, null); assertThrows(IOException.class, - () -> readStreamFully(new BufferingInputStream<>(source, STATS)) + () -> readStreamFully(new BufferingInputStream<>(source, STATS.bufferingInputStreamStats())) ); } @@ -220,7 +220,7 @@ public class BufferingInputStreamTests }); } }, timeout); - BufferingInputStream<SSTable> inputStream = new BufferingInputStream<>(source, STATS); + BufferingInputStream<SSTable> inputStream = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats()); try { readStreamFully(inputStream); @@ -292,7 +292,7 @@ public class BufferingInputStreamTests int bytesToRead = chunkSize * numChunks; long skipAhead = size - bytesToRead + 1; - try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS)) + try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats())) { // Skip ahead so we only read the final chunks ByteBufferUtils.skipFully(stream, skipAhead); @@ -341,7 +341,7 @@ public class BufferingInputStreamTests } }; - try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS)) + try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats())) { ByteBufferUtils.skipFully(stream, 20971520); readStreamFully(stream); diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java index e280a74..eca2fcb 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java @@ -30,11 +30,11 @@ import org.apache.cassandra.spark.data.SSTablesSupplier; import org.apache.cassandra.spark.reader.IndexEntry; import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; -import org.apache.cassandra.spark.utils.streaming.CassandraFileSource; +import org.apache.cassandra.spark.utils.streaming.CassandraFile; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -public abstract class Stats<T extends SSTable> implements IStats<T> +public abstract class Stats { public static class DoNothingStats extends Stats @@ -42,6 +42,11 @@ public abstract class Stats<T extends SSTable> implements IStats<T> public static final DoNothingStats INSTANCE = new DoNothingStats(); } + public <T extends CassandraFile> BufferingInputStreamStats<T> bufferingInputStreamStats() + { + return BufferingInputStreamStats.doNothingStats(); + } + // Spark Row Iterator /** @@ -357,101 +362,6 @@ public abstract class Stats<T extends SSTable> implements IStats<T> { } - // SSTable Input Stream - - /** - * When {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} queue is full, usually indicating - * job is CPU-bound and blocked on the CompactionIterator - * - * @param ssTable the SSTable source for this input stream - */ - public void inputStreamQueueFull(CassandraFileSource<? extends SSTable> ssTable) - { - } - - /** - * Failure occurred in the {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} - * - * @param ssTable the SSTable source for this input stream - * @param throwable throwable - */ - public void inputStreamFailure(CassandraFileSource<T> ssTable, Throwable throwable) - { - } - - /** - * Time the {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} spent blocking on queue - * waiting for bytes. High time spent blocking indicates the job is network-bound, or blocked on the - * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} to supply the bytes. - * - * @param ssTable the SSTable source for this input stream - * @param nanos time in nanoseconds - */ - public void inputStreamTimeBlocked(CassandraFileSource<T> ssTable, long nanos) - { - } - - /** - * Bytes written to {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} - * by the {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} - * - * @param ssTable the SSTable source for this input stream - * @param length number of bytes written - */ - public void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int length) - { - } - - /** - * Bytes read from {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} - * - * @param ssTable the SSTable source for this input stream - * @param length number of bytes read - * @param queueSize current queue size - * @param percentComplete % completion - */ - public void inputStreamByteRead(CassandraFileSource<T> ssTable, - int length, - int queueSize, - int percentComplete) - { - } - - /** - * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} has finished writing - * to {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} after reaching expected file length - * - * @param ssTable the SSTable source for this input stream - */ - public void inputStreamEndBuffer(CassandraFileSource<T> ssTable) - { - } - - /** - * {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} finished and closed - * - * @param ssTable the SSTable source for this input stream - * @param runTimeNanos total time open in nanoseconds - * @param totalNanosBlocked total time blocked on queue waiting for bytes in nanoseconds - */ - public void inputStreamEnd(CassandraFileSource<T> ssTable, long runTimeNanos, long totalNanosBlocked) - { - } - - /** - * Called when the InputStream skips bytes - * - * @param ssTable the SSTable source for this input stream - * @param bufferedSkipped the number of bytes already buffered in memory skipped - * @param rangeSkipped the number of bytes skipped - * by efficiently incrementing the start range for the next request - */ - public void inputStreamBytesSkipped(CassandraFileSource<T> ssTable, - long bufferedSkipped, - long rangeSkipped) - { - } - // PartitionSizeIterator stats /** diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java index 04cbc24..d10ccbc 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java @@ -592,7 +592,7 @@ public class SSTableReaderTests AtomicBoolean pass = new AtomicBoolean(true); AtomicInteger skipCount = new AtomicInteger(0); - Stats<SSTable> stats = new Stats<SSTable>() + Stats stats = new Stats() { @Override public void skippedSSTable(@Nullable SparkRangeFilter sparkRangeFilter, @@ -652,7 +652,7 @@ public class SSTableReaderTests AtomicBoolean pass = new AtomicBoolean(true); AtomicInteger skipCount = new AtomicInteger(0); - Stats<SSTable> stats = new Stats<SSTable>() + Stats stats = new Stats() { @Override public void skippedSSTable(@Nullable SparkRangeFilter sparkRangeFilter, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org