Hi Yunus,

I am not sure I understand the question correctly.

Am I correct to assume that you want the following?

                                ———————————> time

                ProcessA                                                ProcessB

Task1: W(3) E(1) E(2) E(5)                      W(3) W(7) E(1) E(2) E(5)

Task2: W(7) E(3) E(10) E(6)                     W(3) W(7) E(3) E(10) E(6)


In the above, elements flow from left to right, W() stands for a watermark, and 
E() stands for an element.
In other words, between Process(TaskA) and Process(TaskB) you want to only 
forward the elements, but broadcast the watermarks, right?

If this is the case, a trivial solution would be to set the parallelism of 
TaskB to 1, so that all elements (and watermarks) go through the same subtask.
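As a rough sketch, assuming a DataStream pipeline like the one in your snippet 
(class names here are illustrative, not from your code):

```java
// Sketch only: forcing TaskB to parallelism 1 funnels all elements through a
// single subtask, which then naturally sees the minimum watermark across all
// parallel TaskA instances. TaskA keeps the default parallelism.
DataStream<Event> result = source
    .process(new TaskA())      // parallel
    .process(new TaskB())
    .setParallelism(1);        // single TaskB subtask
```

The obvious trade-off is that TaskB becomes a single-threaded bottleneck, so 
this only works if its per-element work is cheap.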

Another solution is the one you implemented, BUT with a custom partitioner you 
cannot use keyed state in your process function B, because the 
stream is no longer keyed.

A similar approach to yours, but without the limitation above: in the 
first process function (TaskA) you can append the 
taskId to the elements themselves and then do a keyBy(taskId) between the first 
and the second process function.
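A minimal sketch of that idea, assuming the DataStream API (Event, TaskA, and 
TaskB are illustrative names, not your actual classes):

```java
// Sketch only. TaskA tags each element with the index of the subtask that
// produced it; ProcessFunction is a rich function, so getRuntimeContext()
// is available.
public class TaskA extends ProcessFunction<Event, Tuple2<Integer, Event>> {
    @Override
    public void processElement(Event value, Context ctx,
                               Collector<Tuple2<Integer, Event>> out) {
        int taskId = getRuntimeContext().getIndexOfThisSubtask();
        out.collect(Tuple2.of(taskId, value));
    }
}

// Keying by the taskId keeps all elements from one TaskA subtask together
// (preserving their order), watermarks are propagated to all downstream
// subtasks as usual, and the stream stays keyed, so TaskB can be a
// KeyedProcessFunction and use keyed state:
source.process(new TaskA())
      .keyBy(t -> t.f0)          // keyBy(taskId)
      .process(new TaskB());
```

Note that which physical subtask each key lands on is decided by Flink's key 
hashing, so you should not rely on local (non-network) forwarding here.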

These are the solutions that I can come up with, assuming that you want to do 
what I described.

But in general, could you please describe your use case in a bit more detail? 
This way we may figure out another approach to achieve your goal. 
In fact, I am not sure you gain anything by broadcasting the watermark, 
other than 
re-implementing (to some extent) Flink’s windowing mechanism.

Thanks,
Kostas

> On Sep 27, 2017, at 4:35 PM, Yunus Olgun <yunol...@gmail.com> wrote:
> 
> Hi,
> 
> I have a simple streaming job such as:
> 
> source.process(taskA)
>           .process(taskB)
> 
> I want taskB to access minimum watermark of all parallel taskA instances, but 
> the data is ordered and should not be shuffled. ForwardPartitioner uses 
> watermark of only one predecessor. So, I have used a customPartitioner.
> 
> source.process(taskA)
>           .map(AssignPartitionID)
>           .partitionCustom(IdPartitioner)
>           .map(StripPartitionID)
>           .process(taskB)
> 
> At AssignPartitionID function, I attach 
> getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the object. 
> At IdPartitioner, I return this partitionId.
> 
> This solved the main requirement but I have another concern now,
> 
> Network shuffle: I don’t need a network shuffle. I thought within a 
> taskmanager, indexId of taskA subtasks would be same as indexId of taskB 
> subtasks. Unfortunately, they are not. Is there a way to make partitionCustom 
> distribute data like ForwardPartitioner, to the next local operator? 
> 
> As I know, this still requires object serialization/deserialization since 
> operators can’t be chained anymore. Is there a way to get minimum watermark 
> from upstream operators without network shuffle and object 
> serialization/deserialization?
> 
> Regards,
