5 commented on code in PR #62:
URL: 
https://github.com/apache/cassandra-analytics/pull/62#discussion_r1667082786


##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.stats;
+
+import org.apache.cassandra.spark.utils.streaming.CassandraFile;
+import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
+
+/**
+ * Generic Stats interface that works across all CassandraFile FileTypes.
+ *
+ * @param <FileType>
+ */
+public interface IStats<FileType extends CassandraFile>
+{
+    default void inputStreamEnd(CassandraFileSource<FileType> source, long 
runTimeNanos, long totalNanosBlocked)
+    {
+

Review Comment:
   Nit: No point in having extra empty lines within empty default methods here 
and below.



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -222,4 +215,55 @@ public static void skipFully(InputStream is, long length) 
throws IOException
             throw new EOFException("EOF after " + skipped + " bytes out of " + 
length);
         }
     }
+
+    // Extract component position from buffer; return null if there are not 
enough components
+    public static ByteBuffer extractComponent(ByteBuffer buffer, int position)
+    {
+        buffer = buffer.duplicate();
+        readStatic(buffer);
+        int index = 0;
+        while (buffer.remaining() > 0)
+        {
+            ByteBuffer c = readBytesWithShortLength(buffer);

Review Comment:
   Nit: I would avoid single-symbol variable names and use something like 
`component` instead of `c`.



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -93,25 +86,27 @@ public static byte[] getArray(ByteBuffer buffer)
         return bytes;
     }
 
-    public static String stringThrowRuntime(ByteBuffer buffer)
+    /**
+     * Decode ByteBuffer into String using provided CharsetDecoder.
+     *
+     * @param buffer          byte buffer
+     * @param decoderSupplier let the user provide their own CharsetDecoder 
provider
+     *                        e.g. using 
io.netty.util.concurrent.FastThreadLocal over java.lang.ThreadLocal
+     * @return decoded string
+     * @throws CharacterCodingException charset decoding exception
+     */
+    public static String string(ByteBuffer buffer, Supplier<CharsetDecoder> 
decoderSupplier) throws CharacterCodingException
     {
-        try
-        {
-            return ByteBufferUtils.string(buffer);
-        }
-        catch (CharacterCodingException exception)
+        if (buffer.remaining() <= 0)
         {
-            throw new RuntimeException(exception);
+            return EMPTY_STRING;
         }
+        return decoderSupplier.get().decode(buffer.duplicate()).toString();

Review Comment:
   Nit: I personally would use a conditional operator here for improved 
readability:
   ```java
   return buffer.remaining() > 0 ? 
decoderSupplier.get().decode(buffer.duplicate()).toString() : EMPTY_STRING;
   ```



##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/MapBuffer.java:
##########
@@ -17,28 +17,21 @@
  * under the License.
  */
 
-package org.apache.cassandra.bridge;
+package org.apache.cassandra.spark.reader;
 
-public class RangeTombstone
-{
-    public final Bound open;
-    public final Bound close;
+import org.apache.cassandra.db.rows.Cell;
 
-    public RangeTombstone(Bound open, Bound close)
+public class MapBuffer extends ComplexTypeBuffer
+{
+    MapBuffer(int cellCount)
     {
-        this.open = open;
-        this.close = close;
+        super(cellCount, cellCount * 2);

Review Comment:
   Can you please add a comment about why double cell count is used here?
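
   My guess, and it is only an assumption since the diff does not show 
`addCell`, is that each map cell contributes two buffers (the key and the 
value), so the backing list is pre-sized at twice the cell count. A minimal 
sketch of that reading, using a hypothetical stand-in class rather than the 
real `MapBuffer`:
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for MapBuffer, not part of the PR; assumes each map
   // cell contributes two buffers (key and value), which the diff does not confirm
   final class MapCapacitySketch
   {
       private final List<ByteBuffer> buffers;

       MapCapacitySketch(int cellCount)
       {
           // Two entries per cell: one for the key, one for the value
           this.buffers = new ArrayList<>(cellCount * 2);
       }

       void addCell(ByteBuffer key, ByteBuffer value)
       {
           buffers.add(key);
           buffers.add(value);
       }

       int bufferCount()
       {
           return buffers.size();
       }
   }
   ```
   If that is the reason, a one-line comment next to the `super(...)` call 
would be enough.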



##########
cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FastThreadLocalUtf8Decoder.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import io.netty.util.concurrent.FastThreadLocal;
+
+public final class FastThreadLocalUtf8Decoder
+{
+    private FastThreadLocalUtf8Decoder()
+    {
+        super();
+        throw new IllegalStateException(getClass() + " is static utility class 
and shall not be instantiated");
+    }
+
+    private final static FastThreadLocal<CharsetDecoder> UTF8_DECODER = new 
FastThreadLocal<CharsetDecoder>()
+    {
+        @Override
+        protected CharsetDecoder initialValue()
+        {
+            return StandardCharsets.UTF_8.newDecoder();
+        }
+    };
+
+    public static String string(ByteBuffer buffer) throws 
CharacterCodingException
+    {
+        return ByteBufferUtils.string(buffer, 
FastThreadLocalUtf8Decoder.UTF8_DECODER::get);
+    }
+
+    public static String stringThrowRuntime(ByteBuffer buffer)
+    {
+        try
+        {
+            return string(buffer);
+        }
+        catch (final CharacterCodingException e)

Review Comment:
   Nit: I'd prefer we avoid single-symbol variable names for the sake of 
consistency and replace `e` with `exception`.



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java:
##########
@@ -167,7 +167,7 @@ static Dataset<Row> 
openLocalPartitionSizeSource(CassandraBridge bridge,
                                            .option("createStmt", createStmt)
                                            .option("dirs", 
dir.toAbsolutePath().toString())
                                            .option("version", 
version.toString())
-                                           .option("useSSTableInputStream", 
true) // use in the test system to test the SSTableInputStream
+                                           .option("useBufferingInputStream", 
true) // use in the test system to test the BufferingInputStream

Review Comment:
   Nit: I'd add an extra space and capitalization for the sake of consistency:
   ```java
   .option("useBufferingInputStream", true)  // Use in the test system to test 
the BufferingInputStream
   ```



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -30,20 +30,13 @@
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-
-import io.netty.util.concurrent.FastThreadLocal;
+import java.util.function.Supplier;
 
 public final class ByteBufferUtils
 {
+    public static final ThreadLocal<CharsetDecoder> UTF8_DECODER_PROVIDER = 
ThreadLocal.withInitial(StandardCharsets.UTF_8::newDecoder);
+    public static final int STATIC_MARKER = 0xFFFF;

Review Comment:
   Could you please add a comment about where this value comes from?
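
   As far as I can tell, this matches the `STATIC_MARKER` in Cassandra's 
`CompositeType`, where a leading two-byte `0xFFFF` flags a static composite. 
That is my assumption and worth confirming. A minimal sketch of how such a 
marker might be detected and skipped, with a hypothetical class name and plain 
JDK calls only:
   ```java
   import java.nio.ByteBuffer;

   // Hypothetical sketch, not the PR's code; assumes the marker is the first
   // two bytes of the buffer, read as an unsigned short
   final class StaticMarkerSketch
   {
       static final int STATIC_MARKER = 0xFFFF;

       // Consumes the two-byte marker if present and reports whether it was there
       static boolean readStatic(ByteBuffer buffer)
       {
           if (buffer.remaining() < 2)
           {
               return false;
           }
           // Peek at the header without moving the position
           int header = buffer.getShort(buffer.position()) & 0xFFFF;
           if (header != STATIC_MARKER)
           {
               return false;
           }
           buffer.position(buffer.position() + 2);  // Skip the marker
           return true;
       }
   }
   ```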



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -30,20 +30,13 @@
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-
-import io.netty.util.concurrent.FastThreadLocal;
+import java.util.function.Supplier;
 
 public final class ByteBufferUtils
 {
+    public static final ThreadLocal<CharsetDecoder> UTF8_DECODER_PROVIDER = 
ThreadLocal.withInitial(StandardCharsets.UTF_8::newDecoder);

Review Comment:
   This line seems similar to what is going on in `FastThreadLocalUtf8Decoder`. 
Can it be referenced and reused here?



##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ComplexTypeBuffer.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.Cell;
+
+/**
+ * ComplexTypeBuffer is a util class for reconstructing multi-cell data into 
complex types such as unfrozen lists, maps, sets, or UDTs.
+ * ComplexTypeBuffer buffers all the cell ByteBuffers then reconstructs as a 
single ByteBuffer.
+ */
+public abstract class ComplexTypeBuffer
+{
+    private final List<ByteBuffer> buffers;
+    private final int cellCount;
+    private int length = 0;
+
+    public ComplexTypeBuffer(int cellCount, int bufferSize)
+    {
+        this.cellCount = cellCount;
+        this.buffers = new ArrayList<>(bufferSize);
+    }
+
+    static ComplexTypeBuffer newBuffer(AbstractType<?> type, int cellCount)
+    {
+        ComplexTypeBuffer buffer;
+        if (type instanceof SetType)
+        {
+            buffer = new SetBuffer(cellCount);
+        }
+        else if (type instanceof ListType)
+        {
+            buffer = new ListBuffer(cellCount);
+        }
+        else if (type instanceof MapType)
+        {
+            buffer = new MapBuffer(cellCount);
+        }
+        else if (type instanceof UserType)
+        {
+            buffer = new UdtBuffer(cellCount);
+        }
+        else
+        {
+            throw new IllegalStateException("Unexpected type deserializing CQL 
Collection: " + type);
+        }
+        return buffer;
+    }
+
+    void addCell(Cell cell)
+    {
+        add(cell.buffer());  // Copy over value
+    }
+
+    void add(ByteBuffer buffer)
+    {
+        buffers.add(buffer);
+        length += buffer.remaining();
+    }
+
+    ByteBuffer build()
+    {
+        ByteBuffer result = ByteBuffer.allocate(4 + (buffers.size() * 4) + 
length);

Review Comment:
   Could you add a comment about why this size calculation makes sense?
   
   Also, can that `4` be replaced with `Integer.BYTES` or something similar?
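
   For illustration, here is how I read the calculation, assuming the 
serialized layout is a 4-byte element count followed by a 4-byte length prefix 
and the bytes of each element. The class and method names are hypothetical, 
and the sketch writes `buffers.size()` as the count, which may differ from what 
`build()` actually writes:
   ```java
   import java.nio.ByteBuffer;
   import java.util.List;

   // Hypothetical helper, not part of the PR; assumes a layout of
   // [int count] followed by [int length][bytes] for each buffered element
   final class CollectionSizeSketch
   {
       static ByteBuffer pack(List<ByteBuffer> buffers)
       {
           int length = 0;
           for (ByteBuffer buffer : buffers)
           {
               length += buffer.remaining();
           }
           // One int for the count, one int length prefix per element, plus the payload
           ByteBuffer result = ByteBuffer.allocate(Integer.BYTES + (buffers.size() * Integer.BYTES) + length);
           result.putInt(buffers.size());
           for (ByteBuffer buffer : buffers)
           {
               result.putInt(buffer.remaining());
               result.put(buffer.duplicate());  // Duplicate so the caller's position is untouched
           }
           result.flip();
           return result;
       }
   }
   ```
   With `Integer.BYTES` in place of the literal, each term of the allocation 
maps directly onto one part of the layout.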



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/CassandraFile.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils.streaming;
+
+public interface CassandraFile

Review Comment:
   Could you please add a short JavaDoc here explaining the purpose of this 
empty interface?



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -222,4 +215,55 @@ public static void skipFully(InputStream is, long length) 
throws IOException
             throw new EOFException("EOF after " + skipped + " bytes out of " + 
length);
         }
     }
+
+    // Extract component position from buffer; return null if there are not 
enough components
+    public static ByteBuffer extractComponent(ByteBuffer buffer, int position)
+    {
+        buffer = buffer.duplicate();
+        readStatic(buffer);
+        int index = 0;
+        while (buffer.remaining() > 0)
+        {
+            ByteBuffer c = readBytesWithShortLength(buffer);
+            if (index == position)
+            {
+                return c;
+            }
+
+            buffer.get();  // Skip end-of-component
+            ++index;
+        }
+        return null;
+    }
+
+    public static ByteBuffer[] split(ByteBuffer name, int numKeys)
+    {
+        // Assume all components, we'll trunk the array afterwards if need be, 
but most names will be complete
+        ByteBuffer[] l = new ByteBuffer[numKeys];

Review Comment:
   Nit: I would avoid single-symbol variable names like `l` (especially `l`, as 
it looks awfully like `1`).



