kennknowles commented on code in PR #31347:
URL: https://github.com/apache/beam/pull/31347#discussion_r1612123815


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2295,6 +2391,15 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {
           ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
     }
 
+    /** Enable Redistribute. */
+    public ReadSourceDescriptors<K, V> withRedistributeEnabled() {
+      return toBuilder().setRedistributeEnabled(true).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withNumShards(int numShards) {
+      return toBuilder().setNumShards(numShards).build();

Review Comment:
   `setNumRedistributeShards`, because it has to do with the redistribute, not the top-level KafkaIO transform



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2136,6 +2230,8 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
           .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
           .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
           .setConsumerPollingTimeout(2L)
+          .setRedistributeEnabled(false)

Review Comment:
   nit: just say `withRedistribute`



##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -616,6 +624,58 @@ public void testRiskyConfigurationWarnsProperly() {
     p.run();
   }
 
+  @Test
+  public void testRiskyConfigurationWarnsProperlyWithNumShardsNotSet() {
+    int numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
+                    .withConsumerConfigUpdates(
+                        ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    kafkaIOExpectedLogs.verifyWarn(
+        "This will redistribute the load across the same number of shards as the Kafka source.");

Review Comment:
   Won't `Redistribute.arbitrarily` currently create a random key for each record, producing far too many keys, and isn't that the problem? In other words, numShards is used to _decrease_ the number of keys, not to increase it. And that is an implementation detail that actually could/should change in the future if we do something clever and make it a black box that uses e.g. least-loaded algorithms.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
