[ 
https://issues.apache.org/jira/browse/BEAM-5197?focusedWorklogId=169828&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169828
 ]

ASF GitHub Bot logged work on BEAM-5197:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Nov/18 15:34
            Start Date: 27/Nov/18 15:34
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #7140: [BEAM-5197] Simplify 
synchronization logic for UnboundedSourceWrapperTest
URL: https://github.com/apache/beam/pull/7140
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 28589d52f628..ea96a8718324 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -31,7 +31,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -58,8 +58,8 @@
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 import org.joda.time.Instant;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.junit.runners.Parameterized;
@@ -68,6 +68,7 @@
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link UnboundedSourceWrapper}. */
+@RunWith(Enclosed.class)
 public class UnboundedSourceWrapperTest {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedSourceWrapperTest.class);
@@ -221,7 +222,6 @@ public void close() {}
      * <p>This test verifies that watermark are correctly forwarded.
      */
     @Test(timeout = 30_000)
-    @Ignore("https://issues.apache.org/jira/browse/BEAM-5197") // deadlock on some platforms
     public void testWatermarkEmission() throws Exception {
       final int numElements = 500;
       final Object checkpointLock = new Object();
@@ -257,7 +257,7 @@ public void testWatermarkEmission() throws Exception {
 
       // use the AtomicBoolean just for the set()/get() functionality for 
communicating
       // with the outer Thread
-      final AtomicBoolean seenWatermark = new AtomicBoolean(false);
+      final CountDownLatch latch = new CountDownLatch(1);
 
       Thread sourceThread =
           new Thread(
@@ -270,16 +270,27 @@ public void testWatermarkEmission() throws Exception {
                       new Output<
                           
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
 
+                        private void advanceProcessingTime(long timestamp) {
+                          try {
+                            // Need to advance this so that the watermark 
timers fire
+                            testHarness.setProcessingTime(timestamp);
+                          } catch (Exception e) {
+                            throw new RuntimeException(e);
+                          }
+                        }
+
                         @Override
                         public void emitWatermark(Watermark watermark) {
                           if (watermark.getTimestamp() >= numElements / 2) {
-                            seenWatermark.set(true);
+                            latch.countDown();
                           }
                         }
 
                         @Override
                         public <X> void collect(
-                            OutputTag<X> outputTag, StreamRecord<X> 
streamRecord) {}
+                            OutputTag<X> outputTag, StreamRecord<X> 
streamRecord) {
+                          advanceProcessingTime(streamRecord.getTimestamp());
+                        }
 
                         @Override
                         public void emitLatencyMarker(LatencyMarker 
latencyMarker) {}
@@ -287,7 +298,9 @@ public void emitLatencyMarker(LatencyMarker latencyMarker) 
{}
                         @Override
                         public void collect(
                             
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
-                                windowedValueStreamRecord) {}
+                                windowedValueStreamRecord) {
+                          
advanceProcessingTime(windowedValueStreamRecord.getTimestamp());
+                        }
 
                         @Override
                         public void close() {}
@@ -295,26 +308,17 @@ public void close() {}
                 } catch (Exception e) {
                   LOG.info("Caught exception:", e);
                   caughtExceptions.add(e);
+                  latch.countDown();
                 }
               });
 
       sourceThread.start();
 
-      while (true) {
-        if (!caughtExceptions.isEmpty()) {
-          fail("Caught exception(s): " + 
Joiner.on(",").join(caughtExceptions));
-        }
-        if (seenWatermark.get()) {
-          break;
-        }
-        Thread.sleep(50);
+      // wait until watermarks have been emitted
+      latch.await();
 
-        // Need to advance this so that the watermark timers in the source 
wrapper fire
-        // Synchronize is necessary because this can interfere with updating 
the PriorityQueue
-        // of the ProcessingTimeService which is also accessed through 
UnboundedSourceWrapper.
-        synchronized (checkpointLock) {
-          testHarness.setProcessingTime(Instant.now().getMillis());
-        }
+      if (!caughtExceptions.isEmpty()) {
+        fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
       }
 
       sourceOperator.cancel();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 169828)
    Time Spent: 4h  (was: 3h 50m)

> Flaky test: UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-5197
>                 URL: https://issues.apache.org/jira/browse/BEAM-5197
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Thomas Weise
>            Assignee: Maximilian Michels
>            Priority: Critical
>              Labels: flake
>             Fix For: 2.10.0
>
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> {code:java}
> java.lang.NullPointerException
>       at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$1.compare(TestProcessingTimeService.java:52)
>       at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$1.compare(TestProcessingTimeService.java:49)
>       at java.util.PriorityQueue.siftUpUsingComparator(PriorityQueue.java:670)
>       at java.util.PriorityQueue.siftUp(PriorityQueue.java:646)
>       at java.util.PriorityQueue.offer(PriorityQueue.java:345)
>       at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.registerTimer(TestProcessingTimeService.java:93)
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:452)
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:225)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at 
> org.apache.beam.runners.flink.streaming.UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest.testValueEmission(UnboundedSourceWrapperTest.java:153)
>       {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to