http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java deleted file mode 100644 index 8ff0b1b..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import com.esotericsoftware.kryo.Kryo; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; -import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Random; - -import static org.junit.Assert.*; - -@SuppressWarnings("unchecked") -public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { - - ExecutionConfig ec = new ExecutionConfig(); - - @Test - public void testJavaList(){ - Collection<Integer> a = new ArrayList<>(); - - fillCollection(a); - - runTests(a); - } - - @Test - public void testJavaSet(){ - Collection<Integer> b = new HashSet<>(); - - fillCollection(b); - - runTests(b); - } - - - - @Test - public void testJavaDequeue(){ - Collection<Integer> c = new LinkedList<>(); - fillCollection(c); - runTests(c); - } - - private void fillCollection(Collection<Integer> coll) { - coll.add(42); - coll.add(1337); - coll.add(49); - coll.add(1); - } - - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new KryoSerializer<T>(type, ec); - } - - /** - * Make sure that the kryo serializer forwards EOF exceptions properly when serializing - */ - @Test - public void testForwardEOFExceptionWhileSerializing() { - try { - // construct a long string - String str; - { - char[] charData = new char[40000]; - Random rnd = new Random(); - - for (int i = 0; i < charData.length; i++) { - charData[i] = (char) rnd.nextInt(10000); - } - - str = new String(charData); - } - - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000); - KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig()); - - try { - serializer.serialize(str, target); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that is how we like it - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - /** - * Make sure that the kryo serializer forwards EOF exceptions properly when serializing - */ - @Test - public void testForwardEOFExceptionWhileDeserializing() { - try { - int numElements = 100; - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements); - KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig()); - - for(int i = 0; i < numElements; i++){ - serializer.serialize(i, target); - } - - ComparatorTestBase.TestInputView source = new ComparatorTestBase.TestInputView(target.copyByteBuffer()); - - for(int i = 0; i < numElements; i++){ - int value = serializer.deserialize(source); - assertEquals(i, value); - } - - try { - serializer.deserialize(source); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that is how we like it :-) - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void validateReferenceMappingDisabled() { - KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); - Kryo kryo = serializer.getKryo(); - assertFalse(kryo.getReferences()); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java deleted file mode 100644 index d68afd6..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import java.util.Collection; -import java.util.HashSet; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; -import org.joda.time.LocalDate; -import org.junit.Test; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -@SuppressWarnings("unchecked") -public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializerTest { - - - @Test - public void testJodaTime(){ - Collection<LocalDate> b = new HashSet<LocalDate>(); - - b.add(new LocalDate(1L)); - b.add(new LocalDate(2L)); - - runTests(b); - } - - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - ExecutionConfig conf = new ExecutionConfig(); - conf.registerTypeWithKryoSerializer(LocalDate.class, LocalDateSerializer.class); - TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type); - return typeInfo.createSerializer(conf); - } - - public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - @Override - public void write(Kryo kryo, Output output, LocalDate object) { - output.writeInt(object.getYear()); - output.writeInt(object.getMonthOfYear()); - output.writeInt(object.getDayOfMonth()); - } - - @Override - public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) { - return new LocalDate(input.readInt(), input.readInt(), input.readInt()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java deleted file mode 100644 index 7c6d023..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import org.apache.flink.api.common.ExecutionConfig; - -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.core.fs.Path; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashSet; - -import static org.junit.Assert.assertTrue; - -public class SerializersTest { - - // recursive - public static class Node { - private Node parent; - } - - public static class FromNested { - Node recurseMe; - } - - public static class FromGeneric1 {} - public static class FromGeneric2 {} - - public static class Nested1 { - private FromNested fromNested; - private Path yodaIntervall; - } - - public static class ClassWithNested { - - Nested1 nested; - int ab; - - ArrayList<FromGeneric1> addGenType; - FromGeneric2[] genericArrayType; - } - - @Test - public void testTypeRegistration() { - ExecutionConfig conf = new ExecutionConfig(); - Serializers.recursivelyRegisterType(ClassWithNested.class, conf, new HashSet<Class<?>>()); - - KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type. - - Assert.assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0); - Assert.assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0); - Assert.assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0); - - // check if the generic type from one field is also registered (its very likely that - // generic types are also used as fields somewhere. - Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0); - Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0); - Assert.assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0); - - - // register again and make sure classes are still registered - ExecutionConfig conf2 = new ExecutionConfig(); - Serializers.recursivelyRegisterType(ClassWithNested.class, conf2, new HashSet<Class<?>>()); - KryoSerializer<String> kryo2 = new KryoSerializer<>(String.class, conf); - assertTrue(kryo2.getKryo().getRegistration(FromNested.class).getId() > 0); - } - - @Test - public void testTypeRegistrationFromTypeInfo() { - ExecutionConfig conf = new ExecutionConfig(); - Serializers.recursivelyRegisterType(new GenericTypeInfo<>(ClassWithNested.class), conf, new HashSet<Class<?>>()); - - KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type. - - assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0); - assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0); - assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0); - - // check if the generic type from one field is also registered (its very likely that - // generic types are also used as fields somewhere. - assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0); - assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0); - assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java deleted file mode 100644 index faab26a..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.tuple.base; - -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; - -import static org.junit.Assert.assertEquals; - -public abstract class TupleComparatorTestBase<T extends Tuple> extends ComparatorTestBase<T> { - - @Override - protected void deepEquals(String message, T should, T is) { - for (int x = 0; x < should.getArity(); x++) { - assertEquals(should.getField(x), is.getField(x)); - } - } - - @Override - protected abstract TupleComparator<T> createComparator(boolean ascending); - - @Override - protected abstract TupleSerializer<T> createSerializer(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java deleted file mode 100644 index 1d414d8..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.tuple.base; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.TestLogger; -import org.junit.Test; - -/** - * Abstract test base for TuplePairComparators. - * - * @param <T> - * @param <R> - */ -public abstract class TuplePairComparatorTestBase<T extends Tuple, R extends Tuple> extends TestLogger { - - protected abstract TypePairComparator<T, R> createComparator(boolean ascending); - - protected abstract Tuple2<T[], R[]> getSortedTestData(); - - @Test - public void testEqualityWithReference() { - try { - TypePairComparator<T, R> comparator = getComparator(true); - Tuple2<T[], R[]> data = getSortedData(); - for (int x = 0; x < data.f0.length; x++) { - comparator.setReference(data.f0[x]); - - assertTrue(comparator.equalToReference(data.f1[x])); - } - } catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } - } - - @Test - public void testInequalityWithReference() { - testGreatSmallAscDescWithReference(true); - testGreatSmallAscDescWithReference(false); - } - - protected void testGreatSmallAscDescWithReference(boolean ascending) { - try { - Tuple2<T[], R[]> data = getSortedData(); - - TypePairComparator<T, R> comparator = getComparator(ascending); - - //compares every element in high with every element in low - for (int x = 0; x < data.f0.length - 1; x++) { - for (int y = x + 1; y < data.f1.length; y++) { - comparator.setReference(data.f0[x]); - if (ascending) { - assertTrue(comparator.compareToReference(data.f1[y]) > 0); - } else { - assertTrue(comparator.compareToReference(data.f1[y]) < 0); - } - } - } - } catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - protected TypePairComparator<T, R> getComparator(boolean ascending) { - TypePairComparator<T, R> comparator = createComparator(ascending); - if (comparator == null) { - throw new RuntimeException("Test case corrupt. Returns null as comparator."); - } - return comparator; - } - - protected Tuple2<T[], R[]> getSortedData() { - Tuple2<T[], R[]> data = getSortedTestData(); - if (data == null || data.f0 == null || data.f1 == null) { - throw new RuntimeException("Test case corrupt. Returns null as test data."); - } - if (data.f0.length < 2 || data.f1.length < 2) { - throw new RuntimeException("Test case does not provide enough sorted test data."); - } - - return data; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 1534ebf..0479c0b 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.CoGroupRawOperator; import org.apache.flink.api.java.operators.CrossOperator.DefaultCross; import org.apache.flink.api.java.operators.Grouping; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.operators.SortedGrouping; import org.apache.flink.api.java.operators.UdfOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index d25fa9d..22be45a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -20,13 +20,14 @@ package org.apache.flink.api.java.table import java.lang.reflect.Modifier +import org.apache.flink.api.common.operators.Keys import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.aggregation.AggregationFunction import org.apache.flink.api.java.operators.JoinOperator.EquiJoin -import org.apache.flink.api.java.operators.Keys.ExpressionKeys -import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping} +import Keys.ExpressionKeys +import org.apache.flink.api.java.operators.{GroupReduceOperator, MapOperator, UnsortedGrouping} import org.apache.flink.api.java.{DataSet => JavaDataSet} import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields import org.apache.flink.api.table.plan._ http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java index a79c843..1e0a2b3 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java @@ -34,7 +34,7 @@ import org.apache.flink.api.java.aggregation.AggregationFunction; import org.apache.flink.api.java.aggregation.AggregationFunctionFactory; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.operators.Grouping; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.operators.SingleInputOperator; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala index 3eb6472..a6cce43 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala @@ -21,10 +21,10 @@ package org.apache.flink.api.scala import org.apache.commons.lang3.tuple.{ImmutablePair, Pair} import org.apache.flink.api.common.InvalidProgramException import org.apache.flink.api.common.functions.{CoGroupFunction, Partitioner, RichCoGroupFunction} -import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.operators.{Keys, Order} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import Keys.ExpressionKeys import org.apache.flink.api.java.operators._ import org.apache.flink.util.Collector http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 35f0faf..151e6b3 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -22,7 +22,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator import org.apache.flink.api.common.aggregators.Aggregator import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat} -import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.operators.{Keys, Order} import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod @@ -31,7 +31,7 @@ import org.apache.flink.api.java.Utils.CountHelper import org.apache.flink.api.java.aggregation.Aggregations import org.apache.flink.api.java.functions.{FirstReducer, KeySelector} import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat} -import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import Keys.ExpressionKeys import org.apache.flink.api.java.operators._ import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.java.{DataSet => JavaDataSet, Utils} @@ -93,6 +93,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { /** * Returns the execution environment associated with the current DataSet. + * * @return associated execution environment */ def getExecutionEnvironment: ExecutionEnvironment = @@ -515,7 +516,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * Convenience method to get the count (number of elements) of a DataSet * * @return A long integer that represents the number of elements in the set - * * @see org.apache.flink.api.java.Utils.CountHelper */ @throws(classOf[Exception]) @@ -531,7 +531,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * As DataSet can contain a lot of data, this method should be used with caution. * * @return A Seq containing the elements of the DataSet - * * @see org.apache.flink.api.java.Utils.CollectHelper */ @throws(classOf[Exception]) @@ -1369,7 +1368,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { /** * Range-partitions a DataSet on the specified fields. * - '''important:''' This operation requires an extra pass over the DataSet to compute the range + *'''important:''' This operation requires an extra pass over the DataSet to compute the range * boundaries and shuffles the whole DataSet over the network. * This can take significant amount of time. */ @@ -1385,7 +1384,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { /** * Range-partitions a DataSet using the specified key selector function. * - '''important:''' This operation requires an extra pass over the DataSet to compute the range + *'''important:''' This operation requires an extra pass over the DataSet to compute the range * boundaries and shuffles the whole DataSet over the network. * This can take significant amount of time. */ @@ -1516,6 +1515,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { /** * Writes `this` DataSet to the specified location. This uses [[AnyRef.toString]] on * each element. + * * @see org.apache.flink.api.java.DataSet#writeAsText(String) */ def writeAsText( @@ -1532,6 +1532,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * Writes `this` DataSet to the specified location as CSV file(s). * * This only works on Tuple DataSets. For individual tuple fields [[AnyRef.toString]] is used. + * * @see org.apache.flink.api.java.DataSet#writeAsText(String) */ def writeAsCsv( @@ -1623,8 +1624,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * * * Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed. * This uses [[AnyRef.toString]] on each element. - * @param sinkIdentifier The string to prefix the output with. - * + * + * @param sinkIdentifier The string to prefix the output with. * @deprecated Use [[printOnTaskManager(String)]] instead. */ @Deprecated @@ -1636,8 +1637,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { /** * Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed. * This uses [[AnyRef.toString]] on each element. - * @param sinkIdentifier The string to prefix the output with. - * + * + * @param sinkIdentifier The string to prefix the output with. * @deprecated Use [[printOnTaskManager(String)]] instead. */ @Deprecated http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala index 7d5419b..bb8287a 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala @@ -19,11 +19,11 @@ package org.apache.flink.api.scala import org.apache.flink.api.common.InvalidProgramException import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction, Partitioner, ReduceFunction} -import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.operators.{Keys, Order} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.aggregation.Aggregations import org.apache.flink.api.java.functions.{FirstReducer, KeySelector} -import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import Keys.ExpressionKeys import org.apache.flink.api.java.operators._ import org.apache.flink.api.scala.operators.ScalaAggregateOperator import org.apache.flink.util.Collector http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala index 91f8c85..ace0790 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.scala import org.apache.flink.api.common.functions.CoGroupFunction +import org.apache.flink.api.common.operators.Keys import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators._ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo @@ -36,6 +37,7 @@ import scala.reflect.ClassTag * val right = ... * val coGroupResult = left.coGroup(right).where(...).isEqualTo(...) * }}} + * * @tparam L The type of the left input of the coGroup. * @tparam R The type of the right input of the coGroup. */ http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala index 49b2701..71f2bfb 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction, Partitioner, RichFlatJoinFunction} +import org.apache.flink.api.common.operators.Keys import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlatJoinFunction @@ -256,6 +257,7 @@ private[flink] abstract class UnfinishedJoinOperationBase[L, R, O <: JoinFunctio * val right = ... * val joinResult = left.join(right).where(...).equalTo(...) * }}} + * * @tparam L The type of the left input of the join. * @tparam R The type of the right input of the join. */ @@ -287,6 +289,7 @@ class UnfinishedJoinOperation[L, R]( * (first, second) => ... * } * }}} + * * @tparam L The type of the left input of the join. * @tparam R The type of the right input of the join. */ http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala index 5db7a91..e8bc3a4 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala @@ -22,11 +22,12 @@ import java.util import java.util.regex.{Pattern, Matcher} import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.operators.Keys import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder, InvalidFieldReferenceException, FlatFieldDescriptor} import org.apache.flink.api.common.typeutils._ -import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import Keys.ExpressionKeys import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala index 08d0242..ddb45a4 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala @@ -19,10 +19,10 @@ package org.apache.flink.api.scala import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.common.operators.Keys import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.java.operators.Keys -import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import Keys.ExpressionKeys import org.apache.flink.api.common.typeinfo.TypeInformation /** http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 254af19..f4b6e7f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -37,7 +37,7 @@ import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.io.CsvOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TypeExtractor; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java index afbd8ab..cf40a3b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java @@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index dd8dec9..d0617d0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -29,7 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala index 46059a6..ab5ebf5 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala @@ -20,7 +20,8 @@ package org.apache.flink.api.scala.operators import java.util import org.apache.flink.api.common.InvalidProgramException -import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException +import org.apache.flink.api.common.operators.Keys +import Keys.IncompatibleKeysException import org.junit.Assert import org.junit.Test import org.apache.flink.api.scala._ http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala index 576ecdf..2dabb56 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala @@ -19,9 +19,9 @@ package org.apache.flink.api.scala.operators import org.apache.flink.api.common.operators.Order import org.apache.flink.api.java.io.DiscardingOutputFormat -import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.test.javaApiOperators.GroupCombineITCase +import org.apache.flink.test.javaApiOperators.GroupCombineITCase.ScalaGroupCombineFunctionExample import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.MultipleProgramsTestBase import org.apache.flink.util.Collector @@ -43,7 +43,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa .map(str => Tuple1(str)) // all methods on DataSet - ds.combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) + ds.combineGroup(new ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) @@ -51,7 +51,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa // all methods on UnsortedGrouping ds.groupBy(0) - .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) + .combineGroup(new ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0) @@ -60,7 +60,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa // all methods on SortedGrouping ds.groupBy(0).sortGroup(0, Order.ASCENDING) - .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) + .combineGroup(new ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0).sortGroup(0, Order.ASCENDING) http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala index 0d9ea9e..81a7d7e 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala @@ -20,7 +20,8 @@ package org.apache.flink.api.scala.operators import java.util import org.apache.flink.api.common.InvalidProgramException -import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException +import org.apache.flink.api.common.operators.Keys +import Keys.IncompatibleKeysException import org.apache.flink.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets.CustomType import org.junit.{Assert, Test} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala index 70276cb..46495d0 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.scala.types import java.io.{DataInput, DataOutput} -import org.apache.flink.api.java.`type`.extractor.TypeExtractorTest.CustomTuple +import org.apache.flink.api.java.typeutils.TypeExtractorTest.CustomTuple import org.apache.flink.api.java.io.CollectionInputFormat import org.apache.hadoop.io.Writable import org.junit.{Assert, Test} @@ -602,7 +602,7 @@ class TypeInformationGenTest { // This checks the condition in checkCollection. If this fails with IllegalArgumentException, // then things like "env.fromElements((),(),())" won't work. import scala.collection.JavaConversions._ - CollectionInputFormat.checkCollection(Seq((),(),()), (new UnitTypeInfo).getTypeClass) + CollectionInputFormat.checkCollection(Seq((),(),()), (new UnitTypeInfo).getTypeClass()) } }