Hi Soheil,

You can set parallelism to 1 to solve the problem.
Or use markAsTemporarilyIdle() as Fabian said (the link may be
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
line 639).
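
To illustrate why marking a task idle helps, here is a minimal plain-Java
sketch of the "lowest watermark wins" rule. It is not Flink code: the class
WatermarkMinSketch and the method combined() are hypothetical names made up
for this example, and it only assumes that idle inputs are left out of the
minimum.

```java
import java.util.OptionalLong;
import java.util.stream.IntStream;

// Illustrative only: a hypothetical helper, not part of Flink's API.
final class WatermarkMinSketch {

    /**
     * Returns the minimum watermark over all subtasks that are not idle,
     * or an empty result if every subtask is idle.
     */
    static OptionalLong combined(long[] watermarks, boolean[] idle) {
        if (watermarks.length != idle.length) {
            throw new IllegalArgumentException("arrays must have equal length");
        }
        return IntStream.range(0, watermarks.length)
                .filter(i -> !idle[i])          // skip idle subtasks
                .mapToLong(i -> watermarks[i])
                .min();
    }

    public static void main(String[] args) {
        // The four values from Soheil's mail; subtasks 3 and 4 never saw data.
        long[] wm = {1541217659806L, 1541217659810L, -3000L, -3000L};

        // Nobody is idle: the two empty subtasks hold the result back.
        System.out.println(combined(wm, new boolean[] {false, false, false, false}));

        // Subtasks 3 and 4 marked idle: only the active ones are compared.
        System.out.println(combined(wm, new boolean[] {false, false, true, true}));
    }
}
```

With no subtask marked idle the result stays at -3000; with subtasks 3 and 4
marked idle it becomes 1541217659806, so event time can advance.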

On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> If you are using a custom source, you can call
> SourceContext.markAsTemporarilyIdle() to indicate that a task is currently
> not producing new records [1].
>
> Best, Fabian
>
> 2018-07-31 8:50 GMT+02:00 Reza Sameei <reza.sa...@gmail.com>:
>
>> It's not a real solution, but why don't you change the parallelism of
>> your `SourceFunction`?
>>
>> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani <
>> soheil.i...@gmail.com> wrote:
>>
>>> In Flink event-time mode, I use periodic watermarks to advance event
>>> time. Every slot extracts the event time from the incoming message and, to
>>> emit a watermark, subtracts a network delay from it, say 3000 ms.
>>>
>>> public Watermark getCurrentWatermark() {
>>>     return new Watermark(MAX_TIMESTAMP - DELEY);
>>> }
>>>
>>> I have 4 active slots. The problem is that only two slots receive incoming
>>> data, but all of them call getCurrentWatermark(). So consider the case
>>> where threads 1 and 2 receive data and threads 3 and 4 do not:
>>>
>>> Thread-1-watermark ---> 1541217659806
>>> Thread-2-watermark ---> 1541217659810
>>> Thread-3-watermark ---> (0 - 3000 = -3000)
>>> Thread-4-watermark ---> (0 - 3000 = -3000)
>>>
>>> Since Flink takes the lowest watermark as the overall watermark, time
>>> doesn't advance! If I change the getCurrentWatermark() method to:
>>>
>>> public Watermark getCurrentWatermark() {
>>>     return new Watermark(System.currentTimeMillis() - DELEY);
>>> }
>>>
>>> it solves the problem, but I don't want to use the machine's timestamp!
>>> How can I fix the problem?
>>>
>>>
>>
>> --
>> رضا سامعی  | Reza Sameei | Software Developer | 09126662695
>>
>
>
