[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15694237#comment-15694237 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2094

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -------------------------------------------------------------
>
>                 Key: FLINK-3702
>                 URL: https://issues.apache.org/jira/browse/FLINK-3702
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
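For readers unfamiliar with the problem, the issue concerns field expressions such as "inner.x" that name a field of a POJO nested inside another POJO. The sketch below illustrates the idea with plain Java reflection only. It is not Flink's PojoFieldAccessor, and the names NestedPojoSketch, Outer, Inner, and getByExpression are hypothetical, chosen for this illustration.

```java
import java.lang.reflect.Field;

// Hypothetical illustration only: resolves a dotted field expression such as
// "inner.x" with plain reflection. This is NOT Flink's PojoFieldAccessor.
class NestedPojoSketch {

    public static class Inner {
        public int x;
        public Inner() {}
        public Inner(int x) { this.x = x; }
    }

    public static class Outer {
        public String name;
        public Inner inner;
        public Outer() {}
        public Outer(String name, Inner inner) {
            this.name = name;
            this.inner = inner;
        }
    }

    // Walks the expression one segment at a time, descending into nested objects.
    // Assumes public fields and non-null intermediate values.
    public static Object getByExpression(Object record, String expression) {
        Object current = record;
        for (String segment : expression.split("\\.")) {
            try {
                Field field = current.getClass().getField(segment);
                current = field.get(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                    "Cannot resolve '" + segment + "' in expression '" + expression + "'", e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Outer record = new Outer("a", new Inner(42));
        System.out.println(getByExpression(record, "name"));
        System.out.println(getByExpression(record, "inner.x"));
    }
}
```

An accessor that only understands a single segment can handle "name" but not "inner.x"; supporting the second form is what the issue title asks for.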
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15693171#comment-15693171 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user mbalassi commented on the issue:

    https://github.com/apache/flink/pull/2094

    Merging...
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676177#comment-15676177 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r88617516

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+    /**
+     * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+     * the specified field on instances of this type.
+     *
+     * @param pos The field position (zero-based)
+     * @param config Configuration object
+     * @param <F> The type of the field to access
+     * @return The created FieldAccessor
+     */
+    @Internal
+    public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){
+
+        // In case of arrays
+        if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
+            return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo);
+
+        // In case of basic types
+        } else if (typeInfo instanceof BasicTypeInfo) {
+            if (pos != 0) {
+                throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+                    "basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " +
+                    "the 0th field (which means selecting the entire basic type).");
+            }
+            return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+        // In case of case classes
+        } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+            TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+            TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
+            for (int i = 0; i < types.length; i++) {
+                types[i] = tupleTypeInfo.getTypeAt(i);
+            }
+            return new FieldAccessor.ProductFieldAccessor<>(
--- End diff --

    Forgotten, adding it.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676146#comment-15676146 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r88616007

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java ---
@@ -0,0 +1,197 @@
[...]
+        // In case of case classes
+        } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+            TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+            TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
+            for (int i = 0; i < types.length; i++) {
+                types[i] = tupleTypeInfo.getTypeAt(i);
+            }
+            return new FieldAccessor.ProductFieldAccessor<>(
+                pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation)types[pos]), config);
+
+        // In case of tuples
+        } else if (typeInfo.isTupleType()) {
+            return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo);
+
+        // Default case, PojoType is directed to this statement
+        } else {
+            throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by position on " + typeInfo.toString() +
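The positional dispatch in the quoted getAccessor method (arrays are indexed, basic types accept only position 0, everything else is rejected) can be sketched without Flink as follows. This is an illustration under stated assumptions: the real method dispatches on TypeInformation rather than on runtime values, tuples and case classes are left out, and the names PositionalAccessSketch and getByPosition are hypothetical.

```java
import java.lang.reflect.Array;

// Hypothetical sketch, not Flink code: mirrors the shape of the positional
// dispatch using runtime values instead of Flink's TypeInformation.
class PositionalAccessSketch {

    public static Object getByPosition(Object record, int pos) {
        if (record.getClass().isArray()) {
            // Arrays: the position is an element index (primitives are boxed).
            return Array.get(record, pos);
        } else if (record instanceof Number || record instanceof CharSequence
                || record instanceof Boolean || record instanceof Character) {
            // Basic values: only the "0th field" exists, meaning the whole value.
            if (pos != 0) {
                throw new IllegalArgumentException("Position " + pos
                    + " selected on a basic value (" + record.getClass().getSimpleName()
                    + "); only position 0 is valid.");
            }
            return record;
        } else {
            // Anything else (for example a POJO) has no positional fields here.
            throw new IllegalArgumentException(
                "Cannot reference field by position on " + record.getClass().getName());
        }
    }

    public static void main(String[] args) {
        System.out.println(getByPosition(new int[] {5, 6, 7}, 1));
        System.out.println(getByPosition("whole value", 0));
    }
}
```

Rejecting unsupported inputs with a descriptive exception, rather than returning null, matches the approach visible in the quoted diff, where InvalidFieldReferenceException is thrown for invalid positions.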
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676144#comment-15676144 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r88615926

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java ---
@@ -0,0 +1,197 @@
[...]
+        // In case of case classes
+        } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) {
+            TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo;
+            TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()];
--- End diff --

    That is definitely more convenient, missed that.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676137#comment-15676137 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r88615630

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    protected TypeInformation fieldType;
+
+    /**
+     * Gets the TypeInformation for the type of the field.
+     * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+     */
+    @SuppressWarnings("unchecked")
+    public TypeInformation<F> getFieldType() {
+        return fieldType;
+    }
+
+
+    /**
+     * Gets the value of the field (specified in the constructor) of the given record.
+     * @param record The record on which the field will be accessed
+     * @return The value of the field.
+     */
+    public abstract F get(T record);
+
+    /**
+     * Sets the field (specified in the constructor) of the given record to the given value.
+     *
+     * Warning: This might modify the original object, or might return a new object instance.
+     * (This is necessary, because the record might be immutable.)
+     *
+     * @param record The record to modify
+     * @param fieldValue The new value of the field
+     * @return A record that has the given field value. (this might be a new instance or the original)
+     */
+    public abstract T set(T record, F fieldValue);
+
+
+    // --
+
+
+    /**
+     * This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a
+     * field of a POJO that is itself some composite type but is not further decomposed)
+     */
+    final static class
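The Javadoc quoted in this diff describes nested accessors that "penetrate one layer" and delegate the rest to an inner accessor, and warns that set may either modify the record or return a new instance. A minimal sketch of that delegation pattern, independent of Flink, might look as follows. All names here (AccessorSketch, Accessor, Simple, Nested, Outer, Inner, fooOfA) are hypothetical and are not the classes from the pull request.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch, not Flink code: shows one-layer accessors composed into
// a nested accessor, as described in the quoted Javadoc.
class AccessorSketch {

    /** Simplified stand-in for the get/set contract. */
    public interface Accessor<T, F> {
        F get(T record);
        T set(T record, F fieldValue); // callers must use the returned record
    }

    /** Accesses a single layer through a getter and a setter function. */
    public static final class Simple<T, F> implements Accessor<T, F> {
        private final Function<T, F> getter;
        private final BiFunction<T, F, T> setter;

        public Simple(Function<T, F> getter, BiFunction<T, F, T> setter) {
            this.getter = getter;
            this.setter = setter;
        }

        @Override public F get(T record) { return getter.apply(record); }
        @Override public T set(T record, F fieldValue) { return setter.apply(record, fieldValue); }
    }

    /** Penetrates one layer (outer) and delegates the rest to an inner accessor. */
    public static final class Nested<T, M, F> implements Accessor<T, F> {
        private final Accessor<T, M> outer;
        private final Accessor<M, F> inner;

        public Nested(Accessor<T, M> outer, Accessor<M, F> inner) {
            this.outer = outer;
            this.inner = inner;
        }

        @Override public F get(T record) { return inner.get(outer.get(record)); }

        @Override public T set(T record, F fieldValue) {
            M updated = inner.set(outer.get(record), fieldValue);
            return outer.set(record, updated);
        }
    }

    /** Mutable inner record: set modifies it in place. */
    public static final class Inner {
        public int foo;
        public Inner(int foo) { this.foo = foo; }
    }

    /** Immutable outer record: set has to return a new instance. */
    public static final class Outer {
        public final Inner a;
        public Outer(Inner a) { this.a = a; }
    }

    /** Builds an accessor for the expression "a.foo" on Outer. */
    public static Accessor<Outer, Integer> fooOfA() {
        Accessor<Outer, Inner> a = new Simple<>(o -> o.a, (o, v) -> new Outer(v));
        Accessor<Inner, Integer> foo = new Simple<>(i -> i.foo, (i, v) -> { i.foo = v; return i; });
        return new Nested<>(a, foo);
    }

    public static void main(String[] args) {
        Outer original = new Outer(new Inner(1));
        Accessor<Outer, Integer> accessor = fooOfA();
        Outer updated = accessor.set(original, 7);
        System.out.println(accessor.get(updated));
        System.out.println(updated == original);
    }
}
```

Because the outer record in this sketch is immutable while the inner one is mutable, a single set call both mutates an existing object and returns a new one. That is why the contract asks callers to continue with the returned record instead of assuming in-place modification.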
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676123#comment-15676123 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r88614862

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---
@@ -0,0 +1,386 @@
[...]
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676124#comment-15676124 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r88614874

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---
@@ -0,0 +1,386 @@
[...]
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676121#comment-15676121 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r88614846

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---
@@ -0,0 +1,386 @@
[...]
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
--- End diff --

    Good catch, reverting.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649139#comment-15649139 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87100173 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; + +import java.io.Serializable; + + +/** + * Static factories for the {@link FieldAccessor} utilities. + */ +@Internal +public class FieldAccessorFactory implements Serializable { + + /** +* Creates a {@link FieldAccessor} for the given field position, which can be used to get and set +* the specified field on instances of this type. +* +* @param pos The field position (zero-based) +* @param config Configuration object +* @param The type of the field to access +* @return The created FieldAccessor +*/ + @Internal + public staticFieldAccessor getAccessor(TypeInformation typeInfo, int pos, ExecutionConfig config){ + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo); + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + if (pos != 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " + + "basic type (" + typeInfo.toString() + "). 
A field expression on a basic type can only select " + + "the 0th field (which means selecting the entire basic type)."); + } + return (FieldAccessor ) new FieldAccessor.SimpleFieldAccessor<>(typeInfo); + + // In case of case classes + } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) { + TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo; + TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()]; + for (int i = 0; i < types.length; i++) { + types[i] = tupleTypeInfo.getTypeAt(i); + } + return new FieldAccessor.ProductFieldAccessor<>( --- End diff -- In a previous review I suggested to add a `SimpleProductFieldAccessor` similar to the `SimpleTupleFieldAccessor`. Did you forget to add it or do you think it is not necessary? > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >
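For illustration, a non-nested accessor for a product type could look roughly like the sketch below. This is not the code from the pull request: `MiniProduct`, `Pair`, `SimpleProductAccessorSketch` and the `rebuild` function are hypothetical stand-ins (for `scala.Product`, a case class, the suggested `SimpleProductFieldAccessor`, and whatever mechanism creates a new instance), chosen so that the example compiles without Flink or Scala.

```java
import java.util.function.Function;

/** Hypothetical stand-in for scala.Product, so the sketch needs no Scala dependency. */
interface MiniProduct {
    int productArity();
    Object productElement(int i);
}

/** Immutable two-field record playing the role of a case class. */
final class Pair implements MiniProduct {
    final String name;
    final Integer count;

    Pair(String name, Integer count) {
        this.name = name;
        this.count = count;
    }

    public int productArity() {
        return 2;
    }

    public Object productElement(int i) {
        switch (i) {
            case 0: return name;
            case 1: return count;
            default: throw new IndexOutOfBoundsException("No field " + i);
        }
    }
}

/** Non-nested accessor: reads and writes position pos directly, with no inner accessor. */
final class SimpleProductAccessorSketch<T extends MiniProduct, F> {
    private final int pos;
    private final Function<Object[], T> rebuild; // assumed way to construct a new T

    SimpleProductAccessorSketch(int pos, Function<Object[], T> rebuild) {
        this.pos = pos;
        this.rebuild = rebuild;
    }

    @SuppressWarnings("unchecked")
    F get(T record) {
        return (F) record.productElement(pos);
    }

    /** The record is immutable, so set copies all fields and builds a new instance. */
    T set(T record, F value) {
        Object[] fields = new Object[record.productArity()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = record.productElement(i);
        }
        fields[pos] = value;
        return rebuild.apply(fields);
    }
}
```

Compared with wrapping a `SimpleFieldAccessor` inside a nested product accessor, a dedicated simple variant would avoid one level of delegation per `get` and `set` call, which seems to be the point of the suggestion.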
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649137#comment-15649137 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87093199 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import scala.Product; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving --- End diff -- `FieldAccessor` was `@Internal` before. Why did you make it `@PublicEvolving`? 
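The quoted Javadoc says that nested field expressions produce accessors that penetrate one layer and delegate the rest to an inner accessor. A minimal sketch of that idea, assuming plain Java reflection on public fields, is given below. `AccessorSketch`, `IdentityAccessor`, `NestedPojoAccessor` and `NestedAccessorDemo` are hypothetical names, and only the read path is shown; the actual `PojoFieldAccessor` in the pull request may differ.

```java
import java.lang.reflect.Field;

/** Hypothetical minimal accessor interface; not Flink's FieldAccessor. */
interface AccessorSketch {
    Object get(Object record);
}

/** Terminal accessor: the whole record is the field. */
final class IdentityAccessor implements AccessorSketch {
    public Object get(Object record) {
        return record;
    }
}

/** Penetrates one POJO layer by reflection, then delegates to the inner accessor. */
final class NestedPojoAccessor implements AccessorSketch {
    private final Field field;
    private final AccessorSketch inner;

    NestedPojoAccessor(Field field, AccessorSketch inner) {
        this.field = field;
        this.field.setAccessible(true);
        this.inner = inner;
    }

    public Object get(Object record) {
        try {
            return inner.get(field.get(record));
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Cannot read field " + field.getName(), e);
        }
    }
}

final class NestedAccessorDemo {
    static class Inner { public int foo = 42; }
    static class Outer { public Inner a = new Inner(); }

    /** Builds a chain of accessors for a dotted expression such as "a.foo". */
    static AccessorSketch build(Class<?> type, String expr) {
        if (expr.isEmpty()) {
            return new IdentityAccessor();
        }
        int dot = expr.indexOf('.');
        String head = dot < 0 ? expr : expr.substring(0, dot);
        String rest = dot < 0 ? "" : expr.substring(dot + 1);
        try {
            Field f = type.getField(head); // public fields only in this sketch
            return new NestedPojoAccessor(f, build(f.getType(), rest));
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No field '" + head + "' on " + type.getName(), e);
        }
    }
}
```

Each accessor in the chain handles exactly one segment of the expression, so the expression is parsed once when the chain is built rather than on every record.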
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649136#comment-15649136 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87094727 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import scala.Product; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). 
+*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record The record on which the field will be accessed +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. +* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // -- + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + final static class
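The warning on `set` in the quoted Javadoc (it might modify the original object or return a new instance) means that callers should always continue with the returned record. The sketch below illustrates this under simplifying assumptions; `GetSetSketch`, `ArrayElementAccessor`, `WholeRecordAccessor` and `SetContractDemo` are hypothetical names and not classes from the pull request.

```java
/** Hypothetical accessor contract mirroring the get/set pair quoted above. */
interface GetSetSketch<T, F> {
    F get(T record);

    /** May return the record itself (mutated) or a new instance. */
    T set(T record, F value);
}

/** Mutable case: an int[] element is updated in place and the same array is returned. */
final class ArrayElementAccessor implements GetSetSketch<int[], Integer> {
    private final int pos;

    ArrayElementAccessor(int pos) {
        this.pos = pos;
    }

    public Integer get(int[] record) {
        return record[pos];
    }

    public int[] set(int[] record, Integer value) {
        record[pos] = value;
        return record;
    }
}

/** Immutable case: the record is itself the field, so set can only return the new value. */
final class WholeRecordAccessor implements GetSetSketch<Integer, Integer> {
    public Integer get(Integer record) {
        return record;
    }

    public Integer set(Integer record, Integer value) {
        return value;
    }
}

final class SetContractDemo {
    /** Adds delta to the accessed field; correct only because it uses set's return value. */
    static <T> T addTo(GetSetSketch<T, Integer> acc, T record, int delta) {
        return acc.set(record, acc.get(record) + delta);
    }
}
```

Code that ignored the return value would still work for the array case but would silently lose the update for the immutable case.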
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649144#comment-15649144 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87096436 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -337,37 +337,43 @@ public KeyedStream(DataStream dataStream, KeySelectorkeySelector, Ty * per key. * * @param positionToSum -*The position in the data point to sum +*The field position in the data points to sum. This is applicable to +*Tuple types, basic and primitive array types, Scala case classes, --- End diff -- I'm not sure we should document behavior that we might remove soon. I'd rather keep this as an undocumented feature and open a JIRA issue to either add the documentation or remove the support for arrays and for position `0` on atomic types. What do you think, @mbalassi? The same applies to all the other Javadocs and Scaladocs. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649140#comment-15649140 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87099565 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; + +import java.io.Serializable; + + +/** + * Static factories for the {@link FieldAccessor} utilities. + */ +@Internal +public class FieldAccessorFactory implements Serializable { + + /** +* Creates a {@link FieldAccessor} for the given field position, which can be used to get and set +* the specified field on instances of this type. +* +* @param pos The field position (zero-based) +* @param config Configuration object +* @param The type of the field to access +* @return The created FieldAccessor +*/ + @Internal + public staticFieldAccessor getAccessor(TypeInformation typeInfo, int pos, ExecutionConfig config){ + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo); + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + if (pos != 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " + + "basic type (" + typeInfo.toString() + "). 
A field expression on a basic type can only select " + + "the 0th field (which means selecting the entire basic type)."); + } + return (FieldAccessor ) new FieldAccessor.SimpleFieldAccessor<>(typeInfo); + + // In case of case classes + } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) { + TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo; + TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()]; --- End diff -- Why do you fetch all field types in an array? You can do `tupleTypeInfo.getTypeAt(pos)` instead of `types[pos]`, no? > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. --
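The review remark above can be illustrated with a small sketch that contrasts the two lookups. `ArityTypeInfo` is a hypothetical stand-in for the two `TupleTypeInfoBase` methods used in the quoted diff, with type names reduced to strings so that the example has no Flink dependency.

```java
/** Hypothetical stand-in for the getArity/getTypeAt pair used in the quoted diff. */
interface ArityTypeInfo {
    int getArity();

    String getTypeAt(int pos);
}

final class FieldTypeLookup {
    private FieldTypeLookup() {}

    /** Shape of the quoted diff: copy every field type, then index into the copy. */
    static String viaArray(ArityTypeInfo info, int pos) {
        String[] types = new String[info.getArity()];
        for (int i = 0; i < types.length; i++) {
            types[i] = info.getTypeAt(i);
        }
        return types[pos];
    }

    /** Shape of the reviewer's suggestion: ask for the one type that is needed. */
    static String direct(ArityTypeInfo info, int pos) {
        return info.getTypeAt(pos);
    }
}
```

For a valid position both variants return the same type. One difference worth keeping in mind is the failure mode for an out-of-range position: the array variant throws `ArrayIndexOutOfBoundsException`, whereas the direct variant fails in whatever way `getTypeAt` does.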
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649138#comment-15649138 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87102206 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; + +import java.io.Serializable; + + +/** + * Static factories for the {@link FieldAccessor} utilities. + */ +@Internal +public class FieldAccessorFactory implements Serializable { + + /** +* Creates a {@link FieldAccessor} for the given field position, which can be used to get and set +* the specified field on instances of this type. +* +* @param pos The field position (zero-based) +* @param config Configuration object +* @param The type of the field to access +* @return The created FieldAccessor +*/ + @Internal + public staticFieldAccessor getAccessor(TypeInformation typeInfo, int pos, ExecutionConfig config){ + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo); + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + if (pos != 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " + + "basic type (" + typeInfo.toString() + "). 
A field expression on a basic type can only select " + + "the 0th field (which means selecting the entire basic type)."); + } + return (FieldAccessor ) new FieldAccessor.SimpleFieldAccessor<>(typeInfo); + + // In case of case classes + } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) { + TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo; + TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()]; + for (int i = 0; i < types.length; i++) { + types[i] = tupleTypeInfo.getTypeAt(i); + } + return new FieldAccessor.ProductFieldAccessor<>( + pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation)types[pos]), config); + + // In case of tuples + } else if (typeInfo.isTupleType()) { + return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo); + + // Default case, PojoType is directed to this statement + } else { + throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by position on " + typeInfo.toString() +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649142#comment-15649142 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87101256 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; + +import java.io.Serializable; + + +/** + * Static factories for the {@link FieldAccessor} utilities. + */ +@Internal +public class FieldAccessorFactory implements Serializable { + + /** +* Creates a {@link FieldAccessor} for the given field position, which can be used to get and set +* the specified field on instances of this type. +* +* @param pos The field position (zero-based) +* @param config Configuration object +* @param The type of the field to access +* @return The created FieldAccessor +*/ + @Internal + public staticFieldAccessor getAccessor(TypeInformation typeInfo, int pos, ExecutionConfig config){ + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo); + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + if (pos != 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " + + "basic type (" + typeInfo.toString() + "). 
A field expression on a basic type can only select " + + "the 0th field (which means selecting the entire basic type)."); + } + return (FieldAccessor ) new FieldAccessor.SimpleFieldAccessor<>(typeInfo); + + // In case of case classes + } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) { + TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo; + TypeInformation[] types = new TypeInformation[tupleTypeInfo.getArity()]; + for (int i = 0; i < types.length; i++) { + types[i] = tupleTypeInfo.getTypeAt(i); + } + return new FieldAccessor.ProductFieldAccessor<>( + pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation)types[pos]), config); + + // In case of tuples + } else if (typeInfo.isTupleType()) { + return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo); + + // Default case, PojoType is directed to this statement + } else { + throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by position on " + typeInfo.toString() +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15641702#comment-15641702 ] ASF GitHub Bot commented on FLINK-3702: --- Github user mbalassi commented on the issue: https://github.com/apache/flink/pull/2094 Thanks @fhueske. I am happy to squash the commits and merge the PR given that you are satisfied with it. I would like to merge it by mid next week, so I could proceed with adding the necessary final touches to @Xazax-hun's PR on the serializer code generation during the end of the week. I would love to have that issue also wrapped up. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15635754#comment-15635754 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2094 Thanks for the update @mbalassi. Will have a look at the PR next week. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633099#comment-15633099 ] ASF GitHub Bot commented on FLINK-3702: --- Github user mbalassi commented on the issue: https://github.com/apache/flink/pull/2094 As @fhueske and @StephanEwen have suggested, I have refactored @ggevay's code: instead of adding two methods to `TypeInformation`, I have moved the functionality to a `FieldAccessorFactory` class. To achieve this I had to move the code back to `flink-streaming-java`, because the static factory needs to import `scala.Product` to implement the functionality for case classes. Given that a Scala dependency is better avoided in `flink-core` at this point, and `flink-streaming` already has the Scala dependency, I felt most comfortable with this approach. If we decide to use the `FieldAccessor`s in other parts of the system, we might need to revert to Gabor's previous approach based on inheritance instead of the current static monstrosity, so that we can separate the Scala utilities into their respective projects. I have also addressed Fabian's inline comments. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
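The trade-off described above (one static factory that inspects the type information, versus methods on the type information itself) can be sketched as follows. The dispatch order mirrors the `getAccessor` diff quoted elsewhere in this thread, but the type hierarchy here is a hypothetical, heavily reduced stand-in, and the factory returns only the name of the accessor it would create.

```java
/** Hypothetical, heavily reduced type-information hierarchy; not Flink's classes. */
abstract class TypeInfoSketch {}

final class ArrayInfo extends TypeInfoSketch {}

final class BasicInfo extends TypeInfoSketch {}

final class TupleInfo extends TypeInfoSketch {
    final boolean caseClass;

    TupleInfo(boolean caseClass) {
        this.caseClass = caseClass;
    }
}

final class PojoInfo extends TypeInfoSketch {}

/** Static factory: all knowledge about accessor kinds lives in one place. */
final class AccessorFactorySketch {
    private AccessorFactorySketch() {}

    /** Returns the name of the accessor that would be created for (info, pos). */
    static String accessorKindFor(TypeInfoSketch info, int pos) {
        if (info instanceof ArrayInfo) {
            return "ArrayFieldAccessor";
        } else if (info instanceof BasicInfo) {
            if (pos != 0) {
                throw new IllegalArgumentException(
                    "A basic type only has a 0th field, but position " + pos + " was selected.");
            }
            return "SimpleFieldAccessor";
        } else if (info instanceof TupleInfo) {
            // Case classes need the Scala-aware accessor; Java tuples do not.
            return ((TupleInfo) info).caseClass ? "ProductFieldAccessor" : "SimpleTupleFieldAccessor";
        } else {
            // POJOs and other types cannot be addressed by position.
            throw new IllegalArgumentException(
                "Cannot reference field by position on " + info.getClass().getSimpleName());
        }
    }
}
```

With this shape the type-information classes stay free of accessor logic, at the cost of an `instanceof` chain that has to know every supported kind, including the Scala one. The inheritance-based alternative would let each type-information class, in its own module, decide which accessor to create.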
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633036#comment-15633036 ] ASF GitHub Bot commented on FLINK-3702: --- Github user mbalassi commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r86366630 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java --- @@ -264,6 +265,7 @@ public void getFlatFields(String fieldExpression, int offset, List DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603086#comment-15603086 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r84765869 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving --- End diff -- `FieldAccessor` was `@Internal` before. Why change it to `@PublicEvolving`? > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. 
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603085#comment-15603085 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84767836

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java ---
@@ -265,16 +265,6 @@ public boolean hasDeterministicFieldOrder() {
 	@PublicEvolving
 	public abstract int getFieldIndex(String fieldName);
 
-	@PublicEvolving
-	public static class InvalidFieldReferenceException extends IllegalArgumentException {
--- End diff --

I would not move this exception. It is not strictly required and causes many necessary changes.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603084#comment-15603084 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84768233

--- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public final class ProductFieldAccessorextends FieldAccessor {
--- End diff --

I think it makes sense to add a simple (non-nested) field accessor for `Product`. Case classes are very commonly used in Flink Scala programs, so this might pay off.
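To illustrate what a simple, non-nested accessor for an immutable record could look like, here is a minimal sketch in plain Java. It is not the PR's `ProductFieldAccessor`: the `WordCount` class, the `Accessor` interface, and the `COUNT` constant are hypothetical stand-ins, and the only idea taken from the quoted code is the contract that `set` may return a new instance because the record might be immutable.

```java
final class ImmutableAccessorSketch {

    /** Hypothetical immutable record, standing in for a Scala case class. */
    static final class WordCount {
        final String word;
        final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    /** Simplified accessor contract: set returns the record that carries the new value. */
    interface Accessor<T, F> {
        F get(T record);

        T set(T record, F fieldValue);
    }

    /** Non-nested accessor for the count field; set copies because WordCount is immutable. */
    static final Accessor<WordCount, Integer> COUNT = new Accessor<WordCount, Integer>() {
        @Override
        public Integer get(WordCount record) {
            return record.count;
        }

        @Override
        public WordCount set(WordCount record, Integer fieldValue) {
            return new WordCount(record.word, fieldValue);
        }
    };

    public static void main(String[] args) {
        WordCount original = new WordCount("flink", 1);
        // Callers must keep the returned record; the original is left untouched.
        WordCount updated = COUNT.set(original, COUNT.get(original) + 4);
        System.out.println(original.count + " -> " + updated.count);
    }
}
```

Because `set` returns the record, callers have to reassign the result, which is the same pattern the quoted `FieldAccessorTest` uses when it writes `t = f0.set(t, "b")`.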
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603088#comment-15603088 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84767676

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---
@@ -203,7 +206,34 @@ public void getFlatFields(String fieldExpression, int offset, List typed = (TypeInformation) this.types[pos];
 		return typed;
 	}
-
+
+	@Override
+	@PublicEvolving
+	public FieldAccessorgetFieldAccessor(int pos, ExecutionConfig config) {
+		return new FieldAccessor.SimpleTupleFieldAccessor (pos, this);
+	}
+
+	@Override
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) {
+		FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
+		int fieldPos = this.getFieldIndex(decomp.head);
+		if (fieldPos == -1) {
+			try {
+				fieldPos = Integer.parseInt(decomp.head);
+			} catch (NumberFormatException ex) {
+				throw new InvalidFieldReferenceException("Tried to select field \"" + decomp.head
--- End diff --

The exception message does not tell the user that only integer values are allowed.
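One way to address this comment is to have the message name the accepted forms explicitly. The following is a minimal, self-contained sketch under stated assumptions: `TupleFieldResolver`, `resolvePosition`, and the nested `InvalidFieldReference` class are hypothetical names that only mimic the lookup order visible in the quoted diff (field name first, then integer position). The wording of the message is a suggestion, not the text that was merged.

```java
import java.util.Arrays;
import java.util.List;

final class TupleFieldResolver {

    /** Hypothetical stand-in for Flink's InvalidFieldReferenceException. */
    static class InvalidFieldReference extends IllegalArgumentException {
        InvalidFieldReference(String message) {
            super(message);
        }
    }

    /**
     * Resolves the head of a field expression to a position: first by field
     * name ("f0", "f1", ...), then as a plain integer ("0", "1", ...).
     */
    static int resolvePosition(String head, List<String> fieldNames) {
        int lastPos = fieldNames.size() - 1;
        int pos = fieldNames.indexOf(head);
        if (pos == -1) {
            try {
                pos = Integer.parseInt(head);
            } catch (NumberFormatException ex) {
                // Tell the user which forms are accepted, not only that the lookup failed.
                throw new InvalidFieldReference(
                    "Tried to select field \"" + head + "\" on a tuple with fields " + fieldNames
                        + ". Only these field names or integer positions 0 to " + lastPos
                        + " are allowed.");
            }
        }
        if (pos < 0 || pos > lastPos) {
            throw new InvalidFieldReference(
                "Field position " + pos + " is out of range; valid positions are 0 to "
                    + lastPos + ".");
        }
        return pos;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("f0", "f1");
        System.out.println(resolvePosition("f1", names));
        System.out.println(resolvePosition("1", names));
        try {
            resolvePosition("almafa", names);
        } catch (InvalidFieldReference ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

The sketch also range-checks the parsed integer, so an expression such as "7" on a two-field tuple fails with a message that states the valid range.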
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603093#comment-15603093 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r84767443 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. 
+* @param record The record on which the field will be accessed +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. +* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // -- + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + public final static class SimpleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603091#comment-15603091 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r84768779 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2t = Tuple2.of("aa", 5); + TupleTypeInfo > tpeInfo = + (TupleTypeInfo >) TypeExtractor.getForObject(t); + + FieldAccessor , String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor , Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor , Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor , Integer> f1ns = tpeInfo.getFieldAccessor("1", null); + assertEquals(10, (int) f1ns.get(t)); + assertEquals(10, (int) t.f1); + t = f1ns.set(t, 11); + assertEquals(11, (int) f1ns.get(t)); + assertEquals(11, (int) f1.get(t)); + assertEquals(11, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + // This is technically valid (the ".0" is selecting the 0th field of a basic type). + FieldAccessor , String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null); + assertEquals("b", f0_0.get(t)); + assertEquals("b", t.f0); + t = f0_0.set(t, "cc"); + assertEquals("cc", f0_0.get(t)); + assertEquals("cc", t.f0); + + try { + FieldAccessor , String> bad = tpeInfo.getFieldAccessor("almafa", null); + assertFalse("Expected exception, because of bad field name", false); --- End diff -- Can we use `@Test(expected = InvalidFieldReferenceException)` instead of catching an
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603092#comment-15603092 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r84769097 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2t = Tuple2.of("aa", 5); + TupleTypeInfo > tpeInfo = + (TupleTypeInfo >) TypeExtractor.getForObject(t); + + FieldAccessor , String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor , Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor , Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor , Integer> f1ns = tpeInfo.getFieldAccessor("1", null); + assertEquals(10, (int) f1ns.get(t)); + assertEquals(10, (int) t.f1); + t = f1ns.set(t, 11); + assertEquals(11, (int) f1ns.get(t)); + assertEquals(11, (int) f1.get(t)); + assertEquals(11, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + // This is technically valid (the ".0" is selecting the 0th field of a basic type). + FieldAccessor , String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null); + assertEquals("b", f0_0.get(t)); + assertEquals("b", t.f0); + t = f0_0.set(t, "cc"); + assertEquals("cc", f0_0.get(t)); + assertEquals("cc", t.f0); + + try { + FieldAccessor , String> bad = tpeInfo.getFieldAccessor("almafa", null); + assertFalse("Expected exception, because of bad field name", false); + } catch (InvalidFieldReferenceException ex) { + // OK + } + }
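A note on the `try`/`catch` block at the end of the quoted `testFlatTuple`: `assertFalse("...", false)` always passes, so the test succeeds even when no exception is thrown. In JUnit 4 the usual fixes are to call `fail(...)` after the statement that should throw, or to use `@Test(expected = InvalidFieldReferenceException.class)` on a dedicated test method. The sketch below shows the same idea in plain Java without JUnit; `lookup` and `throwsExpected` are hypothetical helpers written only for this illustration.

```java
final class ExpectedExceptionSketch {

    /** Hypothetical lookup that rejects unknown field names. */
    static String lookup(String fieldName) {
        if (!fieldName.equals("f0") && !fieldName.equals("f1")) {
            throw new IllegalArgumentException("Invalid field: " + fieldName);
        }
        return fieldName;
    }

    /** Returns true only if running the action throws an exception of the expected type. */
    static boolean throwsExpected(Class<? extends Throwable> expected, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            return expected.isInstance(t);
        }
        // No exception at all: the check must fail, which the quoted assertFalse never does.
        return false;
    }

    public static void main(String[] args) {
        // Bad field name: the expected exception is thrown.
        System.out.println(throwsExpected(IllegalArgumentException.class, () -> lookup("almafa")));
        // Valid field name: nothing is thrown, so the check reports a failure.
        System.out.println(throwsExpected(IllegalArgumentException.class, () -> lookup("f0")));
    }
}
```

The important property is the `return false` on the no-exception path; a test written this way fails if the bad field name is silently accepted.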
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603087#comment-15603087 ] ASF GitHub Bot commented on FLINK-3702: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r84767248 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. 
+* @param record The record on which the field will be accessed +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. +* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // -- + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + public final static class SimpleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15603090#comment-15603090 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2094#discussion_r84766789

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
@@ -264,6 +265,7 @@ public void getFlatFields(String fieldExpression, int offset, List
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548291#comment-15548291 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094

I have now rebased onto the current master.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548239#comment-15548239 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2094

@mbalassi any news on this PR? Would you like to review it again and merge it?
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493517#comment-15493517 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094

> As for this pull request I suggest to update the false Javadoc from the streaming API and document in the concepts that the aggregation field accessors and the keyselectors are slightly different due to their semantics.

I've fixed the javadoc now in 154352ba879350bb1ab10b799456a67eb5819375.

As for the differences between the key selectors and aggregations (which use `FieldAccessor`): I tried to consolidate the supported field expressions between the two in 7c2dceeb87b06963cc2618312f5f1039387003ad. Now the aggregations should accept any field expression that selects a single field and would be a valid key selector, plus some more (those selecting array elements).

As for documenting that the aggregations don't work on a field that is still some composite type, I'm not sure if this is necessary, since this should be pretty obvious to the user.

> The batch API aggregations do not have the overloaded version where you could pass a field accessor string.

Here is the Jira for this: https://issues.apache.org/jira/browse/FLINK-4575
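To make "a field expression that selects a single field" concrete, here is a rough sketch of resolving a nested expression such as "inner.count" one layer at a time, in the spirit of the "penetrate one layer, then delegate" description in the `FieldAccessor` Javadoc. It uses plain Java reflection on public fields and is not Flink's implementation: `NestedAccessSketch`, `Outer`, `Inner`, `get`, and `set` are all hypothetical, and tuples, arrays, and null intermediate values are deliberately left out.

```java
import java.lang.reflect.Field;

final class NestedAccessSketch {

    /** Hypothetical nested POJOs with public fields. */
    public static class Inner {
        public int count = 3;
    }

    public static class Outer {
        public String name = "x";
        public Inner inner = new Inner();
    }

    /** Reads the field selected by a dotted expression such as "inner.count". */
    static Object get(Object record, String fieldExpression) {
        int dot = fieldExpression.indexOf('.');
        String head = dot == -1 ? fieldExpression : fieldExpression.substring(0, dot);
        try {
            Field field = record.getClass().getField(head);
            Object value = field.get(record);
            // Penetrate one layer, then delegate the tail of the expression.
            return dot == -1 ? value : get(value, fieldExpression.substring(dot + 1));
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Invalid field expression \"" + fieldExpression + "\"", e);
        }
    }

    /** Writes the field selected by a dotted expression, modifying the record in place. */
    static void set(Object record, String fieldExpression, Object newValue) {
        int dot = fieldExpression.indexOf('.');
        try {
            if (dot == -1) {
                record.getClass().getField(fieldExpression).set(record, newValue);
            } else {
                Object inner = record.getClass().getField(fieldExpression.substring(0, dot)).get(record);
                set(inner, fieldExpression.substring(dot + 1), newValue);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Invalid field expression \"" + fieldExpression + "\"", e);
        }
    }

    public static void main(String[] args) {
        Outer record = new Outer();
        // A sum-style update on a nested field, selected by a string expression.
        set(record, "inner.count", (Integer) get(record, "inner.count") + 4);
        System.out.println(record.inner.count);
    }
}
```

An aggregation such as a sum needs the selected field to be a single value, which is why an expression that stops at `inner` (still a composite type) would not be a useful target even though it can serve as a key.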
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15490154#comment-15490154 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2094

We have gone through the changes with @ggevay and found that the documentation is lacking or misleading in a couple of cases.

1. The DataStream API JavaDocs mention that one can aggregate on expressions including Java Bean-like getters, which is not true regardless of this change. [1]
2. I think this is due to the confusion between the `KeySelectors` produced by `Keys.ExpressionKeys` and the String expressions that we can pass to an aggregation operator. E.g. one of the distinctions between the two options is that the former can return a `CompositeType`, while we forbid that in the case of the latter, as it could cause the aggregation itself to fail.
3. The batch API aggregations do not have the overloaded version where you could pass a field accessor string. Of course this is just a convenience method anyway, which is easily implemented on the user side, e.g. as it is done in the `WordCountPojo` example. [2]

As for this pull request, I suggest updating the false Javadoc in the streaming API and documenting in the concepts that the aggregation field accessors and the key selectors are slightly different due to their semantics.

Other than this observation I like the changes and would like to have them in by the end of next week.

[1] https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java#L295-L310
[2] https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java#L106-L111
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463156#comment-15463156 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094

https://issues.apache.org/jira/browse/FLINK-4575
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463135#comment-15463135 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094

@rmetzger Thanks for looking at it.

> Is there a reason why the DataSet methods for "sum()" for example do not allow to aggregate on POJOs with this change?

The reason is just that I didn't have time to do it, but I think it could be done in a few hours. I'll open a JIRA.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463094#comment-15463094 ] ASF GitHub Bot commented on FLINK-3702: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2094 Thank you for your work @ggevay, and the reviews @twalthr. Is there a reason why the DataSet methods for "sum()" for example do not allow to aggregate on POJOs with this change?
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445332#comment-15445332 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76576754 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2<String, Integer> t = Tuple2.of("aa", 5); + TupleTypeInfo<Tuple2<String, Integer>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + } + + @Test + public void testTupleInTuple() { + Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null); + assertEquals(2.0, f1f2.get(t), 0); + assertEquals(2.0, t.f1.f2, 0); + t = f1f2.set(t, 3.0); + assertEquals(3.0, f1f2.get(t), 0); + assertEquals(3.0, t.f1.f2, 0); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); + assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445334#comment-15445334 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76576801 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2<String, Integer> t = Tuple2.of("aa", 5); + TupleTypeInfo<Tuple2<String, Integer>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + // This is technically valid (the ".0" is selecting the 0th field of a basic type). + FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null); + assertEquals("b", f0_0.get(t)); + assertEquals("b", t.f0); + t = f0_0.set(t, "cc"); + assertEquals("cc", f0_0.get(t)); + assertEquals("cc", t.f0); + + try { + FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null); + assertFalse("Expected exception, because of bad field name", false); + } catch (InvalidFieldReferenceException ex) { + // OK + } + } + + @Test + public void testTupleInTuple() { + Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 =
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445240#comment-15445240 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2094 Thanks for updating the PR @ggevay. Besides some small comments, the PR looks good to merge. I can fix them when merging. @rmetzger you opened the Jira issue. Do you also want to take a final look at it? A second person should have a look at it as it touches important classes.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445167#comment-15445167 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76567175 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2<String, Integer> t = Tuple2.of("aa", 5); + TupleTypeInfo<Tuple2<String, Integer>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + } + + @Test + public void testTupleInTuple() { + Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null); + assertEquals(2.0, f1f2.get(t), 0); + assertEquals(2.0, t.f1.f2, 0); + t = f1f2.set(t, 3.0); + assertEquals(3.0, f1f2.get(t), 0); + assertEquals(3.0, t.f1.f2, 0); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); + assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445170#comment-15445170 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76567285 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2<String, Integer> t = Tuple2.of("aa", 5); + TupleTypeInfo<Tuple2<String, Integer>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + // This is technically valid (the ".0" is selecting the 0th field of a basic type). + FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null); + assertEquals("b", f0_0.get(t)); + assertEquals("b", t.f0); + t = f0_0.set(t, "cc"); + assertEquals("cc", f0_0.get(t)); + assertEquals("cc", t.f0); + + try { + FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null); + assertFalse("Expected exception, because of bad field name", false); + } catch (InvalidFieldReferenceException ex) { + // OK + } + } + + @Test + public void testTupleInTuple() { + Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 =
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434772#comment-15434772 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2094 @twalthr I think I have addressed all your comments.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434766#comment-15434766 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040669 --- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java --- @@ -139,17 +139,6 @@ public void flatMap(Data value, Collector out) throws Exception { } - /** -* As per FLINK-3702 Flink doesn't support nested pojo fields for sum() -*/ - @Test(expected = IllegalArgumentException.class) --- End diff -- 48be8d50dc6538f25605daf8f7373c39306e79d9
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434764#comment-15434764 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040657 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala --- @@ -236,6 +237,31 @@ abstract class CaseClassTypeInfo[T <: Product]( } } + override def getFieldAccessor[F](pos: Int, config: ExecutionConfig): FieldAccessor[T, F] = { +new ProductFieldAccessor[T,F,F]( + pos, this, new SimpleFieldAccessor[F](types(pos).asInstanceOf[TypeInformation[F]]), config) + } + + override def getFieldAccessor[F](fieldExpression: String, config: ExecutionConfig): +FieldAccessor[T, F] = { + +val decomp = FieldAccessor.decomposeFieldExpression(fieldExpression) + +val pos = getFieldIndex(decomp.head) +if(pos == -2) { --- End diff -- 5db003ba72496bb638f3ebe38f864ba978bb0887
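The Scala snippet in the message above first decomposes the field expression and then resolves its head to a field position. A minimal Java sketch of that splitting step follows, assuming a plain split at the first dot. `FieldExprSketch` and `decompose` are hypothetical names used only for illustration; they are not the `FieldAccessor.decomposeFieldExpression` from the pull request, whose exact parsing rules are not shown in this thread.

```java
// Hypothetical illustration only; not Flink's decomposeFieldExpression.
final class FieldExprSketch {
    final String head; // first path segment, e.g. "f1"
    final String tail; // remaining expression, or null if there is none

    private FieldExprSketch(String head, String tail) {
        this.head = head;
        this.tail = tail;
    }

    // Splits "f1.a.foo" into head "f1" and tail "a.foo" (assumed first-dot rule).
    static FieldExprSketch decompose(String expr) {
        if (expr == null || expr.isEmpty()) {
            throw new IllegalArgumentException("Empty field expression");
        }
        int dot = expr.indexOf('.');
        if (dot < 0) {
            // No nesting: the whole expression names a single field.
            return new FieldExprSketch(expr, null);
        }
        if (dot == 0 || dot == expr.length() - 1) {
            throw new IllegalArgumentException("Malformed field expression: " + expr);
        }
        return new FieldExprSketch(expr.substring(0, dot), expr.substring(dot + 1));
    }
}
```

In this sketch the head would be resolved on the current type and the tail handed to the accessor of the nested type, which would decompose it again in the same way.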
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434765#comment-15434765 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040664 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -301,12 +301,12 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty } /** -* Applies an aggregation that that gives the current sum of the pojo data +* Applies an aggregation that gives the current sum of the pojo data --- End diff -- b461f49e43b1bb397284c3bbbcd3830e44aaf8b3
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434762#comment-15434762 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040630 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessor<T, F> implements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation<F> fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation<F> getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. 
+* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // == + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + public final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation<T> typeInfo) { + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { +
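The Javadoc quoted above describes nested accessors that penetrate one layer and delegate the rest to an inner accessor, and a `set` that may return a new instance because the record might be immutable. The following is a minimal, self-contained sketch of that idea under simplifying assumptions: all class names (`AccessorSketch`, `WholeRecordSketch`, `NestedSketch`, `InnerSketch`, `OuterSketch`, `AccessorSketchDemo`) are hypothetical, the layer is penetrated through explicit getter and setter functions rather than reflection or type information, and serialization is left out. It is not the implementation from the pull request.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical, simplified stand-ins; these are not the Flink classes from the diff.
abstract class AccessorSketch<T, F> {
    abstract F get(T record);

    // May return a new instance, so callers must use the returned record.
    abstract T set(T record, F value);
}

// The whole record is the field (the role SimpleFieldAccessor plays in the diff).
final class WholeRecordSketch<T> extends AccessorSketch<T, T> {
    @Override
    T get(T record) {
        return record;
    }

    @Override
    T set(T record, T value) {
        return value;
    }
}

// Penetrates one layer, then delegates the rest of the work to an inner accessor.
final class NestedSketch<T, R, F> extends AccessorSketch<T, F> {
    private final Function<T, R> getter;
    private final BiFunction<T, R, T> setter;
    private final AccessorSketch<R, F> inner;

    NestedSketch(Function<T, R> getter, BiFunction<T, R, T> setter, AccessorSketch<R, F> inner) {
        this.getter = getter;
        this.setter = setter;
        this.inner = inner;
    }

    @Override
    F get(T record) {
        return inner.get(getter.apply(record));
    }

    @Override
    T set(T record, F value) {
        // Update the nested value first, then write it back into this layer.
        R updated = inner.set(getter.apply(record), value);
        return setter.apply(record, updated);
    }
}

// Immutable example types, so set() has to return new instances.
final class InnerSketch {
    final int count;

    InnerSketch(int count) {
        this.count = count;
    }
}

final class OuterSketch {
    final String name;
    final InnerSketch inner;

    OuterSketch(String name, InnerSketch inner) {
        this.name = name;
        this.inner = inner;
    }
}

final class AccessorSketchDemo {
    // Builds an accessor that corresponds to the expression "inner.count" on OuterSketch.
    static AccessorSketch<OuterSketch, Integer> innerCount() {
        AccessorSketch<InnerSketch, Integer> count = new NestedSketch<InnerSketch, Integer, Integer>(
            i -> i.count, (i, c) -> new InnerSketch(c), new WholeRecordSketch<Integer>());
        return new NestedSketch<OuterSketch, InnerSketch, Integer>(
            o -> o.inner, (o, i) -> new OuterSketch(o.name, i), count);
    }
}
```

Calling `AccessorSketchDemo.innerCount().set(outer, 7)` in this sketch leaves `outer` untouched and returns a fresh `OuterSketch`, which is why the tests quoted in this thread always reassign the result (`t = f0.set(t, "b")`).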
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434761#comment-15434761 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040621 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2<String, Integer> t = Tuple2.of("aa", 5); + TupleTypeInfo<Tuple2<String, Integer>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + } + + @Test + public void testTupleInTuple() { + Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null); + assertEquals(2.0, f1f2.get(t), 0); + assertEquals(2.0, t.f1.f2, 0); + t = f1f2.set(t, 3.0); + assertEquals(3.0, f1f2.get(t), 0); + assertEquals(3.0, t.f1.f2, 0); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); + assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434751#comment-15434751 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040520 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java --- @@ -160,6 +161,53 @@ public boolean isSortKeyType() { @PublicEvolving public abstract TypeSerializer<T> createSerializer(ExecutionConfig config); + + /** +* Creates a {@link FieldAccessor} for the given field position, which can be used to get and set +* the specified field on instances of this type. +* +* @param pos The field position (zero-based) +* @param config Configuration object +* @param <F> The type of the field to access +* @return The created FieldAccessor +*/ + @PublicEvolving + public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config){ + throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString() + + "Referencing a field by position is supported on tuples, case classes, and arrays. " + + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int)."); + } + + /** +* Creates a {@link FieldAccessor} for the field that is given by a field expression, +* which can be used to get and set the specified field on instances of this type. 
+* +* @see https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions;> +* Defining keys using Field Expressions +* +* @param field The field expression +* @param config Configuration object +* @param The type of the field to access +* @return The created FieldAccessor +*/ + @PublicEvolving + public FieldAccessor getFieldAccessor(String field, ExecutionConfig config) { + throw new InvalidFieldReferenceException("Cannot reference field by field expression on " + this.toString() + + "Field expressions are only supported on POJO types, tuples, and case classes. " + + "For the requirements for a type to be considered a POJO, see " + + "https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#pojos;); + } + + @PublicEvolving + public static class InvalidFieldReferenceException extends IllegalArgumentException { --- End diff -- 3b770b27c938015fedeb9e76661fd0393bdbf566 > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
    [ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434750#comment-15434750 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r76040516

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java ---
    @@ -160,6 +161,53 @@ public boolean isSortKeyType() {
         @PublicEvolving
         public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
    +
    +    /**
    +     * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
    +     * the specified field on instances of this type.
    +     *
    +     * @param pos The field position (zero-based)
    +     * @param config Configuration object
    +     * @param <F> The type of the field to access
    +     * @return The created FieldAccessor
    +     */
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config){
    +        throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString() +
    +            "Referencing a field by position is supported on tuples, case classes, and arrays. " +
    +            "Additionally, you can select the 0th field of a primitive/basic type (e.g. int).");
    +    }
    +
    +    /**
    +     * Creates a {@link FieldAccessor} for the field that is given by a field expression,
    +     * which can be used to get and set the specified field on instances of this type.
    +     *
    +     * @see <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions">
    +     * Defining keys using Field Expressions</a>
    +     *
    +     * @param field The field expression
    +     * @param config Configuration object
    +     * @param <F> The type of the field to access
    +     * @return The created FieldAccessor
    +     */
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
    +        throw new InvalidFieldReferenceException("Cannot reference field by field expression on " + this.toString() +
    +            "Field expressions are only supported on POJO types, tuples, and case classes. " +
    +            "For the requirements for a type to be considered a POJO, see " +
    +            "https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#pojos");
    --- End diff --

    f0bdc99a33612b5326f134daf8f36e2c4553fe3a
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434755#comment-15434755 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040551 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. 
+* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // == + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + public final static class SimpleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation typeInfo) { + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434759#comment-15434759 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040571 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java --- @@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int offset, List typed = (TypeInformation) this.types[pos]; return typed; } - + + @Override + @PublicEvolving + public FieldAccessorgetFieldAccessor(int pos, ExecutionConfig config) { + return new FieldAccessor.SimpleTupleFieldAccessor (pos, this); + } + + @Override + @PublicEvolving + public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) { + FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression); + int FieldPos = this.getFieldIndex(decomp.head); --- End diff -- 5be08372611ed7c7e2fa83395f07c5ab1193f4fc 22326c7aea0c8f350d3453eef97df92382ee87f1 > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434752#comment-15434752 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040529 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. 
+* @param record --- End diff -- 4129d1d2a30e602d21e81e833c95490c23dab06d > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
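The class comment quoted above says that a nested field expression such as "f1.a.foo" produces nested accessors, each of which handles one layer and delegates the rest to an inner accessor. A minimal sketch of that idea follows. It is not the code from the pull request: it assumes records are plain `Map<String, Object>` instances, and the names `NestedAccessorSketch`, `Accessor`, `decompose` and `accessorFor` are hypothetical, chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of head/tail delegation; none of these names are Flink classes.
final class NestedAccessorSketch {

    /** Reads and writes one (possibly nested) field of a map-based record. */
    interface Accessor {
        Object get(Map<String, Object> record);
        Map<String, Object> set(Map<String, Object> record, Object value);
    }

    /** Splits "f1.a.foo" into head "f1" and tail "a.foo"; the tail is null when there is no dot. */
    static String[] decompose(String expr) {
        int dot = expr.indexOf('.');
        return dot < 0
                ? new String[] {expr, null}
                : new String[] {expr.substring(0, dot), expr.substring(dot + 1)};
    }

    /** Builds an accessor that handles the head itself and delegates the tail to an inner accessor. */
    static Accessor accessorFor(String expr) {
        String[] parts = decompose(expr);
        final String head = parts[0];
        if (parts[1] == null) {
            // Leaf case: the expression names a direct field of the record.
            return new Accessor() {
                @Override
                public Object get(Map<String, Object> record) {
                    return record.get(head);
                }

                @Override
                public Map<String, Object> set(Map<String, Object> record, Object value) {
                    record.put(head, value);
                    return record;
                }
            };
        }
        // Nested case: go one layer down, then let the inner accessor do the rest.
        final Accessor inner = accessorFor(parts[1]);
        return new Accessor() {
            @Override
            @SuppressWarnings("unchecked")
            public Object get(Map<String, Object> record) {
                return inner.get((Map<String, Object>) record.get(head));
            }

            @Override
            @SuppressWarnings("unchecked")
            public Map<String, Object> set(Map<String, Object> record, Object value) {
                Map<String, Object> nested = (Map<String, Object>) record.get(head);
                record.put(head, inner.set(nested, value));
                return record;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Object> a = new HashMap<>();
        a.put("foo", 1);
        Map<String, Object> f1 = new HashMap<>();
        f1.put("a", a);
        Map<String, Object> record = new HashMap<>();
        record.put("f1", f1);

        Accessor acc = accessorFor("f1.a.foo");
        System.out.println(acc.get(record));
        record = acc.set(record, 42);
        System.out.println(acc.get(record));
    }
}
```

The outer accessor writes the result of the inner `set` back into its own layer. That step matters when an inner record is immutable and `set` has to return a new instance.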
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434763#comment-15434763 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040641 --- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.FieldAccessor; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import scala.Product; + +public final class ProductFieldAccessorextends FieldAccessor { + + private static final long serialVersionUID = 1L; + + private final int pos; + private final TupleSerializerBase serializer; + private final Object[] fields; + private final int length; + private final FieldAccessor innerAccessor; + + ProductFieldAccessor(int pos, TypeInformation typeInfo, FieldAccessor innerAccessor, ExecutionConfig config) { + this.pos = pos; --- End diff -- 22326c7aea0c8f350d3453eef97df92382ee87f1 > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434760#comment-15434760 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040578 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java --- @@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int offset, List typed = (TypeInformation) this.types[pos]; return typed; } - + + @Override + @PublicEvolving + public FieldAccessorgetFieldAccessor(int pos, ExecutionConfig config) { + return new FieldAccessor.SimpleTupleFieldAccessor (pos, this); + } + + @Override + @PublicEvolving + public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) { + FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression); + int FieldPos = this.getFieldIndex(decomp.head); + if(decomp.tail == null) { + return new FieldAccessor.SimpleTupleFieldAccessor (FieldPos, this); + } else { + FieldAccessor
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434754#comment-15434754 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040541 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. 
+* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // == + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + public final static class SimpleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation typeInfo) { + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434753#comment-15434753 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76040532 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. 
+* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // == --- End diff -- 8f191a91d202c6e25d8e0ccf8e68c4d39c397315 > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian
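The warning in the quoted Javadoc for `set` means that callers should always continue with the returned record, because the accessor may either mutate the argument or build a new instance. The sketch below illustrates that contract with two toy record types. Every name in it (`SetContractSketch`, `Accessor`, `MutablePoint`, `ImmutablePoint`, `MUTABLE_X`, `IMMUTABLE_X`) is hypothetical and is not taken from the pull request.

```java
// Hypothetical sketch of the get/set contract described in the Javadoc; not Flink code.
final class SetContractSketch {

    interface Accessor<T, F> {
        F get(T record);

        /** Returns a record that has the given field value: the original or a new instance. */
        T set(T record, F value);
    }

    /** A record whose field can be updated in place. */
    static final class MutablePoint {
        int x;

        MutablePoint(int x) {
            this.x = x;
        }
    }

    /** A record that cannot be changed after construction. */
    static final class ImmutablePoint {
        final int x;

        ImmutablePoint(int x) {
            this.x = x;
        }
    }

    static final Accessor<MutablePoint, Integer> MUTABLE_X = new Accessor<MutablePoint, Integer>() {
        @Override
        public Integer get(MutablePoint p) {
            return p.x;
        }

        @Override
        public MutablePoint set(MutablePoint p, Integer value) {
            p.x = value; // modifies the original object
            return p;
        }
    };

    static final Accessor<ImmutablePoint, Integer> IMMUTABLE_X = new Accessor<ImmutablePoint, Integer>() {
        @Override
        public Integer get(ImmutablePoint p) {
            return p.x;
        }

        @Override
        public ImmutablePoint set(ImmutablePoint p, Integer value) {
            return new ImmutablePoint(value); // the original stays untouched
        }
    };

    public static void main(String[] args) {
        ImmutablePoint p = new ImmutablePoint(1);
        IMMUTABLE_X.set(p, 2);                    // result discarded: p still holds 1
        ImmutablePoint q = IMMUTABLE_X.set(p, 2); // correct usage: keep the returned record
        System.out.println(p.x + " " + q.x);
    }
}
```

This is the same pattern the quoted tests use when they write `t = f0.set(t, "b")` instead of calling `set` for its side effect.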
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
    [ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434757#comment-15434757 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r76040561

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -317,7 +308,39 @@ public int getFieldIndex(String fieldName) {
             return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
         }
    -
    +
    +    @Override
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) {
    +
    +        FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
    --- End diff --

    de7a68598d0933ee4016ecb87d27c46eb8011cf4
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
    [ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434758#comment-15434758 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r76040566

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -317,7 +308,39 @@ public int getFieldIndex(String fieldName) {
             return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
         }
    -
    +
    +    @Override
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) {
    +
    +        FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
    +
    +        // get field
    +        PojoField field = null;
    +        TypeInformation fieldType = null;
    +        for (int i = 0; i < fields.length; i++) {
    +            if (fields[i].getField().getName().equals(decomp.head)) {
    +                field = fields[i];
    +                fieldType = fields[i].getTypeInformation();
    +                break;
    +            }
    +        }
    +        if (field == null) {
    +            throw new InvalidFieldReferenceException("Unable to find field \""+decomp.head+"\" in type "+this+".");
    +        }
    +
    +        if(decomp.tail == null) {
    --- End diff --

    a0939f7762fa5ac71e5975a36fc31c39fd0cf5dd
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
    [ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434748#comment-15434748 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r76040498

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java ---
    @@ -171,6 +172,23 @@ public boolean isKeyType() {
             }
         }
     
    +    @Override
    +    @PublicEvolving
    +    @SuppressWarnings("unchecked")
    +    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
    +        if(pos != 0) {
    +            throw new InvalidFieldReferenceException("Not the 0th field selected for a basic type.");
    +        }
    +        return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor(this);
    +    }
    +
    +    @Override
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
    +        throw new InvalidFieldReferenceException("Field expressions are not supported on basic types."
    --- End diff --

    155afcb56d0f87c888e49ecd9f8920d28a4514ca
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434747#comment-15434747 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r76040491

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java ---
    @@ -121,6 +122,18 @@ public boolean isKeyType() {
         }

         @Override
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
    +        return new FieldAccessor.ArrayFieldAccessor(pos, this);
    +    }
    +
    +    @Override
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
    +        throw new InvalidFieldReferenceException("Selecting a field from an array type is only supported by index.");
    --- End diff --

    b9a6012bf349afa83a27ac8c9c7b7af660ee8655
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434749#comment-15434749 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r76040502

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java ---
    @@ -160,6 +161,53 @@ public boolean isSortKeyType() {
         @PublicEvolving
         public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
    +
    +    /**
    +     * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
    +     * the specified field on instances of this type.
    +     *
    +     * @param pos The field position (zero-based)
    +     * @param config Configuration object
    +     * @param <F> The type of the field to access
    +     * @return The created FieldAccessor
    +     */
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config){
    +        throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString()
    +            + "Referencing a field by position is supported on tuples, case classes, and arrays. "
    +            + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int).");
    +    }
    +
    +    /**
    +     * Creates a {@link FieldAccessor} for the field that is given by a field expression,
    +     * which can be used to get and set the specified field on instances of this type.
    +     *
    +     * @see https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions
    --- End diff --

    f0bdc99a33612b5326f134daf8f36e2c4553fe3a
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434590#comment-15434590 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r76024415 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. 
+* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // == + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + public final static class SimpleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation typeInfo) { + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433202#comment-15433202 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75908195 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2t = Tuple2.of("aa", 5); + TupleTypeInfo > tpeInfo = + (TupleTypeInfo >) TypeExtractor.getForObject(t); + + FieldAccessor , String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor , Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor , Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + } + + @Test + public void testTupleInTuple() { + Tuple2 > t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo >> tpeInfo = + (TupleTypeInfo >>)TypeExtractor.getForObject(t); + + FieldAccessor >, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor >, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null); + assertEquals(2.0, f1f2.get(t), 0); + assertEquals(2.0, t.f1.f2, 0); + t = f1f2.set(t, 3.0); + assertEquals(3.0, f1f2.get(t), 0); + assertEquals(3.0, t.f1.f2, 0); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor >, Tuple3 > f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); + assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433196#comment-15433196 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75907421 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2t = Tuple2.of("aa", 5); + TupleTypeInfo > tpeInfo = + (TupleTypeInfo >) TypeExtractor.getForObject(t); + + FieldAccessor , String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor , Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor , Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + } + + @Test + public void testTupleInTuple() { + Tuple2 > t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo >> tpeInfo = + (TupleTypeInfo >>)TypeExtractor.getForObject(t); + + FieldAccessor >, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor >, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null); + assertEquals(2.0, f1f2.get(t), 0); + assertEquals(2.0, t.f1.f2, 0); + t = f1f2.set(t, 3.0); + assertEquals(3.0, f1f2.get(t), 0); + assertEquals(3.0, t.f1.f2, 0); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor >, Tuple3 > f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); + assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432951#comment-15432951 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

    https://github.com/apache/flink/pull/2094

    @twalthr, thank you for the comments, I'll try to address them soon.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432945#comment-15432945 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

    https://github.com/apache/flink/pull/2094

    > I am a bit out of the loop here. Why do we need a field accessor in the TypeInformation?

    We need this for situations when the user specifies a nested field expression, which goes through different types. (E.g. `.sum("foo.f1.bar")` on a POJO, where the field `foo` is a Tuple, and `f1` is a POJO again.) In this case, `getFieldAccessor` can recursively call itself on the TypeInformation of the field, which will be a virtual call that goes to the override for the field's type.
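The recursion described in this comment can be illustrated with a small stand-alone sketch. Everything in it is hypothetical and greatly simplified: `TypeInfo`, `PojoInfo`, `TupleInfo`, `BasicInfo` and `resolve` are toy names invented for illustration, not classes from the pull request, and records are modelled as `Map` and `Object[]` values rather than real POJOs and Tuples. It only shows the dispatch pattern: each type peels off the head of the expression and hands the tail to the type of the selected field.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class NestedAccessorSketch {

    /** Toy stand-in for a type descriptor; this is NOT Flink's TypeInformation. */
    interface TypeInfo {
        /** Builds a getter for the field named by a dotted field expression. */
        Function<Object, Object> getFieldAccessor(String fieldExpression);
    }

    /** Leaf type such as an int: nothing further can be selected from it. */
    static final class BasicInfo implements TypeInfo {
        @Override
        public Function<Object, Object> getFieldAccessor(String fieldExpression) {
            throw new IllegalArgumentException("Field expressions are not supported on basic types.");
        }
    }

    /** Shared logic: peel off the head, then recurse on the head's own type for the tail. */
    abstract static class CompositeInfo implements TypeInfo {
        abstract TypeInfo typeOf(String head);
        abstract Object read(Object record, String head);

        @Override
        public Function<Object, Object> getFieldAccessor(String fieldExpression) {
            int dot = fieldExpression.indexOf('.');
            final String head = dot < 0 ? fieldExpression : fieldExpression.substring(0, dot);
            TypeInfo fieldType = typeOf(head); // also validates that the field exists
            Function<Object, Object> outer = record -> read(record, head);
            if (dot < 0) {
                return outer;
            }
            // Virtual call: dispatches to whichever TypeInfo subclass describes the field.
            return outer.andThen(fieldType.getFieldAccessor(fieldExpression.substring(dot + 1)));
        }
    }

    /** POJO-like type; records are modelled as Map<String, Object>. */
    static final class PojoInfo extends CompositeInfo {
        private final Map<String, TypeInfo> fields = new HashMap<>();

        PojoInfo field(String name, TypeInfo type) {
            fields.put(name, type);
            return this;
        }

        @Override
        TypeInfo typeOf(String head) {
            TypeInfo t = fields.get(head);
            if (t == null) {
                throw new IllegalArgumentException("Unable to find field \"" + head + "\"");
            }
            return t;
        }

        @Override
        Object read(Object record, String head) {
            return ((Map<?, ?>) record).get(head);
        }
    }

    /** Tuple-like type; records are Object[] and fields are named f0, f1, ... */
    static final class TupleInfo extends CompositeInfo {
        private final TypeInfo[] types;

        TupleInfo(TypeInfo... types) {
            this.types = types;
        }

        private static int index(String head) {
            return Integer.parseInt(head.substring(1));
        }

        @Override
        TypeInfo typeOf(String head) {
            return types[index(head)];
        }

        @Override
        Object read(Object record, String head) {
            return ((Object[]) record)[index(head)];
        }
    }

    /** Evaluates an expression on a POJO whose field foo is a tuple whose field f1 is a POJO. */
    public static Object resolve(String fieldExpression) {
        TypeInfo inner = new PojoInfo().field("bar", new BasicInfo());
        TypeInfo tuple = new TupleInfo(new BasicInfo(), inner);
        TypeInfo outer = new PojoInfo().field("foo", tuple);

        Map<String, Object> innerRecord = new HashMap<>();
        innerRecord.put("bar", 42);
        Map<String, Object> record = new HashMap<>();
        record.put("foo", new Object[] {"x", innerRecord});

        return outer.getFieldAccessor(fieldExpression).apply(record);
    }

    public static void main(String[] args) {
        System.out.println(resolve("foo.f1.bar"));
    }
}
```

Because each composite type only knows how to resolve its own head, adding a new kind of composite type in this sketch would not require touching the existing ones, which is presumably the appeal of placing the method on the type descriptor itself.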
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432912#comment-15432912 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2094

    I am a bit out of the loop here. Why do we need a field accessor in the TypeInformation?
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432896#comment-15432896 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2094

    Thanks for the contribution and your patience @ggevay. I added some line comments, but in general it looks good.

    @StephanEwen is it ok to add a `TypeInformation.getFieldAccessor()` method to `TypeInformation`?
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432886#comment-15432886 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75876606

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java ---
    @@ -139,17 +139,6 @@ public void flatMap(Data value, Collector out) throws Exception {
         }

    -    /**
    -     * As per FLINK-3702 Flink doesn't support nested pojo fields for sum()
    -     */
    -    @Test(expected = IllegalArgumentException.class)
    --- End diff --

    I think it would be useful to keep this and `testNestedPojoFieldAccessor`, so that we also have an end-to-end test.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432811#comment-15432811 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75868407

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -301,12 +301,12 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty
         }

         /**
    -     * Applies an aggregation that that gives the current sum of the pojo data
    +     * Applies an aggregation that gives the current sum of the pojo data
    --- End diff --

    I would rewrite the entire description. This method does not only process "pojo data" but also tuples etc.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432577#comment-15432577 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75839040

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala ---
    @@ -236,6 +237,31 @@ abstract class CaseClassTypeInfo[T <: Product](
         }
       }

    +  override def getFieldAccessor[F](pos: Int, config: ExecutionConfig): FieldAccessor[T, F] = {
    +    new ProductFieldAccessor[T,F,F](
    +      pos, this, new SimpleFieldAccessor[F](types(pos).asInstanceOf[TypeInformation[F]]), config)
    +  }
    +
    +  override def getFieldAccessor[F](fieldExpression: String, config: ExecutionConfig):
    +    FieldAccessor[T, F] = {
    +
    +    val decomp = FieldAccessor.decomposeFieldExpression(fieldExpression)
    +
    +    val pos = getFieldIndex(decomp.head)
    +    if(pos == -2) {
    --- End diff --

    Would `-1` be a valid index? I think this should be `< 0`.
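The point of this review comment can be sketched in isolation. The snippet below is a hypothetical illustration, not code from the pull request: it assumes a lookup helper (here called `getFieldIndex`) that reports failure through more than one negative sentinel, `-1` for a missing name and `-2` for an ambiguous one. Under that assumption, a guard written as `pos == -2` lets `-1` slip through as if it were a usable index, while `pos < 0` rejects every sentinel.

```java
import java.util.List;

public class FieldIndexCheckSketch {

    /**
     * Hypothetical lookup (an assumption for this sketch):
     * returns -1 if the name is absent, -2 if it occurs more than once,
     * and the zero-based index otherwise.
     */
    static int getFieldIndex(List<String> fieldNames, String name) {
        int first = fieldNames.indexOf(name);
        if (first != fieldNames.lastIndexOf(name)) {
            return -2;
        }
        return first;
    }

    /** Resolves a field name to an index, rejecting every negative sentinel. */
    public static int resolve(List<String> fieldNames, String name) {
        int pos = getFieldIndex(fieldNames, name);
        if (pos < 0) { // covers both -1 (missing) and -2 (ambiguous)
            throw new IllegalArgumentException("Invalid field selected: " + name);
        }
        return pos;
    }
}
```

With the broader check, later code that indexes into an array of field types never sees a negative position, whichever sentinel the lookup produced.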
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432566#comment-15432566 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75837546 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. 
+* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // == + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + public final static class SimpleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation typeInfo) { + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432557#comment-15432557 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75836333

    --- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.typeutils;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.FieldAccessor;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
    +import scala.Product;
    +
    +public final class ProductFieldAccessor extends FieldAccessor {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    private final int pos;
    +    private final TupleSerializerBase serializer;
    +    private final Object[] fields;
    +    private final int length;
    +    private final FieldAccessor innerAccessor;
    +
    +    ProductFieldAccessor(int pos, TypeInformation typeInfo, FieldAccessor innerAccessor, ExecutionConfig config) {
    +        this.pos = pos;
    --- End diff --

    As I said above, some checking of parameters would not harm.
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432553#comment-15432553 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75835835 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. 
+* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // == + + + /** +* This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a +* field of a POJO that is itself some composite type but is not further decomposed) +*/ + public final static class SimpleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation typeInfo) { + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432539#comment-15432539 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75835061 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
+ + @Test + public void testFlatTuple() { + Tuple2t = Tuple2.of("aa", 5); + TupleTypeInfo > tpeInfo = + (TupleTypeInfo >) TypeExtractor.getForObject(t); + + FieldAccessor , String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor , Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor , Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + } + + @Test + public void testTupleInTuple() { + Tuple2 > t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo >> tpeInfo = + (TupleTypeInfo >>)TypeExtractor.getForObject(t); + + FieldAccessor >, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor >, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null); + assertEquals(2.0, f1f2.get(t), 0); + assertEquals(2.0, t.f1.f2, 0); + t = f1f2.set(t, 3.0); + assertEquals(3.0, f1f2.get(t), 0); + assertEquals(3.0, t.f1.f2, 0); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor >, Tuple3 > f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); + assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); +
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432519#comment-15432519 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75833983 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java --- @@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int offset, List typed = (TypeInformation) this.types[pos]; return typed; } - + + @Override + @PublicEvolving + public FieldAccessorgetFieldAccessor(int pos, ExecutionConfig config) { + return new FieldAccessor.SimpleTupleFieldAccessor (pos, this); + } + + @Override + @PublicEvolving + public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) { + FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression); + int FieldPos = this.getFieldIndex(decomp.head); + if(decomp.tail == null) { + return new FieldAccessor.SimpleTupleFieldAccessor (FieldPos, this); + } else { + FieldAccessor
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432513#comment-15432513 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75833200

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---
    @@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int offset, List typed = (TypeInformation) this.types[pos];
             return typed;
         }
    -
    +
    +    @Override
    +    @PublicEvolving
    +    public FieldAccessor getFieldAccessor(int pos, ExecutionConfig config) {
    +        return new FieldAccessor.SimpleTupleFieldAccessor(pos, this);
    +    }
    +
    +    @Override
    +    @PublicEvolving
    +    public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) {
    +        FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
    +        int FieldPos = this.getFieldIndex(decomp.head);
    --- End diff --

    `getFieldIndex()` can return -1. This should be handled somewhere. I think the constructors of all accessors should check their input parameters (also for null etc.).

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -------------------------------------------------------------
>
>                 Key: FLINK-3702
>                 URL: https://issues.apache.org/jira/browse/FLINK-3702
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
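The review comment above asks that a failed lookup (an index of -1) and null arguments be rejected early. A minimal sketch of that idea follows. `FieldLookup` and `resolveFieldIndex` are hypothetical names invented for illustration and are not part of Flink; only the "returns -1 when not found" contract is taken from the comment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a tuple-like type description; NOT Flink's TupleTypeInfoBase.
final class FieldLookup {
    private final List<String> fieldNames;

    FieldLookup(String... fieldNames) {
        // Fail fast on a null argument instead of deferring the error to first use.
        this.fieldNames = Arrays.asList(Objects.requireNonNull(fieldNames, "fieldNames"));
    }

    // Mirrors the contract mentioned in the review: -1 means "no such field".
    int getFieldIndex(String name) {
        return fieldNames.indexOf(name);
    }

    // Turns the -1 sentinel into an exception with a descriptive message,
    // so an invalid position never reaches an accessor constructor.
    int resolveFieldIndex(String name) {
        Objects.requireNonNull(name, "name");
        int pos = getFieldIndex(name);
        if (pos < 0) {
            throw new IllegalArgumentException(
                "Unable to find field \"" + name + "\" in " + fieldNames + ".");
        }
        return pos;
    }
}
```

Whether the check belongs in the lookup, as sketched here, or in each accessor constructor, as the reviewer suggests, is a design choice; the point is only that the sentinel should not propagate silently.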
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432502#comment-15432502 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75832360

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---
    @@ -203,7 +205,26 @@ public void getFlatFields(String fieldExpression, int offset, List typed = (TypeInformation) this.types[pos];
             return typed;
         }
    -
    +
    +    @Override
    +    @PublicEvolving
    +    public FieldAccessor getFieldAccessor(int pos, ExecutionConfig config) {
    +        return new FieldAccessor.SimpleTupleFieldAccessor(pos, this);
    +    }
    +
    +    @Override
    +    @PublicEvolving
    +    public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) {
    +        FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
    +        int FieldPos = this.getFieldIndex(decomp.head);
    --- End diff --

    Should not be upper case.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -------------------------------------------------------------
>
>                 Key: FLINK-3702
>                 URL: https://issues.apache.org/jira/browse/FLINK-3702
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432497#comment-15432497 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75831968

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -317,7 +308,39 @@ public int getFieldIndex(String fieldName) {
             return new PojoSerializer(getTypeClass(), fieldSerializers, reflectiveFields, config);
         }
    -
    +
    +    @Override
    +    @PublicEvolving
    +    public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) {
    +
    +        FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
    +
    +        // get field
    +        PojoField field = null;
    +        TypeInformation fieldType = null;
    +        for (int i = 0; i < fields.length; i++) {
    +            if (fields[i].getField().getName().equals(decomp.head)) {
    +                field = fields[i];
    +                fieldType = fields[i].getTypeInformation();
    +                break;
    +            }
    +        }
    +        if (field == null) {
    +            throw new InvalidFieldReferenceException("Unable to find field \""+decomp.head+"\" in type "+this+".");
    +        }
    +
    +        if(decomp.tail == null) {
    --- End diff --

    It would be great if you could document in `FieldExpressionDecomposition` that `tail` can be null.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -------------------------------------------------------------
>
>                 Key: FLINK-3702
>                 URL: https://issues.apache.org/jira/browse/FLINK-3702
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
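To illustrate what documenting a nullable `tail` could look like, here is a small sketch of a head/tail split for field expressions such as "f1.a.foo". The class `FieldExpression` and its `decompose` method are hypothetical and written only for this example. The code under review imports `java.util.regex`, so its real decomposition presumably differs; this sketch uses a plain `indexOf` as a simplification and does not validate empty segments.

```java
import java.util.Objects;

// Hypothetical sketch; NOT the PR's FieldExpressionDecomposition.
final class FieldExpression {
    /** First path segment, e.g. "f1" for "f1.a.foo". Never null. */
    final String head;

    /** Rest of the path after the first dot, or null if there is no nested part. */
    final String tail;

    private FieldExpression(String head, String tail) {
        this.head = head;
        this.tail = tail;
    }

    /** Splits an expression at its first dot; assumes a non-null expression. */
    static FieldExpression decompose(String expression) {
        Objects.requireNonNull(expression, "expression");
        int dot = expression.indexOf('.');
        if (dot < 0) {
            // No nested part: callers must be prepared for tail == null.
            return new FieldExpression(expression, null);
        }
        return new FieldExpression(expression.substring(0, dot), expression.substring(dot + 1));
    }
}
```

With the null case spelled out in the field's Javadoc, a check such as `if (decomp.tail == null)` in the calling code reads as the expected "no further nesting" branch rather than as a defensive guard.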
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432487#comment-15432487 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75830865

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -317,7 +308,39 @@ public int getFieldIndex(String fieldName) {
             return new PojoSerializer(getTypeClass(), fieldSerializers, reflectiveFields, config);
         }
    -
    +
    +    @Override
    +    @PublicEvolving
    +    public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) {
    +
    +        FieldAccessor.FieldExpressionDecomposition decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
    --- End diff --

    What about renaming `FieldExpressionDecomposition` to just `FieldExpression`? IMHO the old class name is too long.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -------------------------------------------------------------
>
>                 Key: FLINK-3702
>                 URL: https://issues.apache.org/jira/browse/FLINK-3702
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432461#comment-15432461 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75828673 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record +* @return The value of the field. +*/ + public abstract F get(T record); + + /** +* Sets the field (specified in the constructor) of the given record to the given value. 
+* +* Warning: This might modify the original object, or might return a new object instance. +* (This is necessary, because the record might be immutable.) +* +* @param record The record to modify +* @param fieldValue The new value of the field +* @return A record that has the given field value. (this might be a new instance or the original) +*/ + public abstract T set(T record, F fieldValue); + + + // == --- End diff -- Usually we always use `` for logical code sections. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432455#comment-15432455 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75828177 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessorimplements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** +* Gets the TypeInformation for the type of the field. +* Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). +*/ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** +* Gets the value of the field (specified in the constructor) of the given record. +* @param record --- End diff -- My IDE shows a warning here because of missing documentation. 
> DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432447#comment-15432447 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75827568

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java ---
    @@ -160,6 +161,53 @@ public boolean isSortKeyType() {
         @PublicEvolving
         public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
    +
    +    /**
    +     * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
    +     * the specified field on instances of this type.
    +     *
    +     * @param pos The field position (zero-based)
    +     * @param config Configuration object
    +     * @param <F> The type of the field to access
    +     * @return The created FieldAccessor
    +     */
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config){
    +        throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString()
    +            + "Referencing a field by position is supported on tuples, case classes, and arrays. "
    +            + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int).");
    +    }
    +
    +    /**
    +     * Creates a {@link FieldAccessor} for the field that is given by a field expression,
    +     * which can be used to get and set the specified field on instances of this type.
    +     *
    +     * @see <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions">
    +     * Defining keys using Field Expressions</a>
    +     *
    +     * @param field The field expression
    +     * @param config Configuration object
    +     * @param <F> The type of the field to access
    +     * @return The created FieldAccessor
    +     */
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
    +        throw new InvalidFieldReferenceException("Cannot reference field by field expression on " + this.toString()
    +            + "Field expressions are only supported on POJO types, tuples, and case classes. "
    +            + "For the requirements for a type to be considered a POJO, see "
    +            + "https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#pojos");
    +    }
    +
    +    @PublicEvolving
    +    public static class InvalidFieldReferenceException extends IllegalArgumentException {
    --- End diff --

    Can you move this in a separate file? We should keep `TypeInformation` clear.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.0.0
> Reporter: Robert Metzger
> Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
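For readers following the review, the pattern in the diff above (a base method that rejects field references by default, type-specific overrides, and the exception kept in its own top-level class as the reviewer asks) might look roughly like the sketch below. All class and method names here are hypothetical stand-ins chosen for illustration; none of them are Flink classes, and the sketch returns the field value directly instead of a `FieldAccessor`.

```java
// Hypothetical stand-in for an exception that lives in its own file
// instead of being nested inside the type-information base class.
class InvalidFieldReferenceSketchException extends IllegalArgumentException {
    private static final long serialVersionUID = 1L;

    InvalidFieldReferenceSketchException(String message) {
        super(message);
    }
}

// Hypothetical base class: by default, referencing a field by position is rejected.
abstract class TypeInfoSketch {
    Object getFieldByPosition(Object record, int pos) {
        throw new InvalidFieldReferenceSketchException(
            "Cannot reference field by position on " + this + ".");
    }
}

// A basic type overrides the default and allows only the 0th field (the record itself).
final class BasicTypeInfoSketch extends TypeInfoSketch {
    @Override
    Object getFieldByPosition(Object record, int pos) {
        if (pos != 0) {
            throw new InvalidFieldReferenceSketchException(
                "Not the 0th field selected for a basic type.");
        }
        return record;
    }

    @Override
    public String toString() {
        return "BasicTypeInfoSketch";
    }
}

// A type that does not override the method keeps the rejecting default.
final class OpaqueTypeInfoSketch extends TypeInfoSketch {
    @Override
    public String toString() {
        return "OpaqueTypeInfoSketch";
    }
}
```

With this layout the base class stays small, and callers can still catch the exception as an `IllegalArgumentException`.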
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432439#comment-15432439 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75827289 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java --- @@ -160,6 +161,53 @@ public boolean isSortKeyType() { @PublicEvolving public abstract TypeSerializer createSerializer(ExecutionConfig config); + + /** +* Creates a {@link FieldAccessor} for the given field position, which can be used to get and set +* the specified field on instances of this type. +* +* @param pos The field position (zero-based) +* @param config Configuration object +* @param The type of the field to access +* @return The created FieldAccessor +*/ + @PublicEvolving + public FieldAccessorgetFieldAccessor(int pos, ExecutionConfig config){ + throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString() + + "Referencing a field by position is supported on tuples, case classes, and arrays. " + + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int)."); + } + + /** +* Creates a {@link FieldAccessor} for the field that is given by a field expression, +* which can be used to get and set the specified field on instances of this type. +* +* @see https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions;> --- End diff -- Don't use links to the documentation. They change over time and become invalid. 
> DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432442#comment-15432442 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r75827397 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java --- @@ -160,6 +161,53 @@ public boolean isSortKeyType() { @PublicEvolving public abstract TypeSerializer createSerializer(ExecutionConfig config); + + /** +* Creates a {@link FieldAccessor} for the given field position, which can be used to get and set +* the specified field on instances of this type. +* +* @param pos The field position (zero-based) +* @param config Configuration object +* @param The type of the field to access +* @return The created FieldAccessor +*/ + @PublicEvolving + public FieldAccessorgetFieldAccessor(int pos, ExecutionConfig config){ + throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString() + + "Referencing a field by position is supported on tuples, case classes, and arrays. " + + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int)."); + } + + /** +* Creates a {@link FieldAccessor} for the field that is given by a field expression, +* which can be used to get and set the specified field on instances of this type. 
+* +* @see https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-using-field-expressions;> +* Defining keys using Field Expressions +* +* @param field The field expression +* @param config Configuration object +* @param The type of the field to access +* @return The created FieldAccessor +*/ + @PublicEvolving + public FieldAccessor getFieldAccessor(String field, ExecutionConfig config) { + throw new InvalidFieldReferenceException("Cannot reference field by field expression on " + this.toString() + + "Field expressions are only supported on POJO types, tuples, and case classes. " + + "For the requirements for a type to be considered a POJO, see " + + "https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#pojos;); --- End diff -- I would also remove this link. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432436#comment-15432436 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75826929

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java ---
    @@ -171,6 +172,23 @@ public boolean isKeyType() {
             }
         }

    +    @Override
    +    @PublicEvolving
    +    @SuppressWarnings("unchecked")
    +    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
    +        if(pos != 0) {
    +            throw new InvalidFieldReferenceException("Not the 0th field selected for a basic type.");
    +        }
    +        return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor(this);
    +    }
    +
    +    @Override
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
    +        throw new InvalidFieldReferenceException("Field expressions are not supported on basic types."
    --- End diff --

    What about using `*` or also `0` here and get rid of the exception?

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.0.0
> Reporter: Robert Metzger
> Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
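One possible reading of the suggestion above, sketched with a hypothetical `WholeRecordAccessor` class (not part of Flink): the field expressions `*` and `0` are both taken to mean "the whole record", mirroring what position 0 already means for a basic type. Whether other expressions should still be rejected is an assumption made here; the review comment does not say.

```java
// Hypothetical accessor for a basic (non-composite) type; the record is its own only field.
final class WholeRecordAccessor<T> {

    /** Position-based creation: only position 0 is meaningful for a basic type. */
    static <T> WholeRecordAccessor<T> forPosition(int pos) {
        if (pos != 0) {
            throw new IllegalArgumentException("Not the 0th field selected for a basic type.");
        }
        return new WholeRecordAccessor<>();
    }

    /** Expression-based creation: "*" and "0" both select the whole record. */
    static <T> WholeRecordAccessor<T> forExpression(String field) {
        String trimmed = field.trim();
        if (trimmed.equals("*") || trimmed.equals("0")) {
            return new WholeRecordAccessor<>();
        }
        // Assumption: anything else is still an error for a basic type.
        throw new IllegalArgumentException(
            "Only \"0\" or \"*\" can be selected on a basic type, got: " + field);
    }

    T get(T record) {
        return record;
    }

    // Basic values such as Integer are immutable, so "setting" returns the new value
    // instead of modifying the old record.
    T set(T record, T fieldValue) {
        return fieldValue;
    }
}
```

Because `set` hands back the new value, callers have to use the returned record, which matches the warning in the `FieldAccessor.set` Javadoc that the result might be a new instance.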
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432430#comment-15432430 ]

ASF GitHub Bot commented on FLINK-3702:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75826683

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java ---
    @@ -121,6 +122,18 @@ public boolean isKeyType() {
         }

         @Override
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
    +        return new FieldAccessor.ArrayFieldAccessor(pos, this);
    +    }
    +
    +    @Override
    +    @PublicEvolving
    +    public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
    +        throw new InvalidFieldReferenceException("Selecting a field from an array type is only supported by index.");
    --- End diff --

    We could also add an additional constructor in `ArrayFieldAccessor` that takes a string and parses the int position there. Then we don't need to throw exceptions. What do you think?

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.0.0
> Reporter: Robert Metzger
> Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
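A minimal sketch of the idea in the comment above, assuming a hypothetical `ArrayPositionAccessor` class (the real `ArrayFieldAccessor` in the PR may look different): a second constructor takes the field expression as a string and parses the array position from it, and element access goes through `java.lang.reflect.Array` so that primitive arrays work too.

```java
import java.lang.reflect.Array;

// Hypothetical accessor for one position of an array; names are illustrative only.
final class ArrayPositionAccessor {

    private final int pos;

    ArrayPositionAccessor(int pos) {
        if (pos < 0) {
            throw new IllegalArgumentException("Negative array position: " + pos);
        }
        this.pos = pos;
    }

    // Additional constructor: accepts a field expression such as "2".
    ArrayPositionAccessor(String field) {
        this(parsePosition(field));
    }

    private static int parsePosition(String field) {
        try {
            return Integer.parseInt(field.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Array fields can only be selected by index, got: " + field, e);
        }
    }

    // Array.get boxes primitive elements (an int element is returned as an Integer).
    Object get(Object array) {
        return Array.get(array, pos);
    }

    // Arrays are mutable, so the original array is modified and returned.
    Object set(Object array, Object value) {
        Array.set(array, pos, value);
        return array;
    }
}
```

Note that a non-numeric expression still has to be rejected somewhere, so the string constructor moves the exception for the error case rather than removing it entirely.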
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421042#comment-15421042 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2094 Thanks! > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421039#comment-15421039 ] ASF GitHub Bot commented on FLINK-3702: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2094 I will shepherd this PR. I think I can review it this week. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326578#comment-15326578 ]

ASF GitHub Bot commented on FLINK-3702:
---

GitHub user ggevay opened a pull request:

    https://github.com/apache/flink/pull/2094

    [FLINK-3702] Make FieldAccessors support nested field expressions.

    I finally had some time to complete this, sorry it took so long.

    I have added `getFieldAccessor` to `TypeInformation`, which creates a `FieldAccessor` from a position or a (possibly nested) field expression. It uses recursion in the nested case, and also supports the heterogeneous case, e.g. pojo inside tuple inside pojo.

    Additionally, I have noticed that the code to serialize/deserialize a `java.lang.reflect.Field` is duplicated at several places (`readObject`/`writeObject` of `PojoSerializer`, `PojoComparator`, `PojoField`). I also needed it in `PojoFieldAccessor`, so instead of adding more duplication, I factored it out into a new class (`FieldSerializer`).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ggevay/flink FieldAccessorRefactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2094.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2094

commit e8b584791144a2e9a22e39242010726fc76ae2a3
Author: Gabor Gevay
Date: 2016-05-22T17:48:50Z

    [FLINK-3702] Make FieldAccessors support nested field expressions.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.0.0
> Reporter: Robert Metzger
> Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295698#comment-15295698 ] Gabor Gevay commented on FLINK-3702: This turned out to be more complicated than I thought, but I'm 80% done. I'll try to wrap it up in the next few days. I'm refactoring FieldAccessor, so that it will be created from a method of the TypeInfos, instead of the static method FieldAccessor.create. This makes it possible to process the outermost part of a field expression and then recurse into the specified field's TypeInfo with the rest of the field expression. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
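The recursion described in the comment above (handle the outermost part of the field expression, then recurse into that field's type with the rest) can be sketched with plain reflection. This is only an illustration under stated assumptions: `NestedAccessor`, `EventPojo` and `RawEventPojo` are hypothetical names, and the sketch recurses over `Class` objects rather than over Flink's TypeInfos, so it handles only plain object fields (no tuples or arrays).

```java
import java.lang.reflect.Field;

// Hypothetical reflection-based accessor built recursively from a field expression.
abstract class NestedAccessor {

    abstract Object get(Object record);

    abstract Object set(Object record, Object value);

    static NestedAccessor create(Class<?> type, String expr) {
        // Split off the outermost part of the expression, e.g. "raw" from "raw.count".
        int dot = expr.indexOf('.');
        String head = dot < 0 ? expr : expr.substring(0, dot);
        String rest = dot < 0 ? null : expr.substring(dot + 1);

        final Field field;
        try {
            // Note: getDeclaredField does not look at superclasses.
            field = type.getDeclaredField(head);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException(
                "No field '" + head + "' in " + type.getName(), e);
        }
        field.setAccessible(true);

        // Recurse into the field's type with the remainder of the expression.
        final NestedAccessor inner = rest == null ? null : create(field.getType(), rest);

        return new NestedAccessor() {
            @Override
            Object get(Object record) {
                Object value = read(field, record);
                return inner == null ? value : inner.get(value);
            }

            @Override
            Object set(Object record, Object value) {
                // Let the inner accessor update the nested object, then write it back.
                Object newValue = inner == null ? value : inner.set(read(field, record), value);
                write(field, record, newValue);
                return record;
            }
        };
    }

    static Object read(Field f, Object target) {
        try {
            return f.get(target);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    static void write(Field f, Object target, Object value) {
        try {
            f.set(target, value);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Hypothetical POJOs used only to exercise the sketch.
class RawEventPojo {
    public int count;
}

class EventPojo {
    public RawEventPojo raw = new RawEventPojo();
}
```

For example, `NestedAccessor.create(EventPojo.class, "raw.count")` yields an accessor whose `set` updates the `count` field of the nested object. Writing the inner result back into the outer record is what would let an immutable inner record be replaced by a new instance.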
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295621#comment-15295621 ] Gabor Gevay commented on FLINK-3702: I'll try to quickly do it now. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295502#comment-15295502 ]

Rami commented on FLINK-3702:
-

This feature would really make a difference in our code. Most of our events need a count, and many of them share the nested rawevent POJO. If the count field could live only in the nested rawevent POJO, the code would be much cleaner and easier to understand than having the count field duplicated in the event and its nested rawevent.

Any ETA?

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.0.0
> Reporter: Robert Metzger
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)