[FLINK-7420] [avro] Move all Avro code to flink-avro
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/537a10ea Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/537a10ea Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/537a10ea Branch: refs/heads/master Commit: 537a10ea2ff6a2d8507483c66f413f77884e77c4 Parents: 2c0fa24 Author: twalthr <[email protected]> Authored: Wed Aug 16 12:17:00 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Nov 3 16:40:34 2017 +0100 ---------------------------------------------------------------------- .gitignore | 2 +- flink-connectors/flink-avro/pom.xml | 265 --------- .../apache/flink/api/avro/DataInputDecoder.java | 212 -------- .../flink/api/avro/DataOutputEncoder.java | 180 ------- .../api/avro/FSDataInputStreamWrapper.java | 67 --- .../flink/api/java/io/AvroInputFormat.java | 207 -------- .../flink/api/java/io/AvroOutputFormat.java | 201 ------- .../src/test/assembly/test-assembly.xml | 36 -- .../api/avro/AvroExternalJarProgramITCase.java | 92 ---- .../flink/api/avro/AvroOutputFormatITCase.java | 177 ------- .../flink/api/avro/EncoderDecoderTest.java | 529 ------------------ .../avro/testjar/AvroExternalJarProgram.java | 211 -------- .../apache/flink/api/io/avro/AvroPojoTest.java | 255 --------- .../api/io/avro/AvroRecordInputFormatTest.java | 460 ---------------- .../io/avro/AvroSplittableInputFormatTest.java | 325 ------------ .../api/io/avro/example/AvroTypeExample.java | 106 ---- .../apache/flink/api/io/avro/example/User.java | 269 ---------- .../io/AvroInputFormatTypeExtractionTest.java | 86 --- .../flink/api/java/io/AvroOutputFormatTest.java | 197 ------- .../api/java/typeutils/AvroTypeInfoTest.java | 37 -- .../src/test/resources/avro/user.avsc | 35 -- .../src/test/resources/log4j-test.properties | 27 - .../flink-avro/src/test/resources/testdata.avro | Bin 4572 -> 0 bytes .../flink-connector-filesystem/pom.xml | 9 + .../connectors/fs/AvroKeyValueSinkWriter.java | 2 +- .../flink-connector-kafka-0.10/pom.xml | 12 +- .../flink-connector-kafka-0.11/pom.xml | 12 +- .../flink-connector-kafka-0.8/pom.xml | 12 +- .../flink-connector-kafka-0.9/pom.xml | 12 +- .../flink-connector-kafka-base/pom.xml | 31 +- .../AvroRowDeserializationSchema.java | 179 ------- .../AvroRowSerializationSchema.java | 149 ------ .../kafka/AvroRowDeSerializationSchemaTest.java | 148 ------ .../kafka/KafkaAvroTableSourceTestBase.java | 2 +- .../kafka/testutils/AvroTestUtils.java | 152 ------ flink-connectors/pom.xml | 1 - flink-core/pom.xml | 9 +- .../flink/api/common/ExecutionConfig.java | 10 +- .../flink/api/java/typeutils/AvroTypeInfo.java | 84 --- .../flink/api/java/typeutils/PojoTypeInfo.java | 26 +- .../api/java/typeutils/TypeExtractionUtils.java | 35 ++ .../flink/api/java/typeutils/TypeExtractor.java | 64 ++- .../java/typeutils/runtime/AvroSerializer.java | 332 ------------ .../typeutils/runtime/DataInputDecoder.java | 230 -------- .../typeutils/runtime/DataOutputEncoder.java | 191 ------- .../typeutils/runtime/kryo/KryoSerializer.java | 11 +- .../typeutils/runtime/kryo/Serializers.java | 134 +++-- .../runtime/AvroGenericArraySerializerTest.java | 28 - .../runtime/AvroGenericTypeComparatorTest.java | 28 - .../runtime/AvroGenericTypeSerializerTest.java | 29 - .../runtime/AvroSerializerEmptyArrayTest.java | 189 ------- .../kryo/KryoSerializerCompatibilityTest.java | 28 +- .../resources/kryo-serializer-flink1.3-snapshot | Bin 0 -> 1305 bytes .../scala/examples/twitter/TwitterExample.scala | 3 +- flink-formats/flink-avro/pom.xml | 280 ++++++++++ .../flink/api/java/typeutils/AvroTypeInfo.java | 38 ++ .../flink/formats/avro/AvroInputFormat.java | 207 ++++++++ .../flink/formats/avro/AvroOutputFormat.java | 201 +++++++ .../avro/AvroRowDeserializationSchema.java | 179 +++++++ .../avro/AvroRowSerializationSchema.java | 149 ++++++ .../formats/avro/typeutils/AvroSerializer.java | 338 ++++++++++++ .../formats/avro/typeutils/AvroTypeInfo.java | 100 ++++ .../avro/utils/AvroKryoSerializerUtils.java | 72 +++ .../formats/avro/utils/DataInputDecoder.java | 212 ++++++++ .../formats/avro/utils/DataOutputEncoder.java | 180 +++++++ .../avro/utils/FSDataInputStreamWrapper.java | 67 +++ .../src/test/assembly/test-assembly.xml | 36 ++ .../avro/AvroExternalJarProgramITCase.java | 92 ++++ .../avro/AvroInputFormatTypeExtractionTest.java | 86 +++ .../formats/avro/AvroOutputFormatITCase.java | 188 +++++++ .../formats/avro/AvroOutputFormatTest.java | 207 ++++++++ .../formats/avro/AvroRecordInputFormatTest.java | 459 ++++++++++++++++ .../avro/AvroRowDeSerializationSchemaTest.java | 146 +++++ .../avro/AvroSplittableInputFormatTest.java | 324 +++++++++++ .../flink/formats/avro/EncoderDecoderTest.java | 531 +++++++++++++++++++ .../avro/testjar/AvroExternalJarProgram.java | 211 ++++++++ .../AvroGenericArraySerializerTest.java | 33 ++ .../AvroGenericTypeComparatorTest.java | 33 ++ .../AvroGenericTypeSerializerTest.java | 33 ++ .../typeutils/AvroSerializerEmptyArrayTest.java | 217 ++++++++ .../avro/typeutils/AvroTypeExtractionTest.java | 257 +++++++++ .../avro/typeutils/AvroTypeInfoTest.java | 37 ++ .../flink/formats/avro/utils/AvroTestUtils.java | 152 ++++++ .../src/test/resources/avro/user.avsc | 35 ++ .../src/test/resources/log4j-test.properties | 27 + .../flink-avro/src/test/resources/testdata.avro | Bin 0 -> 4576 bytes flink-formats/pom.xml | 42 ++ .../org/apache/flink/hdfstests/HDFSTest.java | 35 +- flink-libraries/flink-cep/pom.xml | 8 +- .../flink-shaded-hadoop2/pom.xml | 12 + flink-shaded-hadoop/pom.xml | 5 - flink-tests/pom.xml | 15 - pom.xml | 21 +- tools/maven/suppressions.xml | 3 +- tools/travis_mvn_watchdog.sh | 2 +- 95 files changed, 5488 insertions(+), 5910 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 3c9e4e8..8fc9fce 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,7 @@ tmp *.log .DS_Store build-target -flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/ +flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/ flink-runtime-web/web-dashboard/assets/fonts/ flink-runtime-web/web-dashboard/node_modules/ flink-runtime-web/web-dashboard/bower_components/ http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml deleted file mode 100644 index f8d9293..0000000 --- a/flink-connectors/flink-avro/pom.xml +++ /dev/null @@ -1,265 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connectors</artifactId> - <version>1.4-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-avro_${scala.binary.version}</artifactId> - <name>flink-avro</name> - - <packaging>jar</packaging> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <!-- version is derived from base module --> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils-junit</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>create-test-dependency</id> - <phase>process-test-classes</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <archive> - <manifest> - <mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass> - </manifest> - </archive> - <finalName>maven</finalName> - <attach>false</attach> - <descriptors> - <descriptor>src/test/assembly/test-assembly.xml</descriptor> - </descriptors> - </configuration> - </execution> - </executions> - </plugin> - <!--Remove the AvroExternalJarProgram code from the test-classes directory since it musn't be in the - classpath when running the tests to actually test whether the user code class loader - is properly used.--> - <plugin> - <artifactId>maven-clean-plugin</artifactId> - <version>2.5</version><!--$NO-MVN-MAN-VER$--> - <executions> - <execution> - <id>remove-avroexternalprogram</id> - <phase>process-test-classes</phase> - <goals> - <goal>clean</goal> - </goals> - <configuration> - <excludeDefaultDirectories>true</excludeDefaultDirectories> - <filesets> - <fileset> - <directory>${project.build.testOutputDirectory}</directory> - <includes> - <include>**/testjar/*.class</include> - </includes> - </fileset> - </filesets> - </configuration> - </execution> - </executions> - <configuration> - <filesets> - <fileset> - <directory>${project.basedir}/src/test/java/org/apache/flink/api/io/avro/generated</directory> - </fileset> - </filesets> - </configuration> - </plugin> - <!-- Generate Test class from avro schema --> - <plugin> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <version>1.8.2</version> - <executions> - <execution> - <phase>generate-sources</phase> - <goals> - <goal>schema</goal> - </goals> - <configuration> - <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> - <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Add Avro test classes to test jar in order to test AvroTypeInfo. --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration combine.self="override"> - <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation> - <promoteTransitiveDependencies>true</promoteTransitiveDependencies> - <artifactSet> - <includes> - <include>org.codehaus.jackson:*</include> - </includes> - </artifactSet> - <relocations> - <relocation> - <pattern>org.codehaus.jackson</pattern> - <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - - <pluginManagement> - <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - <versionRange>[2.4,)</versionRange> - <goals> - <goal>single</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-clean-plugin</artifactId> - <versionRange>[1,)</versionRange> - <goals> - <goal>clean</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <versionRange>[1.7.7,)</versionRange> - <goals> - <goal>schema</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java deleted file mode 100644 index 870d66f..0000000 --- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import org.apache.avro.io.Decoder; -import org.apache.avro.util.Utf8; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * A {@link Decoder} that reads from a {@link DataInput}. - */ -public class DataInputDecoder extends Decoder { - - private final Utf8 stringDecoder = new Utf8(); - - private DataInput in; - - public void setIn(DataInput in) { - this.in = in; - } - - // -------------------------------------------------------------------------------------------- - // primitives - // -------------------------------------------------------------------------------------------- - - @Override - public void readNull() {} - - @Override - public boolean readBoolean() throws IOException { - return in.readBoolean(); - } - - @Override - public int readInt() throws IOException { - return in.readInt(); - } - - @Override - public long readLong() throws IOException { - return in.readLong(); - } - - @Override - public float readFloat() throws IOException { - return in.readFloat(); - } - - @Override - public double readDouble() throws IOException { - return in.readDouble(); - } - - @Override - public int readEnum() throws IOException { - return readInt(); - } - - // -------------------------------------------------------------------------------------------- - // bytes - // -------------------------------------------------------------------------------------------- - - @Override - public void readFixed(byte[] bytes, int start, int length) throws IOException { - in.readFully(bytes, start, length); - } - - @Override - public ByteBuffer readBytes(ByteBuffer old) throws IOException { - int length = readInt(); - ByteBuffer result; - if (old != null && length <= old.capacity() && old.hasArray()) { - result = old; - result.clear(); - } else { - result = ByteBuffer.allocate(length); - } - in.readFully(result.array(), result.arrayOffset() + result.position(), length); - result.limit(length); - return result; - } - - @Override - public void skipFixed(int length) throws IOException { - skipBytes(length); - } - - @Override - public void skipBytes() throws IOException { - int num = readInt(); - skipBytes(num); - } - - // -------------------------------------------------------------------------------------------- - // strings - // -------------------------------------------------------------------------------------------- - - @Override - public Utf8 readString(Utf8 old) throws IOException { - int length = readInt(); - Utf8 result = (old != null ? old : new Utf8()); - result.setByteLength(length); - - if (length > 0) { - in.readFully(result.getBytes(), 0, length); - } - - return result; - } - - @Override - public String readString() throws IOException { - return readString(stringDecoder).toString(); - } - - @Override - public void skipString() throws IOException { - int len = readInt(); - skipBytes(len); - } - - // -------------------------------------------------------------------------------------------- - // collection types - // -------------------------------------------------------------------------------------------- - - @Override - public long readArrayStart() throws IOException { - return readVarLongCount(in); - } - - @Override - public long arrayNext() throws IOException { - return readVarLongCount(in); - } - - @Override - public long skipArray() throws IOException { - return readVarLongCount(in); - } - - @Override - public long readMapStart() throws IOException { - return readVarLongCount(in); - } - - @Override - public long mapNext() throws IOException { - return readVarLongCount(in); - } - - @Override - public long skipMap() throws IOException { - return readVarLongCount(in); - } - - // -------------------------------------------------------------------------------------------- - // union - // -------------------------------------------------------------------------------------------- - - @Override - public int readIndex() throws IOException { - return readInt(); - } - - // -------------------------------------------------------------------------------------------- - // utils - // -------------------------------------------------------------------------------------------- - - private void skipBytes(int num) throws IOException { - while (num > 0) { - num -= in.skipBytes(num); - } - } - - public static long readVarLongCount(DataInput in) throws IOException { - long value = in.readUnsignedByte(); - - if ((value & 0x80) == 0) { - return value; - } - else { - long curr; - int shift = 7; - value = value & 0x7f; - while (((curr = in.readUnsignedByte()) & 0x80) != 0){ - value |= (curr & 0x7f) << shift; - shift += 7; - } - value |= curr << shift; - return value; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java deleted file mode 100644 index beae330..0000000 --- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import org.apache.avro.io.Encoder; -import org.apache.avro.util.Utf8; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * An {@link Encoder} that writes data to a {@link DataOutput}. - */ -public final class DataOutputEncoder extends Encoder implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - private DataOutput out; - - public void setOut(DataOutput out) { - this.out = out; - } - - @Override - public void flush() throws IOException {} - - // -------------------------------------------------------------------------------------------- - // primitives - // -------------------------------------------------------------------------------------------- - - @Override - public void writeNull() {} - - @Override - public void writeBoolean(boolean b) throws IOException { - out.writeBoolean(b); - } - - @Override - public void writeInt(int n) throws IOException { - out.writeInt(n); - } - - @Override - public void writeLong(long n) throws IOException { - out.writeLong(n); - } - - @Override - public void writeFloat(float f) throws IOException { - out.writeFloat(f); - } - - @Override - public void writeDouble(double d) throws IOException { - out.writeDouble(d); - } - - @Override - public void writeEnum(int e) throws IOException { - out.writeInt(e); - } - - // -------------------------------------------------------------------------------------------- - // bytes - // -------------------------------------------------------------------------------------------- - - @Override - public void writeFixed(byte[] bytes, int start, int len) throws IOException { - out.write(bytes, start, len); - } - - @Override - public void writeBytes(byte[] bytes, int start, int len) throws IOException { - out.writeInt(len); - if (len > 0) { - out.write(bytes, start, len); - } - } - - @Override - public void writeBytes(ByteBuffer bytes) throws IOException { - int num = bytes.remaining(); - out.writeInt(num); - - if (num > 0) { - writeFixed(bytes); - } - } - - // -------------------------------------------------------------------------------------------- - // strings - // -------------------------------------------------------------------------------------------- - - @Override - public void writeString(String str) throws IOException { - byte[] bytes = Utf8.getBytesFor(str); - writeBytes(bytes, 0, bytes.length); - } - - @Override - public void writeString(Utf8 utf8) throws IOException { - writeBytes(utf8.getBytes(), 0, utf8.getByteLength()); - - } - - // -------------------------------------------------------------------------------------------- - // collection types - // -------------------------------------------------------------------------------------------- - - @Override - public void writeArrayStart() {} - - @Override - public void setItemCount(long itemCount) throws IOException { - if (itemCount > 0) { - writeVarLongCount(out, itemCount); - } - } - - @Override - public void startItem() {} - - @Override - public void writeArrayEnd() throws IOException { - // write a single byte 0, shortcut for a var-length long of 0 - out.write(0); - } - - @Override - public void writeMapStart() {} - - @Override - public void writeMapEnd() throws IOException { - // write a single byte 0, shortcut for a var-length long of 0 - out.write(0); - } - - // -------------------------------------------------------------------------------------------- - // union - // -------------------------------------------------------------------------------------------- - - @Override - public void writeIndex(int unionIndex) throws IOException { - out.writeInt(unionIndex); - } - - // -------------------------------------------------------------------------------------------- - // utils - // -------------------------------------------------------------------------------------------- - - public static void writeVarLongCount(DataOutput out, long val) throws IOException { - if (val < 0) { - throw new IOException("Illegal count (must be non-negative): " + val); - } - - while ((val & ~0x7FL) != 0) { - out.write(((int) val) | 0x80); - val >>>= 7; - } - out.write((int) val); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java deleted file mode 100644 index 19e4a89..0000000 --- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import org.apache.flink.core.fs.FSDataInputStream; - -import org.apache.avro.file.SeekableInput; - -import java.io.Closeable; -import java.io.IOException; - -/** - * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well). - * - * <p>The wrapper keeps track of the position in the data stream. - */ -public class FSDataInputStreamWrapper implements Closeable, SeekableInput { - private final FSDataInputStream stream; - private long pos; - private long len; - - public FSDataInputStreamWrapper(FSDataInputStream stream, long len) { - this.stream = stream; - this.pos = 0; - this.len = len; - } - - public long length() throws IOException { - return this.len; - } - - public int read(byte[] b, int off, int len) throws IOException { - int read; - read = stream.read(b, off, len); - pos += read; - return read; - } - - public void seek(long p) throws IOException { - stream.seek(p); - pos = p; - } - - public long tell() throws IOException { - return pos; - } - - public void close() throws IOException { - stream.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java deleted file mode 100644 index 33105cc..0000000 --- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io; - -import org.apache.flink.api.avro.FSDataInputStreamWrapper; -import org.apache.flink.api.common.io.CheckpointableInputFormat; -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; - -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.SeekableInput; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.specific.SpecificDatumReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Provides a {@link FileInputFormat} for Avro records. - * - * @param <E> - * the type of the result Avro record. If you specify - * {@link GenericRecord} then the result will be returned as a - * {@link GenericRecord}, so you do not have to know the schema ahead - * of time. - */ -public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>, - CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class); - - private final Class<E> avroValueType; - - private boolean reuseAvroValue = true; - - private transient DataFileReader<E> dataFileReader; - - private transient long end; - - private transient long recordsReadSinceLastSync; - - private long lastSync = -1L; - - public AvroInputFormat(Path filePath, Class<E> type) { - super(filePath); - this.avroValueType = type; - } - - /** - * Sets the flag whether to reuse the Avro value instance for all records. - * By default, the input format reuses the Avro value. - * - * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise. - */ - public void setReuseAvroValue(boolean reuseAvroValue) { - this.reuseAvroValue = reuseAvroValue; - } - - /** - * If set, the InputFormat will only read entire files. - */ - public void setUnsplittable(boolean unsplittable) { - this.unsplittable = unsplittable; - } - - // -------------------------------------------------------------------------------------------- - // Typing - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<E> getProducedType() { - return TypeExtractor.getForClass(this.avroValueType); - } - - // -------------------------------------------------------------------------------------------- - // Input Format Methods - // -------------------------------------------------------------------------------------------- - - @Override - public void open(FileInputSplit split) throws IOException { - super.open(split); - dataFileReader = initReader(split); - dataFileReader.sync(split.getStart()); - lastSync = dataFileReader.previousSync(); - } - - private DataFileReader<E> initReader(FileInputSplit split) throws IOException { - DatumReader<E> datumReader; - - if (org.apache.avro.generic.GenericRecord.class == avroValueType) { - datumReader = new GenericDatumReader<E>(); - } else { - datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType) - ? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType); - } - if (LOG.isInfoEnabled()) { - LOG.info("Opening split {}", split); - } - - SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen()); - DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader); - - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema()); - } - - end = split.getStart() + split.getLength(); - recordsReadSinceLastSync = 0; - return dataFileReader; - } - - @Override - public boolean reachedEnd() throws IOException { - return !dataFileReader.hasNext() || dataFileReader.pastSync(end); - } - - public long getRecordsReadFromBlock() { - return this.recordsReadSinceLastSync; - } - - @Override - public E nextRecord(E reuseValue) throws IOException { - if (reachedEnd()) { - return null; - } - - // if we start a new block, then register the event, and - // restart the counter. - if (dataFileReader.previousSync() != lastSync) { - lastSync = dataFileReader.previousSync(); - recordsReadSinceLastSync = 0; - } - recordsReadSinceLastSync++; - - if (reuseAvroValue) { - return dataFileReader.next(reuseValue); - } else { - if (GenericRecord.class == avroValueType) { - return dataFileReader.next(); - } else { - return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class)); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Checkpointing - // -------------------------------------------------------------------------------------------- - - @Override - public Tuple2<Long, Long> getCurrentState() throws IOException { - return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync); - } - - @Override - public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException { - Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); - Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); - - try { - this.open(split); - } finally { - if (state.f0 != -1) { - lastSync = state.f0; - recordsReadSinceLastSync = state.f1; - } - } - - if (lastSync != -1) { - // open and read until the record we were before - // the checkpoint and discard the values - dataFileReader.seek(lastSync); - for (int i = 0; i < recordsReadSinceLastSync; i++) { - dataFileReader.next(null); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java deleted file mode 100644 index 5da8f75..0000000 --- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io; - -import org.apache.flink.api.common.io.FileOutputFormat; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.core.fs.Path; - -import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.specific.SpecificDatumWriter; - -import java.io.IOException; -import java.io.Serializable; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * {@link FileOutputFormat} for Avro records. - * @param <E> - */ -public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable { - - /** - * Wrapper which encapsulates the supported codec and a related serialization byte. - */ - public enum Codec { - - NULL((byte) 0, CodecFactory.nullCodec()), - SNAPPY((byte) 1, CodecFactory.snappyCodec()), - BZIP2((byte) 2, CodecFactory.bzip2Codec()), - DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)), - XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL)); - - private byte codecByte; - - private CodecFactory codecFactory; - - Codec(final byte codecByte, final CodecFactory codecFactory) { - this.codecByte = codecByte; - this.codecFactory = codecFactory; - } - - private byte getCodecByte() { - return codecByte; - } - - private CodecFactory getCodecFactory() { - return codecFactory; - } - - private static Codec forCodecByte(byte codecByte) { - for (final Codec codec : Codec.values()) { - if (codec.getCodecByte() == codecByte) { - return codec; - } - } - throw new IllegalArgumentException("no codec for codecByte: " + codecByte); - } - } - - private static final long serialVersionUID = 1L; - - private final Class<E> avroValueType; - - private transient Schema userDefinedSchema = null; - - private transient Codec codec = null; - - private transient DataFileWriter<E> dataFileWriter; - - public AvroOutputFormat(Path filePath, Class<E> type) { - super(filePath); - this.avroValueType = type; - } - - public AvroOutputFormat(Class<E> type) { - this.avroValueType = type; - } - - @Override - protected String getDirectoryFileName(int taskNumber) { - return super.getDirectoryFileName(taskNumber) + ".avro"; - } - - public void setSchema(Schema schema) { - this.userDefinedSchema = schema; - } - - /** - * Set avro codec for compression. - * - * @param codec avro codec. - */ - public void setCodec(final Codec codec) { - this.codec = checkNotNull(codec, "codec can not be null"); - } - - @Override - public void writeRecord(E record) throws IOException { - dataFileWriter.append(record); - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException { - super.open(taskNumber, numTasks); - - DatumWriter<E> datumWriter; - Schema schema; - if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) { - datumWriter = new SpecificDatumWriter<E>(avroValueType); - try { - schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema(); - } catch (InstantiationException | IllegalAccessException e) { - throw new RuntimeException(e.getMessage()); - } - } else if (org.apache.avro.generic.GenericRecord.class.isAssignableFrom(avroValueType)) { - if (userDefinedSchema == null) { - throw new IllegalStateException("Schema must be set when using Generic Record"); - } - datumWriter = new GenericDatumWriter<E>(userDefinedSchema); - schema = userDefinedSchema; - } else { - datumWriter = new ReflectDatumWriter<E>(avroValueType); - schema = ReflectData.get().getSchema(avroValueType); - } - dataFileWriter = new DataFileWriter<E>(datumWriter); - if (codec != null) { - dataFileWriter.setCodec(codec.getCodecFactory()); - } - if (userDefinedSchema == null) { - dataFileWriter.create(schema, stream); - } else { - dataFileWriter.create(userDefinedSchema, stream); - } - } - - private void writeObject(java.io.ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - - if (codec != null) { - out.writeByte(codec.getCodecByte()); - } else { - out.writeByte(-1); - } - - if (userDefinedSchema != null) { - byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET); - out.writeInt(json.length); - out.write(json); - } else { - out.writeInt(0); - } - } - - private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - byte codecByte = in.readByte(); - if (codecByte >= 0) { - setCodec(Codec.forCodecByte(codecByte)); - } - - int length = in.readInt(); - if (length != 0) { - byte[] json = new byte[length]; - in.readFully(json); - - Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET)); - setSchema(schema); - } - } - - @Override - public void close() throws IOException { - dataFileWriter.flush(); - dataFileWriter.close(); - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml deleted file mode 100644 index 0cbdbe1..0000000 --- a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml +++ /dev/null @@ -1,36 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<assembly> - <id>test-jar</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${project.build.testOutputDirectory}</directory> - <outputDirectory></outputDirectory> - <!--modify/add include to match your package(s) --> - <includes> - <include>org/apache/flink/api/avro/testjar/**</include> - </includes> - </fileSet> - </fileSets> -</assembly> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java deleted file mode 100644 index 6133778..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import org.apache.flink.api.avro.testjar.AvroExternalJarProgram; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.test.util.TestEnvironment; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; -import java.net.URL; -import java.util.Collections; - -/** - * IT case for the {@link AvroExternalJarProgram}. - */ -public class AvroExternalJarProgramITCase extends TestLogger { - - private static final String JAR_FILE = "maven-test-jar.jar"; - - private static final String TEST_DATA_FILE = "/testdata.avro"; - - @Test - public void testExternalProgram() { - - LocalFlinkMiniCluster testMiniCluster = null; - - try { - int parallelism = 4; - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); - testMiniCluster = new LocalFlinkMiniCluster(config, false); - testMiniCluster.start(); - - String jarFile = JAR_FILE; - String testData = getClass().getResource(TEST_DATA_FILE).toString(); - - PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData }); - - TestEnvironment.setAsContext( - testMiniCluster, - parallelism, - Collections.singleton(new Path(jarFile)), - Collections.<URL>emptyList()); - - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort()); - - program.invokeInteractiveModeForExecution(); - } - catch (Throwable t) { - System.err.println(t.getMessage()); - t.printStackTrace(); - Assert.fail("Error during the packaged program execution: " + t.getMessage()); - } - finally { - TestEnvironment.unsetAsContext(); - - if (testMiniCluster != null) { - try { - testMiniCluster.stop(); - } catch (Throwable t) { - // ignore - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java deleted file mode 100644 index f630f41..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.io.avro.example.User; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.AvroOutputFormat; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.util.JavaProgramTestBase; - -import org.apache.avro.file.DataFileReader; -import org.apache.avro.io.DatumReader; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.specific.SpecificDatumReader; -import org.junit.Assert; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -/** - * IT cases for the {@link AvroOutputFormat}. - */ -@SuppressWarnings("serial") -public class AvroOutputFormatITCase extends JavaProgramTestBase { - - public static String outputPath1; - - public static String outputPath2; - - public static String inputPath; - - public static String userData = "alice|1|blue\n" + - "bob|2|red\n" + - "john|3|yellow\n" + - "walt|4|black\n"; - - @Override - protected void preSubmit() throws Exception { - inputPath = createTempFile("user", userData); - outputPath1 = getTempDirPath("avro_output1"); - outputPath2 = getTempDirPath("avro_output2"); - } - - @Override - protected void testProgram() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath) - .fieldDelimiter("|") - .types(String.class, Integer.class, String.class); - - //output the data with AvroOutputFormat for specific user type - DataSet<User> specificUser = input.map(new ConvertToUser()); - AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class); - avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec - avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema - specificUser.write(avroOutputFormat, outputPath1); - - //output the data with AvroOutputFormat for reflect user type - DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective()); - reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2); - - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - //compare result for specific user type - File [] output1; - File file1 = asFile(outputPath1); - if (file1.isDirectory()) { - output1 = file1.listFiles(); - // check for avro ext in dir. - for (File avroOutput : output1) { - Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro")); - } - } else { - output1 = new File[] {file1}; - } - List<String> result1 = new ArrayList<String>(); - DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class); - for (File avroOutput : output1) { - - DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1); - while (dataFileReader1.hasNext()) { - User user = dataFileReader1.next(); - result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor()); - } - } - for (String expectedResult : userData.split("\n")) { - Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult)); - } - - //compare result for reflect user type - File [] output2; - File file2 = asFile(outputPath2); - if (file2.isDirectory()) { - output2 = file2.listFiles(); - } else { - output2 = new File[] {file2}; - } - List<String> result2 = new ArrayList<String>(); - DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class); - for (File avroOutput : output2) { - DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2); - while (dataFileReader2.hasNext()) { - ReflectiveUser user = dataFileReader2.next(); - result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor()); - } - } - for (String expectedResult : userData.split("\n")) { - Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult)); - } - - } - - private static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> { - - @Override - public User map(Tuple3<String, Integer, String> value) throws Exception { - return new User(value.f0, value.f1, value.f2); - } - } - - private static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> { - - @Override - public ReflectiveUser map(User value) throws Exception { - return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString()); - } - } - - private static class ReflectiveUser { - private String name; - private int favoriteNumber; - private String favoriteColor; - - public ReflectiveUser() {} - - public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) { - this.name = name; - this.favoriteNumber = favoriteNumber; - this.favoriteColor = favoriteColor; - } - - public String getName() { - return this.name; - } - - public String getFavoriteColor() { - return this.favoriteColor; - } - - public int getFavoriteNumber() { - return this.favoriteNumber; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java deleted file mode 100644 index 808c257..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java +++ /dev/null @@ -1,529 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.Fixed16; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.util.StringUtils; - -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization. - */ -public class EncoderDecoderTest { - @Test - public void testComplexStringsDirecty() { - try { - Random rnd = new Random(349712539451944123L); - - for (int i = 0; i < 10; i++) { - String testString = StringUtils.getRandomString(rnd, 10, 100); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - DataOutputEncoder encoder = new DataOutputEncoder(); - encoder.setOut(dataOut); - - encoder.writeString(testString); - dataOut.flush(); - dataOut.close(); - } - - byte[] data = baos.toByteArray(); - - // deserialize - { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dataIn = new DataInputStream(bais); - DataInputDecoder decoder = new DataInputDecoder(); - decoder.setIn(dataIn); - - String deserialized = decoder.readString(); - - assertEquals(testString, deserialized); - } - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - @Test - public void testPrimitiveTypes() { - - testObjectSerialization(new Boolean(true)); - testObjectSerialization(new Boolean(false)); - - testObjectSerialization(Byte.valueOf((byte) 0)); - testObjectSerialization(Byte.valueOf((byte) 1)); - testObjectSerialization(Byte.valueOf((byte) -1)); - testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE)); - testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE)); - - testObjectSerialization(Short.valueOf((short) 0)); - testObjectSerialization(Short.valueOf((short) 1)); - testObjectSerialization(Short.valueOf((short) -1)); - testObjectSerialization(Short.valueOf(Short.MIN_VALUE)); - testObjectSerialization(Short.valueOf(Short.MAX_VALUE)); - - testObjectSerialization(Integer.valueOf(0)); - testObjectSerialization(Integer.valueOf(1)); - testObjectSerialization(Integer.valueOf(-1)); - testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE)); - testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE)); - - testObjectSerialization(Long.valueOf(0)); - testObjectSerialization(Long.valueOf(1)); - testObjectSerialization(Long.valueOf(-1)); - testObjectSerialization(Long.valueOf(Long.MIN_VALUE)); - testObjectSerialization(Long.valueOf(Long.MAX_VALUE)); - - testObjectSerialization(Float.valueOf(0)); - testObjectSerialization(Float.valueOf(1)); - testObjectSerialization(Float.valueOf(-1)); - testObjectSerialization(Float.valueOf((float) Math.E)); - testObjectSerialization(Float.valueOf((float) Math.PI)); - testObjectSerialization(Float.valueOf(Float.MIN_VALUE)); - testObjectSerialization(Float.valueOf(Float.MAX_VALUE)); - testObjectSerialization(Float.valueOf(Float.MIN_NORMAL)); - testObjectSerialization(Float.valueOf(Float.NaN)); - testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY)); - testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY)); - - testObjectSerialization(Double.valueOf(0)); - testObjectSerialization(Double.valueOf(1)); - testObjectSerialization(Double.valueOf(-1)); - testObjectSerialization(Double.valueOf(Math.E)); - testObjectSerialization(Double.valueOf(Math.PI)); - testObjectSerialization(Double.valueOf(Double.MIN_VALUE)); - testObjectSerialization(Double.valueOf(Double.MAX_VALUE)); - testObjectSerialization(Double.valueOf(Double.MIN_NORMAL)); - testObjectSerialization(Double.valueOf(Double.NaN)); - testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY)); - testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY)); - - testObjectSerialization(""); - testObjectSerialization("abcdefg"); - testObjectSerialization("ab\u1535\u0155xyz\u706F"); - - testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523)); - testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001)); - } - - @Test - public void testArrayTypes() { - { - int[] array = new int[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - long[] array = new long[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - float[] array = new float[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - double[] array = new double[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"}; - testObjectSerialization(array); - } - } - - @Test - public void testEmptyArray() { - { - int[] array = new int[0]; - testObjectSerialization(array); - } - { - long[] array = new long[0]; - testObjectSerialization(array); - } - { - float[] array = new float[0]; - testObjectSerialization(array); - } - { - double[] array = new double[0]; - testObjectSerialization(array); - } - { - String[] array = new String[0]; - testObjectSerialization(array); - } - } - - @Test - public void testObjects() { - // simple object containing only primitives - { - testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42)); - } - - // object with collection - { - ArrayList<String> list = new ArrayList<String>(); - list.add("A"); - list.add("B"); - list.add("C"); - list.add("D"); - list.add("E"); - - testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym")); - } - - // object with empty collection - { - ArrayList<String> list = new ArrayList<String>(); - testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus")); - } - } - - @Test - public void testNestedObjectsWithCollections() { - testObjectSerialization(new ComplexNestedObject2(true)); - } - - @Test - public void testGeneratedObjectWithNullableFields() { - List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" }); - List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true); - Map<CharSequence, Long> map = new HashMap<CharSequence, Long>(); - map.put("1", 1L); - map.put("2", 2L); - map.put("3", 3L); - - byte[] b = new byte[16]; - new Random().nextBytes(b); - Fixed16 f = new Fixed16(b); - Address addr = new Address(new Integer(239), "6th Main", "Bangalore", - "Karnataka", "560075"); - User user = new User("Freudenreich", 1337, "macintosh gray", - 1234567890L, 3.1415926, null, true, strings, bools, null, - Colors.GREEN, map, f, new Boolean(true), addr); - - testObjectSerialization(user); - } - - @Test - public void testVarLenCountEncoding() { - try { - long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL}; - - // write - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - - for (long val : values) { - DataOutputEncoder.writeVarLongCount(dataOut, val); - } - - dataOut.flush(); - dataOut.close(); - } - - // read - { - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dataIn = new DataInputStream(bais); - - for (long val : values) { - long read = DataInputDecoder.readVarLongCount(dataIn); - assertEquals("Wrong var-len encoded value read.", val, read); - } - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - private static <X> void testObjectSerialization(X obj) { - - try { - - // serialize - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - DataOutputEncoder encoder = new DataOutputEncoder(); - encoder.setOut(dataOut); - - @SuppressWarnings("unchecked") - Class<X> clazz = (Class<X>) obj.getClass(); - ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz); - - writer.write(obj, encoder); - dataOut.flush(); - dataOut.close(); - } - - byte[] data = baos.toByteArray(); - X result = null; - - // deserialize - { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dataIn = new DataInputStream(bais); - DataInputDecoder decoder = new DataInputDecoder(); - decoder.setIn(dataIn); - - @SuppressWarnings("unchecked") - Class<X> clazz = (Class<X>) obj.getClass(); - ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz); - - // create a reuse object if possible, otherwise we have no reuse object - X reuse = null; - try { - @SuppressWarnings("unchecked") - X test = (X) obj.getClass().newInstance(); - reuse = test; - } catch (Throwable t) {} - - result = reader.read(reuse, decoder); - } - - // check - final String message = "Deserialized object is not the same as the original"; - - if (obj.getClass().isArray()) { - Class<?> clazz = obj.getClass(); - if (clazz == byte[].class) { - assertArrayEquals(message, (byte[]) obj, (byte[]) result); - } - else if (clazz == short[].class) { - assertArrayEquals(message, (short[]) obj, (short[]) result); - } - else if (clazz == int[].class) { - assertArrayEquals(message, (int[]) obj, (int[]) result); - } - else if (clazz == long[].class) { - assertArrayEquals(message, (long[]) obj, (long[]) result); - } - else if (clazz == char[].class) { - assertArrayEquals(message, (char[]) obj, (char[]) result); - } - else if (clazz == float[].class) { - assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f); - } - else if (clazz == double[].class) { - assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0); - } else { - assertArrayEquals(message, (Object[]) obj, (Object[]) result); - } - } else { - assertEquals(message, obj, result); - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - // Test Objects - // -------------------------------------------------------------------------------------------- - - private static final class SimpleTypes { - - private final int iVal; - private final long lVal; - private final byte bVal; - private final String sVal; - private final short rVal; - private final double dVal; - - public SimpleTypes() { - this(0, 0, (byte) 0, "", (short) 0, 0); - } - - public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) { - this.iVal = iVal; - this.lVal = lVal; - this.bVal = bVal; - this.sVal = sVal; - this.rVal = rVal; - this.dVal = dVal; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == SimpleTypes.class) { - SimpleTypes other = (SimpleTypes) obj; - - return other.iVal == this.iVal && - other.lVal == this.lVal && - other.bVal == this.bVal && - other.sVal.equals(this.sVal) && - other.rVal == this.rVal && - other.dVal == this.dVal; - - } else { - return false; - } - } - } - - private static class ComplexNestedObject1 { - - private double doubleValue; - - private List<String> stringList; - - public ComplexNestedObject1() {} - - public ComplexNestedObject1(int offInit) { - this.doubleValue = 6293485.6723 + offInit; - - this.stringList = new ArrayList<String>(); - this.stringList.add("A" + offInit); - this.stringList.add("somewhat" + offInit); - this.stringList.add("random" + offInit); - this.stringList.add("collection" + offInit); - this.stringList.add("of" + offInit); - this.stringList.add("strings" + offInit); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == ComplexNestedObject1.class) { - ComplexNestedObject1 other = (ComplexNestedObject1) obj; - return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList); - } else { - return false; - } - } - } - - private static class ComplexNestedObject2 { - - private long longValue; - - private Map<String, ComplexNestedObject1> theMap; - - public ComplexNestedObject2() {} - - public ComplexNestedObject2(boolean init) { - this.longValue = 46547; - - this.theMap = new HashMap<String, ComplexNestedObject1>(); - this.theMap.put("36354L", new ComplexNestedObject1(43546543)); - this.theMap.put("785611L", new ComplexNestedObject1(45784568)); - this.theMap.put("43L", new ComplexNestedObject1(9876543)); - this.theMap.put("-45687L", new ComplexNestedObject1(7897615)); - this.theMap.put("1919876876896L", new ComplexNestedObject1(27154)); - this.theMap.put("-868468468L", new ComplexNestedObject1(546435)); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == ComplexNestedObject2.class) { - ComplexNestedObject2 other = (ComplexNestedObject2) obj; - return other.longValue == this.longValue && this.theMap.equals(other.theMap); - } else { - return false; - } - } - } - - private static class Book { - - private long bookId; - private String title; - private long authorId; - - public Book() {} - - public Book(long bookId, String title, long authorId) { - this.bookId = bookId; - this.title = title; - this.authorId = authorId; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == Book.class) { - Book other = (Book) obj; - return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title); - } else { - return false; - } - } - } - - private static class BookAuthor { - - private long authorId; - private List<String> bookTitles; - private String authorName; - - public BookAuthor() {} - - public BookAuthor(long authorId, List<String> bookTitles, String authorName) { - this.authorId = authorId; - this.bookTitles = bookTitles; - this.authorName = authorName; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == BookAuthor.class) { - BookAuthor other = (BookAuthor) obj; - return other.authorName.equals(this.authorName) && other.authorId == this.authorId && - other.bookTitles.equals(this.bookTitles); - } else { - return false; - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java deleted file mode 100644 index a8541b6..0000000 --- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro.testjar; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.functions.RichReduceFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.Path; - -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumWriter; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -/** - * This file defines the classes for the AvroExternalJarProgramITCase. - */ -public class AvroExternalJarProgram { - - private static final class Color { - - private String name; - private double saturation; - - public Color() { - name = ""; - saturation = 1.0; - } - - public Color(String name, double saturation) { - this.name = name; - this.saturation = saturation; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public double getSaturation() { - return saturation; - } - - public void setSaturation(double saturation) { - this.saturation = saturation; - } - - @Override - public String toString() { - return name + '(' + saturation + ')'; - } - } - - private static final class MyUser { - - private String name; - private List<Color> colors; - - public MyUser() { - name = "unknown"; - colors = new ArrayList<Color>(); - } - - public MyUser(String name, List<Color> colors) { - this.name = name; - this.colors = colors; - } - - public String getName() { - return name; - } - - public List<Color> getColors() { - return colors; - } - - public void setName(String name) { - this.name = name; - } - - public void setColors(List<Color> colors) { - this.colors = colors; - } - - @Override - public String toString() { - return name + " : " + colors; - } - } - - // -------------------------------------------------------------------------------------------- - - // -------------------------------------------------------------------------------------------- - - private static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, MyUser> map(MyUser u) { - String namePrefix = u.getName().substring(0, 1); - return new Tuple2<String, MyUser>(namePrefix, u); - } - } - - private static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) { - return val1; - } - } - - // -------------------------------------------------------------------------------------------- - // Test Data - // -------------------------------------------------------------------------------------------- - - private static final class Generator { - - private final Random rnd = new Random(2389756789345689276L); - - public MyUser nextUser() { - return randomUser(); - } - - private MyUser randomUser() { - - int numColors = rnd.nextInt(5); - ArrayList<Color> colors = new ArrayList<Color>(numColors); - for (int i = 0; i < numColors; i++) { - colors.add(new Color(randomString(), rnd.nextDouble())); - } - - return new MyUser(randomString(), colors); - } - - private String randomString() { - char[] c = new char[this.rnd.nextInt(20) + 5]; - - for (int i = 0; i < c.length; i++) { - c[i] = (char) (this.rnd.nextInt(150) + 40); - } - - return new String(c); - } - } - - public static void writeTestData(File testFile, int numRecords) throws IOException { - - DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class); - DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter); - - dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile); - - Generator generator = new Generator(); - - for (int i = 0; i < numRecords; i++) { - MyUser user = generator.nextUser(); - dataFileWriter.append(user); - } - - dataFileWriter.close(); - } - -// public static void main(String[] args) throws Exception { -// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath(); -// writeTestData(new File(testDataFile), 50); -// } - - public static void main(String[] args) throws Exception { - String inputPath = args[0]; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class)); - - DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper()); - - result.output(new DiscardingOutputFormat<Tuple2<String, MyUser>>()); - env.execute(); - } -}
