Repository: flink Updated Branches: refs/heads/master 1829819b6 -> bbc5e29c8
http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java index 78dedf4..523eafe 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -16,6 +16,8 @@ */ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; import org.apache.flink.types.Row; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; @@ -26,10 +28,11 @@ import java.io.IOException; import static org.junit.Assert.assertEquals; public class JsonRowSerializationSchemaTest { + @Test public void testRowSerialization() throws IOException { String[] fieldNames = new String[] {"f1", "f2", "f3"}; - Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; + TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() }; Row row = new Row(3); row.setField(0, 1); row.setField(1, true); @@ -42,14 +45,17 @@ public class JsonRowSerializationSchemaTest { @Test public void testSerializationOfTwoRows() throws IOException { String[] fieldNames = new String[] {"f1", "f2", "f3"}; - Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; + TypeInformation<Row> row = Types.ROW( + fieldNames, + new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() } + ); Row row1 = new Row(3); row1.setField(0, 1); row1.setField(1, true); row1.setField(2, "str"); JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(row); byte[] bytes = serializationSchema.serialize(row1); assertEqualRows(row1, deserializationSchema.deserialize(bytes)); @@ -78,9 +84,9 @@ public class JsonRowSerializationSchemaTest { serializationSchema.serialize(row); } - private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row row) throws IOException { + private Row serializeAndDeserialize(String[] fieldNames, TypeInformation<?>[] fieldTypes, Row row) throws IOException { JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(Types.ROW(fieldNames, fieldTypes)); byte[] bytes = serializationSchema.serialize(row); return deserializationSchema.deserialize(bytes); http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index cc1c166..e0e8f84 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -18,10 +18,10 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; import org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -43,7 +43,7 @@ public abstract class KafkaTableSinkTestBase { private static final String TOPIC = "testTopic"; protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; - private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() }; private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner(); private static final Properties PROPERTIES = createSinkProperties(); @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java index ad51993..341df45 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java @@ -19,8 +19,12 @@ package org.apache.flink.streaming.connectors.kafka; import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils; +import org.apache.flink.table.api.Types; import org.apache.flink.types.Row; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.serialization.DeserializationSchema; @@ -34,7 +38,7 @@ import static org.mockito.Mockito.verify; public abstract class KafkaTableSourceTestBase { private static final String TOPIC = "testTopic"; - private static final String[] FIELD_NAMES = new String[] { "long", "string", "boolean", "double", "missing-field" }; + private static final String[] FIELD_NAMES = new String[] { "mylong", "mystring", "myboolean", "mydouble", "missingField" }; private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] { BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, @@ -43,6 +47,33 @@ public abstract class KafkaTableSourceTestBase { BasicTypeInfo.LONG_TYPE_INFO }; private static final Properties PROPERTIES = createSourceProperties(); + // Avro record that matches above schema + public static class AvroSpecificRecord extends SpecificRecordBase { + + public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES); + + public Long mylong; + public String mystring; + public Boolean myboolean; + public Double mydouble; + public Long missingField; + + @Override + public Schema getSchema() { + return null; + } + + @Override + public Object get(int field) { + return null; + } + + @Override + public void put(int field, Object value) { + + } + } + @Test public void testKafkaTableSource() { KafkaTableSource kafkaTableSource = spy(createTableSource()); @@ -57,15 +88,14 @@ public abstract class KafkaTableSourceTestBase { any(getDeserializationSchema())); } - protected abstract KafkaTableSource createTableSource(String topic, Properties properties, - String[] fieldNames, TypeInformation<?>[] typeInfo); + protected abstract KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo); protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema(); protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer(); private KafkaTableSource createTableSource() { - return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, FIELD_TYPES); + return createTableSource(TOPIC, PROPERTIES, Types.ROW(FIELD_NAMES, FIELD_TYPES)); } private static Properties createSourceProperties() { http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java new file mode 100644 index 0000000..075b79b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.io.avro.generated.Address; +import org.apache.flink.api.io.avro.generated.Colors; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +/** + * Utilities for creating Avro Schemas. + */ +public final class AvroTestUtils { + + private static String NAMESPACE = "org.apache.flink.streaming.connectors.kafka"; + + /** + * Creates a flat Avro Schema for testing. + */ + public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation[] fieldTypes) { + final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder + .record("BasicAvroRecord") + .namespace(NAMESPACE) + .fields(); + + final Schema nullSchema = Schema.create(Schema.Type.NULL); + + for (int i = 0; i < fieldNames.length; i++) { + Schema schema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass()); + Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, schema)); + fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault(); + } + + return fieldAssembler.endRecord(); + } + + /** + * Tests a simple Avro data types without nesting. + */ + public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSimpleTestData() { + final Address addr = Address.newBuilder() + .setNum(42) + .setStreet("Main Street 42") + .setCity("Test City") + .setState("Test State") + .setZip("12345") + .build(); + + final Row rowAddr = new Row(5); + rowAddr.setField(0, 42); + rowAddr.setField(1, "Main Street 42"); + rowAddr.setField(2, "Test City"); + rowAddr.setField(3, "Test State"); + rowAddr.setField(4, "12345"); + + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>(); + t.f0 = Address.class; + t.f1 = addr; + t.f2 = rowAddr; + + return t; + } + + /** + * Tests all Avro data types as well as nested types. + */ + public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getComplexTestData() { + final Address addr = Address.newBuilder() + .setNum(42) + .setStreet("Main Street 42") + .setCity("Test City") + .setState("Test State") + .setZip("12345") + .build(); + + final Row rowAddr = new Row(5); + rowAddr.setField(0, 42); + rowAddr.setField(1, "Main Street 42"); + rowAddr.setField(2, "Test City"); + rowAddr.setField(3, "Test State"); + rowAddr.setField(4, "12345"); + + final User user = User.newBuilder() + .setName("Charlie") + .setFavoriteNumber(null) + .setFavoriteColor("blue") + .setTypeLongTest(1337L) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeBoolTest(false) + .setTypeArrayString(new ArrayList<CharSequence>()) + .setTypeArrayBoolean(new ArrayList<Boolean>()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap<CharSequence, Long>()) + .setTypeFixed(null) + .setTypeUnion(null) + .setTypeNested(addr) + .build(); + + final Row rowUser = new Row(15); + rowUser.setField(0, "Charlie"); + rowUser.setField(1, null); + rowUser.setField(2, "blue"); + rowUser.setField(3, 1337L); + rowUser.setField(4, 1.337d); + rowUser.setField(5, null); + rowUser.setField(6, false); + rowUser.setField(7, new ArrayList<CharSequence>()); + rowUser.setField(8, new ArrayList<Boolean>()); + rowUser.setField(9, null); + rowUser.setField(10, Colors.RED); + rowUser.setField(11, new HashMap<CharSequence, Long>()); + rowUser.setField(12, null); + rowUser.setField(13, null); + rowUser.setField(14, rowAddr); + + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>(); + t.f0 = User.class; + t.f1 = user; + t.f2 = rowUser; + + return t; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java index dcbb645..e19cdd8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java @@ -48,7 +48,7 @@ public class Types { /** * Generates a RowTypeInfo with fields of the given types. - * The fields have the default names (f1, f2 ..). + * The fields have the default names (f0, f1, f2 ..). * * <p>This method is a shortcut to {@code new RowTypeInfo(types)}. * http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java index a1b348a..1d4b8a1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java @@ -267,6 +267,13 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> { return bld.toString(); } + /** + * Returns the field types of the row. The order matches the order of the field names. + */ + public TypeInformation<?>[] getFieldTypes() { + return types; + } + private boolean hasDuplicateFieldNames(String[] fieldNames) { HashSet<String> names = new HashSet<>(); for (String field : fieldNames) { http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala index d82b990..f22fa32 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala @@ -17,13 +17,17 @@ */ package org.apache.flink.table.api -import org.apache.flink.api.common.typeinfo.{Types => JTypes, TypeInformation} +import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes} +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.types.Row + +import _root_.scala.annotation.varargs /** * This class enumerates all supported types of the Table API. */ -object Types extends JTypes { +object Types { val STRING = JTypes.STRING val BOOLEAN = JTypes.BOOLEAN @@ -43,24 +47,57 @@ object Types extends JTypes { val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS /** - * Generates RowTypeInfo with default names (f1, f2 ..). - * same as ``new RowTypeInfo(types)`` + * Generates row type information. + * + * A row type consists of zero or more fields with a field name and a corresponding type. * - * @param types of Row fields. e.g. ROW(Types.STRING, Types.INT) + * The fields have the default names (f0, f1, f2 ..). + * + * @param types types of row fields; e.g. Types.STRING, Types.INT */ - def ROW[T: Manifest](types: TypeInformation[_]*) = { + @varargs + def ROW(types: TypeInformation[_]*): TypeInformation[Row] = { JTypes.ROW(types: _*) } /** - * Generates RowTypeInfo. - * same as ``new RowTypeInfo(types, names)`` + * Generates row type information. + * + * A row type consists of zero or more fields with a field name and a corresponding type. * - * @param fields of Row. e.g. ROW(("name", Types.STRING), ("number", Types.INT)) + * @param names names of row fields, e.g. "userid", "name" + * @param types types of row fields; e.g. Types.STRING, Types.INT */ - def ROW_NAMED(fields: (String, TypeInformation[_])*) = { - val names = fields.toList.map(_._1).toArray - val types = fields.toList.map(_._2) + def ROW(names: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = { JTypes.ROW_NAMED(names, types: _*) } + + /** + * Generates type information for an array consisting of Java primitive elements. + * + * @param elementType type of the array elements; e.g. Types.INT + */ + def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = { + elementType match { + case BOOLEAN => PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO + case BYTE => PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO + case SHORT => PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO + case INT => PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO + case LONG => PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO + case FLOAT => PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO + case DOUBLE => PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO + case _ => + throw new TableException(s"$elementType cannot be an element of a primitive array." + + s"Only Java primitive types are supported.") + } + } + + /** + * Generates type information for an array consisting of Java object elements. + * + * @param elementType type of the array elements; e.g. Types.STRING or Types.INT + */ + def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = { + ObjectArrayTypeInfo.getInfoFor(elementType) + } }
