This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new dbc7ff712a1 [FLINK-27989][table-planner] Csv format supports reporting statistics dbc7ff712a1 is described below commit dbc7ff712a1694262629b9a349f7a1fa46240b6b Author: zhengyunhong.zyh <337361...@qq.com> AuthorDate: Mon Jun 13 18:14:31 2022 +0800 [FLINK-27989][table-planner] Csv format supports reporting statistics This closes #20007 --- .../flink/formats/csv/CsvFileFormatFactory.java | 70 +++++++++++- .../CsvFormatFilesystemStatisticsReportTest.java | 107 ++++++++++++++++++ .../formats/csv/CsvFormatStatisticsReportTest.java | 122 +++++++++++++++++++++ .../formats/testcsv/TestCsvFormatFactory.java | 42 ++++--- 4 files changed, 325 insertions(+), 16 deletions(-) diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java index 8ea2280eaf1..e59ec63794f 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.formats.csv; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.BulkWriter.Factory; import org.apache.flink.configuration.ConfigOption; @@ -29,16 +30,21 @@ import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory; import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory; import org.apache.flink.connector.file.table.format.BulkDecodingFormat; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import 
org.apache.flink.formats.common.Converter; import org.apache.flink.formats.csv.RowDataToCsvConverters.RowDataToCsvConverter; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat; import org.apache.flink.table.connector.format.ProjectableDecodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource.Context; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; @@ -49,7 +55,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.Csv import org.apache.commons.lang3.StringEscapeUtils; +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.Collections; +import java.util.List; import java.util.Set; import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS; @@ -93,9 +105,12 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter return new CsvBulkDecodingFormat(formatOptions); } - private static class CsvBulkDecodingFormat + /** CsvBulkDecodingFormat which implements {@link FileBasedStatisticsReportableInputFormat}. 
*/ + @VisibleForTesting + public static class CsvBulkDecodingFormat implements BulkDecodingFormat<RowData>, - ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>> { + ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>, + FileBasedStatisticsReportableInputFormat { private final ReadableConfig formatOptions; @@ -136,6 +151,57 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter public ChangelogMode getChangelogMode() { return ChangelogMode.insertOnly(); } + + @Override + public TableStats reportStatistics(List<Path> files, DataType producedDataType) { + // For Csv format, it's a heavy operation to obtain accurate statistics by scanning all + // files. So, we obtain the estimated statistics by sampling; the specific way is to + // sample the first 100 lines and calculate their row size, then compare row size with + // total file size to get the estimated row count. + final int totalSampleLineCnt = 100; + try { + long totalFileSize = 0; + int sampledRowCnt = 0; + long sampledRowSize = 0; + for (Path file : files) { + FileSystem fs = FileSystem.get(file.toUri()); + FileStatus status = fs.getFileStatus(file); + totalFileSize += status.getLen(); + + // sample the line size + if (sampledRowCnt < totalSampleLineCnt) { + try (InputStreamReader isr = + new InputStreamReader( + Files.newInputStream( + new File(file.toUri()).toPath())); + BufferedReader br = new BufferedReader(isr)) { + String line; + while (sampledRowCnt < totalSampleLineCnt + && (line = br.readLine()) != null) { + sampledRowCnt += 1; + sampledRowSize += + (line.getBytes(StandardCharsets.UTF_8).length + 1); + } + } + } + } + + // If line break is "\r\n", br.readLine() will ignore '\n' which makes sampledRowSize + // smaller than totalFileSize. This will influence the test result. 
+ if (sampledRowCnt < totalSampleLineCnt) { + sampledRowSize = totalFileSize; + } + if (sampledRowSize == 0) { + return TableStats.UNKNOWN; + } + + int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledRowCnt); + long estimatedRowCount = totalFileSize * realSampledLineCnt / sampledRowSize; + return new TableStats(estimatedRowCount); + } catch (Exception e) { + return TableStats.UNKNOWN; + } + } } @Override diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java new file mode 100644 index 00000000000..476319d6ce4 --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelVisitor; +import org.apache.calcite.rel.core.TableScan; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test for statistics functionality in {@link CsvFormatFactory} in the case of file system source. 
+ */ +public class CsvFormatFilesystemStatisticsReportTest extends TableTestBase { + private BatchTableTestUtil util; + private TableEnvironment tEnv; + @TempDir private static File path; + + @BeforeEach + public void setup() throws IOException { + util = batchTestUtil(TableConfig.getDefault()); + tEnv = util.getTableEnv(); + String pathName = writeData(path, Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world")); + + String ddl = + String.format( + "CREATE TABLE sourceTable (\n" + + " a bigint,\n" + + " b int,\n" + + " c varchar\n" + + ") with (\n" + + " 'connector' = 'filesystem'," + + " 'format' = 'csv'," + + " 'path' = '%s')", + pathName); + tEnv.executeSql(ddl); + } + + @Test + public void testCsvFileSystemStatisticsReport() { + FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from sourceTable"); + assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3)); + } + + private String writeData(File path, List<String> data) throws IOException { + String file = path.getAbsolutePath() + "/00-00.tmp"; + Files.write(new File(file).toPath(), String.join("\n", data).getBytes()); + return file; + } + + private FlinkStatistic getStatisticsFromOptimizedPlan(String sql) { + RelNode relNode = TableTestUtil.toRelNode(tEnv.sqlQuery(sql)); + RelNode optimized = util.getPlanner().optimize(relNode); + FlinkStatisticVisitor visitor = new FlinkStatisticVisitor(); + visitor.go(optimized); + return visitor.result; + } + + private static class FlinkStatisticVisitor extends RelVisitor { + private FlinkStatistic result = null; + + @Override + public void visit(RelNode node, int ordinal, RelNode parent) { + if (node instanceof TableScan) { + Preconditions.checkArgument(result == null); + TableSourceTable table = (TableSourceTable) node.getTable(); + result = table.getStatistic(); + } + super.visit(node, ordinal, parent); + } + } +} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java new file mode 100644 index 00000000000..6a166e2d0b2 --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.plan.stats.TableStats; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for statistics functionality in {@link CsvFormatFactory}. 
*/ +public class CsvFormatStatisticsReportTest { + + private static CsvFileFormatFactory.CsvBulkDecodingFormat csvBulkDecodingFormat; + + @BeforeEach + public void setup() { + Configuration configuration = new Configuration(); + csvBulkDecodingFormat = new CsvFileFormatFactory.CsvBulkDecodingFormat(configuration); + } + + @Test + public void testCsvFormatStatsReportWithSingleFile() throws IOException { + String fileContent = + "#description of the data\n" + + "header1|header2|header3|\n" + + "this is|1|2.0|\n" + + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + Path tempFile = createTempFile(fileContent); + + TableStats tableStats = + csvBulkDecodingFormat.reportStatistics(Collections.singletonList(tempFile), null); + assertThat(tableStats).isEqualTo(new TableStats(6)); + } + + @Test + public void testCsvFormatStatsReportWithMultiFile() throws IOException { + String fileContent1 = + "#description of the data\r\n" + + "header1|header2|header3|\r\n" + + "this is|1|2.0|\r\n" + + "//a comment\r\n" + + "a test|3|4.0|\r\n" + + "#next|5|6.0|\r\n"; + String fileContent2 = + "#description of the data\r\n" + + "header1|header2|header3|\r\n" + + "this is|1|2.0|\r\n" + + "//a comment\r\n" + + "a test|3|4.0|\r\n" + + "#next|5|6.0|\r\n"; + Path tempFile1 = createTempFile(fileContent1); + Path tempFile2 = createTempFile(fileContent2); + List<Path> files = new ArrayList<>(); + files.add(tempFile1); + files.add(tempFile2); + + TableStats tableStats = csvBulkDecodingFormat.reportStatistics(files, null); + assertThat(tableStats).isEqualTo(new TableStats(12)); + } + + @Test + public void testRowSizeBiggerThanTotalSampleLineCnt() throws IOException { + StringBuilder builder = new StringBuilder(); + int lineCnt = 1000; + for (int i = 0; i < lineCnt; i++) { + builder.append("header1|header2|header3|header4|header5").append("\n"); + } + Path tempFile = createTempFile(builder.toString()); + TableStats tableStats = + 
csvBulkDecodingFormat.reportStatistics(Collections.singletonList(tempFile), null); + assertThat(tableStats).isEqualTo(new TableStats(lineCnt)); + } + + @Test + public void testCsvFormatStatsReportWithEmptyFile() { + TableStats tableStats = csvBulkDecodingFormat.reportStatistics(null, null); + assertThat(tableStats).isEqualTo(TableStats.UNKNOWN); + } + + private static Path createTempFile(String content) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + OutputStreamWriter wrt = + new OutputStreamWriter( + Files.newOutputStream(tempFile.toPath()), StandardCharsets.UTF_8); + wrt.write(content); + wrt.close(); + return new Path(tempFile.toURI().toString()); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java index 98d59d76d5f..45c64b9fced 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java @@ -42,8 +42,10 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex import org.apache.flink.table.types.DataType; import java.io.BufferedReader; -import java.io.FileInputStream; +import java.io.File; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -127,36 +129,48 @@ public class TestCsvFormatFactory @Override public TableStats reportStatistics(List<Path> files, DataType producedDataType) { + // For Csv format, it's a heavy operation to obtain accurate statistics by scanning all + // files. 
So, we obtain the estimated statistics by sampling; the specific way is to + sample the first 100 lines and calculate their row size, then compare row size with + total file size to get the estimated row count. final int totalSampleLineCnt = 100; try { - long totalSize = 0; - int sampledLineCnt = 0; - long sampledTotalSize = 0; + long totalFileSize = 0; + int sampledRowCnt = 0; + long sampledRowSize = 0; for (Path file : files) { FileSystem fs = FileSystem.get(file.toUri()); FileStatus status = fs.getFileStatus(file); - totalSize += status.getLen(); + totalFileSize += status.getLen(); // sample the line size - if (sampledLineCnt < totalSampleLineCnt) { + if (sampledRowCnt < totalSampleLineCnt) { try (InputStreamReader isr = - new InputStreamReader(new FileInputStream(file.getPath()))) { - BufferedReader br = new BufferedReader(isr); + new InputStreamReader( + Files.newInputStream( + new File(file.toUri()).toPath())); + BufferedReader br = new BufferedReader(isr)) { String line; - while (sampledLineCnt < totalSampleLineCnt + while (sampledRowCnt < totalSampleLineCnt && (line = br.readLine()) != null) { - sampledLineCnt += 1; - sampledTotalSize += line.length(); + sampledRowCnt += 1; + sampledRowSize += line.getBytes(StandardCharsets.UTF_8).length; } } } } - if (sampledTotalSize == 0) { + + // If line break is "\r\n", br.readLine() will ignore '\n' which makes sampledRowSize + // smaller than totalFileSize. This will influence the test result. 
+ if (sampledRowCnt < totalSampleLineCnt) { + sampledRowSize = totalFileSize; + } + if (sampledRowSize == 0) { return TableStats.UNKNOWN; } - int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledLineCnt); - int estimatedRowCount = (int) (totalSize * realSampledLineCnt / sampledTotalSize); + int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledRowCnt); + long estimatedRowCount = totalFileSize * realSampledLineCnt / sampledRowSize; return new TableStats(estimatedRowCount); } catch (Exception e) { return TableStats.UNKNOWN;