tomstepp commented on code in PR #36112:
URL: https://github.com/apache/beam/pull/36112#discussion_r2364159993


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -132,33 +133,55 @@ public void processElement(
   public static class RedistributeArbitrarily<T>
       extends PTransform<PCollection<T>, PCollection<T>> {
     // The number of buckets to shard into.
-    // A runner is free to ignore this (a runner may ignore the transorm
+    // A runner is free to ignore this (a runner may ignore the transform
     // entirely!) This is a performance optimization to prevent having
     // unit sized bundles on the output. If unset, uses a random integer key.
     private @Nullable Integer numBuckets = null;
     private boolean allowDuplicates = false;
+    private boolean deterministicSharding = false;
 
-    private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) {
+    private RedistributeArbitrarily(
+        @Nullable Integer numBuckets, boolean allowDuplicates, boolean deterministicSharding) {
       this.numBuckets = numBuckets;
       this.allowDuplicates = allowDuplicates;
+      this.deterministicSharding = deterministicSharding;
     }
 
     public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) {
-      return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+      return new RedistributeArbitrarily<>(
+          numBuckets, this.allowDuplicates, this.deterministicSharding);
     }
 
     public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) {
-      return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+      return new RedistributeArbitrarily<>(
+          this.numBuckets, allowDuplicates, this.deterministicSharding);
+    }
+
+    public RedistributeArbitrarily<T> withDeterministicSharding(boolean deterministicSharding) {

Review Comment:
   Reworked this into a Kafka-specific redistribute. Is it OK to keep using the
KafkaRecord offset or key? I think that is enough, so the timestamp and window
are not needed, but please let me know if I'm missing something.
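
To make the question concrete, here is a minimal sketch of what sharding on the offset or key could look like. It is plain Java with no Beam or Kafka dependencies, and it is not the code in this PR: the class `DeterministicShardSketch` and its methods are hypothetical names, and the assumption is only that a record exposes a `long` offset and an optional `byte[]` key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper for illustration only; not part of Beam or of this PR. */
final class DeterministicShardSketch {
  private DeterministicShardSketch() {}

  /** Maps a record offset to a bucket in [0, numBuckets). */
  static int bucketForOffset(long offset, int numBuckets) {
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be positive");
    }
    // floorMod keeps the result non-negative even for a negative input.
    return (int) Math.floorMod(offset, (long) numBuckets);
  }

  /** Maps raw key bytes to a bucket; falls back to the offset when the key is null. */
  static int bucketForKey(byte[] keyBytes, long offset, int numBuckets) {
    if (keyBytes == null) {
      return bucketForOffset(offset, numBuckets);
    }
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be positive");
    }
    // Arrays.hashCode depends only on the byte contents, so equal keys
    // always land in the same bucket across retries and workers.
    return Math.floorMod(Arrays.hashCode(keyBytes), numBuckets);
  }

  public static void main(String[] args) {
    byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
    System.out.println("offset bucket: " + bucketForOffset(1234L, 8));
    System.out.println("key bucket:    " + bucketForKey(key, 1234L, 8));
  }
}
```

Neither function reads the element timestamp or window, so the same record gets the same shard key on reprocessing. That is the property the question above is relying on.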



