[ 
https://issues.apache.org/jira/browse/BEAM-9008?focusedWorklogId=440459&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-440459
 ]

ASF GitHub Bot logged work on BEAM-9008:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Jun/20 21:28
            Start Date: 02/Jun/20 21:28
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #10546:
URL: https://github.com/apache/beam/pull/10546#discussion_r433896372



##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -55,4 +58,9 @@ public boolean isWrapping() {
   public String toString() {
     return String.format("(%s,%s]", start.toString(), end.toString());
   }
+
+  public static RingRange fromEncodedKey(Metadata metadata, ByteBuffer... bb) {

Review comment:
       Can we move this method to the test class where it is used? I don't want 
to add the Cassandra-specific `Metadata` to the public API of `RingRange`, in the 
hope that this will help us evolve `RingRange` into a proper `Restriction` (future 
work, out of the scope of this PR).
   
   Can you also please add a `public static RingRange of(BigInteger start, 
BigInteger end)` method, make the normal constructor private, and refactor 
every use accordingly?
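
   Something along these lines, as a rough sketch only: the class name 
`RingRangeSketch` and the getters are placeholders, and the real `RingRange` 
has more members than shown here.

```java
import java.math.BigInteger;

/** Simplified stand-in for RingRange (hypothetical name); models a token range (start, end]. */
final class RingRangeSketch {
  private final BigInteger start;
  private final BigInteger end;

  // Private constructor: every caller has to go through of().
  private RingRangeSketch(BigInteger start, BigInteger end) {
    this.start = start;
    this.end = end;
  }

  /** Static factory that replaces direct constructor calls. */
  public static RingRangeSketch of(BigInteger start, BigInteger end) {
    return new RingRangeSketch(start, end);
  }

  public BigInteger getStart() {
    return start;
  }

  public BigInteger getEnd() {
    return end;
  }

  @Override
  public String toString() {
    return String.format("(%s,%s]", start.toString(), end.toString());
  }
}
```

   Call sites would then read `RingRangeSketch.of(start, end)` instead of 
`new RingRangeSketch(start, end)`.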

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))

Review comment:
       Move the Split to be the first step of the `ReadAll` expansion so that 
non-advanced users (those who do not specify `RingRange` manually) get their 
code 'partitioned' correctly.

##########
File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##########
@@ -317,9 +302,70 @@ public void testRead() throws Exception {
     PAssert.that(mapped.apply("Count occurrences per scientist", 
Count.perKey()))
         .satisfies(
             input -> {
+              int count = 0;
               for (KV<String, Long> element : input) {
+                count++;
                 assertEquals(element.getKey(), NUM_ROWS / 10, 
element.getValue().longValue());
               }
+              assertEquals(11, count);
+              return null;
+            });
+
+    pipeline.run();
+  }
+
+  CassandraIO.Read<Scientist> getReadWithRingRange(RingRange... rr) {

Review comment:
       private

##########
File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##########
@@ -191,39 +186,44 @@ private static void insertData() throws Exception {
     LOG.info("Create Cassandra tables");
     session.execute(
         String.format(
-            "CREATE TABLE IF NOT EXISTS %s.%s(person_id int, person_name text, 
PRIMARY KEY"
-                + "(person_id));",
+            "CREATE TABLE IF NOT EXISTS %s.%s(person_department text, 
person_id int, person_name text, PRIMARY KEY"

Review comment:
       nit: For the tests, can we ensure that every reference to the tables and 
every usage of the Scientist object follows this order: id, name, department?
   

##########
File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##########
@@ -480,66 +527,22 @@ public void testCustomMapperImplDelete() {
     assertEquals(1, counter.intValue());
   }
 
-  @Test
-  public void testSplit() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    CassandraIO.Read<Scientist> read =
-        CassandraIO.<Scientist>read()
-            .withHosts(Collections.singletonList(CASSANDRA_HOST))
-            .withPort(cassandraPort)
-            .withKeyspace(CASSANDRA_KEYSPACE)
-            .withTable(CASSANDRA_TABLE)
-            .withEntity(Scientist.class)
-            .withCoder(SerializableCoder.of(Scientist.class));
-
-    // initialSource will be read without splitting (which does not happen in 
production)
-    // so we need to provide splitQueries to avoid NPE in source.reader.start()
-    String splitQuery = QueryBuilder.select().from(CASSANDRA_KEYSPACE, 
CASSANDRA_TABLE).toString();
-    CassandraIO.CassandraSource<Scientist> initialSource =
-        new CassandraIO.CassandraSource<>(read, 
Collections.singletonList(splitQuery));
-    int desiredBundleSizeBytes = 2048;
-    long estimatedSize = initialSource.getEstimatedSizeBytes(options);
-    List<BoundedSource<Scientist>> splits = 
initialSource.split(desiredBundleSizeBytes, options);
-    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, 
options);
-    float expectedNumSplitsloat =
-        (float) initialSource.getEstimatedSizeBytes(options) / 
desiredBundleSizeBytes;
-    long sum = 0;
-
-    for (BoundedSource<Scientist> subSource : splits) {
-      sum += subSource.getEstimatedSizeBytes(options);
-    }
-
-    // due to division and cast estimateSize != sum but will be close. Exact 
equals checked below
-    assertEquals((long) (estimatedSize / splits.size()) * splits.size(), sum);
-
-    int expectedNumSplits = (int) Math.ceil(expectedNumSplitsloat);
-    assertEquals("Wrong number of splits", expectedNumSplits, splits.size());
-    int emptySplits = 0;
-    for (BoundedSource<Scientist> subSource : splits) {
-      if (readFromSource(subSource, options).isEmpty()) {
-        emptySplits += 1;
-      }
-    }
-    assertThat(
-        "There are too many empty splits, parallelism is sub-optimal",
-        emptySplits,
-        lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
-  }
-
   private List<Row> getRows(String table) {
     ResultSet result =
         session.execute(
             String.format("select person_id,person_name from %s.%s", 
CASSANDRA_KEYSPACE, table));
     return result.all();
   }
 
+  // TEMP TEST

Review comment:
       why TEMP ?

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())

Review comment:
       Remove

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());

Review comment:
       Move this down into the `apply`; it makes the code more readable.

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.cassandra.CassandraIO.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ReadFn<T> extends DoFn<Read<T>, T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);
+
+  private transient Cluster cluster;
+
+  private transient Session session;
+
+  private transient Read<T> lastRead;
+
+  @Teardown
+  public void teardown() {

Review comment:
       nit: move this below; we tend to preserve the lifecycle order of methods: 
setup-startbundle-processElement-finishbundle-teardown

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
+import com.datastax.driver.core.Metadata;
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /** Models a Cassandra token range. */
-final class RingRange {
+public final class RingRange implements Serializable {

Review comment:
       Since it seems the new split implementation does not use `isWrapping()`, 
please remove that method from the `RingRange` class.

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> 
outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {
+            LOG.info("Murmur3Partitioner detected, splitting");
+
+            List<BigInteger> tokens =
+                cluster.getMetadata().getTokenRanges().stream()

Review comment:
       Since we need `Metadata` again below and `getMetadata()` performs a 
synchronized operation, maybe obtain `Metadata` once and reuse it afterwards.

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> 
outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {
+            LOG.info("Murmur3Partitioner detected, splitting");
+
+            List<BigInteger> tokens =
+                cluster.getMetadata().getTokenRanges().stream()
+                    .map(tokenRange -> new 
BigInteger(tokenRange.getEnd().getValue().toString()))
+                    .collect(Collectors.toList());
+            Integer splitCount = cluster.getMetadata().getAllHosts().size();
+            if (read.minNumberOfSplits() != null && 
read.minNumberOfSplits().get() != null) {
+              splitCount = read.minNumberOfSplits().get();
+            }
+
+            SplitGenerator splitGenerator =
+                new SplitGenerator(cluster.getMetadata().getPartitioner());
+            splitGenerator
+                .generateSplits(splitCount, tokens)
+                .forEach(
+                    rr ->
+                        outputReceiver.output(
+                            CassandraIO.<T>read()
+                                .withRingRanges(new HashSet<>(rr))
+                                .withCoder(coder())
+                                .withConsistencyLevel(consistencyLevel())
+                                .withEntity(entity())
+                                .withHosts(hosts())
+                                .withKeyspace(keyspace())
+                                .withLocalDc(localDc())
+                                .withPort(port())
+                                .withPassword(password())
+                                .withQuery(query())
+                                .withTable(table())
+                                .withUsername(username())
+                                .withMapperFactoryFn(mapperFactoryFn())));
+          } else {
+            LOG.warn(
+                "Only Murmur3Partitioner is supported for splitting, using an 
unique source for "
+                    + "the read");
+            String partitioner = cluster.getMetadata().getPartitioner();
+            RingRange totalRingRange =
+                new RingRange(
+                    SplitGenerator.getRangeMin(partitioner),
+                    SplitGenerator.getRangeMax(partitioner));
+            outputReceiver.output(
+                CassandraIO.<T>read()

Review comment:
       Same as above: the output is the initial `read` with the modified 
`RingRange`; otherwise you would need to copy/define all attributes.

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws 
ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link 
CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends 
PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link 
PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<Read<T>> input) {
+      checkArgument(coder() != null, "withCoder() is required");
+      return input
+          .apply("shuffle", Reshuffle.viaRandomKey())

Review comment:
       "Reshuffle"

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws 
ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link 
CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends 
PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link 
PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<Read<T>> input) {
+      checkArgument(coder() != null, "withCoder() is required");
+      return input
+          .apply("shuffle", Reshuffle.viaRandomKey())
+          .apply("read", ParDo.of(new ReadFn<>()))
+          .setCoder(this.coder());

Review comment:
       "Read"

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
+import com.datastax.driver.core.Metadata;
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /** Models a Cassandra token range. */
-final class RingRange {
+public final class RingRange implements Serializable {

Review comment:
       :+1: for making it public now!

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -370,384 +488,16 @@ private CassandraIO() {}
         return autoBuild();
       }
     }
-  }
-
-  @VisibleForTesting
-  static class CassandraSource<T> extends BoundedSource<T> {
-    final Read<T> spec;
-    final List<String> splitQueries;
-    // split source ached size - can't be calculated when already split
-    Long estimatedSize;
-    private static final String MURMUR3PARTITIONER = 
"org.apache.cassandra.dht.Murmur3Partitioner";
-
-    CassandraSource(Read<T> spec, List<String> splitQueries) {
-      this(spec, splitQueries, null);
-    }
-
-    private CassandraSource(Read<T> spec, List<String> splitQueries, Long 
estimatedSize) {
-      this.estimatedSize = estimatedSize;
-      this.spec = spec;
-      this.splitQueries = splitQueries;
-    }
-
-    @Override
-    public Coder<T> getOutputCoder() {
-      return spec.coder();
-    }
-
-    @Override
-    public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
-      return new CassandraReader(this);
-    }
-
-    @Override
-    public List<BoundedSource<T>> split(
-        long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
-      try (Cluster cluster =
-          getCluster(
-              spec.hosts(),
-              spec.port(),
-              spec.username(),
-              spec.password(),
-              spec.localDc(),
-              spec.consistencyLevel())) {
-        if (isMurmur3Partitioner(cluster)) {
-          LOG.info("Murmur3Partitioner detected, splitting");
-          return splitWithTokenRanges(
-              spec, desiredBundleSizeBytes, 
getEstimatedSizeBytes(pipelineOptions), cluster);
-        } else {
-          LOG.warn(
-              "Only Murmur3Partitioner is supported for splitting, using a 
unique source for "
-                  + "the read");
-          return Collections.singletonList(
-              new CassandraIO.CassandraSource<>(spec, 
Collections.singletonList(buildQuery(spec))));
-        }
-      }
-    }
-
-    private static String buildQuery(Read spec) {
-      return (spec.query() == null)
-          ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), 
spec.table().get())
-          : spec.query().get().toString();
-    }
-
-    /**
-     * Compute the number of splits based on the estimated size and the 
desired bundle size, and
-     * create several sources.
-     */
-    private List<BoundedSource<T>> splitWithTokenRanges(
-        CassandraIO.Read<T> spec,
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        Cluster cluster) {
-      long numSplits =
-          getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes, 
spec.minNumberOfSplits());
-      LOG.info("Number of desired splits is {}", numSplits);
-
-      SplitGenerator splitGenerator = new 
SplitGenerator(cluster.getMetadata().getPartitioner());
-      List<BigInteger> tokens =
-          cluster.getMetadata().getTokenRanges().stream()
-              .map(tokenRange -> new 
BigInteger(tokenRange.getEnd().getValue().toString()))
-              .collect(Collectors.toList());
-      List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, 
tokens);
-      LOG.info("{} splits were actually generated", splits.size());
-
-      final String partitionKey =
-          
cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
-              .getPartitionKey().stream()
-              .map(ColumnMetadata::getName)
-              .collect(Collectors.joining(","));
-
-      List<TokenRange> tokenRanges =
-          getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
-      final long estimatedSize = 
getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
-      List<BoundedSource<T>> sources = new ArrayList<>();
-      for (List<RingRange> split : splits) {
-        List<String> queries = new ArrayList<>();
-        for (RingRange range : split) {
-          if (range.isWrapping()) {

Review comment:
       Do you think this logic is already covered by the new implementation, or 
that we cannot run into issues with wrapping ranges (e.g. repeated data)? I 
can barely remember why we went for these 'hacks'; I expected this to be a 
responsibility of `SplitGenerator`.

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> 
outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {
+            LOG.info("Murmur3Partitioner detected, splitting");
+
+            List<BigInteger> tokens =
+                cluster.getMetadata().getTokenRanges().stream()
+                    .map(tokenRange -> new 
BigInteger(tokenRange.getEnd().getValue().toString()))
+                    .collect(Collectors.toList());
+            Integer splitCount = cluster.getMetadata().getAllHosts().size();
+            if (read.minNumberOfSplits() != null && 
read.minNumberOfSplits().get() != null) {
+              splitCount = read.minNumberOfSplits().get();
+            }
+
+            SplitGenerator splitGenerator =
+                new SplitGenerator(cluster.getMetadata().getPartitioner());
+            splitGenerator
+                .generateSplits(splitCount, tokens)
+                .forEach(
+                    rr ->
+                        outputReceiver.output(
+                            CassandraIO.<T>read()

Review comment:
       You do not need to rebuild the object; the output here should 
simply be `read.withRingRanges(new HashSet<>(rr));`
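
   To illustrate why the copy is unnecessary, here is a minimal sketch of the 
"wither" pattern on an immutable value. `ReadSpecSketch` is a hypothetical, 
heavily simplified stand-in for `CassandraIO.Read` (ranges are plain strings 
here); it is not the actual class.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, heavily simplified stand-in for CassandraIO.Read. */
final class ReadSpecSketch {
  private final String keyspace;
  private final String table;
  private final Set<String> ringRanges;

  ReadSpecSketch(String keyspace, String table, Set<String> ringRanges) {
    this.keyspace = keyspace;
    this.table = table;
    // Defensive copy so the value stays immutable.
    this.ringRanges = Collections.unmodifiableSet(new HashSet<>(ringRanges));
  }

  /** Returns a copy with only the ring ranges replaced; all other fields carry over. */
  ReadSpecSketch withRingRanges(Set<String> newRanges) {
    return new ReadSpecSketch(keyspace, table, newRanges);
  }

  String keyspace() {
    return keyspace;
  }

  String table() {
    return table;
  }

  Set<String> ringRanges() {
    return ringRanges;
  }
}
```

   Because the wither carries every other field over, the caller never has to 
list the attributes one by one, and attributes added later cannot be forgotten.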

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
+import com.datastax.driver.core.Metadata;
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /** Models a Cassandra token range. */
-final class RingRange {
+public final class RingRange implements Serializable {

Review comment:
       We should add the `equals()` and `hashCode()` methods; they are now 
mandatory for the `Set` contract used in `CassandraIO` to be consistent.
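
   A minimal sketch of what I mean, assuming equality is defined purely by the 
`start` and `end` fields (the class name `HashableRingRange` is a placeholder 
for illustration):

```java
import java.math.BigInteger;
import java.util.Objects;

/** Simplified stand-in for RingRange (hypothetical name) with value-based equality. */
final class HashableRingRange {
  private final BigInteger start;
  private final BigInteger end;

  private HashableRingRange(BigInteger start, BigInteger end) {
    this.start = start;
    this.end = end;
  }

  static HashableRingRange of(BigInteger start, BigInteger end) {
    return new HashableRingRange(start, end);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof HashableRingRange)) {
      return false;
    }
    HashableRingRange that = (HashableRingRange) o;
    // Two ranges are equal when both bounds match.
    return start.equals(that.start) && end.equals(that.end);
  }

  @Override
  public int hashCode() {
    // Must be consistent with equals(): same fields, same order.
    return Objects.hash(start, end);
  }
}
```

   Without these overrides, two ranges with identical bounds would be treated 
as distinct elements of a `HashSet`.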

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))

Review comment:
       Also add the `"Split",` name to the step

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> 
outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {
+            LOG.info("Murmur3Partitioner detected, splitting");
+
+            List<BigInteger> tokens =
+                cluster.getMetadata().getTokenRanges().stream()
+                    .map(tokenRange -> new 
BigInteger(tokenRange.getEnd().getValue().toString()))
+                    .collect(Collectors.toList());
+            Integer splitCount = cluster.getMetadata().getAllHosts().size();

Review comment:
       Better to define `Integer splitCount = read.minNumberOfSplits().get();` 
first; this lets you skip one server visit if the value has already been set 
by the user.
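
   Roughly like this, as a sketch under simplifying assumptions: the user 
setting is modelled as a nullable `Integer`, and the cluster lookup as an 
`IntSupplier`. Both `SplitCountSketch` and `resolveSplitCount` are hypothetical 
names, not existing code.

```java
import java.util.function.IntSupplier;

/** Hypothetical helper showing the "user value first, cluster lookup last" ordering. */
final class SplitCountSketch {
  private SplitCountSketch() {}

  /**
   * Returns the user-provided minimum number of splits when it is set; only
   * otherwise invokes the lookup (standing in for asking the cluster for its host count).
   */
  static int resolveSplitCount(Integer userMinSplits, IntSupplier hostCountLookup) {
    if (userMinSplits != null) {
      return userMinSplits; // no round trip to the cluster needed
    }
    return hostCountLookup.getAsInt();
  }
}
```

   The lookup is evaluated lazily, so the server is contacted only when the 
user did not configure a value.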

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> 
outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {

Review comment:
       I think we are missing the case where the user has already defined their 
`RingRange` in the input read. What should we do in that case? My intuition 
tells me we should just pass the read through as output without changes, but 
if we want to split anyway, we should take the input `RingRange` into account 
somehow.

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws 
ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link 
CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends 
PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link 
PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<Read<T>> input) {
+      checkArgument(coder() != null, "withCoder() is required");
+      return input
+          .apply("shuffle", Reshuffle.viaRandomKey())
+          .apply("read", ParDo.of(new ReadFn<>()))
+          .setCoder(this.coder());

Review comment:
       coder should come from the input read

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
+import com.datastax.driver.core.Metadata;
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /** Models a Cassandra token range. */
-final class RingRange {
+public final class RingRange implements Serializable {

Review comment:
       Please also add `@Experimental(Kind.SOURCE_SINK)` to the class.

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws 
ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link 
CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends 
PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link 
PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {

Review comment:
       I am not sure we need this; I assume the coder comes from the 
`Read<T>` input.

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -370,384 +488,16 @@ private CassandraIO() {}
         return autoBuild();
       }
     }
-  }
-
-  @VisibleForTesting
-  static class CassandraSource<T> extends BoundedSource<T> {
-    final Read<T> spec;
-    final List<String> splitQueries;
-    // split source ached size - can't be calculated when already split
-    Long estimatedSize;
-    private static final String MURMUR3PARTITIONER = 
"org.apache.cassandra.dht.Murmur3Partitioner";
-
-    CassandraSource(Read<T> spec, List<String> splitQueries) {
-      this(spec, splitQueries, null);
-    }
-
-    private CassandraSource(Read<T> spec, List<String> splitQueries, Long 
estimatedSize) {
-      this.estimatedSize = estimatedSize;
-      this.spec = spec;
-      this.splitQueries = splitQueries;
-    }
-
-    @Override
-    public Coder<T> getOutputCoder() {
-      return spec.coder();
-    }
-
-    @Override
-    public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
-      return new CassandraReader(this);
-    }
-
-    @Override
-    public List<BoundedSource<T>> split(
-        long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
-      try (Cluster cluster =
-          getCluster(
-              spec.hosts(),
-              spec.port(),
-              spec.username(),
-              spec.password(),
-              spec.localDc(),
-              spec.consistencyLevel())) {
-        if (isMurmur3Partitioner(cluster)) {
-          LOG.info("Murmur3Partitioner detected, splitting");
-          return splitWithTokenRanges(
-              spec, desiredBundleSizeBytes, 
getEstimatedSizeBytes(pipelineOptions), cluster);
-        } else {
-          LOG.warn(
-              "Only Murmur3Partitioner is supported for splitting, using a 
unique source for "
-                  + "the read");
-          return Collections.singletonList(
-              new CassandraIO.CassandraSource<>(spec, 
Collections.singletonList(buildQuery(spec))));
-        }
-      }
-    }
-
-    private static String buildQuery(Read spec) {
-      return (spec.query() == null)
-          ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), 
spec.table().get())
-          : spec.query().get().toString();
-    }
-
-    /**
-     * Compute the number of splits based on the estimated size and the 
desired bundle size, and
-     * create several sources.
-     */
-    private List<BoundedSource<T>> splitWithTokenRanges(
-        CassandraIO.Read<T> spec,
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        Cluster cluster) {
-      long numSplits =
-          getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes, 
spec.minNumberOfSplits());
-      LOG.info("Number of desired splits is {}", numSplits);
-
-      SplitGenerator splitGenerator = new 
SplitGenerator(cluster.getMetadata().getPartitioner());
-      List<BigInteger> tokens =
-          cluster.getMetadata().getTokenRanges().stream()
-              .map(tokenRange -> new 
BigInteger(tokenRange.getEnd().getValue().toString()))
-              .collect(Collectors.toList());
-      List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, 
tokens);
-      LOG.info("{} splits were actually generated", splits.size());
-
-      final String partitionKey =
-          
cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
-              .getPartitionKey().stream()
-              .map(ColumnMetadata::getName)
-              .collect(Collectors.joining(","));
-
-      List<TokenRange> tokenRanges =
-          getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
-      final long estimatedSize = 
getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
-      List<BoundedSource<T>> sources = new ArrayList<>();
-      for (List<RingRange> split : splits) {
-        List<String> queries = new ArrayList<>();
-        for (RingRange range : split) {
-          if (range.isWrapping()) {
-            // A wrapping range is one that overlaps from the end of the 
partitioner range and its
-            // start (ie : when the start token of the split is greater than 
the end token)
-            // We need to generate two queries here : one that goes from the 
start token to the end
-            // of
-            // the partitioner range, and the other from the start of the 
partitioner range to the
-            // end token of the split.
-            queries.add(generateRangeQuery(spec, partitionKey, 
range.getStart(), null));
-            // Generation of the second query of the wrapping range
-            queries.add(generateRangeQuery(spec, partitionKey, null, 
range.getEnd()));
-          } else {
-            queries.add(generateRangeQuery(spec, partitionKey, 
range.getStart(), range.getEnd()));
-          }
-        }
-        sources.add(new CassandraIO.CassandraSource<>(spec, queries, 
estimatedSize));
-      }
-      return sources;
-    }
-
-    private static String generateRangeQuery(
-        Read spec, String partitionKey, BigInteger rangeStart, BigInteger 
rangeEnd) {
-      final String rangeFilter =
-          Joiner.on(" AND ")
-              .skipNulls()
-              .join(
-                  rangeStart == null
-                      ? null
-                      : String.format("(token(%s) >= %d)", partitionKey, 
rangeStart),
-                  rangeEnd == null
-                      ? null
-                      : String.format("(token(%s) < %d)", partitionKey, 
rangeEnd));
-      final String query =
-          (spec.query() == null)
-              ? buildQuery(spec) + " WHERE " + rangeFilter
-              : buildQuery(spec) + " AND " + rangeFilter;
-      LOG.debug("CassandraIO generated query : {}", query);
-      return query;
-    }
-
-    private static long getNumSplits(
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        @Nullable ValueProvider<Integer> minNumberOfSplits) {
-      long numSplits =
-          desiredBundleSizeBytes > 0 ? (estimatedSizeBytes / 
desiredBundleSizeBytes) : 1;
-      if (numSplits <= 0) {
-        LOG.warn("Number of splits is less than 0 ({}), fallback to 1", 
numSplits);
-        numSplits = 1;
-      }
-      return minNumberOfSplits != null ? Math.max(numSplits, 
minNumberOfSplits.get()) : numSplits;
-    }
-
-    /**
-     * Returns cached estimate for split or if missing calculate size for 
whole table. Highly
-     * innacurate if query is specified.
-     *
-     * @param pipelineOptions
-     * @return
-     */
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
-      if (estimatedSize != null) {
-        return estimatedSize;
-      } else {
-        try (Cluster cluster =
-            getCluster(
-                spec.hosts(),
-                spec.port(),
-                spec.username(),
-                spec.password(),
-                spec.localDc(),
-                spec.consistencyLevel())) {
-          if (isMurmur3Partitioner(cluster)) {
-            try {
-              List<TokenRange> tokenRanges =
-                  getTokenRanges(cluster, spec.keyspace().get(), 
spec.table().get());
-              this.estimatedSize = 
getEstimatedSizeBytesFromTokenRanges(tokenRanges);
-              return this.estimatedSize;
-            } catch (Exception e) {
-              LOG.warn("Can't estimate the size", e);
-              return 0L;
-            }
-          } else {
-            LOG.warn("Only Murmur3 partitioner is supported, can't estimate 
the size");
-            return 0L;
-          }
-        }
-      }
-    }
-
-    @VisibleForTesting
-    static long getEstimatedSizeBytesFromTokenRanges(List<TokenRange> 
tokenRanges) {
-      long size = 0L;
-      for (TokenRange tokenRange : tokenRanges) {
-        size = size + tokenRange.meanPartitionSize * tokenRange.partitionCount;
-      }
-      return Math.round(size / getRingFraction(tokenRanges));
-    }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      if (spec.hosts() != null) {
-        builder.add(DisplayData.item("hosts", spec.hosts().toString()));
-      }
-      if (spec.port() != null) {
-        builder.add(DisplayData.item("port", spec.port()));
-      }
-      builder.addIfNotNull(DisplayData.item("keyspace", spec.keyspace()));
-      builder.addIfNotNull(DisplayData.item("table", spec.table()));
-      builder.addIfNotNull(DisplayData.item("username", spec.username()));
-      builder.addIfNotNull(DisplayData.item("localDc", spec.localDc()));
-      builder.addIfNotNull(DisplayData.item("consistencyLevel", 
spec.consistencyLevel()));
-    }
     // ------------- CASSANDRA SOURCE UTIL METHODS ---------------//
 
-    /**
-     * Gets the list of token ranges that a table occupies on a give Cassandra 
node.
-     *
-     * <p>NB: This method is compatible with Cassandra 2.1.5 and greater.
-     */
-    private static List<TokenRange> getTokenRanges(Cluster cluster, String 
keyspace, String table) {
-      try (Session session = cluster.newSession()) {
-        ResultSet resultSet =
-            session.execute(
-                "SELECT range_start, range_end, partitions_count, 
mean_partition_size FROM "
-                    + "system.size_estimates WHERE keyspace_name = ? AND 
table_name = ?",
-                keyspace,
-                table);
-
-        ArrayList<TokenRange> tokenRanges = new ArrayList<>();
-        for (Row row : resultSet) {
-          TokenRange tokenRange =
-              new TokenRange(
-                  row.getLong("partitions_count"),
-                  row.getLong("mean_partition_size"),
-                  new BigInteger(row.getString("range_start")),
-                  new BigInteger(row.getString("range_end")));
-          tokenRanges.add(tokenRange);
-        }
-        // The table may not contain the estimates yet
-        // or have partitions_count and mean_partition_size fields = 0
-        // if the data was just inserted and the amount of data in the table 
was small.
-        // This is very common situation during tests,
-        // when we insert a few rows and immediately query them.
-        // However, for tiny data sets the lack of size estimates is not a 
problem at all,
-        // because we don't want to split tiny data anyways.
-        // Therefore, we're not issuing a warning if the result set was empty
-        // or mean_partition_size and partitions_count = 0.
-        return tokenRanges;
-      }
-    }
-
-    /** Compute the percentage of token addressed compared with the whole 
tokens in the cluster. */
-    @VisibleForTesting
-    static double getRingFraction(List<TokenRange> tokenRanges) {
-      double ringFraction = 0;
-      for (TokenRange tokenRange : tokenRanges) {
-        ringFraction =
-            ringFraction
-                + (distance(tokenRange.rangeStart, 
tokenRange.rangeEnd).doubleValue()
-                    / 
SplitGenerator.getRangeSize(MURMUR3PARTITIONER).doubleValue());
-      }
-      return ringFraction;
-    }
-
-    /**
-     * Check if the current partitioner is the Murmur3 (default in Cassandra 
version newer than 2).
-     */
-    @VisibleForTesting
-    static boolean isMurmur3Partitioner(Cluster cluster) {
-      return MURMUR3PARTITIONER.equals(cluster.getMetadata().getPartitioner());
-    }
-
     /** Measure distance between two tokens. */
     @VisibleForTesting
     static BigInteger distance(BigInteger left, BigInteger right) {
       return (right.compareTo(left) > 0)
           ? right.subtract(left)
           : 
right.subtract(left).add(SplitGenerator.getRangeSize(MURMUR3PARTITIONER));
     }
-
-    /**
-     * Represent a token range in Cassandra instance, wrapping the partition 
count, size and token
-     * range.
-     */
-    @VisibleForTesting
-    static class TokenRange {

Review comment:
       :clap: remove repeated and useless :clap: 
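
The wrap-around handling that appears in the hunk above (the `distance` helper and the two-query treatment of a wrapping range) can be sketched in isolation. This is a minimal illustration, not the code in the PR: the class name `RingSketch` and the method `rangeFilters` are hypothetical, the ring size of 2^64 assumes the Murmur3 partitioner's token span of [-2^63, 2^63 - 1], and treating `start >= end` as "wrapping" is an assumption about what `RingRange.isWrapping()` checks.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only; names and the ring size are assumptions, not the PR's code. */
final class RingSketch {
  // Assumed Murmur3 ring size: tokens span [-2^63, 2^63 - 1], i.e. 2^64 values.
  static final BigInteger RING_SIZE = BigInteger.ONE.shiftLeft(64);

  /** Distance from left to right, adding the ring size when the range wraps. */
  static BigInteger distance(BigInteger left, BigInteger right) {
    return right.compareTo(left) > 0
        ? right.subtract(left)
        : right.subtract(left).add(RING_SIZE);
  }

  /** Token filters for one range; a wrapping range is split into two filters. */
  static List<String> rangeFilters(String partitionKey, BigInteger start, BigInteger end) {
    List<String> filters = new ArrayList<>();
    if (start.compareTo(end) >= 0) {
      // Wrapping: one filter from start to the end of the ring,
      // one from the beginning of the ring to end.
      filters.add(String.format("(token(%s) >= %d)", partitionKey, start));
      filters.add(String.format("(token(%s) < %d)", partitionKey, end));
    } else {
      filters.add(
          String.format(
              "(token(%s) >= %d) AND (token(%s) < %d)", partitionKey, start, partitionKey, end));
    }
    return filters;
  }
}
```

Keeping both pieces next to each other makes it easier to check that the size estimate and the generated queries agree on what a wrapping range means.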




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 440459)
    Time Spent: 12.5h  (was: 12h 20m)

> Add readAll() method to CassandraIO
> -----------------------------------
>
>                 Key: BEAM-9008
>                 URL: https://issues.apache.org/jira/browse/BEAM-9008
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-cassandra
>    Affects Versions: 2.16.0
>            Reporter: vincent marquez
>            Assignee: vincent marquez
>            Priority: P3
>          Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> When querying a large Cassandra database, it's often *much* more useful to 
> programmatically generate the queries that need to be run rather than reading 
> all partitions and attempting some filtering.  
> As an example:
> {code:java}
> public class Event { 
>    @PartitionKey(0) public UUID accountId;
>    @PartitionKey(1) public String yearMonthDay; 
>    @ClusteringKey public UUID eventId;  
>    //other data...
> }{code}
> If there are ten years' worth of data, you may want to query only one year's 
> worth.  Here each token range would represent one 'token' but all events for 
> the day. 
> {code:java}
> Set<UUID> accounts = getRelevantAccounts();
> Set<String> dateRange = generateDateRange("2018-01-01", "2019-01-01");
> PCollection<TokenRange> tokens = generateTokens(accounts, dateRange); 
> {code}
>  
>  I propose an additional _readAll()_ PTransform that can take a PCollection 
> of token ranges and can return a PCollection<T> of what the query would 
> return. 
> *Question: How much code should be in common between both methods?* 
> Currently the read connector already groups all partitions into a List of 
> Token Ranges, so it would be simple to refactor the current read() based 
> method to a 'ParDo' based one and have them both share the same function.  
> Reasons against sharing code between read and readAll:
>  * Not having the read based method return a BoundedSource connector would 
> mean losing the ability to know the size of the data returned
>  * Currently the CassandraReader executes all the grouped TokenRange queries 
> *asynchronously* which is (maybe?) fine when all that's happening is 
> splitting up all the partition ranges but terrible for executing potentially 
> millions of queries. 
>  Reasons _for_ sharing code would be a simplified code base and that both of 
> the above issues would most likely have a negligible performance impact. 
>  
>  
>  
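
The "programmatically generate the queries" step from the issue description could look roughly like the sketch below. It is a plain-Java illustration under stated assumptions: the class `PartitionQuerySketch`, the method `generateQueries`, and the table name `events` are hypothetical. `generateDateRange` borrows its name from the description's snippet, but its body, and the choice to treat the end date as exclusive, are guesses. Building CQL by string formatting is only for readability here; a real pipeline would more likely use bound statements and pass one element per partition to `readAll()`.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Illustrative only; all names here are assumptions, not part of CassandraIO. */
final class PartitionQuerySketch {

  /** Every day in [startInclusive, endExclusive), formatted as yyyy-MM-dd. */
  static List<String> generateDateRange(String startInclusive, String endExclusive) {
    List<String> days = new ArrayList<>();
    LocalDate end = LocalDate.parse(endExclusive);
    for (LocalDate d = LocalDate.parse(startInclusive); d.isBefore(end); d = d.plusDays(1)) {
      days.add(d.toString());
    }
    return days;
  }

  /** One query per (accountId, yearMonthDay) partition of the Event table. */
  static List<String> generateQueries(String table, List<UUID> accounts, List<String> days) {
    List<String> queries = new ArrayList<>();
    for (UUID account : accounts) {
      for (String day : days) {
        queries.add(
            String.format(
                "SELECT * FROM %s WHERE accountId = %s AND yearMonthDay = '%s'",
                table, account, day));
      }
    }
    return queries;
  }
}
```

With two accounts and the one-year range from the description, this yields one query per account per day, which is the kind of fan-out the issue argues should not all be executed asynchronously at once.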



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
