Hi Kostas,

Yes, you have summarized it well. I want to forward the data only to the next 
local operator, but broadcast the watermark across the cluster.
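
To illustrate what broadcasting the watermark would give each taskB instance, 
here is a rough sketch in plain Java that combines the latest watermark from 
every taskA subtask into their minimum. MinWatermarkTracker and its method 
names are hypothetical, not Flink classes; it only mirrors the usual rule that 
an operator with several inputs takes the minimum watermark across them.

```java
import java.util.Arrays;

/** Hypothetical helper (not a Flink class): minimum watermark over all upstream subtasks. */
final class MinWatermarkTracker {
    private final long[] latest;            // latest watermark seen per upstream subtask
    private long current = Long.MIN_VALUE;  // combined watermark, never moves backwards

    MinWatermarkTracker(int upstreamParallelism) {
        if (upstreamParallelism <= 0) {
            throw new IllegalArgumentException("upstreamParallelism must be positive");
        }
        latest = new long[upstreamParallelism];
        Arrays.fill(latest, Long.MIN_VALUE);
    }

    /** Records a watermark from one upstream subtask and returns the combined minimum. */
    long onWatermark(int upstreamIndex, long watermark) {
        // A watermark must not regress within a single upstream channel.
        latest[upstreamIndex] = Math.max(latest[upstreamIndex], watermark);
        long min = Long.MAX_VALUE;
        for (long w : latest) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min);
        return current;
    }

    long currentWatermark() {
        return current;
    }
}
```

With two upstream subtasks reporting W(3) and W(7), the combined watermark is 3 
until the first subtask advances.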

- I can’t set the parallelism of taskB to 1. The stream is too big for that. 
Also, the data is ordered within each partition, and I don’t want to change 
that order.

- I don’t need a KeyedStream. Also, taskA and taskB will always have the same 
parallelism, although that parallelism may be increased in the future.

The use case is this: the source is Kafka. At our peak hours, or when we run 
the streaming job on old data from Kafka, the same thing always happens, even 
in trivial jobs. Some consumers consume faster than others. They send too 
much data downstream, but the watermark advances slowly, at the speed of the 
slowest consumer. This extra data piles up at the downstream operators. When 
the downstream operator is an aggregation, that is fine. But when it is an 
in-Flink join, the state grows too large, checkpoints take much longer, and 
the job becomes slower overall or fails. It also affects other jobs on the 
cluster.

So, basically, I want to implement a throttler. It compares the timestamp of 
a record with the global watermark. If the difference is larger than a 
constant threshold, it sleeps 1 ms for each incoming record. This way, the 
fast operators wait for the slowest one.
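
A minimal sketch of that throttling decision in plain Java, kept independent 
of Flink so that it can be tried in isolation. WatermarkThrottler, 
maxLeadMillis and sleepMillis are hypothetical names, and skipping the 
throttle before any watermark has arrived is an assumption of this sketch.

```java
/** Hypothetical helper (not part of Flink): slows records that run ahead of the watermark. */
final class WatermarkThrottler {
    private final long maxLeadMillis; // allowed lead of a record over the global watermark
    private final long sleepMillis;   // pause per record once the lead is exceeded

    WatermarkThrottler(long maxLeadMillis, long sleepMillis) {
        if (maxLeadMillis < 0 || sleepMillis < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        this.maxLeadMillis = maxLeadMillis;
        this.sleepMillis = sleepMillis;
    }

    /** Returns how long to pause before forwarding the record, or 0 for no pause. */
    long delayFor(long recordTimestamp, long globalWatermark) {
        // Assumption: with no watermark seen yet there is nothing to compare against.
        if (globalWatermark == Long.MIN_VALUE) {
            return 0L;
        }
        long lead = recordTimestamp - globalWatermark;
        return lead > maxLeadMillis ? sleepMillis : 0L;
    }

    /** Blocks the calling thread for the computed delay, if any. */
    void throttle(long recordTimestamp, long globalWatermark) throws InterruptedException {
        long delay = delayFor(recordTimestamp, globalWatermark);
        if (delay > 0) {
            Thread.sleep(delay);
        }
    }
}
```

In the job, throttle(...) would be called once per record in taskB, with the 
record timestamp and the current watermark of that operator.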

The only problem is that this solution comes at the cost of one network 
shuffle and data serialization/deserialization. Since the stream is large, I 
want to avoid at least the network shuffle.

I thought operator instances within a taskmanager would get the same indexId, 
but apparently this is not the case.

Thanks,

> On 27. Sep 2017, at 17:16, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> 
> Hi Yunus,
> 
> I am not sure I understand the question correctly.
> 
> Am I correct to assume that you want the following?
> 
>                               ———————————> time
> 
>               ProcessA                                                ProcessB
> 
> Task1: W(3) E(1) E(2) E(5)                    W(3) W(7) E(1) E(2) E(5)
> 
> Task2: W(7) E(3) E(10) E(6)                   W(3) W(7) E(3) E(10) E(6)
> 
> 
> In the above, elements flow from left to right and W() stands for watermark 
> and E() stands for element.
> In other words, between Process(TaskA) and Process(TaskB) you want to only 
> forward the elements, but broadcast the watermarks, right?
> 
> If this is the case, a trivial solution would be to set the parallelism of 
> TaskB to 1, so that all elements go through the same node.
> 
> One other solution is what you did, BUT by using a custom partitioner you 
> cannot use keyed state in your process function B because the 
> stream is no longer keyed.
> 
> A similar approach to what you did but without the limitation above, is that 
> in the first processFunction (TaskA) you can append the 
> taskId to the elements themselves and then do a keyBy(taskId) between the 
> first and the second process function.
> 
> These are the solutions that I can come up with, assuming that you want to do 
> what I described.
> 
> But in general, could you please describe your use case in a bit more detail? 
> This way we may figure out another approach to achieve your goal. 
> In fact, I am not sure you gain anything by broadcasting the watermark, 
> other than re-implementing (to some extent) Flink’s windowing mechanism.
> 
> Thanks,
> Kostas
> 
>> On Sep 27, 2017, at 4:35 PM, Yunus Olgun <yunol...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I have a simple streaming job such as:
>> 
>> source.process(taskA)
>>           .process(taskB)
>> 
>> I want taskB to access the minimum watermark of all parallel taskA 
>> instances, but the data is ordered and should not be shuffled. 
>> ForwardPartitioner uses the watermark of only one predecessor. So, I have 
>> used a custom partitioner.
>> 
>> source.process(taskA)
>>           .map(AssignPartitionID)
>>           .partitionCustom(IdPartitioner)
>>           .map(StripPartitionID)
>>           .process(taskB)
>> 
>> At AssignPartitionID function, I attach 
>> getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the object. 
>> At IdPartitioner, I return this partitionId.
>> 
>> This solved the main requirement, but I have another concern now:
>> 
>> Network shuffle: I don’t need a network shuffle. I thought within a 
>> taskmanager, indexId of taskA subtasks would be same as indexId of taskB 
>> subtasks. Unfortunately, they are not. Is there a way to make 
>> partitionCustom distribute data like ForwardPartitioner, to the next local 
>> operator? 
>> 
>> As far as I know, this still requires object serialization/deserialization 
>> since the operators can’t be chained anymore. Is there a way to get the 
>> minimum watermark from upstream operators without a network shuffle and 
>> object serialization/deserialization?
>> 
>> Regards,
> 
