yifan-c commented on code in PR #62:
URL: 
https://github.com/apache/cassandra-analytics/pull/62#discussion_r1674811270


##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java:
##########
@@ -49,30 +49,36 @@
  * <p>
  * Upon return from the next() call the current values of the scanner can be 
obtained by calling
  * the methods in Rid, getPartitionKey(), getColumnName(), getValue().
+ *
  * @param <Type> type of object returned by rid() method.
  */
 @SuppressWarnings("unused")
 public interface StreamScanner<Type> extends Closeable

Review Comment:
   Can you rename the type parameter from `Type` to `T` in this refactoring? I 
am strongly -1 on having the same name pattern as the class names. It is 
confusing when reading the code, mistaking the type parameter for the actual 
class types. 



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/TokenPartitioner.java:
##########
@@ -51,7 +51,7 @@
 
 /**
  * Util class for partitioning Spark workers across the token ring
- * This class duplicates {@link 
org.apache.cassandra.spark.bulkwriter.TokenPartitioner} but is solely
+ * This class duplicates 
org.apache.cassandra.spark.bulkwriter.TokenPartitioner but is solely

Review Comment:
   The change is unrelated. Let's revert and make it clickable. 



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java:
##########
@@ -24,13 +24,13 @@ public class EmptyStreamScanner implements 
StreamScanner<Rid>
     public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner();
 
     @Override
-    public Rid rid()
+    public Rid data()

Review Comment:
   Since you are renaming the method, can you rename the class to make it 
easier to understand? I'd propose `RowData`, instead of `Rid`
   
   Is the method name `data()` clear enough to indicate that it returns a 
single element? In this code snippet `scanner.data()`, could it be read as all 
the data it has scanned? If so, it is clearer to call it `nextData`. 
   
   I'd agree with @aweisberg that we should just move to the more common and 
well established interface, `Iterator`. 



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.stats;
+
+import org.apache.cassandra.spark.utils.streaming.CassandraFile;
+import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
+
+/**
+ * Generic Stats interface that works across all CassandraFile FileTypes.
+ *
+ * @param <FileType>
+ */
+public interface IStats<FileType extends CassandraFile>

Review Comment:
   Can you rename the type parameter to just `F` or `FILE_TYPE` to be 
distinguishable with class names? (I would favor the former)



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java:
##########
@@ -30,29 +30,28 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.cassandra.spark.data.SSTable;
-import org.apache.cassandra.spark.stats.Stats;
+import org.apache.cassandra.spark.stats.IStats;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * The InputStream into the CompactionIterator needs to be a blocking {@link 
java.io.InputStream},
  * but we don't want to block on network calls, or buffer too much data in 
memory otherwise we will hit OOMs for large Data.db files.
  * <p>
- * This helper class uses the {@link SSTableSource} implementation provided to 
asynchronously read
+ * This helper class uses the {@link CassandraFileSource} implementation 
provided to asynchronously read
  * the T bytes on-demand, managing flow control if not ready for more bytes 
and buffering enough without reading entirely into memory.
  * <p>
- * The generic {@link SSTableSource} allows users to pass in their own 
implementations to read from any source.
+ * The generic {@link CassandraFileSource} allows users to pass in their own 
implementations to read from any source.
  * <p>
  * This enables the Bulk Reader library to scale to read many SSTables without 
OOMing, and controls the flow by
  * buffering more bytes on-demand as the data is drained.
  * <p>
  * This class expects the consumer thread to be single-threaded, and the 
producer thread to be single-threaded OR serialized to ensure ordering of 
events.
  *
- * @param <T> SSTable
+ * @param <FileType> CassandraFile type e.g. SSTable, CommitLog
  */
 @SuppressWarnings({"WeakerAccess", "unused"})
-public class SSTableInputStream<T extends SSTable> extends InputStream 
implements StreamConsumer
+public class BufferingInputStream<FileType extends CassandraFile> extends 
InputStream implements StreamConsumer

Review Comment:
   Please either revert it back to `T` or use `F`. I think there are several 
other occurrences that the type parameter is named the same as class names. For 
the sake of readability, please update those and make the type parameter 
distinguishable. 



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java:
##########
@@ -49,30 +49,36 @@
  * <p>
  * Upon return from the next() call the current values of the scanner can be 
obtained by calling
  * the methods in Rid, getPartitionKey(), getColumnName(), getValue().
+ *
  * @param <Type> type of object returned by rid() method.
  */
 @SuppressWarnings("unused")
 public interface StreamScanner<Type> extends Closeable
 {
     /**
-     * Expose the data/rid to be consumed.
-     * Implementation note: rid should always be updated to the current 
partition if hasNext returns true.
+     * Exposes the data to be consumed.
+     * <p>Implementation note: The data should always be updated to the 
current partition if {@code next} returns {@code true}.
+     * Calls to {@code data} should be idempotent and not alter the state of 
the underlying scanner.
+     * </p>
      *
-     * @return rid
+     * @return data of type {@code Type}
      */
-    Type rid();
+    Type data();
 
     /**
-     * Indicate if there are more data/rid avaiable
+     * Indicate if there are more data available to read.
+     * <p>Implementation note: {@code next} can alter the state of the 
underlying scanner,
+     * so it should be called once and not called again until the {@code data} 
is consumed.
+     * </p>
      *
-     * @return true when the rid is available to be consumed;
+     * @return true when the data is available to be consumed;
      * otherwise, return false to indicate the scanner has exhausted
      * @throws IOException
      */
-    boolean hasNext() throws IOException;
+    boolean next() throws IOException;

Review Comment:
   The renaming does not make sense to me. 
   
   As the updated doc states, it expects the consumer to always consume the 
next data. Meaning, it does not expect calling the `next()` method N times to 
skip N - 1 items. 
   
   So, what we want is still just an `Iterator`. 
   
   It is easier and less error prone by having the data advancing logic coded 
in the implementation, instead of the "implementation note". For example, the 
first call to `hasNext` advance to the next position, but the subsequent calls 
w/o calling `next()` does not advance and the state does not change. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to