kennknowles commented on code in PR #24205:
URL: https://github.com/apache/beam/pull/24205#discussion_r1035087107


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -545,6 +547,68 @@ private PipelineResult runWithStopReadingFn(
     return readResult;
   }
 
+  @Test
+  public void testWatermarkUpdateWithSparseMessages() throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+
+    String topicName = "SparseDataTopicPartition-" + UUID.randomUUID();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      records.put(i, String.valueOf(i));
+    }
+
+    try {
+      client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
+
+      writePipeline
+          .apply("Generate Write Elements", Create.of(records))
+          .apply(
+              "Write to Kafka",
+              KafkaIO.<Integer, String>write()
+                  .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                  .withTopic(topicName)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(StringSerializer.class));
+
+      writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+      client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(3)));
+
+      PCollection<String> values =
+          sdfReadPipeline
+              .apply(
+                  "Read from Kafka",
+                  KafkaIO.<Integer, String>read()
+                      .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                      .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
+                      .withTopic(topicName)
+                      .withKeyDeserializer(IntegerDeserializer.class)
+                      .withValueDeserializer(StringDeserializer.class)
+                      .withoutMetadata())
+              .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
+              .apply("GroupKey", GroupByKey.create())
+              .apply("GetValues", Values.create())
+              .apply(
+                  "Flatten",
+                  FlatMapElements.into(TypeDescriptor.of(String.class)).via(iterable -> iterable));
+
+      PAssert.that(values).containsInAnyOrder("0", "1", "2", "3", "4");
+
+      PipelineResult readResult = sdfReadPipeline.run();
+
+      PipelineResult.State readState =
+          readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));

Review Comment:
   Does this test fail without your change? I would expect all the output to be
produced regardless, because of `waitUntilFinish`. Or is the setup that you wait
only half the read timeout, so the test depends on the timestamp policy
advancing the watermark within that time? In that case, it would be good to add
an assertion, or some other check, that rules out spurious success.
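
One possible shape for such a check, sketched in plain Java so that it runs without Beam on the classpath. Everything here is hypothetical: `SpuriousSuccessGuard`, its `State` enum and `assertionsVerified` are illustrative stand-ins, not part of `KafkaIOIT` or Beam. The sketch assumes that a timed-out wait is reported as a `null` state and that the test can obtain counts of passed and failed assertions (in a real test these would presumably come from the pipeline's metrics).

```java
/** Hypothetical helper for illustration; not part of KafkaIOIT or Beam. */
final class SpuriousSuccessGuard {

  /** Stand-in for a pipeline's terminal state; null models a wait that timed out. */
  public enum State { DONE, FAILED }

  /**
   * Returns true only if the expected number of assertions was actually
   * evaluated and none of them failed. A pipeline that is still running
   * (null state) is acceptable, but only when the assertions already ran.
   */
  public static boolean assertionsVerified(
      State stateAfterWait, long successes, long failures, long expected) {
    if (stateAfterWait == State.FAILED) {
      return false; // the pipeline itself failed
    }
    if (failures > 0) {
      return false; // at least one assertion observed wrong output
    }
    // Zero failures alone is not enough: if the watermark never advanced,
    // the window never fired and no assertion was evaluated at all.
    return successes >= expected;
  }

  public static void main(String[] args) {
    // Timed out, nothing evaluated: this is the spurious-success case.
    System.out.println(assertionsVerified(null, 0, 0, 1)); // prints "false"
    // Timed out, but the assertion ran and passed before the timeout.
    System.out.println(assertionsVerified(null, 1, 0, 1)); // prints "true"
  }
}
```

The point of the extra argument is that "no failure seen" and "success seen" are different outcomes for a streaming read that is cut off by a timeout; only the second shows that the watermark advanced.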


