wuchong commented on a change in pull request #15060:
URL: https://github.com/apache/flink/pull/15060#discussion_r589401674
##########
File path:
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -70,6 +78,56 @@ public static void closeCatalog() {
}
}
+    @Test
+    public void testHiveTableSinkWithParallelismInBatch() {
+        final TableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+        testHiveTableSinkWithParallelismBase(
+                tEnv, "/explain/testHiveTableSinkWithParallelismInBatch.out", null);
+    }
+
+    @Test
+    public void testHiveTableSinkWithParallelismInStreaming() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final TableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+        testHiveTableSinkWithParallelismBase(
+                tEnv, "/explain/testHiveTableSinkWithParallelismInStreaming.out", null);
+    }
+
+    private void testHiveTableSinkWithParallelismBase(
+            final TableEnvironment tEnv,
+            final String expectedResourceFileName,
+            @Nullable Integer parallelism) {
+        parallelism = Optional.ofNullable(parallelism).orElse(8);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.executeSql("create database db1");
+        tEnv.useDatabase("db1");
+
+        String tableName = "test_table";
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE %s ("
+                                + " id int,"
+                                + " real_col int"
Review comment:
We can hardcode `tableName` into the DDL and DML, since it is not really a variable.
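The suggestion above could look like the sketch below. This is a minimal, hypothetical version: the name `test_table` comes from the diff, but the column list is limited to what the truncated hunk shows (`id`, `real_col`), and the `TBLPROPERTIES` clause is an assumption about where a hardcoded `sink.parallelism` would go.

```java
// Hypothetical sketch of the reviewer's suggestion: inline the table name
// directly into the DDL string instead of threading it through String.format.
// Only the columns visible in the truncated hunk are declared here; the
// TBLPROPERTIES clause is an assumption, not code from the PR.
public class DdlSketch {
    static String createTableDdl() {
        return "CREATE TABLE test_table ("
                + " id int,"
                + " real_col int"
                + ") TBLPROPERTIES ('sink.parallelism' = '8')";
    }

    public static void main(String[] args) {
        System.out.println(createTableDdl());
    }
}
```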
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
##########
@@ -128,6 +132,11 @@ public HiveTableSink(
"Hive version is not defined");
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
+        this.configuredParallelism =
+                Optional.ofNullable(
+                                table.getOptions().get(FileSystemOptions.SINK_PARALLELISM.key()))
+                        .map(Integer::valueOf)
+                        .orElse(null);
Review comment:
Why not use
`Configuration.fromMap(table.getOptions()).get(FileSystemOptions.SINK_PARALLELISM)`?
Is it because of the conflicting class name? If so, I think we can move this
lookup to the factory and make the parallelism a constructor parameter, as is
done in e.g. `KafkaDynamicSink`.
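To illustrate the style being compared: Flink's `Configuration.fromMap(...)` gives a typed, null-safe option lookup, whereas the diff builds the same lookup manually with `Optional`. The self-contained sketch below reproduces only the manual variant over a plain `Map` standing in for `table.getOptions()`; the key name matches `FileSystemOptions.SINK_PARALLELISM`, but everything else is simplified for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the manual option lookup used in the diff.
// A Map stands in for table.getOptions() so the snippet is runnable
// without Flink on the classpath.
public class OptionLookupSketch {
    static final String SINK_PARALLELISM = "sink.parallelism";

    // Returns the configured parallelism, or null when the option is unset --
    // the same contract as the Optional chain in HiveTableSink.
    static Integer lookupManually(Map<String, String> options) {
        return Optional.ofNullable(options.get(SINK_PARALLELISM))
                .map(Integer::valueOf)
                .orElse(null);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(SINK_PARALLELISM, "4");
        System.out.println(lookupManually(options));
        System.out.println(lookupManually(new HashMap<>()));
    }
}
```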
##########
File path: docs/content.zh/docs/connectors/table/hive/hive_read_write.md
##########
@@ -409,6 +409,28 @@ This configuration is set in the `TableConfig` and will affect all sinks of the
</tbody>
</table>
+### Sink Parallelism
+
+The parallelism of writing data into Hive can be configured by the corresponding table option. By default, the parallelism is the same as the parallelism of the last upstream chained operator. If a parallelism different from the upstream parallelism is configured, the writer operator will use the configured value.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>sink.parallelism</h5></td>
+ <td style="word-wrap: break-word;">true</td>
Review comment:
```suggestion
<td style="word-wrap: break-word;">(none)</td>
```
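For context on the option the two docs hunks describe, a user would typically set it in the table DDL. A hypothetical sketch (table name and columns are invented; placing the option in `TBLPROPERTIES` follows the Hive-dialect convention for connector options):

```sql
-- Hypothetical example: configuring the Hive sink parallelism via a
-- table property. Names here are invented for illustration.
CREATE TABLE my_hive_table (
  id INT,
  name STRING
) TBLPROPERTIES (
  'sink.parallelism' = '4'
);
```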
##########
File path: docs/content/docs/connectors/table/hive/hive_read_write.md
##########
@@ -409,6 +409,28 @@ This configuration is set in the `TableConfig` and will affect all sinks of the
</tbody>
</table>
+### Sink Parallelism
+
+The parallelism of writing data into Hive can be configured by the corresponding table option. By default, the parallelism is the same as the parallelism of the last upstream chained operator. If a parallelism different from the upstream parallelism is configured, the writer operator will use the configured value.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>sink.parallelism</h5></td>
+ <td style="word-wrap: break-word;">true</td>
Review comment:
ditto.
##########
File path:
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -70,6 +78,56 @@ public static void closeCatalog() {
}
}
+    @Test
+    public void testHiveTableSinkWithParallelismInBatch() {
+        final TableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+        testHiveTableSinkWithParallelismBase(
+                tEnv, "/explain/testHiveTableSinkWithParallelismInBatch.out", null);
+    }
+
+    @Test
+    public void testHiveTableSinkWithParallelismInStreaming() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final TableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+        testHiveTableSinkWithParallelismBase(
+                tEnv, "/explain/testHiveTableSinkWithParallelismInStreaming.out", null);
+    }
+
+    private void testHiveTableSinkWithParallelismBase(
+            final TableEnvironment tEnv,
+            final String expectedResourceFileName,
+            @Nullable Integer parallelism) {
+        parallelism = Optional.ofNullable(parallelism).orElse(8);
Review comment:
It seems the `parallelism` is hardcoded. If so, we can hardcode it into the
DDL.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]