Timo Walther created FLINK-23373:
------------------------------------
Summary: Support object reuse disabled in OperatorChain
Key: FLINK-23373
URL: https://issues.apache.org/jira/browse/FLINK-23373
Project: Flink
Issue Type: Sub-task
Components: Runtime / Task
Reporter: Timo Walther
Currently, object reuse must be enabled in order to use chained sources.
Tests such as {{HiveDialectQueryITCase}} fail with the following exception:
{code}
2021-07-12T14:47:55.8233741Z Jul 12 14:47:55 [ERROR] testQueries(org.apache.flink.connectors.hive.HiveDialectQueryITCase) Time elapsed: 12.283 s <<< ERROR!
2021-07-12T14:47:55.8234433Z Jul 12 14:47:55 java.lang.RuntimeException: Failed to fetch next result
2021-07-12T14:47:55.8235133Z Jul 12 14:47:55 at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
2021-07-12T14:47:55.8235958Z Jul 12 14:47:55 at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
2021-07-12T14:47:55.8236774Z Jul 12 14:47:55 at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
....
2021-07-12T14:47:55.8313594Z Jul 12 14:47:55 Caused by: java.lang.UnsupportedOperationException: Currently chained sources are supported only with objectReuse enabled
2021-07-12T14:47:55.8314356Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSourceOutput(OperatorChain.java:355)
2021-07-12T14:47:55.8315109Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSources(OperatorChain.java:322)
2021-07-12T14:47:55.8315820Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:220)
2021-07-12T14:47:55.8316506Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:558)
2021-07-12T14:47:55.8317209Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:661)
2021-07-12T14:47:55.8317948Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:547)
2021-07-12T14:47:55.8318626Z Jul 12 14:47:55 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
2021-07-12T14:47:55.8319205Z Jul 12 14:47:55 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
2021-07-12T14:47:55.8319725Z Jul 12 14:47:55 at java.lang.Thread.run(Thread.java:748)
{code}
The fix should look as follows:
This particular exception should be rather straightforward to fix. It is not implemented because the chained sources feature was built only to the minimal scope required by the Blink planner; it still lacks roughly 50-100 lines of production code to work with object reuse disabled.
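To illustrate why the disabled case needs a copying output, here is a minimal, Flink-independent Java sketch. It assumes a hypothetical source that overwrites and re-emits a single mutable record, and a downstream consumer that buffers what it receives. {{MutableRecord}}, {{ObjectReuseDemo}} and the {{copier}} parameter are illustrative names, not Flink API; {{copier}} only stands in for the copy a type serializer would perform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical mutable record, standing in for a row object that a source may reuse.
final class MutableRecord {
    int value;

    MutableRecord(int value) {
        this.value = value;
    }
}

final class ObjectReuseDemo {
    // Hypothetical source that reuses ONE record instance for every element it emits.
    static void emitReusing(int count, Consumer<MutableRecord> out) {
        MutableRecord reused = new MutableRecord(0);
        for (int i = 0; i < count; i++) {
            reused.value = i;   // overwrite the shared instance in place
            out.accept(reused); // hand the same reference downstream
        }
    }

    // Returns the values a buffering consumer ends up holding. 'copier' is identity
    // when records are forwarded as-is, or a deep copy when each record is copied first.
    static List<Integer> bufferedValues(int count, UnaryOperator<MutableRecord> copier) {
        List<MutableRecord> buffer = new ArrayList<>();
        emitReusing(count, record -> buffer.add(copier.apply(record)));
        List<Integer> values = new ArrayList<>();
        for (MutableRecord r : buffer) {
            values.add(r.value);
        }
        return values;
    }
}
```

Without copying, the buffer holds three references to the same instance, so {{bufferedValues(3, r -> r)}} yields {{[2, 2, 2]}}; with {{r -> new MutableRecord(r.value)}} it yields {{[0, 1, 2]}}. A copying chained output prevents exactly this kind of aliasing when user code is allowed to keep references to its inputs.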
In {{OperatorChain#createChainedSourceOutput}} we need to do something similar to what is done in {{OperatorChain#wrapOperatorIntoOutput}}, so something like:
{code}
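if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
    // Object reuse enabled: forward the same record instance to the chained input.
    return closer.register(new ChainingOutput(input, metricGroup, outputTag));
} else {
    // Object reuse disabled: copy each record with the input serializer before forwarding it.
    TypeSerializer<IN> inSerializer =
            operatorConfig.getTypeSerializerIn1(userCodeClassloader);
    return closer.register(
            new CopyingChainingOutput(input, inSerializer, metricGroup, outputTag));
}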
{code}
The missing part is to make {{CopyingChainingOutput}} work with an {{Input}} instead of an operator.
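A self-contained sketch of the proposed branch, under simplified assumptions: {{RecordInput}}, {{OneInputOperatorLike}}, {{ChainedOutput}}, {{ReusingChainedOutput}}, {{CopyingChainedOutput}} and {{ChainedSourceOutputs}} are hypothetical stand-ins for Flink's types, and a {{UnaryOperator}} replaces {{TypeSerializer#copy}}. The copying output is typed on an input rather than on an operator, and the operator-like interface extends that input type, so one class serves both the existing operator case and chained sources.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for Flink's Input interface: anything that accepts records.
interface RecordInput<T> {
    void processElement(T record);
}

// Hypothetical stand-in for a one-input operator. Because it extends RecordInput,
// an output typed on RecordInput accepts operators as well as plain inputs.
interface OneInputOperatorLike<T> extends RecordInput<T> {
}

// Hypothetical stand-in for a chained Output.
interface ChainedOutput<T> {
    void collect(T record);
}

// Forwards the same reference; only safe when object reuse is enabled.
final class ReusingChainedOutput<T> implements ChainedOutput<T> {
    private final RecordInput<T> input;

    ReusingChainedOutput(RecordInput<T> input) {
        this.input = input;
    }

    @Override
    public void collect(T record) {
        input.processElement(record);
    }
}

// Copies every record before forwarding it; 'copier' plays the role of TypeSerializer#copy.
final class CopyingChainedOutput<T> implements ChainedOutput<T> {
    private final RecordInput<T> input;
    private final UnaryOperator<T> copier;

    CopyingChainedOutput(RecordInput<T> input, UnaryOperator<T> copier) {
        this.input = input;
        this.copier = copier;
    }

    @Override
    public void collect(T record) {
        input.processElement(copier.apply(record));
    }
}

final class ChainedSourceOutputs {
    // Mirrors the branch proposed for OperatorChain#createChainedSourceOutput.
    static <T> ChainedOutput<T> create(
            boolean objectReuseEnabled, RecordInput<T> input, UnaryOperator<T> copier) {
        if (objectReuseEnabled) {
            return new ReusingChainedOutput<>(input);
        }
        return new CopyingChainedOutput<>(input, copier);
    }
}
```

In Flink itself, {{OneInputStreamOperator}} already extends {{Input}}, so widening the {{CopyingChainingOutput}} constructor parameter from the operator type to {{Input}} should leave existing operator call sites compiling; this is worth confirming against the Flink version in use.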
--
This message was sent by Atlassian Jira
(v8.3.4#803005)