This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ac957ae [FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable ac957ae is described below commit ac957ae07a49ab88ad46a14c6ea0d4891d5437d7 Author: Marios Trivyzas <mat...@gmail.com> AuthorDate: Tue Nov 30 11:59:04 2021 +0100 [FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable This closes #17962 Co-authored-by: guanghxu <xuguangheng1...@gmail.com> --- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 37 +----- .../plan/metadata/FlinkRelMdUniqueKeys.scala | 1 - .../metadata/FlinkRelMdColumnUniquenessTest.scala | 47 +++++++ .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 105 +++++++++++++++- .../plan/metadata/FlinkRelMdUniqueKeysTest.scala | 16 +++ .../planner/plan/metadata/MetadataTestUtil.scala | 135 +++++++++++++++++---- 6 files changed, 282 insertions(+), 59 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala index 6d7382c..241f529 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala @@ -23,16 +23,12 @@ import org.apache.flink.table.planner.JBoolean import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty import org.apache.flink.table.planner.plan.nodes.FlinkRelNode import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate} -import org.apache.flink.table.planner.plan.nodes.logical._ import org.apache.flink.table.planner.plan.nodes.physical.batch._ import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin import org.apache.flink.table.planner.plan.nodes.physical.stream._ -import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, RankUtil} import org.apache.flink.table.runtime.operators.rank.RankType -import org.apache.flink.table.sources.TableSource -import org.apache.calcite.plan.RelOptTable import org.apache.calcite.plan.volcano.RelSubset import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.convert.Converter @@ -61,42 +57,21 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata mq: RelMetadataQuery, columns: ImmutableBitSet, ignoreNulls: Boolean): JBoolean = { - areTableColumnsUnique(rel, null, rel.getTable, columns) - } - - def areColumnsUnique( - rel: FlinkLogicalLegacyTableSourceScan, - mq: RelMetadataQuery, - columns: ImmutableBitSet, - ignoreNulls: Boolean): JBoolean = { - areTableColumnsUnique(rel, rel.tableSource, rel.getTable, columns) + areTableColumnsUnique(rel, mq.getUniqueKeys(rel, ignoreNulls), columns) } private def areTableColumnsUnique( rel: TableScan, - tableSource: TableSource[_], - relOptTable: RelOptTable, + uniqueKeys: util.Set[ImmutableBitSet], columns: ImmutableBitSet): JBoolean = { if (columns.cardinality == 0) { return false } - // TODO get uniqueKeys from TableSchema of TableSource - - relOptTable match { - case table: FlinkPreparingTableBase => { - val ukOptional = table.uniqueKeysSet - if (ukOptional.isPresent) { - if (ukOptional.get().isEmpty) { - false - } else { - ukOptional.get().exists(columns.contains) - } - } else { - null - } - } - case _ => rel.getTable.isKey(columns) + if (uniqueKeys != null) { + uniqueKeys.exists(columns.contains) || rel.getTable.isKey(columns) + } else { + null } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala index 4a76839..63547eb 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala @@ -45,7 +45,6 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util} import java.util -import java.util.Set import scala.collection.JavaConversions._ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala index 31c8cf0..4e545d3 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala @@ -622,4 +622,51 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase { } } + @Test + def testAreColumnsUniqueOnTableSourceTable(): Unit = { + Array( + tableSourceTableLogicalScan, + tableSourceTableFlinkLogicalScan, + tableSourceTableBatchScan, + tableSourceTableStreamScan + ) + .foreach { scan => + assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3))) + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(1, 2))) + assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1))) + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0))) + } + } + + @Test + def testAreColumnsUniqueOnTablePartiallyProjectedKey(): Unit = { + Array( + tablePartiallyProjectedKeyLogicalScan, + tablePartiallyProjectedKeyFlinkLogicalScan, + tablePartiallyProjectedKeyBatchScan, + tablePartiallyProjectedKeyStreamScan + ) + .foreach { scan => + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0))) + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1))) + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2))) + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3))) + } + } + + @Test + def testAreColumnsUniqueOntableSourceTableNonKeyNonKey(): Unit = { + Array( + tableSourceTableNonKeyLogicalScan, + tableSourceTableNonKeyFlinkLogicalScan, + tableSourceTableNonKeyBatchScan, + tableSourceTableNonKeyStreamScan + ) + .foreach { scan => + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0))) + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1))) + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2))) + assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3))) + } + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 3a3acc8..f935697 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical._ import org.apache.flink.table.planner.plan.nodes.physical.batch._ import org.apache.flink.table.planner.plan.nodes.physical.stream._ -import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable} +import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable, TableSourceTable} import org.apache.flink.table.planner.plan.stream.sql.join.TestTemporalTable import org.apache.flink.table.planner.plan.utils._ import org.apache.flink.table.planner.utils.Top3 @@ -171,6 +171,37 @@ class FlinkRelMdHandlerTestBase { protected lazy val empStreamScan: StreamPhysicalDataStreamScan = createDataStreamScan(ImmutableList.of("emp"), streamPhysicalTraits) + protected lazy val tableSourceTableLogicalScan: LogicalTableScan = + createTableSourceTable(ImmutableList.of("TableSourceTable1"), logicalTraits) + protected lazy val tableSourceTableFlinkLogicalScan: FlinkLogicalDataStreamTableScan = + createTableSourceTable(ImmutableList.of("TableSourceTable1"), flinkLogicalTraits) + protected lazy val tableSourceTableBatchScan: BatchPhysicalBoundedStreamScan = + createTableSourceTable(ImmutableList.of("TableSourceTable1"), batchPhysicalTraits) + protected lazy val tableSourceTableStreamScan: StreamPhysicalDataStreamScan = + createTableSourceTable(ImmutableList.of("TableSourceTable1"), streamPhysicalTraits) + + protected lazy val tablePartiallyProjectedKeyLogicalScan: LogicalTableScan = + createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"), + logicalTraits) + protected lazy val tablePartiallyProjectedKeyFlinkLogicalScan: FlinkLogicalDataStreamTableScan = + createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"), + flinkLogicalTraits) + protected lazy val tablePartiallyProjectedKeyBatchScan: BatchPhysicalBoundedStreamScan = + createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"), + batchPhysicalTraits) + protected lazy val tablePartiallyProjectedKeyStreamScan: StreamPhysicalDataStreamScan = + createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"), + streamPhysicalTraits) + + protected lazy val tableSourceTableNonKeyLogicalScan: LogicalTableScan = + createTableSourceTable(ImmutableList.of("TableSourceTable3"), logicalTraits) + protected lazy val tableSourceTableNonKeyFlinkLogicalScan: FlinkLogicalDataStreamTableScan = + createTableSourceTable(ImmutableList.of("TableSourceTable3"), flinkLogicalTraits) + protected lazy val tableSourceTableNonKeyBatchScan: BatchPhysicalBoundedStreamScan = + createTableSourceTable(ImmutableList.of("TableSourceTable3"), batchPhysicalTraits) + protected lazy val tableSourceTableNonKeyStreamScan: StreamPhysicalDataStreamScan = + createTableSourceTable(ImmutableList.of("TableSourceTable3"), streamPhysicalTraits) + private lazy val valuesType = relBuilder.getTypeFactory .builder() .add("a", SqlTypeName.BIGINT) @@ -2727,6 +2758,51 @@ class FlinkRelMdHandlerTestBase { } } + // select * from TableSourceTable1 + // left join TableSourceTable2 on TableSourceTable1.b = TableSourceTable2.b + protected lazy val logicalLeftJoinOnContainedUniqueKeys: RelNode = relBuilder + .scan("TableSourceTable1") + .scan("TableSourceTable2") + .join( + JoinRelType.LEFT, + relBuilder.call( + EQUALS, + relBuilder.field(2, 0, 1), + relBuilder.field(2, 1, 1) + ) + ) + .build + + // select * from TableSourceTable1 + // left join TableSourceTable2 on TableSourceTable1.a = TableSourceTable2.a + protected lazy val logicalLeftJoinOnDisjointUniqueKeys: RelNode = relBuilder + .scan("TableSourceTable1") + .scan("TableSourceTable2") + .join( + JoinRelType.LEFT, + relBuilder.call( + EQUALS, + relBuilder.field(2, 0, 0), + relBuilder.field(2, 1, 0) + ) + ) + .build + + // select * from TableSourceTable1 + // left join TableSourceTable3 on TableSourceTable1.a = TableSourceTable3.a + protected lazy val logicalLeftJoinWithNoneKeyTableUniqueKeys: RelNode = relBuilder + .scan("TableSourceTable1") + .scan("TableSourceTable3") + .join( + JoinRelType.LEFT, + relBuilder.call( + EQUALS, + relBuilder.field(2, 0, 0), + relBuilder.field(2, 1, 0) + ) + ) + .build + protected def createDataStreamScan[T]( tableNames: util.List[String], traitSet: RelTraitSet): T = { val table = relBuilder @@ -2754,6 +2830,33 @@ class FlinkRelMdHandlerTestBase { scan.asInstanceOf[T] } + protected def createTableSourceTable[T]( + tableNames: util.List[String], traitSet: RelTraitSet): T = { + val table = relBuilder + .getRelOptSchema + .asInstanceOf[CalciteCatalogReader] + .getTable(tableNames) + .asInstanceOf[TableSourceTable] + val conventionTrait = traitSet.getTrait(ConventionTraitDef.INSTANCE) + val scan = conventionTrait match { + case Convention.NONE => + relBuilder.clear() + val scan = relBuilder.scan(tableNames).build() + scan.copy(traitSet, scan.getInputs) + case FlinkConventions.LOGICAL => + new FlinkLogicalDataStreamTableScan( + cluster, traitSet, Collections.emptyList[RelHint](), table) + case FlinkConventions.BATCH_PHYSICAL => + new BatchPhysicalBoundedStreamScan( + cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType) + case FlinkConventions.STREAM_PHYSICAL => + new StreamPhysicalDataStreamScan( + cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType) + case _ => throw new TableException(s"Unsupported convention trait: $conventionTrait") + } + scan.asInstanceOf[T] + } + protected def createLiteralList( rowType: RelDataType, literalValues: Seq[String]): util.List[RexLiteral] = { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala index 9ebee0d..ec748a4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala @@ -314,6 +314,22 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { assertNull(mq.getUniqueKeys(testRel)) } + @Test + def testGetUniqueKeysOnTableScanTable(): Unit = { + assertEquals( + uniqueKeys(Array(0, 1), Array(0, 1, 5)), + mq.getUniqueKeys(logicalLeftJoinOnContainedUniqueKeys).toSet + ) + assertEquals( + uniqueKeys(Array(0, 1, 5)), + mq.getUniqueKeys(logicalLeftJoinOnDisjointUniqueKeys).toSet + ) + assertEquals( + uniqueKeys(), + mq.getUniqueKeys(logicalLeftJoinWithNoneKeyTableUniqueKeys).toSet + ) + } + private def uniqueKeys(keys: Array[Int]*): Set[ImmutableBitSet] = { keys.map(k => ImmutableBitSet.of(k: _*)).toSet } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala index 72dab24..f12993f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala @@ -58,6 +58,9 @@ object MetadataTestUtil { rootSchema.add("TemporalTable1", createTemporalTable1()) rootSchema.add("TemporalTable2", createTemporalTable2()) rootSchema.add("TemporalTable3", createTemporalTable3()) + rootSchema.add("TableSourceTable1", createTableSourceTable1()) + rootSchema.add("TableSourceTable2", createTableSourceTable2()) + rootSchema.add("TableSourceTable3", createTableSourceTable3()) rootSchema.add("projected_table_source_table", createProjectedTableSourceTable()) rootSchema.add( "projected_table_source_table_with_partial_pk", @@ -258,31 +261,17 @@ object MetadataTestUtil { null) private def createProjectedTableSourceTable(): Table = { - val catalogTable = CatalogTable.fromProperties( - Map( - "connector" -> "values", - "bounded" -> "true", - "schema.0.name" -> "a", - "schema.0.data-type" -> "BIGINT NOT NULL", - "schema.1.name" -> "b", - "schema.1.data-type" -> "INT", - "schema.2.name" -> "c", - "schema.2.data-type" -> "VARCHAR(2147483647)", - "schema.3.name" -> "d", - "schema.3.data-type" -> "BIGINT NOT NULL", - "schema.primary-key.name" -> "PK_1", - "schema.primary-key.columns" -> "a,d") - ) - val resolvedSchema = new ResolvedSchema( util.Arrays.asList( Column.physical("a", DataTypes.BIGINT().notNull()), Column.physical("b", DataTypes.INT()), - Column.physical("c", DataTypes.STRING()), + Column.physical("c", DataTypes.VARCHAR(2147483647)), Column.physical("d", DataTypes.BIGINT().notNull())), Collections.emptyList(), UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "d"))) + val catalogTable = getCatalogTable(resolvedSchema) + val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem) val rowType = typeFactory.buildRelNodeRowType( Seq("a", "c", "d"), @@ -298,26 +287,108 @@ object MetadataTestUtil { ) } - private def createProjectedTableSourceTableWithPartialCompositePrimaryKey(): Table = { - val catalogTable = CatalogTable.fromProperties( + private def createTableSourceTable1(): Table = { + val catalogTable = CatalogTable.of( + org.apache.flink.table.api.Schema.newBuilder + .column("a", DataTypes.BIGINT.notNull) + .column("b", DataTypes.INT.notNull) + .column("c", DataTypes.VARCHAR(2147483647).notNull) + .column("d", DataTypes.BIGINT.notNull) + .primaryKeyNamed("PK_1", "a", "b") + .build, + null, + Collections.emptyList(), Map( "connector" -> "values", - "bounded" -> "true", - "schema.0.name" -> "a", - "schema.0.data-type" -> "BIGINT NOT NULL", - "schema.1.name" -> "b", - "schema.1.data-type" -> "BIGINT NOT NULL", - "schema.primary-key.name" -> "PK_1", - "schema.primary-key.columns" -> "a,b") + "bounded" -> "true" + ) ) val resolvedSchema = new ResolvedSchema( util.Arrays.asList( Column.physical("a", DataTypes.BIGINT().notNull()), + Column.physical("b", DataTypes.INT().notNull()), + Column.physical("c", DataTypes.STRING().notNull()), + Column.physical("d", DataTypes.BIGINT().notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b"))) + + val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem) + val rowType = typeFactory.buildRelNodeRowType( + Seq("a", "b", "c", "d"), + Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false))) + + new MockTableSourceTable( + ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable1"), + rowType, + new TestTableSource(), + true, + new ResolvedCatalogTable(catalogTable, resolvedSchema), + flinkContext) + } + + private def createTableSourceTable2(): Table = { + val resolvedSchema = new ResolvedSchema( + util.Arrays.asList( + Column.physical("a", DataTypes.BIGINT().notNull()), + Column.physical("b", DataTypes.INT().notNull()), + Column.physical("c", DataTypes.STRING().notNull()), + Column.physical("d", DataTypes.BIGINT().notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("b"))) + + val catalogTable = getCatalogTable(resolvedSchema) + + val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem) + val rowType = typeFactory.buildRelNodeRowType( + Seq("a", "b", "c", "d"), + Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false))) + + new MockTableSourceTable( + ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable2"), + rowType, + new TestTableSource(), + true, + new ResolvedCatalogTable(catalogTable, resolvedSchema), + flinkContext) + } + + private def createTableSourceTable3(): Table = { + val resolvedSchema = new ResolvedSchema( + util.Arrays.asList( + Column.physical("a", DataTypes.BIGINT().notNull()), + Column.physical("b", DataTypes.INT().notNull()), + Column.physical("c", DataTypes.STRING().notNull()), + Column.physical("d", DataTypes.BIGINT().notNull())), + Collections.emptyList(), + null) + + val catalogTable = getCatalogTable(resolvedSchema) + + val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem) + val rowType = typeFactory.buildRelNodeRowType( + Seq("a", "b", "c", "d"), + Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false))) + + new MockTableSourceTable( + ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable3"), + rowType, + new TestTableSource(), + true, + new ResolvedCatalogTable(catalogTable, resolvedSchema), + flinkContext) + } + + private def createProjectedTableSourceTableWithPartialCompositePrimaryKey(): Table = { + val resolvedSchema = new ResolvedSchema( + util.Arrays.asList( + Column.physical("a", DataTypes.BIGINT().notNull()), Column.physical("b", DataTypes.BIGINT().notNull())), Collections.emptyList(), UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b"))) + val catalogTable = getCatalogTable(resolvedSchema) + val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem) val rowType = typeFactory.buildRelNodeRowType( Seq("a"), @@ -335,6 +406,18 @@ object MetadataTestUtil { flinkContext) } + private def getCatalogTable(resolvedSchema: ResolvedSchema) = { + CatalogTable.of( + org.apache.flink.table.api.Schema.newBuilder.fromResolvedSchema(resolvedSchema).build, + null, + Collections.emptyList(), + Map( + "connector" -> "values", + "bounded" -> "true" + ) + ) + } + private def getMetadataTable( tableSchema: TableSchema, statistic: FlinkStatistic,