mjsax commented on a change in pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#discussion_r420463508



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval,
     <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
 
     /**
-     * Requests a commit
+     * Requests a commit.
      */
     void commit();
 
     /**
      * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the topic name
      */
     String topic();
 
     /**
      * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the partition id
      */
     int partition();
 
     /**
      * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not available
+     * Returns the headers of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
      * @return the headers
      */
     Headers headers();
 
     /**
      * Returns the current timestamp.
      *
-     * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
      *
-     * If it is triggered while processing a record generated not from the source processor (for example,
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
      * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the smallest among all its input stream partition timestamps.
+     * task's stream time, which is defined as the largest among all its input stream partition timestamps.

Review comment:
       Sounds like a bug :)
   
   But it seems to be a one-line fix that I can piggy-back on this PR. We 
advance the "partition time" too early. If we advance it when we return the 
record for processing, all should be fixed?
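
To make the proposal concrete, here is a minimal sketch of the idea, not the actual Kafka Streams code. The class `PartitionTimeSketch` and its methods are hypothetical names invented for illustration. It assumes that each input partition buffers records, that "partition time" is the largest timestamp handed out for processing so far, and that stream time is the largest partition time across a task's inputs, as the updated Javadoc states. The point is that `add` only buffers, while `poll` advances partition time at the moment the record is returned for processing.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration only; this does not mirror Kafka Streams internals.
final class PartitionTimeSketch {
    static final long UNKNOWN = -1L;

    // Timestamps of records that were fetched but not yet processed.
    private final Deque<Long> buffered = new ArrayDeque<>();
    private long partitionTime = UNKNOWN;

    // Buffer a record's timestamp. Partition time is deliberately NOT advanced
    // here, because the record has not been handed to processing yet.
    void add(final long timestamp) {
        buffered.addLast(timestamp);
    }

    // Return the next record's timestamp for processing and advance partition
    // time only now. Returns UNKNOWN if nothing is buffered.
    long poll() {
        final Long timestamp = buffered.pollFirst();
        if (timestamp == null) {
            return UNKNOWN;
        }
        partitionTime = Math.max(partitionTime, timestamp);
        return timestamp;
    }

    long partitionTime() {
        return partitionTime;
    }

    // Stream time as the largest partition time among all input partitions.
    static long streamTime(final PartitionTimeSketch... partitions) {
        long max = UNKNOWN;
        for (final PartitionTimeSketch partition : partitions) {
            max = Math.max(max, partition.partitionTime());
        }
        return max;
    }

    public static void main(final String[] args) {
        final PartitionTimeSketch a = new PartitionTimeSketch();
        final PartitionTimeSketch b = new PartitionTimeSketch();
        a.add(10L);
        a.add(30L);
        b.add(20L);

        // Nothing has been returned for processing yet, so no time is known.
        System.out.println(streamTime(a, b)); // prints -1

        a.poll();
        System.out.println(streamTime(a, b)); // prints 10

        b.poll();
        a.poll();
        System.out.println(streamTime(a, b)); // prints 30
    }
}
```

In this sketch, buffering the record with timestamp 30 does not move stream time ahead of the record with timestamp 10 that is currently being processed. That is the behavior the comment suggests would result from advancing partition time later.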




