This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 186724f  [FLINK-17286][table][json] Integrate json to file system connector
186724f is described below

commit 186724f1625f13482ef1e6362764459129ee84fe
Author: Leonard Xu <xbjt...@163.com>
AuthorDate: Mon May 11 15:35:31 2020 +0800

    [FLINK-17286][table][json] Integrate json to file system connector

    This closes #12010
---
 .../flink/table/catalog/hive/HiveCatalog.java      |   2 +-
 .../flink/api/common/io/DelimitedInputFormat.java  |   6 +-
 flink-formats/flink-json/pom.xml                   |  17 ++
 .../formats/json/JsonFileSystemFormatFactory.java  | 270 +++++++++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |   1 +
 .../formats/json/JsonBatchFileSystemITCase.java    |  62 +++++
 .../flink/formats/json/JsonFsStreamSinkITCase.java |  39 +++
 .../flink/orc/OrcFileSystemFormatFactory.java      |   6 +-
 .../parquet/ParquetFileSystemFormatFactory.java    |   6 +-
 .../table/factories/FileSystemFormatFactory.java   |  61 ++++-
 .../flink/table/utils}/PartitionPathUtils.java     |  83 ++++++-
 .../utils/TestCsvFileSystemFormatFactory.java      |   4 +-
 .../planner/utils/TestRowDataCsvInputFormat.java   |   2 +-
 .../planner/runtime/FileSystemITCaseBase.scala     |   7 +-
 .../table/filesystem/DynamicPartitionWriter.java   |   2 +-
 .../table/filesystem/FileSystemTableSink.java      |   1 +
 .../table/filesystem/FileSystemTableSource.java    |   1 +
 .../table/filesystem/GroupedPartitionWriter.java   |   2 +-
 .../flink/table/filesystem/PartitionLoader.java    |   4 +-
 .../table/filesystem/PartitionTempFileManager.java |   2 +-
 .../table/filesystem/SingleDirectoryWriter.java    |   2 +-
 .../table/filesystem/RowPartitionComputerTest.java |   2 +-
 22 files changed, 558 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 6d9dc57..128aa21 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -116,7 +116,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
 import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveIntStat;
 import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveLongStat;
-import static org.apache.flink.table.filesystem.PartitionPathUtils.unescapePathName;
+import static org.apache.flink.table.utils.PartitionPathUtils.unescapePathName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 977a02c..0c09a4c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -144,9 +144,9 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 
     private transient int limit;
 
-    private transient byte[] currBuffer;    // buffer in which current record byte sequence is found
-    private transient int currOffset;       // offset in above buffer
-    private transient int currLen;          // length of current byte sequence
+    protected transient byte[] currBuffer;  // buffer in which current record byte sequence is found
+    protected transient int currOffset;     // offset in above buffer
+    protected transient int currLen;        // length of current byte sequence
 
     private transient boolean overLimit;
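For illustration (not part of this commit): the visibility change above is what lets a format outside flink-core, like the JsonInputFormat added below, subclass DelimitedInputFormat and drive readLine() itself. A minimal sketch with a hypothetical subclass:

    import org.apache.flink.api.common.io.DelimitedInputFormat;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Hypothetical example, relying only on the now-protected currBuffer/currOffset/currLen:
    // a DelimitedInputFormat subclass that returns each delimited chunk as a UTF-8 string.
    public class LineInputFormat extends DelimitedInputFormat<String> {

        @Override
        public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
            return new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        }

        @Override
        public String nextRecord(String record) throws IOException {
            // readLine() refills the protected buffer fields with the next delimited chunk
            return readLine() ? readRecord(record, currBuffer, currOffset, currLen) : null;
        }
    }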
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index bff7fc1..19d5045 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -77,6 +77,23 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <!-- Json filesystem format factory ITCase test dependency -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <!-- test utils dependency -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- JSON RowData schema test dependency -->
         <dependency>
             <groupId>org.scala-lang</groupId>
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
new file mode 100644
index 0000000..9a2be31
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_IGNORE_PARSE_ERRORS;
+
+/**
+ * Factory to build readers and writers for json format files.
+ */
+public class JsonFileSystemFormatFactory implements FileSystemFormatFactory {
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> context = new HashMap<>();
+        context.put(FORMAT, "json");
+        return context;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        ArrayList<String> properties = new ArrayList<>();
+        properties.add(FORMAT_FAIL_ON_MISSING_FIELD);
+        properties.add(FORMAT_IGNORE_PARSE_ERRORS);
+        return properties;
+    }
+
+    @Override
+    public InputFormat<RowData, ?> createReader(ReaderContext context) {
+        DescriptorProperties properties = getValidatedProperties(context.getFormatProperties());
+        boolean failOnMissingField = properties.getOptionalBoolean(FORMAT_FAIL_ON_MISSING_FIELD).orElse(false);
+        boolean ignoreParseErrors = properties.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS).orElse(false);
+
+        RowType formatRowType = context.getFormatRowType();
+        JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+            formatRowType,
+            new GenericTypeInfo(GenericRowData.class),
+            failOnMissingField,
+            ignoreParseErrors);
+
+        String[] fieldNames = context.getSchema().getFieldNames();
+        List<String> projectFields = Arrays.stream(context.getProjectFields())
+            .mapToObj(idx -> fieldNames[idx])
+            .collect(Collectors.toList());
+        List<String> jsonFields = Arrays.stream(fieldNames)
+            .filter(field -> !context.getPartitionKeys().contains(field))
+            .collect(Collectors.toList());
+
+        int[] jsonSelectFieldToProjectFieldMapping = context.getFormatProjectFields().stream()
+            .mapToInt(projectFields::indexOf)
+            .toArray();
+        int[] jsonSelectFieldToJsonFieldMapping = context.getFormatProjectFields().stream()
+            .mapToInt(jsonFields::indexOf)
+            .toArray();
+
+        return new JsonInputFormat(
+            context.getPaths(),
+            context.getSchema().getFieldDataTypes(),
+            context.getSchema().getFieldNames(),
+            context.getProjectFields(),
+            context.getPartitionKeys(),
+            context.getDefaultPartName(),
+            context.getPushedDownLimit(),
+            jsonSelectFieldToProjectFieldMapping,
+            jsonSelectFieldToJsonFieldMapping,
+            deserializationSchema);
+    }
+
+    @Override
+    public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
+        return Optional.of(new JsonRowDataEncoder(new JsonRowDataSerializationSchema(context.getFormatRowType())));
+    }
+
+    @Override
+    public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean supportsSchemaDerivation() {
+        return true;
+    }
+
+    private static DescriptorProperties getValidatedProperties(Map<String, String> propertiesMap) {
+        final DescriptorProperties properties = new DescriptorProperties(true);
+        properties.putProperties(propertiesMap);
+        properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true);
+        properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true);
+        return properties;
+    }
+
+    /**
+     * A {@link JsonInputFormat} is responsible for reading {@link RowData} records
+     * from json format files.
+     */
+    public static class JsonInputFormat extends DelimitedInputFormat<RowData> {
+
+        /**
+         * Code of \r, used to remove \r from a line when the line ends with \r\n.
+         */
+        private static final byte CARRIAGE_RETURN = (byte) '\r';
+
+        /**
+         * Code of \n, used to identify if \n is used as the delimiter.
+         */
+        private static final byte NEW_LINE = (byte) '\n';
+
+        private final DataType[] fieldTypes;
+        private final String[] fieldNames;
+        private final int[] selectFields;
+        private final List<String> partitionKeys;
+        private final String defaultPartValue;
+        private final long limit;
+        private final int[] jsonSelectFieldToProjectFieldMapping;
+        private final int[] jsonSelectFieldToJsonFieldMapping;
+        private final JsonRowDataDeserializationSchema deserializationSchema;
+
+        private transient boolean end;
+        private transient long emitted;
+        // object reused for every record
+        private transient GenericRowData rowData;
+
+        public JsonInputFormat(
+                Path[] filePaths,
+                DataType[] fieldTypes,
+                String[] fieldNames,
+                int[] selectFields,
+                List<String> partitionKeys,
+                String defaultPartValue,
+                long limit,
+                int[] jsonSelectFieldToProjectFieldMapping,
+                int[] jsonSelectFieldToJsonFieldMapping,
+                JsonRowDataDeserializationSchema deserializationSchema) {
+            super.setFilePaths(filePaths);
+            this.fieldTypes = fieldTypes;
+            this.fieldNames = fieldNames;
+            this.selectFields = selectFields;
+            this.partitionKeys = partitionKeys;
+            this.defaultPartValue = defaultPartValue;
+            this.limit = limit;
+            this.jsonSelectFieldToProjectFieldMapping = jsonSelectFieldToProjectFieldMapping;
+            this.jsonSelectFieldToJsonFieldMapping = jsonSelectFieldToJsonFieldMapping;
+            this.deserializationSchema = deserializationSchema;
+        }
+
+        @Override
+        public boolean supportsMultiPaths() {
+            return true;
+        }
+
+        @Override
+        public void open(FileInputSplit split) throws IOException {
+            super.open(split);
+            this.end = false;
+            this.emitted = 0L;
+            this.rowData = PartitionPathUtils.fillPartitionValueForRecord(fieldNames, fieldTypes, selectFields,
+                partitionKeys, currentSplit.getPath(), defaultPartValue);
+        }
+
+        @Override
+        public boolean reachedEnd() {
+            return emitted >= limit || end;
+        }
+
+        @Override
+        public RowData readRecord(RowData reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+            // remove \r from a line when the line ends with \r\n
+            if (this.getDelimiter() != null && this.getDelimiter().length == 1
+                    && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
+                    && bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
+                numBytes -= 1;
+            }
+            byte[] trimBytes = Arrays.copyOfRange(bytes, offset, offset + numBytes);
+            GenericRowData jsonRow = (GenericRowData) deserializationSchema.deserialize(trimBytes);
+
+            if (jsonRow == null) {
+                return null;
+            }
+
+            GenericRowData returnRecord = rowData;
+            for (int i = 0; i < jsonSelectFieldToJsonFieldMapping.length; i++) {
+                returnRecord.setField(jsonSelectFieldToProjectFieldMapping[i],
+                    jsonRow.getField(jsonSelectFieldToJsonFieldMapping[i]));
+            }
+
+            emitted++;
+            return returnRecord;
+        }
+
+        @Override
+        public RowData nextRecord(RowData record) throws IOException {
+            while (true) {
+                if (readLine()) {
+                    RowData row = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
+                    if (row == null) {
+                        continue;
+                    } else {
+                        return row;
+                    }
+                } else {
+                    this.end = true;
+                    return null;
+                }
+            }
+        }
+    }
+
+    /**
+     * A {@link JsonRowDataEncoder} is responsible for encoding a {@link RowData} into a
+     * {@link java.io.OutputStream} in json format.
+     */
+    public static class JsonRowDataEncoder implements Encoder<RowData> {
+
+        private static final long serialVersionUID = 1L;
+        private static final String DEFAULT_LINE_DELIMITER = "\n";
+        private final JsonRowDataSerializationSchema serializationSchema;
+
+        public JsonRowDataEncoder(JsonRowDataSerializationSchema serializationSchema) {
+            this.serializationSchema = serializationSchema;
+        }
+
+        @Override
+        public void encode(RowData element, OutputStream stream) throws IOException {
+            stream.write(serializationSchema.serialize(element));
+            stream.write(DEFAULT_LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+}
diff --git a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index aec5846..bc622f2 100644
--- a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.formats.json.JsonRowFormatFactory
+org.apache.flink.formats.json.JsonFileSystemFormatFactory
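For a concrete view of the index juggling in createReader() above (illustrative values, not from the commit): take a table schema x, y, a, p with partition key p and a pushed-down projection [3, 0] (p, x). The sketch below recomputes the two mapping arrays on plain arrays:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class MappingExample {
        public static void main(String[] args) {
            String[] fieldNames = {"x", "y", "a", "p"};      // full table schema
            List<String> partitionKeys = Arrays.asList("p"); // "p" comes from the file path
            int[] projectFields = {3, 0};                    // pushed-down projection: p, x

            List<String> projectNames = Arrays.stream(projectFields)
                    .mapToObj(i -> fieldNames[i])
                    .collect(Collectors.toList());           // [p, x]
            List<String> jsonFields = Arrays.stream(fieldNames)
                    .filter(f -> !partitionKeys.contains(f))
                    .collect(Collectors.toList());           // [x, y, a]
            List<String> formatProjectFields = projectNames.stream()
                    .filter(f -> !partitionKeys.contains(f))
                    .collect(Collectors.toList());           // [x]

            // "x" is field 1 of the produced row and field 0 of the json record, so
            // readRecord() copies jsonRow[0] into returnRecord[1]; returnRecord[0] (the
            // partition key) is pre-filled from the file path in open().
            int[] toProject = formatProjectFields.stream().mapToInt(projectNames::indexOf).toArray(); // [1]
            int[] toJson = formatProjectFields.stream().mapToInt(jsonFields::indexOf).toArray();      // [0]
            System.out.println(Arrays.toString(toProject) + " " + Arrays.toString(toJson));
        }
    }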
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
new file mode 100644
index 0000000..2c01d29
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * ITCase to test json format for {@link JsonFileSystemFormatFactory}.
+ */
+public class JsonBatchFileSystemITCase extends BatchFileSystemITCaseBase {
+
+    @Override
+    public String[] formatProperties() {
+        List<String> ret = new ArrayList<>();
+        ret.add("'format'='json'");
+        ret.add("'format.ignore-parse-errors'='true'");
+        return ret.toArray(new String[0]);
+    }
+
+    @Test
+    public void testParseError() throws Exception {
+        String path = new URI(resultPath()).getPath();
+        new File(path).mkdirs();
+        File file = new File(path, "temp_file");
+        file.createNewFile();
+        FileUtils.writeFileUtf8(file,
+            "{\"x\":\"x5\",\"y\":5,\"a\":1,\"b\":1}\n" +
+            "{I am a wrong json.}\n" +
+            "{\"x\":\"x5\",\"y\":5,\"a\":1,\"b\":1}");
+
+        check("select * from nonPartitionedTable",
+            Arrays.asList(
+                Row.of("x5,5,1,1"),
+                Row.of("x5,5,1,1")));
+    }
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java
new file mode 100644
index 0000000..7690299
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test checkpointing for the file system table factory with json format.
+ */
+public class JsonFsStreamSinkITCase extends FsStreamingSinkITCaseBase {
+
+    @Override
+    public String[] additionalProperties() {
+        List<String> ret = new ArrayList<>();
+        ret.add("'format'='json'");
+        // for test purpose
+        ret.add("'sink.rolling-policy.file-size'='1'");
+        return ret.toArray(new String[0]);
+    }
+}
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
index d56d388..9ab0ad5 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
@@ -29,10 +29,10 @@ import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.filesystem.PartitionPathUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -111,12 +111,12 @@ public class OrcFileSystemFormatFactory implements FileSystemFormatFactory {
         DescriptorProperties properties = new DescriptorProperties();
         properties.putProperties(context.getFormatProperties());
 
-        LogicalType[] orcTypes = Arrays.stream(context.getFieldTypesWithoutPartKeys())
+        LogicalType[] orcTypes = Arrays.stream(context.getFormatFieldTypes())
             .map(DataType::getLogicalType)
             .toArray(LogicalType[]::new);
 
         TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(
-            RowType.of(orcTypes, context.getFieldNamesWithoutPartKeys()));
+            RowType.of(orcTypes, context.getFormatFieldNames()));
 
         OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(
             new RowDataVectorizer(typeDescription.toString(), orcTypes),
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
index c4754a2..95791bb 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
@@ -32,10 +32,10 @@ import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.filesystem.PartitionPathUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -135,10 +135,10 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory {
         properties.putProperties(context.getFormatProperties());
 
         return Optional.of(ParquetRowDataBuilder.createWriterFactory(
-            RowType.of(Arrays.stream(context.getFieldTypesWithoutPartKeys())
+            RowType.of(Arrays.stream(context.getFormatFieldTypes())
                 .map(DataType::getLogicalType)
                 .toArray(LogicalType[]::new),
-                context.getFieldNamesWithoutPartKeys()),
+                context.getFormatFieldNames()),
             getParquetConfiguration(properties),
             isUtcTimestamp(properties)
         ));
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java
index 938fc3d..1f660f8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java
@@ -27,11 +27,14 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * File system format factory for creating configured instances of reader and writer.
@@ -102,6 +105,48 @@ public interface FileSystemFormatFactory extends TableFormatFactory<RowData> {
          * The follow up operator will filter the records again.
          */
         List<Expression> getPushedDownFilters();
+
+        /**
+         * Get field names without partition keys.
+         */
+        default String[] getFormatFieldNames() {
+            return Arrays.stream(getSchema().getFieldNames())
+                .filter(name -> !getPartitionKeys().contains(name))
+                .toArray(String[]::new);
+        }
+
+        /**
+         * Get field types without partition keys.
+         */
+        default DataType[] getFormatFieldTypes() {
+            return Arrays.stream(getSchema().getFieldNames())
+                .filter(name -> !getPartitionKeys().contains(name))
+                .map(name -> getSchema().getFieldDataType(name).get())
+                .toArray(DataType[]::new);
+        }
+
+        /**
+         * RowType of the table, excluding partition key fields.
+         */
+        default RowType getFormatRowType() {
+            return RowType.of(
+                Arrays.stream(getFormatFieldTypes())
+                    .map(DataType::getLogicalType)
+                    .toArray(LogicalType[]::new),
+                getFormatFieldNames());
+        }
+
+        /**
+         * Get the projected field names, excluding partition keys.
+         */
+        default List<String> getFormatProjectFields() {
+            final List<String> selectFieldNames = Arrays.stream(getProjectFields())
+                .mapToObj(i -> getSchema().getFieldNames()[i])
+                .collect(Collectors.toList());
+            return selectFieldNames.stream()
+                .filter(name -> !getPartitionKeys().contains(name))
+                .collect(Collectors.toList());
+        }
     }
 
     /**
@@ -127,7 +172,7 @@ public interface FileSystemFormatFactory extends TableFormatFactory<RowData> {
         /**
          * Get field names without partition keys.
          */
-        default String[] getFieldNamesWithoutPartKeys() {
+        default String[] getFormatFieldNames() {
             return Arrays.stream(getSchema().getFieldNames())
                 .filter(name -> !getPartitionKeys().contains(name))
                 .toArray(String[]::new);
@@ -136,11 +181,23 @@ public interface FileSystemFormatFactory extends TableFormatFactory<RowData> {
         /**
          * Get field types without partition keys.
          */
-        default DataType[] getFieldTypesWithoutPartKeys() {
+        default DataType[] getFormatFieldTypes() {
             return Arrays.stream(getSchema().getFieldNames())
                 .filter(name -> !getPartitionKeys().contains(name))
                 .map(name -> getSchema().getFieldDataType(name).get())
                 .toArray(DataType[]::new);
         }
+
+        /**
+         * Get RowType of the table without partition keys.
+         */
+        default RowType getFormatRowType() {
+            return RowType.of(
+                Arrays.stream(getFormatFieldTypes())
+                    .map(DataType::getLogicalType)
+                    .toArray(LogicalType[]::new),
+                getFormatFieldNames());
+        }
     }
 }
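The new ReaderContext/WriterContext defaults are easiest to read against a concrete schema (illustrative, not from the commit): for a table x STRING, y INT, dt STRING with partition key dt, getFormatFieldNames() yields [x, y], getFormatFieldTypes() the matching types, and getFormatRowType() the row type ROW<x STRING, y INT>. The filtering is a plain stream over the schema:

    import java.util.Arrays;
    import java.util.List;

    public class FormatFieldsExample {
        public static void main(String[] args) {
            String[] fieldNames = {"x", "y", "dt"};
            List<String> partitionKeys = Arrays.asList("dt");
            // mirrors the default getFormatFieldNames() implementation above
            String[] formatFieldNames = Arrays.stream(fieldNames)
                    .filter(name -> !partitionKeys.contains(name))
                    .toArray(String[]::new);
            System.out.println(Arrays.toString(formatFieldNames)); // [x, y]
        }
    }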
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
similarity index 72%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathUtils.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
index 93583e6..2ee8d48 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.filesystem;
+package org.apache.flink.table.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -24,8 +24,15 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -201,6 +208,80 @@ public class PartitionPathUtils {
         return ret;
     }
 
+    /**
+     * Extract partition values from the path and fill them into the record.
+     *
+     * @param fieldNames record field names.
+     * @param fieldTypes record field types.
+     * @param selectFields the selected fields.
+     * @param partitionKeys the partition field names.
+     * @param path the file path where the partition is located.
+     * @param defaultPartValue the default value of a partition field.
+     * @return the filled record.
+     */
+    public static GenericRowData fillPartitionValueForRecord(
+            String[] fieldNames,
+            DataType[] fieldTypes,
+            int[] selectFields,
+            List<String> partitionKeys,
+            Path path,
+            String defaultPartValue) {
+        GenericRowData record = new GenericRowData(selectFields.length);
+        LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(path);
+        for (int i = 0; i < selectFields.length; i++) {
+            int selectField = selectFields[i];
+            String name = fieldNames[selectField];
+            if (partitionKeys.contains(name)) {
+                String value = partSpec.get(name);
+                value = defaultPartValue.equals(value) ? null : value;
+                record.setField(i, PartitionPathUtils.convertStringToInternalValue(value, fieldTypes[selectField]));
+            }
+        }
+        return record;
+    }
+
+    /**
+     * Restore a partition value from its string representation and type.
+     *
+     * @param valStr string partition value.
+     * @param type type of the partition field.
+     * @return partition value.
+     */
+ */ + public static Object convertStringToInternalValue(String valStr, DataType type) { + if (valStr == null) { + return null; + } + + LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot(); + switch (typeRoot) { + case CHAR: + case VARCHAR: + return StringData.fromString(valStr); + case BOOLEAN: + return Boolean.parseBoolean(valStr); + case TINYINT: + return Byte.parseByte(valStr); + case SMALLINT: + return Short.parseShort(valStr); + case INTEGER: + return Integer.parseInt(valStr); + case BIGINT: + return Long.parseLong(valStr); + case FLOAT: + return Float.parseFloat(valStr); + case DOUBLE: + return Double.parseDouble(valStr); + case DATE: + return (int) LocalDate.parse(valStr).toEpochDay(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return TimestampData.fromLocalDateTime(LocalDateTime.parse(valStr)); + default: + throw new RuntimeException(String.format( + "Can not convert %s to type %s for partition value", + valStr, + type)); + } + } + private static FileStatus[] getFileStatusRecurse(Path path, int expectLevel, FileSystem fs) { ArrayList<FileStatus> result = new ArrayList<>(); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java index 48045e2..64949e1 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java @@ -89,7 +89,7 @@ public class TestCsvFileSystemFormatFactory implements FileSystemFormatFactory { return Optional.empty(); } - DataType[] types = context.getFieldTypesWithoutPartKeys(); + DataType[] types = context.getFormatFieldTypes(); return Optional.of((rowData, stream) -> { writeCsvToStream(types, rowData, stream); }); @@ -127,7 +127,7 @@ public class TestCsvFileSystemFormatFactory implements FileSystemFormatFactory { return Optional.empty(); } - DataType[] types = context.getFieldTypesWithoutPartKeys(); + DataType[] types = context.getFormatFieldTypes(); return Optional.of(out -> new CsvBulkWriter(types, out)); } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestRowDataCsvInputFormat.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestRowDataCsvInputFormat.java index 43f2316..828aac2 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestRowDataCsvInputFormat.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestRowDataCsvInputFormat.java @@ -34,8 +34,8 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.util.DataFormatConverters; -import org.apache.flink.table.filesystem.PartitionPathUtils; import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.table.utils.PartitionPathUtils; import org.apache.flink.types.Row; import java.io.IOException; diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala index 
cb0f2f7..2c69b07 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala @@ -30,7 +30,7 @@ import org.apache.flink.types.Row import org.junit.rules.TemporaryFolder import org.junit.{Rule, Test} -import scala.collection.Seq +import scala.collection.{JavaConverters, Seq} /** * Test File system table factory. @@ -49,6 +49,11 @@ trait FileSystemITCaseBase { def check(sqlQuery: String, expectedResult: Seq[Row]): Unit + def check(sqlQuery: String, expectedResult: java.util.List[Row]): Unit = { + check(sqlQuery, + JavaConverters.asScalaIteratorConverter(expectedResult.iterator()).asScala.toSeq) + } + def open(): Unit = { resultPath = fileTmpFolder.newFolder().toURI.toString BatchTableEnvUtil.registerCollection( diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/DynamicPartitionWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/DynamicPartitionWriter.java index a3ddb8f..008ee19 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/DynamicPartitionWriter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/DynamicPartitionWriter.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat; import java.util.HashMap; import java.util.Map; -import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath; +import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath; /** * Dynamic partition writer to writing multiple partitions at the same time, it maybe consumes more memory. 
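The partition-value plumbing above can be exercised directly; the helper is part of this patch, while the values below are illustrative. A split under a path like .../dt=2020-05-11/y=9/part-0 yields the spec {dt=2020-05-11, y=9}, and each string is converted per the switch in convertStringToInternalValue:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.utils.PartitionPathUtils;

    public class PartitionValueExample {
        public static void main(String[] args) {
            // CHAR/VARCHAR -> StringData.fromString("x5")
            System.out.println(PartitionPathUtils.convertStringToInternalValue("x5", DataTypes.STRING()));
            // INTEGER -> 9
            System.out.println(PartitionPathUtils.convertStringToInternalValue("9", DataTypes.INT()));
            // DATE -> days since epoch as int (18393 for 2020-05-11)
            System.out.println(PartitionPathUtils.convertStringToInternalValue("2020-05-11", DataTypes.DATE()));
        }
    }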
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 34c5b7d..ea55158 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -44,6 +44,7 @@ import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index 249f1ee..7e52ced 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.PartitionableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
index c5fab7b..94dfe5e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.OutputFormat;
 
-import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
 
 /**
  * {@link PartitionWriter} for grouped dynamic partition inserting. It will create a new format
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
index 4e1ab04..9e0b98d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
@@ -31,8 +31,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
-import static org.apache.flink.table.filesystem.PartitionPathUtils.listStatusWithoutHidden;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.listStatusWithoutHidden;
 
 /**
  * Loader to temporary files to final output path and meta store. According to overwrite,
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
index 26e9c81..b00bed3 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
@@ -31,7 +31,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.table.filesystem.PartitionPathUtils.searchPartSpecAndPaths;
+import static org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SingleDirectoryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SingleDirectoryWriter.java
index af00a61..8927363 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SingleDirectoryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SingleDirectoryWriter.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import java.io.IOException;
 import java.util.LinkedHashMap;
 
-import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
 
 /**
  * {@link PartitionWriter} for single directory writer. It just use one format to write.
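All of these import moves point at the same helper, so for orientation, a sketch of what it produces (illustrative, assuming the Hive-style escaping PartitionPathUtils already implements):

    import java.util.LinkedHashMap;

    import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;

    public class PartitionPathExample {
        public static void main(String[] args) {
            LinkedHashMap<String, String> spec = new LinkedHashMap<>();
            spec.put("dt", "2020-05-11");
            spec.put("h", "9");
            // expected: dt=2020-05-11/h=9/ (keys and values escaped, trailing separator)
            System.out.println(generatePartitionPath(spec));
        }
    }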
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/RowPartitionComputerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/RowPartitionComputerTest.java
index 06e2553..d7dcc6e 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/RowPartitionComputerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/RowPartitionComputerTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.types.Row;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
 
 /**
  * Test for {@link RowPartitionComputer}.
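End-to-end, the feature this commit adds is declared with the property keys exercised by the ITCases above. A hedged usage sketch (the connector and path keys, and the executeSql/print calls, are assumptions based on the surrounding filesystem connector work, not on this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JsonFileSystemExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());

            // partitioned filesystem table in json format
            tEnv.executeSql(
                    "CREATE TABLE json_table (" +
                    "  x STRING," +
                    "  y INT," +
                    "  a INT" +
                    ") PARTITIONED BY (a) WITH (" +
                    "  'connector' = 'filesystem'," +
                    "  'path' = 'file:///tmp/json-table'," +
                    "  'format' = 'json'," +
                    "  'format.ignore-parse-errors' = 'true'" +
                    ")");

            tEnv.executeSql("INSERT INTO json_table VALUES ('x5', 5, 1)").print();
            tEnv.executeSql("SELECT * FROM json_table").print();
        }
    }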