kennknowles commented on code in PR #31347:
URL: https://github.com/apache/beam/pull/31347#discussion_r1612123815
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2295,6 +2391,15 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {
           ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
     }

+    /** Enable Redistribute. */
+    public ReadSourceDescriptors<K, V> withRedistributeEnabled() {
+      return toBuilder().setRedistributeEnabled(true).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withNumShards(int numShards) {
+      return toBuilder().setNumShards(numShards).build();

Review Comment:
   `setNumRedistributeShards`, because it has to do with the redistribute, not with the top-level KafkaIO transform.

##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2136,6 +2230,8 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
           .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
           .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
           .setConsumerPollingTimeout(2L)
+          .setRedistributeEnabled(false)

Review Comment:
   nit: just say `withRedistribute`

##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -616,6 +624,58 @@ public void testRiskyConfigurationWarnsProperly() {
     p.run();
   }

+  @Test
+  public void testRiskyConfigurationWarnsProperlyWithNumShardsNotSet() {
+    int numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
+                    .withConsumerConfigUpdates(
+                        ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    kafkaIOExpectedLogs.verifyWarn(
+        "This will redistribute the load across the same number of shards as the Kafka source.");

Review Comment:
   Won't `Redistribute.arbitrarily` currently create a random key for each record, producing way too many keys, and isn't that the problem? In other words, numShards is used to _decrease_ the number of keys, not to increase it. And that is an implementation detail that could/should change in the future if we do something clever and actually make it a black box that uses e.g. least-loaded algorithms.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
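To make the final review comment concrete, here is a minimal sketch of the keying step that an arbitrary redistribute performs, written as a standard Beam `DoFn`. `AssignShardKeyFn` and its constructor argument are hypothetical names for illustration, not Beam's actual `Redistribute` internals; the point is only that a bounded shard count caps the key space, while an unbounded random key yields roughly one distinct key per record.

```java
import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch: assigns each record a shard key ahead of a
// GroupByKey/Reshuffle, after which the keys would be dropped again.
class AssignShardKeyFn extends DoFn<String, KV<Integer, String>> {
  private final int numShards; // <= 0 is treated here as "unbounded"

  AssignShardKeyFn(int numShards) {
    this.numShards = numShards;
  }

  @ProcessElement
  public void processElement(
      @Element String record, OutputReceiver<KV<Integer, String>> out) {
    int key =
        numShards > 0
            // Bounded: at most numShards distinct keys, so numShards
            // *decreases* the key count relative to the unbounded case.
            ? ThreadLocalRandom.current().nextInt(numShards)
            // Unbounded: roughly one distinct key per record, the
            // "way too many keys" behavior the reviewer describes.
            : ThreadLocalRandom.current().nextInt();
    out.output(KV.of(key, record));
  }
}
```

Whether the fallback when numShards is unset matches the source's shard count (as the test's warning message suggests) or generates per-record keys (as the reviewer suspects) is exactly the ambiguity under discussion; either way, the shard count exists to bound the key space, not to grow it, and a future implementation could replace random keying entirely (e.g. with a least-loaded assignment) without changing the public API.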