boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442566478



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -795,6 +828,12 @@ public void setValueDeserializer(String valueDeserializer) {
       return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
     }
 
+    /** A function to the compute output timestamp from a {@link KafkaRecord}. */
+    public Read<K, V> withExtractOutputTimestampFn(

Review comment:
       > How is this different from withTimestampFn2?
   
   `withTimestampFn2` has been deprecated in `KafkaIO.Read`. The main concern 
with reusing `withTimestampFn2` is that it would mean different things under 
SDF and UnboundedSource, which would cause confusion.
   
    > Setting the top level properties allow us to say that this property is 
supported when used as an SDF.
   
   This is what I am trying to achieve by adding `withExtractOutputTimestampFn`.
   
   > Also, what prevents us from supporting TimestampPolicy? We should be able 
to call it and give it the three pieces of information it requests (message 
backlog / backlog check time / current kafka record).
   
   The difficulty is that the message backlog and the backlog check time are 
not tracked per (element, restriction). In the SDF framework, the backlog is 
retrieved by calling `RestrictionTracker.getProgress()`, and we cannot call 
that for every element just to extract a timestamp.
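   
   To make the distinction concrete, here is a rough, self-contained sketch in 
plain Java. It does not use Beam. `Record`, `BacklogContext`, `PolicyStyleFn` 
and `extractOutputTimestamp` are hypothetical stand-ins invented for this 
illustration, not the actual `KafkaIO` types or the signature proposed in this 
PR. It only shows that a record-only function can be evaluated per element, 
while a policy-style function also needs backlog state that would have to come 
from somewhere else.

```java
import java.time.Instant;
import java.util.function.Function;

public class TimestampSketch {
  /** Hypothetical stand-in for a Kafka record: only the fields this sketch needs. */
  static final class Record {
    final long offset;
    final long createTimeMillis;

    Record(long offset, long createTimeMillis) {
      this.offset = offset;
      this.createTimeMillis = createTimeMillis;
    }
  }

  /** Hypothetical stand-in for the backlog state a policy-style function expects. */
  static final class BacklogContext {
    final long messageBacklog;
    final Instant backlogCheckTime;

    BacklogContext(long messageBacklog, Instant backlogCheckTime) {
      this.messageBacklog = messageBacklog;
      this.backlogCheckTime = backlogCheckTime;
    }
  }

  /** Policy-style shape: needs backlog state in addition to the record. */
  interface PolicyStyleFn {
    Instant timestampFor(BacklogContext context, Record record);
  }

  /** Record-only shape: everything it needs is available for each element. */
  static Instant extractOutputTimestamp(Function<Record, Instant> fn, Record record) {
    return fn.apply(record);
  }

  public static void main(String[] args) {
    // Assumption for illustration: use the record's create time as the output timestamp.
    Function<Record, Instant> createTime = r -> Instant.ofEpochMilli(r.createTimeMillis);
    Record record = new Record(42L, 1_000L);
    System.out.println(extractOutputTimestamp(createTime, record));

    // A PolicyStyleFn cannot be called here without a BacklogContext, and in the
    // SDF case that state is not kept per (element, restriction).
  }
}
```

   The record-only shape is what keeps the behaviour the same regardless of how 
the backlog is tracked.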



