[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649138#comment-15649138 ]
ASF GitHub Bot commented on FLINK-3702: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87102206 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; + +import java.io.Serializable; + + +/** + * Static factories for the {@link FieldAccessor} utilities. + */ +@Internal +public class FieldAccessorFactory implements Serializable { + + /** + * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set + * the specified field on instances of this type. + * + * @param pos The field position (zero-based) + * @param config Configuration object + * @param <F> The type of the field to access + * @return The created FieldAccessor + */ + @Internal + public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){ + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo); + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + if (pos != 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " + + "basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " + + "the 0th field (which means selecting the entire basic type)."); + } + return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor<>(typeInfo); + + // In case of case classes + } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) { + TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo; + TypeInformation<?>[] types = new TypeInformation<?>[tupleTypeInfo.getArity()]; + for (int i = 0; i < types.length; i++) { + types[i] = tupleTypeInfo.getTypeAt(i); + } + return new FieldAccessor.ProductFieldAccessor<>( + pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation<F>)types[pos]), config); + + // In case of tuples + } else if (typeInfo.isTupleType()) { + return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo); + + // Default case, PojoType is directed to this statement + } else { + throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by position on " + typeInfo.toString() + + "Referencing a field by position is supported on tuples, case classes, and arrays. " + + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int)."); + } + } + + /** + * Creates a {@link FieldAccessor} for the field that is given by a field expression, + * which can be used to get and set the specified field on instances of this type. + * + * @param field The field expression + * @param config Configuration object + * @param <F> The type of the field to access + * @return The created FieldAccessor + */ + @Internal + public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, String field, ExecutionConfig config) { + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + try { + return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(field), typeInfo); + } catch (NumberFormatException ex) { + throw new CompositeType.InvalidFieldReferenceException + ("A field expression on an array must be an integer index (that might be given as a string)."); + } + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + try { + int pos = field.equals("*") ? 0 : Integer.parseInt(field); + return FieldAccessorFactory.getAccessor(typeInfo, pos, config); + } catch (NumberFormatException ex) { + throw new CompositeType.InvalidFieldReferenceException("You tried to select the field \"" + field + + "\" on a " + typeInfo.toString() + ". A field expression on a basic type can only be \"*\" or \"0\"" + + " (both of which mean selecting the entire basic type)."); + } + + // In case of Pojos + } else if (typeInfo instanceof PojoTypeInfo) { + FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(field); + + // get field + PojoField pojoField = null; + TypeInformation<?> fieldType = null; + PojoTypeInfo<?> pojoTypeInfo = (PojoTypeInfo) typeInfo; + PojoField[] fields = new PojoField[pojoTypeInfo.getArity()]; --- End diff -- You can use `int PojoTypeInfo.getFieldIndex(String)` to directly get the index of the field. It returns `-1` if the field does not exist. `PojoTypeInfo.getTypeAt(int)` and `PojoTypeInfo.getPojoFieldAt(int)` return the type info and pojo field. No need to iterate through all fields and search for yourself. > DataStream API PojoFieldAccessor doesn't support nested POJOs > ------------------------------------------------------------- > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API > Affects Versions: 1.0.0 > Reporter: Robert Metzger > Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)