syhily commented on code in PR #19972: URL: https://github.com/apache/flink/pull/19972#discussion_r921758152
########## flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java: ########## @@ -65,28 +65,48 @@ static StopCursor latest() { } /** - * Stop when the messageId is equal or greater than the specified messageId. Message that is - * equal to the specified messageId will not be consumed. + * Stop consuming when the messageId is equal or greater than the specified messageId. Message + * that is equal to the specified messageId will not be consumed. */ static StopCursor atMessageId(MessageId messageId) { - return new MessageIdStopCursor(messageId); + if (MessageId.latest.equals(messageId)) { + return new LatestMessageStopCursor(true); + } else { + return new MessageIdStopCursor(messageId); + } } /** - * Stop when the messageId is greater than the specified messageId. Message that is equal to the - * specified messageId will be consumed. + * Stop consuming when the messageId is greater than the specified messageId. Message that is + * equal to the specified messageId will be consumed. */ static StopCursor afterMessageId(MessageId messageId) { - return new MessageIdStopCursor(messageId, false); + if (MessageId.latest.equals(messageId)) { + return new LatestMessageStopCursor(false); + } else { + return new MessageIdStopCursor(messageId, false); + } } - @Deprecated + /** Stop consuming when message eventTime is greater than or equals the specified timestamp. */ static StopCursor atEventTime(long timestamp) { return new EventTimestampStopCursor(timestamp); } - /** Stop when message publishTime is greater than the specified timestamp. */ + /** Stop consuming when message eventTime is greater than the specified timestamp. */ + static StopCursor afterEventTime(long timestamp) { + return new EventTimestampStopCursor(timestamp + 1); + } + + /** + * Stop consuming when message publishTime is greater than or equals the specified timestamp. + */ static StopCursor atPublishTime(long timestamp) { return new PublishTimestampStopCursor(timestamp); } + + /** Stop consuming when message publishTime is greater than the specified timestamp. */ + static StopCursor afterPublishTime(long timestamp) { + return new PublishTimestampStopCursor(timestamp + 1); + } Review Comment: Yep. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org