Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87102206 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; + +import java.io.Serializable; + + +/** + * Static factories for the {@link FieldAccessor} utilities. 
+ */ +@Internal +public class FieldAccessorFactory implements Serializable { + + /** + * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set + * the specified field on instances of this type. + * + * @param pos The field position (zero-based) + * @param config Configuration object + * @param <F> The type of the field to access + * @return The created FieldAccessor + */ + @Internal + public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){ + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo); + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + if (pos != 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " + + "basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " + + "the 0th field (which means selecting the entire basic type)."); + } + return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor<>(typeInfo); + + // In case of case classes + } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) { + TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo; + TypeInformation<?>[] types = new TypeInformation<?>[tupleTypeInfo.getArity()]; + for (int i = 0; i < types.length; i++) { + types[i] = tupleTypeInfo.getTypeAt(i); + } + return new FieldAccessor.ProductFieldAccessor<>( + pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation<F>)types[pos]), config); + + // In case of tuples + } else if (typeInfo.isTupleType()) { + return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo); + + // Default case, PojoType is directed to this statement + } else { + throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by 
position on " + typeInfo.toString() + + "Referencing a field by position is supported on tuples, case classes, and arrays. " + + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int)."); + } + } + + /** + * Creates a {@link FieldAccessor} for the field that is given by a field expression, + * which can be used to get and set the specified field on instances of this type. + * + * @param field The field expression + * @param config Configuration object + * @param <F> The type of the field to access + * @return The created FieldAccessor + */ + @Internal + public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, String field, ExecutionConfig config) { + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + try { + return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(field), typeInfo); + } catch (NumberFormatException ex) { + throw new CompositeType.InvalidFieldReferenceException + ("A field expression on an array must be an integer index (that might be given as a string)."); + } + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + try { + int pos = field.equals("*") ? 0 : Integer.parseInt(field); + return FieldAccessorFactory.getAccessor(typeInfo, pos, config); + } catch (NumberFormatException ex) { + throw new CompositeType.InvalidFieldReferenceException("You tried to select the field \"" + field + + "\" on a " + typeInfo.toString() + ". 
A field expression on a basic type can only be \"*\" or \"0\"" + + " (both of which mean selecting the entire basic type)."); + } + + // In case of Pojos + } else if (typeInfo instanceof PojoTypeInfo) { + FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(field); + + // get field + PojoField pojoField = null; + TypeInformation<?> fieldType = null; + PojoTypeInfo<?> pojoTypeInfo = (PojoTypeInfo) typeInfo; + PojoField[] fields = new PojoField[pojoTypeInfo.getArity()]; --- End diff -- You can use `int PojoTypeInfo.getFieldIndex(String)` to get the index of the field directly. It returns `-1` if the field does not exist. `PojoTypeInfo.getTypeAt(int)` and `PojoTypeInfo.getPojoFieldAt(int)` then return the field's type information and its `PojoField`, respectively. There is no need to iterate through all fields and search for the field yourself.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---