/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.avro;

import java.io.File;

import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;

import org.junit.Assert;
import org.junit.Test;

/**
 * IT case that submits a user program packaged in an external jar
 * ({@code target/maven-test-jar.jar}, produced by the maven build) to a local
 * {@link ForkableFlinkMiniCluster}. This exercises user-code class loading from an
 * external jar file together with the Avro input format.
 */
public class AvroExternalJarProgramITCase {

	/** Jar containing the program under test; built by the maven test-jar goal. */
	private static final String JAR_FILE = "target/maven-test-jar.jar";

	/** Avro test data shipped as a classpath resource. */
	private static final String TEST_DATA_FILE = "/testdata.avro";

	@Test
	public void testExternalProgram() {

		ForkableFlinkMiniCluster testMiniCluster = null;

		try {
			// start a local cluster with enough slots for the requested parallelism (4)
			Configuration config = new Configuration();
			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
			testMiniCluster = new ForkableFlinkMiniCluster(config, false);
			testMiniCluster.start();

			String jarFile = JAR_FILE;
			String testData = getClass().getResource(TEST_DATA_FILE).toString();

			// the packaged program receives the test-data location as its only argument
			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });

			// point the client at the mini cluster's current leader
			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());

			Client client = new Client(config);

			client.setPrintStatusDuringExecution(false);
			client.runBlocking(program, 4);
		}
		catch (Throwable t) {
			System.err.println(t.getMessage());
			t.printStackTrace();
			Assert.fail("Error during the packaged program execution: " + t.getMessage());
		}
		finally {
			if (testMiniCluster != null) {
				try {
					testMiniCluster.stop();
				} catch (Throwable t) {
					// best-effort shutdown; a failure here must not mask the test outcome
				}
			}
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.avro;

import org.junit.Assert;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.JavaProgramTestBase;

/**
 * IT case verifying that {@link AvroOutputFormat} produces readable Avro files both for
 * a generated specific-record type ({@link User}) and for a plain reflect-serialized
 * POJO ({@link ReflectiveUser}).
 */
@SuppressWarnings("serial")
public class AvroOutputFormatITCase extends JavaProgramTestBase {

	// output location for the specific-record (generated User) results
	public static String outputPath1;

	// output location for the reflect-serialized (ReflectiveUser) results
	public static String outputPath2;

	public static String inputPath;

	// pipe-separated input records: name|favoriteNumber|favoriteColor
	public static String userData = "alice|1|blue\n" +
			"bob|2|red\n" +
			"john|3|yellow\n" +
			"walt|4|black\n";

	@Override
	protected void preSubmit() throws Exception {
		inputPath = createTempFile("user", userData);
		outputPath1 = getTempDirPath("avro_output1");
		outputPath2 = getTempDirPath("avro_output2");
	}


	@Override
	protected void testProgram() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
			.fieldDelimiter("|")
			.types(String.class, Integer.class, String.class);

		//output the data with AvroOutputFormat for specific user type
		DataSet<User> specificUser = input.map(new ConvertToUser());
		specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);

		//output the data with AvroOutputFormat for reflect user type
		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);

		env.execute();
	}

	@Override
	protected void postSubmit() throws Exception {
		// ---- compare result for the specific (generated) user type ----
		// when the sink ran in parallel the path is a directory; every part file
		// must carry the ".avro" extension that AvroOutputFormat appends
		File[] output1 = outputFiles(outputPath1, true);

		List<String> result1 = new ArrayList<String>();
		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
		for (File avroOutput : output1) {
			// try-with-resources: the original code never closed the reader, leaking file handles
			try (DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1)) {
				while (dataFileReader1.hasNext()) {
					User user = dataFileReader1.next();
					result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
				}
			}
		}
		for (String expectedResult : userData.split("\n")) {
			Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
		}

		// ---- compare result for the reflect user type ----
		File[] output2 = outputFiles(outputPath2, false);

		List<String> result2 = new ArrayList<String>();
		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
		for (File avroOutput : output2) {
			try (DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2)) {
				while (dataFileReader2.hasNext()) {
					ReflectiveUser user = dataFileReader2.next();
					result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
				}
			}
		}
		for (String expectedResult : userData.split("\n")) {
			Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
		}
	}

	/**
	 * Returns the file(s) produced under {@code pathString}: all files of a directory
	 * (optionally asserting the ".avro" extension on each), or the single file itself.
	 */
	private File[] outputFiles(String pathString, boolean expectAvroExtension) {
		File file = asFile(pathString);
		if (!file.isDirectory()) {
			return new File[] { file };
		}
		File[] files = file.listFiles();
		// listFiles() returns null on I/O error; fail loudly instead of throwing an NPE later
		Assert.assertNotNull("Output directory could not be listed: " + pathString, files);
		if (expectAvroExtension) {
			for (File avroOutput : files) {
				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
			}
		}
		return files;
	}


	/** Converts a parsed CSV tuple into the generated Avro {@link User} record. */
	public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {

		@Override
		public User map(Tuple3<String, Integer, String> value) throws Exception {
			return new User(value.f0, value.f1, value.f2);
		}
	}

	/** Converts the generated {@link User} record into the reflect-serialized POJO. */
	public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {

		@Override
		public ReflectiveUser map(User value) throws Exception {
			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
		}
	}


	/** Plain POJO serialized via Avro reflection; needs the public nullary constructor. */
	public static class ReflectiveUser {
		private String name;
		private int favoriteNumber;
		private String favoriteColor;

		public ReflectiveUser() {}

		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
			this.name = name;
			this.favoriteNumber = favoriteNumber;
			this.favoriteColor = favoriteColor;
		}

		public String getName() {
			return this.name;
		}
		public String getFavoriteColor() {
			return this.favoriteColor;
		}
		public int getFavoriteNumber() {
			return this.favoriteNumber;
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.avro;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;

import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.io.avro.generated.Address;
import org.apache.flink.api.io.avro.generated.Colors;
import org.apache.flink.api.io.avro.generated.Fixed16;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.util.StringUtils;
import org.junit.Test;

import static org.junit.Assert.*;

/**
 * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
 * Each test round-trips values through the encoder/decoder pair and asserts equality.
 */
public class EncoderDecoderTest {

	@Test
	public void testComplexStringsDirectly() {
		try {
			Random rnd = new Random(349712539451944123L);

			for (int i = 0; i < 10; i++) {
				String testString = StringUtils.getRandomString(rnd, 10, 100);

				// serialize
				ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
				{
					DataOutputStream dataOut = new DataOutputStream(baos);
					DataOutputEncoder encoder = new DataOutputEncoder();
					encoder.setOut(dataOut);

					encoder.writeString(testString);
					dataOut.flush();
					dataOut.close();
				}

				byte[] data = baos.toByteArray();

				// deserialize
				{
					ByteArrayInputStream bais = new ByteArrayInputStream(data);
					DataInputStream dataIn = new DataInputStream(bais);
					DataInputDecoder decoder = new DataInputDecoder();
					decoder.setIn(dataIn);

					String deserialized = decoder.readString();

					assertEquals(testString, deserialized);
				}
			}
		}
		catch (Exception e) {
			System.err.println(e.getMessage());
			e.printStackTrace();
			fail("Test failed due to an exception: " + e.getMessage());
		}
	}

	@Test
	public void testPrimitiveTypes() {

		testObjectSerialization(Boolean.TRUE);
		testObjectSerialization(Boolean.FALSE);

		testObjectSerialization(Byte.valueOf((byte) 0));
		testObjectSerialization(Byte.valueOf((byte) 1));
		testObjectSerialization(Byte.valueOf((byte) -1));
		testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
		testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));

		testObjectSerialization(Short.valueOf((short) 0));
		testObjectSerialization(Short.valueOf((short) 1));
		testObjectSerialization(Short.valueOf((short) -1));
		testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
		testObjectSerialization(Short.valueOf(Short.MAX_VALUE));

		testObjectSerialization(Integer.valueOf(0));
		testObjectSerialization(Integer.valueOf(1));
		testObjectSerialization(Integer.valueOf(-1));
		testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
		testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));

		testObjectSerialization(Long.valueOf(0));
		testObjectSerialization(Long.valueOf(1));
		testObjectSerialization(Long.valueOf(-1));
		testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
		testObjectSerialization(Long.valueOf(Long.MAX_VALUE));

		testObjectSerialization(Float.valueOf(0));
		testObjectSerialization(Float.valueOf(1));
		testObjectSerialization(Float.valueOf(-1));
		testObjectSerialization(Float.valueOf((float)Math.E));
		testObjectSerialization(Float.valueOf((float)Math.PI));
		testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
		testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
		testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
		testObjectSerialization(Float.valueOf(Float.NaN));
		testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
		testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));

		testObjectSerialization(Double.valueOf(0));
		testObjectSerialization(Double.valueOf(1));
		testObjectSerialization(Double.valueOf(-1));
		testObjectSerialization(Double.valueOf(Math.E));
		testObjectSerialization(Double.valueOf(Math.PI));
		testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
		testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
		testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
		testObjectSerialization(Double.valueOf(Double.NaN));
		testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
		testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));

		testObjectSerialization("");
		testObjectSerialization("abcdefg");
		testObjectSerialization("ab\u1535\u0155xyz\u706F");

		testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
		testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
	}

	@Test
	public void testArrayTypes() {
		{
			int[] array = new int[] {1, 2, 3, 4, 5};
			testObjectSerialization(array);
		}
		{
			long[] array = new long[] {1, 2, 3, 4, 5};
			testObjectSerialization(array);
		}
		{
			float[] array = new float[] {1, 2, 3, 4, 5};
			testObjectSerialization(array);
		}
		{
			double[] array = new double[] {1, 2, 3, 4, 5};
			testObjectSerialization(array);
		}
		{
			String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"};
			testObjectSerialization(array);
		}
	}

	@Test
	public void testEmptyArray() {
		{
			int[] array = new int[0];
			testObjectSerialization(array);
		}
		{
			long[] array = new long[0];
			testObjectSerialization(array);
		}
		{
			float[] array = new float[0];
			testObjectSerialization(array);
		}
		{
			double[] array = new double[0];
			testObjectSerialization(array);
		}
		{
			String[] array = new String[0];
			testObjectSerialization(array);
		}
	}

	@Test
	public void testObjects() {
		// simple object containing only primitives
		{
			testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
		}

		// object with collection
		{
			ArrayList<String> list = new ArrayList<String>();
			list.add("A");
			list.add("B");
			list.add("C");
			list.add("D");
			list.add("E");

			testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym"));
		}

		// object with empty collection
		{
			ArrayList<String> list = new ArrayList<String>();
			testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
		}
	}

	@Test
	public void testNestedObjectsWithCollections() {
		testObjectSerialization(new ComplexNestedObject2(true));
	}

	@Test
	public void testGeneratedObjectWithNullableFields() {
		List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
		List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
		Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
		map.put("1", 1L);
		map.put("2", 2L);
		map.put("3", 3L);

		byte[] b = new byte[16];
		new Random().nextBytes(b);
		Fixed16 f = new Fixed16(b);
		Address addr = new Address(Integer.valueOf(239), "6th Main", "Bangalore",
				"Karnataka", "560075");
		User user = new User("Freudenreich", 1337, "macintosh gray",
				1234567890L, 3.1415926, null, true, strings, bools, null,
				Colors.GREEN, map, f, Boolean.TRUE, addr);

		testObjectSerialization(user);
	}

	@Test
	public void testVarLenCountEncoding() {
		try {
			long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};

			// write
			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
			{
				DataOutputStream dataOut = new DataOutputStream(baos);

				for (long val : values) {
					DataOutputEncoder.writeVarLongCount(dataOut, val);
				}

				dataOut.flush();
				dataOut.close();
			}

			// read
			{
				ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
				DataInputStream dataIn = new DataInputStream(bais);

				for (long val : values) {
					long read = DataInputDecoder.readVarLongCount(dataIn);
					assertEquals("Wrong var-len encoded value read.", val, read);
				}
			}
		}
		catch (Exception e) {
			System.err.println(e.getMessage());
			e.printStackTrace();
			fail("Test failed due to an exception: " + e.getMessage());
		}
	}

	/**
	 * Round-trips {@code obj} through the Avro reflect writer/reader backed by
	 * {@link DataOutputEncoder}/{@link DataInputDecoder} and asserts that the
	 * deserialized object equals the original (element-wise for arrays).
	 */
	private static <X> void testObjectSerialization(X obj) {

		try {

			// serialize
			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
			{
				DataOutputStream dataOut = new DataOutputStream(baos);
				DataOutputEncoder encoder = new DataOutputEncoder();
				encoder.setOut(dataOut);

				@SuppressWarnings("unchecked")
				Class<X> clazz = (Class<X>) obj.getClass();
				ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);

				writer.write(obj, encoder);
				dataOut.flush();
				dataOut.close();
			}

			byte[] data = baos.toByteArray();
			X result = null;

			// deserialize
			{
				ByteArrayInputStream bais = new ByteArrayInputStream(data);
				DataInputStream dataIn = new DataInputStream(bais);
				DataInputDecoder decoder = new DataInputDecoder();
				decoder.setIn(dataIn);

				@SuppressWarnings("unchecked")
				Class<X> clazz = (Class<X>) obj.getClass();
				ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);

				// create a reuse object if possible, otherwise we have no reuse object
				X reuse = null;
				try {
					@SuppressWarnings("unchecked")
					X test = (X) obj.getClass().newInstance();
					reuse = test;
				} catch (Throwable ignored) {
					// type has no accessible nullary constructor -> read without a reuse object
				}

				result = reader.read(reuse, decoder);
			}

			// check
			final String message = "Deserialized object is not the same as the original";

			if (obj.getClass().isArray()) {
				Class<?> clazz = obj.getClass();
				if (clazz == byte[].class) {
					assertArrayEquals(message, (byte[]) obj, (byte[]) result);
				}
				else if (clazz == short[].class) {
					assertArrayEquals(message, (short[]) obj, (short[]) result);
				}
				else if (clazz == int[].class) {
					assertArrayEquals(message, (int[]) obj, (int[]) result);
				}
				else if (clazz == long[].class) {
					assertArrayEquals(message, (long[]) obj, (long[]) result);
				}
				else if (clazz == char[].class) {
					assertArrayEquals(message, (char[]) obj, (char[]) result);
				}
				else if (clazz == float[].class) {
					assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
				}
				else if (clazz == double[].class) {
					assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
				} else {
					assertArrayEquals(message, (Object[]) obj, (Object[]) result);
				}
			} else {
				assertEquals(message, obj, result);
			}
		}
		catch (Exception e) {
			System.err.println(e.getMessage());
			e.printStackTrace();
			fail("Test failed due to an exception: " + e.getMessage());
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Test Objects
	// --------------------------------------------------------------------------------------------


	/** POJO with one field of each primitive kind plus a String. */
	public static final class SimpleTypes {

		private final int iVal;
		private final long lVal;
		private final byte bVal;
		private final String sVal;
		private final short rVal;
		private final double dVal;


		public SimpleTypes() {
			this(0, 0, (byte) 0, "", (short) 0, 0);
		}

		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
			this.iVal = iVal;
			this.lVal = lVal;
			this.bVal = bVal;
			this.sVal = sVal;
			this.rVal = rVal;
			this.dVal = dVal;
		}

		@Override
		public boolean equals(Object obj) {
			// null guard added: the original dereferenced obj unconditionally
			if (obj != null && obj.getClass() == SimpleTypes.class) {
				SimpleTypes other = (SimpleTypes) obj;

				return other.iVal == this.iVal &&
						other.lVal == this.lVal &&
						other.bVal == this.bVal &&
						other.sVal.equals(this.sVal) &&
						other.rVal == this.rVal &&
						other.dVal == this.dVal;

			} else {
				return false;
			}
		}

		@Override
		public int hashCode() {
			// consistent with equals (hashCode was missing in the original)
			return Objects.hash(iVal, lVal, bVal, sVal, rVal, dVal);
		}
	}

	/** POJO with a nested collection field. */
	public static class ComplexNestedObject1 {

		private double doubleValue;

		private List<String> stringList;

		public ComplexNestedObject1() {}

		public ComplexNestedObject1(int offInit) {
			this.doubleValue = 6293485.6723 + offInit;

			this.stringList = new ArrayList<String>();
			this.stringList.add("A" + offInit);
			this.stringList.add("somewhat" + offInit);
			this.stringList.add("random" + offInit);
			this.stringList.add("collection" + offInit);
			this.stringList.add("of" + offInit);
			this.stringList.add("strings" + offInit);
		}

		@Override
		public boolean equals(Object obj) {
			if (obj != null && obj.getClass() == ComplexNestedObject1.class) {
				ComplexNestedObject1 other = (ComplexNestedObject1) obj;
				return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
			} else {
				return false;
			}
		}

		@Override
		public int hashCode() {
			return Objects.hash(doubleValue, stringList);
		}
	}

	/** POJO with a map of nested objects. */
	public static class ComplexNestedObject2 {

		private long longValue;

		private Map<String, ComplexNestedObject1> theMap;

		public ComplexNestedObject2() {}

		public ComplexNestedObject2(boolean init) {
			this.longValue = 46547;

			this.theMap = new HashMap<String, ComplexNestedObject1>();
			this.theMap.put("36354L", new ComplexNestedObject1(43546543));
			this.theMap.put("785611L", new ComplexNestedObject1(45784568));
			this.theMap.put("43L", new ComplexNestedObject1(9876543));
			this.theMap.put("-45687L", new ComplexNestedObject1(7897615));
			this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
			this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
		}

		@Override
		public boolean equals(Object obj) {
			if (obj != null && obj.getClass() == ComplexNestedObject2.class) {
				ComplexNestedObject2 other = (ComplexNestedObject2) obj;
				return other.longValue == this.longValue && this.theMap.equals(other.theMap);
			} else {
				return false;
			}
		}

		@Override
		public int hashCode() {
			return Objects.hash(longValue, theMap);
		}
	}

	/** Simple flat POJO. */
	public static class Book {

		private long bookId;
		private String title;
		private long authorId;

		public Book() {}

		public Book(long bookId, String title, long authorId) {
			this.bookId = bookId;
			this.title = title;
			this.authorId = authorId;
		}

		@Override
		public boolean equals(Object obj) {
			if (obj != null && obj.getClass() == Book.class) {
				Book other = (Book) obj;
				return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
			} else {
				return false;
			}
		}

		@Override
		public int hashCode() {
			return Objects.hash(bookId, title, authorId);
		}
	}

	/** POJO with a list field, including the empty-list case. */
	public static class BookAuthor {

		private long authorId;
		private List<String> bookTitles;
		private String authorName;

		public BookAuthor() {}

		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
			this.authorId = authorId;
			this.bookTitles = bookTitles;
			this.authorName = authorName;
		}

		@Override
		public boolean equals(Object obj) {
			if (obj != null && obj.getClass() == BookAuthor.class) {
				BookAuthor other = (BookAuthor) obj;
				return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
						other.bookTitles.equals(this.bookTitles);
			} else {
				return false;
			}
		}

		@Override
		public int hashCode() {
			return Objects.hash(authorId, bookTitles, authorName);
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.flink.api.avro.testjar;

// ================================================================================================
//  This file defines the classes for the AvroExternalJarProgramITCase.
//  The program is exported into src/test/resources/AvroTestProgram.jar.
//
//  THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
//  AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
//  NOT BE COVERED BY THIS TEST.
// ================================================================================================


import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.core.fs.Path;

/**
 * Self-contained program used by the external-jar IT case: reads {@link MyUser} records
 * via the {@link AvroInputFormat}, keys them by the first letter of the name, and reduces
 * each group to a single representative.
 */
public class AvroExternalJarProgram {

	/** Reflect-serialized POJO describing a named color with a saturation value. */
	public static final class Color {

		private String name;
		private double saturation;

		public Color() {
			name = "";
			saturation = 1.0;
		}

		public Color(String name, double saturation) {
			this.name = name;
			this.saturation = saturation;
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public double getSaturation() {
			return saturation;
		}

		public void setSaturation(double saturation) {
			this.saturation = saturation;
		}

		@Override
		public String toString() {
			return name + '(' + saturation + ')';
		}
	}

	/** Reflect-serialized POJO: a user with a name and a list of favorite colors. */
	public static final class MyUser {

		private String name;
		private List<Color> colors;

		public MyUser() {
			name = "unknown";
			colors = new ArrayList<Color>();
		}

		public MyUser(String name, List<Color> colors) {
			this.name = name;
			this.colors = colors;
		}

		public String getName() {
			return name;
		}

		public List<Color> getColors() {
			return colors;
		}

		public void setName(String name) {
			this.name = name;
		}

		public void setColors(List<Color> colors) {
			this.colors = colors;
		}

		@Override
		public String toString() {
			return name + " : " + colors;
		}
	}

	// --------------------------------------------------------------------------------------------

	// --------------------------------------------------------------------------------------------

	/** Keys each user by the first letter of its name. */
	public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
		private static final long serialVersionUID = 1L;

		@Override
		public Tuple2<String, MyUser> map(MyUser u) {
			String namePrefix = u.getName().substring(0, 1);
			return new Tuple2<String, MyUser>(namePrefix, u);
		}
	}

	/** Reduces each key group to its first element (an arbitrary representative). */
	public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
		private static final long serialVersionUID = 1L;

		@Override
		public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
			return val1;
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Test Data
	// --------------------------------------------------------------------------------------------

	/** Deterministic generator of random {@link MyUser} records (fixed seed). */
	public static final class Generator {

		private final Random rnd = new Random(2389756789345689276L);

		public MyUser nextUser() {
			return randomUser();
		}

		private MyUser randomUser() {

			int numColors = rnd.nextInt(5);
			ArrayList<Color> colors = new ArrayList<Color>(numColors);
			for (int i = 0; i < numColors; i++) {
				colors.add(new Color(randomString(), rnd.nextDouble()));
			}

			return new MyUser(randomString(), colors);
		}

		private String randomString() {
			char[] c = new char[this.rnd.nextInt(20) + 5];

			for (int i = 0; i < c.length; i++) {
				c[i] = (char) (this.rnd.nextInt(150) + 40);
			}

			return new String(c);
		}
	}

	/**
	 * Writes {@code numRecords} generated users to {@code testFile} as an Avro data file.
	 *
	 * @param testFile   destination file (overwritten)
	 * @param numRecords number of records to generate
	 * @throws IOException if writing the Avro file fails
	 */
	public static void writeTestData(File testFile, int numRecords) throws IOException {

		DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);

		// try-with-resources: the original leaked the writer (and file handle) if append threw
		try (DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter)) {
			dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);

			Generator generator = new Generator();

			for (int i = 0; i < numRecords; i++) {
				MyUser user = generator.nextUser();
				dataFileWriter.append(user);
			}
		}
	}

	// one-off generator entry point, kept for regenerating src/test/resources/testdata.avro:
//	public static void main(String[] args) throws Exception {
//		String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
//		writeTestData(new File(testDataFile), 50);
//	}

	public static void main(String[] args) throws Exception {
		// args[0]: path of the Avro test-data file produced by writeTestData
		String inputPath = args[0];

		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));

		DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());

		// results are discarded; the IT case only verifies that the job runs with external classes
		result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>());
		env.execute();
	}
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.io.avro; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.Path; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +@RunWith(Parameterized.class) +public class AvroPojoTest extends MultipleProgramsTestBase { + public AvroPojoTest(TestExecutionMode mode) { + super(mode); + } + + private File inFile; + private String resultPath; + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = 
tempFolder.newFile().toURI().toString(); + inFile = tempFolder.newFile(); + AvroRecordInputFormatTest.writeTestFile(inFile); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testSimpleAvroRead() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users) + // null map type because the order changes in different JVMs (hard to test) + .map(new MapFunction<User, User>() { + @Override + public User map(User value) throws Exception { + value.setTypeMap(null); + return value; + } + }); + + usersDS.writeAsText(resultPath); + + env.execute("Simple Avro read job"); + + + expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; + } + + @Test + public void testSerializeWithAvro() throws Exception { + 
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableForceAvro(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users) + // null map type because the order changes in different JVMs (hard to test) + .map(new MapFunction<User, User>() { + @Override + public User map(User value) throws Exception { + Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1); + ab.put("hehe", 12L); + value.setTypeMap(ab); + return value; + } + }); + + usersDS.writeAsText(resultPath); + + env.execute("Simple Avro read job"); + + + expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; + + } + + @Test + public void testKeySelection() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + Path in = new 
Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + + expected = "(Alyssa,1)\n(Charlie,1)\n"; + } + + @Test + public void testWithAvroGenericSer() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableForceAvro(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { + @Override + public String getKey(User value) throws Exception { + return String.valueOf(value.getName()); + } + }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for(User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + + expected = "(Charlie,1)\n(Alyssa,1)\n"; + } + + @Test + public void testWithKryoGenericSer() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableForceKryo(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = 
env.createInput(users); + + DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { + @Override + public String getKey(User value) throws Exception { + return String.valueOf(value.getName()); + } + }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + + expected = "(Charlie,1)\n(Alyssa,1)\n"; + } + + /** + * Test some know fields for grouping on + */ + @Test + public void testAllFields() throws Exception { + for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { + testField(fieldName); + } + } + + private void testField(final String fieldName) throws Exception { + before(); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { + @Override + public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { + for(User u : values) { + out.collect(u.get(fieldName)); + } + } + }); + res.writeAsText(resultPath); + env.execute("Simple Avro read job"); + + // test if automatic registration of the Types worked + ExecutionConfig ec = env.getConfig(); + Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class)); + + if(fieldName.equals("name")) { + expected = "Alyssa\nCharlie"; + } else if(fieldName.equals("type_enum")) { + expected = "GREEN\nRED\n"; + } else if(fieldName.equals("type_double_test")) { + expected = 
"123.45\n1.337\n"; + } else { + Assert.fail("Unknown field"); + } + + after(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java new file mode 100644 index 0000000..ab30ef3 --- /dev/null +++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.io.avro; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.esotericsoftware.kryo.Serializer; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.io.avro.generated.Address; +import org.apache.flink.api.io.avro.generated.Colors; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.api.java.typeutils.AvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the avro input format. 
+ * (The testcase is mostly the getting started tutorial of avro) + * http://avro.apache.org/docs/current/gettingstartedjava.html + */ +public class AvroRecordInputFormatTest { + + public File testFile; + + final static String TEST_NAME = "Alyssa"; + + final static String TEST_ARRAY_STRING_1 = "ELEMENT 1"; + final static String TEST_ARRAY_STRING_2 = "ELEMENT 2"; + + final static boolean TEST_ARRAY_BOOLEAN_1 = true; + final static boolean TEST_ARRAY_BOOLEAN_2 = false; + + final static Colors TEST_ENUM_COLOR = Colors.GREEN; + + final static String TEST_MAP_KEY1 = "KEY 1"; + final static long TEST_MAP_VALUE1 = 8546456L; + final static String TEST_MAP_KEY2 = "KEY 2"; + final static long TEST_MAP_VALUE2 = 17554L; + + final static Integer TEST_NUM = 239; + final static String TEST_STREET = "Baker Street"; + final static String TEST_CITY = "London"; + final static String TEST_STATE = "London"; + final static String TEST_ZIP = "NW1 6XE"; + + + private Schema userSchema = new User().getSchema(); + + + public static void writeTestFile(File testFile) throws IOException { + ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); + stringArray.add(TEST_ARRAY_STRING_1); + stringArray.add(TEST_ARRAY_STRING_2); + + ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); + booleanArray.add(TEST_ARRAY_BOOLEAN_1); + booleanArray.add(TEST_ARRAY_BOOLEAN_2); + + HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); + longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); + longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); + + Address addr = new Address(); + addr.setNum(new Integer(TEST_NUM)); + addr.setStreet(TEST_STREET); + addr.setCity(TEST_CITY); + addr.setState(TEST_STATE); + addr.setZip(TEST_ZIP); + + + User user1 = new User(); + + user1.setName(TEST_NAME); + user1.setFavoriteNumber(256); + user1.setTypeDoubleTest(123.45d); + user1.setTypeBoolTest(true); + user1.setTypeArrayString(stringArray); + user1.setTypeArrayBoolean(booleanArray); + 
user1.setTypeEnum(TEST_ENUM_COLOR); + user1.setTypeMap(longMap); + user1.setTypeNested(addr); + + // Construct via builder + User user2 = User.newBuilder() + .setName("Charlie") + .setFavoriteColor("blue") + .setFavoriteNumber(null) + .setTypeBoolTest(false) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeLongTest(1337L) + .setTypeArrayString(new ArrayList<CharSequence>()) + .setTypeArrayBoolean(new ArrayList<Boolean>()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap<CharSequence, Long>()) + .setTypeFixed(null) + .setTypeUnion(null) + .setTypeNested( + Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) + .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) + .build()) + .build(); + DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); + DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); + dataFileWriter.create(user1.getSchema(), testFile); + dataFileWriter.append(user1); + dataFileWriter.append(user2); + dataFileWriter.close(); + } + @Before + public void createFiles() throws IOException { + testFile = File.createTempFile("AvroInputFormatTest", null); + writeTestFile(testFile); + } + + + /** + * Test if the AvroInputFormat is able to properly read data from an avro file. 
+ * @throws IOException + */ + @Test + public void testDeserialisation() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(1); + assertEquals(splits.length, 1); + format.open(splits[0]); + + User u = format.nextRecord(null); + assertNotNull(u); + + String name = u.getName().toString(); + assertNotNull("empty record", name); + assertEquals("name not equal", TEST_NAME, name); + + // check arrays + List<CharSequence> sl = u.getTypeArrayString(); + assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); + assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); + + List<Boolean> bl = u.getTypeArrayBoolean(); + assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); + assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); + + // check enums + Colors enumValue = u.getTypeEnum(); + assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); + + // check maps + Map<CharSequence, Long> lm = u.getTypeMap(); + assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); + assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); + + assertFalse("expecting second element", format.reachedEnd()); + assertNotNull("expecting second element", format.nextRecord(u)); + + assertNull(format.nextRecord(u)); + assertTrue(format.reachedEnd()); + + format.close(); + } + + /** + * Test if the Flink serialization is able to properly process GenericData.Record types. + * Usually users of Avro generate classes (POJOs) from Avro schemas. + * However, if generated classes are not available, one can also use GenericData.Record. 
+ * It is an untyped key-value record which is using a schema to validate the correctness of the data. + * + * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead. + */ + @Test + public void testDeserializeToGenericType() throws IOException { + DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema); + + FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader); + // initialize Record by reading it from disk (thats easier than creating it by hand) + GenericData.Record rec = new GenericData.Record(userSchema); + dataFileReader.next(rec); + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + assertEquals(null, rec.get("type_long_test")); // it is null for the first record. + + // now serialize it with our framework: + + TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class); + ExecutionConfig ec = new ExecutionConfig(); + Assert.assertEquals(GenericTypeInfo.class, te.getClass()); + Serializers.recursivelyRegisterType(( (GenericTypeInfo) te).getTypeClass(), ec); + + TypeSerializer<GenericData.Record> tser = te.createSerializer(ec); + Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size()); + Assert.assertTrue( + ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) && + ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class)); + ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); + tser.serialize(rec, target); + + GenericData.Record newRec = tser.deserialize(target.getInputView()); + + // check if it is still the same + assertNotNull(newRec); + assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); + assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() ); + assertEquals(null, newRec.get("type_long_test")); + + } + + /** + * This test validates proper serialization with specific (generated POJO) types. + */ + @Test + public void testDeserializeToSpecificType() throws IOException { + + DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema); + + FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader); + User rec = dataFileReader.next(); + + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + + // now serialize it with our framework: + ExecutionConfig ec = new ExecutionConfig(); + TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class); + Assert.assertEquals(AvroTypeInfo.class, te.getClass()); + TypeSerializer<User> tser = te.createSerializer(ec); + ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); + tser.serialize(rec, target); + + User newRec = tser.deserialize(target.getInputView()); + + // check if it is still the same + assertNotNull(newRec); + assertEquals("name not equal", TEST_NAME, newRec.getName().toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() ); + } + + + @After + public void deleteFiles() { + testFile.delete(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java new file mode 100644 index 0000000..898b8fd --- /dev/null +++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.io.avro; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.flink.api.io.avro.generated.Address; +import org.apache.flink.api.io.avro.generated.Colors; +import org.apache.flink.api.io.avro.generated.Fixed16; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +/** + * Test the avro input format. + * (The testcase is mostly the getting started tutorial of avro) + * http://avro.apache.org/docs/current/gettingstartedjava.html + */ +public class AvroSplittableInputFormatTest { + + private File testFile; + + final static String TEST_NAME = "Alyssa"; + + final static String TEST_ARRAY_STRING_1 = "ELEMENT 1"; + final static String TEST_ARRAY_STRING_2 = "ELEMENT 2"; + + final static boolean TEST_ARRAY_BOOLEAN_1 = true; + final static boolean TEST_ARRAY_BOOLEAN_2 = false; + + final static Colors TEST_ENUM_COLOR = Colors.GREEN; + + final static String TEST_MAP_KEY1 = "KEY 1"; + final static long TEST_MAP_VALUE1 = 8546456L; + final static String TEST_MAP_KEY2 = "KEY 2"; + final static long TEST_MAP_VALUE2 = 17554L; + + final static Integer TEST_NUM = new Integer(239); + final static String TEST_STREET = "Baker Street"; + final static String TEST_CITY = "London"; + final static String TEST_STATE = "London"; + final static String TEST_ZIP = "NW1 6XE"; + + final static int NUM_RECORDS = 5000; + + @Before + public void createFiles() throws IOException { + 
testFile = File.createTempFile("AvroSplittableInputFormatTest", null); + + ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); + stringArray.add(TEST_ARRAY_STRING_1); + stringArray.add(TEST_ARRAY_STRING_2); + + ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); + booleanArray.add(TEST_ARRAY_BOOLEAN_1); + booleanArray.add(TEST_ARRAY_BOOLEAN_2); + + HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); + longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); + longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); + + Address addr = new Address(); + addr.setNum(new Integer(TEST_NUM)); + addr.setStreet(TEST_STREET); + addr.setCity(TEST_CITY); + addr.setState(TEST_STATE); + addr.setZip(TEST_ZIP); + + + User user1 = new User(); + user1.setName(TEST_NAME); + user1.setFavoriteNumber(256); + user1.setTypeDoubleTest(123.45d); + user1.setTypeBoolTest(true); + user1.setTypeArrayString(stringArray); + user1.setTypeArrayBoolean(booleanArray); + user1.setTypeEnum(TEST_ENUM_COLOR); + user1.setTypeMap(longMap); + user1.setTypeNested(addr); + + // Construct via builder + User user2 = User.newBuilder() + .setName(TEST_NAME) + .setFavoriteColor("blue") + .setFavoriteNumber(null) + .setTypeBoolTest(false) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeLongTest(1337L) + .setTypeArrayString(new ArrayList<CharSequence>()) + .setTypeArrayBoolean(new ArrayList<Boolean>()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap<CharSequence, Long>()) + .setTypeFixed(new Fixed16()) + .setTypeUnion(123L) + .setTypeNested( + Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) + .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) + .build()) + + .build(); + DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); + DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); + dataFileWriter.create(user1.getSchema(), testFile); + dataFileWriter.append(user1); + 
dataFileWriter.append(user2); + + Random rnd = new Random(1337); + for(int i = 0; i < NUM_RECORDS -2 ; i++) { + User user = new User(); + user.setName(TEST_NAME + rnd.nextInt()); + user.setFavoriteNumber(rnd.nextInt()); + user.setTypeDoubleTest(rnd.nextDouble()); + user.setTypeBoolTest(true); + user.setTypeArrayString(stringArray); + user.setTypeArrayBoolean(booleanArray); + user.setTypeEnum(TEST_ENUM_COLOR); + user.setTypeMap(longMap); + Address address = new Address(); + address.setNum(new Integer(TEST_NUM)); + address.setStreet(TEST_STREET); + address.setCity(TEST_CITY); + address.setState(TEST_STATE); + address.setZip(TEST_ZIP); + user.setTypeNested(address); + + dataFileWriter.append(user); + } + dataFileWriter.close(); + } + + @Test + public void testSplittedIF() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(4); + assertEquals(splits.length, 4); + int elements = 0; + int elementsPerSplit[] = new int[4]; + for(int i = 0; i < splits.length; i++) { + format.open(splits[i]); + while(!format.reachedEnd()) { + User u = format.nextRecord(null); + Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); + elements++; + elementsPerSplit[i]++; + } + format.close(); + } + + Assert.assertEquals(1539, elementsPerSplit[0]); + Assert.assertEquals(1026, elementsPerSplit[1]); + Assert.assertEquals(1539, elementsPerSplit[2]); + Assert.assertEquals(896, elementsPerSplit[3]); + Assert.assertEquals(NUM_RECORDS, elements); + format.close(); + } + + /* + This test is gave the reference values for the test of Flink's IF. 
+ + This dependency needs to be added + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <version>1.7.6</version> + </dependency> + + @Test + public void testHadoop() throws Exception { + JobConf jf = new JobConf(); + FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI())); + jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false); + org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>(); + InputSplit[] sp = format.getSplits(jf, 4); + int elementsPerSplit[] = new int[4]; + int cnt = 0; + int i = 0; + for(InputSplit s:sp) { + RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter()); + AvroWrapper<User> k = r.createKey(); + NullWritable v = r.createValue(); + + while(r.next(k,v)) { + cnt++; + elementsPerSplit[i]++; + } + i++; + } + System.out.println("Status "+Arrays.toString(elementsPerSplit)); + } **/ + + @After + public void deleteFiles() { + testFile.delete(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java new file mode 100644 index 0000000..23fbab3 --- /dev/null +++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.io; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +public class AvroInputFormatTypeExtractionTest { + + @Test + public void testTypeExtraction() { + try { + InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class); + + TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<MyAvroType> input = env.createInput(format); + TypeInformation<?> typeInfoDataSet = input.getType(); + + + Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo); + Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo); + + Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass()); + Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass()); + } catch (Exception e) { + e.printStackTrace(); 
+ Assert.fail(e.getMessage()); + } + } + + public static final class MyAvroType { + + public String theString; + + private double aDouble; + + public double getaDouble() { + return aDouble; + } + + public void setaDouble(double aDouble) { + this.aDouble = aDouble; + } + + public void setTheString(String theString) { + this.theString = theString; + } + + public String getTheString() { + return theString; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc new file mode 100644 index 0000000..02c11af --- /dev/null +++ b/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc @@ -0,0 +1,35 @@ +[ +{"namespace": "org.apache.flink.api.io.avro.generated", + "type": "record", + "name": "Address", + "fields": [ + {"name": "num", "type": "int"}, + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"} + ] +}, +{"namespace": "org.apache.flink.api.io.avro.generated", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]}, + {"name": "type_long_test", "type": ["long", "null"]}, + {"name": "type_double_test", "type": "double"}, + {"name": "type_null_test", "type": ["null"]}, + {"name": "type_bool_test", "type": ["boolean"]}, + {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, + {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, + {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, + {"name": "type_enum", "type": 
{"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}}, + {"name": "type_map", "type": {"type": "map", "values": "long"}}, + {"name": "type_fixed", + "size": 16, + "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, + {"name": "type_union", "type": ["null", "boolean", "long", "double"]}, + {"name": "type_nested", "type": ["null", "Address"]} + ] +}] http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..0b686e5 --- /dev/null +++ b/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to DEBUG and its only appender to A1. 
+log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro b/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro new file mode 100644 index 0000000..45308b9 Binary files /dev/null and b/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro differ http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml new file mode 100644 index 0000000..d98729a --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml @@ -0,0 +1,69 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-batch-connectors</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-hadoop-compatibility</artifactId> + <name>flink-hadoop-compatibility</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> 
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java new file mode 100644 index 0000000..83ab23d --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; +import org.apache.flink.util.Collector; +import org.apache.flink.util.InstantiationUtil; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.Reporter; + +/** + * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. + */ +@SuppressWarnings("rawtypes") +public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> + extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> + implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { + + private static final long serialVersionUID = 1L; + + private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper; + private transient JobConf jobConf; + + private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector; + private transient Reporter reporter; + + /** + * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. + * + * @param hadoopMapper The Hadoop Mapper to wrap. + */ + public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) { + this(hadoopMapper, new JobConf()); + } + + /** + * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
+ * The Hadoop Mapper is configured with the provided JobConf. + * + * @param hadoopMapper The Hadoop Mapper to wrap. + * @param conf The JobConf that is used to configure the Hadoop Mapper. + */ + public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) { + if(hadoopMapper == null) { + throw new NullPointerException("Mapper may not be null."); + } + if(conf == null) { + throw new NullPointerException("JobConf may not be null."); + } + + this.mapper = hadoopMapper; + this.jobConf = conf; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.mapper.configure(jobConf); + + this.reporter = new HadoopDummyReporter(); + this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); + } + + @Override + public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) + throws Exception { + outputCollector.setFlinkCollector(out); + mapper.map(value.f0, value.f1, outputCollector, reporter); + } + + @SuppressWarnings("unchecked") + @Override + public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { + Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2); + Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3); + + final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); + final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); + return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo); + } + + /** + * Custom serialization methods. 
+ * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a> + */ + private void writeObject(final ObjectOutputStream out) throws IOException { + out.writeObject(mapper.getClass()); + jobConf.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = + (Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); + mapper = InstantiationUtil.instantiate(mapperClass); + + jobConf = new JobConf(); + jobConf.readFields(in); + } + +}