Re: Event time didn't advance because of some idle slots

2018-07-31 Thread vino yang
Hi Soheil,

Hequn has already pointed to an example usage of this method; see here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L639

Thanks, vino.
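
For readers who want something shorter than the Kafka consumer source, here is
a minimal sketch of the pattern. It is not Flink code: Ctx is a simplified
stand-in for SourceFunction.SourceContext so that the snippet compiles on its
own, and IdleTracker, RecordingCtx and the 1000 ms timeout are hypothetical
names and values chosen for illustration. In a real custom source the same
calls would be made on the ctx passed to run(). The idea is to call
markAsTemporarilyIdle() once after the source has seen no records for a while;
emitting the next record is what makes the source count as active again.

```java
import java.util.ArrayList;
import java.util.List;

final class IdleAwareSourceSketch {

    /** Simplified stand-in (assumption) for Flink's SourceFunction.SourceContext. */
    interface Ctx<T> {
        void collectWithTimestamp(T element, long timestamp);
        void markAsTemporarilyIdle();
    }

    /** Hypothetical helper: decides when the source should flag itself idle. */
    static final class IdleTracker {
        private final long idleTimeoutMs;
        private long lastRecordAtMs;
        private boolean idle = false;

        IdleTracker(long idleTimeoutMs, long nowMs) {
            this.idleTimeoutMs = idleTimeoutMs;
            this.lastRecordAtMs = nowMs;
        }

        /** Call once per poll; payload is null when the poll returned nothing. */
        void onPoll(String payload, long eventTimeMs, long nowMs, Ctx<String> ctx) {
            if (payload != null) {
                // Emitting a record makes the source count as active again.
                ctx.collectWithTimestamp(payload, eventTimeMs);
                lastRecordAtMs = nowMs;
                idle = false;
            } else if (!idle && nowMs - lastRecordAtMs >= idleTimeoutMs) {
                // Quiet for too long: tell downstream not to wait for our watermark.
                ctx.markAsTemporarilyIdle();
                idle = true;
            }
        }
    }

    /** Test double that records which context calls were made. */
    static final class RecordingCtx implements Ctx<String> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void collectWithTimestamp(String element, long timestamp) {
            calls.add("collect " + element + "@" + timestamp);
        }

        @Override
        public void markAsTemporarilyIdle() {
            calls.add("idle");
        }
    }

    public static void main(String[] args) {
        RecordingCtx ctx = new RecordingCtx();
        IdleTracker tracker = new IdleTracker(1000L, 0L);
        tracker.onPoll(null, 0L, 500L, ctx);   // quiet, but below the timeout
        tracker.onPoll(null, 0L, 1000L, ctx);  // timeout reached: mark idle
        tracker.onPoll(null, 0L, 2000L, ctx);  // still quiet: no second call
        tracker.onPoll("a", 42L, 2500L, ctx);  // a record arrives: active again
        System.out.println(ctx.calls);         // prints [idle, collect a@42]
    }
}
```

The timeout is a design choice: too short and a briefly quiet subtask is
dropped from the watermark calculation, too long and event time stalls for
that long before the idle flag takes effect.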

2018-07-31 17:56 GMT+08:00 Soheil Pourbafrani :

> Hi vino,
>
> Could you please show the usage of markAsTemporarilyIdle with a simple example?
> Thanks


Re: Event time didn't advance because of some idle slots

2018-07-31 Thread vino yang
Hi Soheil,

The documentation of the markAsTemporarilyIdle method is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html#markAsTemporarilyIdle--

Thanks, vino.



Re: Event time didn't advance because of some idle slots

2018-07-31 Thread Hequn Cheng
Hi Soheil,

You can set the parallelism to 1 to solve the problem, or use
markAsTemporarilyIdle() as Fabian said (the link may be
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
, line 639).



Re: Event time didn't advance because of some idle slots

2018-07-31 Thread Fabian Hueske
Hi,

If you are using a custom source, you can call
SourceContext.markAsTemporarilyIdle() to indicate that a task is currently
not producing new records [1].

Best, Fabian



Re: Event time didn't advance because of some idle slots

2018-07-30 Thread Reza Sameei
It's not a real solution, but why don't you change the parallelism of your
`SourceFunction`?

On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani wrote:

> In Flink's event-time mode, I use a periodic watermark to advance event
> time. Every slot extracts the event time from the incoming message and, to
> emit the watermark, subtracts a network delay from it, say 3000 ms.
>
> public Watermark getCurrentWatermark() {
>     return new Watermark(MAX_TIMESTAMP - DELEY);
> }
>
> I have 4 active slots. The problem is that only two slots receive incoming
> data, but all of them call getCurrentWatermark(). Consider the case where
> threads 1 and 2 receive data and threads 3 and 4 do not:
>
> Thread-1-watermark ---> 1541217659806
> Thread-2-watermark ---> 1541217659810
> Thread-3-watermark ---> (0 - 3000 = -3000)
> Thread-4-watermark ---> (0 - 3000 = -3000)
>
> Because Flink takes the lowest watermark as the overall watermark, event
> time doesn't advance! If I change the getCurrentWatermark() method to:
>
> public Watermark getCurrentWatermark() {
>     return new Watermark(System.currentTimeMillis() - DELEY);
> }
>
> it solves the problem, but I don't want to use the machine's timestamp!
> How can I fix the problem?
>
>

-- 
رضا سامعی  | Reza Sameei | Software Developer | 09126662695
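
The watermark numbers in the question can be reproduced with a small toy
model. This is only a sketch, not Flink's implementation: WatermarkMinDemo and
Channel are hypothetical names, and the model assumes that a downstream
operator takes the minimum watermark over its inputs and skips inputs that are
flagged idle. It shows why two silent subtasks hold event time at -3000, and
why flagging them idle lets the two active subtasks drive the watermark.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Toy model (not Flink code) of combining watermarks from parallel inputs. */
final class WatermarkMinDemo {

    /** Latest watermark reported by one input, plus whether it is flagged idle. */
    static final class Channel {
        final long watermark;
        final boolean idle;

        Channel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Minimum watermark over all non-idle channels; empty if every channel is idle. */
    static OptionalLong combined(Channel... channels) {
        return Arrays.stream(channels)
                .filter(c -> !c.idle)
                .mapToLong(c -> c.watermark)
                .min();
    }

    public static void main(String[] args) {
        long delay = 3000L;
        Channel t1 = new Channel(1541217659806L, false);
        Channel t2 = new Channel(1541217659810L, false);

        // Threads 3 and 4 never saw a record, so their max timestamp is still 0.
        Channel t3 = new Channel(0L - delay, false);
        Channel t4 = new Channel(0L - delay, false);
        System.out.println(combined(t1, t2, t3, t4).getAsLong()); // prints -3000

        // Same values, but threads 3 and 4 are flagged idle and therefore ignored.
        Channel t3Idle = new Channel(0L - delay, true);
        Channel t4Idle = new Channel(0L - delay, true);
        System.out.println(combined(t1, t2, t3Idle, t4Idle).getAsLong()); // prints 1541217659806
    }
}
```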