Repository: flink Updated Branches: refs/heads/master c06213706 -> 08ca9ffa9
[FLINK-1963] Improve distinct() transformation This closes #905 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08ca9ffa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08ca9ffa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08ca9ffa Branch: refs/heads/master Commit: 08ca9ffa9a95610c073145a09e731311e728c4fd Parents: c062137 Author: pietro pinoli <pie...@pietros-mbp.lan> Authored: Mon Jul 13 13:32:20 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Jul 17 01:39:25 2015 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/DataSet.java | 15 ++--- .../api/java/operators/DistinctOperator.java | 26 +++----- .../api/java/operator/DistinctOperatorTest.java | 65 +++++++++++++++++++- .../org/apache/flink/api/scala/DataSet.scala | 49 ++++++++++----- .../test/javaApiOperators/DistinctITCase.java | 44 +++++++++++++ .../api/scala/operators/DistinctITCase.scala | 39 ++++++++++++ .../scala/operators/DistinctOperatorTest.scala | 26 ++++++++ 7 files changed, 223 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index c628b04..81ba279 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -606,13 +606,13 @@ public abstract class DataSet<T> { } /** - * Returns a distinct set of a {@link Tuple} {@link DataSet} using expression keys. 
- * <p> - * The field position keys specify the fields of Tuples or Pojos on which the decision is made if two elements are distinct or - * not. + * Returns a distinct set of a {@link DataSet} using expression keys. * <p> + * The field expression keys specify the fields of a {@link org.apache.flink.api.common.typeutils.CompositeType} + * (e.g., Tuple or Pojo type) on which the decision is made if two elements are distinct or not. + * In case of a {@link org.apache.flink.api.common.typeinfo.AtomicType}, only the wildcard expression ("*") is valid. * - * @param fields One or more field positions on which the distinction of the DataSet is decided. + * @param fields One or more field expressions on which the distinction of the DataSet is decided. * @return A DistinctOperator that represents the distinct DataSet. */ public DistinctOperator<T> distinct(String... fields) { @@ -620,9 +620,10 @@ public abstract class DataSet<T> { } /** - * Returns a distinct set of a {@link Tuple} {@link DataSet} using all fields of the tuple. + * Returns a distinct set of a {@link DataSet}. * <p> - * Note: This operator can only be applied to Tuple DataSets. + * If the input is a {@link org.apache.flink.api.common.typeutils.CompositeType} (Tuple or Pojo type), + * distinct is performed on all fields and each field must be a key type. * * @return A DistinctOperator that represents the distinct DataSet.
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index 686823c..a6eb43e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.functions.RichGroupReduceFunction; @@ -47,28 +48,21 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera private final Keys<T> keys; private final String distinctLocationName; - + public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) { super(input, input.getType()); this.distinctLocationName = distinctLocationName; - - // if keys is null distinction is done on all tuple fields - if (keys == null) { - if (input.getType() instanceof CompositeType) { - keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType()); - } - else { - throw new InvalidProgramException("Distinction on all fields is only possible on composite (pojo / tuple) data types."); - } + + if (!(input.getType() instanceof CompositeType) && + !(input.getType() instanceof AtomicType && 
input.getType().isKeyType())){ + throw new InvalidProgramException("Distinct only possible on composite or atomic key types."); } - - - // FieldPositionKeys can only be applied on Tuples and POJOs - if (keys instanceof Keys.ExpressionKeys && !(input.getType() instanceof CompositeType)) { - throw new InvalidProgramException("Distinction on field positions is only possible on composite type DataSets."); + // if keys is null distinction is done on all fields + if (keys == null) { + keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType()); } - + this.keys = keys; } http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java index f4c87c8..f4bd945 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java @@ -114,18 +114,31 @@ public class DistinctOperatorTest { @Test(expected = IllegalArgumentException.class) public void testDistinctByKeyFields6() { - + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); // should not work, negative field position tupleDs.distinct(-1); } + + @Test + public void testDistinctByKeyFields7(){ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO); + + // should work + try { + longDs.distinct("*"); + } catch (Exception e){ + Assert.fail(); + } + } @Test 
@SuppressWarnings("serial") public void testDistinctByKeySelector1() { - + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); this.customTypeData.add(new CustomType()); @@ -145,7 +158,53 @@ public class DistinctOperatorTest { } } - + + @Test + public void testDistinctByKeyIndices1() { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + try { + DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO); + // should work + longDs.distinct(); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test(expected = InvalidProgramException.class) + public void testDistinctOnNotKeyDataType() throws Exception { + /* + * should not work. NotComparable data type cannot be used as key + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + NotComparable a = new NotComparable(); + List<NotComparable> l = new ArrayList<NotComparable>(); + l.add(a); + + DataSet<NotComparable> ds = env.fromCollection(l); + DataSet<NotComparable> reduceDs = ds.distinct(); + + } + + @Test(expected = InvalidProgramException.class) + public void testDistinctOnNotKeyDataTypeOnSelectAllChar() throws Exception { + /* + * should not work. 
NotComparable data type cannot be used as key + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + NotComparable a = new NotComparable(); + List<NotComparable> l = new ArrayList<NotComparable>(); + l.add(a); + + DataSet<NotComparable> ds = env.fromCollection(l); + DataSet<NotComparable> reduceDs = ds.distinct("*"); + } + + class NotComparable { + public List<Integer> myInts; + } public static class CustomType implements Serializable { http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index fd1492a..3a0f6d9 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -710,10 +710,12 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { // -------------------------------------------------------------------------------------------- // distinct // -------------------------------------------------------------------------------------------- - /** * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether * two elements are distinct or not is made using the return value of the given function. + * + * @param fun The function which extracts the key values from the DataSet on which the + * distinction of the DataSet is decided. */ def distinct[K: TypeInformation](fun: T => K): DataSet[T] = { val keyExtractor = new KeySelector[T, K] { @@ -728,10 +730,24 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { } /** - * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether - * two elements are distinct or not is made based on only the specified tuple fields. 
+ * Returns a distinct set of a {@link DataSet}. + * <p> + * If the input is a composite type (Tuple or Pojo type), distinct is performed on all fields + * and each field must be a key type. + */ + def distinct: DataSet[T] = { + wrap(new DistinctOperator[T](javaSet, null, getCallLocationName())) + } + + /** + * Returns a distinct set of a {@link Tuple} {@link DataSet} using field position keys. + * <p> + * The field position keys specify the fields of Tuples on which the decision is made if + * two Tuples are distinct or not. + * <p> + * Note: Field position keys can only be specified for Tuple DataSets. * - * This only works on tuple DataSets. + * @param fields One or more field positions on which the distinction of the DataSet is decided. */ def distinct(fields: Int*): DataSet[T] = { wrap(new DistinctOperator[T]( @@ -741,8 +757,20 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { } /** - * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether - * two elements are distinct or not is made based on only the specified fields. + * Returns a distinct set of a {@link DataSet} using expression keys. + * <p> + * Two elements are considered duplicates if they agree on all fields selected by the + * given field expressions. + * + * The field expression keys specify the fields of a + * {@link org.apache.flink.api.common.typeutils.CompositeType} + * (e.g., Tuple or Pojo type) on which the decision is made if two elements are distinct or not. + * In case of a {@link org.apache.flink.api.common.typeinfo.AtomicType}, only the + * wildcard expression ("_") is valid.
+ * + * @param firstField First field position on which the distinction of the DataSet is decided + * @param otherFields Zero or more field positions on which the distinction of the DataSet + * is decided */ def distinct(firstField: String, otherFields: String*): DataSet[T] = { wrap(new DistinctOperator[T]( @@ -751,15 +779,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { getCallLocationName())) } - /** - * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether - * two elements are distinct or not is made based on all tuple fields. - * - * This only works if this DataSet contains Tuples. - */ - def distinct: DataSet[T] = { - wrap(new DistinctOperator[T](javaSet, null, getCallLocationName())) - } // -------------------------------------------------------------------------------------------- // Keyed DataSet http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java index 02dbb76..d32986d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java @@ -18,8 +18,11 @@ package org.apache.flink.test.javaApiOperators; +import java.util.ArrayList; import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; @@ -274,4 +277,45 @@ public class DistinctITCase extends MultipleProgramsTestBase { return (int) value.nestedPojo.longNumber; } } + + @Test + public 
void testCorrectnessOfDistinctOnAtomic() throws Exception { + /* + * check correctness of distinct on Integers + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env); + DataSet<Integer> reduceDs = ds.distinct(); + + List<Integer> result = reduceDs.collect(); + + String expected = "1\n2\n3\n4\n5"; + + compareResultAsText(result, expected); + } + + @Test + public void testCorrectnessOfDistinctOnAtomicWithSelectAllChar() throws Exception { + /* + * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<String> ds = CollectionDataSets.getStringDataSet(env); + DataSet<String> reduceDs = ds.union(ds).distinct("*"); + + List<String> result = reduceDs.collect(); + + String expected = "I am fine.\n" + + "Luke Skywalker\n" + + "LOL\n" + + "Hello world, how are you?\n" + + "Hi\n" + + "Hello world\n" + + "Hello\n" + + "Random comment\n"; + + compareResultAsText(result, expected); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala index cf82ce9..8b1e2fc 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala @@ -174,6 +174,45 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m env.execute() expected = "10000\n20000\n30000\n" } + + @Test + def testCorrectnessOfDistinctOnAtomic(): Unit = { + /* + * check correctness of distinct on Integers + */ 
+ + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.getIntDataSet(env) + + val reduceDs = ds.distinct + + reduceDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE) + env.execute() + expected = "1\n2\n3\n4\n5" + } + + @Test + def testCorrectnessOfDistinctOnAtomicWithSelectAllChar(): Unit = { + /* + * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR + */ + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.getStringDataSet(env) + val reduceDs = ds.union(ds).distinct("_") + + reduceDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE) + env.execute() + expected = "I am fine.\n" + + "Luke Skywalker\n" + + "LOL\n" + + "Hello world, how are you?\n" + + "Hi\n" + + "Hello world\n" + + "Hello\n" + + "Random comment\n" + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala index ca93d86..7fc53e5 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala @@ -90,6 +90,19 @@ class DistinctOperatorTest { } @Test + def testDistinctByKeyIndices7(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val longDs = env.fromCollection(emptyLongData) + + // should work + try { + longDs.distinct + } catch { + case e: Exception => Assert.fail() + } + } + + @Test def testDistinctByKeyFields1(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tupleDs = env.fromCollection(emptyTupleData) @@ -140,6 +153,19 @@ class 
DistinctOperatorTest { } @Test + def testDistinctByKeyFields6(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val longDs = env.fromCollection(emptyLongData) + + // should work + try { + longDs.distinct("_") + } catch { + case e: Exception => Assert.fail() + } + } + + @Test def testDistinctByKeySelector1(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment try {