[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632668#comment-16632668
 ] 

ASF GitHub Bot commented on KAFKA-3514:
---------------------------------------

guozhangwang closed pull request #5669: KAFKA-3514: Modify pause logic if we 
being enforced processing
URL: https://github.com/apache/kafka/pull/5669
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 35e1f77fd4c..a4e08bad479 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -90,6 +90,7 @@ <h1>Upgrade Guide and API Changes</h1>
         We have also removed some public APIs that are deprecated prior to 
1.0.x in 2.0.0.
         See below for a detailed list of removed APIs.
     </p>
+
     <h3><a id="streams_api_changes_210" 
href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
     <p>
         We updated <code>TopologyDescription</code> API to allow for better 
runtime checking.
@@ -99,6 +100,14 @@ <h3><a id="streams_api_changes_210" 
href="#streams_api_changes_210">Streams API
         <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes";>KIP-321</a>.
     </p>
 
+    <p>
+        We've added a new config named <code>max.task.idle.ms</code> to allow 
users specify how to handle out-of-order data within a task that may be 
processing multiple
+        topic-partitions (see <a 
href="/{{version}}/documentation/streams/core-concepts.html#streams_out_of_ordering">Out-of-Order
 Handling</a> section for more details).
+        The default value is set to <code>0</code>, to favor minimized latency 
over synchronization between multiple input streams from topic-partitions.
+        If users would like to wait for longer time when some of the 
topic-partitions do not have data available to process and hence cannot 
determine its corresponding stream time,
+        they can override this config to a larger value.
+    </p>
+
     <h3><a id="streams_api_changes_200" 
href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
     <p>
         We have removed the <code>skippedDueToDeserializationError-rate</code> 
and <code>skippedDueToDeserializationError-total</code> metrics.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2f97b7f27ba..b77a18ef85e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -233,6 +233,7 @@ public StreamTask(final TaskId id,
             partitionQueues.put(partition, queue);
         }
 
+        idleStartTime = RecordQueue.UNKNOWN;
         recordInfo = new PartitionGroup.RecordInfo();
         partitionGroup = new PartitionGroup(partitionQueues);
         processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
@@ -355,10 +356,19 @@ public boolean process() {
             consumedOffsets.put(partition, record.offset());
             commitNeeded = true;
 
-            // after processing this record, if its partition queue's buffered 
size has been
-            // decreased to the threshold, we can then resume the consumption 
on this partition
-            if (recordInfo.queue().size() == maxBufferedSize) {
-                consumer.resume(singleton(partition));
+            // if we are not in the enforced processing state, then after 
processing
+            // this record, if its partition queue's buffered size has been 
decreased below
+            // the threshold, we can then resume the consumption on this 
partition;
+            // otherwise, we only resume the consumption on this partition 
after it
+            // has been drained.
+            if (idleStartTime != RecordQueue.UNKNOWN) {
+                if (recordInfo.queue().isEmpty()) {
+                    consumer.resume(singleton(partition));
+                }
+            } else {
+                if (recordInfo.queue().size() == maxBufferedSize) {
+                    consumer.resume(singleton(partition));
+                }
             }
         } catch (final ProducerFencedException fatal) {
             throw new TaskMigratedException(this, fatal);
@@ -713,10 +723,19 @@ public void addRecords(final TopicPartition partition, 
final Iterable<ConsumerRe
             log.trace("Added records into the buffered queue of partition {}, 
new queue size is {}", partition, newQueueSize);
         }
 
-        // if after adding these records, its partition queue's buffered size 
has been
-        // increased beyond the threshold, we can then pause the consumption 
for this partition
-        if (newQueueSize > maxBufferedSize) {
-            consumer.pause(singleton(partition));
+        // if we are not in the enforced processing state, then after adding 
these records,
+        // we can then pause the consumption for this partition if its 
partition queue's
+        // buffered size has been increased beyond the threshold;
+        // otherwise, we will immediately pause the consumption on this 
partition after it
+        // has at least some records already
+        if (idleStartTime != RecordQueue.UNKNOWN) {
+            if (newQueueSize > 0) {
+                consumer.pause(singleton(partition));
+            }
+        } else {
+            if (newQueueSize > maxBufferedSize) {
+                consumer.pause(singleton(partition));
+            }
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index d332b5b8394..7447633c011 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -273,7 +273,12 @@ public void testPauseResume() {
             getConsumerRecord(partition2, 65)
         ));
 
-        assertTrue(task.process());
+        assertEquals(6, task.numBuffered());
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
+
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process());
+        assertEquals(5, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
@@ -286,29 +291,92 @@ public void testPauseResume() {
             getConsumerRecord(partition1, 50)
         ));
 
+        assertEquals(8, task.numBuffered());
         assertEquals(2, consumer.paused().size());
         assertTrue(consumer.paused().contains(partition1));
         assertTrue(consumer.paused().contains(partition2));
 
-        assertTrue(task.process());
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process());
+        assertEquals(7, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
         assertEquals(1, consumer.paused().size());
         assertTrue(consumer.paused().contains(partition2));
 
-        assertTrue(task.process());
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process());
+        assertEquals(6, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
         assertEquals(1, consumer.paused().size());
         assertTrue(consumer.paused().contains(partition2));
 
-        assertTrue(task.process());
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process());
+        assertEquals(5, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
         assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process()); 
 // 1: 40
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process()); 
 // 2: 45
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process()); 
 // 1: 50
+        assertEquals(2, task.numBuffered());
+        assertEquals(5, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+        assertEquals(0, consumer.paused().size());
+
+        assertFalse(task.isProcessable(time.milliseconds()));  // we are idle 
now
+
+        time.sleep(100L);
+
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process()); 
 // start enforce processing
+        assertEquals(1, task.numBuffered());
+        assertEquals(5, source1.numReceived);
+        assertEquals(3, source2.numReceived);  // 1: 55
+        assertEquals(0, consumer.paused().size());
+
+        task.addRecords(partition2, Arrays.asList(
+            getConsumerRecord(partition2, 70),
+            getConsumerRecord(partition2, 80),
+            getConsumerRecord(partition2, 90)
+        ));
+
+        assertEquals(4, task.numBuffered());
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
+
+        // we are enforced processing now
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process());
+        assertEquals(3, task.numBuffered());
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
+
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process());
+        assertEquals(2, task.numBuffered());
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
+
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process());
+        assertEquals(1, task.numBuffered());
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
+
+        // only resume if we are enforced processing when the fetched 
partition is empty
+        assertTrue(task.isProcessable(time.milliseconds()) && task.process());
+        assertEquals(0, task.numBuffered());
+        assertEquals(0, consumer.paused().size());
+
+        task.addRecords(partition2, Arrays.asList(
+            getConsumerRecord(partition2, 100),
+            getConsumerRecord(partition2, 110)
+        ));
+
+        // pause immediately if we are enforced processing when there are at 
least some records already
+        assertEquals(2, task.numBuffered());
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
     }
 
     @SuppressWarnings("unchecked")
@@ -1392,7 +1460,7 @@ protected void flushState() {
             topicPartition.topic(),
             topicPartition.partition(),
             offset,
-            0L,
+            offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
             0L,
             0,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>                 Key: KAFKA-3514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>              Labels: architecture, kip
>             Fix For: 2.1.0
>
>
> KIP-353: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code:java}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code:java}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> [http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor])
> {quote}Let's assume the following case.
>  - a stream processor that uses the Processor API
>  - context.schedule(1000) is called in the init()
>  - the processor reads only one topic that has one partition
>  - using custom timestamp extractor, but that timestamp is just a wall
>  clock time
>  Image the following events:
>  1., for 10 seconds I send in 5 messages / second
>  2., does not send any messages for 3 seconds
>  3., starts the 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not 
>  send any messages. This is ok according to the documentation, because 
>  there is not any new messages to trigger the punctuate() call. When the 
>  first few messages arrives after a restart the sending (point 3. above) I 
>  see the following sequence of method calls:
> 1., process() on the 1st message
>  2., punctuate() is called 3 times
>  3., process() on the 2nd message
>  4., process() on each following message
> What I would expect instead is that punctuate() is called first and then 
>  process() is called on the messages, because the first message's timestamp 
>  is already 3 seconds older then the last punctuate() was called, so the 
>  first message belongs after the 3 punctuate() calls.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to