This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d12fd3d729f772d5c545d58987049bb9ce9f1da8 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Mon Nov 8 10:46:10 2021 +0100 [FLINK-24687][table-planner] Remove planner dependency on FileSystemConnectorOptions Signed-off-by: slinkydeveloper <francescogu...@gmail.com> --- .../rules/physical/batch/BatchPhysicalLegacySinkRule.scala | 11 ++++++----- .../plan/rules/physical/batch/BatchPhysicalSinkRule.scala | 11 ++++++----- .../rules/physical/stream/StreamPhysicalLegacySinkRule.scala | 11 ++++++----- .../plan/rules/physical/stream/StreamPhysicalSinkRule.scala | 7 ++++--- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala index edebe6c..72d8afd 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala @@ -28,7 +28,6 @@ import org.apache.flink.table.sinks.PartitionableTableSink import org.apache.calcite.plan.RelOptRule import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.{RelCollations, RelNode} -import org.apache.flink.table.filesystem.FileSystemConnectorOptions import scala.collection.JavaConversions._ @@ -53,12 +52,14 @@ class BatchPhysicalLegacySinkRule extends ConverterRule( val dynamicPartIndices = dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_)) + // TODO This option is hardcoded to remove the dependency of planner from + // flink-connector-files. We should move this option out of FileSystemConnectorOptions val shuffleEnable = sink - .catalogTable - .getOptions - .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key()) + .catalogTable + .getOptions + .getOrDefault("sink.shuffle-by-partition.enable", "false") - if (shuffleEnable != null && shuffleEnable.toBoolean) { + if (shuffleEnable.toBoolean) { requiredTraitSet = requiredTraitSet.plus( FlinkRelDistribution.hash(dynamicPartIndices .map(Integer.valueOf), requireStrict = false)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala index 5a00b51..b9c9f8f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala @@ -30,7 +30,6 @@ import org.apache.flink.table.types.logical.RowType import org.apache.calcite.plan.RelOptRule import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.{RelCollationTraitDef, RelCollations, RelNode} -import org.apache.flink.table.filesystem.FileSystemConnectorOptions import scala.collection.JavaConversions._ import scala.collection.mutable @@ -68,12 +67,14 @@ class BatchPhysicalSinkRule extends ConverterRule( val dynamicPartIndices = dynamicPartFields.map(fieldNames.indexOf(_)) + // TODO This option is hardcoded to remove the dependency of planner from + // flink-connector-files. We should move this option out of FileSystemConnectorOptions val shuffleEnable = sink - .catalogTable - .getOptions - .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key()) + .catalogTable + .getOptions + .getOrDefault("sink.shuffle-by-partition.enable", "false") - if (shuffleEnable != null && shuffleEnable.toBoolean) { + if (shuffleEnable.toBoolean) { requiredTraitSet = requiredTraitSet.plus( FlinkRelDistribution.hash(dynamicPartIndices .map(Integer.valueOf), requireStrict = false)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala index 82af4cd..5286b81 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala @@ -27,7 +27,6 @@ import org.apache.flink.table.sinks.PartitionableTableSink import org.apache.calcite.plan.RelOptRule import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule -import org.apache.flink.table.filesystem.FileSystemConnectorOptions import scala.collection.JavaConversions._ @@ -52,12 +51,14 @@ class StreamPhysicalLegacySinkRule extends ConverterRule( val dynamicPartIndices = dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_)) + // TODO This option is hardcoded to remove the dependency of planner from + // flink-connector-files. We should move this option out of FileSystemConnectorOptions val shuffleEnable = sink - .catalogTable - .getOptions - .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key()) + .catalogTable + .getOptions + .getOrDefault("sink.shuffle-by-partition.enable", "false") - if (shuffleEnable != null && shuffleEnable.toBoolean) { + if (shuffleEnable.toBoolean) { requiredTraitSet = requiredTraitSet.plus( FlinkRelDistribution.hash(dynamicPartIndices .map(Integer.valueOf), requireStrict = false)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala index 13645e5..b5867e4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala @@ -29,7 +29,6 @@ import org.apache.flink.table.types.logical.RowType import org.apache.calcite.plan.RelOptRule import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule -import org.apache.flink.table.filesystem.FileSystemConnectorOptions import scala.collection.JavaConversions._ import scala.collection.mutable @@ -67,12 +66,14 @@ class StreamPhysicalSinkRule extends ConverterRule( val dynamicPartIndices = dynamicPartFields.map(fieldNames.indexOf(_)) + // TODO This option is hardcoded to remove the dependency of planner from + // flink-connector-files. We should move this option out of FileSystemConnectorOptions val shuffleEnable = sink .catalogTable .getOptions - .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key()) + .getOrDefault("sink.shuffle-by-partition.enable", "false") - if (shuffleEnable != null && shuffleEnable.toBoolean) { + if (shuffleEnable.toBoolean) { requiredTraitSet = requiredTraitSet.plus( FlinkRelDistribution.hash(dynamicPartIndices .map(Integer.valueOf), requireStrict = false))