Hi,

I have a simple streaming job such as:

source.process(taskA)
           .process(taskB)

I want taskB to see the minimum watermark across all parallel taskA 
instances, but the data is ordered and should not be shuffled. With 
ForwardPartitioner, each taskB subtask only receives the watermark of its 
single upstream taskA subtask, so I used a custom partitioner:

source.process(taskA)
           .map(AssignPartitionID)
           .partitionCustom(IdPartitioner)
           .map(StripPartitionID)
           .process(taskB)
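
For illustration, here is a minimal plain-Java model of the behaviour this 
topology relies on: a subtask with several input channels advances its own 
watermark to the minimum of the latest watermark seen on each channel. The 
class name MinWatermarkTracker and its method are hypothetical and only 
sketch the idea. This is not Flink's actual implementation, and it ignores 
details such as idle channels.

```java
import java.util.Arrays;

/** Simplified, hypothetical model of watermark handling on a multi-channel input. */
final class MinWatermarkTracker {
    // Latest watermark seen on each input channel.
    private final long[] channelWatermarks;
    // Watermark currently exposed to the operator; it never moves backwards.
    private long current = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator's watermark. */
    long onWatermark(int channel, long watermark) {
        // A channel's watermark is monotonic, so keep the larger value.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // The operator can only advance as far as its slowest channel.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min);
        return current;
    }
}
```

With a forward connection there is only one channel per subtask, so the 
minimum degenerates to the watermark of the single predecessor. With a 
custom partitioner every taskA subtask is a channel of every taskB subtask.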

In the AssignPartitionID function, I attach 
getRuntimeContext().getIndexOfThisSubtask() to the object as a partitionId. 
In IdPartitioner, I return this partitionId.
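
A rough plain-Java sketch of that tagging scheme follows. The Partitioner 
interface below is a local stand-in that mirrors the shape of Flink's 
Partitioner#partition(key, numPartitions), so the snippet compiles without 
Flink on the classpath. The Tagged wrapper and the floorMod guard are 
assumptions added for illustration; the job described above simply returns 
the partitionId.

```java
/** Local stand-in with the same method shape as Flink's Partitioner<K>. */
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

/** Hypothetical wrapper: a record plus the index of the subtask that produced it. */
final class Tagged<T> {
    final int partitionId;
    final T value;

    Tagged(int partitionId, T value) {
        this.partitionId = partitionId;
        this.value = value;
    }
}

/** Routes each record to the downstream channel named by its partitionId. */
final class IdPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer partitionId, int numPartitions) {
        // floorMod is a guard (an assumption, not in the original job) in case
        // upstream and downstream parallelism ever differ.
        return Math.floorMod(partitionId, numPartitions);
    }
}
```

In the real job the tag would come from 
getRuntimeContext().getIndexOfThisSubtask() inside AssignPartitionID, and 
StripPartitionID would unwrap value again before taskB.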

This solved the main requirement, but I now have another concern:

Network shuffle: I don’t need a network shuffle. I assumed that, within a 
TaskManager, the subtask indices of the taskA subtasks would be the same as 
the subtask indices of the taskB subtasks. Unfortunately, they are not. Is 
there a way to make partitionCustom distribute data like ForwardPartitioner, 
i.e. to the next local operator?

As far as I know, this still requires object serialization/deserialization, 
since the operators can no longer be chained. Is there a way to get the 
minimum watermark from the upstream operators without a network shuffle and 
without object serialization/deserialization?

Regards,
