scwhittle commented on code in PR #36112:
URL: https://github.com/apache/beam/pull/36112#discussion_r2354742130


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -157,8 +168,16 @@ public boolean getAllowDuplicates() {
 
     @Override
     public PCollection<T> expand(PCollection<T> input) {
-      return input
-          .apply("Pair with random key", ParDo.of(new 
AssignShardFn<>(numBuckets)))
+      PCollection<KV<Integer, T>> sharded;
+      if (deterministicSharding) {
+        sharded =
+            input.apply(
+                "Pair with deterministic key",
+                ParDo.of(new AssignDeterministicShardFn<T>(numBuckets)));

Review Comment:
   One idea would be to have pipeline level test verify the 
AssignDeterministicShardFn is used and then a unit test of 
AssignDeterministicShardFn that just creates fake ProcessContext to input and 
verify the output.
   
   But see other comment before doing that.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -132,33 +133,55 @@ public void processElement(
   public static class RedistributeArbitrarily<T>
       extends PTransform<PCollection<T>, PCollection<T>> {
     // The number of buckets to shard into.
-    // A runner is free to ignore this (a runner may ignore the transorm
+    // A runner is free to ignore this (a runner may ignore the transform
     // entirely!) This is a performance optimization to prevent having
     // unit sized bundles on the output. If unset, uses a random integer key.
     private @Nullable Integer numBuckets = null;
     private boolean allowDuplicates = false;
+    private boolean deterministicSharding = false;
 
-    private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean 
allowDuplicates) {
+    private RedistributeArbitrarily(
+        @Nullable Integer numBuckets, boolean allowDuplicates, boolean 
deterministicSharding) {
       this.numBuckets = numBuckets;
       this.allowDuplicates = allowDuplicates;
+      this.deterministicSharding = deterministicSharding;
     }
 
     public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer 
numBuckets) {
-      return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+      return new RedistributeArbitrarily<>(
+          numBuckets, this.allowDuplicates, this.deterministicSharding);
     }
 
     public RedistributeArbitrarily<T> withAllowDuplicates(boolean 
allowDuplicates) {
-      return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+      return new RedistributeArbitrarily<>(
+          this.numBuckets, allowDuplicates, this.deterministicSharding);
+    }
+
+    public RedistributeArbitrarily<T> withDeterministicSharding(boolean 
deterministicSharding) {

Review Comment:
   this seems dangerous as is because it is unclear that it is going to use the 
offset for the sharing, and general uses will not have that set and just end up 
going to a single key.
   
   Another idea would be to fingerprint the timestamp/window instead or as well 
(doing the T itself seems difficult without encoding it) but see my comment in 
KafkaIO about possibly just doing it there instead of changing 
Redistribute.arbitrariy



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1845,18 +1846,16 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin 
input) {
                 "Offsets committed due to usage of commitOffsetsInFinalize() 
and may not capture all work processed due to use of withRedistribute() with 
duplicates enabled");
           }
 
-          if (kafkaRead.getRedistributeNumKeys() == 0) {
-            return output.apply(
-                "Insert Redistribute",
-                Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                    .withAllowDuplicates(kafkaRead.isAllowDuplicates()));
-          } else {
-            return output.apply(
-                "Insert Redistribute with Shards",
-                Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                    .withAllowDuplicates(kafkaRead.isAllowDuplicates())
-                    .withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
+          RedistributeArbitrarily<KafkaRecord<K, V>> redistribute =
+              Redistribute.<KafkaRecord<K, V>>arbitrarily()
+                  .withAllowDuplicates(kafkaRead.isAllowDuplicates());
+          if (kafkaRead.getOffsetDeduplication() != null && 
kafkaRead.getOffsetDeduplication()) {
+            redistribute = redistribute.withDeterministicSharding(true);

Review Comment:
   thinking about this some more another option would be to not make this part 
of  Redistribute.arbitrarily, but to instead to assign deterministic keys as 
part of KafkaIO read.  We could for example redistribute records by key based 
upon the K within KafkaRecord<K, V> or have some option to use the shards based 
upon the offset like we are doing within redistribute.arbitrarily
   
   I believe @stankiewicz indicated that this would be more useful anyway for 
customers that we were hoping would benefit from this feature.  And it keeps 
the complexity of using a deterministic output within KafkaIO instead of 
Redistribute.



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1845,18 +1846,16 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin 
input) {
                 "Offsets committed due to usage of commitOffsetsInFinalize() 
and may not capture all work processed due to use of withRedistribute() with 
duplicates enabled");
           }
 
-          if (kafkaRead.getRedistributeNumKeys() == 0) {
-            return output.apply(
-                "Insert Redistribute",
-                Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                    .withAllowDuplicates(kafkaRead.isAllowDuplicates()));
-          } else {
-            return output.apply(
-                "Insert Redistribute with Shards",

Review Comment:
   this name is still changing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to