Hi Soheil,

The documentation of the markAsTemporarilyIdle method is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html#markAsTemporarilyIdle--
Thanks, vino.

2018-07-31 17:14 GMT+08:00 Hequn Cheng <chenghe...@gmail.com>:

> Hi Soheil,
>
> You can set the parallelism to 1 to solve the problem.
> Or use markAsTemporarilyIdle() as Fabian said (the link may be
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
> line 639).
>
> On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> If you are using a custom source, you can call
>> SourceContext.markAsTemporarilyIdle() to indicate that a task is
>> currently not producing new records [1].
>>
>> Best, Fabian
>>
>> 2018-07-31 8:50 GMT+02:00 Reza Sameei <reza.sa...@gmail.com>:
>>
>>> It's not a real solution, but why don't you change the parallelism for
>>> your `SourceFunction`?
>>>
>>> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani <
>>> soheil.i...@gmail.com> wrote:
>>>
>>>> In Flink event-time mode, I use a periodic watermark to advance event
>>>> time. Every slot extracts the event time from the incoming message and,
>>>> to emit a watermark, subtracts a network delay from it, say 3000 ms.
>>>>
>>>> public Watermark getCurrentWatermark() {
>>>>     return new Watermark(MAX_TIMESTAMP - DELEY);
>>>> }
>>>>
>>>> I have 4 active slots. The problem is that only two slots get incoming
>>>> data, but all of them call the method getCurrentWatermark(). So consider
>>>> the case where threads 1 and 2 get incoming data and threads 3 and 4 do
>>>> not:
>>>>
>>>> Thread-1-watermark ---> 1541217659806
>>>> Thread-2-watermark ---> 1541217659810
>>>> Thread-3-watermark ---> (0 - 3000 = -3000)
>>>> Thread-4-watermark ---> (0 - 3000 = -3000)
>>>>
>>>> Since Flink uses the lowest watermark as the overall watermark, event
>>>> time does not advance!
>>>>
>>>> If I change the getCurrentWatermark() method to:
>>>>
>>>> public Watermark getCurrentWatermark() {
>>>>     return new Watermark(System.currentTimeMillis() - DELEY);
>>>> }
>>>>
>>>> it solves the problem, but I don't want to use the machine's timestamp!
>>>> How can I fix the problem?
>>>>
>>>
>>> --
>>> رضا سامعی | Reza Sameei | Software Developer | 09126662695
>>>
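
To illustrate why marking the two silent subtasks as idle (or reducing the parallelism) unblocks event time, here is a minimal plain-Java simulation using the four watermark values from the original question. It is only a sketch and not Flink code: the class `WatermarkMinDemo` and the method `combinedWatermark` are hypothetical names, and it assumes the downstream watermark is simply the minimum over all subtasks that are not marked idle.

```java
import java.util.OptionalLong;

// Plain-Java simulation only; this does NOT use or reproduce Flink's classes.
// WatermarkMinDemo and combinedWatermark are hypothetical names for illustration.
class WatermarkMinDemo {

    /**
     * Returns the minimum watermark over all subtasks that are not idle,
     * or an empty result if every subtask is idle.
     */
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        if (watermarks.length != idle.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle subtasks do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Values taken from the question: threads 3 and 4 never saw data.
        long[] watermarks = {1541217659806L, 1541217659810L, -3000L, -3000L};

        boolean[] noneIdle = {false, false, false, false};
        boolean[] threeAndFourIdle = {false, false, true, true};

        // All four subtasks count: the combined watermark is stuck at -3000.
        System.out.println(combinedWatermark(watermarks, noneIdle).getAsLong());

        // Subtasks 3 and 4 marked idle: the combined watermark follows subtask 1.
        System.out.println(combinedWatermark(watermarks, threeAndFourIdle).getAsLong());
    }
}
```

In a real custom source, the equivalent step would be calling SourceContext.markAsTemporarilyIdle() from the subtasks that currently have nothing to emit, as Fabian suggested.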