[ 
https://issues.apache.org/jira/browse/BEAM-4453?focusedWorklogId=120921&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120921
 ]

ASF GitHub Bot logged work on BEAM-4453:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jul/18 17:52
            Start Date: 09/Jul/18 17:52
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on a change in pull request #5873: 
[BEAM-4453] Add schema support for Java POJOs and Java Beans
URL: https://github.com/apache/beam/pull/5873#discussion_r201091950
 
 

 ##########
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
 ##########
 @@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.implementation.FixedValue;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.Duplication;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import net.bytebuddy.implementation.bytecode.TypeCreation;
+import net.bytebuddy.implementation.bytecode.assign.Assigner;
+import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.collection.ArrayFactory;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import org.apache.beam.sdk.values.reflect.FieldValueGetter;
+import org.apache.beam.sdk.values.reflect.FieldValueSetter;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.ClassUtils;
+import org.joda.time.DateTime;
+import org.joda.time.ReadableDateTime;
+import org.joda.time.ReadableInstant;
+
+class ByteBuddyUtils {
+  private static final ForLoadedType ARRAYS_TYPE = new 
ForLoadedType(Arrays.class);
+  private static final ForLoadedType ARRAY_UTILS_TYPE = new 
ForLoadedType(ArrayUtils.class);
+  private static final ForLoadedType BYTE_ARRAY_TYPE = new 
ForLoadedType(byte[].class);
+  private static final ForLoadedType BYTE_BUFFER_TYPE = new 
ForLoadedType(ByteBuffer.class);
+  private static final ForLoadedType CHAR_SEQUENCE_TYPE = new 
ForLoadedType(CharSequence.class);
+  private static final ForLoadedType DATE_TIME_TYPE = new 
ForLoadedType(DateTime.class);
+  private static final ForLoadedType LIST_TYPE = new ForLoadedType(List.class);
+  private static final ForLoadedType READABLE_INSTANT_TYPE =
+      new ForLoadedType(ReadableInstant.class);
+
+  // Create a new FieldValueGetter subclass.
+  @SuppressWarnings("unchecked")
+  static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
+      ByteBuddy byteBuddy, Type objectType, Type fieldType) {
+    TypeDescription.Generic getterGenericType =
+        TypeDescription.Generic.Builder.parameterizedType(
+                FieldValueGetter.class, objectType, fieldType)
+            .build();
+    return (DynamicType.Builder<FieldValueGetter>) 
byteBuddy.subclass(getterGenericType);
+  }
+
+  // Create a new FieldValueSetter subclass.
+  @SuppressWarnings("unchecked")
+  static DynamicType.Builder<FieldValueSetter> subclassSetterInterface(
+      ByteBuddy byteBuddy, Type objectType, Type fieldType) {
+    TypeDescription.Generic setterGenericType =
+        TypeDescription.Generic.Builder.parameterizedType(
+                FieldValueSetter.class, objectType, fieldType)
+            .build();
+    return (DynamicType.Builder<FieldValueSetter>) 
byteBuddy.subclass(setterGenericType);
+  }
+
+  // Base class used below to convert types.
+  @SuppressWarnings("unchecked")
+  abstract static class TypeConversion<T> {
+    public T convert(TypeDescriptor typeDescriptor) {
+      if (typeDescriptor.isArray()
+          && 
!typeDescriptor.getComponentType().getRawType().equals(byte.class)) {
+        // Byte arrays are special, so leave those alone.
+        return convertArray(typeDescriptor);
+      } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+        return convertCollection(typeDescriptor);
+      } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+        return convertMap(typeDescriptor);
+      } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadableInstant.class))) {
+        return convertDateTime(typeDescriptor);
+      } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
+        return convertByteBuffer(typeDescriptor);
+      } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) {
+        return convertCharSequence(typeDescriptor);
+      } else if (typeDescriptor.getRawType().isPrimitive()) {
+        return convertPrimitive(typeDescriptor);
+      } else {
+        return convertDefault(typeDescriptor);
+      }
+    }
+
+    protected abstract T convertArray(TypeDescriptor<?> type);
+
+    protected abstract T convertCollection(TypeDescriptor<?> type);
+
+    protected abstract T convertMap(TypeDescriptor<?> type);
+
+    protected abstract T convertDateTime(TypeDescriptor<?> type);
+
+    protected abstract T convertByteBuffer(TypeDescriptor<?> type);
+
+    protected abstract T convertCharSequence(TypeDescriptor<?> type);
+
+    protected abstract T convertPrimitive(TypeDescriptor<?> type);
+
+    protected abstract T convertDefault(TypeDescriptor<?> type);
+  }
+
+  /**
+   * Give a Java type, returns the Java type expected for use with Row. For 
example, both {@link
+   * StringBuffer} and {@link String} are represented as a {@link String} in 
Row. This determines
+   * what the return type of the getter will be. For instance, the following 
POJO class:
+   *
+   * <pre><code>
+   * class POJO {
+   *   StringBuffer str;
+   *   int[] array;
+   * }
+   * </code></pre>
+   *
+   * Generates the following getters:
+   *
+   * <pre><code>{@literal FieldValueGetter<POJO, String>}</code></pre>
+   *
+   * <pre><code>{@literal FieldValueGetter<POJO, List<Integer>>}</code></pre>
+   */
+  static class ConvertType extends TypeConversion<Type> {
+    @Override
+    protected Type convertArray(TypeDescriptor<?> type) {
+      return createListType(type).getType();
+    }
+
+    @Override
+    protected Type convertCollection(TypeDescriptor<?> type) {
+      return Collection.class;
+    }
+
+    @Override
+    protected Type convertMap(TypeDescriptor<?> type) {
+      return Map.class;
+    }
+
+    @Override
+    protected Type convertDateTime(TypeDescriptor<?> type) {
+      return ReadableInstant.class;
+    }
+
+    @Override
+    protected Type convertByteBuffer(TypeDescriptor<?> type) {
+      return byte[].class;
+    }
+
+    @Override
+    protected Type convertCharSequence(TypeDescriptor<?> type) {
+      return String.class;
+    }
+
+    @Override
+    protected Type convertPrimitive(TypeDescriptor<?> type) {
+      return ClassUtils.primitiveToWrapper(type.getRawType());
+    }
+
+    @Override
+    protected Type convertDefault(TypeDescriptor<?> type) {
+      return type.getType();
+    }
+
+    @SuppressWarnings("unchecked")
+    private <ElementT> TypeDescriptor<List<ElementT>> 
createListType(TypeDescriptor<?> type) {
+      TypeDescriptor componentType =
+          
TypeDescriptor.of(ClassUtils.primitiveToWrapper(type.getComponentType().getRawType()));
+      return new TypeDescriptor<List<ElementT>>() {}.where(
+          new TypeParameter<ElementT>() {}, componentType);
+    }
+  }
+
+  /**
+   * Takes a {@link StackManipulation} that returns a value. Prepares this 
value to be returned by a
+   * getter. {@link org.apache.beam.sdk.values.Row} needs getters to return 
specific types, but we
+   * allow user objects to contain different but equivalent types. Therefore 
we must convert some of
+   * these types before returning. These conversions correspond to the ones 
defined in {@link
+   * ConvertType}. This class generates the code to do these conversion.
+   */
+  static class ConvertValueForGetter extends TypeConversion<StackManipulation> 
{
+    // The code that reads the value.
+    private StackManipulation readValue;
+
+    ConvertValueForGetter(StackManipulation readValue) {
+      this.readValue = readValue;
+    }
+
+    @Override
+    protected StackManipulation convertArray(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // return isComponentTypePrimitive ? 
Arrays.asList(ArrayUtils.toObject(value))
+      //     : Arrays.asList(value);
+
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      StackManipulation stackManipulation = readValue;
+      // Row always expects to get an Iterable back for array types. Wrap this 
array into a
+      // List using Arrays.asList before returning.
+      if (loadedType.getComponentType().isPrimitive()) {
+        // Arrays.asList doesn't take primitive arrays, so convert first using 
ArrayUtils.toObject.
+        stackManipulation =
+            new Compound(
+                stackManipulation,
+                MethodInvocation.invoke(
+                    ARRAY_UTILS_TYPE
+                        .getDeclaredMethods()
+                        .filter(
+                            ElementMatchers.isStatic()
+                                .and(ElementMatchers.named("toObject"))
+                                
.and(ElementMatchers.takesArguments(loadedType)))
+                        .getOnly()));
+      }
+      return new Compound(
+          stackManipulation,
+          MethodInvocation.invoke(
+              ARRAYS_TYPE
+                  .getDeclaredMethods()
+                  
.filter(ElementMatchers.isStatic().and(ElementMatchers.named("asList")))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertCollection(TypeDescriptor<?> type) {
+      return readValue;
+    }
+
+    @Override
+    protected StackManipulation convertMap(TypeDescriptor<?> type) {
+      return readValue;
+    }
+
+    @Override
+    protected StackManipulation convertDateTime(TypeDescriptor<?> type) {
+      // If the class type is a ReadableDateTime, then return it.
+      if (ReadableDateTime.class.isAssignableFrom(type.getRawType())) {
+        return readValue;
+      }
+      // Otherwise, generate the following code:
+      //   return new DateTime(value.getMillis());
+
+      return new StackManipulation.Compound(
+          // Create a new instance of the target type.
+          TypeCreation.of(DATE_TIME_TYPE),
+          Duplication.SINGLE,
+          readValue,
+          TypeCasting.to(READABLE_INSTANT_TYPE),
+          // Call ReadableInstant.getMillis to extract the millis since the 
epoch.
+          MethodInvocation.invoke(
+              READABLE_INSTANT_TYPE
+                  .getDeclaredMethods()
+                  .filter(ElementMatchers.named("getMillis"))
+                  .getOnly()),
+          // Construct a DateTime object containing the millis.
+          MethodInvocation.invoke(
+              DATE_TIME_TYPE
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.isConstructor()
+                          
.and(ElementMatchers.takesArguments(ForLoadedType.of(long.class))))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // return value.array();
+
+      // We must extract the array from the ByteBuffer before returning.
+      // NOTE: we only support array-backed byte buffers in these POJOs. 
Others (e.g. mmaped
+      // files) are not supported.
+      return new Compound(
+          readValue,
+          MethodInvocation.invoke(
+              BYTE_BUFFER_TYPE
+                  .getDeclaredMethods()
+                  .filter(
+                      
ElementMatchers.named("array").and(ElementMatchers.returns(BYTE_ARRAY_TYPE)))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertCharSequence(TypeDescriptor<?> type) {
+      // If the member is a String, then return it.
+      if (type.isSubtypeOf(TypeDescriptor.of(String.class))) {
+        return readValue;
+      }
+
+      // Otherwise, generate the following code:
+      // return value.toString();
+      return new Compound(
+          readValue,
+          MethodInvocation.invoke(
+              CHAR_SEQUENCE_TYPE
+                  .getDeclaredMethods()
+                  .filter(ElementMatchers.named("toString"))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertPrimitive(TypeDescriptor<?> type) {
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      // Box the primitive type.
+      return new Compound(
+          readValue,
+          Assigner.DEFAULT.assign(
+              loadedType.asGenericType(), 
loadedType.asBoxed().asGenericType(), Typing.STATIC));
+    }
+
+    @Override
+    protected StackManipulation convertDefault(TypeDescriptor<?> type) {
+      return readValue;
+    }
+  }
+
+  /**
+   * Row is going to call the setter with its internal Java type, however the 
user object being set
+   * might have a different type internally. For example, Row will be calling 
set with a {@link
+   * String} type (for string fields), but the user type might have a {@link 
StringBuffer} member
+   * there. This class generates code to convert between these types.
+   */
+  static class ConvertValueForSetter extends TypeConversion<StackManipulation> 
{
+    StackManipulation readValue;
+
+    ConvertValueForSetter(StackManipulation readValue) {
+      this.readValue = readValue;
+    }
+
+    @Override
+    protected StackManipulation convertArray(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // T[] toArray = (T[]) value.toArray(new T[0]);
+      // return isPrimitive ? toArray : ArrayUtils.toPrimitive(toArray);
+
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      // The type of the array containing the (possibly) boxed values.
+      TypeDescription arrayType =
+          
TypeDescription.Generic.Builder.rawType(loadedType.getComponentType().asBoxed())
+              .asArray()
+              .build()
+              .asErasure();
+
+      // Extract an array from the collection.
+      StackManipulation stackManipulation =
+          new Compound(
+              readValue,
+              TypeCasting.to(LIST_TYPE),
+              // Call Collection.toArray(T[[]) to extract the array. Push new 
T[0] on the stack
+              // before
+              // calling toArray.
+              
ArrayFactory.forType(loadedType.getComponentType().asBoxed().asGenericType())
+                  .withValues(Collections.emptyList()),
+              MethodInvocation.invoke(
+                  LIST_TYPE
+                      .getDeclaredMethods()
+                      .filter(
+                          
ElementMatchers.named("toArray").and(ElementMatchers.takesArguments(1)))
+                      .getOnly()),
+              // Cast the result to T[].
+              TypeCasting.to(arrayType));
+
+      if (loadedType.getComponentType().isPrimitive()) {
+        // The array we extract will be an array of objects. If the pojo field 
is an array of
+        // primitive types, we need to then convert to an array of unboxed 
objects.
+        stackManipulation =
+            new StackManipulation.Compound(
+                stackManipulation,
+                MethodInvocation.invoke(
+                    ARRAY_UTILS_TYPE
+                        .getDeclaredMethods()
+                        .filter(
+                            ElementMatchers.named("toPrimitive")
+                                
.and(ElementMatchers.takesArguments(arrayType)))
+                        .getOnly()));
+      }
+      return stackManipulation;
+    }
+
+    @Override
+    protected StackManipulation convertCollection(TypeDescriptor<?> type) {
+      return readValue;
+    }
+
+    @Override
+    protected StackManipulation convertMap(TypeDescriptor<?> type) {
+      return readValue;
+    }
+
+    @Override
+    protected StackManipulation convertDateTime(TypeDescriptor<?> type) {
+      // The setter might be called with a different subclass of 
ReadableInstant than the one stored
+      // in this POJO. We must extract the value passed into the setter and 
copy it into an instance
+      // that the POJO can accept.
+
+      // Generate the following code:
+      // return new T(value.getMillis);
+
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      return new Compound(
+          // Create a new instance of the target type.
+          TypeCreation.of(loadedType),
+          Duplication.SINGLE,
+          // Load the parameter and cast it to a ReadableInstant.
+          readValue,
+          TypeCasting.to(READABLE_INSTANT_TYPE),
+          // Call ReadableInstant.getMillis to extract the millis since the 
epoch.
+          MethodInvocation.invoke(
+              READABLE_INSTANT_TYPE
+                  .getDeclaredMethods()
+                  .filter(ElementMatchers.named("getMillis"))
+                  .getOnly()),
+          // All subclasses of ReadableInstant contain a ()(long) constructor 
that takes in a millis
+          // argument. Call that constructor of the field to initialize it.
+          MethodInvocation.invoke(
+              loadedType
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.isConstructor()
+                          
.and(ElementMatchers.takesArguments(ForLoadedType.of(long.class))))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // return ByteBuffer.wrap((byte[]) value);
+
+      // We currently assume that a byte[] setter will always accept a 
parameter of type byte[].
+      return new Compound(
+          readValue, // Load the value.
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 120921)
    Time Spent: 5h 50m  (was: 5h 40m)

> Provide automatic schema registration for POJOs
> -----------------------------------------------
>
>                 Key: BEAM-4453
>                 URL: https://issues.apache.org/jira/browse/BEAM-4453
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to