Hi,

I have a simple streaming job like this:

    source.process(taskA)
          .process(taskB)

I want taskB to see the minimum watermark across all parallel taskA instances, but the data is ordered and must not be shuffled. With ForwardPartitioner, each taskB subtask is connected to only one taskA subtask, so it only sees the watermark of that single predecessor. So I used a custom partitioner instead:

    source.process(taskA)
          .map(AssignPartitionID)
          .partitionCustom(IdPartitioner)
          .map(StripPartitionID)
          .process(taskB)

In the AssignPartitionID function, I attach getRuntimeContext().getIndexOfThisSubtask() to each object as its partitionId. IdPartitioner simply returns this partitionId.

This solved the main requirement, but now I have another concern:

Network shuffle: I don't need a network shuffle. I assumed that, within a TaskManager, the taskA and taskB subtasks with the same index would be co-located. Unfortunately, they are not. Is there a way to make partitionCustom route data the way ForwardPartitioner does, i.e. to the next local operator?

As far as I know, this would still require object serialization/deserialization, since the operators can no longer be chained.

Is there a way to get the minimum watermark from the upstream operators without a network shuffle and without object serialization/deserialization?

Regards,
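For illustration, here is a minimal plain-Java sketch of the two mechanisms described above, assuming taskA and taskB have the same parallelism. It does not use Flink: `Partitioner` here is a local stand-in that only mirrors the shape of Flink's `Partitioner<K>` interface (`int partition(K key, int numPartitions)`), and `Tagged`, `IdPartitioner` and `MinWatermarkTracker` are hypothetical names. It shows the identity routing by upstream subtask index, and a simplified model of how a downstream subtask with several input channels takes the minimum of the per-channel watermarks (idle channels are ignored).

```java
import java.util.Arrays;

// Demo entry point (declared first so single-file launch finds main).
class WatermarkSketch {
    public static void main(String[] args) {
        IdPartitioner partitioner = new IdPartitioner();
        Tagged<String> rec = new Tagged<>(2, "event");
        System.out.println(partitioner.partition(rec.partitionId, 4)); // 2

        // One downstream subtask fed by two upstream channels.
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 100L)); // -9223372036854775808: channel 1 has no watermark yet
        System.out.println(tracker.onWatermark(1, 80L));  // 80
        System.out.println(tracker.onWatermark(0, 120L)); // 80
        System.out.println(tracker.onWatermark(1, 130L)); // 120
    }
}

// Local stand-in mirroring the shape of Flink's Partitioner<K>;
// this is NOT the real Flink type.
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical wrapper: the payload plus the index of the upstream
// subtask that produced it (the role AssignPartitionID plays).
final class Tagged<T> {
    final int partitionId;
    final T value;

    Tagged(int partitionId, T value) {
        this.partitionId = partitionId;
        this.value = value;
    }
}

// Routes each record to the downstream subtask with the same index as the
// upstream one. floorMod keeps the result in range if the downstream
// parallelism were smaller (a defensive assumption, not from the post).
final class IdPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer partitionId, int numPartitions) {
        return Math.floorMod(partitionId, numPartitions);
    }
}

// Simplified model of a downstream subtask's watermark: remember the
// latest watermark seen on each input channel and report the minimum.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the combined watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

In this model, a forward connection corresponds to a tracker with a single channel, so the combined watermark is just that one predecessor's watermark; the minimum across all taskA subtasks only appears once every taskB subtask has every taskA subtask as an input channel.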