BIOINSu commented on code in PR #24128:
URL: https://github.com/apache/flink/pull/24128#discussion_r1464827047


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java:
##########
@@ -105,54 +117,131 @@ protected Transformation<RowData> translateToPlanInternal(
                         planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner));
         ScanTableSource.ScanRuntimeProvider provider =
                 tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        final int sourceParallelism = deriveSourceParallelism(provider);
+        final boolean sourceParallelismConfigured = isParallelismConfigured(provider);
         if (provider instanceof SourceFunctionProvider) {
             final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
             final SourceFunction<RowData> function = sourceFunctionProvider.createSourceFunction();
-            final Transformation<RowData> transformation =
+            sourceTransform =
                     createSourceFunctionTransformation(
                             env,
                             function,
                             sourceFunctionProvider.isBounded(),
                             meta.getName(),
-                            outputTypeInfo);
-            return meta.fill(transformation);
+                            outputTypeInfo,
+                            sourceParallelism,
+                            sourceParallelismConfigured);
+            return meta.fill(sourceTransform);
         } else if (provider instanceof InputFormatProvider) {
             final InputFormat<RowData, ?> inputFormat =
                     ((InputFormatProvider) provider).createInputFormat();
-            final Transformation<RowData> transformation =
+            sourceTransform =
                     createInputFormatTransformation(
                             env, inputFormat, outputTypeInfo, meta.getName());
-            return meta.fill(transformation);
+            meta.fill(sourceTransform);
         } else if (provider instanceof SourceProvider) {
             final Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
             // TODO: Push down watermark strategy to source scan
-            final Transformation<RowData> transformation =
+            sourceTransform =
                     env.fromSource(
                                     source,
                                     WatermarkStrategy.noWatermarks(),
                                     meta.getName(),
                                     outputTypeInfo)
                             .getTransformation();
-            return meta.fill(transformation);
+            meta.fill(sourceTransform);
         } else if (provider instanceof DataStreamScanProvider) {
-            Transformation<RowData> transformation =
+            sourceTransform =
                     ((DataStreamScanProvider) provider)
                             .produceDataStream(createProviderContext(config), env)
                             .getTransformation();
-            meta.fill(transformation);
-            transformation.setOutputType(outputTypeInfo);
-            return transformation;
+            meta.fill(sourceTransform);
+            sourceTransform.setOutputType(outputTypeInfo);
         } else if (provider instanceof TransformationScanProvider) {
-            final Transformation<RowData> transformation =
+            sourceTransform =
                     ((TransformationScanProvider) provider)
                             .createTransformation(createProviderContext(config));
-            meta.fill(transformation);
-            transformation.setOutputType(outputTypeInfo);
-            return transformation;
+            meta.fill(sourceTransform);
+            sourceTransform.setOutputType(outputTypeInfo);
         } else {
             throw new UnsupportedOperationException(
                     provider.getClass().getSimpleName() + " is unsupported now.");
         }
+
+        if (sourceParallelismConfigured) {
+            return applySourceTransformationWrapper(
+                    sourceTransform,
+                    planner.getFlinkContext().getClassLoader(),
+                    outputTypeInfo,
+                    config,
+                    sourceParallelism);
+        } else {
+            return sourceTransform;
+        }
+    }
+
+    private boolean isParallelismConfigured(ScanTableSource.ScanRuntimeProvider runtimeProvider) {
+        return runtimeProvider instanceof ParallelismProvider
+                && ((ParallelismProvider) runtimeProvider).getParallelism().isPresent();
+    }
+
+    private int deriveSourceParallelism(ScanTableSource.ScanRuntimeProvider runtimeProvider) {
+        if (isParallelismConfigured(runtimeProvider)) {
+            int sourceParallelism = ((ParallelismProvider) runtimeProvider).getParallelism().get();
+            if (sourceParallelism <= 0) {
+                throw new TableException(
+                        String.format(
+                                "Invalid configured parallelism %s for table 
'%s'.",
+                                sourceParallelism,
+                                tableSourceSpec
+                                        .getContextResolvedTable()
+                                        .getIdentifier()
+                                        .asSummaryString()));
+            }
+            return sourceParallelism;
+        } else {
+            return ExecutionConfig.PARALLELISM_DEFAULT;
+        }
+    }
+
+    protected RowType getPhysicalRowType(ResolvedSchema schema) {
+        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+    }
+
+    protected int[] getPrimaryKeyIndices(RowType sourceRowType, ResolvedSchema schema) {
+        return schema.getPrimaryKey()
+                .map(k -> k.getColumns().stream().mapToInt(sourceRowType::getFieldIndex).toArray())
+                .orElse(new int[0]);
+    }
+
+    private Transformation<RowData> applySourceTransformationWrapper(
+            Transformation<RowData> sourceTransform,
+            ClassLoader classLoader,
+            InternalTypeInfo<RowData> outputTypeInfo,
+            ExecNodeConfig config,
+            int sourceParallelism) {
+        sourceTransform.setParallelism(sourceParallelism, true);
+        Transformation<RowData> sourceTransformationWrapper =
+                new SourceTransformationWrapper<>(sourceTransform);
+
+        final ResolvedSchema schema = tableSourceSpec.getContextResolvedTable().getResolvedSchema();
+        final RowType physicalRowType = getPhysicalRowType(schema);
+        final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
+        final boolean hasPk = primaryKeys.length > 0;
+
+        if (hasPk) {

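For context on the hunk above: `deriveSourceParallelism` only returns a configured value when the `ScanRuntimeProvider` also implements `ParallelismProvider` and reports a non-empty `Optional`. A minimal sketch of such a provider, assuming a connector that forwards a user-set option (the class name `FixedParallelismSourceFunctionProvider` and its constructor are illustrative, not part of this PR):

```java
import java.util.Optional;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

// Hypothetical provider: reports a fixed parallelism so that
// isParallelismConfigured(provider) returns true and
// deriveSourceParallelism(provider) picks the value up.
final class FixedParallelismSourceFunctionProvider
        implements SourceFunctionProvider, ParallelismProvider {

    private final SourceFunction<RowData> function;
    private final int parallelism; // e.g. taken from a connector option

    FixedParallelismSourceFunctionProvider(SourceFunction<RowData> function, int parallelism) {
        this.function = function;
        this.parallelism = parallelism;
    }

    @Override
    public SourceFunction<RowData> createSourceFunction() {
        return function;
    }

    @Override
    public boolean isBounded() {
        return false; // unbounded for the sake of the example
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(parallelism);
    }
}
```

With such a provider, the `instanceof ParallelismProvider` check above passes and the configured value flows into `applySourceTransformationWrapper`.
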
Review Comment:
   Thanks for the comments.
   1. If the parallelism is equal to the job's default parallelism, the user would not need to configure it, `sourceParallelismConfigured` would be false, and no extra hash shuffle would be applied.
   2. Yes, thanks for the reminder. In the latest version of the code, if the source produces upsert data and a parallelism is configured, a hash shuffle would be applied. Note that in this case the primary key must be defined; otherwise an exception would be thrown.
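
   To make point 2 concrete, a minimal sketch of the decision logic described above, assuming hypothetical inputs (`parallelismConfigured`, `isUpsertSource`, `primaryKeySelector`); this is not code from the PR, which applies the shuffle at the `Transformation` level inside `applySourceTransformationWrapper`:

   ```java
   import org.apache.flink.api.java.functions.KeySelector;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.table.api.TableException;

   final class UpsertShuffleSketch {

       // Sketch: an extra hash shuffle is only added when a parallelism was
       // explicitly configured, and an upsert source then requires a primary
       // key to shuffle on.
       static <T> DataStream<T> applyHashShuffleIfNeeded(
               DataStream<T> source,
               boolean parallelismConfigured,
               boolean isUpsertSource,
               int[] primaryKeyIndices,
               KeySelector<T, ?> primaryKeySelector) {

           // Point 1: no configured parallelism (e.g. it equals the job
           // default) means no wrapper and no extra shuffle.
           if (!parallelismConfigured || !isUpsertSource) {
               return source;
           }
           // Point 2: upsert data with a configured parallelism needs a
           // primary key; fail fast when it is missing.
           if (primaryKeyIndices.length == 0) {
               throw new TableException(
                       "Configuring parallelism for an upsert source requires a primary key.");
           }
           // Hash shuffle keyed on the primary key, so all changes for the
           // same key reach the same downstream subtask.
           return source.keyBy(primaryKeySelector);
       }
   }
   ```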



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
