godfreyhe commented on a change in pull request #10119: [FLINK-14656] [table-planner-blink] blink planner should also fetch catalog statistics for permanent CatalogTableImpl URL: https://github.com/apache/flink/pull/10119#discussion_r344598477
########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java ########## @@ -59,32 +63,75 @@ ).build(); @Test - public void testGetStatsFromCatalog() throws Exception { + public void testGetStatsFromCatalogForConnectorCatalogTable() throws Exception { EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null); assertNotNull(catalog); catalog.createTable( - ObjectPath.fromString("default_database.T1"), - ConnectorCatalogTable.source(new TestTableSource(true, tableSchema), true), - false); + ObjectPath.fromString("default_database.T1"), + ConnectorCatalogTable.source(new TestTableSource(true, tableSchema), true), + false); catalog.createTable( - ObjectPath.fromString("default_database.T2"), - ConnectorCatalogTable.source(new TestTableSource(true, tableSchema), true), - false); + ObjectPath.fromString("default_database.T2"), + ConnectorCatalogTable.source(new TestTableSource(true, tableSchema), true), + false); + alterTableStatistics(catalog); + + Table table = tEnv.sqlQuery("select * from T1, T2 where T1.s3 = T2.s3"); + String result = tEnv.explain(table); + // T1 is broadcast side + String expected = TableTestUtil.readFromResource("/explain/testGetStatsFromCatalogForConnectorCatalogTable.out"); + assertEquals(expected, TableTestUtil.replaceStageId(result)); + } + + @Test + public void testGetStatsFromCatalogForCatalogTableImpl() throws Exception { + EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); + TableEnvironment tEnv = TableEnvironment.create(settings); + Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null); + assertNotNull(catalog); + + Map<String, String> properties = new HashMap<>(); + properties.put("connector.type", 
"filesystem"); + properties.put("connector.property-version", "1"); + properties.put("connector.path", "/path/to/csv"); + + properties.put("format.type", "csv"); + properties.put("format.property-version", "1"); + properties.put("format.field-delimiter", ";"); + + // schema + DescriptorProperties descriptorProperties = new DescriptorProperties(true); + descriptorProperties.putTableSchema("format.fields", tableSchema); + properties.putAll(descriptorProperties.asMap()); + + catalog.createTable( + ObjectPath.fromString("default_database.T1"), + new CatalogTableImpl(tableSchema, properties, ""), + false); + catalog.createTable( + ObjectPath.fromString("default_database.T2"), + new CatalogTableImpl(tableSchema, properties, ""), + false); + + alterTableStatistics(catalog); + + Table table = tEnv.sqlQuery("select * from T1, T2 where T1.s3 = T2.s3"); + String result = tEnv.explain(table); + // T1 is broadcast side Review comment: I think one option is to use metadata handlers to verify the table stats. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services