[FLINK-1039] Fix POJO expression keys for group reduce
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/23289d6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/23289d6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/23289d6e Branch: refs/heads/release-0.6 Commit: 23289d6eadeb7ad8798f41e747a4962745e35769 Parents: 32d168f Author: Stephan Ewen <[email protected]> Authored: Thu Jul 31 02:32:37 2014 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Jul 31 14:39:06 2014 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/operators/Keys.java | 5 ++--- .../flink/api/java/operators/ReduceGroupOperator.java | 13 +++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/23289d6e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index 131fba9..2fdb520 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -197,8 +197,8 @@ public abstract class Keys<T> { public ExpressionKeys(String[] expressions, TypeInformation<T> type) { if (!(type instanceof PojoTypeInfo<?>)) { throw new UnsupportedOperationException("Key expressions can only be used on POJOs." + " " + - "A POCO must have a default constructor without arguments and not have readObject" + - " and/or writeObject methods. 
Also, it can only have nested POJOs or primitive (also boxed)" + + "A POJO must have a default constructor without arguments and not have readObject" + + " and/or writeObject methods. A current restriction is that it can only have nested POJOs or primitive (also boxed)" + " fields."); } PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) type; @@ -212,7 +212,6 @@ public abstract class Keys<T> { " type " + type.toString() + "."); } } - } @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/23289d6e/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java index 099860c..d88d43d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java @@ -166,6 +166,19 @@ public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT return po; } + else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { + + int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); + UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); + GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po = + new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); + + po.setCombinable(combinable); + po.setInput(input); + po.setDegreeOfParallelism(this.getParallelism()); + + return po; + } else { throw new UnsupportedOperationException("Unrecognized key type."); }
