http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java deleted file mode 100644 index 5187de7..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator; -import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer; -import org.apache.flink.api.java.typeutils.runtime.ValueComparator; -import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; -import org.apache.flink.types.BooleanValue; -import org.apache.flink.types.ByteValue; -import org.apache.flink.types.CharValue; -import org.apache.flink.types.CopyableValue; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.FloatValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.ShortValue; -import org.apache.flink.types.StringValue; -import org.apache.flink.types.Value; - -/** - * Type information for data types that extend the {@link Value} interface. The value - * interface allows types to define their custom serialization and deserialization routines. - * - * @param <T> The type of the class represented by this type information. 
- */ -public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> { - - private static final long serialVersionUID = 1L; - - public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class); - public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class); - public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class); - public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class); - public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class); - public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class); - public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class); - public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class); - public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class); - public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class); - - private final Class<T> type; - - public ValueTypeInfo(Class<T> type) { - this.type = Preconditions.checkNotNull(type); - - Preconditions.checkArgument( - Value.class.isAssignableFrom(type) || type.equals(Value.class), - "ValueTypeInfo can only be used for subclasses of " + Value.class.getName()); - } - - @Override - public int getArity() { - return 1; - } - - @Override - public int getTotalFields() { - return 1; - } - - @Override - public Class<T> getTypeClass() { - return this.type; - } - - @Override - public boolean isBasicType() { - return false; - } - - public boolean isBasicValueType() { - return type.equals(StringValue.class) || type.equals(ByteValue.class) || type.equals(ShortValue.class) || 
type.equals(CharValue.class) || - type.equals(DoubleValue.class) || type.equals(FloatValue.class) || type.equals(IntValue.class) || type.equals(LongValue.class) || - type.equals(NullValue.class) || type.equals(BooleanValue.class); - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public boolean isKeyType() { - return Comparable.class.isAssignableFrom(type); - } - - @Override - @SuppressWarnings("unchecked") - public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { - if (CopyableValue.class.isAssignableFrom(type)) { - return (TypeSerializer<T>) createCopyableValueSerializer(type.asSubclass(CopyableValue.class)); - } - else { - return new ValueSerializer<T>(type); - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { - if (!isKeyType()) { - throw new RuntimeException("The type " + type.getName() + " is not Comparable."); - } - - if (CopyableValue.class.isAssignableFrom(type)) { - return (TypeComparator<T>) new CopyableValueComparator(sortOrderAscending, type); - } - else { - return (TypeComparator<T>) new ValueComparator(sortOrderAscending, type); - } - } - - // utility method to summon the necessary bound - private static <X extends CopyableValue<X>> CopyableValueSerializer<X> createCopyableValueSerializer(Class<X> clazz) { - return new CopyableValueSerializer<X>(clazz); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return this.type.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ValueTypeInfo) { - @SuppressWarnings("unchecked") - ValueTypeInfo<T> valueTypeInfo = (ValueTypeInfo<T>) obj; - - return valueTypeInfo.canEqual(this) && - type == valueTypeInfo.type; - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - 
return obj instanceof ValueTypeInfo; - } - - @Override - public String toString() { - return "ValueType<" + type.getSimpleName() + ">"; - } - - // -------------------------------------------------------------------------------------------- - - static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) { - if (Value.class.isAssignableFrom(typeClass) && !typeClass.equals(Value.class)) { - return new ValueTypeInfo<X>(typeClass); - } - else { - throw new InvalidTypesException("The given class is no subclass of " + Value.class.getName()); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java deleted file mode 100644 index 6c140d9..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.WritableComparator; -import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; -import org.apache.hadoop.io.Writable; - -/** - * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable - * interface defines the serialization and deserialization routines for the data type. - * - * @param <T> The type of the class represented by this type information. - */ -public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> { - - private static final long serialVersionUID = 1L; - - private final Class<T> typeClass; - - public WritableTypeInfo(Class<T> typeClass) { - this.typeClass = Preconditions.checkNotNull(typeClass); - - Preconditions.checkArgument( - Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class), - "WritableTypeInfo can only be used for subclasses of " + Writable.class.getName()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { - if(Comparable.class.isAssignableFrom(typeClass)) { - return new WritableComparator(sortOrderAscending, typeClass); - } - else { - throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". 
" + - "Class does not implement Comparable interface."); - } - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public int getTotalFields() { - return 1; - } - - @Override - public Class<T> getTypeClass() { - return this.typeClass; - } - - @Override - public boolean isKeyType() { - return Comparable.class.isAssignableFrom(typeClass); - } - - @Override - public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { - return new WritableSerializer<T>(typeClass); - } - - @Override - public String toString() { - return "WritableType<" + typeClass.getName() + ">"; - } - - @Override - public int hashCode() { - return typeClass.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof WritableTypeInfo) { - @SuppressWarnings("unchecked") - WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj; - - return writableTypeInfo.canEqual(this) && - typeClass == writableTypeInfo.typeClass; - - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof WritableTypeInfo; - } - - // -------------------------------------------------------------------------------------------- - - static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) { - if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) { - return new WritableTypeInfo<T>(typeClass); - } - else { - throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName()); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java ---------------------------------------------------------------------- diff --git 
// a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
// deleted file mode 100644
// index bc04367..0000000
// --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
// +++ /dev/null
// @@ -1,201 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime;

import java.io.IOException;

import com.google.common.base.Preconditions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;

import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;


/**
 * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
 * Kryo for deep object copies. We want to change this to Kryo-only.
 *
 * <p>The Avro reader/writer, the encoder/decoder and the Kryo instance are transient and are
 * created lazily on first use.
 *
 * @param <T> The type serialized.
 */
public final class AvroSerializer<T> extends TypeSerializer<T> {

    private static final long serialVersionUID = 1L;

    /** The declared type; used for the Avro reader/writer and for Kryo registration. */
    private final Class<T> type;

    /** The concrete class instantiated for new records; validated for instantiability in the constructor. */
    private final Class<? extends T> typeToInstantiate;

    private transient ReflectDatumWriter<T> writer;
    private transient ReflectDatumReader<T> reader;

    private transient DataOutputEncoder encoder;
    private transient DataInputDecoder decoder;

    private transient Kryo kryo;

    /** Reusable target object for {@link #copy(DataInputView, DataOutputView)}. */
    private transient T deepCopyInstance;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a serializer that both serializes and instantiates the given type.
     *
     * @param type the serialized type; must not be {@code null}
     */
    public AvroSerializer(Class<T> type) {
        this(type, type);
    }

    /**
     * Creates a serializer for {@code type} that instantiates {@code typeToInstantiate}.
     *
     * @param type              the serialized type; must not be {@code null}
     * @param typeToInstantiate the concrete class to instantiate; must not be {@code null} and
     *                          must pass {@link InstantiationUtil#checkForInstantiation(Class)}
     */
    public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
        this.type = Preconditions.checkNotNull(type);
        this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);

        InstantiationUtil.checkForInstantiation(typeToInstantiate);
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public boolean isImmutableType() {
        return false;
    }

    /** Returns a fresh serializer for the same types, with its own lazily created transient state. */
    @Override
    public AvroSerializer<T> duplicate() {
        return new AvroSerializer<T>(type, typeToInstantiate);
    }

    /** Instantiates {@code typeToInstantiate}. */
    @Override
    public T createInstance() {
        return InstantiationUtil.instantiate(this.typeToInstantiate);
    }

    /** Deep-copies the object via Kryo. */
    @Override
    public T copy(T from) {
        checkKryoInitialized();

        return KryoUtils.copy(from, kryo, this);
    }

    /** Deep-copies the object via Kryo, handing the reuse object to {@link KryoUtils}. */
    @Override
    public T copy(T from, T reuse) {
        checkKryoInitialized();

        return KryoUtils.copy(from, reuse, kryo, this);
    }

    /** @return -1 */
    @Override
    public int getLength() {
        return -1;
    }

    @Override
    public void serialize(T value, DataOutputView target) throws IOException {
        checkAvroInitialized();
        this.encoder.setOut(target);
        this.writer.write(value, this.encoder);
    }

    @Override
    public T deserialize(DataInputView source) throws IOException {
        checkAvroInitialized();
        this.decoder.setIn(source);
        return this.reader.read(null, this.decoder);
    }

    @Override
    public T deserialize(T reuse, DataInputView source) throws IOException {
        checkAvroInitialized();
        this.decoder.setIn(source);
        return this.reader.read(reuse, this.decoder);
    }

    /**
     * Copies one record from {@code source} to {@code target} by reading it with the Avro reader
     * into a cached instance and writing it back with the Avro writer.
     */
    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        checkAvroInitialized();

        if (this.deepCopyInstance == null) {
            // Instantiate the concrete class, exactly like createInstance() does. Only
            // 'typeToInstantiate' was checked for instantiability in the constructor; 'type'
            // may be an abstract class or interface and instantiating it here would fail.
            this.deepCopyInstance = createInstance();
        }

        this.decoder.setIn(source);
        this.encoder.setOut(target);

        T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
        this.writer.write(tmp, this.encoder);
    }


    /** Lazily creates the Avro reader/writer and the encoder/decoder pair. */
    private void checkAvroInitialized() {
        if (this.reader == null) {
            this.reader = new ReflectDatumReader<T>(type);
            this.writer = new ReflectDatumWriter<T>(type);
            this.encoder = new DataOutputEncoder();
            this.decoder = new DataInputDecoder();
        }
    }

    /** Lazily creates and configures the Kryo instance used for deep copies. */
    private void checkKryoInitialized() {
        if (this.kryo == null) {
            this.kryo = new Kryo();

            Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
            instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
            kryo.setInstantiatorStrategy(instantiatorStrategy);

            // register Avro types.
            this.kryo.register(GenericData.Array.class, new Serializers.SpecificInstanceCollectionSerializerForArrayList());
            this.kryo.register(Utf8.class);
            this.kryo.register(GenericData.EnumSymbol.class);
            this.kryo.register(GenericData.Fixed.class);
            this.kryo.register(GenericData.StringType.class);
            this.kryo.setAsmEnabled(true);
            this.kryo.register(type);
        }
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
    }

    /** Two serializers are equal when both their type and their instantiated type are identical. */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof AvroSerializer) {
            @SuppressWarnings("unchecked")
            AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;

            return avroSerializer.canEqual(this) &&
                type == avroSerializer.type &&
                typeToInstantiate == avroSerializer.typeToInstantiate;
        } else {
            return false;
        }
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof AvroSerializer;
    }
}

// ---- Remainder of the replaced diff lines: header of the next file in this dump (kept verbatim) ----
// http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
// ----------------------------------------------------------------------
// diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
// deleted file mode 100644
// index 9b3b191..0000000
// --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
// +++ /dev/null
// @@ -1,167 +0,0 @@
// -/*
// - * Licensed to the Apache Software Foundation (ASF) under one
// - * or more contributor license agreements. See the NOTICE file
// - * distributed with this work for additional information
// - * regarding copyright ownership.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime;

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.NormalizableKey;
import org.apache.flink.util.InstantiationUtil;

/**
 * Comparator for all types that are both {@link CopyableValue} and {@link Comparable}.
 *
 * <p>Comparison is delegated to the values' own {@code compareTo}; for descending order the
 * result is inverted. Normalized keys are supported when the type implements
 * {@link NormalizableKey}.
 *
 * @param <T> The compared type.
 */
public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>> extends TypeComparator<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> type;

    private final boolean ascendingComparison;

    /** Holds the copy made by {@link #setReference}; also used as scratch in {@link #compareSerialized}. */
    private transient T reference;

    /** Second scratch object for {@link #compareSerialized}; created lazily. */
    private transient T tempReference;

    private final TypeComparator<?>[] comparators = new TypeComparator[] {this};

    /**
     * @param ascending {@code true} for ascending order, {@code false} to invert all comparisons
     * @param type      the compared class; an instance is created immediately as reference holder
     */
    public CopyableValueComparator(boolean ascending, Class<T> type) {
        this.type = type;
        this.ascendingComparison = ascending;
        this.reference = InstantiationUtil.instantiate(type, CopyableValue.class);
    }

    @Override
    public int hash(T record) {
        return record.hashCode();
    }

    /** Copies the given value into the internal reference object. */
    @Override
    public void setReference(T toCompare) {
        toCompare.copyTo(reference);
    }

    @Override
    public boolean equalToReference(T candidate) {
        return candidate.equals(this.reference);
    }

    /** Compares the other comparator's reference against this comparator's reference. */
    @Override
    public int compareToReference(TypeComparator<T> referencedComparator) {
        T otherRef = ((CopyableValueComparator<T>) referencedComparator).reference;
        return applySortOrder(otherRef.compareTo(reference));
    }

    @Override
    public int compare(T first, T second) {
        return applySortOrder(first.compareTo(second));
    }

    /**
     * Deserializes one value from each source and compares them.
     *
     * <p>NOTE(review): the first value is read into {@code reference}, which overwrites whatever
     * {@link #setReference} stored before - confirm that no caller interleaves reference-based
     * and serialized comparisons on the same instance.
     */
    @Override
    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
        if (tempReference == null) {
            tempReference = InstantiationUtil.instantiate(type, CopyableValue.class);
        }

        reference.read(firstSource);
        tempReference.read(secondSource);
        return applySortOrder(reference.compareTo(tempReference));
    }

    /**
     * Applies the configured sort direction to a raw {@code compareTo} result.
     *
     * <p>A plain {@code -comparison} is wrong for {@link Integer#MIN_VALUE}: its negation
     * overflows back to {@code Integer.MIN_VALUE}, so the descending order would not be inverted.
     * That single value is mapped to {@link Integer#MAX_VALUE}; every other value is negated
     * as before.
     */
    private int applySortOrder(int comparison) {
        if (ascendingComparison) {
            return comparison;
        }
        return comparison == Integer.MIN_VALUE ? Integer.MAX_VALUE : -comparison;
    }

    /** @return {@code true} if the compared class implements {@link NormalizableKey} */
    @Override
    public boolean supportsNormalizedKey() {
        return NormalizableKey.class.isAssignableFrom(type);
    }

    /**
     * Returns the maximal normalized key length reported by the reference object. The cast
     * fails with a {@link ClassCastException} if the type is not a {@link NormalizableKey}.
     */
    @Override
    public int getNormalizeKeyLen() {
        NormalizableKey<?> key = (NormalizableKey<?>) reference;
        return key.getMaxNormalizedKeyLen();
    }

    @Override
    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
        return keyBytes < getNormalizeKeyLen();
    }

    @Override
    public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
        NormalizableKey<?> key = (NormalizableKey<?>) record;
        key.copyNormalizedKey(target, offset, numBytes);
    }

    /** @return {@code true} for descending order */
    @Override
    public boolean invertNormalizedKey() {
        return !ascendingComparison;
    }

    /** Creates a new comparator with the same order and type and its own reference objects. */
    @Override
    public TypeComparator<T> duplicate() {
        return new CopyableValueComparator<T>(ascendingComparison, type);
    }

    /** The record itself is the single key: it is stored at {@code target[index]}. */
    @Override
    public int extractKeys(Object record, Object[] target, int index) {
        target[index] = record;
        return 1;
    }

    @Override
    public TypeComparator<?>[] getFlatComparators() {
        return comparators;
    }

    // --------------------------------------------------------------------------------------------
    // unsupported normalization
    // --------------------------------------------------------------------------------------------

    @Override
    public boolean supportsSerializationWithKeyNormalization() {
        return false;
    }

    @Override
    public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
        throw new UnsupportedOperationException();
    }

    // --------------------------------------------------------------------------------------------
    // serialization
    // --------------------------------------------------------------------------------------------

    private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
        // read basic object and the type
        s.defaultReadObject();

        // the reference objects are transient and have to be re-created after deserialization
        this.reference = InstantiationUtil.instantiate(type, CopyableValue.class);
        this.tempReference = null;
    }
}

// ---- Remainder of the replaced diff lines: header of the next file in this dump (kept verbatim) ----
// http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
// ----------------------------------------------------------------------
// diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
// deleted file mode 100644
// index 9e46f27..0000000
// --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
// +++ /dev/null
// @@ -1,129 +0,0 @@
// -/*
// - * Licensed to the Apache Software Foundation (ASF) under one
// - * or more contributor license agreements. See the NOTICE file
// - * distributed with this work for additional information
// - * regarding copyright ownership.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime;

import java.io.IOException;

import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.InstantiationUtil;


/**
 * Serializer for {@link CopyableValue} types. All work is delegated to the value itself:
 * {@code write}/{@code read} for (de)serialization and {@code copyTo}/{@code copy} for copying.
 *
 * @param <T> The serialized value type.
 */
public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSerializer<T> {

    private static final long serialVersionUID = 1L;


    private final Class<T> valueClass;

    /** Lazily created helper object used by {@link #getLength()} and the stream-to-stream copy. */
    private transient T instance;


    /**
     * @param valueClass the serialized class; must not be {@code null}
     */
    public CopyableValueSerializer(Class<T> valueClass) {
        this.valueClass = Preconditions.checkNotNull(valueClass);
    }

    @Override
    public boolean isImmutableType() {
        return false;
    }

    /** Returns this very instance; no new serializer is created. */
    @Override
    public CopyableValueSerializer<T> duplicate() {
        return this;
    }

    @Override
    public T createInstance() {
        return InstantiationUtil.instantiate(this.valueClass);
    }

    /** Copies {@code from} into a newly created instance. */
    @Override
    public T copy(T from) {
        final T fresh = createInstance();
        return copy(from, fresh);
    }

    /** Copies {@code from} into {@code reuse} and returns {@code reuse}. */
    @Override
    public T copy(T from, T reuse) {
        from.copyTo(reuse);
        return reuse;
    }

    /** @return the binary length reported by an instance of the value class */
    @Override
    public int getLength() {
        ensureInstanceInstantiated();
        return instance.getBinaryLength();
    }

    @Override
    public void serialize(T value, DataOutputView target) throws IOException {
        value.write(target);
    }

    /** Reads the next value into a newly created instance. */
    @Override
    public T deserialize(DataInputView source) throws IOException {
        final T fresh = createInstance();
        return deserialize(fresh, source);
    }

    /** Reads the next value into {@code reuse} and returns {@code reuse}. */
    @Override
    public T deserialize(T reuse, DataInputView source) throws IOException {
        reuse.read(source);
        return reuse;
    }

    /** Lets the helper instance copy one serialized value from {@code source} to {@code target}. */
    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        ensureInstanceInstantiated();
        instance.copy(source, target);
    }

    // --------------------------------------------------------------------------------------------

    private void ensureInstanceInstantiated() {
        if (instance != null) {
            return;
        }
        instance = createInstance();
    }

    @Override
    public int hashCode() {
        return this.valueClass.hashCode();
    }

    /** Two serializers are equal when they serialize the very same class object. */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof CopyableValueSerializer)) {
            return false;
        }

        @SuppressWarnings("unchecked")
        final CopyableValueSerializer<T> that = (CopyableValueSerializer<T>) obj;

        return that.canEqual(this) && valueClass == that.valueClass;
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof CopyableValueSerializer;
    }
}

// ---- Remainder of the replaced diff lines: header of the next file in this dump (kept verbatim) ----
// http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
// ----------------------------------------------------------------------
// diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
// deleted file mode 100644
// index e48f9fa..0000000
// --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
// +++ /dev/null
// @@ -1,229 +0,0 @@
// -/*
// - * Licensed to the Apache Software Foundation (ASF) under one
// - * or more contributor license agreements.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime;

import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.io.Decoder;
import org.apache.avro.util.Utf8;


/**
 * An Avro {@link Decoder} that reads from a {@link DataInput}.
 *
 * <p>Primitives are read with the corresponding {@code DataInput} methods. Byte sequences and
 * strings are prefixed with their length as an {@code int}. Collection and map counts are read
 * as variable-length values (see {@link #readVarLongCount(DataInput)}).
 *
 * <p>The input has to be set via {@link #setIn(DataInput)} before anything is read.
 */
public class DataInputDecoder extends Decoder implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** Scratch object reused by {@link #readString()}. */
    private transient Utf8 stringDecoder = new Utf8();


    private transient DataInput in;


    /** Sets the input that all subsequent read and skip calls operate on. */
    public void setIn(DataInput in) {
        this.in = in;
    }

    // --------------------------------------------------------------------------------------------
    // primitives
    // --------------------------------------------------------------------------------------------

    /** Reads nothing: this decoder consumes no bytes for a null. */
    @Override
    public void readNull() {}


    @Override
    public boolean readBoolean() throws IOException {
        return in.readBoolean();
    }

    @Override
    public int readInt() throws IOException {
        return in.readInt();
    }

    @Override
    public long readLong() throws IOException {
        return in.readLong();
    }

    @Override
    public float readFloat() throws IOException {
        return in.readFloat();
    }

    @Override
    public double readDouble() throws IOException {
        return in.readDouble();
    }

    /** Enum ordinals are read as a plain {@code int}. */
    @Override
    public int readEnum() throws IOException {
        return readInt();
    }

    // --------------------------------------------------------------------------------------------
    // bytes
    // --------------------------------------------------------------------------------------------

    @Override
    public void readFixed(byte[] bytes, int start, int length) throws IOException {
        in.readFully(bytes, start, length);
    }

    /**
     * Reads an {@code int} length followed by that many bytes.
     *
     * @param old a buffer to reuse; it is used only if it is non-null, array-backed and large
     *            enough, otherwise a new buffer is allocated
     * @return the buffer holding the bytes, with its limit set to the length read
     */
    @Override
    public ByteBuffer readBytes(ByteBuffer old) throws IOException {
        int length = readInt();
        ByteBuffer result;
        if (old != null && length <= old.capacity() && old.hasArray()) {
            result = old;
            result.clear();
        } else {
            result = ByteBuffer.allocate(length);
        }
        in.readFully(result.array(), result.arrayOffset() + result.position(), length);
        result.limit(length);
        return result;
    }


    @Override
    public void skipFixed(int length) throws IOException {
        skipBytes(length);
    }

    /** Skips a length-prefixed byte sequence. */
    @Override
    public void skipBytes() throws IOException {
        int num = readInt();
        skipBytes(num);
    }

    // --------------------------------------------------------------------------------------------
    // strings
    // --------------------------------------------------------------------------------------------


    /**
     * Reads an {@code int} byte length followed by that many bytes into a {@link Utf8}.
     *
     * @param old an object to reuse, or {@code null} to create a new one
     */
    @Override
    public Utf8 readString(Utf8 old) throws IOException {
        int length = readInt();
        Utf8 result = (old != null ? old : new Utf8());
        result.setByteLength(length);

        if (length > 0) {
            in.readFully(result.getBytes(), 0, length);
        }

        return result;
    }

    @Override
    public String readString() throws IOException {
        return readString(stringDecoder).toString();
    }

    /** Skips a length-prefixed string. */
    @Override
    public void skipString() throws IOException {
        int len = readInt();
        skipBytes(len);
    }

    // --------------------------------------------------------------------------------------------
    // collection types
    // --------------------------------------------------------------------------------------------

    @Override
    public long readArrayStart() throws IOException {
        return readVarLongCount(in);
    }

    @Override
    public long arrayNext() throws IOException {
        return readVarLongCount(in);
    }

    @Override
    public long skipArray() throws IOException {
        return readVarLongCount(in);
    }

    @Override
    public long readMapStart() throws IOException {
        return readVarLongCount(in);
    }

    @Override
    public long mapNext() throws IOException {
        return readVarLongCount(in);
    }

    @Override
    public long skipMap() throws IOException {
        return readVarLongCount(in);
    }

    // --------------------------------------------------------------------------------------------
    // union
    // --------------------------------------------------------------------------------------------

    /** Union indexes are read as a plain {@code int}. */
    @Override
    public int readIndex() throws IOException {
        return readInt();
    }

    // --------------------------------------------------------------------------------------------
    // utils
    // --------------------------------------------------------------------------------------------

    /**
     * Skips exactly {@code num} bytes of the input.
     *
     * @throws java.io.EOFException if the input ends before {@code num} bytes were skipped
     */
    private void skipBytes(int num) throws IOException {
        while (num > 0) {
            int skipped = in.skipBytes(num);
            if (skipped > 0) {
                num -= skipped;
            } else {
                // DataInput.skipBytes may skip nothing and never reports the end of the input,
                // so looping on it alone could spin forever. Reading a single byte either makes
                // progress or throws an EOFException.
                in.readByte();
                num--;
            }
        }
    }

    /**
     * Reads a variable-length encoded count: every byte carries seven payload bits, least
     * significant group first, and a set high bit marks that another byte follows.
     *
     * @param in the input to read from
     * @return the decoded value
     */
    public static long readVarLongCount(DataInput in) throws IOException {
        long value = in.readUnsignedByte();

        if ((value & 0x80) == 0) {
            return value;
        }
        else {
            long curr;
            int shift = 7;
            value = value & 0x7f;
            while (((curr = in.readUnsignedByte()) & 0x80) != 0){
                value |= (curr & 0x7f) << shift;
                shift += 7;
            }
            value |= curr << shift;
            return value;
        }
    }

    // --------------------------------------------------------------------------------------------
    // serialization
    // --------------------------------------------------------------------------------------------

    private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
        // Read in size, and any hidden stuff
        s.defaultReadObject();

        // transient state is re-created; the input must be set again via setIn()
        this.stringDecoder = new Utf8();
        this.in = null;
    }
}

// ---- Remainder of the replaced diff lines: header of the next file in this dump (kept verbatim) ----
// http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
// ----------------------------------------------------------------------
// diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
// deleted file mode 100644
// index be17d64..0000000
// --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
// +++ /dev/null
// @@ -1,71 +0,0 @@
// -/*
// - * Licensed to the Apache Software Foundation (ASF) under one
// - * or more contributor license agreements. See the NOTICE file
// - * distributed with this work for additional information
// - * regarding copyright ownership. The ASF licenses this file
// - * to you under the Apache License, Version 2.0 (the
// - * "License"); you may not use this file except in compliance
// - * with the License. You may obtain a copy of the License at
// - *
// - * http://www.apache.org/licenses/LICENSE-2.0
// - *
// - * Unless required by applicable law or agreed to in writing, software
// - * distributed under the License is distributed on an "AS IS" BASIS,
// - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.core.memory.DataInputView; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream;

/**
 * An input stream that draws its data from a {@link DataInputView}.
 */
public class DataInputViewStream extends InputStream {

    /** The view that all reads and skips are delegated to. */
    protected DataInputView inputView;

    /**
     * Creates a stream on top of the given view.
     *
     * @param inputView the view to read from
     */
    public DataInputViewStream(DataInputView inputView) {
        this.inputView = inputView;
    }

    /**
     * @return the view this stream reads from
     */
    public DataInputView getInputView(){
        return inputView;
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as a value in the range 0..255, or -1 if the view signals the end of its
     *         data with an {@link EOFException}
     */
    @Override
    public int read() throws IOException {
        try {
            return inputView.readUnsignedByte();
        } catch(EOFException ignored) {
            // the view reports end of data via exception, InputStream reports it via -1
            return -1;
        }
    }

    /**
     * Skips up to {@code n} bytes by delegating to {@code skipBytes} of the view, in steps of at
     * most {@link Integer#MAX_VALUE} bytes.
     *
     * @param n the number of bytes to skip
     * @return the number of bytes that were skipped
     */
    @Override
    public long skip(long n) throws IOException {
        long remaining = n;
        while(remaining > Integer.MAX_VALUE) {
            int skippedBytes = inputView.skipBytes(Integer.MAX_VALUE);

            if (skippedBytes == 0) {
                // no progress: report what has been skipped so far
                return n - remaining;
            }

            remaining -= skippedBytes;
        }
        // skipped in total = requested - (still remaining after the last step);
        // the last step has to reduce the remainder, not be subtracted from the result
        return n - (remaining - inputView.skipBytes((int) remaining));
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return inputView.read(b, off, len);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java deleted file mode 100644 index 5c89962..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.avro.io.Encoder; -import org.apache.avro.util.Utf8;

/**
 * An {@link Encoder} that writes all of its data to a {@link DataOutput}.
 *
 * <p>Ints, enum ordinals, union indexes and the length prefixes of bytes/strings are written as
 * fixed-width ints. Item counts of arrays and maps are written with the variable-length encoding
 * of {@link #writeVarLongCount(DataOutput, long)}, and the end of an array or map is written as a
 * single zero byte.
 *
 * <p>The output has to be supplied via {@link #setOut(DataOutput)}; it is transient and is
 * {@code null} after Java deserialization.
 */
public final class DataOutputEncoder extends Encoder implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** The sink of all writes; set through {@link #setOut(DataOutput)}. */
    private transient DataOutput out;

    /**
     * Sets the output that all subsequent write calls go to.
     *
     * @param out the data output to write to
     */
    public void setOut(DataOutput out) {
        this.out = out;
    }

    /** Does nothing; this encoder holds no buffered data of its own. */
    @Override
    public void flush() throws IOException {}

    // --------------------------------------------------------------------------------------------
    // primitives
    // --------------------------------------------------------------------------------------------

    /** Writes nothing: a null value occupies no bytes in this format. */
    @Override
    public void writeNull() {}

    @Override
    public void writeBoolean(boolean b) throws IOException {
        this.out.writeBoolean(b);
    }

    @Override
    public void writeInt(int n) throws IOException {
        this.out.writeInt(n);
    }

    @Override
    public void writeLong(long n) throws IOException {
        this.out.writeLong(n);
    }

    @Override
    public void writeFloat(float f) throws IOException {
        this.out.writeFloat(f);
    }

    @Override
    public void writeDouble(double d) throws IOException {
        this.out.writeDouble(d);
    }

    /** Writes an enum ordinal as a plain int. */
    @Override
    public void writeEnum(int e) throws IOException {
        this.out.writeInt(e);
    }

    // --------------------------------------------------------------------------------------------
    // bytes
    // --------------------------------------------------------------------------------------------

    @Override
    public void writeFixed(byte[] bytes, int start, int len) throws IOException {
        this.out.write(bytes, start, len);
    }

    /** Writes the int length {@code len}, followed by the bytes themselves if there are any. */
    @Override
    public void writeBytes(byte[] bytes, int start, int len) throws IOException {
        this.out.writeInt(len);
        if (len <= 0) {
            return;
        }
        this.out.write(bytes, start, len);
    }

    /** Writes the number of remaining bytes as an int, followed by those bytes if there are any. */
    @Override
    public void writeBytes(ByteBuffer bytes) throws IOException {
        final int remaining = bytes.remaining();
        this.out.writeInt(remaining);

        if (remaining <= 0) {
            return;
        }
        writeFixed(bytes);
    }

    // --------------------------------------------------------------------------------------------
    // strings
    // --------------------------------------------------------------------------------------------

    @Override
    public void writeString(String str) throws IOException {
        final byte[] encoded = Utf8.getBytesFor(str);
        writeBytes(encoded, 0, encoded.length);
    }

    @Override
    public void writeString(Utf8 utf8) throws IOException {
        final byte[] backing = utf8.getBytes();
        final int length = utf8.getByteLength();
        writeBytes(backing, 0, length);
    }

    // --------------------------------------------------------------------------------------------
    // collection types
    // --------------------------------------------------------------------------------------------

    @Override
    public void writeArrayStart() {}

    /** Writes the item count in variable-length form; counts that are not positive write nothing. */
    @Override
    public void setItemCount(long itemCount) throws IOException {
        if (itemCount <= 0) {
            return;
        }
        writeVarLongCount(this.out, itemCount);
    }

    @Override
    public void startItem() {}

    @Override
    public void writeArrayEnd() throws IOException {
        // a single zero byte is the var-length encoding of the count 0
        this.out.write(0);
    }

    @Override
    public void writeMapStart() {}

    @Override
    public void writeMapEnd() throws IOException {
        // a single zero byte is the var-length encoding of the count 0
        this.out.write(0);
    }

    // --------------------------------------------------------------------------------------------
    // union
    // --------------------------------------------------------------------------------------------

    /** Writes a union index as a plain int. */
    @Override
    public void writeIndex(int unionIndex) throws IOException {
        this.out.writeInt(unionIndex);
    }

    // --------------------------------------------------------------------------------------------
    // utils
    // --------------------------------------------------------------------------------------------

    /**
     * Writes a non-negative count in variable-length form: 7 payload bits per byte, least
     * significant group first, with the high bit (0x80) set on every byte except the last.
     *
     * @param out the output to write to
     * @param val the count to write
     * @throws IOException if {@code val} is negative or writing fails
     */
    public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
        if (val < 0) {
            throw new IOException("Illegal count (must be non-negative): " + val);
        }

        long rest = val;
        // rest is non-negative here, so "more than 7 bits left" is the same as rest > 0x7F
        while (rest > 0x7FL) {
            out.write(((int) rest) | 0x80);
            rest >>>= 7;
        }
        out.write((int) rest);
    }

    private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
        // Read in size, and any hidden stuff
        s.defaultReadObject();

        // the output is transient and has to be set again through setOut
        this.out = null;
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java deleted file mode 100644 index 66f2af6..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.core.memory.DataOutputView; -import java.io.IOException; -import java.io.OutputStream;

/**
 * An output stream that forwards every write to a {@link DataOutputView}.
 */
public class DataOutputViewStream extends OutputStream {

    /** The view that receives all written bytes. */
    protected DataOutputView outputView;

    /**
     * Creates a stream on top of the given view.
     *
     * @param outputView the view to write to
     */
    public DataOutputViewStream(DataOutputView outputView){
        this.outputView = outputView;
    }

    /** Forwards the byte to {@code writeByte} of the view. */
    @Override
    public void write(int b) throws IOException {
        this.outputView.writeByte(b);
    }

    /** Forwards the given array range to the view in one call. */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.outputView.write(b, off, len);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java deleted file mode 100644 index b4b95f3..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import static org.apache.flink.api.java.typeutils.Either.Left; -import static org.apache.flink.api.java.typeutils.Either.Right; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.Either; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView;

/**
 * A {@link TypeSerializer} for the {@link Either} type of the Java class.
 *
 * <p>The serialized form is one boolean ({@code true} for a Left value, {@code false} for a Right
 * value) followed by the value as written by the serializer of that side.
 *
 * @param <L> the Left value type
 * @param <R> the Right value type
 */
public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {

    private static final long serialVersionUID = 1L;

    private final TypeSerializer<L> leftSerializer;

    private final TypeSerializer<R> rightSerializer;

    /**
     * @param leftSerializer serializer used for Left values
     * @param rightSerializer serializer used for Right values
     */
    public EitherSerializer(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
        this.leftSerializer = leftSerializer;
        this.rightSerializer = rightSerializer;
    }

    /** Immutable only if both sides are immutable. */
    @Override
    public boolean isImmutableType() {
        return leftSerializer.isImmutableType() && rightSerializer.isImmutableType();
    }

    /**
     * Duplicates both nested serializers; returns this instance if both returned themselves,
     * otherwise a new serializer wrapping the duplicates.
     */
    @Override
    public TypeSerializer<Either<L, R>> duplicate() {
        final TypeSerializer<L> leftCopy = leftSerializer.duplicate();
        final TypeSerializer<R> rightCopy = rightSerializer.duplicate();

        final boolean bothUnchanged = (leftCopy == leftSerializer) && (rightCopy == rightSerializer);
        if (bothUnchanged) {
            return this;
        }
        // at least one nested serializer is stateful
        return new EitherSerializer<L, R>(leftCopy, rightCopy);
    }

    @Override
    public Either<L, R> createInstance() {
        // We arbitrarily always create a Right value instance.
        return Right(rightSerializer.createInstance());
    }

    @Override
    public Either<L, R> copy(Either<L, R> from) {
        if (!from.isLeft()) {
            R copiedRight = rightSerializer.copy(from.right());
            return Right(copiedRight);
        }
        L copiedLeft = leftSerializer.copy(from.left());
        return Left(copiedLeft);
    }

    /**
     * Copies {@code from}; the reuse object is only used when both {@code from} and
     * {@code reuse} hold a Right value.
     */
    @Override
    public Either<L, R> copy(Either<L, R> from, Either<L, R> reuse) {
        if (!from.isRight()) {
            // reuse record is never a left value because we always create a right instance
            L copiedLeft = leftSerializer.copy(from.left());
            return Left(copiedLeft);
        }

        final R source = from.right();
        final R copiedRight;
        if (reuse.isRight()) {
            copiedRight = rightSerializer.copy(source, reuse.right());
        }
        else {
            // if the reuse record isn't a right value, we cannot reuse
            copiedRight = rightSerializer.copy(source);
        }
        return Right(copiedRight);
    }

    /** @return -1, i.e. the serialized length is not fixed */
    @Override
    public int getLength() {
        return -1;
    }

    @Override
    public void serialize(Either<L, R> record, DataOutputView target) throws IOException {
        final boolean left = record.isLeft();
        target.writeBoolean(left);
        if (left) {
            leftSerializer.serialize(record.left(), target);
        }
        else {
            rightSerializer.serialize(record.right(), target);
        }
    }

    @Override
    public Either<L, R> deserialize(DataInputView source) throws IOException {
        if (source.readBoolean()) {
            return Left(leftSerializer.deserialize(source));
        }
        return Right(rightSerializer.deserialize(source));
    }

    /**
     * Deserializes a value; the reuse object is only used when the serialized value is a Right
     * value and {@code reuse} holds a Right value as well.
     */
    @Override
    public Either<L, R> deserialize(Either<L, R> reuse, DataInputView source) throws IOException {
        final boolean left = source.readBoolean();
        if (left) {
            // reuse record is never a left value because we always create a right instance
            return Left(leftSerializer.deserialize(source));
        }
        if (reuse.isRight()) {
            return Right(rightSerializer.deserialize(reuse.right(), source));
        }
        // if the reuse record isn't a right value, we cannot reuse
        return Right(rightSerializer.deserialize(source));
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        final boolean left = source.readBoolean();
        target.writeBoolean(left);
        if (left) {
            leftSerializer.copy(source, target);
        }
        else {
            rightSerializer.copy(source, target);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof EitherSerializer)) {
            return false;
        }
        EitherSerializer<L, R> that = (EitherSerializer<L, R>) obj;

        return that.canEqual(this) &&
            leftSerializer.equals(that.leftSerializer) &&
            rightSerializer.equals(that.rightSerializer);
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof EitherSerializer;
    }

    @Override
    public int hashCode() {
        return 17 * leftSerializer.hashCode() + rightSerializer.hashCode();
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java deleted file mode 100644 index 28fea6a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.types.NormalizableKey; -import org.apache.flink.util.InstantiationUtil;

/**
 * TypeComparator for all types that extend Comparable.
 *
 * <p>Comparisons delegate to {@link Comparable#compareTo(Object)} and are negated when the
 * comparator is descending. Normalized keys are supported only if the compared class implements
 * {@link NormalizableKey}.
 */
public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparator<T> {

    private static final long serialVersionUID = 1L;

    private final boolean ascending;

    private final Class<T> type;

    private TypeSerializer<T> serializer;

    /** Reference value; also used as the first scratch object in compareSerialized. */
    private transient T reference;

    /** Second scratch object used in compareSerialized. */
    private transient T tmpReference;

    @SuppressWarnings("rawtypes")
    private final TypeComparator[] comparators = new TypeComparator[] {this};

    // ------------------------------------------------------------------------

    /**
     * @param ascending {@code true} for natural order, {@code false} for the inverted order
     * @param serializer serializer used to copy, create and deserialize values
     * @param type the class of the compared values
     */
    public GenericTypeComparator(boolean ascending, TypeSerializer<T> serializer, Class<T> type) {
        this.ascending = ascending;
        this.serializer = serializer;
        this.type = type;
    }

    /** Copy constructor used by {@link #duplicate()}; the serializer is duplicated. */
    private GenericTypeComparator(GenericTypeComparator<T> toClone) {
        this.ascending = toClone.ascending;
        this.serializer = toClone.serializer.duplicate();
        this.type = toClone.type;
    }

    /** Negates a comparison result when this comparator is descending. */
    private int inSortOrder(int cmp) {
        return this.ascending ? cmp : -cmp;
    }

    @Override
    public int hash(T record) {
        return record.hashCode();
    }

    /** Stores a copy of the given value (made by the serializer) as the reference. */
    @Override
    public void setReference(T toCompare) {
        this.reference = this.serializer.copy(toCompare);
    }

    @Override
    public boolean equalToReference(T candidate) {
        return candidate.equals(this.reference);
    }

    /** Compares the other comparator's reference against this comparator's reference. */
    @Override
    public int compareToReference(TypeComparator<T> referencedComparator) {
        final GenericTypeComparator<T> other = (GenericTypeComparator<T>) referencedComparator;
        final T otherRef = other.reference;
        return inSortOrder(otherRef.compareTo(this.reference));
    }

    @Override
    public int compare(T first, T second) {
        return inSortOrder(first.compareTo(second));
    }

    /**
     * Deserializes one value from each source and compares them. Note that this overwrites the
     * {@code reference} field with the value read from the first source.
     */
    @Override
    public int compareSerialized(final DataInputView firstSource, final DataInputView secondSource) throws IOException {

        // lazily create the two reuse objects
        if (this.reference == null) {
            this.reference = this.serializer.createInstance();
        }
        if (this.tmpReference == null) {
            this.tmpReference = this.serializer.createInstance();
        }

        this.reference = this.serializer.deserialize(this.reference, firstSource);
        this.tmpReference = this.serializer.deserialize(this.tmpReference, secondSource);

        return inSortOrder(this.reference.compareTo(this.tmpReference));
    }

    @Override
    public boolean supportsNormalizedKey() {
        return NormalizableKey.class.isAssignableFrom(this.type);
    }

    /**
     * Returns the maximal normalized key length reported by an instance of the compared type;
     * an instance is created via {@link InstantiationUtil} if no reference exists yet.
     */
    @Override
    public int getNormalizeKeyLen() {
        if (this.reference == null) {
            this.reference = InstantiationUtil.instantiate(this.type);
        }

        final NormalizableKey<?> normalizable = (NormalizableKey<?>) this.reference;
        return normalizable.getMaxNormalizedKeyLen();
    }

    @Override
    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
        return keyBytes < getNormalizeKeyLen();
    }

    @Override
    public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
        final NormalizableKey<?> normalizable = (NormalizableKey<?>) record;
        normalizable.copyNormalizedKey(target, offset, numBytes);
    }

    @Override
    public boolean invertNormalizedKey() {
        return !ascending;
    }

    @Override
    public TypeComparator<T> duplicate() {
        return new GenericTypeComparator<T>(this);
    }

    /** The record itself is the single key. */
    @Override
    public int extractKeys(Object record, Object[] target, int index) {
        target[index] = record;
        return 1;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public TypeComparator[] getFlatComparators() {
        return comparators;
    }

    // ------------------------------------------------------------------------

    @Override
    public boolean supportsSerializationWithKeyNormalization() {
        return false;
    }

    @Override
    public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
        throw new UnsupportedOperationException();
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java deleted file mode 100644 index faf5646..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoException; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.util.InstantiationUtil; - -import java.io.IOException;

/**
 * Convenience methods for Kryo.
 */
public class KryoUtils {

    /**
     * Copies the given element with the provided Kryo instance. If Kryo fails with a
     * {@link KryoException}, the element is instead copied by serializing it into a byte array
     * with the given serializer and deserializing it from there.
     *
     * @param from Element to copy
     * @param kryo Kryo instance to use
     * @param serializer TypeSerializer which is used in case of a Kryo failure
     * @param <T> Type of the element to be copied
     * @return Copied element
     * @throws RuntimeException if the serializer-based fallback fails with an IOException
     */
    public static <T> T copy(T from, Kryo kryo, TypeSerializer<T> serializer) {
        try {
            return kryo.copy(from);
        } catch (KryoException ke) {
            // Kryo could not copy the object --> fall back to a serialization round trip
            try {
                final byte[] serialized = InstantiationUtil.serializeToByteArray(serializer, from);
                final T roundTripped = InstantiationUtil.deserializeFromByteArray(serializer, serialized);
                return roundTripped;
            } catch (IOException ioe) {
                throw new RuntimeException("Could not copy object by serializing/deserializing it.", ioe);
            }
        }
    }

    /**
     * Copies the given element with the provided Kryo instance. If Kryo fails with a
     * {@link KryoException}, the element is instead copied by serializing it into a byte array
     * with the given serializer and deserializing it from there. The reuse element is only
     * handed to that fallback deserialization; the Kryo copy does not use it.
     *
     * @param from Element to copy
     * @param reuse Reuse element for the deserialization
     * @param kryo Kryo instance to use
     * @param serializer TypeSerializer which is used in case of a Kryo failure
     * @param <T> Type of the element to be copied
     * @return Copied element
     * @throws RuntimeException if the serializer-based fallback fails with an IOException
     */
    public static <T> T copy(T from, T reuse, Kryo kryo, TypeSerializer<T> serializer) {
        try {
            return kryo.copy(from);
        } catch (KryoException ke) {
            // Kryo could not copy the object --> fall back to a serialization round trip
            try {
                final byte[] serialized = InstantiationUtil.serializeToByteArray(serializer, from);
                final T roundTripped = InstantiationUtil.deserializeFromByteArray(serializer, reuse, serialized);
                return roundTripped;
            } catch (IOException ioe) {
                throw new RuntimeException("Could not copy object by serializing/deserializing it.", ioe);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java deleted file mode 100644 index 0f4fe94..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import com.esotericsoftware.kryo.KryoException; -import com.esotericsoftware.kryo.io.Input; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream;

/**
 * A Kryo {@link Input} that never reads more bytes from its stream than were asked for.
 *
 * <p>Operations that would need to look ahead ({@link #eof()}, {@link #canReadInt()},
 * {@link #canReadLong()}) throw {@link UnsupportedOperationException}.
 */
public class NoFetchingInput extends Input {

    /**
     * Creates the input on top of the given stream with an internal buffer of 8 bytes.
     *
     * @param inputStream the stream to read from
     */
    public NoFetchingInput(InputStream inputStream){
        super(inputStream, 8);
    }

    @Override
    public boolean eof(){
        throw new UnsupportedOperationException("NoFetchingInput does not support EOF.");
    }

    /** Loads exactly one byte into the buffer and returns it as a value in the range 0..255. */
    @Override
    public int read() throws KryoException {
        require(1);
        final int value = buffer[position] & 0xFF;
        position++;
        return value;
    }

    @Override
    public boolean canReadInt() throws KryoException {
        throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
    }

    @Override
    public boolean canReadLong() throws KryoException {
        throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
    }

    /**
     * Loads exactly {@code required} bytes into the start of the buffer and resets the position
     * to 0. Only the requested amount is loaded, so no data is ever prefetched.
     *
     * @param required the number of bytes that must be available in the buffer
     * @return the number of bytes remaining, which is equal to required
     * @throws KryoException if {@code required} exceeds the buffer capacity, or if the data ends
     *         before all required bytes were loaded (wrapping an {@link EOFException})
     */
    @Override
    protected int require(int required) throws KryoException {
        if(required > capacity) {
            throw new KryoException("Buffer too small: capacity: " + capacity + ", required: " + required);
        }

        position = 0;
        int loaded = 0;
        do {
            final int count = fill(buffer, loaded, required - loaded);

            if(count == -1){
                throw new KryoException(new EOFException("No more bytes left."));
            }

            loaded += count;
        } while(loaded != required);

        limit = required;
        return required;
    }

    /**
     * Reads directly from the underlying stream into the given array, bypassing the buffer.
     *
     * @return the value returned by the stream's {@code read(byte[], int, int)}
     */
    @Override
    public int read(byte[] bytes, int offset, int count) throws KryoException {
        if(bytes == null){
            throw new IllegalArgumentException("bytes cannot be null.");
        }

        try {
            return inputStream.read(bytes, offset, count);
        }catch(IOException ex){
            throw new KryoException(ex);
        }
    }

    /** Asks the underlying stream to skip {@code count} bytes; the stream's result is not checked. */
    @Override
    public void skip(int count) throws KryoException {
        try{
            inputStream.skip(count);
        }catch(IOException ex){
            throw new KryoException(ex);
        }
    }

    /**
     * Reads exactly {@code count} bytes from the underlying stream into the given array,
     * bypassing the buffer.
     *
     * @throws KryoException if the stream fails, or ends before {@code count} bytes were read
     *         (wrapping an {@link EOFException})
     */
    @Override
    public void readBytes(byte[] bytes, int offset, int count) throws KryoException {
        if(bytes == null){
            throw new IllegalArgumentException("bytes cannot be null.");
        }

        try{
            int done = 0;
            do {
                final int c = inputStream.read(bytes, offset + done, count - done);

                if(c == -1){
                    throw new KryoException(new EOFException("No more bytes left."));
                }

                done += c;
            } while(done != count);
        }catch(IOException ex){
            throw new KryoException(ex);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java ---------------------------------------------------------------------- diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java deleted file mode 100644 index c0c7797..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.reflect.Field; -import java.util.List; - -import org.apache.flink.api.common.typeutils.CompositeTypeComparator; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.types.NullKeyFieldException; -import org.apache.flink.util.InstantiationUtil; - - -public final class PojoComparator<T> extends CompositeTypeComparator<T> implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - // Reflection fields for the comp fields - private transient Field[] keyFields; - - private final TypeComparator<Object>[] comparators; - - private final int[] normalizedKeyLengths; - - private final int numLeadingNormalizableKeys; - - private final int normalizableKeyPrefixLen; - - private final boolean invertNormKey; - - private TypeSerializer<T> serializer; - - private final Class<T> type; - - @SuppressWarnings("unchecked") - public PojoComparator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, Class<T> type) { - this.keyFields = keyFields; - this.comparators = (TypeComparator<Object>[]) comparators; - - this.type = type; - this.serializer = serializer; - - // set up auxiliary fields for normalized key support - this.normalizedKeyLengths = new int[keyFields.length]; - int nKeys = 0; - int nKeyLen = 0; - boolean inverted = false; - - for (int i = 0; i < this.comparators.length; i++) { - TypeComparator<?> k = this.comparators[i]; - if(k == null) { - throw new IllegalArgumentException("One of the passed comparators is null"); - } - if(keyFields[i] == null) { - throw new IllegalArgumentException("One of 
the passed reflection fields is null"); - } - - // as long as the leading keys support normalized keys, we can build up the composite key - if (k.supportsNormalizedKey()) { - if (i == 0) { - // the first comparator decides whether we need to invert the key direction - inverted = k.invertNormalizedKey(); - } - else if (k.invertNormalizedKey() != inverted) { - // if a successor does not agree on the invertion direction, it cannot be part of the normalized key - break; - } - - nKeys++; - final int len = k.getNormalizeKeyLen(); - if (len < 0) { - throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len); - } - this.normalizedKeyLengths[i] = len; - nKeyLen += this.normalizedKeyLengths[i]; - - if (nKeyLen < 0) { - // overflow, which means we are out of budget for normalized key space anyways - nKeyLen = Integer.MAX_VALUE; - break; - } - } else { - break; - } - } - this.numLeadingNormalizableKeys = nKeys; - this.normalizableKeyPrefixLen = nKeyLen; - this.invertNormKey = inverted; - } - - @SuppressWarnings("unchecked") - private PojoComparator(PojoComparator<T> toClone) { - this.keyFields = toClone.keyFields; - this.comparators = new TypeComparator[toClone.comparators.length]; - - for (int i = 0; i < toClone.comparators.length; i++) { - this.comparators[i] = toClone.comparators[i].duplicate(); - } - - this.normalizedKeyLengths = toClone.normalizedKeyLengths; - this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys; - this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen; - this.invertNormKey = toClone.invertNormKey; - - this.type = toClone.type; - - try { - this.serializer = (TypeSerializer<T>) InstantiationUtil.deserializeObject( - InstantiationUtil.serializeObject(toClone.serializer), Thread.currentThread().getContextClassLoader()); - } catch (IOException e) { - throw new RuntimeException("Cannot copy serializer", e); - } catch (ClassNotFoundException e) { - throw new 
RuntimeException("Cannot copy serializer", e); - } - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - out.writeInt(keyFields.length); - for (Field field: keyFields) { - out.writeObject(field.getDeclaringClass()); - out.writeUTF(field.getName()); - } - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - int numKeyFields = in.readInt(); - keyFields = new Field[numKeyFields]; - for (int i = 0; i < numKeyFields; i++) { - Class<?> clazz = (Class<?>) in.readObject(); - String fieldName = in.readUTF(); - // try superclasses as well - while (clazz != null) { - try { - Field field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - keyFields[i] = field; - break; - } catch (NoSuchFieldException e) { - clazz = clazz.getSuperclass(); - } - } - if (keyFields[i] == null ) { - throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup." 
- + " (" + fieldName + ")"); - } - } - } - - public Field[] getKeyFields() { - return this.keyFields; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public void getFlatComparator(List<TypeComparator> flatComparators) { - for(int i = 0; i < comparators.length; i++) { - if(comparators[i] instanceof CompositeTypeComparator) { - ((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators); - } else { - flatComparators.add(comparators[i]); - } - } - } - - /** - * This method is handling the IllegalAccess exceptions of Field.get() - */ - public final Object accessField(Field field, Object object) { - try { - object = field.get(object); - } catch (NullPointerException npex) { - throw new NullKeyFieldException("Unable to access field "+field+" on object "+object); - } catch (IllegalAccessException iaex) { - throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo." - + " fields: " + field + " obj: " + object); - } - return object; - } - - @Override - public int hash(T value) { - int i = 0; - int code = 0; - for (; i < this.keyFields.length; i++) { - code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; - try { - code += this.comparators[i].hash(accessField(keyFields[i], value)); - }catch(NullPointerException npe) { - throw new RuntimeException("A NullPointerException occured while accessing a key field in a POJO. " + - "Most likely, the value grouped/joined on is null. 
Field name: "+keyFields[i].getName(), npe); - } - } - return code; - - } - - @Override - public void setReference(T toCompare) { - int i = 0; - for (; i < this.keyFields.length; i++) { - this.comparators[i].setReference(accessField(keyFields[i], toCompare)); - } - } - - @Override - public boolean equalToReference(T candidate) { - int i = 0; - for (; i < this.keyFields.length; i++) { - if (!this.comparators[i].equalToReference(accessField(keyFields[i], candidate))) { - return false; - } - } - return true; - } - - @Override - public int compareToReference(TypeComparator<T> referencedComparator) { - PojoComparator<T> other = (PojoComparator<T>) referencedComparator; - - int i = 0; - try { - for (; i < this.keyFields.length; i++) { - int cmp = this.comparators[i].compareToReference(other.comparators[i]); - if (cmp != 0) { - return cmp; - } - } - return 0; - } - catch (NullPointerException npex) { - throw new NullKeyFieldException(this.keyFields[i].toString()); - } - } - - @Override - public int compare(T first, T second) { - int i = 0; - for (; i < keyFields.length; i++) { - int cmp = comparators[i].compare(accessField(keyFields[i], first), accessField(keyFields[i], second)); - if (cmp != 0) { - return cmp; - } - } - - return 0; - } - - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - T first = this.serializer.createInstance(); - T second = this.serializer.createInstance(); - - first = this.serializer.deserialize(first, firstSource); - second = this.serializer.deserialize(second, secondSource); - - return this.compare(first, second); - } - - @Override - public boolean supportsNormalizedKey() { - return this.numLeadingNormalizableKeys > 0; - } - - @Override - public int getNormalizeKeyLen() { - return this.normalizableKeyPrefixLen; - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return this.numLeadingNormalizableKeys < this.keyFields.length || - 
this.normalizableKeyPrefixLen == Integer.MAX_VALUE || - this.normalizableKeyPrefixLen > keyBytes; - } - - @Override - public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) { - int i = 0; - for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++) - { - int len = this.normalizedKeyLengths[i]; - len = numBytes >= len ? len : numBytes; - this.comparators[i].putNormalizedKey(accessField(keyFields[i], value), target, offset, len); - numBytes -= len; - offset += len; - } - } - - @Override - public boolean invertNormalizedKey() { - return this.invertNormKey; - } - - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public PojoComparator<T> duplicate() { - return new PojoComparator<T>(this); - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - int localIndex = index; - for (int i = 0; i < comparators.length; i++) { - localIndex += comparators[i].extractKeys(accessField(keyFields[i], record), target, localIndex); - } - return localIndex - index; - } - - // -------------------------------------------------------------------------------------------- -} -