[ 
https://issues.apache.org/jira/browse/FLINK-23373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-23373.
--------------------------------
    Fix Version/s: 1.14.0
       Resolution: Fixed

Fixed in 1.14.0:

commit 2fe7b938e77b3ea88979ab1d36502604042e2ffc
[task] Fully support object reuse in OperatorChain

commit 8e53edb885a4eadc20e9b43dc2b54730ee7b899c
[task] Fix source input serializer in StreamTaskMailboxTestHarness

> Support object reuse disabled in OperatorChain
> ----------------------------------------------
>
>                 Key: FLINK-23373
>                 URL: https://issues.apache.org/jira/browse/FLINK-23373
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> Currently, object reuse must be enabled in order to use chained sources.
> Tests such as `HiveDialectQueryITCase` will fail with an exception:
> {code}
> 2021-07-12T14:47:55.8233741Z Jul 12 14:47:55 [ERROR] 
> testQueries(org.apache.flink.connectors.hive.HiveDialectQueryITCase)  Time 
> elapsed: 12.283 s  <<< ERROR!
> 2021-07-12T14:47:55.8234433Z Jul 12 14:47:55 java.lang.RuntimeException: 
> Failed to fetch next result
> 2021-07-12T14:47:55.8235133Z Jul 12 14:47:55  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> 2021-07-12T14:47:55.8235958Z Jul 12 14:47:55  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-07-12T14:47:55.8236774Z Jul 12 14:47:55  at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> ....
> 2021-07-12T14:47:55.8313594Z Jul 12 14:47:55 Caused by: 
> java.lang.UnsupportedOperationException: Currently chained sources are 
> supported only with objectReuse enabled
> 2021-07-12T14:47:55.8314356Z Jul 12 14:47:55  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSourceOutput(OperatorChain.java:355)
> 2021-07-12T14:47:55.8315109Z Jul 12 14:47:55  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSources(OperatorChain.java:322)
> 2021-07-12T14:47:55.8315820Z Jul 12 14:47:55  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:220)
> 2021-07-12T14:47:55.8316506Z Jul 12 14:47:55  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:558)
> 2021-07-12T14:47:55.8317209Z Jul 12 14:47:55  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:661)
> 2021-07-12T14:47:55.8317948Z Jul 12 14:47:55  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:547)
> 2021-07-12T14:47:55.8318626Z Jul 12 14:47:55  at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
> 2021-07-12T14:47:55.8319205Z Jul 12 14:47:55  at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> 2021-07-12T14:47:55.8319725Z Jul 12 14:47:55  at 
> java.lang.Thread.run(Thread.java:748)
> 2021-07-12T14:47:55.8320122Z Jul 12 1
> {code}
> The fix should looks as follows:
> This particular exception should be rather straightforward to fix. The reason 
> it's not implemented is because the chaining sources feature was implemented 
> in the minimal scope required by blink planner and is missing around ~50-100 
> lines of production code to work with the object reuse disabled.
> In the {{OperatorChain#createChainedSourceOutput}} we need to something 
> similar as is done in {{OperatorChain#wrapOperatorIntoOutput}} , so something 
> like:
> {code}
>         if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
>             return closer.register(new ChainingOutput(input, metricGroup, 
> outputTag));
>         } else {
>             TypeSerializer<IN> inSerializer =
>                     operatorConfig.getTypeSerializerIn1(userCodeClassloader);
>             return closer.register(new CopyingChainingOutput(input, 
> inSerializer, metricGroup, outputTag));
>         }
> {code}
> the missing part to do that is to make {{CopyingChainingOutput}} work with an 
> Input instead of an Operator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to