JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513954139
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/OutputFormatProvider.java ########## @@ -20,13 +20,15 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; /** * Provider of an {@link OutputFormat} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface OutputFormatProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface OutputFormatProvider Review comment: Code format: It is OK to keep this declaration on one line. ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -79,6 +87,10 @@ class CommonPhysicalSink ( val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames) runtimeProvider match { + case _: DataStreamSinkProvider with ParallelismProvider => throw new TableException( Review comment: Code format: Better to break the line before `throw new Table...` ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -99,11 +111,63 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) + assert(runtimeProvider.isInstanceOf[ParallelismProvider], + "runtimeProvider with `ParallelismProvider` implementation is required") + + val inputParallelism = inputTransformation.getParallelism + val parallelism = { + val parallelismOptional = runtimeProvider + .asInstanceOf[ParallelismProvider].getParallelism + if(parallelismOptional.isPresent) { + val parallelismPassedIn = parallelismOptional.get().intValue() + if(parallelismPassedIn <= 0) { + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism: 
$parallelismPassedIn " + + "should not be less than zero or equal to zero") + } + parallelismPassedIn + } else inputParallelism + } + + val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema) + val theFinalInputTransformation = + (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match { + // if the inputParallelism equals parallelism, do nothing. + case (true, _, _) => inputTransformation + case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation + case (_, _, Nil) => + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " + + s"while the input parallelism is: $inputParallelism. " + + s"Since the changelog mode " + + s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " + + s"which is not INSERT_ONLY mode, " + + s"primary key is required but no primary key is found" + ) + case (_, _, pks) => + //key by before sink + //according to [[StreamExecExchange]] + val selector = KeySelectorUtil.getRowDataSelector( + pks.toArray, inputTypeInfo) + // in case of maxParallelism is negative + val keyGroupNum = env.getMaxParallelism match { Review comment: Why do we need to check `env.getMaxParallelism`?
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -99,11 +111,63 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) + assert(runtimeProvider.isInstanceOf[ParallelismProvider], + "runtimeProvider with `ParallelismProvider` implementation is required") + + val inputParallelism = inputTransformation.getParallelism + val parallelism = { + val parallelismOptional = runtimeProvider + .asInstanceOf[ParallelismProvider].getParallelism + if(parallelismOptional.isPresent) { + val parallelismPassedIn = parallelismOptional.get().intValue() + if(parallelismPassedIn <= 0) { + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " + + "should not be less than zero or equal to zero") + } + parallelismPassedIn + } else inputParallelism + } + + val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema) + val theFinalInputTransformation = + (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match { + // if the inputParallelism equals parallelism, do nothing. + case (true, _, _) => inputTransformation + case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation + case (_, _, Nil) => + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " + + s"while the input parallelism is: $inputParallelism. 
" + + s"Since the changelog mode " + + s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " + + s"which is not INSERT_ONLY mode, " + + s"primary key is required but no primary key is found" + ) + case (_, _, pks) => + //key by before sink + //according to [[StreamExecExchange]] + val selector = KeySelectorUtil.getRowDataSelector( + pks.toArray, inputTypeInfo) + // in case of maxParallelism is negative + val keyGroupNum = env.getMaxParallelism match { + case -1 => env.getParallelism + case x if(x > 0) => env.getMaxParallelism + case _ => DEFAULT_LOWER_BOUND_MAX_PARALLELISM + } + val partitioner = new KeyGroupStreamPartitioner(selector,keyGroupNum) Review comment: Code format: add a space after the comma: `selector,keyGroupNum` => `selector, keyGroupNum` ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -79,6 +87,10 @@ class CommonPhysicalSink ( val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames) runtimeProvider match { + case _: DataStreamSinkProvider with ParallelismProvider => throw new TableException( + "`DataStreamSinkProvider` is not allowed to " + Review comment: Code format: Better to fill each line up to the 100-character limit instead of breaking early. ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -99,11 +111,63 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) + assert(runtimeProvider.isInstanceOf[ParallelismProvider], + "runtimeProvider with `ParallelismProvider` implementation is required") + + val inputParallelism = inputTransformation.getParallelism + val parallelism = { + val parallelismOptional = runtimeProvider + .asInstanceOf[ParallelismProvider].getParallelism Review comment: Code format: Do not break this line. ########## File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -99,11 +111,63 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) + assert(runtimeProvider.isInstanceOf[ParallelismProvider], + "runtimeProvider with `ParallelismProvider` implementation is required") + + val inputParallelism = inputTransformation.getParallelism + val parallelism = { + val parallelismOptional = runtimeProvider + .asInstanceOf[ParallelismProvider].getParallelism + if(parallelismOptional.isPresent) { + val parallelismPassedIn = parallelismOptional.get().intValue() + if(parallelismPassedIn <= 0) { + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " + + "should not be less than zero or equal to zero") + } + parallelismPassedIn + } else inputParallelism Review comment: Code format: ``` else { inputParallelism } ``` ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -99,11 +111,63 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) + assert(runtimeProvider.isInstanceOf[ParallelismProvider], + "runtimeProvider with `ParallelismProvider` implementation is required") + + val inputParallelism = inputTransformation.getParallelism + val parallelism = { + val parallelismOptional = runtimeProvider + .asInstanceOf[ParallelismProvider].getParallelism + if(parallelismOptional.isPresent) { + val parallelismPassedIn = parallelismOptional.get().intValue() + if(parallelismPassedIn <= 0) { + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " + + "should not be less than zero or equal to zero") + } + parallelismPassedIn + } else inputParallelism + } + + val primaryKeys = 
TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema) + val theFinalInputTransformation = + (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match { + // if the inputParallelism equals parallelism, do nothing. + case (true, _, _) => inputTransformation + case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation + case (_, _, Nil) => + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " + + s"while the input parallelism is: $inputParallelism. " + Review comment: Code format: Better to fill each line up to the 100-character limit instead of breaking early. ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -99,11 +111,63 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) + assert(runtimeProvider.isInstanceOf[ParallelismProvider], + "runtimeProvider with `ParallelismProvider` implementation is required") + + val inputParallelism = inputTransformation.getParallelism + val parallelism = { + val parallelismOptional = runtimeProvider + .asInstanceOf[ParallelismProvider].getParallelism + if(parallelismOptional.isPresent) { + val parallelismPassedIn = parallelismOptional.get().intValue() + if(parallelismPassedIn <= 0) { + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " + + "should not be less than zero or equal to zero") + } + parallelismPassedIn + } else inputParallelism + } + + val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema) + val theFinalInputTransformation = + (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match { + // if the inputParallelism equals parallelism, do nothing.
+ case (true, _, _) => inputTransformation + case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation + case (_, _, Nil) => + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " + + s"while the input parallelism is: $inputParallelism. " + + s"Since the changelog mode " + + s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " + + s"which is not INSERT_ONLY mode, " + + s"primary key is required but no primary key is found" + ) + case (_, _, pks) => + //key by before sink + //according to [[StreamExecExchange]] + val selector = KeySelectorUtil.getRowDataSelector( + pks.toArray, inputTypeInfo) Review comment: Code format: Do not break this line. ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ########## @@ -99,11 +111,63 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) + assert(runtimeProvider.isInstanceOf[ParallelismProvider], + "runtimeProvider with `ParallelismProvider` implementation is required") + + val inputParallelism = inputTransformation.getParallelism + val parallelism = { + val parallelismOptional = runtimeProvider + .asInstanceOf[ParallelismProvider].getParallelism + if(parallelismOptional.isPresent) { + val parallelismPassedIn = parallelismOptional.get().intValue() + if(parallelismPassedIn <= 0) { + throw new TableException( + s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " + + "should not be less than zero or equal to zero") + } + parallelismPassedIn + } else inputParallelism + } + + val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema) + val theFinalInputTransformation = + (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match { Review comment: I think it is better to use `if`/`else` here instead of this tuple `match`.
########## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java ########## @@ -20,13 +20,15 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; /** * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface SinkFunctionProvider Review comment: Code format: It is ok to include in one line ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org