syhily commented on code in PR #19972:
URL: https://github.com/apache/flink/pull/19972#discussion_r921758152


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java:
##########
@@ -65,28 +65,48 @@ static StopCursor latest() {
     }
 
     /**
-     * Stop when the messageId is equal or greater than the specified messageId. Message that is
-     * equal to the specified messageId will not be consumed.
+     * Stop consuming when the messageId is equal or greater than the specified messageId. Message
+     * that is equal to the specified messageId will not be consumed.
      */
     static StopCursor atMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(true);
+        } else {
+            return new MessageIdStopCursor(messageId);
+        }
     }
 
     /**
-     * Stop when the messageId is greater than the specified messageId. Message that is equal to the
-     * specified messageId will be consumed.
+     * Stop consuming when the messageId is greater than the specified messageId. Message that is
+     * equal to the specified messageId will be consumed.
      */
     static StopCursor afterMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId, false);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(false);
+        } else {
+            return new MessageIdStopCursor(messageId, false);
+        }
     }
 
-    @Deprecated
+    /** Stop consuming when message eventTime is greater than or equals the specified timestamp. */
     static StopCursor atEventTime(long timestamp) {
         return new EventTimestampStopCursor(timestamp);
     }
 
-    /** Stop when message publishTime is greater than the specified timestamp. */
+    /** Stop consuming when message eventTime is greater than the specified timestamp. */
+    static StopCursor afterEventTime(long timestamp) {
+        return new EventTimestampStopCursor(timestamp + 1);
+    }
+
+    /**
+     * Stop consuming when message publishTime is greater than or equals the specified timestamp.
+     */
     static StopCursor atPublishTime(long timestamp) {
         return new PublishTimestampStopCursor(timestamp);
     }
+
+    /** Stop consuming when message publishTime is greater than the specified timestamp. */
+    static StopCursor afterPublishTime(long timestamp) {
+        return new PublishTimestampStopCursor(timestamp + 1);
+    }

Review Comment:
   Yep.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to