xinyuiscool commented on code in PR #23434:
URL: https://github.com/apache/beam/pull/23434#discussion_r1003789363
##########
runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java:
##########
@@ -168,4 +174,57 @@ public void testPipelineWithAggregation() {
pipeline.run();
}
+
+ @Test
+ public void testKeyedOutputFutures() {
+ // We test the scenario that two elements of the same key needs to be
processed in order.
+ final DoFnRunner<KV<String, Integer>, Void> doFnRunner =
mock(DoFnRunner.class);
+ final AtomicInteger prev = new AtomicInteger(0);
+ doAnswer(
+ invocation -> {
+ WindowedValue<KV<String, Integer>> wv =
invocation.getArgument(0);
+ Integer val = wv.getValue().getValue();
+
+ // Verify the previous element has been fully processed by
checking the prev value
+ assertEquals(val - 1, prev.get());
+
+ prev.set(val);
+ return null;
+ })
+ .when(doFnRunner)
+ .processElement(any());
+
+ SamzaPipelineOptions options =
PipelineOptionsFactory.as(SamzaPipelineOptions.class);
+ options.setNumThreadsForProcessElement(4);
+
+ final OpEmitter<Void> opEmitter = new OpAdapter.OpEmitterImpl<>();
+ final FutureCollector<Void> futureCollector = new
DoFnOp.FutureCollectorImpl<>();
+ futureCollector.prepare();
+
+ final AsyncDoFnRunner<KV<String, Integer>, Void> asyncDoFnRunner =
+ AsyncDoFnRunner.create(doFnRunner, opEmitter, futureCollector, true,
options);
+
+ final String appleKey = "apple";
+
+ final WindowedValue<KV<String, Integer>> input1 =
+ WindowedValue.valueInGlobalWindow(KV.of(appleKey, 1));
+
+ final WindowedValue<KV<String, Integer>> input2 =
+ WindowedValue.valueInGlobalWindow(KV.of(appleKey, 2));
+
+ asyncDoFnRunner.processElement(input1);
+ asyncDoFnRunner.processElement(input2);
Review Comment:
Good point! I added a countdown latch to let processElement(input2) to run
before the actual processing of input1. Please take a look again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]