http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc deleted file mode 100644 index 02c11af..0000000 --- a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc +++ /dev/null @@ -1,35 +0,0 @@ -[ -{"namespace": "org.apache.flink.api.io.avro.generated", - "type": "record", - "name": "Address", - "fields": [ - {"name": "num", "type": "int"}, - {"name": "street", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "state", "type": "string"}, - {"name": "zip", "type": "string"} - ] -}, -{"namespace": "org.apache.flink.api.io.avro.generated", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]}, - {"name": "type_long_test", "type": ["long", "null"]}, - {"name": "type_double_test", "type": "double"}, - {"name": "type_null_test", "type": ["null"]}, - {"name": "type_bool_test", "type": ["boolean"]}, - {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, - {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, - {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, - {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}}, - {"name": "type_map", "type": {"type": "map", "values": "long"}}, - {"name": "type_fixed", - "size": 16, - "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, - {"name": "type_union", "type": ["null", "boolean", "long", "double"]}, - {"name": "type_nested", "type": ["null", "Address"]} - ] -}]
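[Editor's note, not part of the commit] The user.avsc schema deleted above defined the Address and User test records, which together exercise most Avro constructs: nullable unions, arrays, enums, maps, fixed types, and nested records. A minimal sketch of how such a schema was consumed through the flink-avro batch connector follows; the input path and the code-generated User class are assumptions for illustration (the schema's binary counterpart, testdata.avro, is deleted further down in this diff):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.io.avro.generated.User; // class generated from the schema above

public class AvroReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Each Avro record in the file is deserialized into a generated User object.
        AvroInputFormat<User> format =
            new AvroInputFormat<>(new Path("/path/to/testdata.avro"), User.class); // illustrative path
        DataSet<User> users = env.createInput(format);
        users.print();
    }
}
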
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties deleted file mode 100644 index 0b686e5..0000000 --- a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=OFF, A1 - -# A1 is set to be a ConsoleAppender. -log4j.appender.A1=org.apache.log4j.ConsoleAppender - -# A1 uses PatternLayout. -log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml deleted file mode 100644 index 8b3bb27..0000000 --- a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml +++ /dev/null @@ -1,29 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro b/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro deleted file mode 100644 index 45308b9..0000000 Binary files a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml deleted file mode 100644 index 8f423d9..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml +++ /dev/null @@ -1,182 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-batch-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-hadoop-compatibility_2.10</artifactId> - <name>flink-hadoop-compatibility</name> - - <packaging>jar</packaging> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-hadoop2</artifactId> - <version>${project.version}</version> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - </dependencies> - - - <build> - <plugins> - <!-- activate API compatibility checks --> - <plugin> - <groupId>com.github.siom79.japicmp</groupId> - <artifactId>japicmp-maven-plugin</artifactId> - </plugin> - <!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.1.4</version> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- 
Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Scala Code Style, most of the configuration done via plugin management --> - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <configuration> - <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> - </configuration> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java deleted file mode 100644 index 7bcb4bf..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.annotation.Public; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.WritableComparator; -import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; -import org.apache.hadoop.io.Writable; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable - * interface defines the serialization and deserialization routines for the data type. - * - * @param <T> The type of the class represented by this type information. 
- */ -@Public -public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> { - - private static final long serialVersionUID = 1L; - - private final Class<T> typeClass; - - @PublicEvolving - public WritableTypeInfo(Class<T> typeClass) { - this.typeClass = checkNotNull(typeClass); - - checkArgument( - Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class), - "WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - @PublicEvolving - public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { - if(Comparable.class.isAssignableFrom(typeClass)) { - return new WritableComparator(sortOrderAscending, typeClass); - } - else { - throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " + - "Class does not implement Comparable interface."); - } - } - - @Override - @PublicEvolving - public boolean isBasicType() { - return false; - } - - @Override - @PublicEvolving - public boolean isTupleType() { - return false; - } - - @Override - @PublicEvolving - public int getArity() { - return 1; - } - - @Override - @PublicEvolving - public int getTotalFields() { - return 1; - } - - @Override - @PublicEvolving - public Class<T> getTypeClass() { - return this.typeClass; - } - - @Override - @PublicEvolving - public boolean isKeyType() { - return Comparable.class.isAssignableFrom(typeClass); - } - - @Override - @PublicEvolving - public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { - return new WritableSerializer<T>(typeClass); - } - - @Override - public String toString() { - return "WritableType<" + typeClass.getName() + ">"; - } - - @Override - public int hashCode() { - return typeClass.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof WritableTypeInfo) { - @SuppressWarnings("unchecked") - WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj; - - return writableTypeInfo.canEqual(this) && - typeClass == writableTypeInfo.typeClass; - - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof WritableTypeInfo; - } - - // -------------------------------------------------------------------------------------------- - - @PublicEvolving - static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) { - if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) { - return new WritableTypeInfo<T>(typeClass); - } - else { - throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName()); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java deleted file mode 100644 index 3a95d94..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to 
the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import com.esotericsoftware.kryo.Kryo; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.types.NormalizableKey; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.io.Writable; -import org.objenesis.strategy.StdInstantiatorStrategy; - -import java.io.IOException; - -public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> { - - private static final long serialVersionUID = 1L; - - private Class<T> type; - - private final boolean ascendingComparison; - - private transient T reference; - - private transient T tempReference; - - private transient Kryo kryo; - - @SuppressWarnings("rawtypes") - private final TypeComparator[] comparators = new TypeComparator[] {this}; - - public WritableComparator(boolean ascending, Class<T> type) { - this.type = type; - this.ascendingComparison = ascending; - } - - @Override - public int hash(T record) { - return record.hashCode(); - } - - @Override - public void setReference(T toCompare) { - checkKryoInitialized(); - - reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type)); - } - - @Override - public boolean equalToReference(T candidate) { - return candidate.equals(reference); - } - - @Override - public int compareToReference(TypeComparator<T> referencedComparator) { - T otherRef = ((WritableComparator<T>) referencedComparator).reference; - int comp = otherRef.compareTo(reference); - return ascendingComparison ? comp : -comp; - } - - @Override - public int compare(T first, T second) { - int comp = first.compareTo(second); - return ascendingComparison ? comp : -comp; - } - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - ensureReferenceInstantiated(); - ensureTempReferenceInstantiated(); - - reference.readFields(firstSource); - tempReference.readFields(secondSource); - - int comp = reference.compareTo(tempReference); - return ascendingComparison ? 
comp : -comp; - } - - @Override - public boolean supportsNormalizedKey() { - return NormalizableKey.class.isAssignableFrom(type); - } - - @Override - public int getNormalizeKeyLen() { - ensureReferenceInstantiated(); - - NormalizableKey<?> key = (NormalizableKey<?>) reference; - return key.getMaxNormalizedKeyLen(); - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return keyBytes < getNormalizeKeyLen(); - } - - @Override - public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { - NormalizableKey<?> key = (NormalizableKey<?>) record; - key.copyNormalizedKey(target, offset, numBytes); - } - - @Override - public boolean invertNormalizedKey() { - return !ascendingComparison; - } - - @Override - public TypeComparator<T> duplicate() { - return new WritableComparator<T>(ascendingComparison, type); - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - target[index] = record; - return 1; - } - - @SuppressWarnings("rawtypes") - @Override - public TypeComparator[] getFlatComparators() { - return comparators; - } - - // -------------------------------------------------------------------------------------------- - // unsupported normalization - // -------------------------------------------------------------------------------------------- - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - // -------------------------------------------------------------------------------------------- - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); - - this.kryo.setAsmEnabled(true); - this.kryo.register(type); - } - } - - private void ensureReferenceInstantiated() { - if (reference == null) { - reference = InstantiationUtil.instantiate(type, Writable.class); - } - } - - private void ensureTempReferenceInstantiated() { - if (tempReference == null) { - tempReference = InstantiationUtil.instantiate(type, Writable.class); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java deleted file mode 100644 index 9036d75..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - - -import com.esotericsoftware.kryo.Kryo; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.objenesis.strategy.StdInstantiatorStrategy; - -import java.io.IOException; - -public class WritableSerializer<T extends Writable> extends TypeSerializer<T> { - - private static final long serialVersionUID = 1L; - - private final Class<T> typeClass; - - private transient Kryo kryo; - - private transient T copyInstance; - - public WritableSerializer(Class<T> typeClass) { - this.typeClass = typeClass; - } - - @SuppressWarnings("unchecked") - @Override - public T createInstance() { - if(typeClass == NullWritable.class) { - return (T) NullWritable.get(); - } - return InstantiationUtil.instantiate(typeClass); - } - - - - @Override - public T copy(T from) { - checkKryoInitialized(); - - return KryoUtils.copy(from, kryo, this); - } - - @Override - public T copy(T from, T reuse) { - checkKryoInitialized(); - - return KryoUtils.copy(from, reuse, kryo, this); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(T record, DataOutputView target) throws IOException { - record.write(target); - } - - @Override - public T deserialize(DataInputView source) throws IOException { - return deserialize(createInstance(), source); - } - - @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - reuse.readFields(source); - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - ensureInstanceInstantiated(); - copyInstance.readFields(source); - copyInstance.write(target); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public WritableSerializer<T> duplicate() { - return new WritableSerializer<T>(typeClass); - } - - // -------------------------------------------------------------------------------------------- - - private void ensureInstanceInstantiated() { - if (copyInstance == null) { - copyInstance = createInstance(); - } - } - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); - - this.kryo.setAsmEnabled(true); - this.kryo.register(typeClass); - } - } - // -------------------------------------------------------------------------------------------- - - @Override - 
public int hashCode() { - return this.typeClass.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof WritableSerializer) { - WritableSerializer<?> other = (WritableSerializer<?>) obj; - - return other.canEqual(this) && typeClass == other.typeClass; - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof WritableSerializer; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java deleted file mode 100644 index 9e8a3e4..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; - -import java.io.IOException; - -/** - * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink. - * - * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat} - * and {@link org.apache.hadoop.mapreduce.InputFormat}. - * - * Key value pairs produced by the Hadoop InputFormats are converted into Flink - * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field - * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field - * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value. - * - */ - -public final class HadoopInputs { - // ----------------------------------- Hadoop Input Format --------------------------------------- - - /** - * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}. - * - * @return A Flink InputFormat that wraps the Hadoop FileInputFormat. 
- */ - public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) { - // set input path in JobConf - org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath)); - // return wrapping InputFormat - return createHadoopInput(mapredInputFormat, key, value, job); - } - - /** - * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}. - * - * @return A Flink InputFormat that wraps the Hadoop FileInputFormat. - */ - public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) { - return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf()); - } - - /** - * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes. - * - * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat. - */ - public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException { - return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath); - } - - /** - * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}. - * - * @return A Flink InputFormat that wraps the Hadoop InputFormat. - */ - public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { - return new HadoopInputFormat<>(mapredInputFormat, key, value, job); - } - - /** - * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. - * - * @return A Flink InputFormat that wraps the Hadoop FileInputFormat. - */ - public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile( - org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException - { - // set input path in Job - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath)); - // return wrapping InputFormat - return createHadoopInput(mapreduceInputFormat, key, value, job); - } - - /** - * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. - * - * @return A Flink InputFormat that wraps the Hadoop FileInputFormat. - */ - public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile( - org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException - { - return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance()); - } - - /** - * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}. - * - * @return A Flink InputFormat that wraps the Hadoop InputFormat. 
- */ - public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput( - org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) - { - return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java deleted file mode 100644 index 97ca329..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility; - -import org.apache.commons.cli.Option; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.hadoop.util.GenericOptionsParser; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Utility class to work with Apache Hadoop libraries. - */ -public class HadoopUtils { - /** - * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser} - * - * @param args Input array arguments. 
It should be parsable by {@link GenericOptionsParser} - * @return A {@link ParameterTool} - * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser} - * @see GenericOptionsParser - */ - public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException { - Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions(); - Map<String, String> map = new HashMap<String, String>(); - for (Option option : options) { - String[] split = option.getValue().split("="); - map.put(split[0], split[1]); - } - return ParameterTool.fromMap(map); - } -} - - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java deleted file mode 100644 index ba8aa90..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.apache.flink.annotation.Public; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; -import org.apache.flink.util.Collector; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.Reporter; - -/** - * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
- */ -@SuppressWarnings("rawtypes") -@Public -public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> - extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> - implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { - - private static final long serialVersionUID = 1L; - - private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper; - private transient JobConf jobConf; - - private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector; - private transient Reporter reporter; - - /** - * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. - * - * @param hadoopMapper The Hadoop Mapper to wrap. - */ - public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) { - this(hadoopMapper, new JobConf()); - } - - /** - * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. - * The Hadoop Mapper is configured with the provided JobConf. - * - * @param hadoopMapper The Hadoop Mapper to wrap. - * @param conf The JobConf that is used to configure the Hadoop Mapper. - */ - public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) { - if(hadoopMapper == null) { - throw new NullPointerException("Mapper may not be null."); - } - if(conf == null) { - throw new NullPointerException("JobConf may not be null."); - } - - this.mapper = hadoopMapper; - this.jobConf = conf; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.mapper.configure(jobConf); - - this.reporter = new HadoopDummyReporter(); - this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); - } - - @Override - public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) - throws Exception { - outputCollector.setFlinkCollector(out); - mapper.map(value.f0, value.f1, outputCollector, reporter); - } - - @SuppressWarnings("unchecked") - @Override - public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { - Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2); - Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3); - - final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); - final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); - return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo); - } - - /** - * Custom serialization methods. 
- * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a> - */ - private void writeObject(final ObjectOutputStream out) throws IOException { - out.writeObject(mapper.getClass()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { - Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = - (Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); - mapper = InstantiationUtil.instantiate(mapperClass); - - jobConf = new JobConf(); - jobConf.readFields(in); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java deleted file mode 100644 index c1acc2b..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.apache.flink.annotation.Public; -import org.apache.flink.api.common.functions.GroupCombineFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction. - */ -@SuppressWarnings("rawtypes") -@Public -public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> - extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> - implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>, - ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { - - private static final long serialVersionUID = 1L; - - private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer; - private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner; - private transient JobConf jobConf; - - private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator; - private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector; - private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector; - private transient Reporter reporter; - - /** - * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction. - * - * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction. - * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function. - */ - public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, - Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) { - this(hadoopReducer, hadoopCombiner, new JobConf()); - } - - /** - * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction. - * - * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction. - * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function. - * @param conf The JobConf that is used to configure both Hadoop Reducers. 
- */ - public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, - Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) { - if(hadoopReducer == null) { - throw new NullPointerException("Reducer may not be null."); - } - if(hadoopCombiner == null) { - throw new NullPointerException("Combiner may not be null."); - } - if(conf == null) { - throw new NullPointerException("JobConf may not be null."); - } - - this.reducer = hadoopReducer; - this.combiner = hadoopCombiner; - this.jobConf = conf; - } - - @SuppressWarnings("unchecked") - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.reducer.configure(jobConf); - this.combiner.configure(jobConf); - - this.reporter = new HadoopDummyReporter(); - Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); - TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); - this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer); - this.combineCollector = new HadoopOutputCollector<>(); - this.reduceCollector = new HadoopOutputCollector<>(); - } - - @Override - public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) - throws Exception { - reduceCollector.setFlinkCollector(out); - valueIterator.set(values.iterator()); - reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter); - } - - @Override - public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception { - combineCollector.setFlinkCollector(out); - valueIterator.set(values.iterator()); - combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter); - } - - @SuppressWarnings("unchecked") - @Override - public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { - Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); - Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); - - final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass); - final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass); - return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo); - } - - /** - * Custom serialization methods. 
- * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a> - */ - private void writeObject(final ObjectOutputStream out) throws IOException { - - out.writeObject(reducer.getClass()); - out.writeObject(combiner.getClass()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { - - Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = - (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); - reducer = InstantiationUtil.instantiate(reducerClass); - - Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = - (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject(); - combiner = InstantiationUtil.instantiate(combinerClass); - - jobConf = new JobConf(); - jobConf.readFields(in); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java deleted file mode 100644 index 55aea24..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.apache.flink.annotation.Public; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. - */ -@SuppressWarnings("rawtypes") -@Public -public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> - extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> - implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { - - private static final long serialVersionUID = 1L; - - private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer; - private transient JobConf jobConf; - - private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator; - private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector; - private transient Reporter reporter; - - /** - * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. - * - * @param hadoopReducer The Hadoop Reducer to wrap. - */ - public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) { - this(hadoopReducer, new JobConf()); - } - - /** - * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. - * - * @param hadoopReducer The Hadoop Reducer to wrap. - * @param conf The JobConf that is used to configure the Hadoop Reducer. 
- */ - public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) { - if(hadoopReducer == null) { - throw new NullPointerException("Reducer may not be null."); - } - if(conf == null) { - throw new NullPointerException("JobConf may not be null."); - } - - this.reducer = hadoopReducer; - this.jobConf = conf; - } - - @SuppressWarnings("unchecked") - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.reducer.configure(jobConf); - - this.reporter = new HadoopDummyReporter(); - this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); - Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); - TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); - this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer); - } - - @Override - public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) - throws Exception { - - reduceCollector.setFlinkCollector(out); - valueIterator.set(values.iterator()); - reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter); - } - - @SuppressWarnings("unchecked") - @Override - public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { - Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); - Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); - - final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); - final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); - return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo); - } - - /** - * Custom serialization methods - * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a> - */ - private void writeObject(final ObjectOutputStream out) throws IOException { - - out.writeObject(reducer.getClass()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { - - Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = - (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); - reducer = InstantiationUtil.instantiate(reducerClass); - - jobConf = new JobConf(); - jobConf.readFields(in); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java deleted file mode 100644 index bfe03d3..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
deleted file mode 100644
index bfe03d3..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import java.io.IOException;
-
-/**
- * A Hadoop OutputCollector that wraps a Flink OutputCollector.
- * On each call of collect() the data is forwarded to the wrapped Flink collector.
- */
-public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
-
-	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
-
-	private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
-
-	/**
-	 * Set the wrapped Flink collector.
-	 *
-	 * @param flinkCollector The wrapped Flink OutputCollector.
-	 */
-	public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
-		this.flinkCollector = flinkCollector;
-	}
-
-	/**
-	 * Use the wrapped Flink collector to collect a key-value pair for Flink.
-	 *
-	 * @param key the key to collect
-	 * @param val the value to collect
-	 * @throws IOException if collecting the key-value pair fails
-	 */
-	@Override
-	public void collect(final KEY key, final VALUE val) throws IOException {
-		this.outTuple.f0 = key;
-		this.outTuple.f1 = val;
-		this.flinkCollector.collect(outTuple);
-	}
-}
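[Editor's note] Note that the wrapper reuses one Tuple2 instance across collect() calls, so consumers that buffer records must copy them. A small self-contained sketch of the forwarding behavior; the buffering Collector here is illustrative, not part of Flink's API:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class CollectorSketch {

	public static void main(String[] args) throws Exception {
		final List<Tuple2<Text, IntWritable>> buffer = new ArrayList<>();

		HadoopOutputCollector<Text, IntWritable> collector = new HadoopOutputCollector<>();
		collector.setFlinkCollector(new Collector<Tuple2<Text, IntWritable>>() {
			@Override
			public void collect(Tuple2<Text, IntWritable> record) {
				// the wrapper mutates one Tuple2 in place, so copy before buffering
				buffer.add(record.copy());
			}

			@Override
			public void close() {}
		});

		// Hadoop-style emission arrives as Tuple2 records in the Flink collector.
		collector.collect(new Text("a"), new IntWritable(1));
		collector.collect(new Text("b"), new IntWritable(2));

		System.out.println(buffer); // [(a,1), (b,2)]
	}
}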
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
deleted file mode 100644
index 2d204b8..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Wraps a Flink Tuple2 (key-value pair) iterator into an iterator over the second (value) field.
- */
-public class HadoopTupleUnwrappingIterator<KEY,VALUE>
-		extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private final TypeSerializer<KEY> keySerializer;
-
-	private transient Iterator<Tuple2<KEY,VALUE>> iterator;
-
-	private transient KEY curKey;
-	private transient VALUE firstValue;
-	private transient boolean atFirst;
-
-	public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) {
-		this.keySerializer = checkNotNull(keySerializer);
-	}
-
-	/**
-	 * Set the Flink iterator to wrap.
-	 *
-	 * @param iterator The Flink iterator to wrap.
-	 */
-	@Override
-	public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
-		this.iterator = iterator;
-		if (this.hasNext()) {
-			final Tuple2<KEY, VALUE> tuple = iterator.next();
-			this.curKey = keySerializer.copy(tuple.f0);
-			this.firstValue = tuple.f1;
-			this.atFirst = true;
-		} else {
-			this.atFirst = false;
-		}
-	}
-
-	@Override
-	public boolean hasNext() {
-		if (this.atFirst) {
-			return true;
-		}
-		return iterator.hasNext();
-	}
-
-	@Override
-	public VALUE next() {
-		if (this.atFirst) {
-			this.atFirst = false;
-			return firstValue;
-		}
-
-		final Tuple2<KEY, VALUE> tuple = iterator.next();
-		return tuple.f1;
-	}
-
-	public KEY getCurrentKey() {
-		return this.curKey;
-	}
-
-	@Override
-	public void remove() {
-		throw new UnsupportedOperationException();
-	}
-}
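[Editor's note] The iterator's contract is subtle: set() eagerly pulls the first tuple, caches a serializer copy of its key, and replays the first value, so getCurrentKey() is available before iteration starts. A small illustrative snippet, with StringSerializer standing in for whatever key serializer is used at runtime:

import java.util.Arrays;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;

public class UnwrapExample {

	public static void main(String[] args) {
		HadoopTupleUnwrappingIterator<String, Integer> it =
			new HadoopTupleUnwrappingIterator<>(StringSerializer.INSTANCE);

		// all tuples of one group share the same key
		it.set(Arrays.asList(
			new Tuple2<>("a", 1),
			new Tuple2<>("a", 2),
			new Tuple2<>("a", 3)).iterator());

		System.out.println(it.getCurrentKey()); // "a" (copied via the key serializer)
		while (it.hasNext()) {
			System.out.println(it.next());      // 1, 2, 3 -- values only
		}
	}
}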
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
deleted file mode 100644
index 133a5f4..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.hadoopcompatibility.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.hadoop.mapreduce
-import org.apache.flink.api.scala.hadoop.mapred
-import org.apache.hadoop.fs.{Path => HadoopPath}
-import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
-import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
-
-/**
- * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
- *
- * It provides methods to create Flink InputFormat wrappers for Hadoop
- * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
- *
- * Key-value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]], where
- * the first field is the key and the second field is the value.
- */
-object HadoopInputs {
-
-  /**
-   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-   * [[org.apache.hadoop.mapred.FileInputFormat]].
-   */
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
-    // set input path in JobConf
-    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    // wrap mapredInputFormat
-    createHadoopInput(mapredInputFormat, key, value, job)
-  }
-
-  /**
-   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-   * [[org.apache.hadoop.mapred.FileInputFormat]].
-   */
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
-    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
-  }
-
-  /**
-   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
-   * file with the given key and value classes.
-   */
-  def readSequenceFile[K, V](
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
-    readHadoopFile(
-      new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
-      key,
-      value,
-      inputPath
-    )
-  }
-
-  /**
-   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-   * [[org.apache.hadoop.mapred.InputFormat]].
-   */
-  def createHadoopInput[K, V](
-      mapredInputFormat: MapredInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
-    new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
-  }
-
-  /**
-   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-   */
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
-
-    // set input path in Job
-    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    // wrap mapreduceInputFormat
-    createHadoopInput(mapreduceInputFormat, key, value, job)
-  }
-
-  /**
-   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-   */
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
-
-    readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
-  }
-
-  /**
-   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-   * [[org.apache.hadoop.mapreduce.InputFormat]].
-   */
-  def createHadoopInput[K, V](
-      mapreduceInputFormat: MapreduceInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
-
-    new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
-  }
-}
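[Editor's note] For reference, a hedged sketch of what these helpers assemble: they delegate to Flink's HadoopInputFormat wrappers, which the Java API exposes directly. The Java counterpart of readHadoopFile looks roughly like this; the input path is a placeholder:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopInputSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// what readHadoopFile does under the hood: register the input path
		// in the JobConf and wrap the mapred InputFormat
		JobConf jobConf = new JobConf();
		FileInputFormat.addInputPath(jobConf, new Path("hdfs:///path/to/input")); // placeholder path

		HadoopInputFormat<LongWritable, Text> hadoopIF =
			new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);

		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
		lines.first(5).print();
	}
}

From Scala, HadoopInputs.readHadoopFile and readSequenceFile produce the equivalent wrapper with the TypeInformation supplied implicitly.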
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
deleted file mode 100644
index 2aefd9f..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparator;
-
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class WritableExtractionTest {
-
-	@Test
-	public void testDetectWritable() {
-		// the Writable interface itself must not be detected as a Writable type
-		assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
-
-		// various forms of extension
-		assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
-		assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
-		assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
-
-		// some non-writables
-		assertFalse(TypeExtractor.isHadoopWritable(String.class));
-		assertFalse(TypeExtractor.isHadoopWritable(List.class));
-		assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
-	}
-
-	@Test
-	public void testCreateWritableInfo() {
-		TypeInformation<DirectWritable> info1 =
-				TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
-		assertEquals(DirectWritable.class, info1.getTypeClass());
-
-		TypeInformation<ViaInterfaceExtension> info2 =
-				TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
-		assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
-
-		TypeInformation<ViaAbstractClassExtension> info3 =
-				TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
-		assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
-	}
-
-	@Test
-	public void testValidateTypeInfo() {
-		// validate unrelated type info
-		TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
-
-		// validate writable type info correctly
-		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-				DirectWritable.class), DirectWritable.class);
-		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-				ViaInterfaceExtension.class), ViaInterfaceExtension.class);
-		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-				ViaAbstractClassExtension.class), ViaAbstractClassExtension.class);
-
-		// incorrect case: not writable at all
-		try {
-			TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-					DirectWritable.class), String.class);
-			fail("should have failed with an exception");
-		} catch (InvalidTypesException e) {
-			// expected
-		}
-
-		// incorrect case: wrong writable
-		try {
-			TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-					ViaInterfaceExtension.class), DirectWritable.class);
-			fail("should have failed with an exception");
-		} catch (InvalidTypesException e) {
-			// expected
-		}
-	}
-
-	@Test
-	public void testExtractFromFunction() {
-		RichMapFunction<DirectWritable, DirectWritable> function = new RichMapFunction<DirectWritable, DirectWritable>() {
-			@Override
-			public DirectWritable map(DirectWritable value) throws Exception {
-				return null;
-			}
-		};
-
-		TypeInformation<DirectWritable> outType =
-				TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class));
-
-		assertTrue(outType instanceof WritableTypeInfo);
-		assertEquals(DirectWritable.class, outType.getTypeClass());
-	}
-
-	@Test
-	public void
testExtractAsPartOfPojo() { - PojoTypeInfo<PojoWithWritable> pojoInfo = - (PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class); - - boolean foundWritable = false; - for (int i = 0; i < pojoInfo.getArity(); i++) { - PojoField field = pojoInfo.getPojoFieldAt(i); - String name = field.getField().getName(); - - if (name.equals("hadoopCitizen")) { - if (foundWritable) { - fail("already seen"); - } - foundWritable = true; - assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation()); - assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass()); - - } - } - - assertTrue("missed the writable type", foundWritable); - } - - @Test - public void testInputValidationError() { - - RichMapFunction<Writable, String> function = new RichMapFunction<Writable, String>() { - @Override - public String map(Writable value) throws Exception { - return null; - } - }; - - @SuppressWarnings("unchecked") - TypeInformation<Writable> inType = - (TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class); - - try { - TypeExtractor.getMapReturnTypes(function, inType); - fail("exception expected"); - } - catch (InvalidTypesException e) { - // right - } - } - - // ------------------------------------------------------------------------ - // test type classes - // ------------------------------------------------------------------------ - - public interface ExtendedWritable extends Writable {} - - public static abstract class AbstractWritable implements Writable {} - - public static class DirectWritable implements Writable { - - @Override - public void write(DataOutput dataOutput) throws IOException {} - - @Override - public void readFields(DataInput dataInput) throws IOException {} - } - - public static class ViaInterfaceExtension implements ExtendedWritable { - - @Override - public void write(DataOutput dataOutput) throws IOException {} - - @Override - public void readFields(DataInput dataInput) throws IOException {} - } - - public static class ViaAbstractClassExtension extends AbstractWritable { - - @Override - public void write(DataOutput dataOutput) throws IOException {} - - @Override - public void readFields(DataInput dataInput) throws IOException {} - } - - public static class PojoWithWritable { - public String str; - public DirectWritable hadoopCitizen; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java deleted file mode 100644 index 3d2b652..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.hadoop.io.Writable; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -public class WritableInfoParserTest { - - @Test - public void testWritableType() { - TypeInformation<?> ti = TypeInfoParser.parse( - "Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>"); - - Assert.assertTrue(ti instanceof WritableTypeInfo<?>); - Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass()); - } - - @Test - public void testPojoWithWritableType() { - TypeInformation<?> ti = TypeInfoParser.parse( - "org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<" - + "basic=Integer," - + "tuple=Tuple2<String, Integer>," - + "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>," - + "array=String[]" - + ">"); - Assert.assertTrue(ti instanceof PojoTypeInfo); - PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; - Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName()); - Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo); - Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName()); - Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo); - Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName()); - Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo); - Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName()); - Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo); - } - // ------------------------------------------------------------------------ - // Test types - // ------------------------------------------------------------------------ - - public static class MyWritable implements Writable { - - @Override - public void write(DataOutput out) throws IOException {} - - @Override - public void readFields(DataInput in) throws IOException {} - } - - public static class MyPojo { - public Integer basic; - public Tuple2<String, Integer> tuple; - public MyWritable hadoopCitizen; - public String[] array; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java deleted file mode 100644 index eb9cdf0..0000000 --- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.util.TestLogger; -import org.apache.hadoop.io.Writable; -import org.junit.Test; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -public class WritableTypeInfoTest extends TestLogger { - - @Test - public void testWritableTypeInfoEquality() { - WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class); - WritableTypeInfo<TestClass> tpeInfo2 = new WritableTypeInfo<>(TestClass.class); - - assertEquals(tpeInfo1, tpeInfo2); - assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode()); - } - - @Test - public void testWritableTypeInfoInequality() { - WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class); - WritableTypeInfo<AlternateClass> tpeInfo2 = new WritableTypeInfo<>(AlternateClass.class); - - assertNotEquals(tpeInfo1, tpeInfo2); - } - - // ------------------------------------------------------------------------ - // test types - // ------------------------------------------------------------------------ - - public static class TestClass implements Writable { - - @Override - public void write(DataOutput dataOutput) throws IOException {} - - @Override - public void readFields(DataInput dataInput) throws IOException {} - } - - public static class AlternateClass implements Writable { - - @Override - public void write(DataOutput dataOutput) throws IOException {} - - @Override - public void readFields(DataInput dataInput) throws IOException {} - } -}
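[Editor's note] Taken together, these deleted tests covered the user-visible behavior that Hadoop Writable types are analyzed into WritableTypeInfo, both standalone and as POJO fields. A minimal, hypothetical sketch of that usage; since Text implements WritableComparable, the Writable field even works as a grouping key:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.io.Text;

public class WritableInPojoExample {

	// A POJO with a Writable field; Flink's type extraction produces a PojoTypeInfo
	// whose "name" field is typed via WritableTypeInfo.
	public static class Person {
		public int id;
		public Text name;

		public Person() {}

		public Person(int id, Text name) {
			this.id = id;
			this.name = name;
		}
	}

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Person> people = env.fromElements(
				new Person(1, new Text("alice")),
				new Person(2, new Text("bob")));

		// grouping on the Writable field works because Text is Comparable
		people.groupBy("name").first(1).print();
	}
}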