wuchong commented on a change in pull request #15060:
URL: https://github.com/apache/flink/pull/15060#discussion_r589401674



##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -70,6 +78,56 @@ public static void closeCatalog() {
         }
     }
 
+    @Test
+    public void testHiveTableSinkWithParallelismInBatch() {
+        final TableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+        testHiveTableSinkWithParallelismBase(
+                tEnv, "/explain/testHiveTableSinkWithParallelismInBatch.out", null);
+    }
+
+    @Test
+    public void testHiveTableSinkWithParallelismInStreaming() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final TableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+        testHiveTableSinkWithParallelismBase(
+                tEnv, "/explain/testHiveTableSinkWithParallelismInStreaming.out", null);
+    }
+
+    private void testHiveTableSinkWithParallelismBase(
+            final TableEnvironment tEnv,
+            final String expectedResourceFileName,
+            @Nullable Integer parallelism) {
+        parallelism = Optional.ofNullable(parallelism).orElse(8);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.executeSql("create database db1");
+        tEnv.useDatabase("db1");
+
+        String tableName = "test_table";
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE %s ("
+                                + " id int,"
+                                + " real_col int"

Review comment:
       We can hardcode the `tableName` into the DDL and DML, as it is not really a variable.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
##########
@@ -128,6 +132,11 @@ public HiveTableSink(
                         "Hive version is not defined");
         hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
         tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
+        this.configuredParallelism =
+                Optional.ofNullable(
+                                table.getOptions().get(FileSystemOptions.SINK_PARALLELISM.key()))
+                        .map(Integer::valueOf)
+                        .orElse(null);

Review comment:
       Why not use `Configuration.fromMap(table.getOptions()).get(FileSystemOptions.SINK_PARALLELISM)`?

   Is it because of a conflicting class name? If so, I think we can move the parsing to the factory and make the parallelism a parameter of the constructor, as is done in e.g. `KafkaDynamicSink`.
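   For illustration, the manual `Optional` chain in the diff boils down to the following plain-Java sketch. The class name and the literal option key are stand-ins used so the snippet runs standalone, not Flink API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the option parsing under review: read the raw string value of
// "sink.parallelism" from the table options map and convert it to an
// Integer, yielding null when the option is unset.
public class SinkParallelismParsing {

    static Integer parseSinkParallelism(Map<String, String> options) {
        return Optional.ofNullable(options.get("sink.parallelism"))
                .map(Integer::valueOf)
                .orElse(null);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        System.out.println(parseSinkParallelism(options)); // null when unset

        options.put("sink.parallelism", "8");
        System.out.println(parseSinkParallelism(options)); // 8
    }
}
```

   The reviewer's suggested `Configuration.fromMap(...)` call would replace this hand-rolled conversion with the typed `ConfigOption` lookup, which is the same logic with validation handled by the framework.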

##########
File path: docs/content.zh/docs/connectors/table/hive/hive_read_write.md
##########
@@ -409,6 +409,28 @@ This configuration is set in the `TableConfig` and will 
affect all sinks of the
   </tbody>
 </table>
 
+### Sink Parallelism
+
+The parallelism of writing data into Hive can be configured by the corresponding table option. By default, the parallelism is the same as the parallelism of the last upstream chained operator. When a parallelism different from the upstream parallelism is configured, the writer operator will use the configured parallelism.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>sink.parallelism</h5></td>
+        <td style="word-wrap: break-word;">true</td>

Review comment:
       ```suggestion
           <td style="word-wrap: break-word;">(none)</td>
       ```
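
   As a concrete illustration of the option being documented, a Hive-dialect DDL carrying it might look like the following. The table and column names are made up for illustration; only `'sink.parallelism'` is the option under review:

```sql
-- Hypothetical table; in the Hive dialect, Flink table options such as
-- 'sink.parallelism' are passed via TBLPROPERTIES.
CREATE TABLE hive_sink_table (
  id INT,
  real_col INT
) TBLPROPERTIES (
  -- the writer operator will run with parallelism 8 instead of inheriting
  -- the parallelism of the last upstream chained operator
  'sink.parallelism' = '8'
);
```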

##########
File path: docs/content/docs/connectors/table/hive/hive_read_write.md
##########
@@ -409,6 +409,28 @@ This configuration is set in the `TableConfig` and will 
affect all sinks of the
   </tbody>
 </table>
 
+### Sink Parallelism
+
+The parallelism of writing data into Hive can be configured by the corresponding table option. By default, the parallelism is the same as the parallelism of the last upstream chained operator. When a parallelism different from the upstream parallelism is configured, the writer operator will use the configured parallelism.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>sink.parallelism</h5></td>
+        <td style="word-wrap: break-word;">true</td>

Review comment:
       ditto.

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -70,6 +78,56 @@ public static void closeCatalog() {
         }
     }
 
+    @Test
+    public void testHiveTableSinkWithParallelismInBatch() {
+        final TableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+        testHiveTableSinkWithParallelismBase(
+                tEnv, "/explain/testHiveTableSinkWithParallelismInBatch.out", null);
+    }
+
+    @Test
+    public void testHiveTableSinkWithParallelismInStreaming() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final TableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+        testHiveTableSinkWithParallelismBase(
+                tEnv, "/explain/testHiveTableSinkWithParallelismInStreaming.out", null);
+    }
+
+    private void testHiveTableSinkWithParallelismBase(
+            final TableEnvironment tEnv,
+            final String expectedResourceFileName,
+            @Nullable Integer parallelism) {
+        parallelism = Optional.ofNullable(parallelism).orElse(8);

Review comment:
       It seems the `parallelism` is hardcoded. If so, we can hardcode it into the DDL.
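
   The defaulting this comment refers to can be seen in isolation in the plain-Java sketch below (the class name is a stand-in): because both test methods pass `null`, the helper always resolves to 8, so the value is effectively a constant.

```java
import java.util.Optional;

// Isolates the defaulting line from testHiveTableSinkWithParallelismBase:
// a null argument falls back to 8, a non-null argument is kept as-is.
public class ParallelismDefaulting {

    static int resolve(Integer parallelism) {
        return Optional.ofNullable(parallelism).orElse(8);
    }

    public static void main(String[] args) {
        System.out.println(resolve(null)); // 8 — the only value the tests ever use
        System.out.println(resolve(4));    // 4 — only if a caller ever passed one
    }
}
```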




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

