[ https://issues.apache.org/jira/browse/BEAM-5197?focusedWorklogId=169828&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169828 ]
ASF GitHub Bot logged work on BEAM-5197: ---------------------------------------- Author: ASF GitHub Bot Created on: 27/Nov/18 15:34 Start Date: 27/Nov/18 15:34 Worklog Time Spent: 10m Work Description: tweise closed pull request #7140: [BEAM-5197] Simplify synchronization logic for UnboundedSourceWrapperTest URL: https://github.com/apache/beam/pull/7140 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 28589d52f628..ea96a8718324 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.coders.Coder; @@ -58,8 +58,8 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.OutputTag; import org.joda.time.Instant; -import org.junit.Ignore; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.junit.runners.Parameterized; @@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory; /** Tests for {@link UnboundedSourceWrapper}. */ +@RunWith(Enclosed.class) public class UnboundedSourceWrapperTest { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapperTest.class); @@ -221,7 +222,6 @@ public void close() {} * <p>This test verifies that watermark are correctly forwarded. */ @Test(timeout = 30_000) - @Ignore("https://issues.apache.org/jira/browse/BEAM-5197") // deadlock on some platforms public void testWatermarkEmission() throws Exception { final int numElements = 500; final Object checkpointLock = new Object(); @@ -257,7 +257,7 @@ public void testWatermarkEmission() throws Exception { // use the AtomicBoolean just for the set()/get() functionality for communicating // with the outer Thread - final AtomicBoolean seenWatermark = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); Thread sourceThread = new Thread( @@ -270,16 +270,27 @@ public void testWatermarkEmission() throws Exception { new Output< StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() { + private void advanceProcessingTime(long timestamp) { + try { + // Need to advance this so that the watermark timers fire + testHarness.setProcessingTime(timestamp); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public void emitWatermark(Watermark watermark) { if (watermark.getTimestamp() >= numElements / 2) { - seenWatermark.set(true); + latch.countDown(); } } @Override public <X> void collect( - OutputTag<X> outputTag, StreamRecord<X> streamRecord) {} + OutputTag<X> outputTag, StreamRecord<X> streamRecord) { + advanceProcessingTime(streamRecord.getTimestamp()); + } @Override public void emitLatencyMarker(LatencyMarker latencyMarker) {} @@ -287,7 +298,9 @@ public void emitLatencyMarker(LatencyMarker latencyMarker) {} @Override public void collect( StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> - windowedValueStreamRecord) {} + windowedValueStreamRecord) { + advanceProcessingTime(windowedValueStreamRecord.getTimestamp()); + } @Override public void close() {} @@ -295,26 +308,17 @@ public void close() {} } catch (Exception e) { LOG.info("Caught exception:", e); caughtExceptions.add(e); + latch.countDown(); } }); sourceThread.start(); - while (true) { - if (!caughtExceptions.isEmpty()) { - fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions)); - } - if (seenWatermark.get()) { - break; - } - Thread.sleep(50); + // wait until watermarks have been emitted + latch.countDown(); - // Need to advance this so that the watermark timers in the source wrapper fire - // Synchronize is necessary because this can interfere with updating the PriorityQueue - // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper. - synchronized (checkpointLock) { - testHarness.setProcessingTime(Instant.now().getMillis()); - } + if (!caughtExceptions.isEmpty()) { + fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions)); } sourceOperator.cancel(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 169828) Time Spent: 4h (was: 3h 50m) > Flaky test: UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest > ------------------------------------------------------------------------------ > > Key: BEAM-5197 > URL: https://issues.apache.org/jira/browse/BEAM-5197 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Thomas Weise > Assignee: Maximilian Michels > Priority: Critical > Labels: flake > Fix For: 2.10.0 > > Time Spent: 4h > Remaining Estimate: 0h > > {code:java} > java.lang.NullPointerException > at > org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$1.compare(TestProcessingTimeService.java:52) > at > org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$1.compare(TestProcessingTimeService.java:49) > at java.util.PriorityQueue.siftUpUsingComparator(PriorityQueue.java:670) > at java.util.PriorityQueue.siftUp(PriorityQueue.java:646) > at java.util.PriorityQueue.offer(PriorityQueue.java:345) > at > org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.registerTimer(TestProcessingTimeService.java:93) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:452) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:225) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.beam.runners.flink.streaming.UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest.testValueEmission(UnboundedSourceWrapperTest.java:153) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)