[
https://issues.apache.org/jira/browse/FLINK-21679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he updated FLINK-21679:
-------------------------------
Fix Version/s: 1.13.0
> Set output type for transformations from SourceProvider and
> DataStreamScanProvider in CommonExecTableSourceScan
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21679
> URL: https://issues.apache.org/jira/browse/FLINK-21679
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Wei Zhong
> Assignee: Wei Zhong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Currently we set the output type only for the transformations created from
> SourceFunctionProvider and InputFormatProvider in CommonExecTableSourceScan:
> {code:java}
> @Override
> protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
>     final StreamExecutionEnvironment env = planner.getExecEnv();
>     final String operatorName = getDescription();
>     final InternalTypeInfo<RowData> outputTypeInfo =
>             InternalTypeInfo.of((RowType) getOutputType());
>     final ScanTableSource tableSource = tableSourceSpec.getScanTableSource(planner);
>     ScanTableSource.ScanRuntimeProvider provider =
>             tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
>     if (provider instanceof SourceFunctionProvider) {
>         SourceFunction<RowData> sourceFunction =
>                 ((SourceFunctionProvider) provider).createSourceFunction();
>         return env.addSource(sourceFunction, operatorName, outputTypeInfo).getTransformation();
>     } else if (provider instanceof InputFormatProvider) {
>         InputFormat<RowData, ?> inputFormat =
>                 ((InputFormatProvider) provider).createInputFormat();
>         return createInputFormatTransformation(env, inputFormat, outputTypeInfo, operatorName);
>     } else if (provider instanceof SourceProvider) {
>         // outputTypeInfo is not set here
>         Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
>         return env.fromSource(source, WatermarkStrategy.noWatermarks(), operatorName)
>                 .getTransformation();
>     } else if (provider instanceof DataStreamScanProvider) {
>         // outputTypeInfo is not set here
>         return ((DataStreamScanProvider) provider).produceDataStream(env).getTransformation();
>     } else {
>         throw new UnsupportedOperationException(
>                 provider.getClass().getSimpleName() + " is unsupported now.");
>     }
> }
> {code}
> We can also set the output type for the transformations from SourceProvider
> and DataStreamScanProvider in CommonExecTableSourceScan, so that users do not
> need to implement the ResultTypeQueryable interface when implementing the new
> Source interface from FLIP-27.
>
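The proposal in the issue can be illustrated outside of Flink. What follows is a minimal, self-contained model and not the actual patch: every `Sketch*` name is a hypothetical stand-in, and the output type is modeled as a plain string rather than a `TypeInformation`. In Flink itself the corresponding calls would presumably be the `StreamExecutionEnvironment#fromSource` overload that accepts a `TypeInformation` argument and `Transformation#setOutputType`.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's Transformation<T>; only the output-type slot is modeled.
final class SketchTransformation {
    private final String name;
    private String outputType; // null models "output type was never set"

    SketchTransformation(String name) {
        this.name = name;
    }

    void setOutputType(String outputType) {
        this.outputType = Objects.requireNonNull(outputType);
    }

    String getOutputType() {
        return outputType;
    }

    String getName() {
        return name;
    }
}

// Hypothetical stand-ins for ScanRuntimeProvider and two of its sub-interfaces.
interface SketchProvider {}

interface SketchSourceProvider extends SketchProvider {
    String createSource();
}

interface SketchDataStreamScanProvider extends SketchProvider {
    SketchTransformation produceTransformation();
}

final class OutputTypeSketch {

    // Mirrors the two branches of the issue that currently leave the output type unset.
    static SketchTransformation translate(
            SketchProvider provider, String operatorName, String outputTypeInfo) {
        if (provider instanceof SketchSourceProvider) {
            // Assumed Flink analogue: pass outputTypeInfo as an extra argument to env.fromSource(...).
            String source = ((SketchSourceProvider) provider).createSource();
            SketchTransformation t = new SketchTransformation(operatorName + "[" + source + "]");
            t.setOutputType(outputTypeInfo);
            return t;
        } else if (provider instanceof SketchDataStreamScanProvider) {
            // Assumed Flink analogue: call setOutputType(outputTypeInfo) on the returned transformation.
            SketchTransformation t =
                    ((SketchDataStreamScanProvider) provider).produceTransformation();
            t.setOutputType(outputTypeInfo);
            return t;
        } else {
            throw new UnsupportedOperationException(
                    provider.getClass().getSimpleName() + " is unsupported now.");
        }
    }

    public static void main(String[] args) {
        SketchSourceProvider sourceProvider = () -> "new-source";
        SketchDataStreamScanProvider scanProvider =
                () -> new SketchTransformation("user-defined scan");
        System.out.println(translate(sourceProvider, "scan-a", "ROW<id INT>").getOutputType());
        System.out.println(translate(scanProvider, "scan-b", "ROW<id INT>").getOutputType());
    }
}
```

In this model both branches return a transformation whose output type comes from the planner, so the source implementation does not have to report its own produced type.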