scwhittle commented on code in PR #36112: URL: https://github.com/apache/beam/pull/36112#discussion_r2354742130
########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java: ########## @@ -157,8 +168,16 @@ public boolean getAllowDuplicates() { @Override public PCollection<T> expand(PCollection<T> input) { - return input - .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets))) + PCollection<KV<Integer, T>> sharded; + if (deterministicSharding) { + sharded = + input.apply( + "Pair with deterministic key", + ParDo.of(new AssignDeterministicShardFn<T>(numBuckets))); Review Comment: One idea would be to have pipeline level test verify the AssignDeterministicShardFn is used and then a unit test of AssignDeterministicShardFn that just creates fake ProcessContext to input and verify the output. But see other comment before doing that. ########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java: ########## @@ -132,33 +133,55 @@ public void processElement( public static class RedistributeArbitrarily<T> extends PTransform<PCollection<T>, PCollection<T>> { // The number of buckets to shard into. - // A runner is free to ignore this (a runner may ignore the transorm + // A runner is free to ignore this (a runner may ignore the transform // entirely!) This is a performance optimization to prevent having // unit sized bundles on the output. If unset, uses a random integer key. private @Nullable Integer numBuckets = null; private boolean allowDuplicates = false; + private boolean deterministicSharding = false; - private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) { + private RedistributeArbitrarily( + @Nullable Integer numBuckets, boolean allowDuplicates, boolean deterministicSharding) { this.numBuckets = numBuckets; this.allowDuplicates = allowDuplicates; + this.deterministicSharding = deterministicSharding; } public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) { - return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates); + return new RedistributeArbitrarily<>( + numBuckets, this.allowDuplicates, this.deterministicSharding); } public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) { - return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates); + return new RedistributeArbitrarily<>( + this.numBuckets, allowDuplicates, this.deterministicSharding); + } + + public RedistributeArbitrarily<T> withDeterministicSharding(boolean deterministicSharding) { Review Comment: this seems dangerous as is because it is unclear that it is going to use the offset for the sharing, and general uses will not have that set and just end up going to a single key. Another idea would be to fingerprint the timestamp/window instead or as well (doing the T itself seems difficult without encoding it) but see my comment in KafkaIO about possibly just doing it there instead of changing Redistribute.arbitrariy ########## sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java: ########## @@ -1845,18 +1846,16 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) { "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled"); } - if (kafkaRead.getRedistributeNumKeys() == 0) { - return output.apply( - "Insert Redistribute", - Redistribute.<KafkaRecord<K, V>>arbitrarily() - .withAllowDuplicates(kafkaRead.isAllowDuplicates())); - } else { - return output.apply( - "Insert Redistribute with Shards", - Redistribute.<KafkaRecord<K, V>>arbitrarily() - .withAllowDuplicates(kafkaRead.isAllowDuplicates()) - .withNumBuckets((int) kafkaRead.getRedistributeNumKeys())); + RedistributeArbitrarily<KafkaRecord<K, V>> redistribute = + Redistribute.<KafkaRecord<K, V>>arbitrarily() + .withAllowDuplicates(kafkaRead.isAllowDuplicates()); + if (kafkaRead.getOffsetDeduplication() != null && kafkaRead.getOffsetDeduplication()) { + redistribute = redistribute.withDeterministicSharding(true); Review Comment: thinking about this some more another option would be to not make this part of Redistribute.arbitrarily, but to instead to assign deterministic keys as part of KafkaIO read. We could for example redistribute records by key based upon the K within KafkaRecord<K, V> or have some option to use the shards based upon the offset like we are doing within redistribute.arbitrarily I believe @stankiewicz indicated that this would be more useful anyway for customers that we were hoping would benefit from this feature. And it keeps the complexity of using a deterministic output within KafkaIO instead of Redistribute. ########## sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java: ########## @@ -1845,18 +1846,16 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) { "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled"); } - if (kafkaRead.getRedistributeNumKeys() == 0) { - return output.apply( - "Insert Redistribute", - Redistribute.<KafkaRecord<K, V>>arbitrarily() - .withAllowDuplicates(kafkaRead.isAllowDuplicates())); - } else { - return output.apply( - "Insert Redistribute with Shards", Review Comment: this name is still changing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org