Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r87101256 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; + +import java.io.Serializable; + + +/** + * Static factories for the {@link FieldAccessor} utilities. 
+ */ +@Internal +public class FieldAccessorFactory implements Serializable { + + /** + * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set + * the specified field on instances of this type. + * + * @param pos The field position (zero-based) + * @param config Configuration object + * @param <F> The type of the field to access + * @return The created FieldAccessor + */ + @Internal + public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, int pos, ExecutionConfig config){ + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new FieldAccessor.ArrayFieldAccessor<>(pos, typeInfo); + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + if (pos != 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " + + "basic type (" + typeInfo.toString() + "). A field expression on a basic type can only select " + + "the 0th field (which means selecting the entire basic type)."); + } + return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor<>(typeInfo); + + // In case of case classes + } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase) typeInfo).isCaseClass()) { + TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase) typeInfo; + TypeInformation<?>[] types = new TypeInformation<?>[tupleTypeInfo.getArity()]; + for (int i = 0; i < types.length; i++) { + types[i] = tupleTypeInfo.getTypeAt(i); + } + return new FieldAccessor.ProductFieldAccessor<>( + pos, typeInfo, new FieldAccessor.SimpleFieldAccessor<>((TypeInformation<F>)types[pos]), config); + + // In case of tuples + } else if (typeInfo.isTupleType()) { + return new FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo); + + // Default case, PojoType is directed to this statement + } else { + throw new CompositeType.InvalidFieldReferenceException("Cannot reference field by 
position on " + typeInfo.toString() + + "Referencing a field by position is supported on tuples, case classes, and arrays. " + + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int)."); + } + } + + /** + * Creates a {@link FieldAccessor} for the field that is given by a field expression, + * which can be used to get and set the specified field on instances of this type. + * + * @param field The field expression + * @param config Configuration object + * @param <F> The type of the field to access + * @return The created FieldAccessor + */ + @Internal + public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo, String field, ExecutionConfig config) { + + // In case of arrays + if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) { + try { + return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(field), typeInfo); + } catch (NumberFormatException ex) { + throw new CompositeType.InvalidFieldReferenceException + ("A field expression on an array must be an integer index (that might be given as a string)."); + } + + // In case of basic types + } else if (typeInfo instanceof BasicTypeInfo) { + try { + int pos = field.equals("*") ? 0 : Integer.parseInt(field); --- End diff -- use `Keys.ExpressionKeys.SELECT_ALL_CHAR` instead of `"*"`?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---