>
> My top guess was that the mapping of keys to task managers changes
> depending on the operator.  E.g. if the same key (regardless of the
> operator) goes to the same task manager, then I'd assume it's fine to
> support multiple inputs into an operator with reinterpretAsKeyedStream.

I agree with you on this point.

My guess is that after you change the parallelism, the partitioners between
those operators are no longer 'forward'. An operator with two inputs cannot
be chained with its upstream operators, so when the parallelism changes it
is possible that a partitioner other than 'forward' is introduced.
Let's assume I1 is the first input vertex of the join, I2 is the second
input vertex of the join, and J is the vertex containing the join operator.
Before you stopped with a savepoint, was the partitioner between I1 and J
'forward'? Was the partitioner between I2 and J 'forward'? After you restored
from the savepoint with a different parallelism, are the partitioners
I1 -> J and I2 -> J still 'forward'?
Would you please check this? If they are still 'forward', we need to
look for another cause of this problem.
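
To illustrate why a non-'forward' partitioner would lead to exactly this
error, here is a minimal, self-contained sketch of how a key group maps to a
subtask. It re-implements, from memory, the arithmetic used by Flink's
KeyGroupRangeAssignment and may differ in detail between versions. The class
KeyGroupSketch and its method names are made up for this example, and the
values maxParallelism = 200 and parallelism = 20 are assumptions chosen only
because they yield the range 110..119 from the exception; the thread does not
state the real values.

```java
// Sketch only: hypothetical helper, not part of Flink.
// Mirrors (from memory) the arithmetic of Flink's KeyGroupRangeAssignment.
final class KeyGroupSketch {

    /** Index of the subtask that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** First key group (inclusive) owned by the given subtask. */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the given subtask. */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** True if the subtask's key-group range contains the key group. */
    static boolean owns(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, operatorIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, operatorIndex);
    }

    public static void main(String[] args) {
        int maxParallelism = 200; // assumption, not stated in the thread
        int parallelism = 20;     // assumption, not stated in the thread
        int keyGroup = 186;       // key group named in the exception
        int receiver = 11;        // hypothetical subtask that received the record

        System.out.println("subtask " + receiver + " owns key groups "
                + rangeStart(maxParallelism, parallelism, receiver) + ".."
                + rangeEnd(maxParallelism, parallelism, receiver));
        System.out.println("key group " + keyGroup + " belongs to subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));
        System.out.println("receiver owns it: "
                + owns(maxParallelism, parallelism, receiver, keyGroup));
    }
}
```

Under these assumed values, subtask 11 owns key groups 110..119 while key
group 186 belongs to subtask 18. If a record reaches J through anything other
than a 'forward' edge from an upstream subtask that holds the same key-group
range, it can land on a subtask that does not own its key group, and the
timer queue rejects it as seen in the stack trace.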

On Mon, Oct 25, 2021 at 10:43 AM Dan Hill <quietgol...@gmail.com> wrote:

> On Sat, Oct 23, 2021 at 9:16 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>>> I found the following link about this.  Still looks applicable.  In my
>>> case, I don't need to do a broadcast join.
>>>
>>> https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839
>>>
>> This is a good document. It describes how to remove redundant
>> shuffles in batch jobs. You can find the operator implementation in
>> `BatchMultipleInputStreamOperator`[1]. It's a subclass of
>> `MultipleInputStreamOperator`[2], which we referred to in the last email.
>>
>>> I'm not sure I understand why having two inputs is an issue in my case.
>>>
>> We guess there is a network shuffle between your two operators
>> because your second operator has multiple inputs. An operator with
>> multiple inputs is not chained with its previous operator; please
>> see Thias's last email for more information.
>>
>
> Yea, I'm more curious "why?"  My top guess was that the mapping of keys to
> task managers changes depending on the operator.  E.g. if the same key
> (regardless of the operator) goes to the same task manager, then I'd assume
> it's fine to support multiple inputs into an operator with
> reinterpretAsKeyedStream.
>
>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java
>> [2]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MultipleInputStreamOperator.java
>>
>>
>> On Fri, Oct 22, 2021 at 1:34 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Probably worth restating.  I was hoping to avoid a lot of shuffles in my
>>> Flink job.  A bunch of the data is already keyed by the userId (and stays
>>> keyed by it most of the Flink job).
>>>
>>> I'm not sure I understand why having two inputs is an issue in my case.
>>> Does Flink send the same keys to different task managers depending on the
>>> operator (even if parallelism is the same)?  It's also weird the output
>>> looks fine as is and fails on rescaling partitions.
>>>
>>>
>>> On Thu, Oct 21, 2021 at 10:23 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> I found the following link about this.  Still looks applicable.  In my
>>>> case, I don't need to do a broadcast join.
>>>>
>>>> https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839
>>>>
>>>> On Thu, Oct 21, 2021 at 9:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>>
>>>>> Interesting.  Thanks, JING ZHANG!
>>>>>
>>>>> On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG <beyond1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dan,
>>>>>> > I'm guessing I violate the "The second operator needs to be
>>>>>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>>>>> I think you are right.
>>>>>>
>>>>>> Do you want to remove the shuffle of the two inputs in your case? If
>>>>>> so, Flink has supported multiple-input operators since version 1.11.
>>>>>> I think it might satisfy your need. You can find more in [1].
>>>>>> However, at present this feature is not fully exposed in the
>>>>>> DataStream API. Users who want to use it need to manually create a
>>>>>> MultipleInputTransformation and MultipleConnectedStreams.
>>>>>>
>>>>>>> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>>>>>>>    "My Operator",
>>>>>>>    new SumAllInputOperatorFactory(),
>>>>>>>    BasicTypeInfo.LONG_TYPE_INFO,
>>>>>>>    1);
>>>>>>>
>>>>>>> env.addOperator(transform
>>>>>>>    .addInput(source1.getTransformation())
>>>>>>>    .addInput(source2.getTransformation())
>>>>>>>    .addInput(source3.getTransformation()));
>>>>>>> new MultipleConnectedStreams(env)
>>>>>>>    .transform(transform)
>>>>>>>    .addSink(resultSink);
>>>>>>>
>>>>>>>
>>>>>> I would invite @Piotr to double-check this conclusion. He is more
>>>>>> knowledgeable on this topic.
>>>>>>
>>>>>> @Piotr, Would you please check Dan's question? Please correct me if
>>>>>> I'm wrong.
>>>>>>
>>>>>> [1]
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR
>>>>>>
>>>>>> Best,
>>>>>> JING ZHANG
>>>>>>
>>>>>> On Sat, Oct 16, 2021 at 6:28 AM Dan Hill <quietgol...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Thias and JING ZHANG!
>>>>>>>
>>>>>>> Here's a Google drive folder link
>>>>>>> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
>>>>>>> with the execution plan and two screenshots from the job graph.
>>>>>>>
>>>>>>> I'm guessing I violate the "The second operator needs to be
>>>>>>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>>>>>>
>>>>>>> After I do a transformation on a KeyedStream (so it goes back to a
>>>>>>> SingleOutputStreamOperator), even if I do a simple map, it usually
>>>>>>> disallows operator chaining.  Even with reinterpretAsKeyedStream, it
>>>>>>> doesn't work.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Dan,
>>>>>>>> Sorry for the typos, I meant to ask you to provide the code to
>>>>>>>> reproduce the problem. If the current program is complex or
>>>>>>>> confidential, maybe you could try to simplify the code.
>>>>>>>> Besides, Matthias's guess is very reasonable. Could you please
>>>>>>>> check whether there is a network shuffle between your two operators?
>>>>>>>> Were those two operators chained into one vertex?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> JING ZHANG
>>>>>>>>
>>>>>>>> On Fri, Oct 15, 2021 at 2:57 PM Schwalbe Matthias
>>>>>>>> <matthias.schwa...@viseca.ch> wrote:
>>>>>>>>
>>>>>>>>> … didn’t mean to hit the send button so soon 😊
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I guess we are getting closer to a solution
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From:* Schwalbe Matthias
>>>>>>>>> *Sent:* Friday, 15 October 2021 08:49
>>>>>>>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <
>>>>>>>>> user@flink.apache.org>
>>>>>>>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when
>>>>>>>>> scaling partitions?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi Dan again 😊,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I took a second look … from what I see in your call stack I
>>>>>>>>> conclude that you do indeed have a network shuffle between your two
>>>>>>>>> operators, in which case reinterpretAsKeyedStream wouldn’t work
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>>>>>> indicates that the two operators are not chained)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> … just as a double-check could you please share both your
>>>>>>>>>
>>>>>>>>>    - Execution plan as JSON (call println(env.getExecutionPlan)
>>>>>>>>>    right before your call to env.execute), and
>>>>>>>>>    - Your job plan (screenshot from flink dashboard)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> There are a number of preconditions that must hold before two
>>>>>>>>> operators get chained, and probably one of them fails (see [1]):
>>>>>>>>>
>>>>>>>>>    - The two operators each need to allow chaining with the other
>>>>>>>>>    (see [2] … chaining strategy)
>>>>>>>>>    - We need a ForwardPartitioner in between
>>>>>>>>>    - We need to be in streaming mode
>>>>>>>>>    - Both operators need the same parallelism
>>>>>>>>>    - Chaining needs to be enabled for the streaming environment
>>>>>>>>>    - The second operator needs to be single-input (i.e. no
>>>>>>>>>    TwoInputOp nor union() before)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>>>>>>>
>>>>>>>>> [2]
>>>>>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From:* Dan Hill <quietgol...@gmail.com>
>>>>>>>>> *Sent:* Thursday, 14 October 2021 17:50
>>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>>>>>>>> partitions?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I have a job that uses reinterpretAsKeyedStream across a simple
>>>>>>>>> map to avoid a shuffle.  When changing the number of partitions, I'm
>>>>>>>>> hitting an issue with registerEventTimeTimer complaining that "key
>>>>>>>>> group from 110 to 119 does not contain 186".  I'm using Flink v1.12.3.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any thoughts on this?  I don't know if there is a known issue
>>>>>>>>> with reinterpretAsKeyedStream.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Rough steps:
>>>>>>>>>
>>>>>>>>> 1. I have a raw input stream of View records.  I keyBy the View
>>>>>>>>> using Tuple2<Long, String>(platform_id, log_user_id).
>>>>>>>>>
>>>>>>>>> 2. I do a small transformation of View to a TinyView.  I
>>>>>>>>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same
>>>>>>>>> key.  The keys are the same.
>>>>>>>>>
>>>>>>>>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> When I savepoint and start again with a different number of
>>>>>>>>> partitions, my KeyedCoProcessFunction hits an issue
>>>>>>>>> with registerEventTimeTimer and complains that "key group from 110
>>>>>>>>> to 119 does not contain 186".  I verified that the key does not
>>>>>>>>> change and that we use Tuple2 with primitives Long and String.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2021-10-14 08:17:07
>>>>>>>>> java.lang.IllegalArgumentException: view x insertion issue with registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120
>>>>>>>>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>>>>>>>> log_timestamp: 1634224329606
>>>>>>>>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>>>>>>>>
>>>>>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>>>>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>>>>>>>>     at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>>>>>>>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>>>>>>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>>>>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>>>>>>>>     ... 17 more
>>>>>>>>>
>>>>>>>>> This message is intended only for the named recipient and may
>>>>>>>>> contain confidential or privileged information. As the
>>>>>>>>> confidentiality of email communication cannot be guaranteed, we do
>>>>>>>>> not accept any responsibility for the confidentiality and the
>>>>>>>>> intactness of this message. If you have received it in error, please
>>>>>>>>> advise the sender by return e-mail and delete this message and any
>>>>>>>>> attachments. Any unauthorised use or dissemination of this
>>>>>>>>> information is strictly prohibited.
>>>>>>>>>
>>>>>>>>
