[ https://issues.apache.org/jira/browse/BEAM-4086?focusedWorklogId=94300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94300 ]

ASF GitHub Bot logged work on BEAM-4086:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/18 21:04
            Start Date: 23/Apr/18 21:04
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5185: [BEAM-4086]: KafkaIO tests: Avoid busy loop in MockConsumer.poll(), reduce flakes.
URL: https://github.com/apache/beam/pull/5185
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 5819a671275..7f9959a6579 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -30,6 +30,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
@@ -248,14 +249,23 @@ public void assign(final Collection<TopicPartition> assigned) {
       @Override
       public void run() {
         // add all the records with offset >= current partition position.
+        int recordsAdded = 0;
         for (TopicPartition tp : assignedPartitions.get()) {
           long curPos = consumer.position(tp);
           for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
             if (r.offset() >= curPos) {
               consumer.addRecord(r);
+              recordsAdded++;
             }
           }
         }
+        if (recordsAdded == 0) {
+          // MockConsumer.poll(timeout) does not actually wait even when there aren't any records.
+          // Add a small wait here in order to avoid busy looping in the reader.
+          Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+          //TODO: BEAM-4086: testUnboundedSourceWithoutBoundedWrapper() occasionally hangs
+          //     without this wait. Need to look into it.
+        }
         consumer.schedulePollTask(this);
       }
     };
@@ -605,10 +615,12 @@ public Instant getWatermark(PartitionContext ctx) {
 
   @Test
   public void testUnboundedSourceWithoutBoundedWrapper() {
+    // This is same as testUnboundedSource() without the BoundedSource wrapper.
     // Most of the tests in this file set 'maxNumRecords' on the source, which wraps
     // the unbounded source in a bounded source. As a result, the test pipeline run as
     // bounded/batch pipelines under direct-runner.
-    // This is same as testUnboundedSource() without the BoundedSource wrapper.
+    // This tests runs without such a wrapper over unbounded wrapper, and depends on watermark
+    // progressing to infinity to end the test (see TimestampPolicyWithEndOfSource above).
 
     final int numElements = 1000;
     final int numPartitions = 10;
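
The busy-loop fix in the first hunk can be illustrated in isolation. What follows is a minimal sketch under stated assumptions, not the Beam test code: `BackoffPollSketch`, `pollOnce`, and the local `sleepUninterruptibly` helper are hypothetical names, and the helper stands in for Guava's `Uninterruptibles.sleepUninterruptibly` so that the example needs only the JDK. The idea is the same as in the patch: count the records made available in one pass, and wait briefly when the count is zero so that a caller that polls again immediately does not spin.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the test's poll task; not the Beam or Kafka API. */
class BackoffPollSketch {

  /** Sleeps for the full duration even if interrupted, then restores the interrupt flag. */
  static void sleepUninterruptibly(long duration, TimeUnit unit) {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(duration);
      long end = System.nanoTime() + remainingNanos;
      while (remainingNanos > 0) {
        try {
          TimeUnit.NANOSECONDS.sleep(remainingNanos);
          return;
        } catch (InterruptedException e) {
          // Remember the interrupt and keep sleeping for the time that is left.
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Copies every offset >= position into 'out' and returns how many were added.
   * When nothing was added, waits backoffMillis before returning.
   */
  static int pollOnce(List<Long> offsets, long position, Queue<Long> out, long backoffMillis) {
    int recordsAdded = 0;
    for (long offset : offsets) {
      if (offset >= position) {
        out.add(offset);
        recordsAdded++;
      }
    }
    if (recordsAdded == 0) {
      // Nothing to hand out: back off instead of returning immediately.
      sleepUninterruptibly(backoffMillis, TimeUnit.MILLISECONDS);
    }
    return recordsAdded;
  }

  public static void main(String[] args) {
    Queue<Long> out = new ArrayDeque<>();
    List<Long> offsets = Arrays.asList(0L, 1L, 2L);
    // Offsets 1 and 2 are at or past position 1, so two records are added.
    System.out.println(pollOnce(offsets, 1L, out, 10));
    // No offset is at or past position 3, so the call waits and adds nothing.
    System.out.println(pollOnce(offsets, 3L, out, 10));
  }
}
```

The 10 ms wait mirrors the value in the patch. The sketch only shows the backoff pattern; it does not explain why the test hangs without the wait, which the TODO in the patch leaves open.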


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94300)
    Time Spent: 40m  (was: 0.5h)

> KafkaIOTest is flaky
> --------------------
>
>                 Key: BEAM-4086
>                 URL: https://issues.apache.org/jira/browse/BEAM-4086
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.5.0
>            Reporter: Ismaël Mejía
>            Assignee: Raghu Angadi
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Noticed this while trying to make a simple change in KafkaIO this morning, 
> and other contributors corroborated it. If you run 
> `./gradlew -p sdks/java/io/kafka/ clean build` it blocks indefinitely at 
> least a third of the time. However, it passes with Maven.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
