GitHub user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4638#discussion_r139534525
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -106,8 +240,191 @@
                return deserializationSchema;
        }
     
    -   @Override
    -   public String explainSource() {
    -           return "";
    +   /**
    +    * Assigns ingestion time timestamps and watermarks.
    +    */
    +   public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> {
    +
    +           private long curTime = Long.MIN_VALUE;
    +
    +           @Override
    +           public long extractTimestamp(Row element, long previousElementTimestamp) {
    +                   long t = System.currentTimeMillis();
    +                   if (t > curTime) {
    +                           curTime = t;
    +                   }
    +                   return curTime;
    +           }
    +
    +           @Nullable
    +           @Override
    +           public Watermark getCurrentWatermark() {
    +                   return new Watermark(curTime - 1);
    +           }
    +   }
    +
    +   protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
    +           return this.timestampAssigner;
    +   }
    +
    +   /**
    +    * Checks that the provided row time attribute is valid, determines its position in the schema,
    +    * and adjusts the return type.
    +    *
    +    * @param rowtime The attribute to check.
    +    */
    +   private void configureRowTimeAttribute(String rowtime) {
    +           Preconditions.checkNotNull(rowtime, "Row time attribute must not be null.");
    +
    +           if (this.ingestionTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "You can only specify a row time attribute OR an ingestion time attribute.");
    +           }
    +
    +           if (this.rowTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "Row time attribute can only be specified once.");
    +           }
    +
    +           // get current fields
    +           String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames();
    +           TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();
    +
    +           // check if the rowtime field exists and remember position
    +           this.rowtimeFieldPos = -1;
    --- End diff --
    
    Thanks for the response!
    
    I would not merge `DefinedProctimeAttribute` and `DefinedRowtimeAttribute` into a single interface. They serve two different kinds of timestamps (processing time and event time).
    
    The idea is to have two interfaces:
    - `DefinedProctimeAttribute` adds a timestamp attribute for processing-time operations. The interface specifies the name of the new attribute. The source simply adds a virtual attribute that fetches the current time when it is accessed.
    - `DefinedRowtimeAttribute` (or its successor) designates an existing attribute (by name) as the event-time attribute and provides a watermark strategy (ascending, bounded out-of-order, or custom). If a `TableSource` implements the interface, the scan generates watermarks based on the existing field and the watermark strategy. During registration, we check that the field exists and has the right type. So instead of adding a field, we simply ensure that we have watermarks for it and change its type to a time indicator type.
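    To make the asymmetry between the two interfaces concrete, here is a minimal, self-contained Java sketch. All names (`ProctimeAttributeSketch`, `RowtimeAttributeSketch`, `WatermarkStrategyKind`, `TimeAttributeRegistration`) are hypothetical and are not Flink's actual interfaces. It also assumes that "the right type" for a rowtime field means `java.sql.Timestamp` or an epoch-millisecond `Long`. The proctime declaration appends a new virtual field, while the rowtime declaration leaves the schema unchanged and only validates an existing field.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two-interface idea; these names are illustrative
// and are NOT Flink's actual TableSource interfaces.

/** Processing time: only names a NEW, virtual attribute. */
interface ProctimeAttributeSketch {
    String getProctimeAttribute();
}

/** Event time: names an EXISTING field and chooses how watermarks are generated. */
interface RowtimeAttributeSketch {
    String getRowtimeAttribute();

    WatermarkStrategyKind getWatermarkStrategy();
}

enum WatermarkStrategyKind { ASCENDING, BOUNDED_OUT_OF_ORDER, CUSTOM }

final class TimeAttributeRegistration {

    private TimeAttributeRegistration() {}

    /** Proctime: the schema grows by one virtual field; no existing data is read. */
    static List<String> addProctime(List<String> fieldNames, ProctimeAttributeSketch source) {
        String name = source.getProctimeAttribute();
        if (fieldNames.contains(name)) {
            throw new IllegalArgumentException("Proctime attribute must be a new field: " + name);
        }
        List<String> extended = new ArrayList<>(fieldNames);
        extended.add(name);
        return extended;
    }

    /**
     * Rowtime: the schema keeps its fields; the named field must exist and have a
     * timestamp type (assumed here: java.sql.Timestamp or epoch-millis Long).
     * Returns the field's position so a scan could read it for watermarking.
     */
    static int checkRowtime(List<String> fieldNames, List<Class<?>> fieldTypes,
                            RowtimeAttributeSketch source) {
        String name = source.getRowtimeAttribute();
        int pos = fieldNames.indexOf(name);
        if (pos < 0) {
            throw new IllegalArgumentException("Rowtime attribute does not exist: " + name);
        }
        Class<?> type = fieldTypes.get(pos);
        if (type != Timestamp.class && type != Long.class) {
            throw new IllegalArgumentException("Rowtime attribute has wrong type: " + type.getName());
        }
        return pos;
    }
}
```

    With fields `user`, `amount`, and `ts` (a `Timestamp`), `checkRowtime` returns `2` for a source that names `ts`, whereas `addProctime` with the name `ptime` yields the schema `user, amount, ts, ptime`.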
    
    In this design, `DefinedRowtimeAttribute` specifies the field and the watermark strategy, as you proposed.
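    As a rough illustration of how a scan could derive watermarks from an existing rowtime field, here is a self-contained Java sketch of the ascending and bounded out-of-order strategies. The class `RowtimeWatermarkSketch` and its methods are hypothetical, not Flink classes, and the sketch assumes the usual convention that a watermark `w` promises no further rows with a rowtime `<= w`. A custom strategy would replace this logic with user code. The sketch keeps the watermark at `Long.MIN_VALUE` until the first row arrives, because in Java `Long.MIN_VALUE - 1` wraps around to `Long.MAX_VALUE`.

```java
// Hypothetical sketch (not Flink API): derive watermarks from an existing rowtime
// field under the "ascending" and "bounded out-of-order" strategies.
final class RowtimeWatermarkSketch {

    private final long maxOutOfOrdernessMillis; // 0 corresponds to the "ascending" strategy
    private long maxSeenTimestamp = Long.MIN_VALUE;

    private RowtimeWatermarkSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0 || maxOutOfOrdernessMillis == Long.MAX_VALUE) {
            throw new IllegalArgumentException("delay must be in [0, Long.MAX_VALUE)");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Ascending: rowtime values never decrease. */
    static RowtimeWatermarkSketch ascending() {
        return new RowtimeWatermarkSketch(0L);
    }

    /** Bounded out-of-order: a row arrives at most delayMillis behind the maximum seen so far. */
    static RowtimeWatermarkSketch boundedOutOfOrder(long delayMillis) {
        return new RowtimeWatermarkSketch(delayMillis);
    }

    /** Called for every row with the value of the existing rowtime field. */
    void onRow(long rowtime) {
        if (rowtime > maxSeenTimestamp) {
            maxSeenTimestamp = rowtime;
        }
    }

    /** Rows with a rowtime at or below this value are no longer expected. */
    long currentWatermark() {
        long lag = maxOutOfOrdernessMillis + 1; // cannot overflow: delay < Long.MAX_VALUE
        if (maxSeenTimestamp < Long.MIN_VALUE + lag) {
            // Nothing seen yet (or too close to the minimum): stay at MIN_VALUE instead of wrapping.
            return Long.MIN_VALUE;
        }
        return maxSeenTimestamp - lag;
    }
}
```

    For example, with a bound of 100 ms, rows with rowtimes 1000 and 950 give a watermark of 899; with the ascending strategy, rows at 1000 and 1005 give 1004.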

