Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r87094884
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---
    @@ -0,0 +1,386 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.util.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.operators.Keys;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.CompositeType;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
    +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
    +import scala.Product;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.lang.reflect.Array;
    +import java.lang.reflect.Field;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +
    +/**
    + * These classes encapsulate the logic of accessing a field specified by the user as either an index
    + * or a field expression string. TypeInformation can also be requested for the field.
    + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
    + *
    + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
    + * These penetrate one layer, and then delegate the rest of the work to an "innerAccessor".
    + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
    + */
    +@PublicEvolving
    +public abstract class FieldAccessor<T, F> implements Serializable {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   protected TypeInformation fieldType;
    +
    +   /**
    +    * Gets the TypeInformation for the type of the field.
    +    * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
    +    */
    +   @SuppressWarnings("unchecked")
    +   public TypeInformation<F> getFieldType() {
    +           return fieldType;
    +   }
    +
    +
    +   /**
    +    * Gets the value of the field (specified in the constructor) of the given record.
    +    * @param record The record on which the field will be accessed
    +    * @return The value of the field.
    +    */
    +   public abstract F get(T record);
    +
    +   /**
    +    * Sets the field (specified in the constructor) of the given record to the given value.
    +    *
    +    * Warning: This might modify the original object, or might return a new object instance.
    +    * (This is necessary, because the record might be immutable.)
    +    *
    +    * @param record The record to modify
    +    * @param fieldValue The new value of the field
    +    * @return A record that has the given field value. (this might be a new instance or the original)
    +    */
    +   public abstract T set(T record, F fieldValue);
    +
    +
    +   // --------------------------------------------------------------------------------------------------
    +
    +
    +   /**
    +    * This is when the entire record is considered as a single field. (e.g. field 0 of a basic type, or a
    +    * field of a POJO that is itself some composite type but is not further decomposed)
    +    */
    +   final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> {
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
    +                   checkNotNull(typeInfo, "typeInfo must not be null.");
    +
    +                   this.fieldType = typeInfo;
    +           }
    +
    +           @Override
    +           public T get(T record) {
    +                   return record;
    +           }
    +
    +           @Override
    +           public T set(T record, T fieldValue) {
    +                   return fieldValue;
    +           }
    +   }
    +
    +   final static class ArrayFieldAccessor<T, F> extends FieldAccessor<T, F> 
{
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           private final int pos;
    +
    +           public ArrayFieldAccessor(int pos, TypeInformation typeInfo) {
    +                   if(pos < 0) {
    +                           throw new 
CompositeType.InvalidFieldReferenceException("The " + ((Integer) 
pos).toString() + ". field selected on" +
    +                                   " an array, which is an invalid 
index.");
    +                   }
    +                   checkNotNull(typeInfo, "typeInfo must not be null.");
    +
    +                   this.pos = pos;
    +                   this.fieldType = 
BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType());
    +           }
    +
    +           @SuppressWarnings("unchecked")
    +           @Override
    +           public F get(T record) {
    +                   return (F) Array.get(record, pos);
    +           }
    +
    +           @Override
    +           public T set(T record, F fieldValue) {
    +                   Array.set(record, pos, fieldValue);
    +                   return record;
    +           }
    +   }
    +
    +   /**
    +    * There are two versions of TupleFieldAccessor, differing in whether there is another
    +    * FieldAccessor nested inside. The no inner accessor version is probably a little faster.
    +    */
    +   static final class SimpleTupleFieldAccessor<T, F> extends 
FieldAccessor<T, F> {
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           private final int pos;
    +
    +           SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) {
    +                   checkNotNull(typeInfo, "typeInfo must not be null.");
    +                   int arity = ((TupleTypeInfo)typeInfo).getArity();
    +                   if(pos < 0 || pos >= arity) {
    +                           throw new 
CompositeType.InvalidFieldReferenceException(
    +                                   "Tried to select " + ((Integer) 
pos).toString() + ". field on \"" +
    +                                   typeInfo.toString() + "\", which is an 
invalid index.");
    +                   }
    +
    +                   this.pos = pos;
    +                   this.fieldType = 
((TupleTypeInfo)typeInfo).getTypeAt(pos);
    +           }
    +
    +           @SuppressWarnings("unchecked")
    +           @Override
    +           public F get(T record) {
    +                   final Tuple tuple = (Tuple) record;
    +                   return (F) tuple.getField(pos);
    +           }
    +
    +           @Override
    +           public T set(T record, F fieldValue) {
    +                   final Tuple tuple = (Tuple) record;
    +                   tuple.setField(fieldValue, pos);
    +                   return record;
    +           }
    +   }
    +
    +   /**
    +    * @param <T> The Tuple type
    +    * @param <R> The field type at the first level
    +    * @param <F> The field type at the innermost level
    +    */
    +   static final class RecursiveTupleFieldAccessor<T, R, F> extends 
FieldAccessor<T, F> {
    --- End diff --
    
    `T extends Tuple`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to