BIOINSu commented on code in PR #24128: URL: https://github.com/apache/flink/pull/24128#discussion_r1464827047
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java: ########## @@ -105,54 +117,131 @@ protected Transformation<RowData> translateToPlanInternal( planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner)); ScanTableSource.ScanRuntimeProvider provider = tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + final int sourceParallelism = deriveSourceParallelism(provider); + final boolean sourceParallelismConfigured = isParallelismConfigured(provider); if (provider instanceof SourceFunctionProvider) { final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider; final SourceFunction<RowData> function = sourceFunctionProvider.createSourceFunction(); - final Transformation<RowData> transformation = + sourceTransform = createSourceFunctionTransformation( env, function, sourceFunctionProvider.isBounded(), meta.getName(), - outputTypeInfo); - return meta.fill(transformation); + outputTypeInfo, + sourceParallelism, + sourceParallelismConfigured); + return meta.fill(sourceTransform); } else if (provider instanceof InputFormatProvider) { final InputFormat<RowData, ?> inputFormat = ((InputFormatProvider) provider).createInputFormat(); - final Transformation<RowData> transformation = + sourceTransform = createInputFormatTransformation( env, inputFormat, outputTypeInfo, meta.getName()); - return meta.fill(transformation); + meta.fill(sourceTransform); } else if (provider instanceof SourceProvider) { final Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource(); // TODO: Push down watermark strategy to source scan - final Transformation<RowData> transformation = + sourceTransform = env.fromSource( source, WatermarkStrategy.noWatermarks(), meta.getName(), outputTypeInfo) .getTransformation(); - return meta.fill(transformation); + meta.fill(sourceTransform); } else if (provider instanceof DataStreamScanProvider) { - 
Transformation<RowData> transformation = + sourceTransform = ((DataStreamScanProvider) provider) .produceDataStream(createProviderContext(config), env) .getTransformation(); - meta.fill(transformation); - transformation.setOutputType(outputTypeInfo); - return transformation; + meta.fill(sourceTransform); + sourceTransform.setOutputType(outputTypeInfo); } else if (provider instanceof TransformationScanProvider) { - final Transformation<RowData> transformation = + sourceTransform = ((TransformationScanProvider) provider) .createTransformation(createProviderContext(config)); - meta.fill(transformation); - transformation.setOutputType(outputTypeInfo); - return transformation; + meta.fill(sourceTransform); + sourceTransform.setOutputType(outputTypeInfo); } else { throw new UnsupportedOperationException( provider.getClass().getSimpleName() + " is unsupported now."); } + + if (sourceParallelismConfigured) { + return applySourceTransformationWrapper( + sourceTransform, + planner.getFlinkContext().getClassLoader(), + outputTypeInfo, + config, + sourceParallelism); + } else { + return sourceTransform; + } + } + + private boolean isParallelismConfigured(ScanTableSource.ScanRuntimeProvider runtimeProvider) { + return runtimeProvider instanceof ParallelismProvider + && ((ParallelismProvider) runtimeProvider).getParallelism().isPresent(); + } + + private int deriveSourceParallelism(ScanTableSource.ScanRuntimeProvider runtimeProvider) { + if (isParallelismConfigured(runtimeProvider)) { + int sourceParallelism = ((ParallelismProvider) runtimeProvider).getParallelism().get(); + if (sourceParallelism <= 0) { + throw new TableException( + String.format( + "Invalid configured parallelism %s for table '%s'.", + sourceParallelism, + tableSourceSpec + .getContextResolvedTable() + .getIdentifier() + .asSummaryString())); + } + return sourceParallelism; + } else { + return ExecutionConfig.PARALLELISM_DEFAULT; + } + } + + protected RowType getPhysicalRowType(ResolvedSchema schema) { + 
return (RowType) schema.toPhysicalRowDataType().getLogicalType(); + } + + protected int[] getPrimaryKeyIndices(RowType sourceRowType, ResolvedSchema schema) { + return schema.getPrimaryKey() + .map(k -> k.getColumns().stream().mapToInt(sourceRowType::getFieldIndex).toArray()) + .orElse(new int[0]); + } + + private Transformation<RowData> applySourceTransformationWrapper( + Transformation<RowData> sourceTransform, + ClassLoader classLoader, + InternalTypeInfo<RowData> outputTypeInfo, + ExecNodeConfig config, + int sourceParallelism) { + sourceTransform.setParallelism(sourceParallelism, true); + Transformation<RowData> sourceTransformationWrapper = + new SourceTransformationWrapper<>(sourceTransform); + + final ResolvedSchema schema = tableSourceSpec.getContextResolvedTable().getResolvedSchema(); + final RowType physicalRowType = getPhysicalRowType(schema); + final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema); + final boolean hasPk = primaryKeys.length > 0; + + if (hasPk) { Review Comment: Thanks for the comments. 1. If the parallelism is equal to the job default parallelism, the user would not need to configure the parallelism and `sourceParallelismConfigured` would be false, so no extra hash shuffle would be applied. 2. Yes, thanks for the reminder. In the latest version of the code, if the source produces upsert data and configures parallelism, a hash shuffle would be applied. Note that, in this case, the primary key must be configured; otherwise an exception would be thrown. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org