Hi,

As you noticed, Flink does currently not put Source-X and Throttler-X (for some 
X) in the same task slot (TaskManager). In the low-level execution system, 
there are two connection patterns: ALL_TO_ALL and POINTWISE. Flink will only 
schedule Source-X and Throttler-X on the same slot when the POINTWISE pattern 
is used. At the API level, only when there is a simple "forward" connection 
between operations in the streaming API will the POINTWISE pattern be used, for 
all other partitioning schemes ALL_TO_ALL is used with a custom (API-level 
partitioner).

It might be possible to change this, I'll get back to you once I investigated 
more.

Best,
Aljoscha

> On 29. Sep 2017, at 00:05, Yunus Olgun <yunol...@gmail.com> wrote:
> 
> Hi Kostas, Aljoscha,
> 
> To answer Kostas’s concern, the algorithm works this way:
> 
> Let’s say we have two sources Source-0 and Source-1. Source-0 is slow and 
> Source-1 is fast. Sources read from Kafka at different paces. Threshold is 10 
> time units.
> 
> 1st cycle: Source-0 sends records with timestamp 1,2 and emit watermark 2. 
> Throttle-0 has WM 2.
>                 Source-1 sends records with timestamp 1,2,3 and emit 
> watermark 3. Throttle-1 has also WM 2.
> .
> .
> .
> 10th cycle: Source-0 sends records with timestamp 19, 20 and emit watermark 
> 20. Throttle-0 has WM 20.
>                   Source-1 sends records with timestamp 28, 29, 30 and emit 
> watermark 30. Throttle-1 has also WM 20.
> 
> 11th cycle: Source-0 sends records with timestamp 21,22 and emit watermark 
> 22. Throttle-0 has WM 22.
>                   Source-1 sends records with timestamp 31,32,33 and emit 
> watermark 33. Since, Throttle-1 has a WM of 20 at the beginning of the cycle 
> ,it will start sleeping a very short amount of time for each incoming record. 
> This eventually causes a backpressure to Source-1 and only Source-1. Source-1 
> starts to poll less frequently from Kafka.
> 
> For this algorithm to work each Throttler should receive records from only 
> one source. Otherwise backpressure will be applied to both sources. I achive 
> that using a custom partitioner and indexIds. Everything that comes from 
> Source-n goes to Throttler-n. Since it is a custom partitioner watermarks 
> gets broadcasted to all throttlers.
> 
> The problem is I thought Source-0 and Throttler-0 will be colocated in the 
> same taskmanager. Unfortunately this is not the case. Source-0 and 
> Throttler-1 can end up in TM-0; Source-1 and Throttler-0 at TM-1. This causes 
> a network shuffle, one more data serialization/deserialization. I want to 
> avoid that if it is possible, since the stream is big.
> 
> Regards,  
>  
>> On 28. Sep 2017, at 23:03, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> 
>> To quickly make Kostas' intuition concrete: it's currently not possible to 
>> have watermarks broadcast but the data be locally forwarded. The reason is 
>> that watermarks and data travel in the same channels so if the watermark 
>> needs to be broadcast there needs to be an n to m (in this case m == n) 
>> connection pattern between the operations (tasks).
>> 
>> I think your algorithm should work if you take the correct difference, i.e. 
>> throttle when timestamp - "global watermark" > threshold. The inverted diff 
>> would be "global watermark" - timestamp. I think you're already doing the 
>> correct thing, just wanted to clarify for others who might be reading.
>> 
>> Did you check on which TaskManagers the taskA and taskB operators run? I 
>> think they should still be running on the same TM if resources permit.
>> 
>> Best,
>> Aljoscha
>>> On 28. Sep 2017, at 10:25, Kostas Kloudas <k.klou...@data-artisans.com 
>>> <mailto:k.klou...@data-artisans.com>> wrote:
>>> 
>>> Hi Yunus,
>>> 
>>> I see. Currently I am not sure that you can simply broadcast the watermark 
>>> only, without 
>>> having a shuffle.
>>> 
>>> But one thing to notice about your algorithm is that, I am not sure if your 
>>> algorithm solves 
>>> the problem you encounter.
>>> 
>>> Your algorithm seems to prioritize the stream with the elements with the 
>>> smallest timestamps,
>>> rather than throttling fast streams so that slow ones can catch up.
>>> 
>>> Example: Reading a partition from Kafka that has elements with timestamps 
>>> 1,2,3
>>> will emit watermark 3 (assuming ascending watermark extractor), while 
>>> another task that reads 
>>> another partition with elements with timestamps 5,6,7 will emit watermark 
>>> 7. With your algorithm, 
>>> if I get it right, you will throttle the second partition/task, while allow 
>>> the first one to advance, although
>>> both read at the same pace (e.g. 3 elements per unit of time).
>>> 
>>> I will think a bit more on the solution. 
>>> 
>>> Some sketches that I can find, they all introduce some latency, e.g. 
>>> measuring throughput in taskA
>>> and sending it to a side output with a taksID, then broadcasting the side 
>>> output to a downstream operator
>>> which is sth like a coprocess function (taskB) and receives the original 
>>> stream and the side output, and 
>>> this is the one that checks if “my task" is slow. 
>>> 
>>> As I said I will think on it a bit more,
>>> Kostas
>>> 
>>>> On Sep 27, 2017, at 6:32 PM, Yunus Olgun <yunol...@gmail.com 
>>>> <mailto:yunol...@gmail.com>> wrote:
>>>> 
>>>> Hi Kostas,
>>>> 
>>>> Yes, you have summarized well. I want to only forward the data to the next 
>>>> local operator, but broadcast the watermark through the cluster.
>>>> 
>>>> - I can’t set parallelism of taskB to 1. The stream is too big for that. 
>>>> Also, the data is ordered at each partition. I don’t want to change that 
>>>> order.
>>>> 
>>>> - I don’t need KeyedStream. Also taskA and taskB will always have the same 
>>>> parallelism with each other. But this parallelism can be increased in the 
>>>> future.
>>>> 
>>>> The use case is: The source is Kafka. At our peak hours or when we want to 
>>>> run the streaming job with old data from Kafka, always the same thing 
>>>> happens. Even at trivial jobs. Some consumers consumes faster than others. 
>>>> They produce too much data to downstream but watermark advances slowly at 
>>>> the speed of the slowest consumer. This extra data gets piled up at 
>>>> downstream operators. When the downstream operator is an aggregation, it 
>>>> is ok. But when it is a in-Flink join; state size gets too big, 
>>>> checkpoints take much longer and overall the job becomes slower or fails. 
>>>> Also it effects other jobs at the cluster.
>>>> 
>>>> So, basically I want to implement a throttler. It compares timestamp of a 
>>>> record and the global watermark. If the difference is larger than a 
>>>> constant threshold it starts sleeping 1 ms for each incoming record. This 
>>>> way, fast operators wait for the slowest one.
>>>> 
>>>> The only problem is that, this solution came at the cost of one network 
>>>> shuffle and data serialization/deserialization. Since the stream is large 
>>>> I want to avoid the network shuffle at the least. 
>>>> 
>>>> I thought operator instances within a taskmanager would get the same 
>>>> indexId, but apparently this is not the case.
>>>> 
>>>> Thanks,
>>>> 
>>>>> On 27. Sep 2017, at 17:16, Kostas Kloudas <k.klou...@data-artisans.com 
>>>>> <mailto:k.klou...@data-artisans.com>> wrote:
>>>>> 
>>>>> Hi Yunus,
>>>>> 
>>>>> I am not sure if I understand correctly the question.
>>>>> 
>>>>> Am I correct to assume that you want the following?
>>>>> 
>>>>>                           ———————————> time
>>>>> 
>>>>>           ProcessA                                                ProcessB
>>>>> 
>>>>> Task1: W(3) E(1) E(2) E(5)                        W(3) W(7) E(1) E(2) E(5)
>>>>> 
>>>>> Task2: W(7) E(3) E(10) E(6)                       W(3) W(7) E(3) E(10) 
>>>>> E(6)
>>>>> 
>>>>> 
>>>>> In the above, elements flow from left to right and W() stands for 
>>>>> watermark and E() stands for element.
>>>>> In other words, between Process(TaksA) and Process(TaskB) you want to 
>>>>> only forward the elements, but broadcast the watermarks, right?
>>>>> 
>>>>> If this is the case, a trivial solution would be to set the parallelism 
>>>>> of TaskB to 1, so that all elements go through the same node.
>>>>> 
>>>>> One other solution is what you did, BUT by using a custom partitioner you 
>>>>> cannot use keyed state in your process function B because the 
>>>>> stream is no longer keyed.
>>>>> 
>>>>> A similar approach to what you did but without the limitation above, is 
>>>>> that in the first processFunction (TaskA) you can append the 
>>>>> taskId to the elements themselves and then do a keyBy(taskId) between the 
>>>>> first and the second process function.
>>>>> 
>>>>> These are the solutions that I can come up with, assuming that you want 
>>>>> to do what I described.
>>>>> 
>>>>> But in general, could you please describe a bit more what is your use 
>>>>> case? 
>>>>> This way we may figure out another approach to achieve your goal. 
>>>>> In fact, I am not sure if you earn anything by broadcasting the 
>>>>> watermark, other than 
>>>>> re-implementing (to some extent) Flink’s windowing mechanism.
>>>>> 
>>>>> Thanks,
>>>>> Kostas
>>>>> 
>>>>>> On Sep 27, 2017, at 4:35 PM, Yunus Olgun <yunol...@gmail.com 
>>>>>> <mailto:yunol...@gmail.com>> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I have a simple streaming job such as:
>>>>>> 
>>>>>> source.process(taskA)
>>>>>>           .process(taskB)
>>>>>> 
>>>>>> I want taskB to access minimum watermark of all parallel taskA 
>>>>>> instances, but the data is ordered and should not be shuffled. 
>>>>>> ForwardPartitioner uses watermark of only one predecessor. So, I have 
>>>>>> used a customPartitioner.
>>>>>> 
>>>>>> source.process(taskA)
>>>>>>           .map(AssignPartitionID)
>>>>>>           .partitionCustom(IdPartitioner)
>>>>>>           .map(StripPartitionID)
>>>>>>           .process(taskB)
>>>>>> 
>>>>>> At AssignPartitionID function, I attach 
>>>>>> getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the 
>>>>>> object. At IdPartitioner, I return this partitionId.
>>>>>> 
>>>>>> This solved the main requirement but I have another concern now,
>>>>>> 
>>>>>> Network shuffle: I don’t need a network shuffle. I thought within a 
>>>>>> taskmanager, indexId of taskA subtasks would be same as indexId of taskB 
>>>>>> subtasks. Unfortunately, they are not. Is there a way to make 
>>>>>> partitionCustom distribute data like ForwardPartitioner, to the next 
>>>>>> local operator? 
>>>>>> 
>>>>>> As I know, this still requires object serialization/deserialization 
>>>>>> since operators can’t be chained anymore. Is there a way to get minimum 
>>>>>> watermark from upstream operators without network shuffle and object 
>>>>>> serilization/deserialization?
>>>>>> 
>>>>>> Regards,
>>>>> 
>>>> 
>>> 
>> 
> 

Reply via email to