JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513954139



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/OutputFormatProvider.java
##########
@@ -20,13 +20,15 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
 /**
  * Provider of an {@link OutputFormat} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface OutputFormatProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface OutputFormatProvider

Review comment:
       Code format: It is OK to include this in one line

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -79,6 +87,10 @@ class CommonPhysicalSink (
     val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames)
 
     runtimeProvider match {
+      case _: DataStreamSinkProvider with ParallelismProvider => throw new TableException(

Review comment:
       Code format: Better to break the line before `throw new Table...`
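       A sketch of the suggested formatting (illustrative only; the message text is abbreviated, not the exact string from the PR):
   ```
   case _: DataStreamSinkProvider with ParallelismProvider =>
     throw new TableException(
       "`DataStreamSinkProvider` is not allowed to ...")
   ```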

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+          case (_, _, pks) =>
+            //key by before sink
+            //according to [[StreamExecExchange]]
+            val selector = KeySelectorUtil.getRowDataSelector(
+              pks.toArray, inputTypeInfo)
+            // in case of maxParallelism is negative
+            val keyGroupNum = env.getMaxParallelism match {

Review comment:
       Why do we need to check `env.getMaxParallelism`?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+          case (_, _, pks) =>
+            //key by before sink
+            //according to [[StreamExecExchange]]
+            val selector = KeySelectorUtil.getRowDataSelector(
+              pks.toArray, inputTypeInfo)
+            // in case of maxParallelism is negative
+            val keyGroupNum = env.getMaxParallelism match {
+              case -1 => env.getParallelism
+              case x if(x > 0) => env.getMaxParallelism
+              case _ => DEFAULT_LOWER_BOUND_MAX_PARALLELISM
+            }
+            val partitioner = new KeyGroupStreamPartitioner(selector,keyGroupNum)

Review comment:
       `selector,keyGroupNum` => `selector, keyGroupNum`

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -79,6 +87,10 @@ class CommonPhysicalSink (
     val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames)
 
     runtimeProvider match {
+      case _: DataStreamSinkProvider with ParallelismProvider => throw new TableException(
+        "`DataStreamSinkProvider` is not allowed to " +

Review comment:
       Code format: Better to fill one line (max 100 chars)

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism

Review comment:
       Code format: Do not break the line here

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism

Review comment:
       Code format: 
   ```
   else {
     inputParallelism
   }
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +

Review comment:
       Code format: Better to fill one line (max 100 chars)
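       As an illustration of the suggestion (a sketch only, reusing the message text from the diff above), the fragments could be re-wrapped so each line is filled up to ~100 characters:
   ```
   throw new TableException(
     s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
       s"while the input parallelism is: $inputParallelism. Since the changelog mode contains " +
       s"[${changelogMode.getContainedKinds.toList.mkString(",")}], which is not INSERT_ONLY mode, " +
       "primary key is required but no primary key is found")
   ```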

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+          case (_, _, pks) =>
+            //key by before sink
+            //according to [[StreamExecExchange]]
+            val selector = KeySelectorUtil.getRowDataSelector(
+              pks.toArray, inputTypeInfo)

Review comment:
       Do not break the line here

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {

Review comment:
       I think it is better to use `if else` here.
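       For illustration, a rough sketch of the same logic written with `if`/`else`, using the names from the diff above (`applyKeyBy` is a hypothetical stand-in for the key-group partitioning branch, not an existing helper):
   ```
   val theFinalInputTransformation =
     if (inputParallelism == parallelism || changelogMode.containsOnly(RowKind.INSERT)) {
       // same parallelism, or insert-only changelog: keep the input transformation unchanged
       inputTransformation
     } else if (primaryKeys.isEmpty) {
       throw new TableException(
         s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
           s"while the input parallelism is: $inputParallelism. Since the changelog mode is " +
           "not INSERT_ONLY, a primary key is required but no primary key is found")
     } else {
       // key by the primary key fields before the sink, as in the last case of the match
       applyKeyBy(primaryKeys, inputTransformation) // hypothetical helper
     }
   ```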

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,13 +20,15 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider

Review comment:
       Code format: It is OK to include this in one line



