johnyangk closed pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest. URL: https://github.com/apache/incubator-nemo/pull/172
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 06dde5861..a4315082a 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -240,15 +241,6 @@ private void triggerTimers(final K key, // The DoFnRunner interface requires WindowedValue, // but this windowed value is actually not used in the ReduceFnRunner internal. 
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem)); - - // output watermark - // we set output watermark to the minimum of the timer data - long keyOutputTimestamp = Long.MAX_VALUE; - for (final TimerInternals.TimerData timer : timerDataList) { - keyOutputTimestamp = Math.min(keyOutputTimestamp, timer.getTimestamp().getMillis()); - } - - timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp)); } } @@ -349,14 +341,21 @@ public TimerInternals timerInternalsForKey(final K key) { @Override public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) { - // adds the output timestamp to the watermark hold of each key - // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 - // TODO #270: consider early firing - // TODO #270: This logic may not be applied to early firing outputs - keyAndWatermarkHoldMap.put(output.getValue().getKey(), - new Watermark(output.getTimestamp().getMillis() + 1)); + + // The watermark advances only in ON_TIME + if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) { + final K key = output.getValue().getKey(); + final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals) + inMemoryTimerInternalsFactory.timerInternalsForKey(key); + keyAndWatermarkHoldMap.put(key, + // adds the output timestamp to the watermark hold of each key + // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 + new Watermark(output.getTimestamp().getMillis() + 1)); + timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1)); + } outputCollector.emit(output); } + @Override public void emitWatermark(final Watermark watermark) { outputCollector.emitWatermark(watermark); diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java index 474c79c0d..f0749c0c0 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java @@ -33,16 +33,20 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; -import static java.util.Collections.emptyList; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.EARLY; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.LATE; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME; +import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; -// TODO #270: Test different triggers public final class GroupByKeyAndWindowDoFnTransformTest { - + private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransformTest.class.getName()); private final static Coder NULL_INPUT_CODER = null; private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null; @@ -248,4 +252,123 @@ public void test() { doFnTransform.close(); } + + /** + * Test complex triggers that emit early and late firing. + */ + @Test + public void eventTimeTriggerTest() { + final Duration lateness = Duration.standardSeconds(1); + final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow() + // early firing + .withEarlyFirings( + AfterProcessingTime + .pastFirstElementInPane() + // early firing 1 sec after receiving an element + .plusDelayOf(Duration.millis(1000))) + // late firing: Fire on any late data. 
+ .withLateFirings(AfterPane.elementCountAtLeast(1)); + + final FixedWindows window = (FixedWindows) Window.into( + FixedWindows.of(Duration.standardSeconds(5))) + // lateness + .withAllowedLateness(lateness) + .triggering(trigger) + // TODO #308: Test discarding of refinements + .accumulatingFiredPanes().getWindowFn(); + + final TupleTag<String> outputTag = new TupleTag<>("main-output"); + final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform = + new GroupByKeyAndWindowDoFnTransform( + NULL_OUTPUT_CODERS, + outputTag, + WindowingStrategy.of(window).withTrigger(trigger) + .withMode(ACCUMULATING_FIRED_PANES) + .withAllowedLateness(lateness), + PipelineOptionsFactory.as(NemoPipelineOptions.class), + SystemReduceFn.buffering(NULL_INPUT_CODER), + DisplayData.none()); + + + final Transform.Context context = mock(Transform.Context.class); + final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector(); + doFnTransform.prepare(context, oc); + + doFnTransform.onData(WindowedValue.of( + KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING)); + + // early firing is not related to the watermark progress + doFnTransform.onWatermark(new Watermark(2)); + assertEquals(1, oc.outputs.size()); + assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming()); + LOG.info("Output: {}", oc.outputs.get(0)); + oc.outputs.clear(); + + doFnTransform.onData(WindowedValue.of( + KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING)); + // EARLY firing... 
waiting >= 1 sec + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // GBKTransform emits data when receiving watermark + // TODO #250: element-wise processing + doFnTransform.onWatermark(new Watermark(5)); + assertEquals(1, oc.outputs.size()); + assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming()); + // ACCUMULATION MODE + checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue()); + LOG.info("Output: {}", oc.outputs.get(0)); + oc.outputs.clear(); + + // ON TIME + doFnTransform.onData(WindowedValue.of( + KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING)); + doFnTransform.onWatermark(new Watermark(5001)); + assertEquals(1, oc.outputs.size()); + assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming()); + LOG.info("Output: {}", oc.outputs.get(0)); + // ACCUMULATION MODE + checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue()); + oc.outputs.clear(); + + // LATE DATA + // actual window: [0-5000) + // allowed lateness: 1000 (ms) + // current watermark: 5001 + // data: 4500 + // the data timestamp + allowed lateness > current watermark, + // so it should be accumulated to the prev window + doFnTransform.onData(WindowedValue.of( + KV.of("1", "bye!"), new Instant(4500), + window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING)); + doFnTransform.onWatermark(new Watermark(6000)); + assertEquals(1, oc.outputs.size()); + assertEquals(LATE, oc.outputs.get(0).getPane().getTiming()); + LOG.info("Output: {}", oc.outputs.get(0)); + // The data should be accumulated to the previous window because it allows 1 second lateness + checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!", "bye!")), oc.outputs.get(0).getValue()); + oc.outputs.clear(); + + // LATE DATA + // data timestamp: 4800 + // current watermark: 6000 + // data timestamp + allowed lateness < current watermark + // It should not be accumulated to 
the prev window + doFnTransform.onData(WindowedValue.of( + KV.of("1", "hello again!"), new Instant(4800), + window.assignWindow(new Instant(4800)), PaneInfo.NO_FIRING)); + doFnTransform.onWatermark(new Watermark(6300)); + assertEquals(1, oc.outputs.size()); + assertEquals(LATE, oc.outputs.get(0).getPane().getTiming()); + LOG.info("Output: {}", oc.outputs.get(0)); + checkOutput(KV.of("1", Arrays.asList("hello again!")), oc.outputs.get(0).getValue()); + oc.outputs.clear(); + + + doFnTransform.close(); + + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services