kennknowles commented on code in PR #37411:
URL: https://github.com/apache/beam/pull/37411#discussion_r2817972711


##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -99,6 +103,8 @@ public abstract static class Read<V> extends PTransform<PBegin, PCollection<V>>
 
     abstract @Nullable Long getStartOffset();
 
+    abstract @Nullable Integer getNumReaders();

Review Comment:
   What is the intended behavior when this is `null`? If `null` selects the 
default behavior, the user should be able to restore that default with 
`setNumReaders(null)`, so the getter and setter should match. (I notice this is 
not the case for start offset, but that is a bug.)
   
   However, I think you could potentially remove nullability from all of them 
by setting defaults in the factory method. This is often the best choice.
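
   A minimal sketch of the "defaults in the factory method" idea, written in 
plain Java rather than AutoValue so it stands alone. `ReadConfigSketch` and 
both default values are hypothetical and are not taken from `SparkReceiverIO`:

```java
/** Hypothetical stand-in for an immutable Read config; not the real SparkReceiverIO.Read. */
final class ReadConfigSketch {
  // Assumed defaults, chosen only for illustration.
  static final long DEFAULT_START_OFFSET = 0L;
  static final int DEFAULT_NUM_READERS = 1;

  private final long startOffset;
  private final int numReaders;

  private ReadConfigSketch(long startOffset, int numReaders) {
    this.startOffset = startOffset;
    this.numReaders = numReaders;
  }

  /** Factory method: every property starts with a concrete default, so no getter returns null. */
  static ReadConfigSketch read() {
    return new ReadConfigSketch(DEFAULT_START_OFFSET, DEFAULT_NUM_READERS);
  }

  /** Returns a copy with a different start offset. */
  ReadConfigSketch withStartOffset(long newStartOffset) {
    return new ReadConfigSketch(newStartOffset, numReaders);
  }

  /** Returns a copy with a different reader count; rejects values below 1. */
  ReadConfigSketch withNumReaders(int newNumReaders) {
    if (newNumReaders < 1) {
      throw new IllegalArgumentException("numReaders must be >= 1, got " + newNumReaders);
    }
    return new ReadConfigSketch(startOffset, newNumReaders);
  }

  long getStartOffset() {
    return startOffset;
  }

  int getNumReaders() {
    return numReaders;
  }
}
```

   With primitive-typed getters the `null` question goes away: restoring the 
default is just `withNumReaders(ReadConfigSketch.DEFAULT_NUM_READERS)`.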



##########
sdks/java/io/sparkreceiver/3/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOTest.java:
##########
@@ -241,4 +241,32 @@ public void testReadFromReceiverIteratorData() {
     PAssert.that(actual).containsInAnyOrder(expected);
     pipeline.run().waitUntilFinish(Duration.standardSeconds(15));
   }
+
+  @Test
+  public void testReadFromCustomReceiverWithParallelism() {
+    CustomReceiverWithOffset.shouldFailInTheMiddle = false;
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SparkReceiverIO.Read<String> reader =
+        SparkReceiverIO.<String>read()
+            .withGetOffsetFn(Long::valueOf)
+            .withTimestampFn(Instant::parse)
+            .withPullFrequencySec(PULL_FREQUENCY_SEC)
+            .withStartPollTimeoutSec(START_POLL_TIMEOUT_SEC)
+            .withStartOffset(START_OFFSET)
+            .withSparkReceiverBuilder(receiverBuilder)
+            .withNumReaders(3);
+
+    List<String> expected = new ArrayList<>();
+    // With sharding enabled in CustomReceiverWithOffset, the total records read
+    // across all workers
+    // should be exactly the set of 0..RECORDS_COUNT-1, each read exactly once.
+    for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
+      expected.add(String.valueOf(i));
+    }
+    PCollection<String> actual = pipeline.apply(reader).setCoder(StringUtf8Coder.of());

Review Comment:
   `setCoder` shouldn't be needed, since the string coder should be inferred 
automatically. If that isn't happening, let's find out why.



##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -284,6 +290,9 @@ public ProcessContinuation processElement(
     }
     LOG.debug("Restriction {}", tracker.currentRestriction().toString());
     sparkConsumer = new SparkConsumerWithOffset<>(tracker.currentRestriction().getFrom());
+    if (sparkReceiver instanceof HasOffset) {
+      ((HasOffset) sparkReceiver).setShard(element, numReaders);

Review Comment:
   I don't really know this code, but the javadoc of the class suggests that 
it _only_ supports receivers that implement `HasOffset`. So I think this check 
should either be unnecessary or be moved earlier, into the DoFn instantiation.



##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -191,10 +209,20 @@ public PCollection<V> expand(PBegin input) {
                 sparkReceiverBuilder.getSparkReceiverClass().getName()));
       } else {
         LOG.info("{} started reading", ReadFromSparkReceiverWithOffsetDoFn.class.getSimpleName());
-        return input
-            .apply(Impulse.create())
-            .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
-        // TODO: Split data from SparkReceiver into multiple workers
+        Integer numReadersObj = sparkReceiverRead.getNumReaders();
+        if (numReadersObj == null || numReadersObj == 1) {
+          return input
+              .apply(Create.of(0))
+              .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
+        } else {
+          int numReaders = numReadersObj;
+          List<Integer> shards =
+              IntStream.range(0, numReaders).boxed().collect(Collectors.toList());
+          return input
+              .apply(Create.of(shards))
+              .apply(Reshuffle.viaRandomKey())

Review Comment:
   Typically, use `Redistribute`, which is a new synonym for Reshuffle but more 
clearly indicates that it is _only_ for redistributing data and not any of the 
other things that Reshuffle implies. (some uses of Reshuffle are for 
checkpointing, etc).
   
   But I don't think anything is needed here: the runner already knows that a 
splittable DoFn can produce large output per element, and it saves its 
checkpoint state, so redistribution should happen automatically. (I'm not sure 
whether SparkRunner does this correctly.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
