Repository: flink
Updated Branches:
  refs/heads/master 1829819b6 -> bbc5e29c8


http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 78dedf4..523eafe 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
 import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -26,10 +28,11 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 
 public class JsonRowSerializationSchemaTest {
+
        @Test
        public void testRowSerialization() throws IOException {
                String[] fieldNames = new String[] {"f1", "f2", "f3"};
-               Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, 
String.class};
+               TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { 
Types.INT(), Types.BOOLEAN(), Types.STRING() };
                Row row = new Row(3);
                row.setField(0, 1);
                row.setField(1, true);
@@ -42,14 +45,17 @@ public class JsonRowSerializationSchemaTest {
        @Test
        public void testSerializationOfTwoRows() throws IOException {
                String[] fieldNames = new String[] {"f1", "f2", "f3"};
-               Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, 
String.class};
+               TypeInformation<Row> row = Types.ROW(
+                       fieldNames,
+                       new TypeInformation<?>[] { Types.INT(), 
Types.BOOLEAN(), Types.STRING() }
+               );
                Row row1 = new Row(3);
                row1.setField(0, 1);
                row1.setField(1, true);
                row1.setField(2, "str");
 
                JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema(fieldNames);
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(fieldNames, fieldTypes);
+               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(row);
 
                byte[] bytes = serializationSchema.serialize(row1);
                assertEqualRows(row1, deserializationSchema.deserialize(bytes));
@@ -78,9 +84,9 @@ public class JsonRowSerializationSchemaTest {
                serializationSchema.serialize(row);
        }
 
-       private Row serializeAndDeserialize(String[] fieldNames, Class[] 
fieldTypes, Row row) throws IOException {
+       private Row serializeAndDeserialize(String[] fieldNames, 
TypeInformation<?>[] fieldTypes, Row row) throws IOException {
                JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema(fieldNames);
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(fieldNames, fieldTypes);
+               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(Types.ROW(fieldNames, fieldTypes));
 
                byte[] bytes = serializationSchema.serialize(row);
                return deserializationSchema.deserialize(bytes);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index cc1c166..e0e8f84 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -18,10 +18,10 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -43,7 +43,7 @@ public abstract class KafkaTableSinkTestBase {
 
        private static final String TOPIC = "testTopic";
        protected static final String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
-       private static final TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+       private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] { Types.INT(), Types.STRING() };
        private static final KafkaPartitioner<Row> PARTITIONER = new 
CustomPartitioner();
        private static final Properties PROPERTIES = createSinkProperties();
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index ad51993..341df45 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -19,8 +19,12 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
@@ -34,7 +38,7 @@ import static org.mockito.Mockito.verify;
 public abstract class KafkaTableSourceTestBase {
 
        private static final String TOPIC = "testTopic";
-       private static final String[] FIELD_NAMES = new String[] { "long", 
"string", "boolean", "double", "missing-field" };
+       private static final String[] FIELD_NAMES = new String[] { "mylong", 
"mystring", "myboolean", "mydouble", "missingField" };
        private static final TypeInformation<?>[] FIELD_TYPES = new 
TypeInformation<?>[] {
                BasicTypeInfo.LONG_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
@@ -43,6 +47,33 @@ public abstract class KafkaTableSourceTestBase {
                BasicTypeInfo.LONG_TYPE_INFO };
        private static final Properties PROPERTIES = createSourceProperties();
 
+       // Avro record that matches the above schema
+       public static class AvroSpecificRecord extends SpecificRecordBase {
+
+               public static Schema SCHEMA$ = 
AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
+
+               public Long mylong;
+               public String mystring;
+               public Boolean myboolean;
+               public Double mydouble;
+               public Long missingField;
+
+               @Override
+               public Schema getSchema() {
+                       return null;
+               }
+
+               @Override
+               public Object get(int field) {
+                       return null;
+               }
+
+               @Override
+               public void put(int field, Object value) {
+
+               }
+       }
+
        @Test
        public void testKafkaTableSource() {
                KafkaTableSource kafkaTableSource = spy(createTableSource());
@@ -57,15 +88,14 @@ public abstract class KafkaTableSourceTestBase {
                        any(getDeserializationSchema()));
        }
 
-       protected abstract KafkaTableSource createTableSource(String topic, 
Properties properties,
-                       String[] fieldNames, TypeInformation<?>[] typeInfo);
+       protected abstract KafkaTableSource createTableSource(String topic, 
Properties properties, TypeInformation<Row> typeInfo);
 
        protected abstract Class<DeserializationSchema<Row>> 
getDeserializationSchema();
 
        protected abstract Class<FlinkKafkaConsumerBase<Row>> 
getFlinkKafkaConsumer();
 
        private KafkaTableSource createTableSource() {
-               return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, 
FIELD_TYPES);
+               return createTableSource(TOPIC, PROPERTIES, 
Types.ROW(FIELD_NAMES, FIELD_TYPES));
        }
 
        private static Properties createSourceProperties() {

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
new file mode 100644
index 0000000..075b79b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.io.avro.generated.Address;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+/**
+ * Utilities for creating Avro Schemas.
+ */
+public final class AvroTestUtils {
+
+       private static String NAMESPACE = 
"org.apache.flink.streaming.connectors.kafka";
+
+       /**
+        * Creates a flat Avro Schema for testing.
+        */
+       public static Schema createFlatAvroSchema(String[] fieldNames, 
TypeInformation[] fieldTypes) {
+               final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = 
SchemaBuilder
+                       .record("BasicAvroRecord")
+                       .namespace(NAMESPACE)
+                       .fields();
+
+               final Schema nullSchema = Schema.create(Schema.Type.NULL);
+
+               for (int i = 0; i < fieldNames.length; i++) {
+                       Schema schema = 
ReflectData.get().getSchema(fieldTypes[i].getTypeClass());
+                       Schema unionSchema = 
Schema.createUnion(Arrays.asList(nullSchema, schema));
+                       
fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault();
+               }
+
+               return fieldAssembler.endRecord();
+       }
+
+       /**
+        * Tests simple Avro data types without nesting.
+        */
+       public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> getSimpleTestData() {
+               final Address addr = Address.newBuilder()
+                       .setNum(42)
+                       .setStreet("Main Street 42")
+                       .setCity("Test City")
+                       .setState("Test State")
+                       .setZip("12345")
+                       .build();
+
+               final Row rowAddr = new Row(5);
+               rowAddr.setField(0, 42);
+               rowAddr.setField(1, "Main Street 42");
+               rowAddr.setField(2, "Test City");
+               rowAddr.setField(3, "Test State");
+               rowAddr.setField(4, "12345");
+
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> t = new Tuple3<>();
+               t.f0 = Address.class;
+               t.f1 = addr;
+               t.f2 = rowAddr;
+
+               return t;
+       }
+
+       /**
+        * Tests all Avro data types as well as nested types.
+        */
+       public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> getComplexTestData() {
+               final Address addr = Address.newBuilder()
+                       .setNum(42)
+                       .setStreet("Main Street 42")
+                       .setCity("Test City")
+                       .setState("Test State")
+                       .setZip("12345")
+                       .build();
+
+               final Row rowAddr = new Row(5);
+               rowAddr.setField(0, 42);
+               rowAddr.setField(1, "Main Street 42");
+               rowAddr.setField(2, "Test City");
+               rowAddr.setField(3, "Test State");
+               rowAddr.setField(4, "12345");
+
+               final User user = User.newBuilder()
+                       .setName("Charlie")
+                       .setFavoriteNumber(null)
+                       .setFavoriteColor("blue")
+                       .setTypeLongTest(1337L)
+                       .setTypeDoubleTest(1.337d)
+                       .setTypeNullTest(null)
+                       .setTypeBoolTest(false)
+                       .setTypeArrayString(new ArrayList<CharSequence>())
+                       .setTypeArrayBoolean(new ArrayList<Boolean>())
+                       .setTypeNullableArray(null)
+                       .setTypeEnum(Colors.RED)
+                       .setTypeMap(new HashMap<CharSequence, Long>())
+                       .setTypeFixed(null)
+                       .setTypeUnion(null)
+                       .setTypeNested(addr)
+                       .build();
+
+               final Row rowUser = new Row(15);
+               rowUser.setField(0, "Charlie");
+               rowUser.setField(1, null);
+               rowUser.setField(2, "blue");
+               rowUser.setField(3, 1337L);
+               rowUser.setField(4, 1.337d);
+               rowUser.setField(5, null);
+               rowUser.setField(6, false);
+               rowUser.setField(7, new ArrayList<CharSequence>());
+               rowUser.setField(8, new ArrayList<Boolean>());
+               rowUser.setField(9, null);
+               rowUser.setField(10, Colors.RED);
+               rowUser.setField(11, new HashMap<CharSequence, Long>());
+               rowUser.setField(12, null);
+               rowUser.setField(13, null);
+               rowUser.setField(14, rowAddr);
+
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> t = new Tuple3<>();
+               t.f0 = User.class;
+               t.f1 = user;
+               t.f2 = rowUser;
+
+               return t;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index dcbb645..e19cdd8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -48,7 +48,7 @@ public class Types {
 
        /**
         * Generates a RowTypeInfo with fields of the given types.
-        * The fields have the default names (f1, f2 ..).
+        * The fields have the default names (f0, f1, f2 ..).
         * 
         * <p>This method is a shortcut to {@code new RowTypeInfo(types)}.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index a1b348a..1d4b8a1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -267,6 +267,13 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
                return bld.toString();
        }
 
+       /**
+        * Returns the field types of the row. The order matches the order of 
the field names.
+        */
+       public TypeInformation<?>[] getFieldTypes() {
+               return types;
+       }
+
        private boolean hasDuplicateFieldNames(String[] fieldNames) {
                HashSet<String> names = new HashSet<>();
                for (String field : fieldNames) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
index d82b990..f22fa32 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -17,13 +17,17 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{Types => JTypes, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, 
TypeInformation, Types => JTypes}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.types.Row
+
+import _root_.scala.annotation.varargs
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types extends JTypes {
+object Types {
 
   val STRING = JTypes.STRING
   val BOOLEAN = JTypes.BOOLEAN
@@ -43,24 +47,57 @@ object Types extends JTypes {
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
   /**
-    * Generates RowTypeInfo with default names (f1, f2 ..).
-    * same as ``new RowTypeInfo(types)``
+    * Generates row type information.
+    *
+    * A row type consists of zero or more fields with a field name and a 
corresponding type.
     *
-    * @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+    * The fields have the default names (f0, f1, f2 ..).
+    *
+    * @param types types of row fields; e.g. Types.STRING, Types.INT
     */
-  def ROW[T: Manifest](types: TypeInformation[_]*) = {
+  @varargs
+  def ROW(types: TypeInformation[_]*): TypeInformation[Row] = {
     JTypes.ROW(types: _*)
   }
 
   /**
-    * Generates RowTypeInfo.
-    * same as ``new RowTypeInfo(types, names)``
+    * Generates row type information.
+    *
+    * A row type consists of zero or more fields with a field name and a 
corresponding type.
     *
-    * @param fields of Row. e.g. ROW(("name", Types.STRING), ("number", 
Types.INT))
+    * @param names names of row fields, e.g. "userid", "name"
+    * @param types types of row fields; e.g. Types.STRING, Types.INT
     */
-  def ROW_NAMED(fields: (String, TypeInformation[_])*) = {
-    val names = fields.toList.map(_._1).toArray
-    val types = fields.toList.map(_._2)
+  def ROW(names: Array[String], types: Array[TypeInformation[_]]): 
TypeInformation[Row] = {
     JTypes.ROW_NAMED(names, types: _*)
   }
+
+  /**
+    * Generates type information for an array consisting of Java primitive 
elements.
+    *
+    * @param elementType type of the array elements; e.g. Types.INT
+    */
+  def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+    elementType match {
+      case BOOLEAN =>  PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+      case BYTE =>  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+      case SHORT =>  PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+      case INT =>  PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+      case LONG =>  PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+      case FLOAT =>  PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+      case DOUBLE =>  PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+      case _ =>
+        throw new TableException(s"$elementType cannot be an element of a 
primitive array. " +
+            s"Only Java primitive types are supported.")
+    }
+  }
+
+  /**
+    * Generates type information for an array consisting of Java object 
elements.
+    *
+    * @param elementType type of the array elements; e.g. Types.STRING or 
Types.INT
+    */
+  def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+    ObjectArrayTypeInfo.getInfoFor(elementType)
+  }
 }

Reply via email to