Re: Event time didn't advance because of some idle slots
Hi Soheil,

Hequn has already pointed to a usage of this method; see:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L639

Thanks, vino.

2018-07-31 17:56 GMT+08:00 Soheil Pourbafrani:

> Hi vino,
>
> Could you please show markAsTemporarilyIdle usage by a simple example?
>
> Thanks
>
> On Tue, Jul 31, 2018 at 2:10 PM, vino yang wrote:
>
>> Hi Soheil,
>>
>> The documentation for the markAsTemporarilyIdle method is here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html#markAsTemporarilyIdle--
>>
>> Thanks, vino.
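Editor's note: since a simple example was requested, here is a minimal sketch of the pattern, written without any Flink dependency so it can run on its own. `MiniSourceContext` is a hypothetical stand-in that only mirrors the names of two real `SourceContext` methods (`collect` and `markAsTemporarilyIdle`); the queue, the timeout, and the `maxPolls` bound are assumptions made for illustration. It shows the loop shape only: declare idleness once after a quiet period, and resume simply by emitting again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for two SourceContext methods; this is not Flink's interface. */
interface MiniSourceContext<T> {
    void collect(T element);
    void markAsTemporarilyIdle();
}

/** Sketch of a source loop that declares itself idle after a quiet period. */
final class IdleAwareSourceLoop {
    /**
     * Polls the queue maxPolls times. When a poll times out, the context is marked
     * idle once; when a record arrives, it is collected and the idle flag is cleared.
     */
    static void run(BlockingQueue<String> input, MiniSourceContext<String> ctx,
                    long idleTimeoutMs, int maxPolls) throws InterruptedException {
        boolean idle = false;
        for (int i = 0; i < maxPolls; i++) {
            String record = input.poll(idleTimeoutMs, TimeUnit.MILLISECONDS);
            if (record == null) {
                if (!idle) {
                    ctx.markAsTemporarilyIdle(); // nothing arrived within the timeout
                    idle = true;
                }
            } else {
                ctx.collect(record); // emitting a record counts as resuming activity
                idle = false;
            }
        }
    }

    /** Runs the loop against a recording context and returns the calls made, in order. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        MiniSourceContext<String> recorder = new MiniSourceContext<String>() {
            @Override public void collect(String element) { calls.add("collect:" + element); }
            @Override public void markAsTemporarilyIdle() { calls.add("idle"); }
        };
        BlockingQueue<String> input = new LinkedBlockingQueue<>();
        input.add("a"); // one record, then silence
        try {
            run(input, recorder, 10, 3);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a real custom source the same loop would sit inside `run(SourceContext<T> ctx)` and call the methods on Flink's own context. According to the Javadoc linked in this thread, the source is considered active again once it emits a record or a watermark, so no explicit "resume" call appears here. Emission in a real source is normally done while holding the lock returned by `getCheckpointLock()`; this sketch leaves that out.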
Re: Event time didn't advance because of some idle slots
Hi Soheil,

The documentation for the markAsTemporarilyIdle method is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html#markAsTemporarilyIdle--

Thanks, vino.

2018-07-31 17:14 GMT+08:00 Hequn Cheng:

> Hi Soheil,
>
> You can set parallelism to 1 to solve the problem.
> Or use markAsTemporarilyIdle() as Fabian said (the link may be
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java,
> line 639).
Re: Event time didn't advance because of some idle slots
Hi Soheil,

You can set the parallelism to 1 to solve the problem. Or use markAsTemporarilyIdle(), as Fabian said (the link may be
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java, line 639).

On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske wrote:

> Hi,
>
> If you are using a custom source, you can call
> SourceContext.markAsTemporarilyIdle() to indicate that a task is
> currently not producing new records [1].
>
> Best, Fabian
Re: Event time didn't advance because of some idle slots
Hi,

If you are using a custom source, you can call SourceContext.markAsTemporarilyIdle() to indicate that a task is currently not producing new records [1].

Best, Fabian

2018-07-31 8:50 GMT+02:00 Reza Sameei:

> It's not a real solution; but why you don't change the parallelism for
> your `SourceFunction`?
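Editor's note: to make the effect of markAsTemporarilyIdle() concrete, here is a toy model of how a combined watermark behaves when idle subtasks are skipped. This is not Flink's code; `IdleAwareWatermarkModel` and `combinedWatermark` are hypothetical names, and the model assumes only what the Javadoc states, namely that downstream tasks no longer wait for watermarks from a source that has declared itself idle. The numbers are the ones from the original question.

```java
import java.util.OptionalLong;

/** Toy model (not Flink code) of combining per-subtask watermarks while skipping idle ones. */
final class IdleAwareWatermarkModel {
    /**
     * Returns the minimum watermark over the subtasks not flagged idle,
     * or empty when every subtask is idle.
     */
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        OptionalLong min = OptionalLong.empty();
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // an idle subtask does not hold the watermark back
            }
            if (!min.isPresent() || watermarks[i] < min.getAsLong()) {
                min = OptionalLong.of(watermarks[i]);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        long[] watermarks = {1541217659806L, 1541217659810L, -3000L, -3000L};
        // No subtask marked idle: the two silent subtasks pin the result at -3000.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false, false, false}));
        // Subtasks 3 and 4 marked idle: the result follows the slower active subtask.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false, true, true}));
    }
}
```

With nothing marked idle the combined value is -3000, which is the stall described in the question; with the two silent subtasks marked idle it becomes 1541217659806, so event time can advance without falling back to the machine clock.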
Re: Event time didn't advance because of some idle slots
It's not a real solution, but why don't you change the parallelism of your `SourceFunction`?

On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani wrote:

> In Flink event-time mode, I use a periodic watermark to advance event
> time. Every slot extracts the event time from the incoming message and,
> to emit the watermark, subtracts a network delay from it, say 3000 ms.
>
>     public Watermark getCurrentWatermark() {
>         return new Watermark(MAX_TIMESTAMP - DELEY);
>     }
>
> I have 4 active slots. The problem is that only two slots get incoming
> data, but all of them call the method getCurrentWatermark(). So consider
> the case where threads 1 and 2 get incoming data and threads 3 and 4 do
> not:
>
>     Thread-1-watermark ---> 1541217659806
>     Thread-2-watermark ---> 1541217659810
>     Thread-3-watermark ---> (0 - 3000 = -3000)
>     Thread-4-watermark ---> (0 - 3000 = -3000)
>
> Since Flink takes the lowest watermark as the overall watermark, time
> doesn't advance! If I change the getCurrentWatermark() method to:
>
>     public Watermark getCurrentWatermark() {
>         return new Watermark(System.currentTimeMillis() - DELEY);
>     }
>
> it solves the problem, but I don't want to use the machine's timestamp!
> How can I fix the problem?

--
رضا سامعی | Reza Sameei | Software Developer | 09126662695