[ https://issues.apache.org/jira/browse/FLINK-23373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther closed FLINK-23373.
--------------------------------
    Fix Version/s: 1.14.0
       Resolution: Fixed

Fixed in 1.14.0:

commit 2fe7b938e77b3ea88979ab1d36502604042e2ffc
[task] Fully support object reuse in OperatorChain

commit 8e53edb885a4eadc20e9b43dc2b54730ee7b899c
[task] Fix source input serializer in StreamTaskMailboxTestHarness

> Support object reuse disabled in OperatorChain
> ----------------------------------------------
>
>                 Key: FLINK-23373
>                 URL: https://issues.apache.org/jira/browse/FLINK-23373
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> Currently, object reuse must be enabled in order to use chained sources.
> Tests such as `HiveDialectQueryITCase` will fail with an exception:
> {code}
> 2021-07-12T14:47:55.8233741Z Jul 12 14:47:55 [ERROR] testQueries(org.apache.flink.connectors.hive.HiveDialectQueryITCase) Time elapsed: 12.283 s <<< ERROR!
> 2021-07-12T14:47:55.8234433Z Jul 12 14:47:55 java.lang.RuntimeException: Failed to fetch next result
> 2021-07-12T14:47:55.8235133Z Jul 12 14:47:55 at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> 2021-07-12T14:47:55.8235958Z Jul 12 14:47:55 at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-07-12T14:47:55.8236774Z Jul 12 14:47:55 at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> ....
> 2021-07-12T14:47:55.8313594Z Jul 12 14:47:55 Caused by: java.lang.UnsupportedOperationException: Currently chained sources are supported only with objectReuse enabled
> 2021-07-12T14:47:55.8314356Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSourceOutput(OperatorChain.java:355)
> 2021-07-12T14:47:55.8315109Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSources(OperatorChain.java:322)
> 2021-07-12T14:47:55.8315820Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:220)
> 2021-07-12T14:47:55.8316506Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:558)
> 2021-07-12T14:47:55.8317209Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:661)
> 2021-07-12T14:47:55.8317948Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:547)
> 2021-07-12T14:47:55.8318626Z Jul 12 14:47:55 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
> 2021-07-12T14:47:55.8319205Z Jul 12 14:47:55 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> 2021-07-12T14:47:55.8319725Z Jul 12 14:47:55 at java.lang.Thread.run(Thread.java:748)
> 2021-07-12T14:47:55.8320122Z Jul 12 1
> {code}
> The fix should look as follows:
> This particular exception should be rather straightforward to fix. It is not
> implemented because the chained sources feature was implemented in the
> minimal scope required by the blink planner and is missing roughly 50-100
> lines of production code to work with object reuse disabled.
> In {{OperatorChain#createChainedSourceOutput}} we need to do something
> similar to what is done in {{OperatorChain#wrapOperatorIntoOutput}}, so
> something like:
> {code}
> if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
>     return closer.register(new ChainingOutput(input, metricGroup, outputTag));
> } else {
>     TypeSerializer<IN> inSerializer =
>         operatorConfig.getTypeSerializerIn1(userCodeClassloader);
>     return closer.register(
>         new CopyingChainingOutput(input, inSerializer, metricGroup, outputTag));
> }
> {code}
> The missing part is to make {{CopyingChainingOutput}} work with an Input
> instead of an Operator.
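
The branch on object reuse described in the issue can be illustrated with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: `SketchInput`, `DirectOutput`, and `CopyingOutput` are hypothetical, simplified stand-ins and not Flink's `Input`, `ChainingOutput`, or `CopyingChainingOutput`, and the `copier` function only stands in for the copy that a `TypeSerializer` would perform. It shows why a copying output is needed when object reuse is disabled: without a copy, a downstream consumer that buffers records sees later mutations made by the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified, hypothetical stand-ins; these are NOT Flink's real classes.
final class ObjectReuseSketch {

    /** Stand-in for the downstream input that a chained source emits into. */
    interface SketchInput<T> {
        void processElement(T record);
    }

    /** Passes the very same instance downstream (object reuse enabled). */
    static final class DirectOutput<T> implements SketchInput<T> {
        private final SketchInput<T> input;

        DirectOutput(SketchInput<T> input) {
            this.input = input;
        }

        @Override
        public void processElement(T record) {
            input.processElement(record);
        }
    }

    /** Copies each record before passing it on (object reuse disabled). */
    static final class CopyingOutput<T> implements SketchInput<T> {
        private final SketchInput<T> input;
        private final UnaryOperator<T> copier; // stands in for a serializer-based copy

        CopyingOutput(SketchInput<T> input, UnaryOperator<T> copier) {
            this.input = input;
            this.copier = copier;
        }

        @Override
        public void processElement(T record) {
            input.processElement(copier.apply(record));
        }
    }

    /** Mirrors the if/else from the issue: pick the output based on the reuse flag. */
    static <T> SketchInput<T> createOutput(
            boolean objectReuse, SketchInput<T> input, UnaryOperator<T> copier) {
        if (objectReuse) {
            return new DirectOutput<>(input);
        } else {
            return new CopyingOutput<>(input, copier);
        }
    }

    /** Emits one mutable record twice, mutating it in between; returns what downstream buffered. */
    static List<int[]> run(boolean objectReuse) {
        List<int[]> buffered = new ArrayList<>();
        SketchInput<int[]> downstream = record -> buffered.add(record);
        UnaryOperator<int[]> copier = record -> record.clone();
        SketchInput<int[]> output = createOutput(objectReuse, downstream, copier);

        int[] reused = {1};
        output.processElement(reused);
        reused[0] = 2; // the source reuses and mutates its record instance
        output.processElement(reused);
        return buffered;
    }
}
```

With `run(true)` both buffered entries are the same instance, so the first value is overwritten by the later mutation; with `run(false)` each entry is an independent copy and keeps the value it had when it was emitted.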