Hi Community,

Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao

On 11/10/20 16:56, fuyao...@oracle.com wrote:

Hi Kevin,

Sorry for the name typo...

On 11/10/20 16:48, fuyao...@oracle.com wrote:

Hi Kavin,

Thanks for your example. I think I had already done something very similar before. I didn't post the full WatermarkStrategy implementation in my previous email, but I do have that part. I think the example you gave me is a punctuated watermark strategy, not a bounded-out-of-orderness one. My main concerns now are why my emitted watermark is not available in processElement(), and why I get 8 records each time the code reaches onPeriodicEmit(). My code, following your example, is below.
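To make the punctuated versus bounded-out-of-orderness distinction concrete, here is a Flink-free sketch. WatermarkGeneratorSketch is a hypothetical stand-in for Flink's WatermarkGenerator with the same two callbacks: the punctuated variant emits a watermark from every event, while the periodic variant only records the largest timestamp and emits when onPeriodicEmit() fires. The periodic arithmetic (start at Long.MIN_VALUE + bound + 1, emit max - bound - 1) follows the pattern in Kevin's example and in Flink's built-in bounded-out-of-orderness generator; all class names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free stand-in for the two WatermarkGenerator callbacks. */
interface WatermarkGeneratorSketch {
    void onEvent(long eventTimestamp, List<Long> output);
    void onPeriodicEmit(List<Long> output);
}

/** Punctuated style: one watermark per event, nothing on the timer. */
class PunctuatedSketch implements WatermarkGeneratorSketch {
    @Override
    public void onEvent(long eventTimestamp, List<Long> output) {
        output.add(eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(List<Long> output) {
        // nothing to do: watermarks are emitted in onEvent
    }
}

/** Periodic bounded-out-of-orderness style: remember the max, emit only on the timer. */
class BoundedOutOfOrdernessSketch implements WatermarkGeneratorSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // start low enough that the first emitted watermark cannot underflow
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(long eventTimestamp, List<Long> output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(List<Long> output) {
        output.add(maxTimestamp - outOfOrdernessMillis - 1);
    }
}

class GeneratorStyleDemo {
    /** Feeds all events, then fires the periodic callback once; returns everything emitted. */
    static List<Long> run(WatermarkGeneratorSketch generator, long... events) {
        List<Long> out = new ArrayList<>();
        for (long ts : events) {
            generator.onEvent(ts, out);
        }
        generator.onPeriodicEmit(out);
        return out;
    }
}
```

Feeding timestamps 100, 200, 300 and then firing the timer once yields [100, 200, 300] for the punctuated variant and a single watermark of 269 for the periodic one with a 30 ms bound.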

The symptom is that the context watermark is Long.MIN_VALUE when I use the watermark strategy below.

16:35:12,969 INFO org.myorg.quickstart.processor.TableOutputProcessFunction - context current key: 69215, context current watermark: -9223372036854775808


DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
    .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
    .keyBy(value -> {
        String invoice_id_key = (String) value.f1.getField(0);
        if (invoice_id_key == null) {
            invoice_id_key = (String) value.f1.getField(4);
        }
        return invoice_id_key;
    })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
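The key selector above falls back from field 0 to field 4. As a named method it is easier to read and test; the sketch below is Flink-free and assumes a plain Object[] in place of Row (InvoiceKeySketch and invoiceKey are hypothetical names). In the job itself, the same body could implement Flink's KeySelector<Tuple2<Boolean, Row>, String>.

```java
/** Hypothetical, Flink-free version of the keyBy lambda; Object[] stands in for Row. */
class InvoiceKeySketch {
    /** Uses field 0 (invoice_id) as the key, falling back to field 4 when field 0 is null. */
    static String invoiceKey(Object[] fields) {
        String key = (String) fields[0];
        if (key == null) {
            key = (String) fields[4];
        }
        return key;
    }
}
```

If both fields are null this returns null, and Flink's keyBy does not accept null keys, so the data has to guarantee that at least one of the two fields is set.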

watermark strategy:

@Slf4j
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        // extractEventTimestamp is a hypothetical helper standing in for "my timestamp",
        // i.e. reading the event-time field of the row (elided in the original post)
        return (booleanRowTuple2, previousElementTimestamp) -> extractEventTimestamp(booleanRowTuple2);
    }
}

watermark generator code:

@Slf4j
public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
                        WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // nothing to do: watermarks are emitted in reaction to events in onEvent()
    }
}
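Within one parallel instance, the two halves of a WatermarkStrategy are chained per record: the TimestampAssigner produces the timestamp, and that value is what onEvent() receives as eventTimestamp. The sketch below models one such instance with the punctuated generator in plain Java. AssignerChainSketch and its fields are hypothetical, a ToLongFunction stands in for TimestampAssigner, and it assumes (as the TimestampsAndWatermarksOperator linked further down appears to do) that a watermark not larger than the previous one is not forwarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/**
 * Hypothetical model of ONE parallel instance of the timestamps-and-watermarks
 * operator using a punctuated generator: assign a timestamp, then emit it as a watermark.
 */
class AssignerChainSketch<T> {
    private final ToLongFunction<T> timestampAssigner; // stands in for TimestampAssigner
    private long currentWatermark = Long.MIN_VALUE;     // per-instance state
    final List<Long> forwardedWatermarks = new ArrayList<>();

    AssignerChainSketch(ToLongFunction<T> timestampAssigner) {
        this.timestampAssigner = timestampAssigner;
    }

    void processElement(T event) {
        long eventTimestamp = timestampAssigner.applyAsLong(event);
        // punctuated generator: one watermark per event
        emitWatermark(eventTimestamp);
    }

    private void emitWatermark(long timestamp) {
        // assumption: only strictly increasing watermarks are forwarded downstream
        if (timestamp > currentWatermark) {
            currentWatermark = timestamp;
            forwardedWatermarks.add(timestamp);
        }
    }
}
```

Feeding timestamps 100, 50, 200 forwards only 100 and 200. Each parallel instance keeps its own currentWatermark, so an instance that receives no records never forwards anything.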

16:35:13,584 INFO org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator - Emit Punctuated watermark: 1605054900905

From the log, I can see that it extracts the eventTimestamp and emits the watermark. Why can't I access this information in processElement()?
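One possible explanation to check (an assumption, not confirmed in this thread): in Flink, an operator's current watermark is the minimum of the latest watermarks received on all of its input channels. If the assigner runs with parallelism greater than 1 and only one instance receives records, the other instances never emit a watermark, so the minimum seen in processElement() stays at Long.MIN_VALUE. The sketch models that rule in plain Java; CombinedWatermarkSketch is a hypothetical name, and the idle flag loosely mirrors the purpose of WatermarkStrategy#withIdleness(Duration), which lets Flink stop waiting on inputs that have produced no data for a while.

```java
/**
 * Hypothetical model of how a downstream operator derives its watermark from
 * several input channels: the minimum over the channels' latest watermarks,
 * ignoring channels marked idle.
 */
class CombinedWatermarkSketch {
    static long combined(long[] channelWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle channels do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, channelWatermarks[i]);
        }
        // simplification: if every channel is idle, report "no watermark yet"
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With one channel at 1605054900905 (the value in the log above) and seven channels still at Long.MIN_VALUE, the combined watermark is Long.MIN_VALUE; once the seven silent channels are marked idle, it becomes 1605054900905.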

Any suggestions? Thank you so much!


Best regards,

Fuyao



On 11/10/20 16:04, Kevin Kwon wrote:
Hi Fuyao, I think you need to implement your own WatermarkStrategy class and register it on your stream with assignTimestampsAndWatermarks(new YourEventWatermarkStrategy()).

Make sure you use the Kafka consumer's assignTimestampsAndWatermarks() if you're using Kafka consumers.
An example for a booking event that carries its own internal timestamp would be:

public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {

  @Override
  public WatermarkGenerator<Booking> createWatermarkGenerator(
      WatermarkGeneratorSupplier.Context context
  ) {
    return new WatermarkGenerator<Booking>() {
      private final long OUT_OF_ORDERNESS_MILLIS = 30;
      private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

      @Override
      public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
        Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
        output.emitWatermark(watermark);
      }

      @Override
      public void onPeriodicEmit(WatermarkOutput output) {
        // Do nothing, since a watermark is emitted for every event
      }
    };
  }

  @Override
  public TimestampAssigner<Booking> createTimestampAssigner(
      TimestampAssignerSupplier.Context context
  ) {
    return (booking, recordTimestamp) -> booking.getTimestamp();
  }
}
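The starting value Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1 in the example above is there to avoid long underflow when the watermark formula is applied before any real event has raised currentMaxTimestamp (for example in a periodic onPeriodicEmit()). Flink's built-in bounded-out-of-orderness generator uses the same starting value. The sketch below (WatermarkUnderflowSketch is a hypothetical name) shows the difference in plain Java.

```java
/** Why currentMaxTimestamp starts at Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1. */
class WatermarkUnderflowSketch {
    static final long OUT_OF_ORDERNESS_MILLIS = 30;

    /** Watermark formula from the example above. */
    static long watermarkFor(long currentMaxTimestamp) {
        return currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1;
    }

    /** Starting value from the example: the first watermark lands exactly on Long.MIN_VALUE. */
    static long safeInitialWatermark() {
        return watermarkFor(Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1);
    }

    /** Naive starting value: long arithmetic wraps around to a huge positive watermark. */
    static long naiveInitialWatermark() {
        return watermarkFor(Long.MIN_VALUE);
    }
}
```

The safe start yields Long.MIN_VALUE, while the naive start wraps around to Long.MAX_VALUE - 30, a watermark that would immediately mark every later record as late.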

On Wed, Nov 11, 2020 at 12:28 AM <fuyao...@oracle.com> wrote:

    Hi Experts,

    I am trying to implement a KeyedProcessFunction with an onTimer()
    callback. I need to use event time, and I am having problems making
    the watermark available to my operator; I am seeing some strange
    behavior.
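A stuck watermark matters for onTimer() because an event-time timer fires only once the operator's watermark reaches the timer's timestamp. A plain-Java sketch of that rule for a single key follows; EventTimeTimerSketch is hypothetical, and registerEventTimeTimer only echoes the name of the real TimerService#registerEventTimeTimer(long).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical single-key model of event-time timers: a timer registered for
 * time t fires only once the watermark reaches t.
 */
class EventTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark (never backwards) and returns the timers that fire, in order. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

With timers at 1000 and 2000, a watermark stuck at Long.MIN_VALUE fires nothing; advancing it to 1500 fires 1000, and advancing to 2000 fires 2000.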

    I have a joined retract stream without watermark or timestamp
    information, and I need to assign timestamps and watermarks to it.
    The timestamp is just a field in the stream. My problems are with the
    watermark generator part.

    Problems:

    1. I can use a TimeLagWatermarkGenerator and make it work. But with a
    BoundedOutOfOrdernessGenerator, the value of
    context.timerService().currentWatermark() in processElement() always
    sticks to the initial value and never updates.

    2. I set the auto-watermark interval to 5 seconds for debugging, and I
    attach this watermark generator in only one place, with parallelism 1.
    However, I get 8 records at a time. The time-lag policy advances all
    8 records, while the out-of-orderness policy advances only 1 of them.
    Maybe this mismatch is causing processElement() to see the wrong
    (default) watermark?

    
    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

    This is my watermark generator code:

    @Slf4j
    public class PeriodicTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
        private final long maxTimeLag = 15000;
        private transient long currentMaxTimestamp = 15000;

        @Override
        public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {
            // eventTimestamp is produced by the TimestampAssigner, see
            // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
            currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
            log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Policy 1: time-lag strategy; this works and advances the watermark
            long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
            output.emitWatermark(new Watermark(watermarkEpochTime));

            // Policy 2: periodic emit based on the events seen so far
            long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
            // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

            log.info("Emit Watermark: watermark based on system time: {}, periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                    watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
        }
    }


    This is the log printed by the slf4j logging above. Every time, I get
    8 records. Why 8? In theory I think it should be 1, and I am very
    confused. Also, policy 1 advances all 8 records, while policy 2
    advances only 1 of the 8, and that one is not reflected in
    processElement().

    14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
    14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

    14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
    14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
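The eight lines per tick are consistent with eight instances of PeriodicTableOutputWatermarkGenerator, one per parallel subtask of the assignTimestampsAndWatermarks operator, each holding its own currentMaxTimestamp; only one of them shows event data. This is an inference from the log, not something confirmed in the thread. (For comparison, in the pipeline code posted in the follow-up message, .setParallelism(1) follows addSink(...), so it applies only to the sink.) The sketch below replays the first tick in plain Java and takes the minimum across instances, which is how a downstream subtask combines watermarks from its input channels; LoggedTickSketch and its methods are hypothetical names.

```java
/**
 * Hypothetical replay of one 5-second tick: several generator instances, each with
 * its own currentMaxTimestamp, combined by taking the minimum as a downstream subtask would.
 */
class LoggedTickSketch {
    static final long MAX_TIME_LAG = 15000;

    /** Policy 1 from the code above: wall-clock time minus the lag, or the max event time if larger. */
    static long policy1(long nowMillis, long currentMaxTimestamp) {
        return Math.max(nowMillis - MAX_TIME_LAG, currentMaxTimestamp);
    }

    /** Policy 2 from the code above: max event time minus the lag. */
    static long policy2(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    /** Minimum watermark over all instances under the chosen policy. */
    static long combined(long nowMillis, long[] currentMaxPerInstance, boolean usePolicy1) {
        long min = Long.MAX_VALUE;
        for (long currentMax : currentMaxPerInstance) {
            long watermark = usePolicy1 ? policy1(nowMillis, currentMax) : policy2(currentMax);
            min = Math.min(min, watermark);
        }
        return min;
    }
}
```

With the first tick's values (seven instances still at the initial currentMaxTimestamp of 15000, one at 1605047187881) and a wall-clock time of 1605047281198 ms, inferred from the logged 1605047266198 plus the 15000 ms lag, policy 1 combines to 1605047266198 and policy 2 to 0. That would explain why policy 1 appears to work while policy 2 leaves processElement() with a watermark that never moves.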


    Any insights? Thank you very much!


    Best,

    Fuyao
