Hi Sudan,

I noticed a few issues with your code:

1) Please check the computation of timestamps. Your code

public long extractAscendingTimestamp(Eventi.Event element) {
      return element.getEventTime().getSeconds() * 1000;
}

only seems to look at the seconds of a timestamp. Typically, you would just
return the whole timestamp encoded as a long that represents the
milliseconds since epoch (1970-01-01 00:00:00.000 UTC).
Why do you multiply by 1000?
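As an illustration, here is a minimal plain-Java sketch. It assumes (the schema isn't shown) that getEventTime() returns a protobuf Timestamp, i.e. a seconds field plus a nanos field; the helper toEpochMillis() and the sample values are hypothetical. Multiplying the seconds by 1000 gives milliseconds but drops the sub-second part, while java.time.Instant keeps it:

```java
import java.time.Instant;

public class EventTimeMillis {

    // Hypothetical helper, ASSUMING the event time is a protobuf-style
    // (seconds, nanos) pair. Returns milliseconds since the epoch.
    static long toEpochMillis(long seconds, int nanos) {
        // Instant keeps the nanos; toEpochMilli() truncates them to whole ms.
        return Instant.ofEpochSecond(seconds, nanos).toEpochMilli();
    }

    public static void main(String[] args) {
        long seconds = 1587325020L; // 2020-04-19T19:37:00Z
        int nanos = 250_000_000;    // 0.25 s past that second

        // seconds * 1000 silently loses the 250 ms.
        System.out.println("seconds * 1000: " + seconds * 1000);
        System.out.println("with nanos:     " + toEpochMillis(seconds, nanos));
    }
}
```

Inside the extractor this would amount to something like Instant.ofEpochSecond(ts.getSeconds(), ts.getNanos()).toEpochMilli(), again only if the field really is a protobuf Timestamp.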

2) An AscendingTimestampExtractor assumes that records arrive with monotonically
ascending (non-decreasing) timestamps.
If the timestamps in your data are slightly out of order, you probably want
another watermark assigner, for example a
BoundedOutOfOrdernessTimestampExtractor [1].
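To illustrate the idea behind a bounded-out-of-orderness assigner, here is a simplified plain-Java model. It is a hypothetical class, not Flink's implementation: the watermark trails the highest timestamp seen so far by a fixed bound, so records that arrive slightly out of order are not yet behind the watermark. In Flink itself you would pass the bound as a Time to the BoundedOutOfOrdernessTimestampExtractor constructor and override extractTimestamp().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of bounded-out-of-orderness watermarks.
// This is NOT Flink's class; it only mirrors the core arithmetic.
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low so that computing the first watermark cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Remember the highest event timestamp seen so far.
    void onEvent(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
    }

    // Watermark = highest timestamp seen minus the allowed out-of-orderness.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch assigner = new BoundedOutOfOrdernessSketch(5_000);
        // 9_000 arrives after 12_000, i.e. slightly out of order.
        long[] timestamps = {10_000, 12_000, 9_000, 20_000, 18_000};
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            assigner.onEvent(ts);
            watermarks.add(assigner.currentWatermark());
        }
        System.out.println(watermarks);
    }
}
```

With a 5-second bound, the record at 9000 ms is still ahead of the watermark (7000 ms) when it arrives. With an ascending extractor, the watermark could already be at 11999 ms at that point, so the same record would be late.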

3) You probably don't want to key on event time:

keyBy(Eventi.Event::getEventTime)

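Usually, you choose a partitioning key here. Keying by event time puts every
distinct timestamp into its own key, so each window only aggregates the
records that share exactly that timestamp. If you cannot partition your
data and all records should be grouped into a single stream of windows, you
should use DataStream.windowAll().
Note, however, that the windowAll() operator runs with a parallelism of 1, so
this part of your job cannot run in parallel. See [2] for details.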
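The effect of the key choice can be sketched in plain Java by grouping a few records the way a keyed window function sees them (it is invoked once per key and window). Eventi.Event's fields aren't shown, so the Event class and its userId field below are hypothetical stand-ins for a real partitioning key:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class KeyChoiceSketch {

    // Hypothetical event type; "userId" stands in for a real partitioning key.
    static final class Event {
        final String userId;
        final long eventTimeMs;

        Event(String userId, long eventTimeMs) {
            this.userId = userId;
            this.eventTimeMs = eventTimeMs;
        }
    }

    // Counts records per key, similar to a keyed count over one window's contents.
    static <K extends Comparable<K>> Map<K, Long> countPerKey(
            List<Event> events, Function<Event, K> keyFn) {
        return events.stream().collect(
                Collectors.groupingBy(keyFn, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("alice", 1_000), new Event("bob", 1_250),
                new Event("alice", 2_500), new Event("alice", 3_100));

        // Keyed by event time: every distinct timestamp becomes its own group.
        System.out.println(countPerKey(events, e -> e.eventTimeMs));
        // Keyed by a real attribute: records are aggregated per user.
        System.out.println(countPerKey(events, e -> e.userId));
    }
}
```

Keyed by event time, each group here holds a single record; keyed by userId, all of one user's records land in the same group.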

Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#keyed-vs-non-keyed-windows

On Sun, Apr 19, 2020 at 21:37, Sudan S <su...@cred.club> wrote:

> Hi,
>
> I am having a problem getting watermarks right. The setup is:
> - I have a Flink job which reads from a Kafka topic, uses Protobuf
> deserialization, applies a sliding window of (120 seconds, 30 seconds), sums
> up the values and finally returns the result.
>
> The code is pasted below.
>
> The problem is that I'm not able to reach the sink. The timestamp extractor
> in assignTimestampsAndWatermarks is invoked when events arrive, but past that,
> neither the process function nor the sink function is invoked, even though
> I'm pumping events regularly. I'm not able to figure out how to debug this
> issue.
> Please help.
>
> public class StreamingJob {
>
>     public static void main(String[] args) throws Exception {
>
>         Properties kafkaConsumerProps = new Properties();
>         kafkaConsumerProps.setProperty("bootstrap.servers", "{bootstrap_servers}");
>         kafkaConsumerProps.setProperty("group.id", "{group_id}");
>
>
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
>         env.enableCheckpointing(100);
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         env.setMaxParallelism(5);
>         env.setParallelism(5);
>
>         SingleOutputStreamOperator<Eventi.Event> texStream = env
>                 .addSource(new FlinkKafkaConsumer011<>("auth", new EventiSchema(), kafkaConsumerProps))
>                 .setParallelism(5)
>                 .setMaxParallelism(5);
>         SlidingEventTimeWindows window =
>                 SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));
>         texStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Eventi.Event>() {
>             @Override
>             public long extractAscendingTimestamp(Eventi.Event element) {
>                 return element.getEventTime().getSeconds() * 1000;
>             }
>         }).keyBy(Eventi.Event::getEventTime).window(window).process(
>                 new ProcessWindowFunction<Eventi.Event, Object, Timestamp, TimeWindow>() {
>             @Override
>             public void process(Timestamp timestamp, Context context,
>                     Iterable<Eventi.Event> elements, Collector<Object> out) throws Exception {
>                 int sum = 0;
>                 for (Eventi.Event element : elements) {
>                     sum++;
>                 }
>                 out.collect(sum);
>             }
>         }).print();
>
>         env.execute();
>     }
> }
