This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c3277c557aa423e7f5dd83d18b74082bf6c2bc24
Author: wind <bupt_...@163.com>
AuthorDate: Sun Aug 12 12:42:11 2018 +0800

    [FLINK-9964][table] Add an initial CSV table format factory
    
    This closes #6541.
---
 flink-formats/flink-csv/pom.xml                    |  88 ++++++++
 .../formats/csv/CsvRowDeserializationSchema.java   | 225 +++++++++++++++++++
 .../flink/formats/csv/CsvRowFormatFactory.java     | 143 ++++++++++++
 .../flink/formats/csv/CsvRowSchemaConverter.java   | 101 +++++++++
 .../formats/csv/CsvRowSerializationSchema.java     | 242 +++++++++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |  16 ++
 .../csv/CsvRowDeserializationSchemaTest.java       | 150 +++++++++++++
 .../flink/formats/csv/CsvRowFormatFactoryTest.java | 121 +++++++++++
 .../formats/csv/CsvRowSchemaConverterTest.java     |  75 +++++++
 .../formats/csv/CsvRowSerializationSchemaTest.java | 234 ++++++++++++++++++++
 flink-formats/pom.xml                              |   1 +
 .../org/apache/flink/table/descriptors/Csv.scala   |  37 +++-
 .../flink/table/descriptors/CsvValidator.scala     |  24 +-
 .../apache/flink/table/descriptors/CsvTest.scala   |  10 +
 14 files changed, 1465 insertions(+), 2 deletions(-)
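
For illustration, a minimal sketch of how the new deserialization schema is
used directly. The class and constructor are taken from the diff below; the
row type and values are illustrative:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.types.Row;

    public class CsvFormatExample {
        public static void main(String[] args) throws Exception {
            // Row type matching one CSV line of the form "word,count".
            TypeInformation<Row> rowType = Types.ROW(
                new String[]{"word", "count"},
                new TypeInformation[]{Types.STRING(), Types.LONG()});

            CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema(rowType);
            Row row = schema.deserialize("flink,42".getBytes());
            System.out.println(row); // prints: flink,42
        }
    }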

diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml
new file mode 100644
index 0000000..a2b4f25
--- /dev/null
+++ b/flink-formats/flink-csv/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-formats</artifactId>
+               <version>1.7-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-csv</artifactId>
+       <name>flink-csv</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <!-- use a dedicated Scala version to not depend on it -->
+                       <artifactId>flink-table_2.11</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <!-- Projects depending on this project won't depend on flink-table. -->
+                       <optional>true</optional>
+               </dependency>
+
+               <!-- jackson -->
+               <dependency>
+                       <groupId>com.fasterxml.jackson.dataformat</groupId>
+                       <artifactId>jackson-dataformat-csv</artifactId>
+                       <version>2.7.9</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <!-- use a dedicated Scala version to not depend on it -->
+                       <artifactId>flink-table_2.11</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- flink-table needs Scala -->
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-compiler</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
new file mode 100644
index 0000000..7e328ee
--- /dev/null
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a {@link JsonNode} and
+ * converts it to a {@link Row}.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+@Public
+public final class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
+
+       /** Schema describing the input csv data. */
+       private CsvSchema csvSchema;
+
+       /** Type information describing the input csv data. */
+       private TypeInformation<Row> rowTypeInfo;
+
+       /** ObjectReader used to read messages; it is rebuilt whenever csvSchema changes. */
+       private ObjectReader objectReader;
+
+       /** Charset for byte[]. */
+       private String charset = "UTF-8";
+
+       /**
+        * Creates a CSV row DeserializationSchema with the given {@link TypeInformation}.
+        */
+       public CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) {
+               Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null!");
+               CsvMapper csvMapper = new CsvMapper();
+               this.rowTypeInfo = rowTypeInfo;
+               this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+               this.objectReader = csvMapper.readerFor(JsonNode.class).with(csvSchema);
+               this.setNullValue("null");
+       }
+
+       @Override
+       public Row deserialize(byte[] message) throws IOException {
+               JsonNode root = objectReader.readValue(message);
+               return convertRow(root, (RowTypeInfo) rowTypeInfo);
+       }
+
+       @Override
+       public boolean isEndOfStream(Row nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<Row> getProducedType() {
+               return rowTypeInfo;
+       }
+
+       private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+               String[] fields = rowTypeInfo.getFieldNames();
+               TypeInformation<?>[] types = rowTypeInfo.getFieldTypes();
+               Row row = new Row(fields.length);
+
+               for (int i = 0; i < fields.length; i++) {
+                       String columnName = fields[i];
+                       JsonNode node = root.get(columnName);
+                       row.setField(i, convert(node, types[i]));
+               }
+               return row;
+       }
+
+       private Row convertRow(ArrayNode node, RowTypeInfo rowTypeInfo) {
+               TypeInformation[] types = rowTypeInfo.getFieldTypes();
+               String[] fields = rowTypeInfo.getFieldNames();
+               Row row = new Row(fields.length);
+               for (int i = 0; i < fields.length; i++) {
+                       row.setField(i, convert(node.get(i), types[i]));
+               }
+               return row;
+       }
+
+       /**
+        * Converts json node to object with given type information.
+        * @param node json node to be converted.
+        * @param info type information for the json data.
+        * @return converted object
+        */
+       private Object convert(JsonNode node, TypeInformation<?> info) {
+               if (node instanceof NullNode) {
+                       return null;
+               }
+               if (info == Types.STRING) {
+                       return node.asText();
+               } else if (info == Types.LONG) {
+                       return node.asLong();
+               } else if (info == Types.INT) {
+                       return node.asInt();
+               } else if (info == Types.DOUBLE) {
+                       return node.asDouble();
+               } else if (info == Types.FLOAT) {
+                       return Double.valueOf(node.asDouble()).floatValue();
+               } else if (info == Types.BIG_DEC) {
+                       return BigDecimal.valueOf(node.asDouble());
+               } else if (info == Types.BIG_INT) {
+                       return BigInteger.valueOf(node.asLong());
+               } else if (info == Types.SQL_DATE) {
+                       return Date.valueOf(node.asText());
+               } else if (info == Types.SQL_TIME) {
+                       return Time.valueOf(node.asText());
+               } else if (info == Types.SQL_TIMESTAMP) {
+                       return Timestamp.valueOf(node.asText());
+               } else if (info == Types.BOOLEAN) {
+                       return node.asBoolean();
+               } else if (info instanceof RowTypeInfo) {
+                       return convertRow((ArrayNode) node, (RowTypeInfo) info);
+               } else if (info instanceof BasicArrayTypeInfo) {
+                       return convertArray((ArrayNode) node, ((BasicArrayTypeInfo) info).getComponentInfo());
+               } else if (info instanceof PrimitiveArrayTypeInfo &&
+                       ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
+                       return convertByteArray((TextNode) node);
+               } else {
+                       throw new RuntimeException("Unable to support type " + info.toString() + " yet");
+               }
+       }
+
+       private Object[] convertArray(ArrayNode node, TypeInformation<?> elementType) {
+               final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size());
+               for (int i = 0; i < node.size(); i++) {
+                       array[i] = convert(node.get(i), elementType);
+               }
+               return array;
+       }
+
+       private byte[] convertByteArray(TextNode node) {
+               try {
+                       return node.asText().getBytes(charset);
+               } catch (UnsupportedEncodingException e) {
+                       throw new RuntimeException("Unsupported encoding charset " + charset, e);
+               }
+       }
+
+       public void setCharset(String charset) {
+               this.charset = charset;
+       }
+
+       public void setFieldDelimiter(String s) {
+               if (s.length() != 1) {
+                       throw new RuntimeException("FieldDelimiter's length must be one!");
+               }
+               this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(s.charAt(0)).build();
+               this.objectReader = objectReader.with(csvSchema);
+       }
+
+       public void setArrayElementDelimiter(String s) {
+               this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build();
+               this.objectReader = objectReader.with(csvSchema);
+       }
+
+       public void setQuoteCharacter(char c) {
+               this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
+               this.objectReader = objectReader.with(csvSchema);
+       }
+
+       public void setEscapeCharacter(char c) {
+               this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build();
+               this.objectReader = objectReader.with(csvSchema);
+       }
+
+       public void setNullValue(String s) {
+               this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build();
+               this.objectReader = objectReader.with(csvSchema);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (o == null || o.getClass() != this.getClass()) {
+                       return false;
+               }
+               if (this == o) {
+                       return true;
+               }
+               final CsvRowDeserializationSchema that = (CsvRowDeserializationSchema) o;
+               return rowTypeInfo.equals(that.rowTypeInfo) &&
+                       csvSchema.toString().equals(that.csvSchema.toString());
+       }
+}
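
A short sketch of the configuration surface above, mirroring
testCustomizedProperties in the tests further below (rowType as in the
earlier sketch; the concrete values are illustrative):

    CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema(rowType);
    schema.setFieldDelimiter(";");         // must be a single character
    schema.setQuoteCharacter('\'');
    schema.setEscapeCharacter('*');
    schema.setArrayElementDelimiter(":");  // separates nested row/array elements
    schema.setNullValue("N/A");            // literal that deserializes to null
    schema.setCharset("UTF-16");           // charset used for byte[] fields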
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java
new file mode 100644
index 0000000..e9c65bb
--- /dev/null
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.CsvValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.DeserializationSchemaFactory;
+import org.apache.flink.table.factories.SerializationSchemaFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table format for providing configured instances of CSV-to-row {@link SerializationSchema}
+ * and {@link DeserializationSchema}.
+ */
+public final class CsvRowFormatFactory implements SerializationSchemaFactory<Row>,
+       DeserializationSchemaFactory<Row> {
+
+       @Override
+       public Map<String, String> requiredContext() {
+               final Map<String, String> context = new HashMap<>();
+               context.put(FormatDescriptorValidator.FORMAT_TYPE(), CsvValidator.FORMAT_TYPE_VALUE());
+               context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1");
+               return context;
+       }
+
+       @Override
+       public boolean supportsSchemaDerivation() {
+               return true;
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               final List<String> properties = new ArrayList<>();
+               properties.add(CsvValidator.FORMAT_FIELDS() + ".#." + DescriptorProperties.TYPE());
+               properties.add(CsvValidator.FORMAT_FIELDS() + ".#." + DescriptorProperties.NAME());
+               properties.add(CsvValidator.FORMAT_FIELD_DELIMITER());
+               properties.add(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER());
+               properties.add(CsvValidator.FORMAT_QUOTE_CHARACTER());
+               properties.add(CsvValidator.FORMAT_ESCAPE_CHARACTER());
+               properties.add(CsvValidator.FORMAT_BYTES_CHARSET());
+               properties.add(CsvValidator.FORMAT_NULL_VALUE());
+               properties.add(SchemaValidator.SCHEMA() + ".#." + SchemaValidator.SCHEMA_TYPE());
+               properties.add(SchemaValidator.SCHEMA() + ".#." + SchemaValidator.SCHEMA_NAME());
+               return properties;
+       }
+
+       @Override
+       public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) {
+               final DescriptorProperties descriptorProperties = validateAndGetProperties(properties);
+
+               final CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema(
+                       createTypeInformation(descriptorProperties));
+
+               // update csv schema with properties
+               descriptorProperties.getOptionalString(CsvValidator.FORMAT_FIELD_DELIMITER())
+                       .ifPresent(schema::setFieldDelimiter);
+               descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER())
+                       .ifPresent(schema::setArrayElementDelimiter);
+               descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER())
+                       .ifPresent(schema::setQuoteCharacter);
+               descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER())
+                       .ifPresent(schema::setEscapeCharacter);
+               descriptorProperties.getOptionalString(CsvValidator.FORMAT_BYTES_CHARSET())
+                       .ifPresent(schema::setCharset);
+               descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_VALUE())
+                       .ifPresent(schema::setNullValue);
+
+               return schema;
+       }
+
+       @Override
+       public SerializationSchema<Row> createSerializationSchema(Map<String, String> properties) {
+               final DescriptorProperties descriptorProperties = validateAndGetProperties(properties);
+
+               final CsvRowSerializationSchema schema = new CsvRowSerializationSchema(
+                       createTypeInformation(descriptorProperties));
+
+               // update csv schema with properties
+               descriptorProperties.getOptionalString(CsvValidator.FORMAT_FIELD_DELIMITER())
+                       .ifPresent(schema::setFieldDelimiter);
+               descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER())
+                       .ifPresent(schema::setArrayElementDelimiter);
+               descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER())
+                       .ifPresent(schema::setQuoteCharacter);
+               descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER())
+                       .ifPresent(schema::setEscapeCharacter);
+               descriptorProperties.getOptionalString(CsvValidator.FORMAT_BYTES_CHARSET())
+                       .ifPresent(schema::setCharset);
+               descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_VALUE())
+                       .ifPresent(schema::setNullValue);
+
+               return schema;
+       }
+
+       private static DescriptorProperties validateAndGetProperties(Map<String, String> propertiesMap) {
+               final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+               descriptorProperties.putProperties(propertiesMap);
+
+               // validate
+               new CsvValidator().validate(descriptorProperties);
+
+               return descriptorProperties;
+       }
+
+       /**
+        * Creates a {@link TypeInformation} based on the "format-fields" in {@link CsvValidator}.
+        * @param descriptorProperties descriptor properties
+        * @return {@link TypeInformation}
+        */
+       private static TypeInformation<Row> createTypeInformation(DescriptorProperties descriptorProperties) {
+               if (descriptorProperties.getOptionalTableSchema(CsvValidator.FORMAT_FIELDS()).isPresent()) {
+                       return descriptorProperties.getTableSchema(CsvValidator.FORMAT_FIELDS()).toRowType();
+               } else {
+                       return SchemaValidator.deriveFormatFields(descriptorProperties).toRowType();
+               }
+       }
+}
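
A sketch of the descriptor-driven lookup this factory plugs into, following
CsvRowFormatFactoryTest further below (method-body excerpt; imports as in
that test):

    // Build the property map from descriptors, then let the factory service
    // match it against requiredContext() and supportedProperties().
    DescriptorProperties props = new DescriptorProperties(true);
    new Csv()
        .field("a", Types.STRING())
        .field("b", Types.INT())
        .addProperties(props);

    DeserializationSchema<?> schema = TableFactoryService
        .find(DeserializationSchemaFactory.class, props.asMap())
        .createDeserializationSchema(props.asMap());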
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
new file mode 100644
index 0000000..1476159
--- /dev/null
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+/**
+ * Converting functions related to {@link CsvSchema}.
+ * {@link CsvSchema} supports four column types (string, number, boolean,
+ * and array). In order to cover the various Flink types, this class sorts
+ * instances of {@link TypeInformation} into one of these categories.
+ */
+public final class CsvRowSchemaConverter {
+
+       /**
+        * Types that can be converted to ColumnType.NUMBER.
+        */
+       private static final HashSet<TypeInformation<?>> NUMBER_TYPES =
+               new HashSet<>(Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
+                       Types.BIG_DEC, Types.BIG_INT));
+
+       /**
+        * Types that can be converted to ColumnType.STRING.
+        */
+       private static final HashSet<TypeInformation<?>> STRING_TYPES =
+               new HashSet<>(Arrays.asList(Types.STRING, Types.SQL_DATE,
+                       Types.SQL_TIME, Types.SQL_TIMESTAMP));
+
+       /**
+        * Types that can be converted to ColumnType.BOOLEAN.
+        */
+       private static final HashSet<TypeInformation<?>> BOOLEAN_TYPES =
+               new HashSet<>(Collections.singletonList(Types.BOOLEAN));
+
+       /**
+        * Converts {@link RowTypeInfo} to {@link CsvSchema}.
+        */
+       public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) {
+               Builder builder = new CsvSchema.Builder();
+               String[] fields = rowType.getFieldNames();
+               TypeInformation<?>[] infos = rowType.getFieldTypes();
+               for (int i = 0; i < rowType.getArity(); i++) {
+                       builder.addColumn(new Column(i, fields[i], convertType(infos[i])));
+               }
+               return builder.build();
+       }
+
+       /**
+        * Converts {@link TypeInformation} to {@link CsvSchema.ColumnType}
+        * based on its category.
+        */
+       private static CsvSchema.ColumnType convertType(TypeInformation<?> info) {
+               if (STRING_TYPES.contains(info)) {
+                       return CsvSchema.ColumnType.STRING;
+               } else if (NUMBER_TYPES.contains(info)) {
+                       return CsvSchema.ColumnType.NUMBER;
+               } else if (BOOLEAN_TYPES.contains(info)) {
+                       return CsvSchema.ColumnType.BOOLEAN;
+               } else if (info instanceof ObjectArrayTypeInfo
+                       || info instanceof BasicArrayTypeInfo
+                       || info instanceof RowTypeInfo) {
+                       return CsvSchema.ColumnType.ARRAY;
+               } else if (info instanceof PrimitiveArrayTypeInfo &&
+                       ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
+                       return CsvSchema.ColumnType.STRING;
+               } else {
+                       throw new RuntimeException("Unable to support " + info.toString() + " yet");
+               }
+       }
+}
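
A sketch of the resulting mapping (Types here is
org.apache.flink.api.common.typeinfo.Types, as in CsvRowSchemaConverterTest
further below):

    RowTypeInfo rowType = new RowTypeInfo(
        new TypeInformation<?>[]{Types.STRING, Types.BIG_DEC, Types.BOOLEAN},
        new String[]{"s", "n", "b"});

    CsvSchema csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema(rowType);
    // Resulting columns: s -> STRING, n -> NUMBER, b -> BOOLEAN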
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
new file mode 100644
index 0000000..f8ced68
--- /dev/null
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Serialization schema that serializes an object of Flink types into CSV bytes.
+ *
+ * <p>Serializes the input row into an {@link ObjectNode} and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link CsvRowDeserializationSchema}.
+ */
+@Public
+public final class CsvRowSerializationSchema implements SerializationSchema<Row> {
+
+       /** Schema describing the input csv data. */
+       private CsvSchema csvSchema;
+
+       /** Type information describing the input csv data. */
+       private TypeInformation<Row> rowTypeInfo;
+
+       /** CsvMapper used to write {@link JsonNode} into bytes. */
+       private CsvMapper csvMapper = new CsvMapper();
+
+       /** Reusable object node. */
+       private ObjectNode root;
+
+       /** Charset for byte[]. */
+       private String charset = "UTF-8";
+
+       /**
+        * Creates a {@link CsvRowSerializationSchema} with the given {@link TypeInformation}.
+        * @param rowTypeInfo type information used to create the schema.
+        */
+       CsvRowSerializationSchema(TypeInformation<Row> rowTypeInfo) {
+               Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null!");
+               this.rowTypeInfo = rowTypeInfo;
+               this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+               this.setNullValue("null");
+       }
+
+       @Override
+       public byte[] serialize(Row row) {
+               if (root == null) {
+                       root = csvMapper.createObjectNode();
+               }
+               try {
+                       convertRow(root, row, (RowTypeInfo) rowTypeInfo);
+                       return csvMapper.writer(csvSchema).writeValueAsBytes(root);
+               } catch (JsonProcessingException e) {
+                       throw new RuntimeException("Could not serialize row '" + row + "'. " +
+                               "Make sure that the schema matches the input.", e);
+               }
+       }
+
+       private void convertRow(ObjectNode reuse, Row row, RowTypeInfo rowTypeInfo) {
+               if (reuse == null) {
+                       reuse = csvMapper.createObjectNode();
+               }
+               if (row.getArity() != rowTypeInfo.getFieldNames().length) {
+                       throw new IllegalStateException(String.format(
+                               "Number of elements in the row '%s' is different from number of field names: %d",
+                               row, rowTypeInfo.getFieldNames().length));
+               }
+               TypeInformation[] types = rowTypeInfo.getFieldTypes();
+               String[] fields = rowTypeInfo.getFieldNames();
+               for (int i = 0; i < types.length; i++) {
+                       String columnName = fields[i];
+                       Object obj = row.getField(i);
+                       reuse.set(columnName, convert(reuse, obj, types[i], false));
+               }
+       }
+
+       /**
+        * Converts an object to a JsonNode.
+        * @param container {@link ContainerNode} that creates the {@link JsonNode}.
+        * @param obj object to be converted into a {@link JsonNode}.
+        * @param info type information that decides the type of the {@link JsonNode}.
+        * @param nested flag that indicates whether obj is in a nested structure,
+        *               like a string in an array.
+        * @return the converted {@link JsonNode}.
+        */
+       private JsonNode convert(ContainerNode<?> container, Object obj, TypeInformation info, Boolean nested) {
+               if (obj == null) {
+                       return container.nullNode();
+               }
+               if (info == Types.STRING) {
+                       return container.textNode((String) obj);
+               } else if (info == Types.LONG) {
+                       return container.numberNode((Long) obj);
+               } else if (info == Types.INT) {
+                       return container.numberNode((Integer) obj);
+               } else if (info == Types.DOUBLE) {
+                       return container.numberNode((Double) obj);
+               } else if (info == Types.FLOAT) {
+                       return container.numberNode((Float) obj);
+               } else if (info == Types.BIG_DEC) {
+                       return container.numberNode(new BigDecimal(String.valueOf(obj)));
+               } else if (info == Types.BIG_INT) {
+                       return container.numberNode(BigInteger.valueOf(Long.valueOf(String.valueOf(obj))));
+               } else if (info == Types.SQL_DATE) {
+                       return container.textNode(Date.valueOf(String.valueOf(obj)).toString());
+               } else if (info == Types.SQL_TIME) {
+                       return container.textNode(Time.valueOf(String.valueOf(obj)).toString());
+               } else if (info == Types.SQL_TIMESTAMP) {
+                       return container.textNode(Timestamp.valueOf(String.valueOf(obj)).toString());
+               } else if (info == Types.BOOLEAN) {
+                       return container.booleanNode((Boolean) obj);
+               } else if (info instanceof RowTypeInfo) {
+                       if (nested) {
+                               throw new RuntimeException("Unable to support nested row type " + info.toString() + " yet");
+                       }
+                       return convertArray((Row) obj, (RowTypeInfo) info);
+               } else if (info instanceof BasicArrayTypeInfo) {
+                       if (nested) {
+                               throw new RuntimeException("Unable to support nested array type " + info.toString() + " yet");
+                       }
+                       return convertArray((Object[]) obj,
+                               ((BasicArrayTypeInfo) info).getComponentInfo());
+               } else if (info instanceof PrimitiveArrayTypeInfo &&
+                       ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
+                       /* We convert byte[] to a TextNode instead of a BinaryNode here,
+                          because a BinaryNode would be serialized to a base64 string in
+                          {@link com.fasterxml.jackson.databind.node.BinaryNode#serialize(JsonGenerator, SerializerProvider)},
+                          which is unacceptable for users.
+                        */
+                       try {
+                               return container.textNode(new String((byte[]) obj, charset));
+                       } catch (UnsupportedEncodingException e) {
+                               throw new RuntimeException("Unsupported encoding charset " + charset, e);
+                       }
+               } else {
+                       throw new RuntimeException("Unable to support type " + info.toString() + " yet");
+               }
+       }
+
+       /**
+        * Uses an {@link ArrayNode} to represent a row.
+        */
+       private ArrayNode convertArray(Row row, RowTypeInfo rowTypeInfo) {
+               ArrayNode arrayNode = csvMapper.createArrayNode();
+               TypeInformation[] types = rowTypeInfo.getFieldTypes();
+               String[] fields = rowTypeInfo.getFieldNames();
+               for (int i = 0; i < fields.length; i++) {
+                       arrayNode.add(convert(arrayNode, row.getField(i), types[i], true));
+               }
+               return arrayNode;
+       }
+
+       /**
+        * Uses an {@link ArrayNode} to represent an array.
+        */
+       private ArrayNode convertArray(Object[] obj, TypeInformation elementInfo) {
+               ArrayNode arrayNode = csvMapper.createArrayNode();
+               for (Object elementObj : obj) {
+                       arrayNode.add(convert(arrayNode, elementObj, elementInfo, true));
+               }
+               return arrayNode;
+       }
+
+       public void setCharset(String charset) {
+               this.charset = charset;
+       }
+
+       public void setFieldDelimiter(String s) {
+               if (s.length() != 1) {
+                       throw new RuntimeException("FieldDelimiter's length must be one!");
+               }
+               this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(s.charAt(0)).build();
+       }
+
+       public void setArrayElementDelimiter(String s) {
+               this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build();
+       }
+
+       public void setQuoteCharacter(char c) {
+               this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
+       }
+
+       public void setEscapeCharacter(char c) {
+               this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build();
+       }
+
+       public void setNullValue(String s) {
+               this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (o == null || o.getClass() != this.getClass()) {
+                       return false;
+               }
+               if (this == o) {
+                       return true;
+               }
+               final CsvRowSerializationSchema that = (CsvRowSerializationSchema) o;
+
+               return rowTypeInfo.equals(that.rowTypeInfo) &&
+                       csvSchema.toString().equals(that.csvSchema.toString());
+       }
+}
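
As the comment in convert() above notes, byte[] fields are written as text in
the configured charset rather than base64. A sketch (same-package access is
assumed, since the constructor is package-private; the tests below do the
same):

    TypeInformation<Row> rowType = Types.ROW(
        new String[]{"payload"},
        new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO});

    CsvRowSerializationSchema schema = new CsvRowSerializationSchema(rowType);
    schema.setCharset("UTF-16");

    Row row = new Row(1);
    row.setField(0, "abc".getBytes(StandardCharsets.UTF_16));
    byte[] csv = schema.serialize(row); // "abc" rendered as UTF-16 text, not base64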
diff --git a/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..61cd834
--- /dev/null
+++ b/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.csv.CsvRowFormatFactory
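
The entry above makes the factory discoverable through Java's standard
ServiceLoader mechanism, on which TableFactoryService builds. A minimal sketch
of the raw lookup:

    ServiceLoader<TableFactory> loader = ServiceLoader.load(TableFactory.class);
    for (TableFactory factory : loader) {
        // Lists org.apache.flink.formats.csv.CsvRowFormatFactory once
        // flink-csv is on the classpath.
        System.out.println(factory.getClass().getName());
    }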
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java
new file mode 100644
index 0000000..7e63a4a
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Testing for {@link CsvRowDeserializationSchema}.
+ */
+public class CsvRowDeserializationSchemaTest extends TestLogger {
+
+       @Test
+       public void testDeserialize() throws IOException {
+               final long currentMills = System.currentTimeMillis();
+               final TypeInformation<Row> rowTypeInfo = Types.ROW(
+                       new String[]{"a", "b", "c", "d", "e", "f", "g"},
+                       new TypeInformation[]{
+                               Types.STRING(), Types.LONG(), Types.DECIMAL(),
+                               Types.ROW(
+                                       new String[]{"c1", "c2", "c3"},
+                                       new TypeInformation[]{
+                                               Types.INT(),
+                                               PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+                                               Types.STRING()
+                                       }
+                               ), Types.SQL_TIMESTAMP(), Types.BOOLEAN(),
+                               PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+                       }
+               );
+
+               String c1 = "123";
+               String c2 = String.valueOf(34L);
+               String c3 = String.valueOf(1233.2);
+               String c4 = "1" + ";" + new String("abc".getBytes()) + ";" + "cba";
+               String c5 = new Timestamp(currentMills).toString();
+               String c6 = "true";
+               String c7 = new String("12345".getBytes());
+               byte[] bytes = (c1 + "," + c2 + "," + c3 + "," + c4 + ","
+                       + c5 + "," + c6 + "," + c7).getBytes();
+               CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo);
+               Row deserializedRow = deserializationSchema.deserialize(bytes);
+
+               assertEquals(7, deserializedRow.getArity());
+               assertEquals("123", deserializedRow.getField(0));
+               assertEquals(34L, deserializedRow.getField(1));
+               assertEquals(BigDecimal.valueOf(1233.2), deserializedRow.getField(2));
+               assertArrayEquals("abc".getBytes(), (byte[]) ((Row) deserializedRow.getField(3)).getField(1));
+               assertEquals(new Timestamp(currentMills), deserializedRow.getField(4));
+               assertEquals(true, deserializedRow.getField(5));
+               assertArrayEquals("12345".getBytes(), (byte[]) deserializedRow.getField(6));
+       }
+
+       @Test
+       public void testCustomizedProperties() throws IOException {
+               final TypeInformation<Row> rowTypeInfo = Types.ROW(
+                       new String[]{"a", "b", "c"},
+                       new TypeInformation[]{Types.STRING(), Types.STRING(),
+                               Types.ROW(
+                               new String[]{"c1", "c2"},
+                               new TypeInformation[]{Types.INT(), Types.STRING()}
+                       )}
+               );
+
+               String c1 = "123*\"4";
+               String c2 = "'a,bc'";
+               String c3 = "1:zxv";
+
+               byte[] bytes = (c1 + "," + c2 + "," + c3).getBytes();
+               CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo);
+               deserializationSchema.setEscapeCharacter('*');
+               deserializationSchema.setQuoteCharacter('\'');
+               deserializationSchema.setArrayElementDelimiter(":");
+               Row deserializedRow = deserializationSchema.deserialize(bytes);
+
+               assertEquals("123\"4", deserializedRow.getField(0));
+               assertEquals("a,bc", deserializedRow.getField(1));
+               assertEquals("zxv", ((Row) deserializedRow.getField(2)).getField(1));
+       }
+
+       @Test
+       public void testCharset() throws IOException {
+               final TypeInformation<Row> rowTypeInfo = Types.ROW(
+                       new String[]{"a"},
+                       new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}
+               );
+               final CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema(rowTypeInfo);
+               schema.setCharset("UTF-16");
+
+               byte[] bytes = "abc".getBytes(StandardCharsets.UTF_16);
+               Row result = schema.deserialize(bytes);
+
+               assertEquals("abc", new String((byte[]) result.getField(0), StandardCharsets.UTF_16));
+       }
+
+       @Test
+       public void testNull() throws IOException {
+               final TypeInformation<Row> rowTypeInfo = Types.ROW(
+                       new String[]{"a"},
+                       new TypeInformation[]{Types.STRING()}
+               );
+
+               final byte[] bytes = "123".getBytes();
+
+               final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo);
+               deserializationSchema.setNullValue("123");
+               final Row row = deserializationSchema.deserialize(bytes);
+               assertNull(row.getField(0));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testNumberOfFieldNamesAndTypesMismatch() {
+               TypeInformation<Row> rowTypeInfo = Types.ROW(
+                       new String[]{"a", "b"},
+                       new TypeInformation[]{Types.STRING()}
+               );
+               new CsvRowDeserializationSchema(rowTypeInfo);
+       }
+
+}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java
new file mode 100644
index 0000000..9d8263f
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.NoMatchingTableFactoryException;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.Csv;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.factories.DeserializationSchemaFactory;
+import org.apache.flink.table.factories.SerializationSchemaFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Testing for {@link CsvRowFormatFactory}.
+ */
+public class CsvRowFormatFactoryTest extends TestLogger {
+
+       private static final TypeInformation<Row> SCHEMA = Types.ROW(
+               new String[]{"a", "b", "c"},
+               new TypeInformation[]{Types.STRING(), Types.INT(), Types.ROW(
+                               new String[]{"a", "b", "c"},
+                               new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()}
+               )}
+       );
+
+       @Test
+       public void testSchema() {
+               final Map<String, String> properties = toMap(
+                       new Csv()
+                               .field("a", Types.STRING())
+                               .field("b", Types.INT())
+                               .field("c", Types.ROW(
+                                       new String[]{"a", "b", "c"},
+                                       new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()}
+                               ))
+                               .arrayElementDelim("^^")
+                               .escapeCharacter('c')
+               );
+               testSchemaSerializationSchema(properties);
+               testSchemaDeserializationSchema(properties);
+       }
+
+       @Test
+       public void testDerived() {
+               final Map<String, String> properties = toMap(
+                       new Schema()
+                               .field("a", Types.STRING())
+                               .field("b", Types.INT())
+                               .field("c", Types.ROW(
+                                       new String[]{"a", "b", "c"},
+                                       new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()}
+                               )), new Csv().derived(true)
+               );
+               testSchemaSerializationSchema(properties);
+               testSchemaDeserializationSchema(properties);
+       }
+
+       @Test(expected = NoMatchingTableFactoryException.class)
+       public void testUnsupportedProperties() {
+               final Map<String, String> properties = toMap(
+                       new Csv()
+                               .field("a", Types.STRING())
+                               .lineDelimiter("%")
+               );
+               testSchemaSerializationSchema(properties);
+       }
+
+       private void testSchemaDeserializationSchema(Map<String, String> properties) {
+               final DeserializationSchema<?> actual2 = TableFactoryService
+                       .find(DeserializationSchemaFactory.class, properties)
+                       .createDeserializationSchema(properties);
+               final CsvRowDeserializationSchema expected2 = new CsvRowDeserializationSchema(SCHEMA);
+               assertEquals(expected2, actual2);
+       }
+
+       private void testSchemaSerializationSchema(Map<String, String> properties) {
+               final SerializationSchema<?> actual1 = TableFactoryService
+                       .find(SerializationSchemaFactory.class, properties)
+                       .createSerializationSchema(properties);
+               final SerializationSchema expected1 = new CsvRowSerializationSchema(SCHEMA);
+               assertEquals(expected1, actual1);
+       }
+
+       private static Map<String, String> toMap(Descriptor... desc) {
+               final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+               for (Descriptor d : desc) {
+                       d.addProperties(descriptorProperties);
+               }
+               return descriptorProperties.asMap();
+       }
+}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java
new file mode 100644
index 0000000..e9635fb
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.ColumnType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Testing for {@link CsvRowSchemaConverter}.
+ */
+public class CsvRowSchemaConverterTest extends TestLogger {
+
+       @Test
+       public void testRowToCsvSchema() {
+               RowTypeInfo rowTypeInfo = new RowTypeInfo(
+                       new TypeInformation<?>[] {
+                               Types.STRING,
+                               Types.LONG,
+                               Types.ROW(Types.STRING),
+                               Types.BIG_DEC,
+                               Types.BOOLEAN,
+                               BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+                               PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+                       },
+                       new String[]{"a", "b", "c", "d", "e", "f", "g"}
+               );
+               CsvSchema expect = CsvSchema.builder()
+                       .addColumn("a", ColumnType.STRING)
+                       .addColumn("b", ColumnType.NUMBER)
+                       .addColumn("c", ColumnType.ARRAY)
+                       .addColumn("d", ColumnType.NUMBER)
+                       .addColumn("e", ColumnType.BOOLEAN)
+                       .addColumn("f", ColumnType.ARRAY)
+                       .addColumn("g", ColumnType.STRING)
+                       .build();
+               CsvSchema actual = CsvRowSchemaConverter.rowTypeToCsvSchema(rowTypeInfo);
+               assertEquals(expect.toString(), actual.toString());
+       }
+
+       @Test(expected = RuntimeException.class)
+       public void testUnsupportedType() {
+               CsvRowSchemaConverter.rowTypeToCsvSchema(new RowTypeInfo(
+                       new TypeInformation[]{Types.STRING,
+                               PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO},
+                       new String[]{"a", "b"}
+               ));
+       }
+
+}
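
In short, the converter maps STRING and byte[] to ColumnType.STRING, numeric types (LONG, BIG_DEC) to NUMBER, BOOLEAN to BOOLEAN, and nested rows and object arrays to ARRAY; element types it cannot represent, such as double[], fail with a RuntimeException, as the second test shows. A minimal standalone sketch, using the imports above (field names illustrative):

    // Convert a two-column row type into a Jackson CsvSchema.
    RowTypeInfo rowType = new RowTypeInfo(
            new TypeInformation<?>[]{Types.STRING, Types.LONG},
            new String[]{"name", "count"});
    CsvSchema csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema(rowType);
    // Resulting columns: "name" -> STRING, "count" -> NUMBER.
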
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java
new file mode 100644
index 0000000..72b5fa9
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Testing for {@link CsvRowSerializationSchema}.
+ */
+public class CsvRowSerializationSchemaTest extends TestLogger {
+
+       @Test
+       public void testSerializeAndDeserialize() throws IOException {
+               final String[] fields = new String[]{"a", "b", "c", "d", "e", "f", "g", "h"};
+               final TypeInformation[] types = new TypeInformation[]{
+                       Types.BOOLEAN(), Types.STRING(), Types.INT(), Types.DECIMAL(),
+                       Types.SQL_TIMESTAMP(), PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+                       Types.ROW(
+                               new String[]{"g1", "g2"},
+                               new TypeInformation[]{Types.STRING(), Types.LONG()}),
+                       Types.STRING()
+               };
+               final TypeInformation<Row> rowSchema = Types.ROW(fields, types);
+               final Row row = new Row(8);
+               final Row nestedRow = new Row(2);
+               nestedRow.setField(0, "z\"xcv");
+               nestedRow.setField(1, 123L);
+               row.setField(0, true);
+               row.setField(1, "abcd");
+               row.setField(2, 1);
+               row.setField(3, BigDecimal.valueOf(1.2334));
+               row.setField(4, new Timestamp(System.currentTimeMillis()));
+               row.setField(5, "qwecxcr".getBytes());
+               row.setField(6, nestedRow);
+               row.setField(7, null);
+
+               final Row resultRow = serializeAndDeserialize(rowSchema, row);
+               assertEquals(row, resultRow);
+       }
+
+       @Test
+       public void testSerialize() {
+               long currentMillis = System.currentTimeMillis();
+               Row row = new Row(4);
+               Row nestedRow = new Row(2);
+               row.setField(0, "abc");
+               row.setField(1, 34);
+               row.setField(2, new Timestamp(currentMillis));
+               nestedRow.setField(0, "bc");
+               nestedRow.setField(1, "qwertyu".getBytes());
+               row.setField(3, nestedRow);
+
+               final TypeInformation<Row> typeInfo = Types.ROW(
+                       new String[]{"a", "b", "c", "d"},
+                       new TypeInformation[]{Types.STRING(), Types.INT(), Types.SQL_TIMESTAMP(),
+                               Types.ROW(
+                                       new String[]{"d1", "d2"},
+                                       new TypeInformation[]{Types.STRING(), PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}
+                       )}
+               );
+
+               final CsvRowSerializationSchema schema = new CsvRowSerializationSchema(typeInfo);
+               byte[] result = schema.serialize(row);
+               String c1 = "abc";
+               String c2 = String.valueOf(34);
+               String c3 = "\"" + new Timestamp(currentMillis).toString() + "\"";
+               String c4 = "bc;" + new String("qwertyu".getBytes());
+               byte[] expect = (c1 + "," + c2 + "," + c3 + "," + c4 + "\n").getBytes();
+               assertArrayEquals(expect, result);
+       }
+
+       @Test
+       public void testCustomizedProperties() throws IOException {
+               final TypeInformation<Row> rowTypeInfo = Types.ROW(
+                       new String[]{"a", "b", "c"},
+                       new TypeInformation[]{Types.STRING(), Types.STRING(),
+                               Types.ROW(
+                                       new String[]{"c1", "c2"},
+                                       new TypeInformation[]{Types.INT(), Types.STRING()}
+                               )}
+               );
+
+               final Row row = new Row(3);
+               final Row nestedRow = new Row(2);
+               nestedRow.setField(0, 1);
+               nestedRow.setField(1, "zxv");
+               row.setField(0, "12*3'4");
+               row.setField(1, "a,bc");
+               row.setField(2, nestedRow);
+
+               final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo);
+               serializationSchema.setEscapeCharacter('*');
+               serializationSchema.setQuoteCharacter('\'');
+               serializationSchema.setArrayElementDelimiter(":");
+               byte[] result = serializationSchema.serialize(row);
+
+               final String c1 = "'12**3''4'";
+               final String c2 = "'a,bc'";
+               final String c3 = "1:zxv";
+               byte[] expect = (c1 + "," + c2 + "," + c3 + "\n").getBytes();
+               assertArrayEquals(expect, result);
+       }
+
+       @Test
+       public void testCharset() throws UnsupportedEncodingException {
+               final TypeInformation<Row> rowTypeInfo = Types.ROW(
+                       new String[]{"a"},
+                       new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}
+               );
+               final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo);
+               serializationSchema.setCharset("UTF-16");
+
+               final Row row = new Row(1);
+               row.setField(0, "123".getBytes(StandardCharsets.UTF_16));
+               byte[] result = serializationSchema.serialize(row);
+               byte[] expect = "123\n".getBytes();
+
+               assertArrayEquals(expect, result);
+       }
+
+       @Test
+       public void testSerializationOfTwoRows() throws IOException {
+               final TypeInformation<Row> rowSchema = Types.ROW(
+                       new String[] {"f1", "f2", "f3"},
+                       new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()});
+
+               final Row row1 = new Row(3);
+               row1.setField(0, 1);
+               row1.setField(1, true);
+               row1.setField(2, "str");
+
+               final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema);
+               final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowSchema);
+
+               byte[] bytes = serializationSchema.serialize(row1);
+               assertEquals(row1, deserializationSchema.deserialize(bytes));
+
+               final Row row2 = new Row(3);
+               row2.setField(0, 10);
+               row2.setField(1, false);
+               row2.setField(2, "newStr");
+
+               bytes = serializationSchema.serialize(row2);
+               assertEquals(row2, deserializationSchema.deserialize(bytes));
+       }
+
+       @Test
+       public void testNull() {
+               final TypeInformation<Row> rowTypeInfo = Types.ROW(
+                       new String[]{"a"},
+                       new TypeInformation[]{Types.STRING()}
+               );
+               final Row row = new Row(1);
+               row.setField(0, null);
+
+               final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo);
+               serializationSchema.setNullValue("123");
+               byte[] bytes = serializationSchema.serialize(row);
+               assertArrayEquals("123\n".getBytes(), bytes);
+       }
+
+       @Test(expected = RuntimeException.class)
+       public void testSerializeRowWithInvalidNumberOfFields() {
+               final TypeInformation<Row> rowSchema = Types.ROW(
+                       new String[] {"f1", "f2", "f3"},
+                       new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()});
+               final Row row = new Row(1);
+               row.setField(0, 1);
+
+               final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema);
+               serializationSchema.serialize(row);
+       }
+
+       @Test(expected = RuntimeException.class)
+       public void testSerializeNestedRowInNestedRow() {
+               final TypeInformation<Row> rowSchema = Types.ROW(
+                       new String[]{"a"},
+                       new TypeInformation[]{Types.ROW(
+                               new String[]{"a1"},
+                               new TypeInformation[]{Types.ROW(
+                                       new String[]{"a11"},
+                                       new TypeInformation[]{Types.STRING()}
+                               )}
+                       )}
+               );
+               final Row row = new Row(1);
+               final Row nestedRow = new Row(1);
+               final Row doubleNestedRow = new Row(1);
+               doubleNestedRow.setField(0, "123");
+               nestedRow.setField(0, doubleNestedRow);
+               row.setField(0, nestedRow);
+               final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema);
+               serializationSchema.serialize(row);
+       }
+
+       private Row serializeAndDeserialize(TypeInformation<Row> rowSchema, Row row) throws IOException {
+               final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema);
+               final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowSchema);
+
+               final byte[] bytes = serializationSchema.serialize(row);
+               return deserializationSchema.deserialize(bytes);
+       }
+}
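
The setters exercised above can be combined on one schema. A minimal sketch of a customized serialization, using only setters that appear in these tests (values and expected output are illustrative, in the spirit of testCustomizedProperties and testNull):

    // Illustrative two-field schema.
    final TypeInformation<Row> rowSchema = Types.ROW(
            new String[]{"a", "b"},
            new TypeInformation[]{Types.STRING(), Types.STRING()});

    final CsvRowSerializationSchema schema = new CsvRowSerializationSchema(rowSchema);
    schema.setQuoteCharacter('\'');        // quote fields with '
    schema.setEscapeCharacter('*');        // escape with *
    schema.setArrayElementDelimiter(":");  // delimit nested/array elements with :
    schema.setNullValue("N/A");            // render null fields as N/A

    final Row row = new Row(2);
    row.setField(0, "a,bc");  // contains the field delimiter, so it is quoted
    row.setField(1, null);    // written as N/A
    final byte[] bytes = schema.serialize(row);  // expected: 'a,bc',N/A\n
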
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 51c3466..a236770 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -41,6 +41,7 @@ under the License.
                <module>flink-avro-confluent-registry</module>
                <module>flink-parquet</module>
                <module>flink-sequence-file</module>
+               <module>flink-csv</module>
        </modules>
 
        <!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
index 6e63931..ebd6e64 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
@@ -41,6 +41,10 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
   private var commentPrefix: Option[String] = None
   private var isIgnoreFirstLine: Option[Boolean] = None
   private var lenient: Option[Boolean] = None
+  private var arrayElementDelim: Option[String] = None
+  private var escapeCharacter: Option[Character] = None
+  private var bytesCharset: Option[String] = None
+  private var derived: Option[Boolean] = None
 
   /**
     * Sets the field delimiter, "," by default.
@@ -143,9 +147,40 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
     this
   }
 
+  /**
+    * Sets the array element delimiter, ";" by default.
+    */
+  def arrayElementDelim(delim: String): Csv = {
+    this.arrayElementDelim = Some(delim)
+    this
+  }
+
+  /**
+    * Sets the escape character, none by default.
+    */
+  def escapeCharacter(escape: Character): Csv = {
+    this.escapeCharacter = Some(escape)
+    this
+  }
+
+  /**
+    * Sets the charset of byte[] fields, "UTF-8" by default.
+    */
+  def bytesCharset(charset: String): Csv = {
+    this.bytesCharset = Some(charset)
+    this
+  }
+
+  /**
+    * Derives the format schema from the table's schema if set to true.
+    */
+  def derived(derived: Boolean): Csv = {
+    this.derived = Some(derived)
+    this
+  }
+
   override protected def toFormatProperties: util.Map[String, String] = {
     val properties = new DescriptorProperties()
-
     fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
     lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
 
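From the user's side, the new options compose with the existing ones, e.g. (a Java sketch; fieldDelimiter(...) is one of the pre-existing setters of this descriptor, and all values are illustrative):

    final Csv format = new Csv()
            .fieldDelimiter(",")     // existing option
            .arrayElementDelim(":")  // new: delimiter between array elements
            .escapeCharacter('\\')   // new: escape character
            .bytesCharset("UTF-16")  // new: charset for byte[] fields
            .derived(true);          // new: derive format schema from the table schema
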
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
index 9eeedb4..97db7fa 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.CsvValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE
 
@@ -33,9 +34,26 @@ class CsvValidator extends FormatDescriptorValidator {
     properties.validateString(FORMAT_LINE_DELIMITER, true, 1)
     properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1)
     properties.validateString(FORMAT_COMMENT_PREFIX, true, 1)
+    properties.validateString(FORMAT_ARRAY_ELEMENT_DELIMITER, true, 1)
+    properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1)
+    properties.validateString(FORMAT_BYTES_CHARSET, true, 1)
+    properties.validateString(FORMAT_NULL_VALUE, true, 1)
     properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, true)
     properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true)
-    properties.validateTableSchema(FORMAT_FIELDS, false)
+    properties.validateBoolean(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA, true)
+
+    val tableSchema = properties.getOptionalTableSchema(FORMAT_FIELDS)
+    val derived = properties.getOptionalBoolean(
+      FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA).orElse(false)
+    if (derived && tableSchema.isPresent) {
+      throw new ValidationException(
+        "Format cannot define a schema and derive from the table's schema at the same time.")
+    } else if (tableSchema.isPresent) {
+      properties.validateTableSchema(FORMAT_FIELDS, false)
+    } else if (!tableSchema.isPresent && !derived) {
+      throw new ValidationException(
+        "Either a schema must be defined or the schema must be derived from the table's schema.")
+    }
   }
 }
 
@@ -49,4 +67,8 @@ object CsvValidator {
   val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line"
   val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors"
   val FORMAT_FIELDS = "format.fields"
+  val FORMAT_ARRAY_ELEMENT_DELIMITER = "format.array-element-delimiter"
+  val FORMAT_ESCAPE_CHARACTER = "format.escape-character"
+  val FORMAT_BYTES_CHARSET = "format.bytes-charset"
+  val FORMAT_NULL_VALUE = "format.null-value"
 }
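
In property form, the validation accepts exactly one schema source. A sketch of the accepted and rejected combinations (keys per CsvValidator and FormatDescriptorValidator; the format.fields.#.name/format.fields.#.type encoding is assumed from DescriptorProperties):

    // Valid: derive the format schema from the table schema.
    final Map<String, String> derived = new HashMap<>();
    derived.put("format.type", "csv");
    derived.put("format.derive-schema", "true");

    // Valid: define an explicit format schema.
    final Map<String, String> explicit = new HashMap<>();
    explicit.put("format.type", "csv");
    explicit.put("format.fields.0.name", "a");
    explicit.put("format.fields.0.type", "VARCHAR");

    // Invalid: setting format.derive-schema=true together with format.fields.*,
    // or providing neither, fails with the ValidationExceptions above.
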
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
index 8d01b8b..1c01e71 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -47,6 +47,16 @@ class CsvTest extends DescriptorTestBase {
     addPropertyAndVerify(descriptors().get(0), "format.quote-character", "qq")
   }
 
+  @Test(expected = classOf[ValidationException])
+  def testTwoSchemas(): Unit = {
+    addPropertyAndVerify(descriptors().get(0), "format.derive-schema", "true")
+  }
+
+  @Test
+  def testOneSchema(): Unit = {
+    addPropertyAndVerify(descriptors().get(0), "format.derive-schema", "false")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   override def descriptors(): util.List[Descriptor] = {
