[ 
https://issues.apache.org/jira/browse/FLINK-3435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15163579#comment-15163579
 ] 

ASF GitHub Bot commented on FLINK-3435:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1699#discussion_r53994466
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
 ---
    @@ -51,17 +48,19 @@ public TimestampedCollector(Output<StreamRecord<T>> 
output) {
     
        @Override
        public void collect(T record) {
    -           output.collect(reuse.replace(record, timestamp));
    +           output.collect(reuse.replace(record));
    +   }
    +   
    +   public void setTimestamp(StreamRecord<?> timestampBase) {
    +           if (timestampBase.hasTimestamp()) {
    +                   reuse.setTimestamp(timestampBase.getTimestamp());
    +           } else {
    +                   reuse.eraseTimestamp();
    +           }
        }
     
    -   /**
    -    * Sets the timestamp (long milliseconds) that is attached to elements 
that get emitted using
    -    * {@link #collect(Object)}
    -    * 
    -    * @param timestamp The timestamp in milliseconds
    -    */
    -   public void setTimestamp(long timestamp) {
    -           this.timestamp = timestamp;
    +   public void setAbsoluteTimestamp(long timestamp) {
    --- End diff --
    
    I renamed that to make sure I do not overlook any places where the better 
method would have been `setTimestamp(StreamRecord)`. Ideally, we'd not have the 
`setAbsoluteTimestamp(long)` method at all. But that would need changes in the 
window operators.


> Change interplay of Ingestion Time and Event Time
> -------------------------------------------------
>
>                 Key: FLINK-3435
>                 URL: https://issues.apache.org/jira/browse/FLINK-3435
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> Currently, "EventTime" and "IngestionTime" are completely the same.
> For both happens the following:
>   - Sources generate ingestion time timestamps and watermarks
>   - If a user adds a manual timestamp extractor / watermark generator, then 
> those override the ingestion time timestamps and watermarks
>   - That implies that event time on a certain input falls back to ingestion 
> time, if one forgets (or incorrectly uses) the timestamp extractors
>   - Also, Ingestion Time and Event Time simply mix if some inputs have 
> timestamp assigners, and others have not.
> This behavior is quite tricky to understand. After some discussions with 
> [~aljoscha] and [~rmetzger], we suggest to change it the following way.
>   1. On Ingestion Time, the timestamps and watermarks are generated as they 
> are now.
>   2. On event time, no default timestamps and watermarks are generated. If a 
> user does not implement a timestamp extractor / watermark generator, then the 
> event time operations will fail fast.
>   3. If one wants to use ingestion time on event time settings (mix), one can 
> use an explicit "WallClockTimetampsAndWatermark" generator.
>   4. Later, the "Ingestion Time" settings should automatically disable and 
> user-defined timestamp extractors / assigners.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to