Hi Soheil,

You can set the parallelism of the source to 1 to solve the problem. Alternatively, call markAsTemporarilyIdle() as Fabian suggested. The link he meant is probably https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java (around line 639).
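To illustrate why both suggestions help, here is a small stand-alone simulation using the numbers from your mail. It is only a sketch of the "take the minimum over non-idle inputs" rule, not Flink's actual implementation; the class and method names (WatermarkDemo, combinedWatermark) are made up for this example. In a real custom source you would call markAsTemporarilyIdle() on the SourceContext instead of passing a flag array.

```java
import java.util.OptionalLong;

// Hypothetical demo class: simulates how a downstream operator could combine
// the watermarks of several parallel source subtasks.
class WatermarkDemo {

    // Returns the minimum watermark over all subtasks that are NOT idle.
    // Returns empty if every subtask is idle (nothing to advance on).
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle subtasks do not hold back event time
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Subtasks 1 and 2 receive data; subtasks 3 and 4 emit 0 - 3000.
        long[] watermarks = {1541217659806L, 1541217659810L, -3000L, -3000L};

        // No subtask marked idle: the empty subtasks pin the result at -3000.
        boolean[] noneIdle = new boolean[4];
        System.out.println(combinedWatermark(watermarks, noneIdle).getAsLong());

        // Subtasks 3 and 4 marked idle: event time follows subtasks 1 and 2.
        boolean[] emptyOnesIdle = {false, false, true, true};
        System.out.println(combinedWatermark(watermarks, emptyOnesIdle).getAsLong());
    }
}
```

Setting the parallelism to 1 corresponds to the single-element case here: with only one subtask there is no empty subtask left to hold the minimum down.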

On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> If you are using a custom source, you can call
> SourceContext.markAsTemporarilyIdle() to indicate that a task is currently
> not producing new records [1].
>
> Best, Fabian
>
> 2018-07-31 8:50 GMT+02:00 Reza Sameei <reza.sa...@gmail.com>:
>
>> It's not a real solution; but why you don't change the parallelism for
>> your `SourceFunction`?
>>
>> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani <
>> soheil.i...@gmail.com> wrote:
>>
>>> In Flink Event time mode, I use the periodic watermark to advance event
>>> time. Every slot extract event time from the incoming message and to emit
>>> watermark, subtract it a network delay, say 3000ms.
>>>
>>> public Watermark getCurrentWatermark() {
>>>     return new Watermark(MAX_TIMESTAMP - DELEY);
>>> }
>>>
>>> I have 4 active slots. The problem is just two slots get incoming data
>>> but all of them call the method getCurrentWatermark(). So in this
>>> situation consider a case that thread 1 and 2 get incoming data and thread
>>> 3 and 4 will not.
>>>
>>> Thread-1-watermark ---> 1541217659806
>>> Thread-2-watermark ---> 1541217659810
>>> Thread-3-watermark ---> (0 - 3000 = -3000)
>>> Thread-4-watermark ---> (0 - 3000 = -3000)
>>>
>>> So as Flink set the lowest watermark as the general watermark, time
>>> doesn't go on! If I change the getCurrentWatermark() method as:
>>>
>>> public Watermark getCurrentWatermark() {
>>>     return new Watermark(System.currentTimeMillis() - DELEY);
>>> }
>>>
>>> it will solve the problem, but I don't want to use machine's timestamp!
>>> How can I fix the problem?
>>
>> --
>> رضا سامعی | Reza Sameei | Software Developer | 09126662695