[FLINK-1391] Add support for using Avro-POJOs and Avro types with Kryo
Conflicts:
flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
Conflicts:
flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84c49981
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84c49981
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84c49981
Branch: refs/heads/release-0.8
Commit: 84c4998125b175ee524cec4292ab29060784861c
Parents: 02b6f85
Author: Robert Metzger <[email protected]>
Authored: Mon Jan 12 21:11:09 2015 +0100
Committer: Robert Metzger <[email protected]>
Committed: Mon Feb 9 14:48:34 2015 +0100
----------------------------------------------------------------------
docs/example_connectors.md | 27 +
docs/programming_guide.md | 2 +-
flink-addons/flink-avro/pom.xml | 30 +-
.../flink/api/avro/EncoderDecoderTest.java | 9 +-
.../org/apache/flink/api/io/avro/.gitignore | 1 +
.../apache/flink/api/io/avro/AvroPojoTest.java | 157 ++++
.../api/io/avro/AvroRecordInputFormatTest.java | 144 +++-
.../flink/api/io/avro/generated/Colors.java | 32 -
.../flink/api/io/avro/generated/User.java | 755 -------------------
.../src/test/resources/avro/user.avsc | 10 +-
.../common/typeutils/ComparatorTestBase.java | 4 +-
flink-java/pom.xml | 9 +-
.../flink/api/java/ExecutionEnvironment.java | 2 +-
.../flink/api/java/typeutils/AvroTypeInfo.java | 72 ++
.../flink/api/java/typeutils/TypeExtractor.java | 8 +-
.../java/typeutils/runtime/KryoSerializer.java | 110 ++-
.../java/typeutils/runtime/PojoComparator.java | 7 +-
pom.xml | 4 +-
tools/maven/suppressions.xml | 1 +
19 files changed, 551 insertions(+), 833 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/docs/example_connectors.md
----------------------------------------------------------------------
diff --git a/docs/example_connectors.md b/docs/example_connectors.md
index 28f35bb..fcd65af 100644
--- a/docs/example_connectors.md
+++ b/docs/example_connectors.md
@@ -69,6 +69,33 @@ users to use all existing Hadoop input formats with Flink.
This section shows some examples for connecting Flink to other systems.
[Read more about Hadoop compatibility in Flink](hadoop_compatibility.html).
+## Avro support in Flink
+
+Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This allows you to easily read from Avro files with Flink.
+Also, the serialization framework of Flink is able to handle classes generated from Avro schemas.
+
+In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+~~~java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataSet<User> usersDS = env.createInput(users);
+~~~
+
+Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:
+
+~~~java
+usersDS.groupBy("name")
+~~~
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data intensive and thus probably slow to use.
+
+Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use it as a join or grouping key.
+Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
+
+
### Access Microsoft Azure Table Storage
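The union behavior described in the docs change above can be illustrated with a hypothetical sketch of what Avro's code generator would emit for the three schema variants; the class and field names here are invented for illustration, following the pattern of the deleted `User.java` in this commit (where the single-element union `["double"]` became a `java.lang.Object` field):

```java
// Hypothetical sketch (names invented): field types Avro's generator
// would produce for the three schema variants discussed above.
class KeyExample {
    // {"name": "plain_double", "type": "double"} -> primitive, usable as key
    public double plainDouble;

    // {"name": "union_double", "type": ["double"]} -> Object, NOT usable as key
    public Object unionDouble;

    // {"name": "nullable_double", "type": ["null", "double"]} -> boxed Double, usable as key
    public Double nullableDouble;
}
```

Only the first and third fields carry a concrete type that Flink's POJO field selection can use as a join or grouping key.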
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 7a2211d..f41fc97 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -982,7 +982,7 @@ These are valid expressions for the example POJO above:
- `complex.word.f2`: Selects the last field in the Tuple3.
- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
-Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
+Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Kryo](https://github.com/EsotericSoftware/kryo) to serialize arbitrary objects (such as `java.util.Collection`).
### Define key using a Key Selector Function
{:.no_toc}
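The distinction the updated sentence draws can be sketched with a hypothetical POJO; the class and field names are invented for illustration, and which serializer Flink actually picks is decided by its type extraction, not by anything in this snippet:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;

// Hypothetical POJO: fields Flink recognizes (primitives, String) use its
// own serializers; a field like Date or a Collection has no dedicated Flink
// serializer and, after this change, is handled by Kryo instead of Avro.
class Person {
    public String name;                 // native Flink serializer
    public int age;                     // native Flink serializer
    public Date birthday;               // arbitrary object -> Kryo
    public Collection<String> nicknames = new ArrayList<String>(); // -> Kryo
}
```

Such a POJO can still be used with string-based key selection on the natively serialized fields, e.g. `persons.groupBy("name")`, even though `birthday` and `nicknames` go through Kryo.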
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
index a0d8d9d..dfc9105 100644
--- a/flink-addons/flink-avro/pom.xml
+++ b/flink-addons/flink-avro/pom.xml
@@ -41,7 +41,8 @@ under the License.
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
</dependency>
-
+
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
@@ -53,6 +54,14 @@ under the License.
<artifactId>avro</artifactId>
<!-- version is derived from base module -->
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -116,7 +125,24 @@ under the License.
</execution>
</executions>
</plugin>
-
+ <!-- Generate Test class from avro schema -->
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.7.7</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+							<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+							<testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<pluginManagement>
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index 0724457..8f14cb3 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -29,9 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.Fixed16;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.util.StringUtils;
import org.junit.Test;
@@ -232,8 +234,11 @@ public class EncoderDecoderTest {
map.put("1", 1L);
map.put("2", 2L);
map.put("3", 3L);
-
-		User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map);
+
+ byte[] b = new byte[16];
+ new Random().nextBytes(b);
+ Fixed16 f = new Fixed16(b);
+		User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map, f, new Boolean(true));
testObjectSerialization(user);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
new file mode 100644
index 0000000..dc9b237
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
@@ -0,0 +1 @@
+generated
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
new file mode 100644
index 0000000..6ff4836
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.io.avro;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+
+@RunWith(Parameterized.class)
+public class AvroPojoTest extends MultipleProgramsTestBase {
+ public AvroPojoTest(ExecutionMode mode) {
+ super(mode);
+ }
+
+ private File inFile;
+ private String resultPath;
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ inFile = tempFolder.newFile();
+ AvroRecordInputFormatTest.writeTestFile(inFile);
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ @Test
+ public void testSimpleAvroRead() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		DataSet<User> usersDS = env.createInput(users)
+				// null map type because the order changes in different JVMs (hard to test)
+ .map(new MapFunction<User, User>() {
+ @Override
+ public User map(User value) throws Exception {
+ value.setTypeMap(null);
+ return value;
+ }
+ });
+
+ usersDS.writeAsText(resultPath);
+
+ env.execute("Simple Avro read job");
+
+
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" +
+				"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n";
+ }
+
+ @Test
+ public void testKeySelection() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		DataSet<User> usersDS = env.createInput(users);
+
+		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+ @Override
+			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+				for(User u : values) {
+					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ }
+ }
+ });
+
+ res.writeAsText(resultPath);
+ env.execute("Avro Key selection");
+
+
+ expected = "(Alyssa,1)\n(Charlie,1)\n";
+ }
+
+	/**
+	 * Test some known fields for grouping on.
+	 */
+ @Test
+ public void testAllFields() throws Exception {
+		for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
+ testField(fieldName);
+ }
+ }
+
+ private void testField(final String fieldName) throws Exception {
+ before();
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ DataSet<User> usersDS = env.createInput(users);
+
+		DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
+ @Override
+			public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
+ for(User u : values) {
+ out.collect(u.get(fieldName));
+ }
+ }
+ });
+ res.writeAsText(resultPath);
+ env.execute("Simple Avro read job");
+ if(fieldName.equals("name")) {
+ expected = "Alyssa\nCharlie";
+ } else if(fieldName.equals("type_enum")) {
+ expected = "GREEN\nRED\n";
+ } else if(fieldName.equals("type_double_test")) {
+ expected = "123.45\n1.337\n";
+ } else {
+ Assert.fail("Unknown field");
+ }
+
+ after();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index d8d8b46..1ec4a8a 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -27,13 +27,24 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.io.avro.generated.Colors;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -48,7 +59,7 @@ import org.junit.Test;
*/
public class AvroRecordInputFormatTest {
- private File testFile;
+ public File testFile;
final static String TEST_NAME = "Alyssa";
@@ -65,24 +76,25 @@ public class AvroRecordInputFormatTest {
final static String TEST_MAP_KEY2 = "KEY 2";
final static long TEST_MAP_VALUE2 = 17554L;
- @Before
- public void createFiles() throws IOException {
- testFile = File.createTempFile("AvroInputFormatTest", null);
-
+ private Schema userSchema = new User().getSchema();
+
+
+ public static void writeTestFile(File testFile) throws IOException {
 		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
stringArray.add(TEST_ARRAY_STRING_1);
stringArray.add(TEST_ARRAY_STRING_2);
-
+
ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
booleanArray.add(TEST_ARRAY_BOOLEAN_1);
booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-
+
 		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-
-
+
+
User user1 = new User();
+
user1.setName(TEST_NAME);
user1.setFavoriteNumber(256);
user1.setTypeDoubleTest(123.45d);
@@ -91,22 +103,24 @@ public class AvroRecordInputFormatTest {
user1.setTypeArrayBoolean(booleanArray);
user1.setTypeEnum(TEST_ENUM_COLOR);
user1.setTypeMap(longMap);
-
+
// Construct via builder
User user2 = User.newBuilder()
- .setName("Charlie")
- .setFavoriteColor("blue")
- .setFavoriteNumber(null)
- .setTypeBoolTest(false)
- .setTypeDoubleTest(1.337d)
- .setTypeNullTest(null)
- .setTypeLongTest(1337L)
- .setTypeArrayString(new ArrayList<CharSequence>())
- .setTypeArrayBoolean(new ArrayList<Boolean>())
- .setTypeNullableArray(null)
- .setTypeEnum(Colors.RED)
- .setTypeMap(new HashMap<CharSequence, Long>())
- .build();
+ .setName("Charlie")
+ .setFavoriteColor("blue")
+ .setFavoriteNumber(null)
+ .setTypeBoolTest(false)
+ .setTypeDoubleTest(1.337d)
+ .setTypeNullTest(null)
+ .setTypeLongTest(1337L)
+				.setTypeArrayString(new ArrayList<CharSequence>())
+ .setTypeArrayBoolean(new ArrayList<Boolean>())
+ .setTypeNullableArray(null)
+ .setTypeEnum(Colors.RED)
+ .setTypeMap(new HashMap<CharSequence, Long>())
+ .setTypeFixed(null)
+ .setTypeUnion(null)
+ .build();
 		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
 		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), testFile);
@@ -114,7 +128,17 @@ public class AvroRecordInputFormatTest {
dataFileWriter.append(user2);
dataFileWriter.close();
}
-
+ @Before
+ public void createFiles() throws IOException {
+ testFile = File.createTempFile("AvroInputFormatTest", null);
+ writeTestFile(testFile);
+ }
+
+
+ /**
+	 * Test if the AvroInputFormat is able to properly read data from an Avro file.
+ * @throws IOException
+ */
@Test
public void testDeserialisation() throws IOException {
Configuration parameters = new Configuration();
@@ -159,9 +183,79 @@ public class AvroRecordInputFormatTest {
format.close();
}
-
+
+	/**
+	 * Test if the Flink serialization is able to properly process GenericData.Record types.
+	 * Usually users of Avro generate classes (POJOs) from Avro schemas.
+	 * However, if generated classes are not available, one can also use GenericData.Record.
+	 * It is an untyped key-value record which uses a schema to validate the correctness of the data.
+	 *
+	 * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
+	 */
+ @Test
+ public void testDeserializeToGenericType() throws IOException {
+		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema);
+
+		FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader);
+		// initialize Record by reading it from disk (that's easier than creating it by hand)
+ GenericData.Record rec = new GenericData.Record(userSchema);
+ dataFileReader.next(rec);
+ // check if record has been read correctly
+ assertNotNull(rec);
+		assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
+		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+		assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
+
+ // now serialize it with our framework:
+		TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class);
+		TypeSerializer<GenericData.Record> tser = te.createSerializer();
+		ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
+		tser.serialize(rec, target);
+
+		GenericData.Record newRec = tser.deserialize(target.getInputView());
+
+ // check if it is still the same
+ assertNotNull(newRec);
+		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
+		assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
+ assertEquals(null, newRec.get("type_long_test"));
+
+ }
+
+ /**
+	 * This test validates proper serialization with specific (generated POJO) types.
+	 */
+ @Test
+ public void testDeserializeToSpecificType() throws IOException {
+
+		DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
+
+		FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader);
+ User rec = dataFileReader.next();
+
+ // check if record has been read correctly
+ assertNotNull(rec);
+		assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
+		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+
+ // now serialize it with our framework:
+		TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class);
+		TypeSerializer<User> tser = te.createSerializer();
+		ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
+ tser.serialize(rec, target);
+
+ User newRec = tser.deserialize(target.getInputView());
+
+ // check if it is still the same
+ assertNotNull(newRec);
+		assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
+		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
+ }
+
+
@After
public void deleteFiles() {
testFile.delete();
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
deleted file mode 100644
index 58e1f5c..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.io.avro.generated;
-@SuppressWarnings("all")
[email protected]
-public enum Colors {
- RED, GREEN, BLUE ;
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
deleted file mode 100644
index 505857e..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
+++ /dev/null
@@ -1,755 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.io.avro.generated;
-@SuppressWarnings("all")
[email protected]
-public class User extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
- @Deprecated public java.lang.CharSequence name;
- @Deprecated public java.lang.Integer favorite_number;
- @Deprecated public java.lang.CharSequence favorite_color;
- @Deprecated public java.lang.Long type_long_test;
- @Deprecated public java.lang.Object type_double_test;
- @Deprecated public java.lang.Object type_null_test;
- @Deprecated public java.lang.Object type_bool_test;
- @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
- @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
- @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
- @Deprecated public org.apache.flink.api.io.avro.generated.Colors type_enum;
- @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
- /**
- * Default constructor. Note that this does not initialize fields
- * to their default values from the schema. If that is desired then
- * one should use {@link \#newBuilder()}.
- */
- public User() {}
-
- /**
- * All-args constructor.
- */
- public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
- this.name = name;
- this.favorite_number = favorite_number;
- this.favorite_color = favorite_color;
- this.type_long_test = type_long_test;
- this.type_double_test = type_double_test;
- this.type_null_test = type_null_test;
- this.type_bool_test = type_bool_test;
- this.type_array_string = type_array_string;
- this.type_array_boolean = type_array_boolean;
- this.type_nullable_array = type_nullable_array;
- this.type_enum = type_enum;
- this.type_map = type_map;
- }
-
- public org.apache.avro.Schema getSchema() { return SCHEMA$; }
- // Used by DatumWriter. Applications should not call.
- public java.lang.Object get(int field$) {
- switch (field$) {
- case 0: return name;
- case 1: return favorite_number;
- case 2: return favorite_color;
- case 3: return type_long_test;
- case 4: return type_double_test;
- case 5: return type_null_test;
- case 6: return type_bool_test;
- case 7: return type_array_string;
- case 8: return type_array_boolean;
- case 9: return type_nullable_array;
- case 10: return type_enum;
- case 11: return type_map;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
- // Used by DatumReader. Applications should not call.
- @SuppressWarnings(value="unchecked")
- public void put(int field$, java.lang.Object value$) {
- switch (field$) {
- case 0: name = (java.lang.CharSequence)value$; break;
- case 1: favorite_number = (java.lang.Integer)value$; break;
- case 2: favorite_color = (java.lang.CharSequence)value$; break;
- case 3: type_long_test = (java.lang.Long)value$; break;
- case 4: type_double_test = (java.lang.Object)value$; break;
- case 5: type_null_test = (java.lang.Object)value$; break;
- case 6: type_bool_test = (java.lang.Object)value$; break;
- case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
- case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
- case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
- case 10: type_enum = (org.apache.flink.api.io.avro.generated.Colors)value$; break;
- case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- /**
- * Gets the value of the 'name' field.
- */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /**
- * Sets the value of the 'name' field.
- * @param value the value to set.
- */
- public void setName(java.lang.CharSequence value) {
- this.name = value;
- }
-
- /**
- * Gets the value of the 'favorite_number' field.
- */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /**
- * Sets the value of the 'favorite_number' field.
- * @param value the value to set.
- */
- public void setFavoriteNumber(java.lang.Integer value) {
- this.favorite_number = value;
- }
-
- /**
- * Gets the value of the 'favorite_color' field.
- */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /**
- * Sets the value of the 'favorite_color' field.
- * @param value the value to set.
- */
- public void setFavoriteColor(java.lang.CharSequence value) {
- this.favorite_color = value;
- }
-
- /**
- * Gets the value of the 'type_long_test' field.
- */
- public java.lang.Long getTypeLongTest() {
- return type_long_test;
- }
-
- /**
- * Sets the value of the 'type_long_test' field.
- * @param value the value to set.
- */
- public void setTypeLongTest(java.lang.Long value) {
- this.type_long_test = value;
- }
-
- /**
- * Gets the value of the 'type_double_test' field.
- */
- public java.lang.Object getTypeDoubleTest() {
- return type_double_test;
- }
-
- /**
- * Sets the value of the 'type_double_test' field.
- * @param value the value to set.
- */
- public void setTypeDoubleTest(java.lang.Object value) {
- this.type_double_test = value;
- }
-
- /**
- * Gets the value of the 'type_null_test' field.
- */
- public java.lang.Object getTypeNullTest() {
- return type_null_test;
- }
-
- /**
- * Sets the value of the 'type_null_test' field.
- * @param value the value to set.
- */
- public void setTypeNullTest(java.lang.Object value) {
- this.type_null_test = value;
- }
-
- /**
- * Gets the value of the 'type_bool_test' field.
- */
- public java.lang.Object getTypeBoolTest() {
- return type_bool_test;
- }
-
- /**
- * Sets the value of the 'type_bool_test' field.
- * @param value the value to set.
- */
- public void setTypeBoolTest(java.lang.Object value) {
- this.type_bool_test = value;
- }
-
- /**
- * Gets the value of the 'type_array_string' field.
- */
- public java.util.List<java.lang.CharSequence> getTypeArrayString() {
- return type_array_string;
- }
-
- /**
- * Sets the value of the 'type_array_string' field.
- * @param value the value to set.
- */
- public void setTypeArrayString(java.util.List<java.lang.CharSequence> value)
{
- this.type_array_string = value;
- }
-
- /**
- * Gets the value of the 'type_array_boolean' field.
- */
- public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
- return type_array_boolean;
- }
-
- /**
- * Sets the value of the 'type_array_boolean' field.
- * @param value the value to set.
- */
- public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
- this.type_array_boolean = value;
- }
-
- /**
- * Gets the value of the 'type_nullable_array' field.
- */
- public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
- return type_nullable_array;
- }
-
- /**
- * Sets the value of the 'type_nullable_array' field.
- * @param value the value to set.
- */
- public void setTypeNullableArray(java.util.List<java.lang.CharSequence>
value) {
- this.type_nullable_array = value;
- }
-
- /**
- * Gets the value of the 'type_enum' field.
- */
- public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
- return type_enum;
- }
-
- /**
- * Sets the value of the 'type_enum' field.
- * @param value the value to set.
- */
- public void setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value)
{
- this.type_enum = value;
- }
-
- /**
- * Gets the value of the 'type_map' field.
- */
- public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
- return type_map;
- }
-
- /**
- * Sets the value of the 'type_map' field.
- * @param value the value to set.
- */
- public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long>
value) {
- this.type_map = value;
- }
-
- /** Creates a new User RecordBuilder */
- public static org.apache.flink.api.io.avro.generated.User.Builder
newBuilder() {
- return new org.apache.flink.api.io.avro.generated.User.Builder();
- }
-
- /** Creates a new User RecordBuilder by copying an existing Builder */
- public static org.apache.flink.api.io.avro.generated.User.Builder
newBuilder(org.apache.flink.api.io.avro.generated.User.Builder other) {
- return new org.apache.flink.api.io.avro.generated.User.Builder(other);
- }
-
- /** Creates a new User RecordBuilder by copying an existing User instance */
- public static org.apache.flink.api.io.avro.generated.User.Builder
newBuilder(org.apache.flink.api.io.avro.generated.User other) {
- return new org.apache.flink.api.io.avro.generated.User.Builder(other);
- }
-
- /**
- * RecordBuilder for User instances.
- */
- public static class Builder extends
org.apache.avro.specific.SpecificRecordBuilderBase<User>
- implements org.apache.avro.data.RecordBuilder<User> {
-
- private java.lang.CharSequence name;
- private java.lang.Integer favorite_number;
- private java.lang.CharSequence favorite_color;
- private java.lang.Long type_long_test;
- private java.lang.Object type_double_test;
- private java.lang.Object type_null_test;
- private java.lang.Object type_bool_test;
- private java.util.List<java.lang.CharSequence> type_array_string;
- private java.util.List<java.lang.Boolean> type_array_boolean;
- private java.util.List<java.lang.CharSequence> type_nullable_array;
- private org.apache.flink.api.io.avro.generated.Colors type_enum;
- private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
- /** Creates a new Builder */
- private Builder() {
- super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
- }
-
- /** Creates a Builder by copying an existing Builder */
- private Builder(org.apache.flink.api.io.avro.generated.User.Builder other)
{
- super(other);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(),
other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(),
other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- if (isValidValue(fields()[3], other.type_long_test)) {
- this.type_long_test = data().deepCopy(fields()[3].schema(),
other.type_long_test);
- fieldSetFlags()[3] = true;
- }
- if (isValidValue(fields()[4], other.type_double_test)) {
- this.type_double_test = data().deepCopy(fields()[4].schema(),
other.type_double_test);
- fieldSetFlags()[4] = true;
- }
- if (isValidValue(fields()[5], other.type_null_test)) {
- this.type_null_test = data().deepCopy(fields()[5].schema(),
other.type_null_test);
- fieldSetFlags()[5] = true;
- }
- if (isValidValue(fields()[6], other.type_bool_test)) {
- this.type_bool_test = data().deepCopy(fields()[6].schema(),
other.type_bool_test);
- fieldSetFlags()[6] = true;
- }
- if (isValidValue(fields()[7], other.type_array_string)) {
- this.type_array_string = data().deepCopy(fields()[7].schema(),
other.type_array_string);
- fieldSetFlags()[7] = true;
- }
- if (isValidValue(fields()[8], other.type_array_boolean)) {
- this.type_array_boolean = data().deepCopy(fields()[8].schema(),
other.type_array_boolean);
- fieldSetFlags()[8] = true;
- }
- if (isValidValue(fields()[9], other.type_nullable_array)) {
- this.type_nullable_array = data().deepCopy(fields()[9].schema(),
other.type_nullable_array);
- fieldSetFlags()[9] = true;
- }
- if (isValidValue(fields()[10], other.type_enum)) {
- this.type_enum = data().deepCopy(fields()[10].schema(),
other.type_enum);
- fieldSetFlags()[10] = true;
- }
- if (isValidValue(fields()[11], other.type_map)) {
- this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
- fieldSetFlags()[11] = true;
- }
- }
-
- /** Creates a Builder by copying an existing User instance */
- private Builder(org.apache.flink.api.io.avro.generated.User other) {
- super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(),
other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(),
other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- if (isValidValue(fields()[3], other.type_long_test)) {
- this.type_long_test = data().deepCopy(fields()[3].schema(),
other.type_long_test);
- fieldSetFlags()[3] = true;
- }
- if (isValidValue(fields()[4], other.type_double_test)) {
- this.type_double_test = data().deepCopy(fields()[4].schema(),
other.type_double_test);
- fieldSetFlags()[4] = true;
- }
- if (isValidValue(fields()[5], other.type_null_test)) {
- this.type_null_test = data().deepCopy(fields()[5].schema(),
other.type_null_test);
- fieldSetFlags()[5] = true;
- }
- if (isValidValue(fields()[6], other.type_bool_test)) {
- this.type_bool_test = data().deepCopy(fields()[6].schema(),
other.type_bool_test);
- fieldSetFlags()[6] = true;
- }
- if (isValidValue(fields()[7], other.type_array_string)) {
- this.type_array_string = data().deepCopy(fields()[7].schema(),
other.type_array_string);
- fieldSetFlags()[7] = true;
- }
- if (isValidValue(fields()[8], other.type_array_boolean)) {
- this.type_array_boolean = data().deepCopy(fields()[8].schema(),
other.type_array_boolean);
- fieldSetFlags()[8] = true;
- }
- if (isValidValue(fields()[9], other.type_nullable_array)) {
- this.type_nullable_array = data().deepCopy(fields()[9].schema(),
other.type_nullable_array);
- fieldSetFlags()[9] = true;
- }
- if (isValidValue(fields()[10], other.type_enum)) {
- this.type_enum = data().deepCopy(fields()[10].schema(),
other.type_enum);
- fieldSetFlags()[10] = true;
- }
- if (isValidValue(fields()[11], other.type_map)) {
- this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
- fieldSetFlags()[11] = true;
- }
- }
-
- /** Gets the value of the 'name' field */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /** Sets the value of the 'name' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setName(java.lang.CharSequence value) {
- validate(fields()[0], value);
- this.name = value;
- fieldSetFlags()[0] = true;
- return this;
- }
-
- /** Checks whether the 'name' field has been set */
- public boolean hasName() {
- return fieldSetFlags()[0];
- }
-
- /** Clears the value of the 'name' field */
- public org.apache.flink.api.io.avro.generated.User.Builder clearName() {
- name = null;
- fieldSetFlags()[0] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_number' field */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /** Sets the value of the 'favorite_number' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setFavoriteNumber(java.lang.Integer value) {
- validate(fields()[1], value);
- this.favorite_number = value;
- fieldSetFlags()[1] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_number' field has been set */
- public boolean hasFavoriteNumber() {
- return fieldSetFlags()[1];
- }
-
- /** Clears the value of the 'favorite_number' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearFavoriteNumber() {
- favorite_number = null;
- fieldSetFlags()[1] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_color' field */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /** Sets the value of the 'favorite_color' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setFavoriteColor(java.lang.CharSequence value) {
- validate(fields()[2], value);
- this.favorite_color = value;
- fieldSetFlags()[2] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_color' field has been set */
- public boolean hasFavoriteColor() {
- return fieldSetFlags()[2];
- }
-
- /** Clears the value of the 'favorite_color' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearFavoriteColor() {
- favorite_color = null;
- fieldSetFlags()[2] = false;
- return this;
- }
-
- /** Gets the value of the 'type_long_test' field */
- public java.lang.Long getTypeLongTest() {
- return type_long_test;
- }
-
- /** Sets the value of the 'type_long_test' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeLongTest(java.lang.Long value) {
- validate(fields()[3], value);
- this.type_long_test = value;
- fieldSetFlags()[3] = true;
- return this;
- }
-
- /** Checks whether the 'type_long_test' field has been set */
- public boolean hasTypeLongTest() {
- return fieldSetFlags()[3];
- }
-
- /** Clears the value of the 'type_long_test' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearTypeLongTest() {
- type_long_test = null;
- fieldSetFlags()[3] = false;
- return this;
- }
-
- /** Gets the value of the 'type_double_test' field */
- public java.lang.Object getTypeDoubleTest() {
- return type_double_test;
- }
-
- /** Sets the value of the 'type_double_test' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeDoubleTest(java.lang.Object value) {
- validate(fields()[4], value);
- this.type_double_test = value;
- fieldSetFlags()[4] = true;
- return this;
- }
-
- /** Checks whether the 'type_double_test' field has been set */
- public boolean hasTypeDoubleTest() {
- return fieldSetFlags()[4];
- }
-
- /** Clears the value of the 'type_double_test' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearTypeDoubleTest() {
- type_double_test = null;
- fieldSetFlags()[4] = false;
- return this;
- }
-
- /** Gets the value of the 'type_null_test' field */
- public java.lang.Object getTypeNullTest() {
- return type_null_test;
- }
-
- /** Sets the value of the 'type_null_test' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeNullTest(java.lang.Object value) {
- validate(fields()[5], value);
- this.type_null_test = value;
- fieldSetFlags()[5] = true;
- return this;
- }
-
- /** Checks whether the 'type_null_test' field has been set */
- public boolean hasTypeNullTest() {
- return fieldSetFlags()[5];
- }
-
- /** Clears the value of the 'type_null_test' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearTypeNullTest() {
- type_null_test = null;
- fieldSetFlags()[5] = false;
- return this;
- }
-
- /** Gets the value of the 'type_bool_test' field */
- public java.lang.Object getTypeBoolTest() {
- return type_bool_test;
- }
-
- /** Sets the value of the 'type_bool_test' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeBoolTest(java.lang.Object value) {
- validate(fields()[6], value);
- this.type_bool_test = value;
- fieldSetFlags()[6] = true;
- return this;
- }
-
- /** Checks whether the 'type_bool_test' field has been set */
- public boolean hasTypeBoolTest() {
- return fieldSetFlags()[6];
- }
-
- /** Clears the value of the 'type_bool_test' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearTypeBoolTest() {
- type_bool_test = null;
- fieldSetFlags()[6] = false;
- return this;
- }
-
- /** Gets the value of the 'type_array_string' field */
- public java.util.List<java.lang.CharSequence> getTypeArrayString() {
- return type_array_string;
- }
-
- /** Sets the value of the 'type_array_string' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
- validate(fields()[7], value);
- this.type_array_string = value;
- fieldSetFlags()[7] = true;
- return this;
- }
-
- /** Checks whether the 'type_array_string' field has been set */
- public boolean hasTypeArrayString() {
- return fieldSetFlags()[7];
- }
-
- /** Clears the value of the 'type_array_string' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearTypeArrayString() {
- type_array_string = null;
- fieldSetFlags()[7] = false;
- return this;
- }
-
- /** Gets the value of the 'type_array_boolean' field */
- public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
- return type_array_boolean;
- }
-
- /** Sets the value of the 'type_array_boolean' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
- validate(fields()[8], value);
- this.type_array_boolean = value;
- fieldSetFlags()[8] = true;
- return this;
- }
-
- /** Checks whether the 'type_array_boolean' field has been set */
- public boolean hasTypeArrayBoolean() {
- return fieldSetFlags()[8];
- }
-
- /** Clears the value of the 'type_array_boolean' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearTypeArrayBoolean() {
- type_array_boolean = null;
- fieldSetFlags()[8] = false;
- return this;
- }
-
- /** Gets the value of the 'type_nullable_array' field */
- public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
- return type_nullable_array;
- }
-
- /** Sets the value of the 'type_nullable_array' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
- validate(fields()[9], value);
- this.type_nullable_array = value;
- fieldSetFlags()[9] = true;
- return this;
- }
-
- /** Checks whether the 'type_nullable_array' field has been set */
- public boolean hasTypeNullableArray() {
- return fieldSetFlags()[9];
- }
-
- /** Clears the value of the 'type_nullable_array' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
clearTypeNullableArray() {
- type_nullable_array = null;
- fieldSetFlags()[9] = false;
- return this;
- }
-
- /** Gets the value of the 'type_enum' field */
- public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
- return type_enum;
- }
-
- /** Sets the value of the 'type_enum' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
- validate(fields()[10], value);
- this.type_enum = value;
- fieldSetFlags()[10] = true;
- return this;
- }
-
- /** Checks whether the 'type_enum' field has been set */
- public boolean hasTypeEnum() {
- return fieldSetFlags()[10];
- }
-
- /** Clears the value of the 'type_enum' field */
- public org.apache.flink.api.io.avro.generated.User.Builder clearTypeEnum()
{
- type_enum = null;
- fieldSetFlags()[10] = false;
- return this;
- }
-
- /** Gets the value of the 'type_map' field */
- public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
- return type_map;
- }
-
- /** Sets the value of the 'type_map' field */
- public org.apache.flink.api.io.avro.generated.User.Builder
setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
- validate(fields()[11], value);
- this.type_map = value;
- fieldSetFlags()[11] = true;
- return this;
- }
-
- /** Checks whether the 'type_map' field has been set */
- public boolean hasTypeMap() {
- return fieldSetFlags()[11];
- }
-
- /** Clears the value of the 'type_map' field */
- public org.apache.flink.api.io.avro.generated.User.Builder clearTypeMap() {
- type_map = null;
- fieldSetFlags()[11] = false;
- return this;
- }
-
- @Override
- public User build() {
- try {
- User record = new User();
- record.name = fieldSetFlags()[0] ? this.name :
(java.lang.CharSequence) defaultValue(fields()[0]);
- record.favorite_number = fieldSetFlags()[1] ? this.favorite_number :
(java.lang.Integer) defaultValue(fields()[1]);
- record.favorite_color = fieldSetFlags()[2] ? this.favorite_color :
(java.lang.CharSequence) defaultValue(fields()[2]);
- record.type_long_test = fieldSetFlags()[3] ? this.type_long_test :
(java.lang.Long) defaultValue(fields()[3]);
- record.type_double_test = fieldSetFlags()[4] ? this.type_double_test :
(java.lang.Object) defaultValue(fields()[4]);
- record.type_null_test = fieldSetFlags()[5] ? this.type_null_test :
(java.lang.Object) defaultValue(fields()[5]);
- record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test :
(java.lang.Object) defaultValue(fields()[6]);
- record.type_array_string = fieldSetFlags()[7] ? this.type_array_string
: (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
- record.type_array_boolean = fieldSetFlags()[8] ?
this.type_array_boolean : (java.util.List<java.lang.Boolean>)
defaultValue(fields()[8]);
- record.type_nullable_array = fieldSetFlags()[9] ?
this.type_nullable_array : (java.util.List<java.lang.CharSequence>)
defaultValue(fields()[9]);
- record.type_enum = fieldSetFlags()[10] ? this.type_enum :
(org.apache.flink.api.io.avro.generated.Colors) defaultValue(fields()[10]);
- record.type_map = fieldSetFlags()[11] ? this.type_map :
(java.util.Map<java.lang.CharSequence,java.lang.Long>)
defaultValue(fields()[11]);
- return record;
- } catch (Exception e) {
- throw new org.apache.avro.AvroRuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-addons/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/resources/avro/user.avsc
b/flink-addons/flink-avro/src/test/resources/avro/user.avsc
index af3cb75..6801b10 100644
--- a/flink-addons/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-addons/flink-avro/src/test/resources/avro/user.avsc
@@ -1,5 +1,5 @@
-{"namespace": "org.apache.flink.api.java.record.io.avro.generated",
+{"namespace": "org.apache.flink.api.io.avro.generated",
"type": "record",
"name": "User",
"fields": [
@@ -7,13 +7,17 @@
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]},
{"name": "type_long_test", "type": ["long", "null"]},
- {"name": "type_double_test", "type": ["double"]},
+ {"name": "type_double_test", "type": "double"},
{"name": "type_null_test", "type": ["null"]},
{"name": "type_bool_test", "type": ["boolean"]},
{"name": "type_array_string", "type" : {"type" : "array", "items" :
"string"}},
{"name": "type_array_boolean", "type" : {"type" : "array", "items" :
"boolean"}},
{"name": "type_nullable_array", "type": ["null", {"type":"array",
"items":"string"}], "default":null},
{"name": "type_enum", "type": {"type": "enum", "name": "Colors",
"symbols" : ["RED", "GREEN", "BLUE"]}},
- {"name": "type_map", "type": {"type": "map", "values": "long"}}
+ {"name": "type_map", "type": {"type": "map", "values": "long"}},
+ {"name": "type_fixed",
+ "size": 16,
+ "type": ["null", {"name": "Fixed16", "size": 16, "type":
"fixed"}] },
+ {"name": "type_union", "type": ["null", "boolean", "long", "double"]}
]
}
\ No newline at end of file
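The schema tweaks above interact with how Avro maps unions to Java types in the generated class, as visible in the deleted User.java earlier in this diff: `favorite_number` with `["int","null"]` was a `java.lang.Integer`, while `type_double_test` under the old single-branch `["double"]` union was a plain `java.lang.Object`, which is presumably why this commit changes it to a bare `"double"`. A minimal sketch of that observed mapping rule (method and class names are my own, not Avro's):

```java
import java.util.List;

class UnionMapping {
    // Hedged sketch of the union-to-Java mapping observed in the generated
    // User.java deleted above: a two-branch ["X","null"] union maps to the
    // boxed Java type of X, while every other union falls back to Object.
    static String javaTypeFor(List<String> unionBranches, String boxedType) {
        if (unionBranches.size() == 2 && unionBranches.contains("null")) {
            return boxedType;          // e.g. ["int","null"] -> java.lang.Integer
        }
        return "java.lang.Object";     // e.g. ["null","boolean","long","double"]
    }
}
```

Under this rule the new `type_union` field `["null","boolean","long","double"]` would also surface as `Object` in regenerated code.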
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
----------------------------------------------------------------------
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
index e6a8cb6..bc5c6b6 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
@@ -448,7 +448,7 @@ public abstract class ComparatorTestBase<T> {
}
//
--------------------------------------------------------------------------------------------
- protected static final class TestOutputView extends DataOutputStream
implements DataOutputView {
+ public static final class TestOutputView extends DataOutputStream
implements DataOutputView {
public TestOutputView() {
super(new ByteArrayOutputStream(4096));
@@ -474,7 +474,7 @@ public abstract class ComparatorTestBase<T> {
}
}
- protected static final class TestInputView extends DataInputStream
implements DataInputView {
+ public static final class TestInputView extends DataInputStream
implements DataInputView {
public TestInputView(byte[] data) {
super(new ByteArrayInputStream(data));
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 1469dac..4a4bbfa 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -63,6 +63,13 @@ under the License.
<version>0.5.1</version>
</dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-avro_2.10</artifactId>
+ <version>0.5.1</version>
+ </dependency>
+
+
<!-- guava needs to be in "provided" scope, to make sure it is
not included into the jars by the shading -->
<dependency>
<groupId>com.google.guava</groupId>
@@ -80,7 +87,7 @@ under the License.
</dependency>
</dependencies>
- <!-- Because flink-scala uses it in tests -->
+ <!-- Because flink-scala and flink-avro use it in tests -->
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index c19e9aa..563787f 100644
---
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -371,7 +371,7 @@ public abstract class ExecutionEnvironment {
}
catch (Exception e) {
throw new InvalidProgramException("The type returned by
the input format could not be automatically determined. " +
- "Please specify the TypeInformation of
the produced type explicitly.");
+ "Please specify the TypeInformation of
the produced type explicitly.", e);
}
}
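The one-line change above chains the caught exception as the cause, so the underlying failure is no longer swallowed by the `InvalidProgramException`. A minimal, self-contained illustration of the same pattern (the wrapper class and method here are hypothetical, not Flink API):

```java
class TypeDeterminationError {
    // Passing the caught exception as the second constructor argument keeps
    // its stack trace attached to the wrapping exception, which is what the
    // fixed throw site in ExecutionEnvironment now does.
    static RuntimeException wrap(Exception e) {
        return new IllegalStateException(
            "The type returned by the input format could not be automatically determined. "
            + "Please specify the TypeInformation of the produced type explicitly.", e);
    }
}
```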
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..ccdf7f7
--- /dev/null
+++
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Special type information to generate a POJO type info from an Avro schema.
+ *
+ * Approach: it runs the regular POJO type analysis and replaces every
+ * GenericType&lt;CharSequence&gt; with a GenericType&lt;org.apache.avro.util.Utf8&gt;.
+ * @param <T> The Avro-generated record type.
+ */
+public class AvroTypeInfo<T extends SpecificRecordBase> extends
PojoTypeInfo<T> {
+ public AvroTypeInfo(Class<T> typeClass) {
+ super(typeClass, generateFieldsFromAvroSchema(typeClass));
+ }
+
+ private static <T extends SpecificRecordBase> List<PojoField>
generateFieldsFromAvroSchema(Class<T> typeClass) {
+ PojoTypeExtractor pte = new PojoTypeExtractor();
+ TypeInformation ti = pte.analyzePojo(typeClass, new
ArrayList<Type>(), null);
+
+ if(!(ti instanceof PojoTypeInfo)) {
+ throw new IllegalStateException("Expecting type to be a
PojoTypeInfo");
+ }
+ PojoTypeInfo pti = (PojoTypeInfo) ti;
+ List<PojoField> newFields = new
ArrayList<PojoField>(pti.getTotalFields());
+
+ for(int i = 0; i < pti.getTotalFields(); i++) {
+ PojoField f = pti.getPojoFieldAt(i);
+ TypeInformation newType = f.type;
+ // check if type is a CharSequence
+ if(newType instanceof GenericTypeInfo) {
+
if((newType).getTypeClass().equals(CharSequence.class)) {
+ // replace the type by a
org.apache.avro.util.Utf8
+ newType = new
GenericTypeInfo(org.apache.avro.util.Utf8.class);
+ }
+ }
+ PojoField newField = new PojoField(f.field, newType);
+ newFields.add(newField);
+ }
+ return newFields;
+ }
+
+ private static class PojoTypeExtractor extends TypeExtractor {
+ private PojoTypeExtractor() {
+ super();
+ }
+ }
+}
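The core of the new AvroTypeInfo is the field-rewriting loop in `generateFieldsFromAvroSchema`. A self-contained sketch of that rewrite, using a stand-in `Utf8` class since the real `org.apache.avro.util.Utf8` is assumed rather than imported here:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FieldTypeRewrite {
    // Stand-in for org.apache.avro.util.Utf8; only its identity as a
    // distinct class matters for this sketch.
    static final class Utf8 {}

    // Mirrors the loop in generateFieldsFromAvroSchema: any field whose
    // analyzed type is CharSequence is re-mapped to Utf8, the concrete
    // class Avro uses for strings at runtime.
    static Map<String, Class<?>> rewrite(Map<String, Class<?>> analyzedFields) {
        Map<String, Class<?>> result = new LinkedHashMap<String, Class<?>>();
        for (Map.Entry<String, Class<?>> e : analyzedFields.entrySet()) {
            Class<?> type = e.getValue();
            if (type.equals(CharSequence.class)) {
                type = Utf8.class; // the replacement AvroTypeInfo performs
            }
            result.put(e.getKey(), type);
        }
        return result;
    }
}
```

All non-string fields pass through unchanged, exactly as in the real loop, where only `GenericTypeInfo` instances wrapping `CharSequence` are touched.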
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index b528d00..2e90c8a 100644
---
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
@@ -66,7 +67,7 @@ public class TypeExtractor {
// in an endless recursion
private Set<Class<?>> alreadySeen;
- private TypeExtractor() {
+ protected TypeExtractor() {
alreadySeen = new HashSet<Class<?>>();
}
@@ -936,6 +937,11 @@ public class TypeExtractor {
return (TypeInformation<X>) new EnumTypeInfo(clazz);
}
+ // special case for POJOs generated by Avro.
+ if(SpecificRecordBase.class.isAssignableFrom(clazz)) {
+ return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
+ }
+
if (alreadySeen.contains(clazz)) {
return new GenericTypeInfo<X>(clazz);
}
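The hunk above places the new Avro check after the enum special case and before the `alreadySeen` recursion guard. That ordering can be sketched with stand-in marker classes (the real check is against Avro's `SpecificRecordBase`, not the nested class used here):

```java
import java.util.HashSet;
import java.util.Set;

class TypeClassification {
    // Stand-ins for org.apache.avro.specific.SpecificRecordBase and an
    // Avro-generated record; hypothetical, for illustration only.
    static class SpecificRecordBase {}
    static class AvroUser extends SpecificRecordBase {}
    enum Colors { RED, GREEN, BLUE }

    private final Set<Class<?>> alreadySeen = new HashSet<Class<?>>();

    // Mirrors the ordering in the TypeExtractor hunk above: enums first,
    // then the new Avro special case, then the recursion guard that falls
    // back to a generic type for classes already being analyzed.
    String classify(Class<?> clazz) {
        if (clazz.isEnum()) {
            return "EnumTypeInfo";
        }
        if (SpecificRecordBase.class.isAssignableFrom(clazz)) {
            return "AvroTypeInfo";
        }
        if (!alreadySeen.add(clazz)) {
            return "GenericTypeInfo"; // break endless recursion on cyclic types
        }
        return "PojoAnalysis";
    }
}
```

Because the Avro check precedes the recursion guard, an Avro-generated record is always routed to `AvroTypeInfo` rather than being degraded to a generic type.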
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index a5f5f65..f7a90a5 100644
---
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -20,21 +20,39 @@ package org.apache.flink.api.java.typeutils.runtime;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.twitter.chill.ScalaKryoInstantiator;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import scala.reflect.ClassTag;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.lang.reflect.Modifier;
+/**
+ * A type serializer that serializes its type using the Kryo serialization
+ * framework (https://github.com/EsotericSoftware/kryo).
+ *
+ * This serializer is intended as a fallback serializer for the cases that are
+ * not covered by the basic types, tuples, and POJOs.
+ *
+ * @param <T> The type to be serialized.
+ */
public class KryoSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 2L;
@@ -48,6 +66,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
private transient Input input;
private transient Output output;
+
+	// ------------------------------------------------------------------------
public KryoSerializer(Class<T> type){
if(type == null){
@@ -56,6 +76,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
this.type = type;
}
+	// ------------------------------------------------------------------------
@Override
public boolean isImmutableType() {
@@ -191,15 +212,92 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new ScalaKryoInstantiator().newKryo();
-			this.kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
-			this.kryo.setRegistrationRequired(false);
-			this.kryo.register(type);
-			this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+			// Throwable and all subclasses should be serialized via Java serialization
+			kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
+
+			// If the type we have to serialize as a GenericType implements SpecificRecordBase,
+			// we have to register the Avro serializer.
+			// This rule only applies if users explicitly use the GenericTypeInformation for Avro types;
+			// usually, we are able to handle Avro POJOs with the POJO serializer.
+			if (SpecificRecordBase.class.isAssignableFrom(type)) {
+				ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(type);
+				this.kryo.register(type, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
+			}
+
+			// Avro POJOs contain java.util.List fields whose runtime type is GenericData.Array.
+			// Because Kryo is not able to serialize these properly, we use this serializer for them.
+			this.kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer(ArrayList.class));
+
+			// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
+			// Kryo is able to serialize everything in there, except for the Schema.
+			// This serializer is very slow, but using GenericData.Record with Kryo is in general a bad idea anyway.
+			// We add the serializer as a default serializer because Avro uses a private sub-type at runtime.
+			this.kryo.addDefaultSerializer(Schema.class, new AvroSchemaSerializer());
+
+			// register the type of our class
+			kryo.register(type);
+
+			// register given types. We do this first so that any registration of a
+			// more specific serializer overrides this.
+			for (Class<?> type : registeredTypes) {
+				kryo.register(type);
+			}
+
+			kryo.setRegistrationRequired(false);
+			kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Custom Serializers
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Special serializer for Java collections enforcing certain instance types.
+	 * Avro serializes collections with a "GenericData.Array" type. Kryo is not able to handle
+	 * this type, so we use ArrayLists instead.
+	 */
+	public static class SpecificInstanceCollectionSerializer<T extends Collection> extends CollectionSerializer {
+		Class<T> type;
+
+		public SpecificInstanceCollectionSerializer(Class<T> type) {
+			this.type = type;
+		}
+
+		@Override
+		protected Collection create(Kryo kryo, Input input, Class<Collection> type) {
+			return kryo.newInstance(this.type);
+		}
+
+		@Override
+		protected Collection createCopy(Kryo kryo, Collection original) {
+			return kryo.newInstance(this.type);
+		}
+	}
+
+	/**
+	 * Slow serialization approach for Avro schemas.
+	 * This is only used with {@link org.apache.avro.generic.GenericData.Record} types.
+	 * With this serializer, we are able to handle Avro records.
+	 */
+	public static class AvroSchemaSerializer extends Serializer<Schema> {
+		@Override
+		public void write(Kryo kryo, Output output, Schema object) {
+			String schemaAsString = object.toString(false);
+			output.writeString(schemaAsString);
+		}
+
+		@Override
+		public Schema read(Kryo kryo, Input input, Class<Schema> type) {
+			String schemaAsString = input.readString();
+			// the parser seems to be stateful, so we need a new one for every type.
+			Schema.Parser sParser = new Schema.Parser();
+			return sParser.parse(schemaAsString);
+		}
+	}
-
	// --------------------------------------------------------------------------------------------
-	// for testing
+	// For testing
	// --------------------------------------------------------------------------------------------
Kryo getKryo() {
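The idea behind `SpecificInstanceCollectionSerializer` can be shown without Kryo: whatever `Collection` subtype the data carries at runtime, deserialization always instantiates the one concrete class the serializer was configured with. This is a minimal sketch where `kryo.newInstance` is replaced by plain reflection; the method name `recreate` is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class SpecificInstanceSketch {
    // Always creates the configured concrete type, ignoring the runtime type
    // of the original collection -- the same idea as the Kryo serializer above.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Collection recreate(Class<? extends Collection> target, Collection original)
            throws ReflectiveOperationException {
        Collection copy = target.getDeclaredConstructor().newInstance();
        copy.addAll(original);
        return copy;
    }

    public static void main(String[] args) throws Exception {
        // Pretend this LinkedList arrived from Avro as a GenericData.Array
        Collection<String> original = new java.util.LinkedList<>(List.of("a", "b"));
        Collection<?> copy = recreate(ArrayList.class, original);
        System.out.println(copy.getClass().getSimpleName()); // ArrayList
    }
}
```

Pinning the instance type to `ArrayList` side-steps Kryo's inability to construct Avro's private `GenericData.Array` class, at the cost of losing the original collection type on the round trip.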
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index ae4a806..c0c7797 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -210,7 +210,12 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 		int code = 0;
 		for (; i < this.keyFields.length; i++) {
 			code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
-			code += this.comparators[i].hash(accessField(keyFields[i], value));
+			try {
+				code += this.comparators[i].hash(accessField(keyFields[i], value));
+			} catch (NullPointerException npe) {
+				throw new RuntimeException("A NullPointerException occurred while accessing a key field in a POJO. " +
+					"Most likely, the value grouped/joined on is null. Field name: " + keyFields[i].getName(), npe);
+			}
 		}
return code;
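The pattern above, rethrowing a low-level NPE with the offending field's name attached, can be reproduced standalone. The `Pojo` class and `hashKeyField` helper below are made up for illustration; only the wrapping idiom mirrors the diff.

```java
import java.lang.reflect.Field;

public class KeyFieldHashSketch {
    static class Pojo { String name; } // name is left null on purpose

    // Mirrors the hashCode loop: wrap the NPE so the user sees which key field was null.
    static int hashKeyField(Object value, Field field) throws IllegalAccessException {
        try {
            return field.get(value).hashCode();
        } catch (NullPointerException npe) {
            throw new RuntimeException("A NullPointerException occurred while accessing a key field in a POJO. "
                + "Most likely, the value grouped/joined on is null. Field name: " + field.getName(), npe);
        }
    }

    public static void main(String[] args) throws Exception {
        Field f = Pojo.class.getDeclaredField("name");
        try {
            hashKeyField(new Pojo(), f);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage().contains("name")); // true
        }
    }
}
```

Passing the NPE as the cause keeps the original stack trace, so nothing is lost by the friendlier message.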
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b729e17..e59f340 100644
--- a/pom.xml
+++ b/pom.xml
@@ -923,7 +923,9 @@ under the License.
<exclude>flink-runtime/resources/web-docs-infoserver/js/timeline.js</exclude>
<!-- Test Data. -->
<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
-						<exclude>flink-addons/flink-avro/src/test/resources/avro/user.avsc</exclude>
+						<exclude>flink-addons/flink-avro/src/test/resources/avro/*.avsc</exclude>
+						<!-- Generated files -->
+						<exclude>flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
<!-- Configuration Files. -->
<exclude>**/flink-bin/conf/slaves</exclude>
<!-- Administrative files in the main trunk. -->
http://git-wip-us.apache.org/repos/asf/flink/blob/84c49981/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 377cbfd..b17dbce 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -24,4 +24,5 @@ under the License.
<suppressions>
	<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
+	<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/>
</suppressions>
\ No newline at end of file