GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70059282
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
         * This method is called by {@link ShardConsumer}s.
         *
         * @param record the record to collect
    +    * @param recordTimestamp timestamp to attach to the collected record
         * @param shardStateIndex index of the shard to update in subscribedShardsState;
         *                        this index should be the returned value from
         *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
         *                        when the shard state was registered.
         * @param lastSequenceNumber the last sequence number value to update
         */
    -   protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
    +   protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
                synchronized (checkpointLock) {
                        sourceContext.collect(record);
    --- End diff --
    
    Did you also consider passing the record to the
    `sourceContext.collectWithTimestamp()` method in addition to passing it
    to the serialization schema?
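
To make the suggestion concrete, here is a minimal sketch of how the new `recordTimestamp` parameter could be forwarded to `collectWithTimestamp()` inside the checkpoint lock. It is not the code from the pull request: `SketchSourceContext`, `RecordingContext`, and `FetcherSketch` are hypothetical stand-ins so the example compiles without Flink on the classpath, the `SequenceNumber` type is simplified to a `String`, and the per-shard state is assumed to be a plain list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's SourceFunction.SourceContext; only the
// two emit methods relevant to this discussion are modelled.
interface SketchSourceContext<T> {
    void collect(T element);
    void collectWithTimestamp(T element, long timestamp);
}

// Hypothetical test double that records what was emitted.
class RecordingContext<T> implements SketchSourceContext<T> {
    final List<T> elements = new ArrayList<>();
    final List<Long> timestamps = new ArrayList<>();

    @Override
    public void collect(T element) {
        elements.add(element);
        timestamps.add(null); // no timestamp attached
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        elements.add(element);
        timestamps.add(timestamp);
    }
}

// Hypothetical, heavily simplified version of the fetcher method in the diff.
class FetcherSketch<T> {
    private final Object checkpointLock = new Object();
    private final SketchSourceContext<T> sourceContext;
    // Assumption: one last-seen sequence number per subscribed shard.
    private final List<String> lastSequenceNumbers = new ArrayList<>();

    FetcherSketch(SketchSourceContext<T> sourceContext, int shardCount) {
        this.sourceContext = sourceContext;
        for (int i = 0; i < shardCount; i++) {
            lastSequenceNumbers.add(null);
        }
    }

    void emitRecordAndUpdateState(T record, long recordTimestamp,
                                  int shardStateIndex, String lastSequenceNumber) {
        synchronized (checkpointLock) {
            // Emit with the timestamp instead of calling collect(record), so
            // the record leaves the source with its timestamp attached.
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            // Update the shard state under the same lock as the emit.
            lastSequenceNumbers.set(shardStateIndex, lastSequenceNumber);
        }
    }

    String lastSequenceNumber(int shardStateIndex) {
        synchronized (checkpointLock) {
            return lastSequenceNumbers.get(shardStateIndex);
        }
    }
}
```

In this sketch the emit and the state update stay under the same lock, as in the original method; only the call on `sourceContext` changes.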

