This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 186724f  [FLINK-17286][table][json] Integrate json to file system 
connector
186724f is described below

commit 186724f1625f13482ef1e6362764459129ee84fe
Author: Leonard Xu <xbjt...@163.com>
AuthorDate: Mon May 11 15:35:31 2020 +0800

    [FLINK-17286][table][json] Integrate json to file system connector
    
    
    This closes #12010
---
 .../flink/table/catalog/hive/HiveCatalog.java      |   2 +-
 .../flink/api/common/io/DelimitedInputFormat.java  |   6 +-
 flink-formats/flink-json/pom.xml                   |  17 ++
 .../formats/json/JsonFileSystemFormatFactory.java  | 270 +++++++++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |   1 +
 .../formats/json/JsonBatchFileSystemITCase.java    |  62 +++++
 .../flink/formats/json/JsonFsStreamSinkITCase.java |  39 +++
 .../flink/orc/OrcFileSystemFormatFactory.java      |   6 +-
 .../parquet/ParquetFileSystemFormatFactory.java    |   6 +-
 .../table/factories/FileSystemFormatFactory.java   |  61 ++++-
 .../flink/table/utils}/PartitionPathUtils.java     |  83 ++++++-
 .../utils/TestCsvFileSystemFormatFactory.java      |   4 +-
 .../planner/utils/TestRowDataCsvInputFormat.java   |   2 +-
 .../planner/runtime/FileSystemITCaseBase.scala     |   7 +-
 .../table/filesystem/DynamicPartitionWriter.java   |   2 +-
 .../table/filesystem/FileSystemTableSink.java      |   1 +
 .../table/filesystem/FileSystemTableSource.java    |   1 +
 .../table/filesystem/GroupedPartitionWriter.java   |   2 +-
 .../flink/table/filesystem/PartitionLoader.java    |   4 +-
 .../table/filesystem/PartitionTempFileManager.java |   2 +-
 .../table/filesystem/SingleDirectoryWriter.java    |   2 +-
 .../table/filesystem/RowPartitionComputerTest.java |   2 +-
 22 files changed, 558 insertions(+), 24 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 6d9dc57..128aa21 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -116,7 +116,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
 import static 
org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveIntStat;
 import static 
org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveLongStat;
-import static 
org.apache.flink.table.filesystem.PartitionPathUtils.unescapePathName;
+import static org.apache.flink.table.utils.PartitionPathUtils.unescapePathName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 977a02c..0c09a4c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -144,9 +144,9 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> imple
 
        private transient int limit;
 
-       private transient byte[] currBuffer;            // buffer in which 
current record byte sequence is found
-       private transient int currOffset;                       // offset in 
above buffer
-       private transient int currLen;                          // length of 
current byte sequence
+       protected transient byte[] currBuffer;          // buffer in which 
current record byte sequence is found
+       protected transient int currOffset;                     // offset in 
above buffer
+       protected transient int currLen;                                // 
length of current byte sequence
 
        private transient boolean overLimit;
 
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index bff7fc1..19d5045 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -77,6 +77,23 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- Json filesystem format factory ITCase test dependency -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <!-- test utils dependency -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- JSON RowData schema test dependency -->
                <dependency>
                        <groupId>org.scala-lang</groupId>
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
new file mode 100644
index 0000000..9a2be31
--- /dev/null
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static 
org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
+import static 
org.apache.flink.table.descriptors.JsonValidator.FORMAT_IGNORE_PARSE_ERRORS;
+
+/**
+ * Factory to build reader/writer to read/write json format file.
+ */
+public class JsonFileSystemFormatFactory implements FileSystemFormatFactory {
+
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> context = new HashMap<>();
+               context.put(FORMAT, "json");
+               return context;
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               ArrayList<String> properties = new ArrayList<>();
+               properties.add(FORMAT_FAIL_ON_MISSING_FIELD);
+               properties.add(FORMAT_IGNORE_PARSE_ERRORS);
+               return properties;
+       }
+
+       @Override
+       public InputFormat<RowData, ?> createReader(ReaderContext context) {
+               DescriptorProperties properties = 
getValidatedProperties(context.getFormatProperties());
+               boolean failOnMissingField = 
properties.getOptionalBoolean(FORMAT_FAIL_ON_MISSING_FIELD).orElse(false);
+               boolean ignoreParseErrors = 
properties.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS).orElse(false);
+
+               RowType formatRowType = context.getFormatRowType();
+               JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
+                       formatRowType,
+                       new GenericTypeInfo(GenericRowData.class),
+                       failOnMissingField,
+                       ignoreParseErrors);
+
+               String[] fieldNames = context.getSchema().getFieldNames();
+               List<String> projectFields = 
Arrays.stream(context.getProjectFields())
+                       .mapToObj(idx -> fieldNames[idx])
+                       .collect(Collectors.toList());
+               List<String> jsonFields = Arrays.stream(fieldNames)
+                       .filter(field -> 
!context.getPartitionKeys().contains(field))
+                       .collect(Collectors.toList());
+
+               int[] jsonSelectFieldToProjectFieldMapping = 
context.getFormatProjectFields().stream()
+                       .mapToInt(projectFields::indexOf)
+                       .toArray();
+               int[] jsonSelectFieldToJsonFieldMapping = 
context.getFormatProjectFields().stream()
+                       .mapToInt(jsonFields::indexOf)
+                       .toArray();
+
+               return new JsonInputFormat(
+                       context.getPaths(),
+                       context.getSchema().getFieldDataTypes(),
+                       context.getSchema().getFieldNames(),
+                       context.getProjectFields(),
+                       context.getPartitionKeys(),
+                       context.getDefaultPartName(),
+                       context.getPushedDownLimit(),
+                       jsonSelectFieldToProjectFieldMapping,
+                       jsonSelectFieldToJsonFieldMapping,
+                       deserializationSchema);
+       }
+
+       @Override
+       public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
+               return Optional.of(new JsonRowDataEncoder(new 
JsonRowDataSerializationSchema(context.getFormatRowType())));
+       }
+
+       @Override
+       public Optional<BulkWriter.Factory<RowData>> 
createBulkWriterFactory(WriterContext context) {
+               return Optional.empty();
+       }
+
+       @Override
+       public boolean supportsSchemaDerivation() {
+               return true;
+       }
+
+       private static DescriptorProperties getValidatedProperties(Map<String, 
String> propertiesMap) {
+               final DescriptorProperties properties = new 
DescriptorProperties(true);
+               properties.putProperties(propertiesMap);
+               properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true);
+               properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true);
+               return properties;
+       }
+
+       /**
+        * A {@link JsonInputFormat} is responsible to read {@link RowData} 
records
+        * from json format files.
+        */
+       public static class JsonInputFormat extends 
DelimitedInputFormat<RowData> {
+               /**
+                * Code of \r, used to remove \r from a line when the line ends 
with \r\n.
+                */
+               private static final byte CARRIAGE_RETURN = (byte) '\r';
+
+               /**
+                * Code of \n, used to identify if \n is used as delimiter.
+                */
+               private static final byte NEW_LINE = (byte) '\n';
+
+               private final DataType[] fieldTypes;
+               private final String[] fieldNames;
+               private final int[] selectFields;
+               private final List<String> partitionKeys;
+               private final String defaultPartValue;
+               private final long limit;
+               private final int[] jsonSelectFieldToProjectFieldMapping;
+               private final int[] jsonSelectFieldToJsonFieldMapping;
+               private final JsonRowDataDeserializationSchema 
deserializationSchema;
+
+               private transient boolean end;
+               private transient long emitted;
+               // reuse object for per record
+               private transient GenericRowData rowData;
+
+               public JsonInputFormat(
+                       Path[] filePaths,
+                       DataType[] fieldTypes,
+                       String[] fieldNames,
+                       int[] selectFields,
+                       List<String> partitionKeys,
+                       String defaultPartValue,
+                       long limit,
+                       int[] jsonSelectFieldToProjectFieldMapping,
+                       int[] jsonSelectFieldToJsonFieldMapping,
+                       JsonRowDataDeserializationSchema deserializationSchema) 
{
+                       super.setFilePaths(filePaths);
+                       this.fieldTypes = fieldTypes;
+                       this.fieldNames = fieldNames;
+                       this.selectFields = selectFields;
+                       this.partitionKeys = partitionKeys;
+                       this.defaultPartValue = defaultPartValue;
+                       this.limit = limit;
+                       this.jsonSelectFieldToProjectFieldMapping = 
jsonSelectFieldToProjectFieldMapping;
+                       this.jsonSelectFieldToJsonFieldMapping = 
jsonSelectFieldToJsonFieldMapping;
+                       this.deserializationSchema = deserializationSchema;
+               }
+
+               @Override
+               public boolean supportsMultiPaths() {
+                       return true;
+               }
+
+               @Override
+               public void open(FileInputSplit split) throws IOException {
+                       super.open(split);
+                       this.end = false;
+                       this.emitted = 0L;
+                       this.rowData = 
PartitionPathUtils.fillPartitionValueForRecord(fieldNames, fieldTypes, 
selectFields,
+                               partitionKeys, currentSplit.getPath(), 
defaultPartValue);
+               }
+
+               @Override
+               public boolean reachedEnd() {
+                       return emitted >= limit || end;
+               }
+
+               @Override
+               public RowData readRecord(RowData reuse, byte[] bytes, int 
offset, int numBytes) throws IOException {
+                       // remove \r from a line when the line ends with \r\n
+                       if (this.getDelimiter() != null && 
this.getDelimiter().length == 1
+                               && this.getDelimiter()[0] == NEW_LINE && offset 
+ numBytes >= 1
+                               && bytes[offset + numBytes - 1] == 
CARRIAGE_RETURN) {
+                               numBytes -= 1;
+                       }
+                       byte[] trimBytes = Arrays.copyOfRange(bytes, offset, 
offset + numBytes);
+                       GenericRowData jsonRow = (GenericRowData) 
deserializationSchema.deserialize(trimBytes);
+
+                       if (jsonRow == null) {
+                               return null;
+                       }
+
+                       GenericRowData returnRecord = rowData;
+                       for (int i = 0; i < 
jsonSelectFieldToJsonFieldMapping.length; i++) {
+                               
returnRecord.setField(jsonSelectFieldToProjectFieldMapping[i],
+                                       
jsonRow.getField(jsonSelectFieldToJsonFieldMapping[i]));
+                       }
+
+                       emitted++;
+                       return returnRecord;
+               }
+
+               @Override
+               public RowData nextRecord(RowData record) throws IOException {
+                       while (true) {
+                               if (readLine()) {
+                                       RowData row = readRecord(record, 
this.currBuffer, this.currOffset, this.currLen);
+                                       if (row == null) {
+                                               continue;
+                                       } else {
+                                               return row;
+                                       }
+                               } else {
+                                       this.end = true;
+                                       return null;
+                               }
+                       }
+               }
+       }
+
+       /**
+        * A {@link JsonRowDataEncoder} is responsible to encode a {@link 
RowData} to {@link java.io.OutputStream}
+        * with json format.
+        */
+       public static class JsonRowDataEncoder implements Encoder<RowData> {
+
+               private static final long serialVersionUID = 1L;
+               private static final String DEFAULT_LINE_DELIMITER = "\n";
+               private final JsonRowDataSerializationSchema 
serializationSchema;
+
+               public JsonRowDataEncoder(JsonRowDataSerializationSchema 
serializationSchema) {
+                       this.serializationSchema = serializationSchema;
+               }
+
+               @Override
+               public void encode(RowData element, OutputStream stream) throws 
IOException {
+                       stream.write(serializationSchema.serialize(element));
+                       
stream.write(DEFAULT_LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));
+               }
+       }
+}
diff --git 
a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index aec5846..bc622f2 100644
--- 
a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.formats.json.JsonRowFormatFactory
+org.apache.flink.formats.json.JsonFileSystemFormatFactory
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
new file mode 100644
index 0000000..2c01d29
--- /dev/null
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import 
org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * ITCase to test json format for {@link JsonFileSystemFormatFactory}.
+ */
+public class JsonBatchFileSystemITCase extends BatchFileSystemITCaseBase {
+
+       @Override
+       public String[] formatProperties() {
+               List<String> ret = new ArrayList<>();
+               ret.add("'format'='json'");
+               ret.add("'format.ignore-parse-errors'='true'");
+               return ret.toArray(new String[0]);
+       }
+
+       @Test
+       public void testParseError() throws Exception {
+               String path = new URI(resultPath()).getPath();
+               new File(path).mkdirs();
+               File file = new File(path, "temp_file");
+               file.createNewFile();
+               FileUtils.writeFileUtf8(file,
+                       "{\"x\":\"x5\",\"y\":5,\"a\":1,\"b\":1}\n" +
+                               "{I am a wrong json.}\n" +
+                               "{\"x\":\"x5\",\"y\":5,\"a\":1,\"b\":1}");
+
+               check("select * from nonPartitionedTable",
+                       Arrays.asList(
+                               Row.of("x5,5,1,1"),
+                               Row.of("x5,5,1,1")));
+       }
+}
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java
new file mode 100644
index 0000000..7690299
--- /dev/null
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test checkpoint for file system table factory with json format.
+ */
+public class JsonFsStreamSinkITCase extends FsStreamingSinkITCaseBase {
+
+       @Override
+       public String[] additionalProperties() {
+               List<String> ret = new ArrayList<>();
+               ret.add("'format'='json'");
+               // for test purpose
+               ret.add("'sink.rolling-policy.file-size'='1'");
+               return ret.toArray(new String[0]);
+       }
+}
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
index d56d388..9ab0ad5 100644
--- 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
@@ -29,10 +29,10 @@ import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.filesystem.PartitionPathUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -111,12 +111,12 @@ public class OrcFileSystemFormatFactory implements 
FileSystemFormatFactory {
                DescriptorProperties properties = new DescriptorProperties();
                properties.putProperties(context.getFormatProperties());
 
-               LogicalType[] orcTypes = 
Arrays.stream(context.getFieldTypesWithoutPartKeys())
+               LogicalType[] orcTypes = 
Arrays.stream(context.getFormatFieldTypes())
                                .map(DataType::getLogicalType)
                                .toArray(LogicalType[]::new);
 
                TypeDescription typeDescription = 
OrcSplitReaderUtil.logicalTypeToOrcType(
-                               RowType.of(orcTypes, 
context.getFieldNamesWithoutPartKeys()));
+                               RowType.of(orcTypes, 
context.getFormatFieldNames()));
 
                OrcBulkWriterFactory<RowData> factory = new 
OrcBulkWriterFactory<>(
                                new 
RowDataVectorizer(typeDescription.toString(), orcTypes),
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
index c4754a2..95791bb 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
@@ -32,10 +32,10 @@ import 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.filesystem.PartitionPathUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -135,10 +135,10 @@ public class ParquetFileSystemFormatFactory implements 
FileSystemFormatFactory {
                properties.putProperties(context.getFormatProperties());
 
                return Optional.of(ParquetRowDataBuilder.createWriterFactory(
-                               
RowType.of(Arrays.stream(context.getFieldTypesWithoutPartKeys())
+                               
RowType.of(Arrays.stream(context.getFormatFieldTypes())
                                                                
.map(DataType::getLogicalType)
                                                                
.toArray(LogicalType[]::new),
-                                               
context.getFieldNamesWithoutPartKeys()),
+                                               context.getFormatFieldNames()),
                                getParquetConfiguration(properties),
                                isUtcTimestamp(properties)
                ));
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java
index 938fc3d..1f660f8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java
@@ -27,11 +27,14 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * File system format factory for creating configured instances of reader and 
writer.
@@ -102,6 +105,48 @@ public interface FileSystemFormatFactory extends 
TableFormatFactory<RowData> {
                 * The follow up operator will filter the records again.
                 */
                List<Expression> getPushedDownFilters();
+
+               /**
+                * Get field names without partition keys.
+                */
+               default String[] getFormatFieldNames() {
+                       return Arrays.stream(getSchema().getFieldNames())
+                               .filter(name -> 
!getPartitionKeys().contains(name))
+                               .toArray(String[]::new);
+               }
+
+               /**
+                * Get field types without partition keys.
+                */
+               default DataType[] getFormatFieldTypes() {
+                       return Arrays.stream(getSchema().getFieldNames())
+                               .filter(name -> 
!getPartitionKeys().contains(name))
+                               .map(name -> 
getSchema().getFieldDataType(name).get())
+                               .toArray(DataType[]::new);
+               }
+
+               /**
+                * RowType of table that excludes partition key fields.
+                */
+               default RowType getFormatRowType() {
+                       return RowType.of(
+                               Arrays.stream(getFormatFieldTypes())
+                                       .map(DataType::getLogicalType)
+                                       .toArray(LogicalType[]::new),
+                               getFormatFieldNames());
+               }
+
+               /**
+                * Mapping from non-partition project fields index to all 
project fields index.
+                */
+               default List<String> getFormatProjectFields() {
+                       final List<String> selectFieldNames = 
Arrays.stream(getProjectFields())
+                               .mapToObj(i -> getSchema().getFieldNames()[i])
+                               .collect(Collectors.toList());
+                       return selectFieldNames.stream()
+                               .filter(name -> 
!getPartitionKeys().contains(name))
+                               .collect(Collectors.toList());
+               }
        }
 
        /**
@@ -127,7 +172,7 @@ public interface FileSystemFormatFactory extends 
TableFormatFactory<RowData> {
                /**
                 * Get field names without partition keys.
                 */
-               default String[] getFieldNamesWithoutPartKeys() {
+               default String[] getFormatFieldNames() {
                        return Arrays.stream(getSchema().getFieldNames())
                                        .filter(name -> 
!getPartitionKeys().contains(name))
                                        .toArray(String[]::new);
@@ -136,11 +181,23 @@ public interface FileSystemFormatFactory extends 
TableFormatFactory<RowData> {
                /**
                 * Get field types without partition keys.
                 */
-               default DataType[] getFieldTypesWithoutPartKeys() {
+               default DataType[] getFormatFieldTypes() {
                        return Arrays.stream(getSchema().getFieldNames())
                                        .filter(name -> 
!getPartitionKeys().contains(name))
                                        .map(name -> 
getSchema().getFieldDataType(name).get())
                                        .toArray(DataType[]::new);
                }
+
+               /**
+                * Get RowType of the table without partition keys.
+                * @return
+                */
+               default RowType getFormatRowType() {
+                       return RowType.of(
+                               Arrays.stream(getFormatFieldTypes())
+                                       .map(DataType::getLogicalType)
+                                       .toArray(LogicalType[]::new),
+                               getFormatFieldNames());
+               }
        }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
similarity index 72%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathUtils.java
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
index 93583e6..2ee8d48 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.filesystem;
+package org.apache.flink.table.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -24,8 +24,15 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -201,6 +208,80 @@ public class PartitionPathUtils {
                return ret;
        }
 
+       /**
+        * Extract partition value from path and fill to record.
+        * @param fieldNames record field names.
+        * @param fieldTypes record field types.
+        * @param selectFields the selected fields.
+        * @param partitionKeys the partition field names.
+        * @param path the file path that the partition located.
+        * @param defaultPartValue default value of partition field.
+        * @return the filled record.
+        */
+       public static GenericRowData fillPartitionValueForRecord(
+                       String[] fieldNames,
+                       DataType[] fieldTypes,
+                       int[] selectFields,
+                       List<String> partitionKeys,
+                       Path path,
+                       String defaultPartValue) {
+               GenericRowData record = new GenericRowData(selectFields.length);
+               LinkedHashMap<String, String> partSpec = 
PartitionPathUtils.extractPartitionSpecFromPath(path);
+               for (int i = 0; i < selectFields.length; i++) {
+                       int selectField = selectFields[i];
+                       String name = fieldNames[selectField];
+                       if (partitionKeys.contains(name)) {
+                               String value = partSpec.get(name);
+                               value = defaultPartValue.equals(value) ? null : 
value;
+                               record.setField(i, 
PartitionPathUtils.convertStringToInternalValue(value, 
fieldTypes[selectField]));
+                       }
+               }
+               return record;
+       }
+
+       /**
+        * Restore partition value from string and type.
+        *
+        * @param valStr string partition value.
+        * @param type type of partition field.
+        * @return partition value.
+        */
+       public static Object convertStringToInternalValue(String valStr, 
DataType type) {
+               if (valStr == null) {
+                       return null;
+               }
+
+               LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot();
+               switch (typeRoot) {
+                       case CHAR:
+                       case VARCHAR:
+                               return StringData.fromString(valStr);
+                       case BOOLEAN:
+                               return Boolean.parseBoolean(valStr);
+                       case TINYINT:
+                               return Byte.parseByte(valStr);
+                       case SMALLINT:
+                               return Short.parseShort(valStr);
+                       case INTEGER:
+                               return Integer.parseInt(valStr);
+                       case BIGINT:
+                               return Long.parseLong(valStr);
+                       case FLOAT:
+                               return Float.parseFloat(valStr);
+                       case DOUBLE:
+                               return Double.parseDouble(valStr);
+                       case DATE:
+                               return (int) 
LocalDate.parse(valStr).toEpochDay();
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return 
TimestampData.fromLocalDateTime(LocalDateTime.parse(valStr));
+                       default:
+                               throw new RuntimeException(String.format(
+                                       "Can not convert %s to type %s for 
partition value",
+                                       valStr,
+                                       type));
+               }
+       }
+
        private static FileStatus[] getFileStatusRecurse(Path path, int 
expectLevel, FileSystem fs) {
                ArrayList<FileStatus> result = new ArrayList<>();
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java
index 48045e2..64949e1 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java
@@ -89,7 +89,7 @@ public class TestCsvFileSystemFormatFactory implements 
FileSystemFormatFactory {
                        return Optional.empty();
                }
 
-               DataType[] types = context.getFieldTypesWithoutPartKeys();
+               DataType[] types = context.getFormatFieldTypes();
                return Optional.of((rowData, stream) -> {
                        writeCsvToStream(types, rowData, stream);
                });
@@ -127,7 +127,7 @@ public class TestCsvFileSystemFormatFactory implements 
FileSystemFormatFactory {
                        return Optional.empty();
                }
 
-               DataType[] types = context.getFieldTypesWithoutPartKeys();
+               DataType[] types = context.getFormatFieldTypes();
                return Optional.of(out -> new CsvBulkWriter(types, out));
        }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestRowDataCsvInputFormat.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestRowDataCsvInputFormat.java
index 43f2316..828aac2 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestRowDataCsvInputFormat.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestRowDataCsvInputFormat.java
@@ -34,8 +34,8 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.filesystem.PartitionPathUtils;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.types.Row;
 
 import java.io.IOException;
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index cb0f2f7..2c69b07 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -30,7 +30,7 @@ import org.apache.flink.types.Row
 import org.junit.rules.TemporaryFolder
 import org.junit.{Rule, Test}
 
-import scala.collection.Seq
+import scala.collection.{JavaConverters, Seq}
 
 /**
   * Test File system table factory.
@@ -49,6 +49,11 @@ trait FileSystemITCaseBase {
 
   def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
 
+  def check(sqlQuery: String, expectedResult: java.util.List[Row]): Unit = {
+    check(sqlQuery,
+      
JavaConverters.asScalaIteratorConverter(expectedResult.iterator()).asScala.toSeq)
+  }
+
   def open(): Unit = {
     resultPath = fileTmpFolder.newFolder().toURI.toString
     BatchTableEnvUtil.registerCollection(
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/DynamicPartitionWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/DynamicPartitionWriter.java
index a3ddb8f..008ee19 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/DynamicPartitionWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/DynamicPartitionWriter.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
 
 /**
  * Dynamic partition writer to writing multiple partitions at the same time, 
it maybe consumes more memory.
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 34c5b7d..ea55158 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -44,6 +44,7 @@ import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index 249f1ee..7e52ced 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.PartitionableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
index c5fab7b..94dfe5e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.OutputFormat;
 
-import static 
org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
 
 /**
  * {@link PartitionWriter} for grouped dynamic partition inserting. It will 
create a new format
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
index 4e1ab04..9e0b98d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
@@ -31,8 +31,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
-import static 
org.apache.flink.table.filesystem.PartitionPathUtils.listStatusWithoutHidden;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.listStatusWithoutHidden;
 
 /**
  * Loader to temporary files to final output path and meta store. According to 
overwrite,
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
index 26e9c81..b00bed3 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
@@ -31,7 +31,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.flink.table.filesystem.PartitionPathUtils.searchPartSpecAndPaths;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SingleDirectoryWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SingleDirectoryWriter.java
index af00a61..8927363 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SingleDirectoryWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SingleDirectoryWriter.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import java.io.IOException;
 import java.util.LinkedHashMap;
 
-import static 
org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
 
 /**
  * {@link PartitionWriter} for single directory writer. It just use one format 
to write.
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/RowPartitionComputerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/RowPartitionComputerTest.java
index 06e2553..d7dcc6e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/RowPartitionComputerTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/RowPartitionComputerTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.types.Row;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static 
org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
 
 /**
  * Test for {@link RowPartitionComputer}.

Reply via email to