This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c3277c557aa423e7f5dd83d18b74082bf6c2bc24 Author: wind <bupt_...@163.com> AuthorDate: Sun Aug 12 12:42:11 2018 +0800 [FLINK-9964][table] Add an initial CSV table format factory This closes #6541. --- flink-formats/flink-csv/pom.xml | 88 ++++++++ .../formats/csv/CsvRowDeserializationSchema.java | 225 +++++++++++++++++++ .../flink/formats/csv/CsvRowFormatFactory.java | 143 ++++++++++++ .../flink/formats/csv/CsvRowSchemaConverter.java | 101 +++++++++ .../formats/csv/CsvRowSerializationSchema.java | 242 +++++++++++++++++++++ .../org.apache.flink.table.factories.TableFactory | 16 ++ .../csv/CsvRowDeserializationSchemaTest.java | 150 +++++++++++++ .../flink/formats/csv/CsvRowFormatFactoryTest.java | 121 +++++++++++ .../formats/csv/CsvRowSchemaConverterTest.java | 75 +++++++ .../formats/csv/CsvRowSerializationSchemaTest.java | 234 ++++++++++++++++++++ flink-formats/pom.xml | 1 + .../org/apache/flink/table/descriptors/Csv.scala | 37 +++- .../flink/table/descriptors/CsvValidator.scala | 24 +- .../apache/flink/table/descriptors/CsvTest.scala | 10 + 14 files changed, 1465 insertions(+), 2 deletions(-) diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml new file mode 100644 index 0000000..a2b4f25 --- /dev/null +++ b/flink-formats/flink-csv/pom.xml @@ -0,0 +1,88 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-formats</artifactId> + <version>1.7-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-csv</artifactId> + <name>flink-csv</name> + + <packaging>jar</packaging> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <!-- use a dedicated Scala version to not depend on it --> + <artifactId>flink-table_2.11</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-table. 
--> + <optional>true</optional> + </dependency> + + <!-- jackson --> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-csv</artifactId> + <version>2.7.9</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <!-- use a dedicated Scala version to not depend on it --> + <artifactId>flink-table_2.11</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!-- flink-table needs Scala --> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java new file mode 100644 index 0000000..7e328ee --- /dev/null +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +/** + * Deserialization schema from CSV to Flink types. + * + * <p>Deserializes a <code>byte[]</code> message as a {@link JsonNode} and + * convert it to {@link Row}. + * + * <p>Failure during deserialization are forwarded as wrapped IOExceptions. + */ +@Public +public final class CsvRowDeserializationSchema implements DeserializationSchema<Row> { + + /** Schema describing the input csv data. */ + private CsvSchema csvSchema; + + /** Type information describing the input csv data. */ + private TypeInformation<Row> rowTypeInfo; + + /** ObjectReader used to read message, it will be changed when csvSchema is changed. */ + private ObjectReader objectReader; + + /** Charset for byte[]. 
*/ + private String charset = "UTF-8"; + + /** + * Create a csv row DeserializationSchema with given {@link TypeInformation}. + */ + public CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) { + Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null !"); + CsvMapper csvMapper = new CsvMapper(); + this.rowTypeInfo = rowTypeInfo; + this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo); + this.objectReader = csvMapper.readerFor(JsonNode.class).with(csvSchema); + this.setNullValue("null"); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + JsonNode root = objectReader.readValue(message); + return convertRow(root, (RowTypeInfo) rowTypeInfo); + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } + + @Override + public TypeInformation<Row> getProducedType() { + return rowTypeInfo; + } + + private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) { + String[] fields = rowTypeInfo.getFieldNames(); + TypeInformation<?>[] types = rowTypeInfo.getFieldTypes(); + Row row = new Row(fields.length); + + for (int i = 0; i < fields.length; i++) { + String columnName = fields[i]; + JsonNode node = root.get(columnName); + row.setField(i, convert(node, types[i])); + } + return row; + } + + private Row convertRow(ArrayNode node, RowTypeInfo rowTypeInfo) { + TypeInformation[] types = rowTypeInfo.getFieldTypes(); + String[] fields = rowTypeInfo.getFieldNames(); + Row row = new Row(fields.length); + for (int i = 0; i < fields.length; i++) { + row.setField(i, convert(node.get(i), types[i])); + } + return row; + } + + /** + * Converts json node to object with given type information. + * @param node json node to be converted. + * @param info type information for the json data. 
+ * @return converted object + */ + private Object convert(JsonNode node, TypeInformation<?> info) { + if (node instanceof NullNode) { + return null; + } + if (info == Types.STRING) { + return node.asText(); + } else if (info == Types.LONG) { + return node.asLong(); + } else if (info == Types.INT) { + return node.asInt(); + } else if (info == Types.DOUBLE) { + return node.asDouble(); + } else if (info == Types.FLOAT) { + return Double.valueOf(node.asDouble()).floatValue(); + } else if (info == Types.BIG_DEC) { + return BigDecimal.valueOf(node.asDouble()); + } else if (info == Types.BIG_INT) { + return BigInteger.valueOf(node.asLong()); + } else if (info == Types.SQL_DATE) { + return Date.valueOf(node.asText()); + } else if (info == Types.SQL_TIME) { + return Time.valueOf(node.asText()); + } else if (info == Types.SQL_TIMESTAMP) { + return Timestamp.valueOf(node.asText()); + } else if (info == Types.BOOLEAN) { + return node.asBoolean(); + } else if (info instanceof RowTypeInfo) { + return convertRow((ArrayNode) node, (RowTypeInfo) info); + } else if (info instanceof BasicArrayTypeInfo) { + return convertArray((ArrayNode) node, ((BasicArrayTypeInfo) info).getComponentInfo()); + } else if (info instanceof PrimitiveArrayTypeInfo && + ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + return convertByteArray((TextNode) node); + } else { + throw new RuntimeException("Unable to support type " + info.toString() + " yet"); + } + } + + private Object[] convertArray(ArrayNode node, TypeInformation<?> elementType) { + final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size()); + for (int i = 0; i < node.size(); i++) { + array[i] = convert(node.get(i), elementType); + } + return array; + } + + private byte[] convertByteArray(TextNode node) { + try { + return node.asText().getBytes(charset); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("Unsupport encoding charset" + charset, e); + } + } + + public 
void setCharset(String charset) { + this.charset = charset; + } + + public void setFieldDelimiter(String s) { + if (s.length() != 1) { + throw new RuntimeException("FieldDelimiter's length must be one !"); + } + this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(s.charAt(0)).build(); + // keep the ObjectReader in sync with the rebuilt schema (as all other setters do), + // otherwise the configured field delimiter is silently ignored at read time + this.objectReader = objectReader.with(csvSchema); + } + + public void setArrayElementDelimiter(String s) { + this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build(); + this.objectReader = objectReader.with(csvSchema); + } + + public void setQuoteCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); + this.objectReader = objectReader.with(csvSchema); + } + + public void setEscapeCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); + this.objectReader = objectReader.with(csvSchema); + } + + public void setNullValue(String s) { + this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build(); + this.objectReader = objectReader.with(csvSchema); + } + + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != this.getClass()) { + return false; + } + if (this == o) { + return true; + } + final CsvRowDeserializationSchema that = (CsvRowDeserializationSchema) o; + return rowTypeInfo.equals(that.rowTypeInfo) && + csvSchema.toString().equals(that.csvSchema.toString()); + } + + @Override + public int hashCode() { + // must be overridden together with equals(); derived from the same fields + return 31 * rowTypeInfo.hashCode() + csvSchema.toString().hashCode(); + } +} diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java new file mode 100644 index 0000000..e9c65bb --- /dev/null +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.descriptors.CsvValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FormatDescriptorValidator; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.DeserializationSchemaFactory; +import org.apache.flink.table.factories.SerializationSchemaFactory; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Table format for providing configured instances of CSV-to-row {@link SerializationSchema} + * and {@link DeserializationSchema}. 
+ */ +public final class CsvRowFormatFactory implements SerializationSchemaFactory<Row>, + DeserializationSchemaFactory<Row> { + + @Override + public Map<String, String> requiredContext() { + final Map<String, String> context = new HashMap<>(); + context.put(FormatDescriptorValidator.FORMAT_TYPE(), CsvValidator.FORMAT_TYPE_VALUE()); + context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1"); + return context; + } + + @Override + public boolean supportsSchemaDerivation() { + return true; + } + + @Override + public List<String> supportedProperties() { + final List<String> properties = new ArrayList<>(); + properties.add(CsvValidator.FORMAT_FIELDS() + ".#." + DescriptorProperties.TYPE()); + properties.add(CsvValidator.FORMAT_FIELDS() + ".#." + DescriptorProperties.NAME()); + properties.add(CsvValidator.FORMAT_FIELD_DELIMITER()); + properties.add(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER()); + properties.add(CsvValidator.FORMAT_QUOTE_CHARACTER()); + properties.add(CsvValidator.FORMAT_ESCAPE_CHARACTER()); + properties.add(CsvValidator.FORMAT_BYTES_CHARSET()); + properties.add(CsvValidator.FORMAT_NULL_VALUE()); + properties.add(SchemaValidator.SCHEMA() + ".#." + SchemaValidator.SCHEMA_TYPE()); + properties.add(SchemaValidator.SCHEMA() + ".#." 
+ SchemaValidator.SCHEMA_NAME()); + return properties; + } + + @Override + public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) { + final DescriptorProperties descriptorProperties = validateAndGetProperties(properties); + + final CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema( + createTypeInformation(descriptorProperties)); + + // update csv schema with properties + descriptorProperties.getOptionalString(CsvValidator.FORMAT_FIELD_DELIMITER()) + .ifPresent(schema::setFieldDelimiter); + descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER()) + .ifPresent(schema::setArrayElementDelimiter); + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER()) + .ifPresent(schema::setQuoteCharacter); + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER()) + .ifPresent(schema::setEscapeCharacter); + descriptorProperties.getOptionalString(CsvValidator.FORMAT_BYTES_CHARSET()) + .ifPresent(schema::setCharset); + // FORMAT_NULL_VALUE must configure the null literal, not the charset + descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_VALUE()) + .ifPresent(schema::setNullValue); + + // return the configured instance; constructing a fresh schema here would + // discard every delimiter/quote/escape/charset/null-value setting applied above + return schema; + } + + @Override + public SerializationSchema<Row> createSerializationSchema(Map<String, String> properties) { + final DescriptorProperties descriptorProperties = validateAndGetProperties(properties); + + final CsvRowSerializationSchema schema = new CsvRowSerializationSchema( + createTypeInformation(descriptorProperties)); + + // update csv schema with properties + descriptorProperties.getOptionalString(CsvValidator.FORMAT_FIELD_DELIMITER()) + .ifPresent(schema::setFieldDelimiter); + descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER()) + .ifPresent(schema::setArrayElementDelimiter); + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER()) + .ifPresent(schema::setQuoteCharacter); + 
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER()) + .ifPresent(schema::setEscapeCharacter); + descriptorProperties.getOptionalString(CsvValidator.FORMAT_BYTES_CHARSET()) + .ifPresent(schema::setCharset); + // FORMAT_NULL_VALUE must configure the null literal, not the charset + descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_VALUE()) + .ifPresent(schema::setNullValue); + + // return the configured instance; constructing a fresh schema here would + // discard every delimiter/quote/escape/charset/null-value setting applied above + return schema; + } + + private static DescriptorProperties validateAndGetProperties(Map<String, String> propertiesMap) { + final DescriptorProperties descriptorProperties = new DescriptorProperties(true); + descriptorProperties.putProperties(propertiesMap); + + // validate + new CsvValidator().validate(descriptorProperties); + + return descriptorProperties; + } + + /** + * Create a {@link TypeInformation} based on the "format-fields" in {@link CsvValidator}. + * @param descriptorProperties descriptor properties + * @return {@link TypeInformation} + */ + private static TypeInformation<Row> createTypeInformation(DescriptorProperties descriptorProperties) { + if (descriptorProperties.getOptionalTableSchema(CsvValidator.FORMAT_FIELDS()).isPresent()) { + return descriptorProperties.getTableSchema(CsvValidator.FORMAT_FIELDS()).toRowType(); + } else { + // no explicit format fields -> derive them from the table schema + return SchemaValidator.deriveFormatFields(descriptorProperties).toRowType(); + } + } +} diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java new file mode 100644 index 0000000..1476159 --- /dev/null +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder; +import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +/** + * Converting functions that related to {@link CsvSchema}. + * In {@link CsvSchema}, there are four types(string,number,boolean + * and array), in order to satisfy various flink types, this class + * sorts out instances of {@link TypeInformation} and convert them to + * one of CsvSchema's types. + */ +public final class CsvRowSchemaConverter { + + /** + * Types that can be converted to ColumnType.NUMBER. + */ + private static final HashSet<TypeInformation<?>> NUMBER_TYPES = + new HashSet<>(Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT, + Types.BIG_DEC, Types.BIG_INT)); + + /** + * Types that can be converted to ColumnType.STRING. 
+ */ + private static final HashSet<TypeInformation<?>> STRING_TYPES = + new HashSet<>(Arrays.asList(Types.STRING, Types.SQL_DATE, + Types.SQL_TIME, Types.SQL_TIMESTAMP)); + + /** + * Types that can be converted to ColumnType.BOOLEAN. + */ + private static final HashSet<TypeInformation<?>> BOOLEAN_TYPES = + new HashSet<>(Collections.singletonList(Types.BOOLEAN)); + + /** + * Convert {@link RowTypeInfo} to {@link CsvSchema}. + */ + public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) { + Builder builder = new CsvSchema.Builder(); + String[] fields = rowType.getFieldNames(); + TypeInformation<?>[] infos = rowType.getFieldTypes(); + for (int i = 0; i < rowType.getArity(); i++) { + builder.addColumn(new Column(i, fields[i], convertType(infos[i]))); + } + return builder.build(); + } + + /** + * Convert {@link TypeInformation} to {@link CsvSchema.ColumnType} + * based on their catogories. + */ + private static CsvSchema.ColumnType convertType(TypeInformation<?> info) { + if (STRING_TYPES.contains(info)) { + return CsvSchema.ColumnType.STRING; + } else if (NUMBER_TYPES.contains(info)) { + return CsvSchema.ColumnType.NUMBER; + } else if (BOOLEAN_TYPES.contains(info)) { + return CsvSchema.ColumnType.BOOLEAN; + } else if (info instanceof ObjectArrayTypeInfo + || info instanceof BasicArrayTypeInfo + || info instanceof RowTypeInfo) { + return CsvSchema.ColumnType.ARRAY; + } else if (info instanceof PrimitiveArrayTypeInfo && + ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + return CsvSchema.ColumnType.STRING; + } else { + throw new RuntimeException("Unable to support " + info.toString() + + " yet"); + } + } +} diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java new file mode 100644 index 0000000..f8ced68 --- /dev/null +++ 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ContainerNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.math.BigInteger; +import 
java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +/** + * Serialization schema that serializes an object of Flink types into a CSV bytes. + * + * <p>Serializes the input row into a {@link ObjectNode} and + * converts it into <code>byte[]</code>. + * + * <p>Result <code>byte[]</code> messages can be deserialized using {@link CsvRowDeserializationSchema}. + */ +@Public +public final class CsvRowSerializationSchema implements SerializationSchema<Row> { + + /** Schema describing the input csv data. */ + private CsvSchema csvSchema; + + /** Type information describing the input csv data. */ + private TypeInformation<Row> rowTypeInfo; + + /** CsvMapper used to write {@link JsonNode} into bytes. */ + private CsvMapper csvMapper = new CsvMapper(); + + /** Reusable object node. */ + private ObjectNode root; + + /** Charset for byte[]. */ + private String charset = "UTF-8"; + + /** + * Create a {@link CsvRowSerializationSchema} with given {@link TypeInformation}. + * @param rowTypeInfo type information used to create schem. + */ + CsvRowSerializationSchema(TypeInformation<Row> rowTypeInfo) { + Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null !"); + this.rowTypeInfo = rowTypeInfo; + this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo); + this.setNullValue("null"); + } + + @Override + public byte[] serialize(Row row) { + if (root == null) { + root = csvMapper.createObjectNode(); + } + try { + convertRow(root, row, (RowTypeInfo) rowTypeInfo); + return csvMapper.writer(csvSchema).writeValueAsBytes(root); + } catch (JsonProcessingException e) { + throw new RuntimeException("Could not serialize row '" + row + "'. 
" + + "Make sure that the schema matches the input.", e); + } + } + + private void convertRow(ObjectNode reuse, Row row, RowTypeInfo rowTypeInfo) { + if (reuse == null) { + reuse = csvMapper.createObjectNode(); + } + if (row.getArity() != rowTypeInfo.getFieldNames().length) { + throw new IllegalStateException(String.format( + "Number of elements in the row '%s' is different from number of field names: %d", + row, rowTypeInfo.getFieldNames().length)); + } + TypeInformation[] types = rowTypeInfo.getFieldTypes(); + String[] fields = rowTypeInfo.getFieldNames(); + for (int i = 0; i < types.length; i++) { + String columnName = fields[i]; + Object obj = row.getField(i); + reuse.set(columnName, convert(reuse, obj, types[i], false)); + } + } + + /** + * Converts an object to a JsonNode. + * @param container {@link ContainerNode} that creates {@link JsonNode}. + * @param obj Object that used to {@link JsonNode}. + * @param info Type infomation that decides the type of {@link JsonNode}. + * @param nested variable that indicates whether the obj is in a nested structure + * like a string in an array. + * @return result after converting. 
+ */ + private JsonNode convert(ContainerNode<?> container, Object obj, TypeInformation info, Boolean nested) { + if (obj == null) { + return container.nullNode(); + } + if (info == Types.STRING) { + return container.textNode((String) obj); + } else if (info == Types.LONG) { + return container.numberNode((Long) obj); + } else if (info == Types.INT) { + return container.numberNode((Integer) obj); + } else if (info == Types.DOUBLE) { + return container.numberNode((Double) obj); + } else if (info == Types.FLOAT) { + return container.numberNode((Float) obj); + } else if (info == Types.BIG_DEC) { + return container.numberNode(new BigDecimal(String.valueOf(obj))); + } else if (info == Types.BIG_INT) { + return container.numberNode(BigInteger.valueOf(Long.valueOf(String.valueOf(obj)))); + } else if (info == Types.SQL_DATE) { + return container.textNode(Date.valueOf(String.valueOf(obj)).toString()); + } else if (info == Types.SQL_TIME) { + return container.textNode(Time.valueOf(String.valueOf(obj)).toString()); + } else if (info == Types.SQL_TIMESTAMP) { + return container.textNode(Timestamp.valueOf(String.valueOf(obj)).toString()); + } else if (info == Types.BOOLEAN) { + return container.booleanNode((Boolean) obj); + } else if (info instanceof RowTypeInfo){ + if (nested) { + throw new RuntimeException("Unable to support nested row type " + info.toString() + " yet"); + } + return convertArray((Row) obj, (RowTypeInfo) info); + } else if (info instanceof BasicArrayTypeInfo) { + if (nested) { + throw new RuntimeException("Unable to support nested array type " + info.toString() + " yet"); + } + return convertArray((Object[]) obj, + ((BasicArrayTypeInfo) info).getComponentInfo()); + } else if (info instanceof PrimitiveArrayTypeInfo && + ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + /* We converts byte[] to TextNode instead of BinaryNode here, + because the instance of BinaryNode will be serialized to base64 string in + {@link 
com.fasterxml.jackson.databind.node.BinaryNode#serialize(JsonGenerator, SerializerProvider)}, + which is unacceptable for users. + */ + try { + return container.textNode(new String((byte[]) obj, charset)); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("Unsupport encoding charset " + charset, e); + } + } else { + throw new RuntimeException("Unable to support type " + info.toString() + " yet"); + } + } + + /** + * Use {@link ArrayNode} to represents a row. + */ + private ArrayNode convertArray(Row row, RowTypeInfo rowTypeInfo) { + ArrayNode arrayNode = csvMapper.createArrayNode(); + TypeInformation[] types = rowTypeInfo.getFieldTypes(); + String[] fields = rowTypeInfo.getFieldNames(); + for (int i = 0; i < fields.length; i++) { + arrayNode.add(convert(arrayNode, row.getField(i), types[i], true)); + } + return arrayNode; + } + + /** + * Use {@link ArrayNode} to represents an array. + */ + private ArrayNode convertArray(Object[] obj, TypeInformation elementInfo) { + ArrayNode arrayNode = csvMapper.createArrayNode(); + for (Object elementObj : obj) { + arrayNode.add(convert(arrayNode, elementObj, elementInfo, true)); + } + return arrayNode; + } + + public void setCharset(String charset) { + this.charset = charset; + } + + public void setFieldDelimiter(String s) { + if (s.length() != 1) { + throw new RuntimeException("FieldDelimiter's length must be one !"); + } + this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(s.charAt(0)).build(); + } + + public void setArrayElementDelimiter(String s) { + this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build(); + } + + public void setQuoteCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); + } + + public void setEscapeCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); + } + + public void setNullValue(String s) { + this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build(); + } + + @Override + 
public boolean equals(Object o) { + if (o == null || o.getClass() != this.getClass()) { + return false; + } + if (this == o) { + return true; + } + final CsvRowSerializationSchema that = (CsvRowSerializationSchema) o; + + return rowTypeInfo.equals(that.rowTypeInfo) && + csvSchema.toString().equals(that.csvSchema.toString()); + } +} diff --git a/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory new file mode 100644 index 0000000..61cd834 --- /dev/null +++ b/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.formats.csv.CsvRowFormatFactory diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java new file mode 100644 index 0000000..7e63a4a --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Testing for {@link CsvRowDeserializationSchema}. 
+ */ +public class CsvRowDeserializationSchemaTest extends TestLogger { + + @Test + public void testDeserialize() throws IOException { + final long currentMills = System.currentTimeMillis(); + final TypeInformation<Row> rowTypeInfo = Types.ROW( + new String[]{"a", "b", "c", "d", "e", "f", "g"}, + new TypeInformation[]{ + Types.STRING(), Types.LONG(), Types.DECIMAL(), + Types.ROW( + new String[]{"c1", "c2", "c3"}, + new TypeInformation[]{ + Types.INT(), + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, + Types.STRING() + } + ), Types.SQL_TIMESTAMP(), Types.BOOLEAN(), + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO + } + ); + + String c1 = "123"; + String c2 = String.valueOf(34L); + String c3 = String.valueOf(1233.2); + String c4 = "1" + ";" + new String("abc".getBytes()) + ";" + "cba"; + String c5 = new Timestamp(currentMills).toString(); + String c6 = "true"; + String c7 = new String("12345".getBytes()); + byte[] bytes = (c1 + "," + c2 + "," + c3 + "," + c4 + "," + + c5 + "," + c6 + "," + c7).getBytes(); + CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo); + Row deserializedRow = deserializationSchema.deserialize(bytes); + + assertEquals(7, deserializedRow.getArity()); + assertEquals("123", deserializedRow.getField(0)); + assertEquals(34L, deserializedRow.getField(1)); + assertEquals(BigDecimal.valueOf(1233.2), deserializedRow.getField(2)); + assertArrayEquals("abc".getBytes(), (byte[]) ((Row) deserializedRow.getField(3)).getField(1)); + assertEquals(new Timestamp(currentMills), deserializedRow.getField(4)); + assertEquals(true, deserializedRow.getField(5)); + assertArrayEquals("12345".getBytes(), (byte[]) deserializedRow.getField(6)); + } + + @Test + public void testCustomizedProperties() throws IOException { + final TypeInformation<Row> rowTypeInfo = Types.ROW( + new String[]{"a", "b", "c"}, + new TypeInformation[]{Types.STRING(), Types.STRING(), + Types.ROW( + new String[]{"c1", "c2"}, + new 
TypeInformation[]{Types.INT(), Types.STRING()} + )} + ); + + String c1 = "123*\"4"; + String c2 = "'a,bc'"; + String c3 = "1:zxv"; + + byte[] bytes = (c1 + "," + c2 + "," + c3).getBytes(); + CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo); + deserializationSchema.setEscapeCharacter('*'); + deserializationSchema.setQuoteCharacter('\''); + deserializationSchema.setArrayElementDelimiter(":"); + Row deserializedRow = deserializationSchema.deserialize(bytes); + + assertEquals("123\"4", deserializedRow.getField(0)); + assertEquals("a,bc", deserializedRow.getField(1)); + assertEquals("zxv", ((Row) deserializedRow.getField(2)).getField(1)); + } + + @Test + public void testCharset() throws IOException { + final TypeInformation<Row> rowTypeInfo = Types.ROW( + new String[]{"a"}, + new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO} + ); + final CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema(rowTypeInfo); + schema.setCharset("UTF-16"); + + byte[] bytes = "abc".getBytes(StandardCharsets.UTF_16); + Row result = schema.deserialize(bytes); + + assertEquals("abc", new String((byte[]) result.getField(0), StandardCharsets.UTF_16)); + } + + @Test + public void testNull() throws IOException { + final TypeInformation<Row> rowTypeInfo = Types.ROW( + new String[]{"a"}, + new TypeInformation[]{Types.STRING()} + ); + + final byte[] bytes = "123".getBytes(); + + final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo); + deserializationSchema.setNullValue("123"); + final Row row = deserializationSchema.deserialize(bytes); + assertNull(row.getField(0)); + } + + @Test(expected = IllegalArgumentException.class) + public void testNumberOfFieldNamesAndTypesMismatch() { + TypeInformation<Row> rowTypeInfo = Types.ROW( + new String[]{"a", "b"}, + new TypeInformation[]{Types.STRING()} + ); + new CsvRowDeserializationSchema(rowTypeInfo); + } + +} diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java new file mode 100644 index 0000000..9d8263f --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.NoMatchingTableFactoryException; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.descriptors.Csv; +import org.apache.flink.table.descriptors.Descriptor; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.factories.DeserializationSchemaFactory; +import org.apache.flink.table.factories.SerializationSchemaFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Testing for {@link CsvRowFormatFactory}. 
+ */ +public class CsvRowFormatFactoryTest extends TestLogger { + + + private static final TypeInformation<Row> SCHEMA = Types.ROW( + new String[]{"a", "b", "c"}, + new TypeInformation[]{Types.STRING(), Types.INT(), Types.ROW( + new String[]{"a", "b", "c"}, + new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()} + )} + ); + + @Test + public void testSchema() { + final Map<String, String> properties = toMap( + new Csv() + .field("a", Types.STRING()) + .field("b", Types.INT()) + .field("c", Types.ROW( + new String[]{"a", "b", "c"}, + new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()} + )) + .arrayElementDelim("^^") + .escapeCharacter('c') + ); + testSchemaSerializationSchema(properties); + testSchemaDeserializationSchema(properties); + } + + @Test + public void testDerived() { + final Map<String, String> properties = toMap( + new Schema() + .field("a", Types.STRING()) + .field("b", Types.INT()) + .field("c", Types.ROW( + new String[]{"a", "b", "c"}, + new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()} + )), new Csv().derived(true) + ); + testSchemaSerializationSchema(properties); + testSchemaDeserializationSchema(properties); + } + + @Test(expected = NoMatchingTableFactoryException.class) + public void testUnsupportedProperties() { + final Map<String, String> properties = toMap( + new Csv() + .field("a", Types.STRING()) + .lineDelimiter("%") + ); + testSchemaSerializationSchema(properties); + } + + private void testSchemaDeserializationSchema(Map<String, String> properties) { + final DeserializationSchema<?> actual2 = TableFactoryService + .find(DeserializationSchemaFactory.class, properties) + .createDeserializationSchema(properties); + final CsvRowDeserializationSchema expected2 = new CsvRowDeserializationSchema(SCHEMA); + assertEquals(expected2, actual2); + } + + private void testSchemaSerializationSchema(Map<String, String> properties) { + final SerializationSchema<?> actual1 = TableFactoryService + 
.find(SerializationSchemaFactory.class, properties) + .createSerializationSchema(properties); + final SerializationSchema expected1 = new CsvRowSerializationSchema(SCHEMA); + assertEquals(expected1, actual1); + } + + private static Map<String, String> toMap(Descriptor... desc) { + final DescriptorProperties descriptorProperties = new DescriptorProperties(true); + for (Descriptor d : desc) { + d.addProperties(descriptorProperties); + } + return descriptorProperties.asMap(); + } +} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java new file mode 100644 index 0000000..e9635fb --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import com.fasterxml.jackson.dataformat.csv.CsvSchema.ColumnType; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * testing for {@link CsvRowSchemaConverter}. + */ +public class CsvRowSchemaConverterTest extends TestLogger { + + @Test + public void testRowToCsvSchema() { + RowTypeInfo rowTypeInfo = new RowTypeInfo( + new TypeInformation<?>[] { + Types.STRING, + Types.LONG, + Types.ROW(Types.STRING), + Types.BIG_DEC, + Types.BOOLEAN, + BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO + }, + new String[]{"a", "b", "c", "d", "e", "f", "g"} + ); + CsvSchema expect = CsvSchema.builder() + .addColumn("a", ColumnType.STRING) + .addColumn("b", ColumnType.NUMBER) + .addColumn("c", ColumnType.ARRAY) + .addColumn("d", ColumnType.NUMBER) + .addColumn("e", ColumnType.BOOLEAN) + .addColumn("f", ColumnType.ARRAY) + .addColumn("g", ColumnType.STRING) + .build(); + CsvSchema actual = CsvRowSchemaConverter.rowTypeToCsvSchema(rowTypeInfo); + assertEquals(expect.toString(), actual.toString()); + } + + @Test(expected = RuntimeException.class) + public void testUnsupportedType() { + CsvRowSchemaConverter.rowTypeToCsvSchema(new RowTypeInfo( + new TypeInformation[]{Types.STRING, + PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO}, + new String[]{"a", "b"} + )); + } + +} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java new file mode 100644 index 0000000..72b5fa9 --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Testing for {@link CsvRowSerializationSchema}. 
+ */ +public class CsvRowSerializationSchemaTest extends TestLogger { + + @Test + public void testSerializeAndDeserialize() throws IOException { + final String[] fields = new String[]{"a", "b", "c", "d", "e", "f", "g", "h"}; + final TypeInformation[] types = new TypeInformation[]{ + Types.BOOLEAN(), Types.STRING(), Types.INT(), Types.DECIMAL(), + Types.SQL_TIMESTAMP(), PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, + Types.ROW( + new String[]{"g1", "g2"}, + new TypeInformation[]{Types.STRING(), Types.LONG()}), + Types.STRING() + }; + final TypeInformation<Row> rowSchema = Types.ROW(fields, types); + final Row row = new Row(8); + final Row nestedRow = new Row(2); + nestedRow.setField(0, "z\"xcv"); + nestedRow.setField(1, 123L); + row.setField(0, true); + row.setField(1, "abcd"); + row.setField(2, 1); + row.setField(3, BigDecimal.valueOf(1.2334)); + row.setField(4, new Timestamp(System.currentTimeMillis())); + row.setField(5, "qwecxcr".getBytes()); + row.setField(6, nestedRow); + row.setField(7, null); + + final Row resultRow = serializeAndDeserialize(rowSchema, row); + assertEquals(row, resultRow); + } + + @Test + public void testSerialize() { + long currentMillis = System.currentTimeMillis(); + Row row = new Row(4); + Row nestedRow = new Row(2); + row.setField(0, "abc"); + row.setField(1, 34); + row.setField(2, new Timestamp(currentMillis)); + nestedRow.setField(0, "bc"); + nestedRow.setField(1, "qwertyu".getBytes()); + row.setField(3, nestedRow); + + final TypeInformation<Row> typeInfo = Types.ROW( + new String[]{"a", "b", "c", "d"}, + new TypeInformation[]{Types.STRING(), Types.INT(), Types.SQL_TIMESTAMP(), + Types.ROW( + new String[]{"d1", "d2"}, + new TypeInformation[]{Types.STRING(), PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO} + )} + ); + + final CsvRowSerializationSchema schema = new CsvRowSerializationSchema(typeInfo); + byte[] result = schema.serialize(row); + String c1 = "abc"; + String c2 = String.valueOf(34); + String c3 = "\"" + new 
Timestamp(currentMillis).toString() + "\""; + String c4 = "bc;" + new String("qwertyu".getBytes()); + byte[] expect = (c1 + "," + c2 + "," + c3 + "," + c4 + "\n").getBytes(); + assertArrayEquals(expect, result); + } + + @Test + public void testCustomizedProperties() throws IOException { + final TypeInformation<Row> rowTypeInfo = Types.ROW( + new String[]{"a", "b", "c"}, + new TypeInformation[]{Types.STRING(), Types.STRING(), + Types.ROW( + new String[]{"c1", "c2"}, + new TypeInformation[]{Types.INT(), Types.STRING()} + )} + ); + + final Row row = new Row(3); + final Row nestedRow = new Row(2); + nestedRow.setField(0, 1); + nestedRow.setField(1, "zxv"); + row.setField(0, "12*3'4"); + row.setField(1, "a,bc"); + row.setField(2, nestedRow); + + final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo); + serializationSchema.setEscapeCharacter('*'); + serializationSchema.setQuoteCharacter('\''); + serializationSchema.setArrayElementDelimiter(":"); + byte[] result = serializationSchema.serialize(row); + + final String c1 = "'12**3''4'"; + final String c2 = "'a,bc'"; + final String c3 = "1:zxv"; + byte[] expect = (c1 + "," + c2 + "," + c3 + "\n").getBytes(); + assertArrayEquals(expect, result); + } + + @Test + public void testCharset() throws UnsupportedEncodingException { + final TypeInformation<Row> rowTypeInfo = Types.ROW( + new String[]{"a"}, + new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO} + ); + final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo); + serializationSchema.setCharset("UTF-16"); + + final Row row = new Row(1); + row.setField(0, "123".getBytes(StandardCharsets.UTF_16)); + byte[] result = serializationSchema.serialize(row); + byte[] expect = "123\n".getBytes(); + + assertArrayEquals(expect, result); + } + + @Test + public void testSerializationOfTwoRows() throws IOException { + final TypeInformation<Row> rowSchema = Types.ROW( + new String[] 
{"f1", "f2", "f3"}, + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}); + + final Row row1 = new Row(3); + row1.setField(0, 1); + row1.setField(1, true); + row1.setField(2, "str"); + + final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema); + final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowSchema); + + byte[] bytes = serializationSchema.serialize(row1); + assertEquals(row1, deserializationSchema.deserialize(bytes)); + + final Row row2 = new Row(3); + row2.setField(0, 10); + row2.setField(1, false); + row2.setField(2, "newStr"); + + bytes = serializationSchema.serialize(row2); + assertEquals(row2, deserializationSchema.deserialize(bytes)); + } + + @Test + public void testNull() { + final TypeInformation<Row> rowTypeInfo = Types.ROW( + new String[]{"a"}, + new TypeInformation[]{Types.STRING()} + ); + final Row row = new Row(1); + row.setField(0, null); + + final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo); + serializationSchema.setNullValue("123"); + byte[] bytes = serializationSchema.serialize(row); + assertArrayEquals("123\n".getBytes(), bytes); + } + + @Test(expected = RuntimeException.class) + public void testSerializeRowWithInvalidNumberOfFields() { + final TypeInformation<Row> rowSchema = Types.ROW( + new String[] {"f1", "f2", "f3"}, + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}); + final Row row = new Row(1); + row.setField(0, 1); + + final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema); + serializationSchema.serialize(row); + } + + @Test(expected = RuntimeException.class) + public void testSerializeNestedRowInNestedRow() { + final TypeInformation<Row> rowSchema = Types.ROW( + new String[]{"a"}, + new TypeInformation[]{Types.ROW( + new String[]{"a1"}, + new TypeInformation[]{Types.ROW( + new String[]{"a11"}, + new 
TypeInformation[]{Types.STRING()} + )} + )} + ); + final Row row = new Row(1); + final Row nestedRow = new Row(1); + final Row doubleNestedRow = new Row(1); + doubleNestedRow.setField(0, "123"); + nestedRow.setField(0, doubleNestedRow); + row.setField(0, nestedRow); + final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema); + serializationSchema.serialize(row); + } + + private Row serializeAndDeserialize(TypeInformation<Row> rowSchema, Row row) throws IOException { + final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema); + final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowSchema); + + final byte[] bytes = serializationSchema.serialize(row); + return deserializationSchema.deserialize(bytes); + } +} diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml index 51c3466..a236770 100644 --- a/flink-formats/pom.xml +++ b/flink-formats/pom.xml @@ -41,6 +41,7 @@ under the License. 
<module>flink-avro-confluent-registry</module> <module>flink-parquet</module> <module>flink-sequence-file</module> + <module>flink-csv</module> </modules> <!-- override these root dependencies as 'provided', so they don't end up diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala index 6e63931..ebd6e64 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala @@ -41,6 +41,10 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { private var commentPrefix: Option[String] = None private var isIgnoreFirstLine: Option[Boolean] = None private var lenient: Option[Boolean] = None + private var arrayElementDelim: Option[String] = None + private var escapeCharacter: Option[Character] = None + private var bytesCharset: Option[String] = None + private var derived: Option[Boolean] = None /** * Sets the field delimiter, "," by default. @@ -143,9 +147,40 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { this } + /** + * Set delimiter of array elements, ';' by default. + */ + def arrayElementDelim(delim: String): Csv = { + this.arrayElementDelim = Some(delim) + this + } + + /** + * Set escape character, none by default. + */ + def escapeCharacter(escape: Character): Csv = { + this.escapeCharacter = Some(escape) + this + } + + /** + * Set charset of byte[], 'UTF-8' by default. + */ + def bytesCharset(charset: String): Csv = { + this.bytesCharset = Some(charset) + this + } + + /** + * Set true if format schema derives from table schema. 
+ */ + def derived(derived: Boolean): Csv = { + this.derived = Some(derived) + this + } + override protected def toFormatProperties: util.Map[String, String] = { val properties = new DescriptorProperties() - fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _)) lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala index 9eeedb4..97db7fa 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.descriptors +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.descriptors.CsvValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE @@ -33,9 +34,26 @@ class CsvValidator extends FormatDescriptorValidator { properties.validateString(FORMAT_LINE_DELIMITER, true, 1) properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1) properties.validateString(FORMAT_COMMENT_PREFIX, true, 1) + properties.validateString(FORMAT_ARRAY_ELEMENT_DELIMITER, true, 1) + properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1) + properties.validateString(FORMAT_BYTES_CHARSET, true, 1) + properties.validateString(FORMAT_NULL_VALUE, true, 1) properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, true) properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true) - properties.validateTableSchema(FORMAT_FIELDS, false) + properties.validateBoolean(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA, true) + + val tableSchema = properties.getOptionalTableSchema(FORMAT_FIELDS) + val derived = properties.getOptionalBoolean( + FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA).orElse(false) + 
if (derived && tableSchema.isPresent) { + throw new ValidationException( + "Format cannot define a schema and derive from the table's schema at the same time.") + } else if (tableSchema.isPresent) { + properties.validateTableSchema(FORMAT_FIELDS, false) + } else if (!tableSchema.isPresent && derived) { + throw new ValidationException( + "A definition of a schema or derive from the table's schema is required.") + } } } @@ -49,4 +67,8 @@ object CsvValidator { val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line" val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors" val FORMAT_FIELDS = "format.fields" + val FORMAT_ARRAY_ELEMENT_DELIMITER = "format.array-element-delimiter" + val FORMAT_ESCAPE_CHARACTER = "format.escape-character" + val FORMAT_BYTES_CHARSET = "format.bytes-charset" + val FORMAT_NULL_VALUE = "format.null-value" } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala index 8d01b8b..1c01e71 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala @@ -47,6 +47,16 @@ class CsvTest extends DescriptorTestBase { addPropertyAndVerify(descriptors().get(0), "format.quote-character", "qq") } + @Test(expected = classOf[ValidationException]) + def testTwoSchemas(): Unit = { + addPropertyAndVerify(descriptors().get(0), "format.derive-schema", "true") + } + + @Test + def testOneSchema(): Unit = { + addPropertyAndVerify(descriptors().get(0), "format.derive-schema", "false") + } + // ---------------------------------------------------------------------------------------------- override def descriptors(): util.List[Descriptor] = {