http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java deleted file mode 100644 index be968c5..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.io.avro; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.Path; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.util.Collector; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * Tests for the {@link AvroInputFormat} reading Pojos. 
- */ -@RunWith(Parameterized.class) -public class AvroPojoTest extends MultipleProgramsTestBase { - public AvroPojoTest(TestExecutionMode mode) { - super(mode); - } - - private File inFile; - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - inFile = tempFolder.newFile(); - AvroRecordInputFormatTest.writeTestFile(inFile); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testSimpleAvroRead() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { - @Override - public User map(User value) throws Exception { - value.setTypeMap(null); - return value; - } - }); - - usersDS.writeAsText(resultPath); - - env.execute("Simple Avro read job"); - - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": 
false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; - } - - @Test - public void testSerializeWithAvro() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceAvro(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { - @Override - public User map(User value) throws Exception { - Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1); - ab.put("hehe", 12L); - value.setTypeMap(ab); - return value; - } - }); - - usersDS.writeAsText(resultPath); - - env.execute("Simple Avro read job"); - - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, 
\"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; - - } - - @Test - public void testKeySelection() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for (User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - expected = "(Alyssa,1)\n(Charlie,1)\n"; - } - - @Test - public void testWithAvroGenericSer() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceAvro(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { - @Override - public String getKey(User value) throws Exception { - return String.valueOf(value.getName()); - } - }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for (User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - expected = 
"(Charlie,1)\n(Alyssa,1)\n"; - } - - @Test - public void testWithKryoGenericSer() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceKryo(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { - @Override - public String getKey(User value) throws Exception { - return String.valueOf(value.getName()); - } - }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for (User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - expected = "(Charlie,1)\n(Alyssa,1)\n"; - } - - /** - * Test some know fields for grouping on. 
- */ - @Test - public void testAllFields() throws Exception { - for (String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { - testField(fieldName); - } - } - - private void testField(final String fieldName) throws Exception { - before(); - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { - @Override - public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { - for (User u : values) { - out.collect(u.get(fieldName)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Simple Avro read job"); - - // test if automatic registration of the Types worked - ExecutionConfig ec = env.getConfig(); - Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class)); - - if (fieldName.equals("name")) { - expected = "Alyssa\nCharlie"; - } else if (fieldName.equals("type_enum")) { - expected = "GREEN\nRED\n"; - } else if (fieldName.equals("type_double_test")) { - expected = "123.45\n1.337\n"; - } else { - Assert.fail("Unknown field"); - } - - after(); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java deleted file mode 100644 index 7bff28a..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.io.avro; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.typeutils.AvroTypeInfo; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.file.FileReader; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.util.Utf8; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; 
-import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -/** - * Test the avro input format. - * (The testcase is mostly the getting started tutorial of avro) - * http://avro.apache.org/docs/current/gettingstartedjava.html - */ -public class AvroRecordInputFormatTest { - - public File testFile; - - static final String TEST_NAME = "Alyssa"; - - static final String TEST_ARRAY_STRING_1 = "ELEMENT 1"; - static final String TEST_ARRAY_STRING_2 = "ELEMENT 2"; - - static final boolean TEST_ARRAY_BOOLEAN_1 = true; - static final boolean TEST_ARRAY_BOOLEAN_2 = false; - - static final Colors TEST_ENUM_COLOR = Colors.GREEN; - - static final String TEST_MAP_KEY1 = "KEY 1"; - static final long TEST_MAP_VALUE1 = 8546456L; - static final String TEST_MAP_KEY2 = "KEY 2"; - static final long TEST_MAP_VALUE2 = 17554L; - - static final int TEST_NUM = 239; - static final String TEST_STREET = "Baker Street"; - static final String TEST_CITY = "London"; - static final String TEST_STATE = "London"; - static final String TEST_ZIP = "NW1 6XE"; - - private Schema userSchema = new User().getSchema(); - - public static void writeTestFile(File testFile) throws IOException { - ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); - stringArray.add(TEST_ARRAY_STRING_1); - stringArray.add(TEST_ARRAY_STRING_2); - - ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); - booleanArray.add(TEST_ARRAY_BOOLEAN_1); - booleanArray.add(TEST_ARRAY_BOOLEAN_2); - - HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); - longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); - longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); - - Address addr = new Address(); - addr.setNum(TEST_NUM); - addr.setStreet(TEST_STREET); - addr.setCity(TEST_CITY); - addr.setState(TEST_STATE); - addr.setZip(TEST_ZIP); - - User user1 = new User(); - - user1.setName(TEST_NAME); - user1.setFavoriteNumber(256); - user1.setTypeDoubleTest(123.45d); - user1.setTypeBoolTest(true); - 
user1.setTypeArrayString(stringArray); - user1.setTypeArrayBoolean(booleanArray); - user1.setTypeEnum(TEST_ENUM_COLOR); - user1.setTypeMap(longMap); - user1.setTypeNested(addr); - - // Construct via builder - User user2 = User.newBuilder() - .setName("Charlie") - .setFavoriteColor("blue") - .setFavoriteNumber(null) - .setTypeBoolTest(false) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeLongTest(1337L) - .setTypeArrayString(new ArrayList<CharSequence>()) - .setTypeArrayBoolean(new ArrayList<Boolean>()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap<CharSequence, Long>()) - .setTypeFixed(null) - .setTypeUnion(null) - .setTypeNested( - Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) - .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) - .build()) - .build(); - DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); - DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); - dataFileWriter.create(user1.getSchema(), testFile); - dataFileWriter.append(user1); - dataFileWriter.append(user2); - dataFileWriter.close(); - } - - @Before - public void createFiles() throws IOException { - testFile = File.createTempFile("AvroInputFormatTest", null); - writeTestFile(testFile); - } - - /** - * Test if the AvroInputFormat is able to properly read data from an avro file. 
- * @throws IOException - */ - @Test - public void testDeserialisation() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(1); - assertEquals(splits.length, 1); - format.open(splits[0]); - - User u = format.nextRecord(null); - assertNotNull(u); - - String name = u.getName().toString(); - assertNotNull("empty record", name); - assertEquals("name not equal", TEST_NAME, name); - - // check arrays - List<CharSequence> sl = u.getTypeArrayString(); - assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); - assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); - - List<Boolean> bl = u.getTypeArrayBoolean(); - assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); - assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); - - // check enums - Colors enumValue = u.getTypeEnum(); - assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); - - // check maps - Map<CharSequence, Long> lm = u.getTypeMap(); - assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); - assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); - - assertFalse("expecting second element", format.reachedEnd()); - assertNotNull("expecting second element", format.nextRecord(u)); - - assertNull(format.nextRecord(u)); - assertTrue(format.reachedEnd()); - - format.close(); - } - - /** - * Test if the AvroInputFormat is able to properly read data from an avro file. 
- * @throws IOException - */ - @Test - public void testDeserialisationReuseAvroRecordFalse() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - format.setReuseAvroValue(false); - - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(1); - assertEquals(splits.length, 1); - format.open(splits[0]); - - User u = format.nextRecord(null); - assertNotNull(u); - - String name = u.getName().toString(); - assertNotNull("empty record", name); - assertEquals("name not equal", TEST_NAME, name); - - // check arrays - List<CharSequence> sl = u.getTypeArrayString(); - assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); - assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); - - List<Boolean> bl = u.getTypeArrayBoolean(); - assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); - assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); - - // check enums - Colors enumValue = u.getTypeEnum(); - assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); - - // check maps - Map<CharSequence, Long> lm = u.getTypeMap(); - assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); - assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); - - assertFalse("expecting second element", format.reachedEnd()); - assertNotNull("expecting second element", format.nextRecord(u)); - - assertNull(format.nextRecord(u)); - assertTrue(format.reachedEnd()); - - format.close(); - } - - /** - * Test if the Flink serialization is able to properly process GenericData.Record types. - * Usually users of Avro generate classes (POJOs) from Avro schemas. - * However, if generated classes are not available, one can also use GenericData.Record. 
- * It is an untyped key-value record which is using a schema to validate the correctness of the data. - * - * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead. - */ - @Test - public void testDeserializeToGenericType() throws IOException { - DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema); - - try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) { - // initialize Record by reading it from disk (that's easier than creating it by hand) - GenericData.Record rec = new GenericData.Record(userSchema); - dataFileReader.next(rec); - - // check if record has been read correctly - assertNotNull(rec); - assertEquals("name not equal", TEST_NAME, rec.get("name").toString()); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); - assertEquals(null, rec.get("type_long_test")); // it is null for the first record. - - // now serialize it with our framework: - TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class); - - ExecutionConfig ec = new ExecutionConfig(); - Assert.assertEquals(GenericTypeInfo.class, te.getClass()); - - Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>()); - - TypeSerializer<GenericData.Record> tser = te.createSerializer(ec); - Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size()); - Assert.assertTrue( - ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) && - ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class)); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) { - tser.serialize(rec, outView); - } - - GenericData.Record newRec; - try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper( - new ByteArrayInputStream(out.toByteArray()))) { - 
newRec = tser.deserialize(inView); - } - - // check if it is still the same - assertNotNull(newRec); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); - assertEquals("name not equal", TEST_NAME, newRec.get("name").toString()); - assertEquals(null, newRec.get("type_long_test")); - } - } - - /** - * This test validates proper serialization with specific (generated POJO) types. - */ - @Test - public void testDeserializeToSpecificType() throws IOException { - - DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema); - - try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) { - User rec = dataFileReader.next(); - - // check if record has been read correctly - assertNotNull(rec); - assertEquals("name not equal", TEST_NAME, rec.get("name").toString()); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); - - // now serialize it with our framework: - ExecutionConfig ec = new ExecutionConfig(); - TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class); - - Assert.assertEquals(AvroTypeInfo.class, te.getClass()); - TypeSerializer<User> tser = te.createSerializer(ec); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) { - tser.serialize(rec, outView); - } - - User newRec; - try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper( - new ByteArrayInputStream(out.toByteArray()))) { - newRec = tser.deserialize(inView); - } - - // check if it is still the same - assertNotNull(newRec); - assertEquals("name not equal", TEST_NAME, newRec.getName().toString()); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString()); - } - } - - /** - * Test if the AvroInputFormat is able to properly read data from an Avro - * file as a GenericRecord. 
- * - * @throws IOException - */ - @Test - public void testDeserialisationGenericRecord() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()), - GenericRecord.class); - - doTestDeserializationGenericRecord(format, parameters); - } - - /** - * Helper method to test GenericRecord serialisation. - * - * @param format - * the format to test - * @param parameters - * the configuration to use - * @throws IOException - * thrown id there is a issue - */ - @SuppressWarnings("unchecked") - private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format, - final Configuration parameters) throws IOException { - try { - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(1); - assertEquals(splits.length, 1); - format.open(splits[0]); - - GenericRecord u = format.nextRecord(null); - assertNotNull(u); - assertEquals("The schemas should be equal", userSchema, u.getSchema()); - - String name = u.get("name").toString(); - assertNotNull("empty record", name); - assertEquals("name not equal", TEST_NAME, name); - - // check arrays - List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string"); - assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); - assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); - - List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean"); - assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); - assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); - - // check enums - GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum"); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString()); - - // check maps - Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map"); - assertEquals("map value of key 1 not equal", 
TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); - assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); - - assertFalse("expecting second element", format.reachedEnd()); - assertNotNull("expecting second element", format.nextRecord(u)); - - assertNull(format.nextRecord(u)); - assertTrue(format.reachedEnd()); - } finally { - format.close(); - } - } - - /** - * Test if the AvroInputFormat is able to properly read data from an avro - * file as a GenericRecord. - * - * @throws IOException if there is an error - */ - @Test - public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()), - GenericRecord.class); - format.configure(parameters); - format.setReuseAvroValue(false); - - doTestDeserializationGenericRecord(format, parameters); - } - - @After - public void deleteFiles() { - testFile.delete(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java deleted file mode 100644 index 6401a87..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.io.avro; - -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.Fixed16; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; - -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.specific.SpecificDatumWriter; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Random; - -import static org.junit.Assert.assertEquals; - -/** - * Test the avro input format. 
- * (The testcase is mostly the getting started tutorial of avro) - * http://avro.apache.org/docs/current/gettingstartedjava.html - */ -public class AvroSplittableInputFormatTest { - - private File testFile; - - static final String TEST_NAME = "Alyssa"; - - static final String TEST_ARRAY_STRING_1 = "ELEMENT 1"; - static final String TEST_ARRAY_STRING_2 = "ELEMENT 2"; - - static final boolean TEST_ARRAY_BOOLEAN_1 = true; - static final boolean TEST_ARRAY_BOOLEAN_2 = false; - - static final Colors TEST_ENUM_COLOR = Colors.GREEN; - - static final String TEST_MAP_KEY1 = "KEY 1"; - static final long TEST_MAP_VALUE1 = 8546456L; - static final String TEST_MAP_KEY2 = "KEY 2"; - static final long TEST_MAP_VALUE2 = 17554L; - - static final Integer TEST_NUM = new Integer(239); - static final String TEST_STREET = "Baker Street"; - static final String TEST_CITY = "London"; - static final String TEST_STATE = "London"; - static final String TEST_ZIP = "NW1 6XE"; - - static final int NUM_RECORDS = 5000; - - @Before - public void createFiles() throws IOException { - testFile = File.createTempFile("AvroSplittableInputFormatTest", null); - - ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); - stringArray.add(TEST_ARRAY_STRING_1); - stringArray.add(TEST_ARRAY_STRING_2); - - ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); - booleanArray.add(TEST_ARRAY_BOOLEAN_1); - booleanArray.add(TEST_ARRAY_BOOLEAN_2); - - HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); - longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); - longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); - - Address addr = new Address(); - addr.setNum(new Integer(TEST_NUM)); - addr.setStreet(TEST_STREET); - addr.setCity(TEST_CITY); - addr.setState(TEST_STATE); - addr.setZip(TEST_ZIP); - - User user1 = new User(); - user1.setName(TEST_NAME); - user1.setFavoriteNumber(256); - user1.setTypeDoubleTest(123.45d); - user1.setTypeBoolTest(true); - user1.setTypeArrayString(stringArray); - 
user1.setTypeArrayBoolean(booleanArray); - user1.setTypeEnum(TEST_ENUM_COLOR); - user1.setTypeMap(longMap); - user1.setTypeNested(addr); - - // Construct via builder - User user2 = User.newBuilder() - .setName(TEST_NAME) - .setFavoriteColor("blue") - .setFavoriteNumber(null) - .setTypeBoolTest(false) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeLongTest(1337L) - .setTypeArrayString(new ArrayList<CharSequence>()) - .setTypeArrayBoolean(new ArrayList<Boolean>()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap<CharSequence, Long>()) - .setTypeFixed(new Fixed16()) - .setTypeUnion(123L) - .setTypeNested( - Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) - .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) - .build()) - .build(); - DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); - DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); - dataFileWriter.create(user1.getSchema(), testFile); - dataFileWriter.append(user1); - dataFileWriter.append(user2); - - Random rnd = new Random(1337); - for (int i = 0; i < NUM_RECORDS - 2; i++) { - User user = new User(); - user.setName(TEST_NAME + rnd.nextInt()); - user.setFavoriteNumber(rnd.nextInt()); - user.setTypeDoubleTest(rnd.nextDouble()); - user.setTypeBoolTest(true); - user.setTypeArrayString(stringArray); - user.setTypeArrayBoolean(booleanArray); - user.setTypeEnum(TEST_ENUM_COLOR); - user.setTypeMap(longMap); - Address address = new Address(); - address.setNum(new Integer(TEST_NUM)); - address.setStreet(TEST_STREET); - address.setCity(TEST_CITY); - address.setState(TEST_STATE); - address.setZip(TEST_ZIP); - user.setTypeNested(address); - - dataFileWriter.append(user); - } - dataFileWriter.close(); - } - - @Test - public void testSplittedIF() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class); - - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(4); - assertEquals(splits.length, 4); - int elements = 0; - int[] elementsPerSplit = new int[4]; - for (int i = 0; i < splits.length; i++) { - format.open(splits[i]); - while (!format.reachedEnd()) { - User u = format.nextRecord(null); - Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); - elements++; - elementsPerSplit[i]++; - } - format.close(); - } - - Assert.assertEquals(1539, elementsPerSplit[0]); - Assert.assertEquals(1026, elementsPerSplit[1]); - Assert.assertEquals(1539, elementsPerSplit[2]); - Assert.assertEquals(896, elementsPerSplit[3]); - Assert.assertEquals(NUM_RECORDS, elements); - format.close(); - } - - @Test - public void testAvroRecoveryWithFailureAtStart() throws Exception { - final int recordsUntilCheckpoint = 132; - - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - format.configure(parameters); - - FileInputSplit[] splits = format.createInputSplits(4); - assertEquals(splits.length, 4); - - int elements = 0; - int[] elementsPerSplit = new int[4]; - for (int i = 0; i < splits.length; i++) { - format.reopen(splits[i], format.getCurrentState()); - while (!format.reachedEnd()) { - User u = format.nextRecord(null); - Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); - elements++; - - if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { - - // do the whole checkpoint-restore procedure and see if we pick up from where we left off. 
- Tuple2<Long, Long> state = format.getCurrentState(); - - // this is to make sure that nothing stays from the previous format - // (as it is going to be in the normal case) - format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); - - format.reopen(splits[i], state); - assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); - } - elementsPerSplit[i]++; - } - format.close(); - } - - Assert.assertEquals(1539, elementsPerSplit[0]); - Assert.assertEquals(1026, elementsPerSplit[1]); - Assert.assertEquals(1539, elementsPerSplit[2]); - Assert.assertEquals(896, elementsPerSplit[3]); - Assert.assertEquals(NUM_RECORDS, elements); - format.close(); - } - - @Test - public void testAvroRecovery() throws Exception { - final int recordsUntilCheckpoint = 132; - - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - format.configure(parameters); - - FileInputSplit[] splits = format.createInputSplits(4); - assertEquals(splits.length, 4); - - int elements = 0; - int[] elementsPerSplit = new int[4]; - for (int i = 0; i < splits.length; i++) { - format.open(splits[i]); - while (!format.reachedEnd()) { - User u = format.nextRecord(null); - Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); - elements++; - - if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { - - // do the whole checkpoint-restore procedure and see if we pick up from where we left off. 
- Tuple2<Long, Long> state = format.getCurrentState(); - - // this is to make sure that nothing stays from the previous format - // (as it is going to be in the normal case) - format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); - - format.reopen(splits[i], state); - assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); - } - elementsPerSplit[i]++; - } - format.close(); - } - - Assert.assertEquals(1539, elementsPerSplit[0]); - Assert.assertEquals(1026, elementsPerSplit[1]); - Assert.assertEquals(1539, elementsPerSplit[2]); - Assert.assertEquals(896, elementsPerSplit[3]); - Assert.assertEquals(NUM_RECORDS, elements); - format.close(); - } - - /* - This test is gave the reference values for the test of Flink's IF. - - This dependency needs to be added - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <version>1.7.6</version> - </dependency> - - @Test - public void testHadoop() throws Exception { - JobConf jf = new JobConf(); - FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI())); - jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false); - org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>(); - InputSplit[] sp = format.getSplits(jf, 4); - int elementsPerSplit[] = new int[4]; - int cnt = 0; - int i = 0; - for (InputSplit s:sp) { - RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter()); - AvroWrapper<User> k = r.createKey(); - NullWritable v = r.createValue(); - - while (r.next(k, v)) { - cnt++; - elementsPerSplit[i]++; - } - i++; - } - System.out.println("Status "+Arrays.toString(elementsPerSplit)); - } **/ - - @After - public void deleteFiles() { - testFile.delete(); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java deleted file mode 100644 index 96ffb7f..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.io.avro.example; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.io.GenericInputFormat; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; - -import java.io.IOException; -import java.util.Random; - -/** - * Example that shows how to use an Avro typea in a program. 
- */ -@SuppressWarnings("serial") -public class AvroTypeExample { - - public static void main(String[] args) throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<User> users = env.createInput(new UserGeneratingInputFormat()); - - users - .map(new NumberExtractingMapper()) - .groupBy(1) - .reduceGroup(new ConcatenatingReducer()) - .print(); - } - - private static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> { - - @Override - public Tuple2<User, Integer> map(User user) { - return new Tuple2<User, Integer>(user, user.getFavoriteNumber()); - } - } - - private static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> { - - @Override - public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception { - int number = 0; - StringBuilder colors = new StringBuilder(); - - for (Tuple2<User, Integer> u : values) { - number = u.f1; - colors.append(u.f0.getFavoriteColor()).append(" - "); - } - - colors.setLength(colors.length() - 3); - out.collect(new Tuple2<Integer, String>(number, colors.toString())); - } - } - - private static final class UserGeneratingInputFormat extends GenericInputFormat<User> { - - private static final long serialVersionUID = 1L; - - private static final int NUM = 100; - - private final Random rnd = new Random(32498562304986L); - - private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" }; - - private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" }; - - private int count; - - @Override - public boolean reachedEnd() throws IOException { - return count >= NUM; - } - - @Override - public User nextRecord(User reuse) throws IOException { - count++; - - User u = new User(); - u.setName(NAMES[rnd.nextInt(NAMES.length)]); - u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]); - 
u.setFavoriteNumber(rnd.nextInt(87)); - return u; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java deleted file mode 100644 index 4608f96..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package org.apache.flink.api.io.avro.example; -@SuppressWarnings("all") -@org.apache.avro.specific.AvroGenerated -public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}"); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - @Deprecated public java.lang.CharSequence name; - @Deprecated public java.lang.Integer favorite_number; - @Deprecated public java.lang.CharSequence favorite_color; - - /** - * Default constructor. Note that this does not initialize fields - * to their default values from the schema. If that is desired then - * one should use {@link #newBuilder()}. - */ - public User() {} - - /** - * All-args constructor. - */ - public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) { - this.name = name; - this.favorite_number = favorite_number; - this.favorite_color = favorite_color; - } - - public org.apache.avro.Schema getSchema() { return SCHEMA$; } - // Used by DatumWriter. Applications should not call. - public java.lang.Object get(int field$) { - switch (field$) { - case 0: return name; - case 1: return favorite_number; - case 2: return favorite_color; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - // Used by DatumReader. Applications should not call. 
- @SuppressWarnings(value="unchecked") - public void put(int field$, java.lang.Object value$) { - switch (field$) { - case 0: name = (java.lang.CharSequence)value$; break; - case 1: favorite_number = (java.lang.Integer)value$; break; - case 2: favorite_color = (java.lang.CharSequence)value$; break; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - - /** - * Gets the value of the 'name' field. - */ - public java.lang.CharSequence getName() { - return name; - } - - /** - * Sets the value of the 'name' field. - * @param value the value to set. - */ - public void setName(java.lang.CharSequence value) { - this.name = value; - } - - /** - * Gets the value of the 'favorite_number' field. - */ - public java.lang.Integer getFavoriteNumber() { - return favorite_number; - } - - /** - * Sets the value of the 'favorite_number' field. - * @param value the value to set. - */ - public void setFavoriteNumber(java.lang.Integer value) { - this.favorite_number = value; - } - - /** - * Gets the value of the 'favorite_color' field. - */ - public java.lang.CharSequence getFavoriteColor() { - return favorite_color; - } - - /** - * Sets the value of the 'favorite_color' field. - * @param value the value to set. 
- */ - public void setFavoriteColor(java.lang.CharSequence value) { - this.favorite_color = value; - } - - /** Creates a new User RecordBuilder */ - public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() { - return new org.apache.flink.api.io.avro.example.User.Builder(); - } - - /** Creates a new User RecordBuilder by copying an existing Builder */ - public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) { - return new org.apache.flink.api.io.avro.example.User.Builder(other); - } - - /** Creates a new User RecordBuilder by copying an existing User instance */ - public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) { - return new org.apache.flink.api.io.avro.example.User.Builder(other); - } - - /** - * RecordBuilder for User instances. - */ - public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User> - implements org.apache.avro.data.RecordBuilder<User> { - - private java.lang.CharSequence name; - private java.lang.Integer favorite_number; - private java.lang.CharSequence favorite_color; - - /** Creates a new Builder */ - private Builder() { - super(org.apache.flink.api.io.avro.example.User.SCHEMA$); - } - - /** Creates a Builder by copying an existing Builder */ - private Builder(org.apache.flink.api.io.avro.example.User.Builder other) { - super(other); - if (isValidValue(fields()[0], other.name)) { - this.name = data().deepCopy(fields()[0].schema(), other.name); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.favorite_number)) { - this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.favorite_color)) { - this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); - fieldSetFlags()[2] = true; - } - } - - /** Creates a 
Builder by copying an existing User instance */ - private Builder(org.apache.flink.api.io.avro.example.User other) { - super(org.apache.flink.api.io.avro.example.User.SCHEMA$); - if (isValidValue(fields()[0], other.name)) { - this.name = data().deepCopy(fields()[0].schema(), other.name); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.favorite_number)) { - this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.favorite_color)) { - this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); - fieldSetFlags()[2] = true; - } - } - - /** Gets the value of the 'name' field */ - public java.lang.CharSequence getName() { - return name; - } - - /** Sets the value of the 'name' field */ - public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) { - validate(fields()[0], value); - this.name = value; - fieldSetFlags()[0] = true; - return this; - } - - /** Checks whether the 'name' field has been set */ - public boolean hasName() { - return fieldSetFlags()[0]; - } - - /** Clears the value of the 'name' field */ - public org.apache.flink.api.io.avro.example.User.Builder clearName() { - name = null; - fieldSetFlags()[0] = false; - return this; - } - - /** Gets the value of the 'favorite_number' field */ - public java.lang.Integer getFavoriteNumber() { - return favorite_number; - } - - /** Sets the value of the 'favorite_number' field */ - public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) { - validate(fields()[1], value); - this.favorite_number = value; - fieldSetFlags()[1] = true; - return this; - } - - /** Checks whether the 'favorite_number' field has been set */ - public boolean hasFavoriteNumber() { - return fieldSetFlags()[1]; - } - - /** Clears the value of the 'favorite_number' field */ - public 
org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() { - favorite_number = null; - fieldSetFlags()[1] = false; - return this; - } - - /** Gets the value of the 'favorite_color' field */ - public java.lang.CharSequence getFavoriteColor() { - return favorite_color; - } - - /** Sets the value of the 'favorite_color' field */ - public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) { - validate(fields()[2], value); - this.favorite_color = value; - fieldSetFlags()[2] = true; - return this; - } - - /** Checks whether the 'favorite_color' field has been set */ - public boolean hasFavoriteColor() { - return fieldSetFlags()[2]; - } - - /** Clears the value of the 'favorite_color' field */ - public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() { - favorite_color = null; - fieldSetFlags()[2] = false; - return this; - } - - @Override - public User build() { - try { - User record = new User(); - record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]); - record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]); - record.favorite_color = fieldSetFlags()[2] ? 
this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]); - return record; - } catch (Exception e) { - throw new org.apache.avro.AvroRuntimeException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java deleted file mode 100644 index 5ae88ca..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.io; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.fs.Path; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Tests for the type extraction of the {@link AvroInputFormat}. - */ -public class AvroInputFormatTypeExtractionTest { - - @Test - public void testTypeExtraction() { - try { - InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class); - - TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<MyAvroType> input = env.createInput(format); - TypeInformation<?> typeInfoDataSet = input.getType(); - - Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo); - Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo); - - Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass()); - Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass()); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - /** - * Test type. 
- */ - public static final class MyAvroType { - - public String theString; - - public MyAvroType recursive; - - private double aDouble; - - public double getaDouble() { - return aDouble; - } - - public void setaDouble(double aDouble) { - this.aDouble = aDouble; - } - - public void setTheString(String theString) { - this.theString = theString; - } - - public String getTheString() { - return theString; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java deleted file mode 100644 index 71ebd78..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.io; - -import org.apache.flink.api.io.avro.example.User; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.junit.Test; -import org.mockito.internal.util.reflection.Whitebox; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import static org.apache.flink.api.java.io.AvroOutputFormat.Codec; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for {@link AvroOutputFormat}. - */ -public class AvroOutputFormatTest { - - @Test - public void testSetCodec() throws Exception { - // given - final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class); - - // when - try { - outputFormat.setCodec(Codec.SNAPPY); - } catch (Exception ex) { - // then - fail("unexpected exception"); - } - } - - @Test - public void testSetCodecError() throws Exception { - // given - boolean error = false; - final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class); - - // when - try { - outputFormat.setCodec(null); - } catch (Exception ex) { - error = true; - } - - // then - assertTrue(error); - } - - @Test - public void testSerialization() throws Exception { - - serializeAndDeserialize(null, null); - serializeAndDeserialize(null, User.SCHEMA$); - for (final Codec codec : Codec.values()) { - serializeAndDeserialize(codec, null); - serializeAndDeserialize(codec, User.SCHEMA$); - } - } - - private void serializeAndDeserialize(final Codec codec, final Schema 
schema) throws IOException, ClassNotFoundException { - // given - final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class); - if (codec != null) { - outputFormat.setCodec(codec); - } - if (schema != null) { - outputFormat.setSchema(schema); - } - - final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - - // when - try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(outputFormat); - } - try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) { - // then - Object o = ois.readObject(); - assertTrue(o instanceof AvroOutputFormat); - final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o; - final Codec restoredCodec = (Codec) Whitebox.getInternalState(restored, "codec"); - final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema"); - - assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null); - assertTrue(schema != null ? 
restoredSchema.equals(schema) : restoredSchema == null); - } - } - - @Test - public void testCompression() throws Exception { - // given - final Path outputPath = new Path(File.createTempFile("avro-output-file", "avro").getAbsolutePath()); - final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath, User.class); - outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); - - final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file", "compressed.avro").getAbsolutePath()); - final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath, User.class); - compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); - compressedOutputFormat.setCodec(Codec.SNAPPY); - - // when - output(outputFormat); - output(compressedOutputFormat); - - // then - assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath)); - - // cleanup - FileSystem fs = FileSystem.getLocalFileSystem(); - fs.delete(outputPath, false); - fs.delete(compressedOutputPath, false); - } - - private long fileSize(Path path) throws IOException { - return path.getFileSystem().getFileStatus(path).getLen(); - } - - private void output(final AvroOutputFormat<User> outputFormat) throws IOException { - outputFormat.configure(new Configuration()); - outputFormat.open(1, 1); - for (int i = 0; i < 100; i++) { - outputFormat.writeRecord(new User("testUser", 1, "blue")); - } - outputFormat.close(); - } - - @Test - public void testGenericRecord() throws IOException { - final Path outputPath = new Path(File.createTempFile("avro-output-file", "generic.avro").getAbsolutePath()); - final AvroOutputFormat<GenericRecord> outputFormat = new AvroOutputFormat<>(outputPath, GenericRecord.class); - Schema schema = new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"user\", \"fields\": [{\"name\":\"user_name\", \"type\":\"string\"}, {\"name\":\"favorite_number\", \"type\":\"int\"}, {\"name\":\"favorite_color\", 
\"type\":\"string\"}]}"); - outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); - outputFormat.setSchema(schema); - output(outputFormat, schema); - - GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); - DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File(outputPath.getPath()), reader); - - while (dataFileReader.hasNext()) { - GenericRecord record = dataFileReader.next(); - assertEquals(record.get("user_name").toString(), "testUser"); - assertEquals(record.get("favorite_number"), 1); - assertEquals(record.get("favorite_color").toString(), "blue"); - } - - //cleanup - FileSystem fs = FileSystem.getLocalFileSystem(); - fs.delete(outputPath, false); - - } - - private void output(final AvroOutputFormat<GenericRecord> outputFormat, Schema schema) throws IOException { - outputFormat.configure(new Configuration()); - outputFormat.open(1, 1); - for (int i = 0; i < 100; i++) { - GenericRecord record = new GenericData.Record(schema); - record.put("user_name", "testUser"); - record.put("favorite_number", 1); - record.put("favorite_color", "blue"); - outputFormat.writeRecord(record); - } - outputFormat.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/typeutils/AvroTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/typeutils/AvroTypeInfoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/typeutils/AvroTypeInfoTest.java deleted file mode 100644 index e0bb1a1..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/typeutils/AvroTypeInfoTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.api.common.typeutils.TypeInformationTestBase; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.User; - -/** - * Test for {@link AvroTypeInfo}. - */ -public class AvroTypeInfoTest extends TypeInformationTestBase<AvroTypeInfo<?>> { - - @Override - protected AvroTypeInfo<?>[] getTestData() { - return new AvroTypeInfo<?>[] { - new AvroTypeInfo<>(Address.class), - new AvroTypeInfo<>(User.class), - }; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc deleted file mode 100644 index ab8adf5..0000000 --- a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc +++ /dev/null @@ -1,35 +0,0 @@ -[ -{"namespace": "org.apache.flink.api.io.avro.generated", - "type": "record", - "name": "Address", - "fields": [ - {"name": "num", "type": "int"}, - {"name": "street", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "state", "type": "string"}, - {"name": 
"zip", "type": "string"} - ] -}, -{"namespace": "org.apache.flink.api.io.avro.generated", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]}, - {"name": "type_long_test", "type": ["long", "null"]}, - {"name": "type_double_test", "type": "double"}, - {"name": "type_null_test", "type": ["null"]}, - {"name": "type_bool_test", "type": ["boolean"]}, - {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, - {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, - {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, - {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}}, - {"name": "type_map", "type": {"type": "map", "values": "long"}}, - {"name": "type_fixed", - "size": 16, - "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, - {"name": "type_union", "type": ["null", "boolean", "long", "double"]}, - {"name": "type_nested", "type": ["null", "Address"]} - ] -}] http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties deleted file mode 100644 index 881dc06..0000000 --- a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=OFF, A1 - -# A1 is set to be a ConsoleAppender. -log4j.appender.A1=org.apache.log4j.ConsoleAppender - -# A1 uses PatternLayout. -log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/resources/testdata.avro ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/resources/testdata.avro b/flink-connectors/flink-avro/src/test/resources/testdata.avro deleted file mode 100644 index 45308b9..0000000 Binary files a/flink-connectors/flink-avro/src/test/resources/testdata.avro and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-filesystem/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml index f39758b..12a151e 100644 --- a/flink-connectors/flink-connector-filesystem/pom.xml +++ b/flink-connectors/flink-connector-filesystem/pom.xml @@ -57,6 +57,15 @@ under the License. 
<scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-avro. --> + <optional>true</optional> + </dependency> + <!-- test dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java index 317ee55..e931633 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java @@ -65,7 +65,7 @@ Usage: } </pre> */ -public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable { +public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable { private static final long serialVersionUID = 1L; public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key"; public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value"; http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-connectors/flink-connector-kafka-0.10/pom.xml index 581640d..2b6660d 100644 --- a/flink-connectors/flink-connector-kafka-0.10/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml @@ -78,8 +78,16 @@ under the License. <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. --> + <!-- Projects depending on this project, won't depend on flink-table. --> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-avro. --> <optional>true</optional> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-0.11/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml index 475c842..162d5d0 100644 --- a/flink-connectors/flink-connector-kafka-0.11/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml @@ -78,8 +78,16 @@ under the License. <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. --> + <!-- Projects depending on this project, won't depend on flink-table. --> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-avro. 
--> <optional>true</optional> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-0.8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml index dd7a542..c990188 100644 --- a/flink-connectors/flink-connector-kafka-0.8/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml @@ -69,8 +69,16 @@ under the License. <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. --> + <!-- Projects depending on this project, won't depend on flink-table. --> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-avro. --> <optional>true</optional> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-0.9/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml index fef070d..819d590 100644 --- a/flink-connectors/flink-connector-kafka-0.9/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml @@ -68,8 +68,16 @@ under the License. <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. --> + <!-- Projects depending on this project, won't depend on flink-table. 
--> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-avro. --> <optional>true</optional> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml index c9f7de2..4f2fb45 100644 --- a/flink-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-connectors/flink-connector-kafka-base/pom.xml @@ -62,8 +62,16 @@ under the License. <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. --> + <!-- Projects depending on this project, won't depend on flink-table. --> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-avro. --> <optional>true</optional> </dependency> @@ -218,25 +226,6 @@ under the License. <inherited>true</inherited> <extensions>true</extensions> </plugin> - <!-- Add Avro generated classes for testing. 
--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>${project.basedir}/../flink-avro/src/test/java/org/apache/flink/api/io/avro/generated</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java deleted file mode 100644 index 0d36f4c..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.specific.SpecificData; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificRecord; -import org.apache.avro.util.Utf8; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.List; - -/** - * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. - * - * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows. - * - * {@link Utf8} is converted to regular Java Strings. - */ -public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> { - - /** - * Avro record class. - */ - private Class<? extends SpecificRecord> recordClazz; - - /** - * Schema for deterministic field order. - */ - private transient Schema schema; - - /** - * Reader that deserializes byte array into a record. - */ - private transient DatumReader<SpecificRecord> datumReader; - - /** - * Input stream to read message from. - */ - private transient MutableByteArrayInputStream inputStream; - - /** - * Avro decoder that decodes binary data. - */ - private transient Decoder decoder; - - /** - * Record to deserialize byte array to. - */ - private SpecificRecord record; - - /** - * Creates a Avro deserialization schema for the given record. 
- * - * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row - */ - public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) { - Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); - this.recordClazz = recordClazz; - this.schema = SpecificData.get().getSchema(recordClazz); - this.datumReader = new SpecificDatumReader<>(schema); - this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); - this.inputStream = new MutableByteArrayInputStream(); - this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); - } - - @Override - public Row deserialize(byte[] message) throws IOException { - // read record - try { - inputStream.setBuffer(message); - this.record = datumReader.read(record, decoder); - } catch (IOException e) { - throw new RuntimeException("Failed to deserialize Row.", e); - } - - // convert to row - final Object row = convertToRow(schema, record); - return (Row) row; - } - - private void writeObject(ObjectOutputStream oos) throws IOException { - oos.writeObject(recordClazz); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { - this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject(); - this.schema = SpecificData.get().getSchema(recordClazz); - this.datumReader = new SpecificDatumReader<>(schema); - this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); - this.inputStream = new MutableByteArrayInputStream(); - this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); - } - - /** - * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type. - * Avro's {@link Utf8} fields are converted into regular Java strings. 
- */ - private static Object convertToRow(Schema schema, Object recordObj) { - if (recordObj instanceof GenericRecord) { - // records can be wrapped in a union - if (schema.getType() == Schema.Type.UNION) { - final List<Schema> types = schema.getTypes(); - if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { - schema = types.get(1); - } - else { - throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema); - } - } else if (schema.getType() != Schema.Type.RECORD) { - throw new RuntimeException("Record type for row type expected. But is: " + schema); - } - final List<Schema.Field> fields = schema.getFields(); - final Row row = new Row(fields.size()); - final GenericRecord record = (GenericRecord) recordObj; - for (int i = 0; i < fields.size(); i++) { - final Schema.Field field = fields.get(i); - row.setField(i, convertToRow(field.schema(), record.get(field.pos()))); - } - return row; - } else if (recordObj instanceof Utf8) { - return recordObj.toString(); - } else { - return recordObj; - } - } - - /** - * An extension of the ByteArrayInputStream that allows to change a buffer that should be - * read without creating a new ByteArrayInputStream instance. This allows to re-use the same - * InputStream instance, copying message to process, and creation of Decoder on every new message. - */ - private static final class MutableByteArrayInputStream extends ByteArrayInputStream { - - public MutableByteArrayInputStream() { - super(new byte[0]); - } - - /** - * Set buffer that can be read via the InputStream interface and reset the input stream. - * This has the same effect as creating a new ByteArrayInputStream with a new buffer. - * - * @param buf the new buffer to read. - */ - public void setBuffer(byte[] buf) { - this.buf = buf; - this.pos = 0; - this.count = buf.length; - } - } -}
