wuchong commented on code in PR #20549:
URL: https://github.com/apache/flink/pull/20549#discussion_r948851556


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -134,6 +134,17 @@ public class HiveOptions {
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
 
+    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE =
+            key("table.exec.hive.sink.statistic-auto-gather.enable")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
   What's the default behavior in Hive/Spark? Should we enable it by default? Streaming mode can ignore this configuration by default.
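   For reference, Hive's own `hive.stats.autogather` defaults to `true`, so enabling it by default would match Hive's behavior. A minimal sketch, assuming we keep the current key and follow that default:
   ```java
   public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE =
           key("table.exec.hive.sink.statistic-auto-gather.enable")
                   .booleanType()
                   .defaultValue(true);
   ```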



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -134,6 +134,17 @@ public class HiveOptions {
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
 
+    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE =
+            key("table.exec.hive.sink.statistic-auto-gather.enable")

Review Comment:
   `table.exec.hive.sink.statistic-auto-collect.enable`?
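   A sketch with the suggested key, everything else unchanged:
   ```java
   public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_COLLECT_ENABLE =
           key("table.exec.hive.sink.statistic-auto-collect.enable")
                   .booleanType()
                   .defaultValue(false);
   ```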



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java:
##########
@@ -140,12 +187,124 @@ private void alterPartition(
             partSD.setNumBuckets(sd.getNumBuckets());
             partSD.setSortCols(sd.getSortCols());
             partSD.setLocation(partitionPath.toString());
+            if (autoGatherStatistic) {
+                currentPartition.getParameters().putAll(gatherStats(partitionPath, true));
+            }
             client.alter_partition(database, tableName, currentPartition);
         }
 
+        private Map<String, String> gatherStats(Path path, boolean isForAlterPartition)
+                throws Exception {
+            Map<String, String> statistic = new HashMap<>();
+            InputFormat<?, ?> inputFormat =
+                    ReflectionUtil.newInstance(getInputFormatClz(sd.getInputFormat()), conf.conf());
+            long numRows = 0;
+            long fileSize = 0;
+            int numFiles = 0;
+            long rawDataSize = 0;
+            List<FileStatus> fileStatuses =
+                    listDataFileRecursively(fileSystemFactory.create(path.toUri()), path);
+            for (FileStatus file : fileStatuses) {
+                InputSplit dummySplit =
+                        new FileSplit(
+                                toHadoopPath(file.getPath()),
+                                0,
+                                -1,
+                                new String[] {sd.getLocation()});
+                org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
+                        inputFormat.getRecordReader(dummySplit, conf.conf(), Reporter.NULL);
+                try {
+                    if (recordReader instanceof StatsProvidingRecordReader) {
+                        StatsProvidingRecordReader statsRR =
+                                (StatsProvidingRecordReader) recordReader;
+                        rawDataSize += statsRR.getStats().getRawDataSize();
+                        numRows += statsRR.getStats().getRowCount();
+                        fileSize += file.getLen();
+                        numFiles += 1;
+                    } else {
+                        // if the reader initialized according to input format class isn't instance
Review Comment:
   Move the `else` logic out of the foreach. It's very strange to iterate over `fileStatuses` inside a foreach loop that is already iterating `fileStatuses`.
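   A sketch of the restructuring (assuming stats support depends only on the input format, so probing one file is enough; `dummySplitFor` is a hypothetical helper wrapping the `FileSplit` construction above):
   ```java
   boolean statsProviding = false;
   if (!fileStatuses.isEmpty()) {
       // Probe the input format once: whether the reader provides stats depends
       // on the format (e.g. ORC/Parquet), not on the individual file.
       org.apache.hadoop.mapred.RecordReader<?, ?> probe =
               inputFormat.getRecordReader(dummySplitFor(fileStatuses.get(0)), conf.conf(), Reporter.NULL);
       statsProviding = probe instanceof StatsProvidingRecordReader;
       probe.close();
   }
   if (statsProviding) {
       for (FileStatus file : fileStatuses) {
           org.apache.hadoop.mapred.RecordReader<?, ?> reader =
                   inputFormat.getRecordReader(dummySplitFor(file), conf.conf(), Reporter.NULL);
           SerDeStats stats = ((StatsProvidingRecordReader) reader).getStats();
           rawDataSize += stats.getRawDataSize();
           numRows += stats.getRowCount();
           fileSize += file.getLen();
           numFiles += 1;
           reader.close();
       }
   } else {
       // Formats without self-describing stats: only file count and total size,
       // computed in one pass here instead of inside the branching loop.
       for (FileStatus file : fileStatuses) {
           fileSize += file.getLen();
           numFiles += 1;
       }
   }
   ```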



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java:
##########
@@ -576,6 +581,147 @@ public void testWriteSuccessFile() throws Exception {
         assertThat(new File(changeFileNameTablePath, "dt=2022-08-15/_ZM")).exists();
     }
 
+    @Test
+    public void testAutoGatherStatisticForBatchWriting() throws Exception {
+        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        String wareHouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+        // test non-partition table
+        tEnv.executeSql("create table t1(x int)");
+        tEnv.executeSql("create table t2(x int) stored as orc");
+        tEnv.executeSql("create table t3(x int) stored as parquet");
+        tEnv.executeSql("insert into t1 values (1)").await();
+        tEnv.executeSql("insert into t2 values (1)").await();
+        tEnv.executeSql("insert into t3 values (1)").await();
+        // check the statistics for these tables:
+        // they should be empty since auto statistics gathering is disabled
+        for (int i = 1; i <= 3; i++) {
+            CatalogTableStatistics statistics =
+                    hiveCatalog.getTableStatistics(new ObjectPath("default", "t" + i));
+            assertThat(statistics).isEqualTo(CatalogTableStatistics.UNKNOWN);
+        }
+        // now enable auto statistics gathering
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE, true);
+        tEnv.executeSql("insert into t1 values (1)").await();
+        tEnv.executeSql("insert into t2 values (1)").await();
+        tEnv.executeSql("insert into t3 values (1)").await();
+        CatalogTableStatistics statistics =
+                hiveCatalog.getTableStatistics(new ObjectPath("default", "t1"));
+        // t1 is stored as neither orc nor parquet, so only fileCount and totalSize are
+        // calculated
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                -1, 2, getPathSize(Paths.get(wareHouse, "t1")), -1));
+        statistics = hiveCatalog.getTableStatistics(new ObjectPath("default", "t2"));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                2, 2, getPathSize(Paths.get(wareHouse, "t2")), 8));
+        statistics = hiveCatalog.getTableStatistics(new ObjectPath("default", "t3"));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                2, 2, getPathSize(Paths.get(wareHouse, "t3")), 66));
+
+        // test partition table
+        tEnv.executeSql("create table pt1(x int) partitioned by (y int)");
+        tEnv.executeSql("create table pt2(x int) partitioned by (y int) stored as orc");
+        tEnv.executeSql("create table pt3(x int) partitioned by (y int) stored as parquet");
+        tEnv.executeSql("insert into pt1 partition(y=1) values (1)").await();
+        tEnv.executeSql("insert into pt2 partition(y=2) values (2)").await();
+        tEnv.executeSql("insert into pt3 partition(y=3) values (3)").await();
+
+        // verify statistics
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt1"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", "1")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                -1, 1, getPathSize(Paths.get(wareHouse, "pt1", "y=1")), -1));
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt2"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", "2")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                1, 1, getPathSize(Paths.get(wareHouse, "pt2", "y=2")), 4));
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt3"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", "3")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                1, 1, getPathSize(Paths.get(wareHouse, "pt3", "y=3")), 33));
+
+        // insert data into the partitions again
+        tEnv.executeSql("insert into pt1 partition(y=1) values (1)").await();
+        tEnv.executeSql("insert into pt2 partition(y=2) values (2)").await();
+        tEnv.executeSql("insert into pt3 partition(y=3) values (3)").await();
+
+        // verify statistics
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt1"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", "1")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                -1, 2, getPathSize(Paths.get(wareHouse, "pt1", "y=1")), -1));
+
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt2"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", "2")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                2, 2, getPathSize(Paths.get(wareHouse, "pt2", "y=2")), 8));
+
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt3"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", "3")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                2, 2, getPathSize(Paths.get(wareHouse, "pt3", "y=3")), 66));
+
+        // test overwriting table/partition
+        tEnv.executeSql("create table src(x int)");
+        tEnv.executeSql("insert overwrite table pt1 partition(y=1) select * from src").await();
+        tEnv.executeSql("insert overwrite table pt2 partition(y=2) select * from src").await();
+        tEnv.executeSql("insert overwrite table pt3 partition(y=3) select * from src").await();
+
+        for (int i = 1; i <= 3; i++) {
+            statistics =
+                    hiveCatalog.getPartitionStatistics(
+                            new ObjectPath("default", "pt" + i),
+                            new CatalogPartitionSpec(
+                                    Collections.singletonMap("y", String.valueOf(i))));
+            assertThat(statistics).isEqualTo(CatalogTableStatistics.UNKNOWN);

Review Comment:
   Why not report zero stats instead of UNKNOWN when overwriting a partition with an empty result?
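   In that case the test could assert explicit zeros, e.g.:
   ```java
   // Hypothetical expectation: an overwritten, now-empty partition reports zeros,
   // so the planner can tell "known to be empty" apart from "no statistics".
   assertThat(statistics).isEqualTo(new CatalogTableStatistics(0, 0, 0, 0));
   ```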


