wuchong commented on code in PR #20549: URL: https://github.com/apache/flink/pull/20549#discussion_r948851556
########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java: ########## @@ -134,6 +134,17 @@ public class HiveOptions { public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME = FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME; + public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE = + key("table.exec.hive.sink.statistic-auto-gather.enable") + .booleanType() + .defaultValue(false) Review Comment: What is the default behavior in Hive and Spark? Should we enable this by default? Streaming mode can simply ignore this option. ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java: ########## @@ -134,6 +134,17 @@ public class HiveOptions { public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME = FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME; + public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE = + key("table.exec.hive.sink.statistic-auto-gather.enable") Review Comment: How about naming it `table.exec.hive.sink.statistic-auto-collect.enable` instead?
########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java: ########## @@ -140,12 +187,124 @@ private void alterPartition( partSD.setNumBuckets(sd.getNumBuckets()); partSD.setSortCols(sd.getSortCols()); partSD.setLocation(partitionPath.toString()); + if (autoGatherStatistic) { + currentPartition.getParameters().putAll(gatherStats(partitionPath, true)); + } client.alter_partition(database, tableName, currentPartition); } + private Map<String, String> gatherStats(Path path, boolean isForAlterPartition) + throws Exception { + Map<String, String> statistic = new HashMap<>(); + InputFormat<?, ?> inputFormat = + ReflectionUtil.newInstance(getInputFormatClz(sd.getInputFormat()), conf.conf()); + long numRows = 0; + long fileSize = 0; + int numFiles = 0; + long rawDataSize = 0; + List<FileStatus> fileStatuses = + listDataFileRecursively(fileSystemFactory.create(path.toUri()), path); + for (FileStatus file : fileStatuses) { + InputSplit dummySplit = + new FileSplit( + toHadoopPath(file.getPath()), + 0, + -1, + new String[] {sd.getLocation()}); + org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = + inputFormat.getRecordReader(dummySplit, conf.conf(), Reporter.NULL); + try { + if (recordReader instanceof StatsProvidingRecordReader) { + StatsProvidingRecordReader statsRR = + (StatsProvidingRecordReader) recordReader; + rawDataSize += statsRR.getStats().getRawDataSize(); + numRows += statsRR.getStats().getRowCount(); + fileSize += file.getLen(); + numFiles += 1; + } else { + // if the reader initialized according to input format class isn't instance Review Comment: Please move the logic in the `else` branch out of the for-each loop. It's very strange to iterate over `fileStatuses` again inside a loop that is already iterating over `fileStatuses`.
########## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java: ########## @@ -576,6 +581,147 @@ public void testWriteSuccessFile() throws Exception { assertThat(new File(changeFileNameTablePath, "dt=2022-08-15/_ZM")).exists(); } + @Test + public void testAutoGatherStatisticForBatchWriting() throws Exception { + TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE); + tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); + tEnv.useCatalog(hiveCatalog.getName()); + String wareHouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); + // test non-partition table + tEnv.executeSql("create table t1(x int)"); + tEnv.executeSql("create table t2(x int) stored as orc"); + tEnv.executeSql("create table t3(x int) stored as parquet"); + tEnv.executeSql("insert into t1 values (1)").await(); + tEnv.executeSql("insert into t2 values (1)").await(); + tEnv.executeSql("insert into t3 values (1)").await(); + // check the statistic for these table + // the statistics should be empty since the auto gather statistic is disabled + for (int i = 1; i <= 3; i++) { + CatalogTableStatistics statistics = + hiveCatalog.getTableStatistics(new ObjectPath("default", "t" + i)); + assertThat(statistics).isEqualTo(CatalogTableStatistics.UNKNOWN); + } + // now enable auto gather statistic + tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE, true); + tEnv.executeSql("insert into t1 values (1)").await(); + tEnv.executeSql("insert into t2 values (1)").await(); + tEnv.executeSql("insert into t3 values (1)").await(); + CatalogTableStatistics statistics = + hiveCatalog.getTableStatistics(new ObjectPath("default", "t1")); + // t1 is neither stored as orc nor parquet, so only fileCount and totalSize is + // calculated + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + -1, 2, getPathSize(Paths.get(wareHouse, "t1")), -1)); + statistics = 
hiveCatalog.getTableStatistics(new ObjectPath("default", "t2")); + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + 2, 2, getPathSize(Paths.get(wareHouse, "t2")), 8)); + statistics = hiveCatalog.getTableStatistics(new ObjectPath("default", "t3")); + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + 2, 2, getPathSize(Paths.get(wareHouse, "t3")), 66)); + + // test partition table + tEnv.executeSql("create table pt1(x int) partitioned by (y int)"); + tEnv.executeSql("create table pt2(x int) partitioned by (y int) stored as orc"); + tEnv.executeSql("create table pt3(x int) partitioned by (y int) stored as parquet"); + tEnv.executeSql("insert into pt1 partition(y=1) values (1)").await(); + tEnv.executeSql("insert into pt2 partition(y=2) values (2)").await(); + tEnv.executeSql("insert into pt3 partition(y=3) values (3)").await(); + + // verify statistic + statistics = + hiveCatalog.getPartitionStatistics( + new ObjectPath("default", "pt1"), + new CatalogPartitionSpec(Collections.singletonMap("y", "1"))); + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + -1, 1, getPathSize(Paths.get(wareHouse, "pt1", "y=1")), -1)); + statistics = + hiveCatalog.getPartitionStatistics( + new ObjectPath("default", "pt2"), + new CatalogPartitionSpec(Collections.singletonMap("y", "2"))); + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + 1, 1, getPathSize(Paths.get(wareHouse, "pt2", "y=2")), 4)); + statistics = + hiveCatalog.getPartitionStatistics( + new ObjectPath("default", "pt3"), + new CatalogPartitionSpec(Collections.singletonMap("y", "3"))); + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + 1, 1, getPathSize(Paths.get(wareHouse, "pt3", "y=3")), 33)); + + // insert data into partition again + tEnv.executeSql("insert into pt1 partition(y=1) values (1)").await(); + tEnv.executeSql("insert into pt2 partition(y=2) values (2)").await(); + tEnv.executeSql("insert into pt3 partition(y=3) values 
(3)").await(); + + // verify statistic + statistics = + hiveCatalog.getPartitionStatistics( + new ObjectPath("default", "pt1"), + new CatalogPartitionSpec(Collections.singletonMap("y", "1"))); + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + -1, 2, getPathSize(Paths.get(wareHouse, "pt1", "y=1")), -1)); + + statistics = + hiveCatalog.getPartitionStatistics( + new ObjectPath("default", "pt2"), + new CatalogPartitionSpec(Collections.singletonMap("y", "2"))); + + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + 2, 2, getPathSize(Paths.get(wareHouse, "pt2", "y=2")), 8)); + + statistics = + hiveCatalog.getPartitionStatistics( + new ObjectPath("default", "pt3"), + new CatalogPartitionSpec(Collections.singletonMap("y", "3"))); + assertThat(statistics) + .isEqualTo( + new CatalogTableStatistics( + 2, 2, getPathSize(Paths.get(wareHouse, "pt3", "y=3")), 66)); + + // test overwrite table/partition + tEnv.executeSql("create table src(x int)"); + tEnv.executeSql("insert overwrite table pt1 partition(y=1) select * from src").await(); + tEnv.executeSql("insert overwrite table pt2 partition(y=2) select * from src").await(); + tEnv.executeSql("insert overwrite table pt3 partition(y=3) select * from src").await(); + + for (int i = 1; i <= 3; i++) { + statistics = + hiveCatalog.getPartitionStatistics( + new ObjectPath("default", "pt" + i), + new CatalogPartitionSpec( + Collections.singletonMap("y", String.valueOf(i)))); + assertThat(statistics).isEqualTo(CatalogTableStatistics.UNKNOWN); Review Comment: Why not report zero statistics instead of `UNKNOWN` when a partition is overwritten with no data (i.e., it becomes an empty partition)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org