http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java new file mode 100644 index 0000000..ae41031 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.avro.AvroInputFormat; +import org.apache.flink.formats.avro.AvroRecordInputFormatTest; +import org.apache.flink.formats.avro.generated.Fixed16; +import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for the {@link AvroInputFormat} reading Pojos. 
+ */ +@RunWith(Parameterized.class) +public class AvroTypeExtractionTest extends MultipleProgramsTestBase { + public AvroTypeExtractionTest(TestExecutionMode mode) { + super(mode); + } + + private File inFile; + private String resultPath; + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + inFile = tempFolder.newFile(); + AvroRecordInputFormatTest.writeTestFile(inFile); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testSimpleAvroRead() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users) + // null map type because the order changes in different JVMs (hard to test) + .map(new MapFunction<User, User>() { + @Override + public User map(User value) throws Exception { + value.setTypeMap(null); + return value; + } + }); + + usersDS.writeAsText(resultPath); + + env.execute("Simple Avro read job"); + + expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, 
\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; + } + + @Test + public void testSerializeWithAvro() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableForceAvro(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users) + // null map type because the order changes in different JVMs (hard to test) + .map(new MapFunction<User, User>() { + @Override + public User map(User value) throws Exception { + Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1); + ab.put("hehe", 12L); + value.setTypeMap(ab); + return value; + } + }); + + usersDS.writeAsText(resultPath); + + env.execute("Simple Avro read job"); + + expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, 
\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; + + } + + @Test + public void testKeySelection() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + expected = "(Alyssa,1)\n(Charlie,1)\n"; + } + + @Test + public void testWithAvroGenericSer() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableForceAvro(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { + @Override + public String getKey(User value) throws Exception { + return String.valueOf(value.getName()); + } + }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + expected = 
"(Charlie,1)\n(Alyssa,1)\n"; + } + + @Test + public void testWithKryoGenericSer() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableForceKryo(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { + @Override + public String getKey(User value) throws Exception { + return String.valueOf(value.getName()); + } + }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + expected = "(Charlie,1)\n(Alyssa,1)\n"; + } + + /** + * Test some known fields for grouping on. 
+ */ + @Test + public void testAllFields() throws Exception { + for (String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { + testField(fieldName); + } + } + + private void testField(final String fieldName) throws Exception { + before(); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { + @Override + public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { + for (User u : values) { + out.collect(u.get(fieldName)); + } + } + }); + res.writeAsText(resultPath); + env.execute("Simple Avro read job"); + + // test if automatic registration of the Types worked + ExecutionConfig ec = env.getConfig(); + Assert.assertTrue(ec.getRegisteredKryoTypes().contains(Fixed16.class)); + + if (fieldName.equals("name")) { + expected = "Alyssa\nCharlie"; + } else if (fieldName.equals("type_enum")) { + expected = "GREEN\nRED\n"; + } else if (fieldName.equals("type_double_test")) { + expected = "123.45\n1.337\n"; + } else { + Assert.fail("Unknown field"); + } + + after(); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java new file mode 100644 index 0000000..79a4a45 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.typeutils.TypeInformationTestBase; +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.generated.User; + +/** + * Test for {@link AvroTypeInfo}. 
+ */ +public class AvroTypeInfoTest extends TypeInformationTestBase<AvroTypeInfo<?>> { + + @Override + protected AvroTypeInfo<?>[] getTestData() { + return new AvroTypeInfo<?>[] { + new AvroTypeInfo<>(Address.class), + new AvroTypeInfo<>(User.class), + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java new file mode 100644 index 0000000..90ac040 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.generated.Colors; +import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.types.Row; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificRecord; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; + +/** + * Utilities for creating Avro Schemas. + */ +public final class AvroTestUtils { + + private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka"; + + /** + * Creates a flat Avro Schema for testing. + */ + public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation[] fieldTypes) { + final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder + .record("BasicAvroRecord") + .namespace(NAMESPACE) + .fields(); + + final Schema nullSchema = Schema.create(Schema.Type.NULL); + + for (int i = 0; i < fieldNames.length; i++) { + Schema schema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass()); + Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, schema)); + fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault(); + } + + return fieldAssembler.endRecord(); + } + + /** + * Tests simple Avro data types without nesting. + */ + public static Tuple3<Class<? 
extends SpecificRecord>, SpecificRecord, Row> getSimpleTestData() { + final Address addr = Address.newBuilder() + .setNum(42) + .setStreet("Main Street 42") + .setCity("Test City") + .setState("Test State") + .setZip("12345") + .build(); + + final Row rowAddr = new Row(5); + rowAddr.setField(0, 42); + rowAddr.setField(1, "Main Street 42"); + rowAddr.setField(2, "Test City"); + rowAddr.setField(3, "Test State"); + rowAddr.setField(4, "12345"); + + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>(); + t.f0 = Address.class; + t.f1 = addr; + t.f2 = rowAddr; + + return t; + } + + /** + * Tests all Avro data types as well as nested types. + */ + public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getComplexTestData() { + final Address addr = Address.newBuilder() + .setNum(42) + .setStreet("Main Street 42") + .setCity("Test City") + .setState("Test State") + .setZip("12345") + .build(); + + final Row rowAddr = new Row(5); + rowAddr.setField(0, 42); + rowAddr.setField(1, "Main Street 42"); + rowAddr.setField(2, "Test City"); + rowAddr.setField(3, "Test State"); + rowAddr.setField(4, "12345"); + + final User user = User.newBuilder() + .setName("Charlie") + .setFavoriteNumber(null) + .setFavoriteColor("blue") + .setTypeLongTest(1337L) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeBoolTest(false) + .setTypeArrayString(new ArrayList<CharSequence>()) + .setTypeArrayBoolean(new ArrayList<Boolean>()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap<CharSequence, Long>()) + .setTypeFixed(null) + .setTypeUnion(null) + .setTypeNested(addr) + .build(); + + final Row rowUser = new Row(15); + rowUser.setField(0, "Charlie"); + rowUser.setField(1, null); + rowUser.setField(2, "blue"); + rowUser.setField(3, 1337L); + rowUser.setField(4, 1.337d); + rowUser.setField(5, null); + rowUser.setField(6, false); + rowUser.setField(7, new ArrayList<CharSequence>()); + rowUser.setField(8, 
new ArrayList<Boolean>()); + rowUser.setField(9, null); + rowUser.setField(10, Colors.RED); + rowUser.setField(11, new HashMap<CharSequence, Long>()); + rowUser.setField(12, null); + rowUser.setField(13, null); + rowUser.setField(14, rowAddr); + + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>(); + t.f0 = User.class; + t.f1 = user; + t.f2 = rowUser; + + return t; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc new file mode 100644 index 0000000..9685a15 --- /dev/null +++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc @@ -0,0 +1,35 @@ +[ +{"namespace": "org.apache.flink.formats.avro.generated", + "type": "record", + "name": "Address", + "fields": [ + {"name": "num", "type": "int"}, + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"} + ] +}, +{"namespace": "org.apache.flink.formats.avro.generated", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]}, + {"name": "type_long_test", "type": ["long", "null"]}, + {"name": "type_double_test", "type": "double"}, + {"name": "type_null_test", "type": ["null"]}, + {"name": "type_bool_test", "type": ["boolean"]}, + {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, + {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, + {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, + {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : 
["RED", "GREEN", "BLUE"]}}, + {"name": "type_map", "type": {"type": "map", "values": "long"}}, + {"name": "type_fixed", + "size": 16, + "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, + {"name": "type_union", "type": ["null", "boolean", "long", "double"]}, + {"name": "type_nested", "type": ["null", "Address"]} + ] +}] http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/log4j-test.properties b/flink-formats/flink-avro/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..881dc06 --- /dev/null +++ b/flink-formats/flink-avro/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/resources/testdata.avro ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/testdata.avro b/flink-formats/flink-avro/src/test/resources/testdata.avro new file mode 100644 index 0000000..3102d03 Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/testdata.avro differ http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/pom.xml ---------------------------------------------------------------------- diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml new file mode 100644 index 0000000..f8de3e0 --- /dev/null +++ b/flink-formats/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + + <artifactId>flink-formats</artifactId> + <name>flink-formats</name> + <packaging>pom</packaging> + + <modules> + <module>flink-avro</module> + </modules> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index e4b907a..2906eb8 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.api.java.LocalEnvironment; -import org.apache.flink.api.java.io.AvroOutputFormat; +import org.apache.flink.api.java.io.TextOutputFormat; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -168,31 +168,30 @@ public class HDFSTest { } @Test - public void testAvroOut() { - String type = "one"; - AvroOutputFormat<String> avroOut = - new AvroOutputFormat<String>(String.class); + public void testChangingFileNames() { + org.apache.hadoop.fs.Path hdfsPath = new org.apache.hadoop.fs.Path(hdfsURI + 
"/hdfsTest"); + Path path = new Path(hdfsPath.toString()); - org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest"); + String type = "one"; + TextOutputFormat<String> outputFormat = new TextOutputFormat<>(path); - avroOut.setOutputFilePath(new Path(result.toString())); - avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE); - avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS); + outputFormat.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE); + outputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS); try { - avroOut.open(0, 2); - avroOut.writeRecord(type); - avroOut.close(); + outputFormat.open(0, 2); + outputFormat.writeRecord(type); + outputFormat.close(); - avroOut.open(1, 2); - avroOut.writeRecord(type); - avroOut.close(); + outputFormat.open(1, 2); + outputFormat.writeRecord(type); + outputFormat.close(); - assertTrue("No result file present", hdfs.exists(result)); - FileStatus[] files = hdfs.listStatus(result); + assertTrue("No result file present", hdfs.exists(hdfsPath)); + FileStatus[] files = hdfs.listStatus(hdfsPath); Assert.assertEquals(2, files.length); for (FileStatus file : files) { - assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName())); + assertTrue("1".equals(file.getPath().getName()) || "2".equals(file.getPath().getName())); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-libraries/flink-cep/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml index 23978b2..bd57d17 100644 --- a/flink-libraries/flink-cep/pom.xml +++ b/flink-libraries/flink-cep/pom.xml @@ -89,7 +89,13 @@ under the License. 
<scope>test</scope> </dependency> - + <!-- we include Avro to make the CEPMigrationTest work, it uses a Kryo-serialized savepoint (see FLINK-7420) --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml ---------------------------------------------------------------------- diff --git a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml index ff6f84d..864c257 100644 --- a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml +++ b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml @@ -161,6 +161,12 @@ under the License. <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> </exclusion> + + <!-- we don't want Hadoop's Avro dependency, since Flink adds its own Avro support --> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> </exclusions> </dependency> @@ -412,6 +418,12 @@ under the License. <groupId>com.sun.jersey.contribs</groupId> <artifactId>jersey-guice</artifactId> </exclusion> + + <!-- we don't want Hadoop's Avro dependency, since Flink adds its own Avro support --> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-shaded-hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml index ba90fc9..3e7cb41 100644 --- a/flink-shaded-hadoop/pom.xml +++ b/flink-shaded-hadoop/pom.xml @@ -52,11 +52,6 @@ under the License. 
<artifactId>slf4j-api</artifactId> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <scope>provided</scope> - </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 8e10a2e..2217199 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -170,21 +170,6 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-avro_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-avro_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>flink-optimizer_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9cf603a..5302c5c 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ under the License. <module>flink-streaming-java</module> <module>flink-streaming-scala</module> <module>flink-connectors</module> + <module>flink-formats</module> <module>flink-examples</module> <module>flink-clients</module> <module>flink-queryable-state</module> @@ -275,19 +276,6 @@ under the License. 
<version>5.0.4-1.0</version> </dependency> - <!-- Make sure we use a consistent avro version throughout the project --> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <version>1.8.2</version> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - <version>1.8.2</version> - </dependency> - <dependency> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> @@ -1033,7 +1021,8 @@ under the License. <!-- Test Data. --> <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude> <exclude>flink-runtime/src/test/resources/flink_11-kryo_registrations</exclude> - <exclude>flink-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude> + <exclude>flink-core/src/test/resources/kryo-serializer-config-snapshot-v1</exclude> + <exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude> <exclude>out/test/flink-avro/avro/user.avsc</exclude> <exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude> <exclude>test-infra/end-to-end-test/test-data/*</exclude> @@ -1042,8 +1031,8 @@ under the License. <exclude>**/src/test/resources/*-snapshot</exclude> <exclude>**/src/test/resources/*-savepoint</exclude> - <exclude>flink-connectors/flink-avro/src/test/resources/testdata.avro</exclude> - <exclude>flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude> + <exclude>flink-formats/flink-avro/src/test/resources/testdata.avro</exclude> + <exclude>flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/*.java</exclude> <exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude> <exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude> <!-- Configuration Files. 
--> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/tools/maven/suppressions.xml ---------------------------------------------------------------------- diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml index 5d5c455..a58e17c 100644 --- a/tools/maven/suppressions.xml +++ b/tools/maven/suppressions.xml @@ -23,8 +23,7 @@ under the License. "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd"> <suppressions> - <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/> - <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/> + <suppress files="org[\\/]apache[\\/]flink[\\/]formats[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/> <!-- Sometimes we have to temporarily fix very long, different formatted Calcite files. --> <suppress files="org[\\/]apache[\\/]calcite.*" checks="[a-zA-Z0-9]*"/> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/tools/travis_mvn_watchdog.sh ---------------------------------------------------------------------- diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh index 978bc9f..fda6023 100755 --- a/tools/travis_mvn_watchdog.sh +++ b/tools/travis_mvn_watchdog.sh @@ -77,7 +77,7 @@ flink-filesystems/flink-hadoop-fs,\ flink-filesystems/flink-mapr-fs,\ flink-filesystems/flink-s3-fs-hadoop,\ flink-filesystems/flink-s3-fs-presto,\ -flink-connectors/flink-avro,\ +flink-formats/flink-avro,\ flink-connectors/flink-hbase,\ flink-connectors/flink-hcatalog,\ flink-connectors/flink-hadoop-compatibility,\
