Hi Soheil,

The documentation of markAsTemporarilyIdle method is here :
https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html#markAsTemporarilyIdle--

Thanks, vino.

2018-07-31 17:14 GMT+08:00 Hequn Cheng <chenghe...@gmail.com>:

> Hi Soheil,
>
> You can set parallelism to 1 to solve the problem.
> Or use markAsTemporarilyIdle() as Fabian said(the link maybe is
> https://github.com/apache/flink/blob/master/flink-
> connectors/flink-connector-kafka-base/src/main/java/org/
> apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
> line639).
>
> On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> If you are using a custom source, you can call
>> SourceContext.markAsTemporarilyIdle() to indicate that a task is
>> currently not producing new records [1].
>>
>> Best, Fabian
>>
>> 2018-07-31 8:50 GMT+02:00 Reza Sameei <reza.sa...@gmail.com>:
>>
>>> It's not a real solution; but why you don't change the parallelism for
>>> your `SourceFunction`?
>>>
>>> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani <
>>> soheil.i...@gmail.com> wrote:
>>>
>>>> In Flink Event time mode, I use the periodic watermark to advance event
>>>> time. Every slot extract event time from the incoming message and to emit
>>>> watermark, subtract it a network delay, say 3000ms.
>>>>
>>>> public Watermark getCurrentWatermark() {
>>>>             return new Watermark(MAX_TIMESTAMP - DELEY);
>>>>         }
>>>>
>>>> I have 4 active slots. The problem is just two slots get incoming data
>>>> but all of them call the method getCurrentWatermark(). So in this
>>>> situation consider a case that thread 1 and 2 get incoming data and thread
>>>> 3 and 4 will not.
>>>>
>>>> Thread-1-watermark ---> 1541217659806
>>>> Thread-2-watermark ---> 1541217659810
>>>> Thread-3-watermark ---> (0 - 3000 = -3000)
>>>> Thread-4-watermark ---> (0 - 3000 = -3000)
>>>>
>>>> So as Flink set the lowest watermark as the general watermark, time
>>>> doesn't go on! If I change the getCurrentWatermark() method as:
>>>>
>>>> public Watermark getCurrentWatermark() {
>>>>             return new Watermark(System.currentTimeMillis() - DELEY);
>>>>         }
>>>>
>>>> it will solve the problem, but I don't want to use machine's timestamp!
>>>> How can I fix the problem?
>>>>
>>>>
>>>
>>> --
>>> رضا سامعی  | Reza Sameei | Software Developer | 09126662695
>>>
>>
>>
>

Reply via email to