[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15694237#comment-15694237
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2094


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.
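
For illustration, here is a minimal sketch of what resolving a nested field expression involves. It uses plain Java reflection and is not Flink's PojoFieldAccessor; the Outer and Inner classes, the resolve helper, and the "inner.count" expression are hypothetical names chosen for this example.

```java
import java.lang.reflect.Field;

final class NestedFieldExpressionSketch {

    // Hypothetical nested POJOs, used only for illustration.
    public static class Inner {
        public int count;

        public Inner() {}

        public Inner(int count) {
            this.count = count;
        }
    }

    public static class Outer {
        public String name;
        public Inner inner;

        public Outer() {}

        public Outer(String name, Inner inner) {
            this.name = name;
            this.inner = inner;
        }
    }

    /**
     * Resolves a dotted field expression such as "inner.count" by reading
     * one public field per segment, starting from the given record.
     */
    static Object resolve(Object record, String fieldExpression) {
        Object current = record;
        for (String segment : fieldExpression.split("\\.")) {
            if (current == null) {
                throw new IllegalArgumentException(
                    "Null value reached before segment '" + segment + "'");
            }
            try {
                Field field = current.getClass().getField(segment);
                current = field.get(current);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                throw new IllegalArgumentException(
                    "Cannot access '" + segment + "' in expression '" + fieldExpression + "'", e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Outer record = new Outer("a", new Inner(3));
        System.out.println(resolve(record, "name"));        // top-level field
        System.out.println(resolve(record, "inner.count")); // field of a nested POJO
    }
}
```

An accessor that only looks up the first segment on the outer class would handle "name" but not "inner.count", which is the gap this issue describes.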



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15693171#comment-15693171
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2094
  
Merging...




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676177#comment-15676177
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r88617516
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java
 ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+    /**
+     * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+     * the specified field on instances of this type.
+     *
+     * @param pos The field position (zero-based)
+     * @param config Configuration object
+     * @param <F> The type of the field to access
+     * @return The created FieldAccessor
+     */
+    @Internal
+    public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){
+
+        // In case of arrays
+        if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
+            return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo);
+
+        // In case of basic types
+        } else if (typeInfo instanceof BasicTypeInfo) {
+            if (pos != 0) {
+                throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+                    "basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " +
+                    "the 0th field (which means selecting the entire basic type).");
+            }
+            return (FieldAccessor) new FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+        // In case of case classes
+        } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+            TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+            TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
+            for (int i = 0; i < types.length; i++) {
+                types[i] = tupleTypeInfo.getTypeAt(i);
+            }
+            return new FieldAccessor.ProductFieldAccessor<>(
--- End diff --

Forgotten, adding it.
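
As a rough illustration of the position-based dispatch in the diff above, the sketch below picks an accessor by the shape of the type: arrays are indexed by position, basic types expose only a 0th field, and anything else is rejected. It uses plain Java classes instead of Flink's TypeInformation, and it leaves out the tuple and case-class branches. PositionAccessorSketch, Getter, isBasic, and forPosition are hypothetical names, not part of the pull request.

```java
import java.lang.reflect.Array;

final class PositionAccessorSketch {

    /** Minimal read-only accessor contract, assumed for this sketch. */
    interface Getter {
        Object get(Object record);
    }

    /** Treats boxed numbers, strings, booleans and characters as "basic" types. */
    static boolean isBasic(Class<?> type) {
        return Number.class.isAssignableFrom(type)
            || type == String.class
            || type == Boolean.class
            || type == Character.class;
    }

    /** Chooses a getter for the given position, mirroring the order of the branches in the diff. */
    static Getter forPosition(Class<?> type, int pos) {
        if (type.isArray()) {
            // Arrays: the position is an element index (primitive elements are boxed).
            return record -> Array.get(record, pos);
        } else if (isBasic(type)) {
            // Basic types: only the 0th field exists, and it is the value itself.
            if (pos != 0) {
                throw new IllegalArgumentException(
                    "Field " + pos + " selected on basic type " + type.getName()
                        + "; only field 0 can be selected.");
            }
            return record -> record;
        } else {
            // Everything else cannot be addressed by position in this sketch.
            throw new IllegalArgumentException(
                "Cannot reference field by position on " + type.getName());
        }
    }

    public static void main(String[] args) {
        System.out.println(forPosition(int[].class, 1).get(new int[] {4, 5, 6}));
        System.out.println(forPosition(String.class, 0).get("x"));
    }
}
```

Failing when the accessor is created, rather than on the first record, keeps an invalid position from surfacing only at runtime.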



[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676146#comment-15676146
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r88616007
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java
 ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+    /**
+     * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+     * the specified field on instances of this type.
+     *
+     * @param pos The field position (zero-based)
+     * @param config Configuration object
+     * @param <F> The type of the field to access
+     * @return The created FieldAccessor
+     */
+    @Internal
+    public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){
+
+        // In case of arrays
+        if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
+            return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo);
+
+        // In case of basic types
+        } else if (typeInfo instanceof BasicTypeInfo) {
+            if (pos != 0) {
+                throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+                    "basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " +
+                    "the 0th field (which means selecting the entire basic type).");
+            }
+            return (FieldAccessor) new FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+        // In case of case classes
+        } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+            TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+            TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
+            for (int i = 0; i < types.length; i++) {
+                types[i] = tupleTypeInfo.getTypeAt(i);
+            }
+            return new FieldAccessor.ProductFieldAccessor<>(
+                pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation)types[pos]), config);
+
+        // In case of tuples
+        } else if (typeInfo.isTupleType()) {
+            return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo);
+
+        // Default case, PojoType is directed to this statement
+        } else {
+            throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by position on " + typeInfo.toString()
+  

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676144#comment-15676144
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r88615926
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java
 ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+    /**
+     * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+     * the specified field on instances of this type.
+     *
+     * @param pos The field position (zero-based)
+     * @param config Configuration object
+     * @param <F> The type of the field to access
+     * @return The created FieldAccessor
+     */
+    @Internal
+    public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){
+
+        // In case of arrays
+        if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
+            return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo);
+
+        // In case of basic types
+        } else if (typeInfo instanceof BasicTypeInfo) {
+            if (pos != 0) {
+                throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+                    "basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " +
+                    "the 0th field (which means selecting the entire basic type).");
+            }
+            return (FieldAccessor) new FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+        // In case of case classes
+        } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+            TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+            TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
--- End diff --

That is definitely more convenient, missed that.  




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676137#comment-15676137
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r88615630
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccessor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    protected TypeInformation fieldType;
+
+    /**
+     * Gets the TypeInformation for the type of the field.
+     * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+     */
+    @SuppressWarnings("unchecked")
+    public TypeInformation<F> getFieldType() {
+        return fieldType;
+    }
+
+
+    /**
+     * Gets the value of the field (specified in the constructor) of the given record.
+     * @param record The record on which the field will be accessed
+     * @return The value of the field.
+     */
+    public abstract F get(T record);
+
+    /**
+     * Sets the field (specified in the constructor) of the given record to the given value.
+     *
+     * Warning: This might modify the original object, or might return a new object instance.
+     * (This is necessary, because the record might be immutable.)
+     *
+     * @param record The record to modify
+     * @param fieldValue The new value of the field
+     * @return A record that has the given field value. (this might be a new instance or the original)
+     */
+    public abstract T set(T record, F fieldValue);
+
+
+    // --
+
+
+    /**
+     * This is when the entire record is considered as a single field. (e.g. field 0 of a basic type, or a
+     * field of a POJO that is itself some composite type but is not further decomposed)
+     */
+    final static class 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676123#comment-15676123
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r88614862
  

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676124#comment-15676124
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r88614874
  

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676121#comment-15676121
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r88614846
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccessor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
--- End diff --

Good catch, reverting.
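
The class comment in the diff above describes accessors that handle one layer and hand the rest to an inner accessor, with set possibly returning a new record. Below is a minimal sketch of that idea, assuming a simplified two-method contract. It is not the code from the pull request: Accessor, of, nested, Point, Shape, ORIGIN, and X are hypothetical names used only for this example.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

final class DelegatingAccessorSketch {

    /** Simplified accessor contract: set may mutate the record or return a new one. */
    interface Accessor<T, F> {
        F get(T record);

        T set(T record, F value);
    }

    /** Builds an accessor from a getter function and a setter function. */
    static <T, F> Accessor<T, F> of(Function<T, F> getter, BiFunction<T, F, T> setter) {
        return new Accessor<T, F>() {
            @Override
            public F get(T record) {
                return getter.apply(record);
            }

            @Override
            public T set(T record, F value) {
                return setter.apply(record, value);
            }
        };
    }

    /** Goes one layer down with 'outer', then delegates the remaining work to 'inner'. */
    static <T, M, F> Accessor<T, F> nested(Accessor<T, M> outer, Accessor<M, F> inner) {
        return new Accessor<T, F>() {
            @Override
            public F get(T record) {
                return inner.get(outer.get(record));
            }

            @Override
            public T set(T record, F value) {
                // The inner set may return a new instance, so it is written back through the outer set.
                M updatedMiddle = inner.set(outer.get(record), value);
                return outer.set(record, updatedMiddle);
            }
        };
    }

    /** Immutable value: setting a field has to produce a new instance. */
    static final class Point {
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    /** Mutable holder: setting a field modifies the original object. */
    static final class Shape {
        Point origin;

        Shape(Point origin) {
            this.origin = origin;
        }
    }

    static final Accessor<Shape, Point> ORIGIN =
        of(s -> s.origin, (s, p) -> { s.origin = p; return s; });

    static final Accessor<Point, Integer> X =
        of(p -> p.x, (p, x) -> new Point(x, p.y));

    public static void main(String[] args) {
        Accessor<Shape, Integer> originX = nested(ORIGIN, X);
        Shape shape = new Shape(new Point(1, 2));
        Shape result = originX.set(shape, 10);
        System.out.println(originX.get(result));
        System.out.println(result == shape);
    }
}
```

Because callers cannot tell which kind of record they hold, they should always continue with the value returned by set rather than assume the argument was updated in place.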




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649139#comment-15649139
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r87100173
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java
 ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+	/**
+	 * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+	 * the specified field on instances of this type.
+	 *
+	 * @param pos The field position (zero-based)
+	 * @param config Configuration object
+	 * @param <F> The type of the field to access
+	 * @return The created FieldAccessor
+	 */
+	@Internal
+	public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){
+
+		// In case of arrays
+		if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
+			return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo);
+
+		// In case of basic types
+		} else if (typeInfo instanceof BasicTypeInfo) {
+			if (pos != 0) {
+				throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+					"basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " +
+					"the 0th field (which means selecting the entire basic type).");
+			}
+			return (FieldAccessor) new FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+		// In case of case classes
+		} else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+			TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+			TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
+			for (int i = 0; i < types.length; i++) {
+				types[i] = tupleTypeInfo.getTypeAt(i);
+			}
+			return new FieldAccessor.ProductFieldAccessor<>(
--- End diff --

In a previous review I suggested adding a `SimpleProductFieldAccessor`, 
similar to the `SimpleTupleFieldAccessor`. Did you forget to add it, or do you 
think it is not necessary?



[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649137#comment-15649137
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r87093199
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+@PublicEvolving
--- End diff --

`FieldAccessor` was `@Internal` before. Why did you make it 
`@PublicEvolving`?




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649136#comment-15649136
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r87094727
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	protected TypeInformation fieldType;
+
+	/**
+	 * Gets the TypeInformation for the type of the field.
+	 * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+	 */
+	@SuppressWarnings("unchecked")
+	public TypeInformation<F> getFieldType() {
+		return fieldType;
+	}
+
+
+	/**
+	 * Gets the value of the field (specified in the constructor) of the given record.
+	 * @param record The record on which the field will be accessed
+	 * @return The value of the field.
+	 */
+	public abstract F get(T record);
+
+	/**
+	 * Sets the field (specified in the constructor) of the given record to the given value.
+	 *
+	 * Warning: This might modify the original object, or might return a new object instance.
+	 * (This is necessary, because the record might be immutable.)
+	 *
+	 * @param record The record to modify
+	 * @param fieldValue The new value of the field
+	 * @return A record that has the given field value. (this might be a new instance or the original)
+	 */
+	public abstract T set(T record, F fieldValue);
+
+
+	// --------------------------------------------------------------------------------------------------
+
+
+	/**
+	 * This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a
+	 * field of a POJO that is itself some composite type but is not further decomposed)
+	 */
+	final static class 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649144#comment-15649144
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r87096436
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -337,37 +337,43 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty
 	 * per key.
 	 *
 	 * @param positionToSum
-	 *            The position in the data point to sum
+	 *            The field position in the data points to sum. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
--- End diff --

Not sure if we should document behavior that we might remove soon. 
I'd rather keep this as an undocumented feature and open a JIRA issue to 
either add the documentation or remove the support for arrays and `0` for 
atomic types. What do you think, @mbalassi?

The same applies to all other Java / ScalaDocs.
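
For readers unfamiliar with the behavior in question, here is a minimal sketch of what selecting a position on an array or on an atomic type means. It is not the PR's code: `PositionAccessSketch` and `getAt` are hypothetical names, and the sketch only assumes the semantics described in the diff (an index into the array, and position `0` standing for the whole atomic value).

```java
import java.lang.reflect.Array;

public class PositionAccessSketch {

    // Hypothetical helper, not part of Flink: resolves a field position against
    // an array or an atomic value, mirroring the behavior discussed above.
    static Object getAt(Object record, int pos) {
        if (record.getClass().isArray()) {
            // Works for primitive and object arrays; primitives come back boxed.
            return Array.get(record, pos);
        }
        if (pos != 0) {
            throw new IllegalArgumentException(
                "Only position 0 can be selected on an atomic type, got " + pos);
        }
        // Position 0 of an atomic type selects the whole value.
        return record;
    }

    public static void main(String[] args) {
        System.out.println(getAt(new int[] {3, 5, 8}, 1));
        System.out.println(getAt(42L, 0));
    }
}
```

Whether this behavior stays is exactly the open question, so the sketch should be read as a description of the current state, not as a recommendation.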




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649140#comment-15649140
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r87099565
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java
 ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+	/**
+	 * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+	 * the specified field on instances of this type.
+	 *
+	 * @param pos The field position (zero-based)
+	 * @param config Configuration object
+	 * @param <F> The type of the field to access
+	 * @return The created FieldAccessor
+	 */
+	@Internal
+	public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){
+
+		// In case of arrays
+		if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
+			return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo);
+
+		// In case of basic types
+		} else if (typeInfo instanceof BasicTypeInfo) {
+			if (pos != 0) {
+				throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+					"basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " +
+					"the 0th field (which means selecting the entire basic type).");
+			}
+			return (FieldAccessor) new FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+		// In case of case classes
+		} else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+			TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+			TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
--- End diff --

Why do you fetch all field types in an array? 
You can do `tupleTypeInfo.getTypeAt(pos)` instead of `types[pos]`, no?
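
A minimal sketch of the difference, under stated assumptions: `TupleLike` is a hypothetical stand-in for the two `TupleTypeInfoBase` methods used in the diff (`getArity()` and `getTypeAt(int)`), with plain strings in place of `TypeInformation`. Both helpers return the same value; the second one skips the intermediate array.

```java
public class TypeAtSketch {

    // Hypothetical stand-in for the two TupleTypeInfoBase methods used in the diff.
    interface TupleLike {
        int getArity();
        String getTypeAt(int pos);
    }

    // Builds a stand-in type descriptor from a list of field type names.
    static TupleLike of(final String... types) {
        return new TupleLike() {
            public int getArity() { return types.length; }
            public String getTypeAt(int pos) { return types[pos]; }
        };
    }

    // Approach in the diff: copy every field type, then index into the copy.
    static String viaArray(TupleLike info, int pos) {
        String[] types = new String[info.getArity()];
        for (int i = 0; i < types.length; i++) {
            types[i] = info.getTypeAt(i);
        }
        return types[pos];
    }

    // Suggested approach: ask only for the type that is needed.
    static String direct(TupleLike info, int pos) {
        return info.getTypeAt(pos);
    }

    public static void main(String[] args) {
        TupleLike info = of("Integer", "String", "Double");
        System.out.println(viaArray(info, 1).equals(direct(info, 1)));
    }
}
```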



[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649138#comment-15649138
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r87102206
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java
 ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+	/**
+	 * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+	 * the specified field on instances of this type.
+	 *
+	 * @param pos The field position (zero-based)
+	 * @param config Configuration object
+	 * @param <F> The type of the field to access
+	 * @return The created FieldAccessor
+	 */
+	@Internal
+	public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){
+
+		// In case of arrays
+		if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
+			return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo);
+
+		// In case of basic types
+		} else if (typeInfo instanceof BasicTypeInfo) {
+			if (pos != 0) {
+				throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+					"basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " +
+					"the 0th field (which means selecting the entire basic type).");
+			}
+			return (FieldAccessor) new FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+		// In case of case classes
+		} else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+			TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+			TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
+			for (int i = 0; i < types.length; i++) {
+				types[i] = tupleTypeInfo.getTypeAt(i);
+			}
+			return new FieldAccessor.ProductFieldAccessor<>(
+				pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation)types[pos]), config);
+
+		// In case of tuples
+		} else if (typeInfo.isTupleType()) {
+			return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo);
+
+		// Default case, PojoType is directed to this statement
+		} else {
+			throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by position on " + typeInfo.toString()
+   

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649142#comment-15649142
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r87101256
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java
 ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+	/**
+	 * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+	 * the specified field on instances of this type.
+	 *
+	 * @param pos The field position (zero-based)
+	 * @param config Configuration object
+	 * @param <F> The type of the field to access
+	 * @return The created FieldAccessor
+	 */
+	@Internal
+	public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){
+
+		// In case of arrays
+		if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
+			return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo);
+
+		// In case of basic types
+		} else if (typeInfo instanceof BasicTypeInfo) {
+			if (pos != 0) {
+				throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+					"basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " +
+					"the 0th field (which means selecting the entire basic type).");
+			}
+			return (FieldAccessor) new FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+		// In case of case classes
+		} else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+			TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+			TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
+			for (int i = 0; i < types.length; i++) {
+				types[i] = tupleTypeInfo.getTypeAt(i);
+			}
+			return new FieldAccessor.ProductFieldAccessor<>(
+				pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation)types[pos]), config);
+
+		// In case of tuples
+		} else if (typeInfo.isTupleType()) {
+			return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo);
+
+		// Default case, PojoType is directed to this statement
+		} else {
+			throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by position on " + typeInfo.toString()
+   

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15641702#comment-15641702
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thanks @fhueske. I am happy to squash the commits and merge the PR, given 
that you are satisfied with it.

I would like to merge it by the middle of next week, so that I can proceed 
with adding the necessary final touches to @Xazax-hun's PR on the serializer 
code generation toward the end of the week. I would love to have that issue 
wrapped up as well.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15635754#comment-15635754
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thanks for the update @mbalassi. Will have a look at the PR next week.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633099#comment-15633099
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2094
  
As @fhueske and @StephanEwen have suggested, I have refactored @ggevay's 
code: instead of adding two methods to `TypeInformation`, I have moved the 
functionality to a `FieldAccessorFactory` class.

To achieve this I had to move the code back to `flink-streaming-java`, as 
the static factory needs to import `scala.Product` to implement the 
functionality for case classes. Given that a Scala dependency is better 
avoided in `flink-core` at this point, and `flink-streaming` already has the 
Scala dependency, I felt most comfortable with this approach.

If we decide to use the `FieldAccessor`s in other parts of the system, we 
might need to revert to Gabor's previous approach based on inheritance 
instead of the current static monstrosity, so that we can separate the Scala 
utilities into their respective projects.

I have also addressed Fabian's inline comments.
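
To illustrate the shape of this design, here is a minimal sketch of a static factory that turns a field expression such as "f1.a.foo" into a chain of accessors, each peeling off one layer and delegating the rest to an inner accessor. It is not the PR's code: `NestedAccessorSketch`, `Accessor`, `Whole`, `OneLayer` and `forExpression` are hypothetical names, and nested `Map`s stand in for POJOs, tuples and case classes.

```java
import java.util.HashMap;
import java.util.Map;

public class NestedAccessorSketch {

    // Hypothetical stand-in for FieldAccessor: set may return a new record
    // or the original one, so callers must use the returned value.
    interface Accessor {
        Object get(Object record);
        Object set(Object record, Object value);
    }

    // End of the chain: the whole record is treated as the field.
    static final class Whole implements Accessor {
        public Object get(Object record) { return record; }
        public Object set(Object record, Object value) { return value; }
    }

    // Peels off one key, then delegates the rest of the work to the inner accessor.
    static final class OneLayer implements Accessor {
        private final String key;
        private final Accessor inner;

        OneLayer(String key, Accessor inner) {
            this.key = key;
            this.inner = inner;
        }

        @SuppressWarnings("unchecked")
        public Object get(Object record) {
            return inner.get(((Map<String, Object>) record).get(key));
        }

        @SuppressWarnings("unchecked")
        public Object set(Object record, Object value) {
            Map<String, Object> map = (Map<String, Object>) record;
            map.put(key, inner.set(map.get(key), value));
            return map;
        }
    }

    // Static factory: splits the expression at the first dot and recurses on the remainder.
    static Accessor forExpression(String expr) {
        if (expr.isEmpty()) {
            return new Whole();
        }
        int dot = expr.indexOf('.');
        String head = dot < 0 ? expr : expr.substring(0, dot);
        String rest = dot < 0 ? "" : expr.substring(dot + 1);
        return new OneLayer(head, forExpression(rest));
    }

    public static void main(String[] args) {
        Map<String, Object> a = new HashMap<>();
        a.put("foo", 1);
        Map<String, Object> f1 = new HashMap<>();
        f1.put("a", a);
        Map<String, Object> record = new HashMap<>();
        record.put("f1", f1);

        Accessor accessor = forExpression("f1.a.foo");
        System.out.println(accessor.get(record));
        accessor.set(record, 42);
        System.out.println(accessor.get(record));
    }
}
```

In the sketch all dispatch lives in one static method, which is what ties the factory to every supported record kind; the inheritance-based alternative would let each type information create its own accessor instead.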




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633036#comment-15633036
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r86366630
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -264,6 +265,7 @@ public void getFlatFields(String fieldExpression, int 
offset, List


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603086#comment-15603086
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84765869
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
--- End diff --

`FieldAccessor` was `@Internal` before. 
Why change it to `@PublicEvolving`?
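
The Javadoc quoted in this diff says that nested field expressions such as "f1.a.foo" produce accessors that penetrate one layer and delegate the rest to an inner accessor. The following is a rough, self-contained sketch of that idea using plain reflection. It is not the class under review: `NestedAccessorSketch`, `WholeRecordAccessorSketch`, `PojoAccessorSketch` and the two POJO classes are hypothetical names chosen for illustration, and the sketch ignores type information and serialization entirely.

```java
import java.lang.reflect.Field;

// Hypothetical, untyped counterpart of the get/set contract described in the Javadoc.
interface NestedAccessorSketch {
    Object get(Object record);

    // Returns a record that has the new value; here it is always the same instance.
    Object set(Object record, Object value);
}

// The innermost layer: the whole record is the field.
final class WholeRecordAccessorSketch implements NestedAccessorSketch {
    public Object get(Object record) {
        return record;
    }

    public Object set(Object record, Object value) {
        return value;
    }
}

// Handles the first segment of the expression and delegates the rest.
final class PojoAccessorSketch implements NestedAccessorSketch {
    private final Field field;
    private final NestedAccessorSketch inner;

    PojoAccessorSketch(Class<?> type, String expression) {
        int dot = expression.indexOf('.');
        String head = dot < 0 ? expression : expression.substring(0, dot);
        this.field = lookup(type, head);
        this.inner = dot < 0
            ? new WholeRecordAccessorSketch()
            : new PojoAccessorSketch(field.getType(), expression.substring(dot + 1));
    }

    private static Field lookup(Class<?> type, String name) {
        try {
            Field f = type.getDeclaredField(name);
            f.setAccessible(true);
            return f;
        } catch (NoSuchFieldException ex) {
            throw new IllegalArgumentException(
                "No field \"" + name + "\" in " + type.getName(), ex);
        }
    }

    public Object get(Object record) {
        try {
            return inner.get(field.get(record));
        } catch (IllegalAccessException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public Object set(Object record, Object value) {
        try {
            // Let the inner accessor update the nested object, then store the result back.
            field.set(record, inner.set(field.get(record), value));
            return record;
        } catch (IllegalAccessException ex) {
            throw new IllegalStateException(ex);
        }
    }
}

// Hypothetical POJOs used only to exercise the sketch.
class InnerPojoSketch {
    int foo = 1;
}

class OuterPojoSketch {
    InnerPojoSketch a = new InnerPojoSketch();
}
```

With these classes, `new PojoAccessorSketch(OuterPojoSketch.class, "a.foo")` reads and writes the nested `foo` field through two layers.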




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603085#comment-15603085
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84767836
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
 ---
@@ -265,16 +265,6 @@ public boolean hasDeterministicFieldOrder() {
@PublicEvolving
public abstract int getFieldIndex(String fieldName);
 
-   @PublicEvolving
-   public static class InvalidFieldReferenceException extends 
IllegalArgumentException {
--- End diff --

I would not move this exception. 
Moving it is not strictly required and makes many other changes necessary.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603084#comment-15603084
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84768233
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public final class ProductFieldAccessor extends FieldAccessor {
--- End diff --

I think it makes sense to add a simple (non-nested) field accessor for 
`Product`. 
Case classes are very commonly used in Flink Scala programs. So this might 
pay off.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603088#comment-15603088
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84767676
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 ---
@@ -203,7 +206,34 @@ public void getFlatFields(String fieldExpression, int 
offset, List typed = (TypeInformation) this.types[pos];
return typed;
}
-   
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(int pos, 
ExecutionConfig config) {
+   return new FieldAccessor.SimpleTupleFieldAccessor(pos, 
this);
+   }
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
+   FieldAccessor.FieldExpression decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
+   int fieldPos = this.getFieldIndex(decomp.head);
+   if (fieldPos == -1) {
+   try {
+   fieldPos = Integer.parseInt(decomp.head);
+   } catch (NumberFormatException ex) {
+   throw new InvalidFieldReferenceException("Tried 
to select field \"" + decomp.head
--- End diff --

The exception message does not tell the user that only integer values are allowed.
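
As an illustration of the kind of message this comment asks for, here is a small standalone sketch. It is not the code in the diff: `TupleFieldResolverSketch` and `resolve` are hypothetical names, it throws a plain `IllegalArgumentException` rather than Flink's `InvalidFieldReferenceException`, and it assumes tuple fields are addressed either as `f0`, `f1`, ... or by a bare integer position.

```java
final class TupleFieldResolverSketch {
    private TupleFieldResolverSketch() {}

    // Resolves "f1" or "1" to position 1; anything else fails with an explicit hint.
    static int resolve(String head, int arity) {
        String digits = head.startsWith("f") ? head.substring(1) : head;
        int pos;
        try {
            pos = Integer.parseInt(digits);
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException(
                "Tried to select field \"" + head + "\" on a tuple of arity " + arity
                    + ". Only integer positions (0 to " + (arity - 1)
                    + ") or field names f0 to f" + (arity - 1) + " are allowed.",
                ex);
        }
        if (pos < 0 || pos >= arity) {
            throw new IllegalArgumentException(
                "Field position " + pos + " is out of range for a tuple of arity " + arity + ".");
        }
        return pos;
    }

    public static void main(String[] args) {
        System.out.println(resolve("f1", 2));
        System.out.println(resolve("1", 2));
    }
}
```

The design choice here is simply to name the accepted forms in the message, so that a user who passes something like "almafa" sees what would have been valid.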




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603093#comment-15603093
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84767443
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation<F> getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record The record on which the field will be accessed
+* @return The value of the field.
+*/
+   public abstract F get(T record);
+
+   /**
+* Sets the field (specified in the constructor) of the given record to 
the given value.
+*
+* Warning: This might modify the original object, or might return a 
new object instance.
+* (This is necessary, because the record might be immutable.)
+*
+* @param record The record to modify
+* @param fieldValue The new value of the field
+* @return A record that has the given field value. (this might be a 
new instance or the original)
+*/
+   public abstract T set(T record, F fieldValue);
+
+
+   // 
--
+
+
+   /**
+* This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a
+* field of a POJO that is itself some composite type but is not 
further decomposed)
+*/
+   public final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> {
+
+   private static final long serialVersionUID = 1L;
+
+   public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
+   checkNotNull(typeInfo, "typeInfo must not be null.");
+
+

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603091#comment-15603091
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84768779
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+   // Note, that AggregationFunctionTest indirectly also tests FieldAccessors.
+   // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+   @Test
+   public void testFlatTuple() {
+   Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+   TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+   t = f0.set(t, "b");
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(5, (int) f1.get(t));
+   assertEquals(5, (int) t.f1);
+   t = f1.set(t, 7);
+   assertEquals(7, (int) f1.get(t));
+   assertEquals(7, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+   assertEquals(7, (int) f1n.get(t));
+   assertEquals(7, (int) t.f1);
+   t = f1n.set(t, 10);
+   assertEquals(10, (int) f1n.get(t));
+   assertEquals(10, (int) f1.get(t));
+   assertEquals(10, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1ns = tpeInfo.getFieldAccessor("1", null);
+   assertEquals(10, (int) f1ns.get(t));
+   assertEquals(10, (int) t.f1);
+   t = f1ns.set(t, 11);
+   assertEquals(11, (int) f1ns.get(t));
+   assertEquals(11, (int) f1.get(t));
+   assertEquals(11, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   // This is technically valid (the ".0" is selecting the 0th field of a basic type).
+   FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null);
+   assertEquals("b", f0_0.get(t));
+   assertEquals("b", t.f0);
+   t = f0_0.set(t, "cc");
+   assertEquals("cc", f0_0.get(t));
+   assertEquals("cc", t.f0);
+
+   try {
+   FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null);
+   assertFalse("Expected exception, because of bad field name", false);
--- End diff --

Can we use `@Test(expected = InvalidFieldReferenceException.class)` instead of 
catching an 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603092#comment-15603092
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84769097
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+   // Note, that AggregationFunctionTest indirectly also tests FieldAccessors.
+   // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+   @Test
+   public void testFlatTuple() {
+   Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+   TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+   t = f0.set(t, "b");
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(5, (int) f1.get(t));
+   assertEquals(5, (int) t.f1);
+   t = f1.set(t, 7);
+   assertEquals(7, (int) f1.get(t));
+   assertEquals(7, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+   assertEquals(7, (int) f1n.get(t));
+   assertEquals(7, (int) t.f1);
+   t = f1n.set(t, 10);
+   assertEquals(10, (int) f1n.get(t));
+   assertEquals(10, (int) f1.get(t));
+   assertEquals(10, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1ns = tpeInfo.getFieldAccessor("1", null);
+   assertEquals(10, (int) f1ns.get(t));
+   assertEquals(10, (int) t.f1);
+   t = f1ns.set(t, 11);
+   assertEquals(11, (int) f1ns.get(t));
+   assertEquals(11, (int) f1.get(t));
+   assertEquals(11, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   // This is technically valid (the ".0" is selecting the 0th field of a basic type).
+   FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null);
+   assertEquals("b", f0_0.get(t));
+   assertEquals("b", t.f0);
+   t = f0_0.set(t, "cc");
+   assertEquals("cc", f0_0.get(t));
+   assertEquals("cc", t.f0);
+
+   try {
+   FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null);
+   assertFalse("Expected exception, because of bad field name", false);
+   } catch (InvalidFieldReferenceException ex) {
+   // OK
+   }
+   }
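
A note on the try/catch at the end of `testFlatTuple` above: `assertFalse(message, false)` can never fail, so the test passes whether or not the exception is thrown. The sketch below reproduces that behaviour without JUnit, so every helper in it is a hypothetical stand-in (`ExpectExceptionSketch`, its local `assertFalse`, and the two pattern methods). In a JUnit 4 test the usual fixes are either calling `fail(...)` after the statement that should throw, or using the `expected` attribute of `@Test` with the exception's class literal.

```java
final class ExpectExceptionSketch {
    private ExpectExceptionSketch() {}

    // Stand-in with the same contract as JUnit's assertFalse: fails only if condition is true.
    static void assertFalse(String message, boolean condition) {
        if (condition) {
            throw new AssertionError(message);
        }
    }

    // Mirrors the pattern in the diff; returns true when the "test" would pass.
    static boolean originalPatternPasses(Runnable action) {
        try {
            action.run();
            assertFalse("Expected exception, because of bad field name", false);
        } catch (IllegalArgumentException ex) {
            // OK
        }
        return true; // reached whether or not the exception was thrown
    }

    // Stricter variant; a real test would call fail(...) where this returns false.
    static boolean strictPatternPasses(Runnable action) {
        try {
            action.run();
        } catch (IllegalArgumentException ex) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Runnable doesNotThrow = () -> { };
        System.out.println(originalPatternPasses(doesNotThrow));
        System.out.println(strictPatternPasses(doesNotThrow));
    }
}
```

Running `main` shows the original pattern passing for an action that never throws, while the strict variant reports a failure.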
 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603087#comment-15603087
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84767248
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation<F> getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record The record on which the field will be accessed
+* @return The value of the field.
+*/
+   public abstract F get(T record);
+
+   /**
+* Sets the field (specified in the constructor) of the given record to 
the given value.
+*
+* Warning: This might modify the original object, or might return a 
new object instance.
+* (This is necessary, because the record might be immutable.)
+*
+* @param record The record to modify
+* @param fieldValue The new value of the field
+* @return A record that has the given field value. (this might be a 
new instance or the original)
+*/
+   public abstract T set(T record, F fieldValue);
+
+
+   // 
--
+
+
+   /**
+* This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a
+* field of a POJO that is itself some composite type but is not 
further decomposed)
+*/
+   public final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> {
+
+   private static final long serialVersionUID = 1L;
+
+   public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
+   checkNotNull(typeInfo, "typeInfo must not be null.");
+
+

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603090#comment-15603090
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84766789
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -264,6 +265,7 @@ public void getFlatFields(String fieldExpression, int 
offset, List


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548291#comment-15548291
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
I have now rebased onto the current master.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548239#comment-15548239
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2094
  
@mbalassi any news on this PR? Would you like to review it again and merge 
it?




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493517#comment-15493517
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
> As for this pull request I suggest updating the incorrect Javadoc in the 
streaming API and documenting in the concepts that the aggregation field accessors 
and the key selectors are slightly different due to their semantics.

I've fixed the javadoc now in 154352ba879350bb1ab10b799456a67eb5819375.

As for the differences between the key selectors and aggregations (which 
use `FieldAccessor`): I tried to consolidate the supported field expressions 
between the two in 7c2dceeb87b06963cc2618312f5f1039387003ad. Now the 
aggregations should accept any field expression that selects a single field and 
would be a valid key selector, and some more (selecting array elements).

As for documenting that the aggregations don't work on a field that is 
still some composite type, I'm not sure if this is necessary, since this should 
be pretty obvious to the user.
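
The behaviour described above can be illustrated with a small, self-contained sketch in plain Java. It is not Flink's `FieldAccessor` code: the class `FieldExpressionSketch`, the POJOs `Outer` and `Inner`, and the methods `get` and `sum` are hypothetical names made up for this illustration, and reflection stands in for Flink's type information. The sketch resolves a dotted field expression one layer at a time, and the aggregation rejects an expression that still selects a composite value.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is NOT Flink's FieldAccessor implementation.
class FieldExpressionSketch {

    // Example POJOs: Outer contains a nested POJO with a numeric field.
    static class Inner {
        public int count;
        Inner(int count) { this.count = count; }
    }

    static class Outer {
        public String name;
        public Inner inner;
        Outer(String name, Inner inner) { this.name = name; this.inner = inner; }
    }

    /** Resolves a dotted expression such as "inner.count" one layer at a time. */
    static Object get(Object record, String fieldExpression) {
        Object current = record;
        for (String part : fieldExpression.split("\\.")) {
            try {
                Field field = current.getClass().getDeclaredField(part);
                field.setAccessible(true);
                current = field.get(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Invalid field reference: " + part, e);
            }
        }
        return current;
    }

    /** Sums the selected field; rejects a field that is not a single numeric value. */
    static long sum(List<?> records, String fieldExpression) {
        long total = 0;
        for (Object record : records) {
            Object value = get(record, fieldExpression);
            if (!(value instanceof Number)) {
                throw new IllegalArgumentException(
                    "Cannot aggregate on non-numeric field: " + fieldExpression);
            }
            total += ((Number) value).longValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Outer> data = Arrays.asList(
            new Outer("a", new Inner(3)), new Outer("b", new Inner(4)));
        // "inner.count" selects a single numeric field inside the nested POJO.
        System.out.println(sum(data, "inner.count"));
    }
}
```

In this sketch `sum(data, "inner.count")` adds up the nested field, while `sum(data, "inner")` throws because the expression stops at a composite value.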

> The batch API aggregations do not have the overloaded version where you 
> could pass a field accessor string.

Here is the Jira for this: https://issues.apache.org/jira/browse/FLINK-4575




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15490154#comment-15490154
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2094
  
We have gone through the changes with @ggevay and found that the 
documentation is lacking or misleading in a couple of cases.

1. The DataStream API JavaDocs mention that one can aggregate on 
expressions including Java Bean-like getters, which is not true regardless of 
this change. [1]
2. I think this is due to the confusion between the `KeySelectors` produced 
by the `Keys.ExpressionKeys` and the String expressions that we can pass to an 
aggregation operator. E.g. one of the distinctions between the two options is 
that the former can return a `CompositeType`, while we forbid that in the case 
of the latter, as it could cause the aggregation itself to fail.
3. The batch API aggregations do not have the overloaded version where you 
could pass a field accessor string. Of course this is just a convenience method 
anyway, which is easily implemented on the user side, e.g. as it is done in the 
`WordCountPojo` example. [2]
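
As an illustration of point 3, a user-side aggregation can be written as a plain reduce function that combines two records of the same key. The following is a minimal sketch in plain Java, without any Flink dependency; the class `UserSideSumSketch`, the `Word` POJO and the `reduce` helper are assumptions made for this example and only mirror the idea, not the actual code of the linked example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch in plain Java; names are illustrative, not taken from Flink.
class UserSideSumSketch {

    // A simple record type with a key (word) and a numeric field to aggregate.
    static class Word {
        final String word;
        final int frequency;
        Word(String word, int frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    // Combines two records that share a key by adding their frequency fields.
    static final BinaryOperator<Word> SUM_FREQUENCY =
        (a, b) -> new Word(a.word, a.frequency + b.frequency);

    // Applies the reduce function to all records of one key group.
    static Word reduce(List<Word> sameKey) {
        return sameKey.stream()
            .reduce(SUM_FREQUENCY)
            .orElseThrow(() -> new IllegalArgumentException("empty group"));
    }

    public static void main(String[] args) {
        Word total = reduce(Arrays.asList(new Word("flink", 1), new Word("flink", 2)));
        System.out.println(total.word + " " + total.frequency);
    }
}
```

The reduce function names the aggregated field directly in code, so no field expression string has to be parsed or validated.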

As for this pull request I suggest to update the false Javadoc from the 
streaming API and document in the concepts that the aggregation field accessors 
and the keyselectors are slightly different due to their semantics.

Other than this observation I like the changes and would like to have them 
in by the end of next week.

[1] 
https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java#L295-L310
[2] 
https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java#L106-L111




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463156#comment-15463156
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
https://issues.apache.org/jira/browse/FLINK-4575




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463135#comment-15463135
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
@rmetzger Thanks for looking at it.

> Is there a reason why the DataSet methods for "sum()" for example do not 
> allow to aggregate on POJOs with this change?

The reason is just that I didn't have time to do it. But I think it could 
be done in a few hours. I'll open a JIRA.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463094#comment-15463094
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thank you for your work @ggevay, and the reviews @twalthr.

Is there a reason why the DataSet methods for "sum()" for example do not 
allow to aggregate on POJOs with this change?




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445332#comment-15445332
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76576754
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+   // Note, that AggregationFunctionTest indirectly also tests 
FieldAccessors.
+   // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+   @Test
+   public void testFlatTuple() {
+   Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+   TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+   t = f0.set(t, "b");
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(5, (int) f1.get(t));
+   assertEquals(5, (int) t.f1);
+   t = f1.set(t, 7);
+   assertEquals(7, (int) f1.get(t));
+   assertEquals(7, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+   assertEquals(7, (int) f1n.get(t));
+   assertEquals(7, (int) t.f1);
+   t = f1n.set(t, 10);
+   assertEquals(10, (int) f1n.get(t));
+   assertEquals(10, (int) f1.get(t));
+   assertEquals(10, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+   }
+
+   @Test
+   public void testTupleInTuple() {
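+   Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+   TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null);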
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
+   assertEquals(2.0, f1f2.get(t), 0);
+   assertEquals(2.0, t.f1.f2, 0);
+   t = f1f2.set(t, 3.0);
+   assertEquals(3.0, f1f2.get(t), 0);
+   assertEquals(3.0, t.f1.f2, 0);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
+   assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
+ 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445334#comment-15445334
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76576801
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+   // Note, that AggregationFunctionTest indirectly also tests 
FieldAccessors.
+   // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+   @Test
+   public void testFlatTuple() {
+   Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+   TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+   t = f0.set(t, "b");
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(5, (int) f1.get(t));
+   assertEquals(5, (int) t.f1);
+   t = f1.set(t, 7);
+   assertEquals(7, (int) f1.get(t));
+   assertEquals(7, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+   assertEquals(7, (int) f1n.get(t));
+   assertEquals(7, (int) t.f1);
+   t = f1n.set(t, 10);
+   assertEquals(10, (int) f1n.get(t));
+   assertEquals(10, (int) f1.get(t));
+   assertEquals(10, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   // This is technically valid (the ".0" is selecting the 0th field of a basic type).
+   FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null);
+   assertEquals("b", f0_0.get(t));
+   assertEquals("b", t.f0);
+   t = f0_0.set(t, "cc");
+   assertEquals("cc", f0_0.get(t));
+   assertEquals("cc", t.f0);
+
+   try {
+   FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null);
+   assertFalse("Expected exception, because of bad field name", false);
+   } catch (InvalidFieldReferenceException ex) {
+   // OK
+   }
+   }
+
+   @Test
+   public void testTupleInTuple() {
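+   Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+   TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 =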

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445240#comment-15445240
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thanks for updating the PR @ggevay. Besides some small comments, the PR looks 
good to merge. I can fix them when merging.
@rmetzger you opened the Jira issue. Do you also want to take a final look 
at it? A second person should have a look at it as it touches important classes.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445167#comment-15445167
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76567175
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+   // Note, that AggregationFunctionTest indirectly also tests 
FieldAccessors.
+   // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+   @Test
+   public void testFlatTuple() {
+   Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+   TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+   t = f0.set(t, "b");
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(5, (int) f1.get(t));
+   assertEquals(5, (int) t.f1);
+   t = f1.set(t, 7);
+   assertEquals(7, (int) f1.get(t));
+   assertEquals(7, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+   assertEquals(7, (int) f1n.get(t));
+   assertEquals(7, (int) t.f1);
+   t = f1n.set(t, 10);
+   assertEquals(10, (int) f1n.get(t));
+   assertEquals(10, (int) f1.get(t));
+   assertEquals(10, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+   }
+
+   @Test
+   public void testTupleInTuple() {
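+   Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+   TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null);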
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
+   assertEquals(2.0, f1f2.get(t), 0);
+   assertEquals(2.0, t.f1.f2, 0);
+   t = f1f2.set(t, 3.0);
+   assertEquals(3.0, f1f2.get(t), 0);
+   assertEquals(3.0, t.f1.f2, 0);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
+   assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
+

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445170#comment-15445170
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76567285
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+   // Note, that AggregationFunctionTest indirectly also tests 
FieldAccessors.
+   // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+   @Test
+   public void testFlatTuple() {
+   Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+   TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+   t = f0.set(t, "b");
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(5, (int) f1.get(t));
+   assertEquals(5, (int) t.f1);
+   t = f1.set(t, 7);
+   assertEquals(7, (int) f1.get(t));
+   assertEquals(7, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+   assertEquals(7, (int) f1n.get(t));
+   assertEquals(7, (int) t.f1);
+   t = f1n.set(t, 10);
+   assertEquals(10, (int) f1n.get(t));
+   assertEquals(10, (int) f1.get(t));
+   assertEquals(10, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   // This is technically valid (the ".0" is selecting the 0th field of a basic type).
+   FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null);
+   assertEquals("b", f0_0.get(t));
+   assertEquals("b", t.f0);
+   t = f0_0.set(t, "cc");
+   assertEquals("cc", f0_0.get(t));
+   assertEquals("cc", t.f0);
+
+   try {
+   FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null);
+   assertFalse("Expected exception, because of bad field name", false);
+   } catch (InvalidFieldReferenceException ex) {
+   // OK
+   }
+   }
+
+   @Test
+   public void testTupleInTuple() {
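+   Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+   TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>) TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 =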

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434772#comment-15434772
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
@twalthr I think I have addressed all your comments.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434766#comment-15434766
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040669
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
 ---
@@ -139,17 +139,6 @@ public void flatMap(Data value, Collector out) 
throws Exception {
}
 
 
-   /**
-* As per FLINK-3702 Flink doesn't support nested pojo fields for sum()
-*/
-   @Test(expected = IllegalArgumentException.class)
--- End diff --

48be8d50dc6538f25605daf8f7373c39306e79d9




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434764#comment-15434764
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040657
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
 ---
@@ -236,6 +237,31 @@ abstract class CaseClassTypeInfo[T <: Product](
 }
   }
 
+  override def getFieldAccessor[F](pos: Int, config: ExecutionConfig): FieldAccessor[T, F] = {
+new ProductFieldAccessor[T,F,F](
+  pos, this, new SimpleFieldAccessor[F](types(pos).asInstanceOf[TypeInformation[F]]), config)
+  }
+
+  override def getFieldAccessor[F](fieldExpression: String, config: ExecutionConfig):
+FieldAccessor[T, F] = {
+
+val decomp = FieldAccessor.decomposeFieldExpression(fieldExpression)
+
+val pos = getFieldIndex(decomp.head)
+if(pos == -2) {
--- End diff --

5db003ba72496bb638f3ebe38f864ba978bb0887




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434765#comment-15434765
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040664
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -301,12 +301,12 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
}
 
/**
-* Applies an aggregation that that gives the current sum of the pojo 
data
+* Applies an aggregation that gives the current sum of the pojo data
--- End diff --

b461f49e43b1bb397284c3bbbcd3830e44aaf8b3




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434762#comment-15434762
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040630
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccessor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation<F> getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record
+* @return The value of the field.
+*/
+   public abstract F get(T record);
+
+   /**
+* Sets the field (specified in the constructor) of the given record to 
the given value.
+*
+* Warning: This might modify the original object, or might return a 
new object instance.
+* (This is necessary, because the record might be immutable.)
+*
+* @param record The record to modify
+* @param fieldValue The new value of the field
+* @return A record that has the given field value. (this might be a 
new instance or the original)
+*/
+   public abstract T set(T record, F fieldValue);
+
+
+   // 
==
+
+
+   /**
+* This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a
+* field of a POJO that is itself some composite type but is not 
further decomposed)
+*/
+   public final static class SimpleFieldAccessor<T> extends 
FieldAccessor<T, T> {
+
+   private static final long serialVersionUID = 1L;
+
+   public SimpleFieldAccessor(TypeInformation typeInfo) {
+   this.fieldType = typeInfo;
+   }
+
+   @Override
+   public T get(T record) {
+   return record;
+   }
+
+   @Override
+   public T set(T record, T fieldValue) {
+  

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434761#comment-15434761
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040621
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+   // Note, that AggregationFunctionTest indirectly also tests 
FieldAccessors.
+   // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+   @Test
+   public void testFlatTuple() {
+   Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+   TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Integer>>) 
TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Integer>, String> f0 = 
tpeInfo.getFieldAccessor("f0", null);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+   t = f0.set(t, "b");
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1 = 
tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(5, (int) f1.get(t));
+   assertEquals(5, (int) t.f1);
+   t = f1.set(t, 7);
+   assertEquals(7, (int) f1.get(t));
+   assertEquals(7, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+
+
+   FieldAccessor<Tuple2<String, Integer>, Integer> f1n = 
tpeInfo.getFieldAccessor(1, null);
+   assertEquals(7, (int) f1n.get(t));
+   assertEquals(7, (int) t.f1);
+   t = f1n.set(t, 10);
+   assertEquals(10, (int) f1n.get(t));
+   assertEquals(10, (int) f1.get(t));
+   assertEquals(10, (int) t.f1);
+   assertEquals("b", f0.get(t));
+   assertEquals("b", t.f0);
+   }
+
+   @Test
+   public void testTupleInTuple() {
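+   Tuple2<String, Tuple3<Integer, Long, Double>> t = 
Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+   TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> 
tpeInfo =
+   (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, 
String> f0 = tpeInfo.getFieldAccessor("f0", null);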
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, 
Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
+   assertEquals(2.0, f1f2.get(t), 0);
+   assertEquals(2.0, t.f1.f2, 0);
+   t = f1f2.set(t, 3.0);
+   assertEquals(3.0, f1f2.get(t), 0);
+   assertEquals(3.0, t.f1.f2, 0);
+   assertEquals("aa", f0.get(t));
+   assertEquals("aa", t.f0);
+
+   FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, 
Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
+   assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
+   assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
+ 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434751#comment-15434751
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040520
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 ---
@@ -160,6 +161,53 @@ public boolean isSortKeyType() {
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ExecutionConfig 
config);
 
+
+   /**
+* Creates a {@link FieldAccessor} for the given field position, which 
can be used to get and set
+* the specified field on instances of this type.
+*
+* @param pos The field position (zero-based)
+* @param config Configuration object
+* @param <F> The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config){
+   throw new InvalidFieldReferenceException("Cannot reference 
field by position on " + this.toString()
+   + "Referencing a field by position is supported on 
tuples, case classes, and arrays. "
+   + "Additionally, you can select the 0th field of a 
primitive/basic type (e.g. int).");
+   }
+
+   /**
+* Creates a {@link FieldAccessor} for the field that is given by a 
field expression,
+* which can be used to get and set the specified field on instances of 
this type.
+*
+* @see <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions">
+* Defining keys using Field Expressions</a>
+*
+* @param field The field expression
+* @param config Configuration object
+* @param <F> The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public <F> FieldAccessor<T, F> getFieldAccessor(String field, 
ExecutionConfig config) {
+   throw new InvalidFieldReferenceException("Cannot reference 
field by field expression on " + this.toString()
+   + "Field expressions are only supported on POJO 
types, tuples, and case classes. "
+   + "For the requirements for a type to be 
considered a POJO, see "
+   + 
"https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#pojos");
+   }
+
+   @PublicEvolving
+   public static class InvalidFieldReferenceException extends 
IllegalArgumentException {
--- End diff --

3b770b27c938015fedeb9e76661fd0393bdbf566
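
The pattern in the diff above (the base type rejects field references by default, and composite types override the method) can be sketched in isolation. This is a simplified, hypothetical model, not Flink code: `FieldReferenceSketch`, `TypeInfo`, `PairTypeInfo`, and `fieldNameAt` are names invented for the example, and only the exception name is borrowed from the diff. The sketch also places ". " between the type name and the explanation; in the quoted string concatenation the `toString()` output is followed directly by "Referencing", so a separator may have been intended there.

```java
/** Simplified, hypothetical model of the default-throws pattern; not Flink's classes. */
public class FieldReferenceSketch {

    /** Stand-in for the exception type introduced in the diff. */
    static class InvalidFieldReferenceException extends IllegalArgumentException {
        private static final long serialVersionUID = 1L;

        InvalidFieldReferenceException(String message) {
            super(message);
        }
    }

    /** Base type description: by default, no field can be referenced by position. */
    static class TypeInfo {
        String fieldNameAt(int pos) {
            throw new InvalidFieldReferenceException(
                "Cannot reference field by position on " + this + ". "
                    + "Referencing a field by position is supported on tuples, case classes, and arrays.");
        }

        @Override
        public String toString() {
            return getClass().getSimpleName();
        }
    }

    /** Composite type that overrides the default and supports positions 0 and 1. */
    static class PairTypeInfo extends TypeInfo {
        @Override
        String fieldNameAt(int pos) {
            if (pos < 0 || pos > 1) {
                throw new InvalidFieldReferenceException("PairTypeInfo has no field at position " + pos);
            }
            return "f" + pos;
        }
    }

    public static void main(String[] args) {
        // The composite type answers the request.
        System.out.println(new PairTypeInfo().fieldNameAt(1));
        // The base type falls through to the descriptive exception.
        try {
            new TypeInfo().fieldNameAt(0);
        } catch (InvalidFieldReferenceException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Extending IllegalArgumentException, as the diff does, lets callers that already catch that type keep working while still allowing a more specific catch.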


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434750#comment-15434750
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040516
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 ---
@@ -160,6 +161,53 @@ public boolean isSortKeyType() {
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ExecutionConfig 
config);
 
+
+   /**
+* Creates a {@link FieldAccessor} for the given field position, which 
can be used to get and set
+* the specified field on instances of this type.
+*
+* @param pos The field position (zero-based)
+* @param config Configuration object
+* @param <F> The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config){
+   throw new InvalidFieldReferenceException("Cannot reference 
field by position on " + this.toString()
+   + "Referencing a field by position is supported on 
tuples, case classes, and arrays. "
+   + "Additionally, you can select the 0th field of a 
primitive/basic type (e.g. int).");
+   }
+
+   /**
+* Creates a {@link FieldAccessor} for the field that is given by a 
field expression,
+* which can be used to get and set the specified field on instances of 
this type.
+*
+* @see <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions">
+* Defining keys using Field Expressions</a>
+*
+* @param field The field expression
+* @param config Configuration object
+* @param <F> The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public <F> FieldAccessor<T, F> getFieldAccessor(String field, 
ExecutionConfig config) {
+   throw new InvalidFieldReferenceException("Cannot reference 
field by field expression on " + this.toString()
+   + "Field expressions are only supported on POJO 
types, tuples, and case classes. "
+   + "For the requirements for a type to be 
considered a POJO, see "
+   + 
"https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#pojos");
--- End diff --

f0bdc99a33612b5326f134daf8f36e2c4553fe3a




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434755#comment-15434755
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040551
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation<F> getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record
+* @return The value of the field.
+*/
+   public abstract F get(T record);
+
+   /**
+* Sets the field (specified in the constructor) of the given record to 
the given value.
+*
+* Warning: This might modify the original object, or might return a 
new object instance.
+* (This is necessary, because the record might be immutable.)
+*
+* @param record The record to modify
+* @param fieldValue The new value of the field
+* @return A record that has the given field value. (this might be a 
new instance or the original)
+*/
+   public abstract T set(T record, F fieldValue);
+
+
+   // 
==
+
+
+   /**
+* This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a
+* field of a POJO that is itself some composite type but is not 
further decomposed)
+*/
+   public final static class SimpleFieldAccessor<T> extends 
FieldAccessor<T, T> {
+
+   private static final long serialVersionUID = 1L;
+
+   public SimpleFieldAccessor(TypeInformation typeInfo) {
+   this.fieldType = typeInfo;
+   }
+
+   @Override
+   public T get(T record) {
+   return record;
+   }
+
+   @Override
+   public T set(T record, T fieldValue) {
+  

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434759#comment-15434759
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040571
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 ---
@@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
return typed;
}
-   
+
+   @Override
+   @PublicEvolving
+   public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config) {
+   return new FieldAccessor.SimpleTupleFieldAccessor(pos, 
this);
+   }
+
+   @Override
+   @PublicEvolving
+   public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
+   FieldAccessor.FieldExpressionDecomposition decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
+   int FieldPos = this.getFieldIndex(decomp.head);
--- End diff --

5be08372611ed7c7e2fa83395f07c5ab1193f4fc
22326c7aea0c8f350d3453eef97df92382ee87f1




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434752#comment-15434752
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040529
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation<F> getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record
--- End diff --

4129d1d2a30e602d21e81e833c95490c23dab06d
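
The FieldAccessor Javadoc quoted in this thread warns that `set` might modify the original record or might return a new instance, which is why the tests reassign the result (`t = f0.set(t, "b")`). A minimal sketch of that contract follows. It is a hypothetical illustration, not Flink code: `SetContractSketch`, `Accessor`, `MutablePoint`, and `ImmutablePoint` are made-up names, and only the `get`/`set` shape mirrors the quoted abstract methods.

```java
/** Hypothetical illustration of the "use the returned record" contract; not Flink code. */
public class SetContractSketch {

    /** Minimal accessor shape: T is the record type, F is the field type. */
    interface Accessor<T, F> {
        F get(T record);

        T set(T record, F fieldValue);
    }

    /** Mutable record: set() can change it in place and return the same instance. */
    static final class MutablePoint {
        int x;

        MutablePoint(int x) {
            this.x = x;
        }
    }

    /** Immutable record: set() has to build and return a new instance. */
    static final class ImmutablePoint {
        final int x;

        ImmutablePoint(int x) {
            this.x = x;
        }
    }

    static final Accessor<MutablePoint, Integer> MUTABLE_X = new Accessor<MutablePoint, Integer>() {
        @Override
        public Integer get(MutablePoint p) {
            return p.x;
        }

        @Override
        public MutablePoint set(MutablePoint p, Integer v) {
            p.x = v;      // modifies the original object
            return p;     // and hands the same instance back
        }
    };

    static final Accessor<ImmutablePoint, Integer> IMMUTABLE_X = new Accessor<ImmutablePoint, Integer>() {
        @Override
        public Integer get(ImmutablePoint p) {
            return p.x;
        }

        @Override
        public ImmutablePoint set(ImmutablePoint p, Integer v) {
            return new ImmutablePoint(v);   // the original stays untouched
        }
    };

    public static void main(String[] args) {
        MutablePoint m = new MutablePoint(1);
        MutablePoint m2 = MUTABLE_X.set(m, 7);
        System.out.println(m == m2);

        ImmutablePoint i = new ImmutablePoint(1);
        ImmutablePoint i2 = IMMUTABLE_X.set(i, 7);
        System.out.println(i == i2);
        System.out.println(IMMUTABLE_X.get(i2));
    }
}
```

Code that ignores the return value would work for the mutable case and silently lose the update for the immutable one, so callers should always continue with the returned record.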




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434763#comment-15434763
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040641
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+public final class ProductFieldAccessor extends FieldAccessor {
+
+   private static final long serialVersionUID = 1L;
+
+   private final int pos;
+   private final TupleSerializerBase serializer;
+   private final Object[] fields;
+   private final int length;
+   private final FieldAccessor innerAccessor;
+
+   ProductFieldAccessor(int pos, TypeInformation typeInfo, 
FieldAccessor innerAccessor, ExecutionConfig config) {
+   this.pos = pos;
--- End diff --

22326c7aea0c8f350d3453eef97df92382ee87f1




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434760#comment-15434760
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040578
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 ---
@@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
return typed;
}
-   
+
+   @Override
+   @PublicEvolving
+   public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config) {
+   return new FieldAccessor.SimpleTupleFieldAccessor(pos, 
this);
+   }
+
+   @Override
+   @PublicEvolving
+   public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
+   FieldAccessor.FieldExpressionDecomposition decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
+   int FieldPos = this.getFieldIndex(decomp.head);
+   if(decomp.tail == null) {
+   return new FieldAccessor.SimpleTupleFieldAccessor(FieldPos, this);
+   } else {
+   FieldAccessor innerAccessor = 
getTypeAt(FieldPos).getFieldAccessor(decomp.tail, config);
+   return new FieldAccessor.RecursiveTupleFieldAccessor(FieldPos, innerAccessor);
--- End diff --

0b237300f9d8a82badb2ab0009c46fc13be64fa3
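
The head/tail recursion in the diff above (resolve the first segment of the field expression, then delegate the remainder to an inner accessor) can be sketched on its own. This is a minimal, hypothetical illustration that uses nested `Map`s in place of tuples; `NestedAccessorSketch`, `decompose`, `get`, and `set` are names invented for the example and are not Flink's API. It assumes the expression is split at the first dot, which may differ from how the PR's `decomposeFieldExpression` parses it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the nested accessor idea; not Flink's FieldAccessor API. */
public class NestedAccessorSketch {

    /** Splits "f1.a.foo" into head "f1" and tail "a.foo"; the tail is null for a plain field. */
    static String[] decompose(String fieldExpression) {
        int dot = fieldExpression.indexOf('.');
        if (dot < 0) {
            return new String[] {fieldExpression, null};
        }
        return new String[] {fieldExpression.substring(0, dot), fieldExpression.substring(dot + 1)};
    }

    /** Reads a nested value by peeling off one layer and recursing on the tail. */
    @SuppressWarnings("unchecked")
    static Object get(Map<String, Object> record, String fieldExpression) {
        String[] parts = decompose(fieldExpression);
        Object value = record.get(parts[0]);
        if (parts[1] == null) {
            return value;
        }
        return get((Map<String, Object>) value, parts[1]);
    }

    /** Writes a nested value in place and returns the record, so callers can reassign it. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> set(Map<String, Object> record, String fieldExpression, Object newValue) {
        String[] parts = decompose(fieldExpression);
        if (parts[1] == null) {
            record.put(parts[0], newValue);
        } else {
            set((Map<String, Object>) record.get(parts[0]), parts[1], newValue);
        }
        return record;
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("f2", 2.0);
        Map<String, Object> outer = new HashMap<>();
        outer.put("f0", "aa");
        outer.put("f1", inner);

        // Update the nested field and read both fields back.
        outer = set(outer, "f1.f2", 3.0);
        System.out.println(get(outer, "f1.f2"));
        System.out.println(get(outer, "f0"));
    }
}
```

Unlike this sketch, the accessors in the PR resolve the expression once when the accessor is created rather than on every call, so the parsing cost would not be paid per record.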




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434754#comment-15434754
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040541
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation<F> getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record
+* @return The value of the field.
+*/
+   public abstract F get(T record);
+
+   /**
+* Sets the field (specified in the constructor) of the given record to 
the given value.
+*
+* Warning: This might modify the original object, or might return a 
new object instance.
+* (This is necessary, because the record might be immutable.)
+*
+* @param record The record to modify
+* @param fieldValue The new value of the field
+* @return A record that has the given field value. (this might be a 
new instance or the original)
+*/
+   public abstract T set(T record, F fieldValue);
+
+
+   // 
==
+
+
+   /**
+* This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a
+* field of a POJO that is itself some composite type but is not 
further decomposed)
+*/
+   public final static class SimpleFieldAccessor<T> extends 
FieldAccessor<T, T> {
+
+   private static final long serialVersionUID = 1L;
+
+   public SimpleFieldAccessor(TypeInformation typeInfo) {
+   this.fieldType = typeInfo;
+   }
+
+   @Override
+   public T get(T record) {
+   return record;
+   }
+
+   @Override
+   public T set(T record, T fieldValue) {
+  

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434753#comment-15434753
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040532
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    protected TypeInformation fieldType;
+
+    /**
+     * Gets the TypeInformation for the type of the field.
+     * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+     */
+    @SuppressWarnings("unchecked")
+    public TypeInformation<F> getFieldType() {
+        return fieldType;
+    }
+
+
+    /**
+     * Gets the value of the field (specified in the constructor) of the given record.
+     * @param record
+     * @return The value of the field.
+     */
+    public abstract F get(T record);
+
+    /**
+     * Sets the field (specified in the constructor) of the given record to the given value.
+     *
+     * Warning: This might modify the original object, or might return a new object instance.
+     * (This is necessary, because the record might be immutable.)
+     *
+     * @param record The record to modify
+     * @param fieldValue The new value of the field
+     * @return A record that has the given field value. (this might be a new instance or the original)
+     */
+    public abstract T set(T record, F fieldValue);
+
+
+   // 
==
--- End diff --

8f191a91d202c6e25d8e0ccf8e68c4d39c397315
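
For illustration only, here is a minimal sketch of the contract stated in the `set` Javadoc quoted above: callers should always continue with the returned record, because an accessor may either mutate its argument or hand back a new instance. The names `Accessor`, `IntArrayAccessor`, `WholeRecordAccessor` and `SetContractDemo` are hypothetical and not part of the pull request. The assumption that a whole-record accessor simply returns the new value is mine, since the quoted diff is cut off before that method body.

```java
// Hypothetical, simplified stand-in for the FieldAccessor contract; this is not Flink code.
interface Accessor<T, F> {
    F get(T record);
    T set(T record, F fieldValue);
}

// Mutable record: set() changes the array in place and returns the same instance.
final class IntArrayAccessor implements Accessor<int[], Integer> {
    private final int pos;

    IntArrayAccessor(int pos) {
        this.pos = pos;
    }

    @Override
    public Integer get(int[] record) {
        return record[pos];
    }

    @Override
    public int[] set(int[] record, Integer fieldValue) {
        record[pos] = fieldValue;
        return record;
    }
}

// Immutable record treated as a single field: set() cannot mutate, so it returns the new value.
final class WholeRecordAccessor<T> implements Accessor<T, T> {
    @Override
    public T get(T record) {
        return record;
    }

    @Override
    public T set(T record, T fieldValue) {
        return fieldValue;
    }
}

final class SetContractDemo {
    static int updatedSum() {
        Integer boxed = 5;
        // Reassignment is required here: Integer is immutable.
        boxed = new WholeRecordAccessor<Integer>().set(boxed, 7);

        int[] array = {1, 2, 3};
        // Reassignment is harmless here: the same array instance comes back.
        array = new IntArrayAccessor(1).set(array, 10);

        return boxed + array[1];
    }
}
```

Writing `record = accessor.set(record, value)` in both cases keeps the calling code independent of which kind of accessor it was given, which is the same pattern the tests in this pull request use (`t = f0.set(t, "b")`).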



[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434757#comment-15434757
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040561
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -317,7 +308,39 @@ public int getFieldIndex(String fieldName) {
 
         return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
     }
-   
+
+    @Override
+    @PublicEvolving
+    public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) {
+
+        FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
--- End diff --

de7a68598d0933ee4016ecb87d27c46eb8011cf4




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434758#comment-15434758
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040566
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -317,7 +308,39 @@ public int getFieldIndex(String fieldName) {
 
         return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
     }
-   
+
+    @Override
+    @PublicEvolving
+    public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) {
+
+        FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
+
+        // get field
+        PojoField field = null;
+        TypeInformation fieldType = null;
+        for (int i = 0; i < fields.length; i++) {
+            if (fields[i].getField().getName().equals(decomp.head)) {
+                field = fields[i];
+                fieldType = fields[i].getTypeInformation();
+                break;
+            }
+        }
+        if (field == null) {
+            throw new InvalidFieldReferenceException("Unable to find field \""+decomp.head+"\" in type "+this+".");
+        }
+
+        if(decomp.tail == null) {
--- End diff --

a0939f7762fa5ac71e5975a36fc31c39fd0cf5dd
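
As a rough illustration of the head/tail split that the quoted code relies on (`decomp.head`, and `decomp.tail == null` when nothing is left to resolve), here is a minimal sketch. `FieldExpressionSketch` and its `decompose` method are hypothetical names. The pull request's `decomposeFieldExpression` may validate the syntax differently (the file imports `java.util.regex`), whereas this sketch only splits on the first dot.

```java
// Hypothetical helper mirroring the head/tail idea; not the implementation from the pull request.
final class FieldExpressionSketch {
    final String head; // first path segment, e.g. "foo"
    final String tail; // remaining path, or null when the expression has no nesting

    private FieldExpressionSketch(String head, String tail) {
        this.head = head;
        this.tail = tail;
    }

    static FieldExpressionSketch decompose(String fieldExpression) {
        int dot = fieldExpression.indexOf('.');
        if (dot < 0) {
            // No dot: the whole expression names a direct field.
            return new FieldExpressionSketch(fieldExpression, null);
        }
        // Split once; the tail is resolved later by the type of the head field.
        return new FieldExpressionSketch(
            fieldExpression.substring(0, dot),
            fieldExpression.substring(dot + 1));
    }
}
```

Under these assumptions, `decompose("foo.f1.bar")` yields head `foo` and tail `f1.bar`, while `decompose("foo")` yields head `foo` and a null tail, which corresponds to the `decomp.tail == null` branch above.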




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434748#comment-15434748
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040498
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
 ---
@@ -171,6 +172,23 @@ public boolean isKeyType() {
}
}
 
+    @Override
+    @PublicEvolving
+    @SuppressWarnings("unchecked")
+    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
+        if(pos != 0) {
+            throw new InvalidFieldReferenceException("Not the 0th field selected for a basic type.");
+        }
+        return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor<T>(this);
+    }
+
+    @Override
+    @PublicEvolving
+    public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
+        throw new InvalidFieldReferenceException("Field expressions are not supported on basic types."
--- End diff --

155afcb56d0f87c888e49ecd9f8920d28a4514ca




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434747#comment-15434747
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040491
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
 ---
@@ -121,6 +122,18 @@ public boolean isKeyType() {
}
 
     @Override
+    @PublicEvolving
+    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
+        return new FieldAccessor.ArrayFieldAccessor(pos, this);
+    }
+
+    @Override
+    @PublicEvolving
+    public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
+        throw new InvalidFieldReferenceException("Selecting a field from an array type is only supported by index.");
--- End diff --

b9a6012bf349afa83a27ac8c9c7b7af660ee8655




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434749#comment-15434749
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76040502
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 ---
@@ -160,6 +161,53 @@ public boolean isSortKeyType() {
@PublicEvolving
     public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
 
+
+    /**
+     * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+     * the specified field on instances of this type.
+     *
+     * @param pos The field position (zero-based)
+     * @param config Configuration object
+     * @param <F> The type of the field to access
+     * @return The created FieldAccessor
+     */
+    @PublicEvolving
+    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config){
+        throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString()
+            + "Referencing a field by position is supported on tuples, case classes, and arrays. "
+            + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int).");
+    }
+
+    /**
+     * Creates a {@link FieldAccessor} for the field that is given by a field expression,
+     * which can be used to get and set the specified field on instances of this type.
+     *
+     * @see https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions
--- End diff --

f0bdc99a33612b5326f134daf8f36e2c4553fe3a




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434590#comment-15434590
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r76024415
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    protected TypeInformation fieldType;
+
+    /**
+     * Gets the TypeInformation for the type of the field.
+     * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+     */
+    @SuppressWarnings("unchecked")
+    public TypeInformation<F> getFieldType() {
+        return fieldType;
+    }
+
+
+    /**
+     * Gets the value of the field (specified in the constructor) of the given record.
+     * @param record
+     * @return The value of the field.
+     */
+    public abstract F get(T record);
+
+    /**
+     * Sets the field (specified in the constructor) of the given record to the given value.
+     *
+     * Warning: This might modify the original object, or might return a new object instance.
+     * (This is necessary, because the record might be immutable.)
+     *
+     * @param record The record to modify
+     * @param fieldValue The new value of the field
+     * @return A record that has the given field value. (this might be a new instance or the original)
+     */
+    public abstract T set(T record, F fieldValue);
+
+
+   // 
==
+
+
+    /**
+     * This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a
+     * field of a POJO that is itself some composite type but is not further decomposed)
+     */
+    public final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> {
+
+        private static final long serialVersionUID = 1L;
+
+        public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
+            this.fieldType = typeInfo;
+        }
+
+        @Override
+        public T get(T record) {
+            return record;
+        }
+
+        @Override
+        public T set(T record, T fieldValue) {
+  

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433202#comment-15433202
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75908195
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+    // Note, that AggregationFunctionTest indirectly also tests FieldAccessors.
+    // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+    @Test
+    public void testFlatTuple() {
+        Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+        TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+            (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+        FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+        assertEquals("aa", f0.get(t));
+        assertEquals("aa", t.f0);
+        t = f0.set(t, "b");
+        assertEquals("b", f0.get(t));
+        assertEquals("b", t.f0);
+
+        FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+        assertEquals(5, (int) f1.get(t));
+        assertEquals(5, (int) t.f1);
+        t = f1.set(t, 7);
+        assertEquals(7, (int) f1.get(t));
+        assertEquals(7, (int) t.f1);
+        assertEquals("b", f0.get(t));
+        assertEquals("b", t.f0);
+
+
+        FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+        assertEquals(7, (int) f1n.get(t));
+        assertEquals(7, (int) t.f1);
+        t = f1n.set(t, 10);
+        assertEquals(10, (int) f1n.get(t));
+        assertEquals(10, (int) f1.get(t));
+        assertEquals(10, (int) t.f1);
+        assertEquals("b", f0.get(t));
+        assertEquals("b", t.f0);
+    }
+
+    @Test
+    public void testTupleInTuple() {
+        Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+        TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
+            (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t);
+
+        FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+        assertEquals("aa", f0.get(t));
+        assertEquals("aa", t.f0);
+
+        FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
+        assertEquals(2.0, f1f2.get(t), 0);
+        assertEquals(2.0, t.f1.f2, 0);
+        t = f1f2.set(t, 3.0);
+        assertEquals(3.0, f1f2.get(t), 0);
+        assertEquals(3.0, t.f1.f2, 0);
+        assertEquals("aa", f0.get(t));
+        assertEquals("aa", t.f0);
+
+        FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
+        assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
+        assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
+ 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433196#comment-15433196
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75907421
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+    // Note, that AggregationFunctionTest indirectly also tests FieldAccessors.
+    // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+    @Test
+    public void testFlatTuple() {
+        Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+        TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+            (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+        FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+        assertEquals("aa", f0.get(t));
+        assertEquals("aa", t.f0);
+        t = f0.set(t, "b");
+        assertEquals("b", f0.get(t));
+        assertEquals("b", t.f0);
+
+        FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+        assertEquals(5, (int) f1.get(t));
+        assertEquals(5, (int) t.f1);
+        t = f1.set(t, 7);
+        assertEquals(7, (int) f1.get(t));
+        assertEquals(7, (int) t.f1);
+        assertEquals("b", f0.get(t));
+        assertEquals("b", t.f0);
+
+
+        FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+        assertEquals(7, (int) f1n.get(t));
+        assertEquals(7, (int) t.f1);
+        t = f1n.set(t, 10);
+        assertEquals(10, (int) f1n.get(t));
+        assertEquals(10, (int) f1.get(t));
+        assertEquals(10, (int) t.f1);
+        assertEquals("b", f0.get(t));
+        assertEquals("b", t.f0);
+    }
+
+    @Test
+    public void testTupleInTuple() {
+        Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+        TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
+            (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t);
+
+        FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+        assertEquals("aa", f0.get(t));
+        assertEquals("aa", t.f0);
+
+        FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
+        assertEquals(2.0, f1f2.get(t), 0);
+        assertEquals(2.0, t.f1.f2, 0);
+        t = f1f2.set(t, 3.0);
+        assertEquals(3.0, f1f2.get(t), 0);
+        assertEquals(3.0, t.f1.f2, 0);
+        assertEquals("aa", f0.get(t));
+        assertEquals("aa", t.f0);
+
+        FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
+        assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
+        assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
+ 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432951#comment-15432951
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
@twalthr, thank you for the comments, I'll try to address them soon.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432945#comment-15432945
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
> I am a bit out of the loop here. Why do we need a field accessor in the 
TypeInformation?

We need this for situations when the user specifies a nested field 
expression, which goes through different types. (E.g. `.sum("foo.f1.bar")` on a 
POJO, where the field `foo` is a Tuple, and `f1` is a POJO again.) In this 
case, getFieldAccessor can recursively call itself on the TypeInformation of 
the field, which will be a virtual call that goes to the override for the 
field's type.
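
A minimal sketch of this recursion, under simplifying assumptions: records are plain `Map`s (standing in for POJOs) and `Object[]`s (standing in for tuples with fields `f0`, `f1`, ...), and only `get` is modelled. `TypeSketch`, `MapType`, `ArrayTupleType`, `LeafType` and `AccessorSketch` are hypothetical names, not Flink classes. The point is only that each type resolves the first segment itself and then makes a virtual call on the field's type for the rest.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only accessor; the real FieldAccessor also supports set().
interface AccessorSketch {
    Object get(Object record);
}

// Hypothetical stand-in for TypeInformation: each subtype overrides getFieldAccessor.
abstract class TypeSketch {
    abstract AccessorSketch getFieldAccessor(String expr);
}

// A type without nested fields cannot resolve any field expression.
final class LeafType extends TypeSketch {
    @Override
    AccessorSketch getFieldAccessor(String expr) {
        throw new IllegalArgumentException("No nested fields in a leaf type: " + expr);
    }
}

// POJO-like type: named fields, records are Maps.
final class MapType extends TypeSketch {
    private final Map<String, TypeSketch> fieldTypes = new HashMap<>();

    MapType field(String name, TypeSketch type) {
        fieldTypes.put(name, type);
        return this;
    }

    @Override
    AccessorSketch getFieldAccessor(String expr) {
        int dot = expr.indexOf('.');
        String head = dot < 0 ? expr : expr.substring(0, dot);
        TypeSketch fieldType = fieldTypes.get(head);
        if (fieldType == null) {
            throw new IllegalArgumentException("Unable to find field \"" + head + "\"");
        }
        AccessorSketch outer = r -> ((Map<?, ?>) r).get(head);
        if (dot < 0) {
            return outer;
        }
        // Virtual call: the field's own type decides how to resolve the tail.
        AccessorSketch inner = fieldType.getFieldAccessor(expr.substring(dot + 1));
        return r -> inner.get(outer.get(r));
    }
}

// Tuple-like type: positional fields named f0, f1, ...; records are Object[].
final class ArrayTupleType extends TypeSketch {
    private final TypeSketch[] fieldTypes;

    ArrayTupleType(TypeSketch... fieldTypes) {
        this.fieldTypes = fieldTypes;
    }

    @Override
    AccessorSketch getFieldAccessor(String expr) {
        int dot = expr.indexOf('.');
        String head = dot < 0 ? expr : expr.substring(0, dot);
        if (!head.matches("f\\d+")) {
            throw new IllegalArgumentException("Not a tuple field: " + head);
        }
        int pos = Integer.parseInt(head.substring(1));
        if (pos >= fieldTypes.length) {
            throw new IllegalArgumentException("Tuple field out of range: " + head);
        }
        AccessorSketch outer = r -> ((Object[]) r)[pos];
        if (dot < 0) {
            return outer;
        }
        AccessorSketch inner = fieldTypes[pos].getFieldAccessor(expr.substring(dot + 1));
        return r -> inner.get(outer.get(r));
    }
}
```

With a `MapType` whose field `foo` is an `ArrayTupleType` whose `f1` is again a `MapType` with a field `bar`, calling `getFieldAccessor("foo.f1.bar")` on the outer type builds a chain of three accessors without the outer type knowing anything about tuples.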




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432912#comment-15432912
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2094
  
I am a bit out of the loop here. Why do we need a field accessor in the 
TypeInformation?




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432896#comment-15432896
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thanks for the contribution and your patience @ggevay. I added some line 
comments. But in general it looks good.
@StephanEwen is it ok to add a `getFieldAccessor()` method to 
`TypeInformation`?




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432886#comment-15432886
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75876606
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
 ---
@@ -139,17 +139,6 @@ public void flatMap(Data value, Collector out) throws Exception {
}
 
 
-   /**
-* As per FLINK-3702 Flink doesn't support nested pojo fields for sum()
-*/
-   @Test(expected = IllegalArgumentException.class)
--- End diff --

I think it would be useful to keep this and `testNestedPojoFieldAccessor`, 
so that we also have an end-to-end test.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432811#comment-15432811
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75868407
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -301,12 +301,12 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
}
 
/**
-* Applies an aggregation that that gives the current sum of the pojo data
+* Applies an aggregation that gives the current sum of the pojo data
--- End diff --

I would rewrite the entire description. This method processes not only 
"pojo data" but also tuples etc.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432577#comment-15432577
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75839040
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
 ---
@@ -236,6 +237,31 @@ abstract class CaseClassTypeInfo[T <: Product](
 }
   }
 
+  override def getFieldAccessor[F](pos: Int, config: ExecutionConfig): FieldAccessor[T, F] = {
+    new ProductFieldAccessor[T,F,F](
+      pos, this, new SimpleFieldAccessor[F](types(pos).asInstanceOf[TypeInformation[F]]), config)
+  }
+
+  override def getFieldAccessor[F](fieldExpression: String, config: ExecutionConfig):
+    FieldAccessor[T, F] = {
+
+    val decomp = FieldAccessor.decomposeFieldExpression(fieldExpression)
+
+    val pos = getFieldIndex(decomp.head)
+    if(pos == -2) {
--- End diff --

Would `-1` be a valid index? I think this should be `< 0`.
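
To illustrate the `< 0` suggestion, here is a minimal, self-contained sketch. 
It does not use any Flink classes: `FieldIndexCheck`, its `getFieldIndex` 
helper, and the meaning given to `-1` (not found) and `-2` (ambiguous) are 
assumptions made only for this example.

```java
import java.util.Arrays;
import java.util.List;

public class FieldIndexCheck {

    // Hypothetical stand-in for getFieldIndex(): returns the position of the
    // name, -1 if it is missing, and -2 if it occurs more than once.
    static int getFieldIndex(List<String> fieldNames, String name) {
        int first = fieldNames.indexOf(name);
        if (first != fieldNames.lastIndexOf(name)) {
            return -2;
        }
        return first;
    }

    // Rejects every negative result instead of testing for one sentinel value.
    static int resolve(List<String> fieldNames, String name) {
        int pos = getFieldIndex(fieldNames, name);
        if (pos < 0) {
            throw new IllegalArgumentException(
                "Invalid field '" + name + "' (lookup returned " + pos + ")");
        }
        return pos;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("id", "count", "count");
        System.out.println(resolve(names, "id"));
        try {
            resolve(names, "missing");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a single `pos < 0` test, a new negative sentinel added later would still 
be rejected without touching the caller.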




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432566#comment-15432566
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75837546
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+    * Gets the TypeInformation for the type of the field.
+    * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+    */
+   @SuppressWarnings("unchecked")
+   public TypeInformation<F> getFieldType() {
+       return fieldType;
+   }
+
+
+   /**
+    * Gets the value of the field (specified in the constructor) of the given record.
+    * @param record
+    * @return The value of the field.
+    */
+   public abstract F get(T record);
+
+   /**
+    * Sets the field (specified in the constructor) of the given record to the given value.
+    *
+    * Warning: This might modify the original object, or might return a new object instance.
+    * (This is necessary, because the record might be immutable.)
+    *
+    * @param record The record to modify
+    * @param fieldValue The new value of the field
+    * @return A record that has the given field value. (this might be a new instance or the original)
+    */
+   public abstract T set(T record, F fieldValue);
+
+
+   // ==========================================================================================
+
+
+   /**
+    * This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a
+    * field of a POJO that is itself some composite type but is not further decomposed)
+    */
+   public final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
+           this.fieldType = typeInfo;
+       }
+
+       @Override
+       public T get(T record) {
+           return record;
+       }
+
+       @Override
+       public T set(T record, T fieldValue) {
+ 
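
The Javadoc in the diff above says that a nested field expression such as 
"f1.a.foo" yields nested accessors, each of which penetrates one layer and 
delegates the rest to an inner accessor. The following is a minimal sketch of 
that idea only, not the code from the pull request: it works on plain 
`java.util.Map` records, and `NestedAccessorSketch`, `Accessor`, `Simple`, 
`MapField` and `forExpression` are hypothetical names introduced for this 
example.

```java
import java.util.HashMap;
import java.util.Map;

public class NestedAccessorSketch {

    // Minimal accessor contract: read or replace one field of a record.
    interface Accessor {
        Object get(Object record);
        Object set(Object record, Object value);
    }

    // Leaf case: the whole record is treated as the field.
    static final class Simple implements Accessor {
        public Object get(Object record) { return record; }
        public Object set(Object record, Object value) { return value; }
    }

    // Penetrates one layer (a map entry) and delegates the rest to `inner`.
    static final class MapField implements Accessor {
        private final String key;
        private final Accessor inner;

        MapField(String key, Accessor inner) {
            this.key = key;
            this.inner = inner;
        }

        @SuppressWarnings("unchecked")
        public Object get(Object record) {
            return inner.get(((Map<String, Object>) record).get(key));
        }

        @SuppressWarnings("unchecked")
        public Object set(Object record, Object value) {
            Map<String, Object> map = (Map<String, Object>) record;
            map.put(key, inner.set(map.get(key), value));
            return map;
        }
    }

    // Splits "head.tail" at the first dot and builds the accessor chain recursively.
    static Accessor forExpression(String expr) {
        int dot = expr.indexOf('.');
        if (dot < 0) {
            return new MapField(expr, new Simple());
        }
        return new MapField(expr.substring(0, dot), forExpression(expr.substring(dot + 1)));
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("foo", 5);
        Map<String, Object> outer = new HashMap<>();
        outer.put("a", inner);

        Accessor acc = forExpression("a.foo");
        System.out.println(acc.get(outer));
        acc.set(outer, 7);
        System.out.println(acc.get(outer));
    }
}
```

Because `set` returns the record, the same contract would also cover immutable 
records, where an implementation has to return a new instance instead of 
mutating the original.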

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432557#comment-15432557
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75836333
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+public final class ProductFieldAccessor extends FieldAccessor {
+
+   private static final long serialVersionUID = 1L;
+
+   private final int pos;
+   private final TupleSerializerBase serializer;
+   private final Object[] fields;
+   private final int length;
+   private final FieldAccessor innerAccessor;
+
+   ProductFieldAccessor(int pos, TypeInformation typeInfo, 
FieldAccessor innerAccessor, ExecutionConfig config) {
+   this.pos = pos;
--- End diff --

As I said above, some parameter checking would not hurt.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432553#comment-15432553
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75835835
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432539#comment-15432539
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75835061
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+   // Note, that AggregationFunctionTest indirectly also tests FieldAccessors.
+   // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+   @Test
+   public void testFlatTuple() {
+       Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+       TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+           (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+       FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+       assertEquals("aa", f0.get(t));
+       assertEquals("aa", t.f0);
+       t = f0.set(t, "b");
+       assertEquals("b", f0.get(t));
+       assertEquals("b", t.f0);
+
+       FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+       assertEquals(5, (int) f1.get(t));
+       assertEquals(5, (int) t.f1);
+       t = f1.set(t, 7);
+       assertEquals(7, (int) f1.get(t));
+       assertEquals(7, (int) t.f1);
+       assertEquals("b", f0.get(t));
+       assertEquals("b", t.f0);
+
+
+       FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+       assertEquals(7, (int) f1n.get(t));
+       assertEquals(7, (int) t.f1);
+       t = f1n.set(t, 10);
+       assertEquals(10, (int) f1n.get(t));
+       assertEquals(10, (int) f1.get(t));
+       assertEquals(10, (int) t.f1);
+       assertEquals("b", f0.get(t));
+       assertEquals("b", t.f0);
+   }
+
+   @Test
+   public void testTupleInTuple() {
+       Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+       TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
+           (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t);
+
+       FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+       assertEquals("aa", f0.get(t));
+       assertEquals("aa", t.f0);
+
+       FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
+       assertEquals(2.0, f1f2.get(t), 0);
+       assertEquals(2.0, t.f1.f2, 0);
+       t = f1f2.set(t, 3.0);
+       assertEquals(3.0, f1f2.get(t), 0);
+       assertEquals(3.0, t.f1.f2, 0);
+       assertEquals("aa", f0.get(t));
+       assertEquals("aa", t.f0);
+
+       FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
+       assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
+       assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
+

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432533#comment-15432533
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75834786
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432535#comment-15432535
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75834833
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 ---

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432519#comment-15432519
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75833983
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 ---
@@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int 
offset, List typed = (TypeInformation) this.types[pos];
return typed;
}
-   
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(int pos, 
ExecutionConfig config) {
+   return new FieldAccessor.SimpleTupleFieldAccessor(pos, 
this);
+   }
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
+   FieldAccessor.FieldExpressionDecomposition decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
+   int FieldPos = this.getFieldIndex(decomp.head);
+   if(decomp.tail == null) {
+   return new FieldAccessor.SimpleTupleFieldAccessor(FieldPos, this);
+   } else {
+   FieldAccessor innerAccessor = 
getTypeAt(FieldPos).getFieldAccessor(decomp.tail, config);
+   return new FieldAccessor.RecursiveTupleFieldAccessor(FieldPos, innerAccessor);
--- End diff --

You can use `<>` here and `` above instead of `Object`.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432513#comment-15432513
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75833200
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 ---
@@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int 
offset, List typed = (TypeInformation) this.types[pos];
return typed;
}
-   
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(int pos, 
ExecutionConfig config) {
+   return new FieldAccessor.SimpleTupleFieldAccessor(pos, 
this);
+   }
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
+   FieldAccessor.FieldExpressionDecomposition decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
+   int FieldPos = this.getFieldIndex(decomp.head);
--- End diff --

`getFieldIndex()` can return -1. This should be handled somewhere. I think 
the constructors of all accessors should check their input parameters (also for 
null etc.).
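
As a rough illustration of what such constructor checks could look like, here 
is a small standalone sketch. It is not one of the accessors from this pull 
request: `CheckedArrayAccessor` is a hypothetical class that reads one slot of 
an `Object[]` record, and the exception types are an assumption.

```java
import java.util.Objects;

public class CheckedArrayAccessor {

    private final int pos;
    private final String fieldName;

    // Validates its arguments up front, so that a failed lookup (-1) surfaces
    // at construction time rather than as an ArrayIndexOutOfBoundsException later.
    public CheckedArrayAccessor(int pos, String fieldName) {
        this.fieldName = Objects.requireNonNull(fieldName, "fieldName must not be null");
        if (pos < 0) {
            throw new IllegalArgumentException(
                "Negative position " + pos + " for field '" + fieldName + "'");
        }
        this.pos = pos;
    }

    // Returns the value stored at the validated position.
    public Object get(Object[] record) {
        return record[pos];
    }

    public static void main(String[] args) {
        CheckedArrayAccessor f1 = new CheckedArrayAccessor(1, "f1");
        System.out.println(f1.get(new Object[] {"aa", 5}));
    }
}
```

Failing in the constructor keeps the error close to the invalid field 
expression, instead of deferring it to the first record that is processed.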




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432502#comment-15432502
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75832360
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 ---
@@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int 
offset, List typed = (TypeInformation) this.types[pos];
return typed;
}
-   
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(int pos, 
ExecutionConfig config) {
+   return new FieldAccessor.SimpleTupleFieldAccessor(pos, 
this);
+   }
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
+   FieldAccessor.FieldExpressionDecomposition decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
+   int FieldPos = this.getFieldIndex(decomp.head);
--- End diff --

Should not be upper case.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432497#comment-15432497
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75831968
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -317,7 +308,39 @@ public int getFieldIndex(String fieldName) {
 
return new PojoSerializer(getTypeClass(), fieldSerializers, 
reflectiveFields, config);
}
-   
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
+   
+   FieldAccessor.FieldExpressionDecomposition decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
+
+   // get field
+   PojoField field = null;
+   TypeInformation fieldType = null;
+   for (int i = 0; i < fields.length; i++) {
+   if (fields[i].getField().getName().equals(decomp.head)) 
{
+   field = fields[i];
+   fieldType = fields[i].getTypeInformation();
+   break;
+   }
+   }
+   if (field == null) {
+   throw new InvalidFieldReferenceException("Unable to 
find field \""+decomp.head+"\" in type "+this+".");
+   }
+
+   if(decomp.tail == null) {
--- End diff --

It would be great if you could document that tail can be null in 
`FieldExpressionDecomposition`.
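
A rough sketch of what such documentation could look like, assuming the decomposition simply splits at the first dot. `FieldExpressionSketch` is a hypothetical class for illustration; the pull request's actual parsing may differ (the diff imports `java.util.regex`, so it probably does).

```java
// Hypothetical illustration; not the FieldExpressionDecomposition class from the PR.
final class FieldExpressionSketch {
    /** First segment of the expression, e.g. "f1" for "f1.a.foo". Never null. */
    final String head;

    /**
     * Remainder after the first dot, e.g. "a.foo" for "f1.a.foo".
     * This is null when the expression consists of a single segment.
     */
    final String tail;

    private FieldExpressionSketch(String head, String tail) {
        this.head = head;
        this.tail = tail;
    }

    /** Splits a field expression at its first dot (simplified assumption). */
    static FieldExpressionSketch decompose(String fieldExpression) {
        if (fieldExpression == null || fieldExpression.isEmpty()) {
            throw new IllegalArgumentException("Field expression must not be null or empty");
        }
        int dot = fieldExpression.indexOf('.');
        if (dot < 0) {
            // No nested part: callers must handle the null tail.
            return new FieldExpressionSketch(fieldExpression, null);
        }
        return new FieldExpressionSketch(
            fieldExpression.substring(0, dot), fieldExpression.substring(dot + 1));
    }
}
```

With the null case spelled out in the Javadoc, a caller such as the `decomp.tail == null` branch above does not have to read the parsing code to know the check is needed.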




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432487#comment-15432487
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75830865
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -317,7 +308,39 @@ public int getFieldIndex(String fieldName) {
 
return new PojoSerializer(getTypeClass(), fieldSerializers, 
reflectiveFields, config);
}
-   
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
+   
+   FieldAccessor.FieldExpressionDecomposition decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
--- End diff --

What about renaming `FieldExpressionDecomposition` to just 
`FieldExpression`? IMHO the old class name is too long.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432467#comment-15432467
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75829026
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record
+* @return The value of the field.
+*/
+   public abstract F get(T record);
+
+   /**
+* Sets the field (specified in the constructor) of the given record to 
the given value.
+*
+* Warning: This might modify the original object, or might return a 
new object instance.
+* (This is necessary, because the record might be immutable.)
+*
+* @param record The record to modify
+* @param fieldValue The new value of the field
+* @return A record that has the given field value. (this might be a 
new instance or the original)
+*/
+   public abstract T set(T record, F fieldValue);
+
+
+   // 
==
+
+
+   /**
+* This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a
+* field of a POJO that is itself some composite type but is not 
further decomposed)
+*/
+   public final static class SimpleFieldAccessor extends 
FieldAccessor {
+
+   private static final long serialVersionUID = 1L;
+
+   public SimpleFieldAccessor(TypeInformation typeInfo) {
+   this.fieldType = typeInfo;
+   }
+
+   @Override
+   public T get(T record) {
+   return record;
+   }
+
+   @Override
+   public T set(T record, T fieldValue) {
+ 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432464#comment-15432464
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75828736
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record
+* @return The value of the field.
+*/
+   public abstract F get(T record);
+
+   /**
+* Sets the field (specified in the constructor) of the given record to 
the given value.
+*
+* Warning: This might modify the original object, or might return a 
new object instance.
+* (This is necessary, because the record might be immutable.)
+*
+* @param record The record to modify
+* @param fieldValue The new value of the field
+* @return A record that has the given field value. (this might be a 
new instance or the original)
+*/
+   public abstract T set(T record, F fieldValue);
+
+
+   // 
==
+
+
+   /**
+* This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a
+* field of a POJO that is itself some composite type but is not 
further decomposed)
+*/
+   public final static class SimpleFieldAccessor extends 
FieldAccessor {
+
+   private static final long serialVersionUID = 1L;
+
+   public SimpleFieldAccessor(TypeInformation typeInfo) {
+   this.fieldType = typeInfo;
+   }
+
+   @Override
+   public T get(T record) {
+   return record;
+   }
+
+   @Override
+   public T set(T record, T fieldValue) {
+ 

[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432461#comment-15432461
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75828673
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record
+* @return The value of the field.
+*/
+   public abstract F get(T record);
+
+   /**
+* Sets the field (specified in the constructor) of the given record to 
the given value.
+*
+* Warning: This might modify the original object, or might return a 
new object instance.
+* (This is necessary, because the record might be immutable.)
+*
+* @param record The record to modify
+* @param fieldValue The new value of the field
+* @return A record that has the given field value. (this might be a 
new instance or the original)
+*/
+   public abstract T set(T record, F fieldValue);
+
+
+   // 
==
--- End diff --

Usually we always use `` for logical code sections.



[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432455#comment-15432455
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75828177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java 
---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by 
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for 
the field.
+ * The position index might specify a field of a Tuple, an array, or a 
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result 
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   protected TypeInformation fieldType;
+
+   /**
+* Gets the TypeInformation for the type of the field.
+* Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+*/
+   @SuppressWarnings("unchecked")
+   public TypeInformation getFieldType() {
+   return fieldType;
+   }
+
+
+   /**
+* Gets the value of the field (specified in the constructor) of the 
given record.
+* @param record
--- End diff --

My IDE shows a warning here because of missing documentation.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432447#comment-15432447
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75827568
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 ---
@@ -160,6 +161,53 @@ public boolean isSortKeyType() {
@PublicEvolving
public abstract TypeSerializer createSerializer(ExecutionConfig 
config);
 
+
+   /**
+* Creates a {@link FieldAccessor} for the given field position, which 
can be used to get and set
+* the specified field on instances of this type.
+*
+* @param pos The field position (zero-based)
+* @param config Configuration object
+* @param  The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(int pos, 
ExecutionConfig config){
+   throw new InvalidFieldReferenceException("Cannot reference 
field by position on " + this.toString()
+   + "Referencing a field by position is supported on 
tuples, case classes, and arrays. "
+   + "Additionally, you can select the 0th field of a 
primitive/basic type (e.g. int).");
+   }
+
+   /**
+* Creates a {@link FieldAccessor} for the field that is given by a 
field expression,
+* which can be used to get and set the specified field on instances of 
this type.
+*
+* @see https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions;>
+* Defining keys using Field Expressions
+*
+* @param field The field expression
+* @param config Configuration object
+* @param  The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String field, 
ExecutionConfig config) {
+   throw new InvalidFieldReferenceException("Cannot reference 
field by field expression on " + this.toString()
+   + "Field expressions are only supported on POJO 
types, tuples, and case classes. "
+   + "For the requirements for a type to be 
considered a POJO, see "
+   + 
"https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#pojos;);
+   }
+
+   @PublicEvolving
+   public static class InvalidFieldReferenceException extends 
IllegalArgumentException {
--- End diff --

Can you move this into a separate file? We should keep `TypeInformation` 
clear.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432439#comment-15432439
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75827289
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 ---
@@ -160,6 +161,53 @@ public boolean isSortKeyType() {
@PublicEvolving
public abstract TypeSerializer createSerializer(ExecutionConfig 
config);
 
+
+   /**
+* Creates a {@link FieldAccessor} for the given field position, which 
can be used to get and set
+* the specified field on instances of this type.
+*
+* @param pos The field position (zero-based)
+* @param config Configuration object
+* @param  The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(int pos, 
ExecutionConfig config){
+   throw new InvalidFieldReferenceException("Cannot reference 
field by position on " + this.toString()
+   + "Referencing a field by position is supported on 
tuples, case classes, and arrays. "
+   + "Additionally, you can select the 0th field of a 
primitive/basic type (e.g. int).");
+   }
+
+   /**
+* Creates a {@link FieldAccessor} for the field that is given by a 
field expression,
+* which can be used to get and set the specified field on instances of 
this type.
+*
+* @see https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions;>
--- End diff --

Don't use links to the documentation. They change over time and become 
invalid.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432442#comment-15432442
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75827397
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 ---
@@ -160,6 +161,53 @@ public boolean isSortKeyType() {
@PublicEvolving
public abstract TypeSerializer createSerializer(ExecutionConfig 
config);
 
+
+   /**
+* Creates a {@link FieldAccessor} for the given field position, which 
can be used to get and set
+* the specified field on instances of this type.
+*
+* @param pos The field position (zero-based)
+* @param config Configuration object
+* @param  The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(int pos, 
ExecutionConfig config){
+   throw new InvalidFieldReferenceException("Cannot reference 
field by position on " + this.toString()
+   + "Referencing a field by position is supported on 
tuples, case classes, and arrays. "
+   + "Additionally, you can select the 0th field of a 
primitive/basic type (e.g. int).");
+   }
+
+   /**
+* Creates a {@link FieldAccessor} for the field that is given by a 
field expression,
+* which can be used to get and set the specified field on instances of 
this type.
+*
+* @see https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions;>
+* Defining keys using Field Expressions
+*
+* @param field The field expression
+* @param config Configuration object
+* @param  The type of the field to access
+* @return The created FieldAccessor
+*/
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String field, 
ExecutionConfig config) {
+   throw new InvalidFieldReferenceException("Cannot reference 
field by field expression on " + this.toString()
+   + "Field expressions are only supported on POJO 
types, tuples, and case classes. "
+   + "For the requirements for a type to be 
considered a POJO, see "
+   + 
"https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#pojos;);
--- End diff --

I would also remove this link.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432436#comment-15432436
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75826929
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
 ---
@@ -171,6 +172,23 @@ public boolean isKeyType() {
}
}
 
+   @Override
+   @PublicEvolving
+   @SuppressWarnings("unchecked")
+   public  FieldAccessor getFieldAccessor(int pos, 
ExecutionConfig config) {
+   if(pos != 0) {
+   throw new InvalidFieldReferenceException("Not the 0th 
field selected for a basic type.");
+   }
+   return (FieldAccessor) new 
FieldAccessor.SimpleFieldAccessor(this);
+   }
+
+   @Override
+   @PublicEvolving
+   public  FieldAccessor getFieldAccessor(String field, 
ExecutionConfig config) {
+   throw new InvalidFieldReferenceException("Field expressions are 
not supported on basic types."
--- End diff --

What about accepting `*` or also `0` here and getting rid of the exception?
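
One way to read this suggestion, sketched under the assumption that both `*` and `0` would mean "the whole value" for a basic type. `BasicTypeExpressionSketch` and `selectsWholeRecord` are hypothetical names, not part of the pull request.

```java
// Hypothetical illustration of treating "*" and "0" as "the whole record".
final class BasicTypeExpressionSketch {
    private BasicTypeExpressionSketch() {}

    /** Returns true if the expression selects the entire value of a basic type. */
    static boolean selectsWholeRecord(String field) {
        if (field == null) {
            return false;
        }
        String trimmed = field.trim();
        return trimmed.equals("*") || trimmed.equals("0");
    }
}
```

Any other expression would presumably still have to be rejected, so the exception would be narrowed rather than removed entirely.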




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432430#comment-15432430
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r75826683
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java ---
@@ -121,6 +122,18 @@ public boolean isKeyType() {
     }
 
     @Override
+    @PublicEvolving
+    public  FieldAccessor getFieldAccessor(int pos, ExecutionConfig config) {
+        return new FieldAccessor.ArrayFieldAccessor(pos, this);
+    }
+
+    @Override
+    @PublicEvolving
+    public  FieldAccessor getFieldAccessor(String field, ExecutionConfig config) {
+        throw new InvalidFieldReferenceException("Selecting a field from an array type is only supported by index.");
--- End diff --

We could also add a constructor to `ArrayFieldAccessor` that takes a string 
and parses the int position there. Then we wouldn't need to throw an 
exception. What do you think?
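
A minimal sketch of what such a string constructor could look like. `ArrayElementAccessor` is a hypothetical stand-in, not the `ArrayFieldAccessor` from the pull request, and the `get`/`set` signatures are assumptions. Note that a non-numeric expression presumably still has to be rejected somewhere, so in this sketch the error moves into the parsing step.

```java
// Hypothetical sketch; not the accessor class from the pull request.
final class ArrayElementAccessor<T> {
    private final int pos;

    ArrayElementAccessor(int pos) {
        if (pos < 0) {
            throw new IllegalArgumentException("Negative array index: " + pos);
        }
        this.pos = pos;
    }

    /** Parses a field expression such as "2" into an array index. */
    ArrayElementAccessor(String field) {
        this(parseIndex(field));
    }

    private static int parseIndex(String field) {
        try {
            return Integer.parseInt(field.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Not an array index: \"" + field + "\"", e);
        }
    }

    /** Reads the selected element. */
    T get(T[] array) {
        return array[pos];
    }

    /** Writes the selected element in place and returns the same array. */
    T[] set(T[] array, T value) {
        array[pos] = value;
        return array;
    }
}
```

With this shape, the string variant of `getFieldAccessor` could delegate to the new constructor instead of throwing unconditionally.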




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421042#comment-15421042
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
Thanks!




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421039#comment-15421039
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2094
  
I will shepherd this PR. I think I can review it this week.




[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-06-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326578#comment-15326578
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

GitHub user ggevay opened a pull request:

https://github.com/apache/flink/pull/2094

[FLINK-3702] Make FieldAccessors support nested field expressions.

I finally had some time to complete this; sorry it took so long.

I have added `getFieldAccessor` to `TypeInformation`, which creates a 
`FieldAccessor` from a position or a (possibly nested) field expression. In 
the nested case it recurses into the field's type, and it also supports the 
heterogeneous case, e.g. a POJO inside a tuple inside a POJO.
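
The heterogeneous case can be illustrated with a small sketch in which each type descriptor resolves its own part of the expression and hands the rest to the descriptor of the selected field. Everything here is hypothetical: `TypeDesc`, `PojoDesc`, `TupleDesc` and `BasicDesc` are stand-ins for the `TypeInformation` subclasses, a tuple is modeled as `Object[]` addressed by a plain index, and only the read path is shown.

```java
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for TypeInformation subclasses; not Flink's classes.
interface TypeDesc {
    /** Returns a getter for the (possibly nested) field expression. */
    Function<Object, Object> getter(String expression);
}

final class Expr {
    /** Splits "a.b.c" into {"a", "b.c"}; the second element is null without a dot. */
    static String[] split(String expression) {
        int dot = expression.indexOf('.');
        return dot < 0
                ? new String[] {expression, null}
                : new String[] {expression.substring(0, dot), expression.substring(dot + 1)};
    }
}

final class BasicDesc implements TypeDesc {
    public Function<Object, Object> getter(String expression) {
        throw new IllegalArgumentException("Basic types have no field \"" + expression + "\"");
    }
}

/** A tuple modeled as Object[], with one descriptor per position. */
final class TupleDesc implements TypeDesc {
    private final TypeDesc[] fields;

    TupleDesc(TypeDesc... fields) {
        this.fields = fields;
    }

    public Function<Object, Object> getter(String expression) {
        String[] parts = Expr.split(expression);
        int pos = Integer.parseInt(parts[0]);
        Function<Object, Object> outer = t -> ((Object[]) t)[pos];
        // Recurse into the descriptor of the selected position for the rest.
        return parts[1] == null ? outer : outer.andThen(fields[pos].getter(parts[1]));
    }
}

final class PojoDesc implements TypeDesc {
    private final Class<?> type;
    private final Map<String, TypeDesc> fields;

    PojoDesc(Class<?> type, Map<String, TypeDesc> fields) {
        this.type = type;
        this.fields = fields;
    }

    private Field lookup(String name) {
        try {
            Field f = type.getDeclaredField(name);
            f.setAccessible(true);
            return f;
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No field \"" + name + "\" in " + type.getName(), e);
        }
    }

    public Function<Object, Object> getter(String expression) {
        String[] parts = Expr.split(expression);
        Field f = lookup(parts[0]);
        Function<Object, Object> outer = o -> {
            try {
                return f.get(o);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        };
        // Fields without a registered descriptor are treated as basic types.
        TypeDesc next = fields.getOrDefault(parts[0], new BasicDesc());
        return parts[1] == null ? outer : outer.andThen(next.getter(parts[1]));
    }
}

// Example data: a POJO inside a tuple inside a POJO.
class Inner { long count = 7; }
class Outer { Object[] pair = {"id-1", new Inner()}; }

final class HeterogeneousDemo {
    public static void main(String[] args) {
        TypeDesc inner = new PojoDesc(Inner.class, Collections.singletonMap("count", new BasicDesc()));
        TypeDesc outer = new PojoDesc(Outer.class,
                Collections.singletonMap("pair", new TupleDesc(new BasicDesc(), inner)));
        System.out.println(outer.getter("pair.1.count").apply(new Outer()));
    }
}
```

Because each descriptor only understands its own addressing scheme, mixing POJOs and tuples needs no special casing in this sketch: the recursion simply dispatches on whatever descriptor the selected field has.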

Additionally, I have noticed that the code to serialize/deserialize a 
`java.lang.reflect.Field` is duplicated in several places 
(`readObject`/`writeObject` of `PojoSerializer`, `PojoComparator`, 
`PojoField`). I also needed it in `PojoFieldAccessor`, so instead of adding 
more duplication, I factored it out into a new class (`FieldSerializer`).
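
For context, `java.lang.reflect.Field` is not `Serializable`, which is why such helper code is needed at all. A minimal sketch of the idea, assuming the helper stores the declaring class and the field name and looks the field up again on read; `FieldIo` and `Sample` are hypothetical names, and this is not the `FieldSerializer` from the pull request.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;

// Hypothetical helper in the spirit of the FieldSerializer mentioned above.
final class FieldIo {

    /** Writes the declaring class and the field name instead of the Field itself. */
    static void write(Field field, ObjectOutputStream out) throws IOException {
        out.writeObject(field.getDeclaringClass());
        out.writeUTF(field.getName());
    }

    /** Reads the class and name back and resolves the Field via reflection. */
    static Field read(ObjectInputStream in) throws IOException, ClassNotFoundException {
        Class<?> declaring = (Class<?>) in.readObject();
        String name = in.readUTF();
        try {
            Field field = declaring.getDeclaredField(name);
            field.setAccessible(true);
            return field;
        } catch (NoSuchFieldException e) {
            throw new IOException("Field " + name + " not found in " + declaring.getName(), e);
        }
    }
}

// Example class whose field is written and read back.
class Sample { int count; }
```

A class holding a `transient Field` member could call these two methods from its own `writeObject`/`readObject` rather than repeating the logic.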

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ggevay/flink FieldAccessorRefactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2094.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2094


commit e8b584791144a2e9a22e39242010726fc76ae2a3
Author: Gabor Gevay 
Date:   2016-05-22T17:48:50Z

[FLINK-3702] Make FieldAccessors support nested field expressions.






[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-05-22 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295698#comment-15295698
 ] 

Gabor Gevay commented on FLINK-3702:


This turned out to be more complicated than I thought, but I'm 80% done. I'll 
try to wrap it up in the next few days.

I'm refactoring FieldAccessor so that it is created by a method of the 
TypeInfos instead of by the static method FieldAccessor.create. This makes it 
possible to process the outermost part of a field expression and then recurse 
into the specified field's TypeInfo with the rest of the field expression.
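
The "process the outermost part, then recurse with the rest" step can be sketched for the plain POJO case with reflection. This is an illustration under stated assumptions, not the refactored FieldAccessor: `NestedAccessor`, `Event` and `RawEvent` are hypothetical names, fields are looked up with `getDeclaredField` (so inherited fields are not found), and nested objects are assumed to be non-null.

```java
import java.lang.reflect.Field;

// Hypothetical sketch of a recursively built accessor; not Flink's FieldAccessor.
final class NestedAccessor {
    private final Field field;          // field handled at this level
    private final NestedAccessor inner; // accessor for the rest of the expression, or null

    private NestedAccessor(Field field, NestedAccessor inner) {
        this.field = field;
        this.inner = inner;
    }

    /** Builds an accessor for an expression such as "raw.count" on the given class. */
    static NestedAccessor create(Class<?> type, String expression) throws NoSuchFieldException {
        int dot = expression.indexOf('.');
        // Outermost part of the expression.
        String head = dot < 0 ? expression : expression.substring(0, dot);
        Field f = type.getDeclaredField(head);
        f.setAccessible(true);
        // Recurse into the field's type with the remainder, if there is one.
        NestedAccessor rest = dot < 0 ? null : create(f.getType(), expression.substring(dot + 1));
        return new NestedAccessor(f, rest);
    }

    Object get(Object record) throws IllegalAccessException {
        Object value = field.get(record);
        return inner == null ? value : inner.get(value);
    }

    void set(Object record, Object value) throws IllegalAccessException {
        if (inner == null) {
            field.set(record, value);
        } else {
            // Assumes the nested object is non-null.
            inner.set(field.get(record), value);
        }
    }
}

// Example POJOs: the count lives only in the nested object.
class RawEvent { public long count; }
class Event { public String name; public RawEvent raw = new RawEvent(); }
```

For example, `NestedAccessor.create(Event.class, "raw.count")` yields an accessor whose `set` updates the count of the nested `RawEvent` without the outer `Event` needing a count field of its own.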



[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-05-22 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295621#comment-15295621
 ] 

Gabor Gevay commented on FLINK-3702:


I'll try to quickly do it now.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.





[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-05-22 Thread Rami (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295502#comment-15295502
 ] 

Rami commented on FLINK-3702:
-

This feature would really make a difference in our code. Most of our events 
need a count, and many of those events share the nested rawevent POJO. If we 
could keep the count field only in the nested rawevent POJO, the code would be 
much cleaner and easier to understand than having the count field duplicated 
in both the event and its nested rawevent. Any ETA?

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.


