[FLINK-1039] Fix pojo expression keys for group reduce

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/23289d6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/23289d6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/23289d6e

Branch: refs/heads/release-0.6
Commit: 23289d6eadeb7ad8798f41e747a4962745e35769
Parents: 32d168f
Author: Stephan Ewen <[email protected]>
Authored: Thu Jul 31 02:32:37 2014 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jul 31 14:39:06 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/operators/Keys.java |  5 ++---
 .../flink/api/java/operators/ReduceGroupOperator.java  | 13 +++++++++++++
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/23289d6e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 131fba9..2fdb520 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -197,8 +197,8 @@ public abstract class Keys<T> {
                public ExpressionKeys(String[] expressions, TypeInformation<T> 
type) {
                        if (!(type instanceof PojoTypeInfo<?>)) {
                                throw new UnsupportedOperationException("Key 
expressions can only be used on POJOs." + " " +
-                                               "A POCO must have a default 
constructor without arguments and not have readObject" +
-                                               " and/or writeObject methods. 
Also, it can only have nested POJOs or primitive (also boxed)" +
+                                               "A POJO must have a default 
constructor without arguments and not have readObject" +
+                                               " and/or writeObject methods. A 
current restriction is that it can only have nested POJOs or primitive (also 
boxed)" +
                                                " fields.");
                        }
                        PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) type;
@@ -212,7 +212,6 @@ public abstract class Keys<T> {
                                                        " type " + 
type.toString() + ".");
                                }
                        }
-
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/23289d6e/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
index 099860c..d88d43d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
@@ -166,6 +166,19 @@ public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
                        
                        return po;
                }
+               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
+
+                       int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
+                       UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+                       GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, 
OUT>> po =
+                                       new GroupReduceOperatorBase<IN, OUT, 
GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+
+                       po.setCombinable(combinable);
+                       po.setInput(input);
+                       po.setDegreeOfParallelism(this.getParallelism());
+                       
+                       return po;
+               }
                else {
                        throw new UnsupportedOperationException("Unrecognized 
key type.");
                }

Reply via email to