[ 
https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=393055&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393055
 ]

ASF GitHub Bot logged work on BEAM-7274:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/20 23:36
            Start Date: 25/Feb/20 23:36
    Worklog Time Spent: 10m 
      Work Description: alexvanboxel commented on pull request #10502: 
[BEAM-7274] Add DynamicMessage Schema support
URL: https://github.com/apache/beam/pull/10502#discussion_r384192806
 
 

 ##########
 File path: 
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchema.java
 ##########
 @@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getFieldNumber;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getMapKeyMessageName;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getMapValueMessageName;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getMessageName;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.withFieldNumber;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.withMessageName;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+@Experimental(Experimental.Kind.SCHEMAS)
+public class ProtoDynamicMessageSchema<T> implements Serializable {
+  public static final long serialVersionUID = 1L;
+
+  /**
+   * Context of the schema; the context can be generated from a source schema or from descriptors.
+   * The ability to convert back from Row to proto depends on the type of context.
+   */
+  private final Context context;
+
+  /** The toRow function to convert the Message to a Row. */
+  private transient SerializableFunction<T, Row> toRowFunction;
+
+  /** The fromRow function to convert the Row to a Message. */
+  private transient SerializableFunction<Row, T> fromRowFunction;
+
+  /** List of field converters for each field in the row. */
+  private transient List<Convert> converters;
+
+  /**
+   * Builds a schema for the message {@code messageName} looked up in {@code domain}, by wrapping
+   * both in a {@code DescriptorContext}.
+   */
+  private ProtoDynamicMessageSchema(String messageName, ProtoDomain domain) {
+    this(new DescriptorContext(messageName, domain));
+  }
+
+  /** Builds a schema from an existing context and eagerly initializes the transient state. */
+  private ProtoDynamicMessageSchema(Context context) {
+    this.context = context;
+    // Converters and row functions are transient; build them now (readResolve rebuilds them
+    // after deserialization).
+    readResolve();
+  }
+
+  /**
+   * Create a new ProtoDynamicMessageSchema from a {@link ProtoDomain} and for 
a message. The
+   * message need to be in the domain and needs to be the fully qualified name.
+   */
+  public static ProtoDynamicMessageSchema forDescriptor(ProtoDomain domain, 
String messageName) {
+    return new ProtoDynamicMessageSchema(messageName, domain);
+  }
+
+  /**
+   * Create a new ProtoDynamicMessageSchema from a {@link ProtoDomain} and for 
a descriptor. The
+   * descriptor is only used for it's name, that name will be used for a 
search in the domain.
+   */
+  public static ProtoDynamicMessageSchema<DynamicMessage> forDescriptor(
+      ProtoDomain domain, Descriptors.Descriptor descriptor) {
+    return new ProtoDynamicMessageSchema<>(descriptor.getFullName(), domain);
+  }
+
+  /**
+   * Creates a schema for a nested field, using the sub-context that {@code context} derives for
+   * {@code field}.
+   */
+  static ProtoDynamicMessageSchema<?> forContext(Context context, Schema.Field field) {
+    Context subContext = context.getSubContext(field);
+    return new ProtoDynamicMessageSchema<>(subContext);
+  }
+
+  /** Creates a schema whose context is built from a Beam {@link Schema} with base class Message. */
+  static ProtoDynamicMessageSchema<Message> forSchema(Schema schema) {
+    Context schemaContext = new Context(schema, Message.class);
+    return new ProtoDynamicMessageSchema<Message>(schemaContext);
+  }
+
+  /**
+   * Rebuilds the transient converter list and conversion functions from {@link #context}.
+   *
+   * <p>Called from the constructors and, as the {@code readResolve} serialization hook, after Java
+   * deserialization, because the transient fields are not serialized.
+   *
+   * @return this instance
+   */
+  private Object readResolve() {
+    Schema schema = context.getSchema();
+    this.converters = createConverters(schema);
+    this.toRowFunction = new MessageToRowFunction();
+    this.fromRowFunction = new RowToMessageFunction();
+    return this;
+  }
+
+  /**
+   * Creates the {@link Convert} that moves a single schema field between a proto message and a
+   * {@link Row}.
+   *
+   * <p>If the field's type carries a proto message name for one of the {@code google.protobuf}
+   * wrapper types, the field is wrapped in a {@link WrapperConvert}. All other fields (including
+   * {@code Timestamp}/{@code Duration} and non-wrapper message names) are dispatched on the Beam
+   * type name and, for logical types, on the logical type identifier.
+   *
+   * @param field the schema field to build a converter for
+   * @return the converter for {@code field}
+   * @throws IllegalStateException if the field's type name or logical type identifier is not
+   *     supported
+   */
+  Convert createConverter(Schema.Field field) {
+    Schema.FieldType fieldType = field.getType();
+    String messageName = getMessageName(fieldType);
+    if (messageName != null && messageName.length() > 0) {
+      // Synthetic field named "value", tagged via withFieldNumber(..., 1), describing the wrapper's
+      // single member.
+      // NOTE(review): the type is BOOLEAN for every wrapper kind; presumably the inner converters
+      // only rely on the name/field number here — confirm before depending on the type.
+      Schema.Field valueField =
+          Schema.Field.of("value", withFieldNumber(Schema.FieldType.BOOLEAN, 1));
+      switch (messageName) {
+        case "google.protobuf.StringValue":
+        case "google.protobuf.DoubleValue":
+        case "google.protobuf.FloatValue":
+        case "google.protobuf.BoolValue":
+        case "google.protobuf.Int64Value":
+        case "google.protobuf.Int32Value":
+        case "google.protobuf.UInt64Value":
+        case "google.protobuf.UInt32Value":
+          return new WrapperConvert(field, new PrimitiveConvert(valueField));
+        case "google.protobuf.BytesValue":
+          return new WrapperConvert(field, new BytesConvert(valueField));
+        case "google.protobuf.Timestamp":
+        case "google.protobuf.Duration":
+          // handled by logical type case
+          break;
+      }
+      // No wrapper matched (there is no default case): fall through to the type-name dispatch.
+    }
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+      case BOOLEAN:
+        return new PrimitiveConvert(field);
+      case BYTES:
+        return new BytesConvert(field);
+      case ARRAY:
+      case ITERABLE:
+        return new ArrayConvert(this, field);
+      case MAP:
+        return new MapConvert(this, field);
+      case LOGICAL_TYPE:
+        // Every branch of the inner switch returns or throws, so control never reaches ROW.
+        String identifier = field.getType().getLogicalType().getIdentifier();
+        switch (identifier) {
+          case ProtoSchemaLogicalTypes.Fixed32.IDENTIFIER:
+          case ProtoSchemaLogicalTypes.Fixed64.IDENTIFIER:
+          case ProtoSchemaLogicalTypes.SFixed32.IDENTIFIER:
+          case ProtoSchemaLogicalTypes.SFixed64.IDENTIFIER:
+          case ProtoSchemaLogicalTypes.SInt32.IDENTIFIER:
+          case ProtoSchemaLogicalTypes.SInt64.IDENTIFIER:
+          case ProtoSchemaLogicalTypes.UInt32.IDENTIFIER:
+          case ProtoSchemaLogicalTypes.UInt64.IDENTIFIER:
+            return new LogicalTypeConvert(field, fieldType.getLogicalType());
+          case NanosInstant.IDENTIFIER:
+            return new TimestampConvert(field);
+          case NanosDuration.IDENTIFIER:
+            return new DurationConvert(field);
+          case EnumerationType.IDENTIFIER:
+            return new EnumConvert(field, fieldType.getLogicalType());
+          case OneOfType.IDENTIFIER:
+            return new OneOfConvert(this, field, fieldType.getLogicalType());
+          default:
+            throw new IllegalStateException("Unexpected logical type : " + identifier);
+        }
+      case ROW:
+        return new MessageConvert(this, field);
+      default:
+        throw new IllegalStateException("Unexpected value: " + fieldType);
+    }
+  }
+
+  /** Creates one {@link Convert} per field of {@code schema}, in the schema's field order. */
+  private List<Convert> createConverters(Schema schema) {
+    List<Schema.Field> fields = schema.getFields();
+    List<Convert> result = new ArrayList<>(fields.size());
+    fields.forEach(f -> result.add(createConverter(f)));
+    return result;
+  }
+
+  public Schema getSchema() {
+    return context.getSchema();
+  }
+
+  public SchemaCoder<T> getSchemaCoder() {
+    return SchemaCoder.of(
+        context.getSchema(),
+        TypeDescriptor.of(context.getBaseClass()),
+        toRowFunction,
+        fromRowFunction);
+  }
 
 Review comment:
   I used it in a production pipeline, but since it's not available in the other 
providers, I removed it (squashed and rebased).
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 393055)
    Time Spent: 26.5h  (was: 26h 20m)

> Protobuf Beam Schema support
> ----------------------------
>
>                 Key: BEAM-7274
>                 URL: https://issues.apache.org/jira/browse/BEAM-7274
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Alex Van Boxel
>            Assignee: Alex Van Boxel
>            Priority: Minor
>             Fix For: 2.21.0
>
>          Time Spent: 26.5h
>  Remaining Estimate: 0h
>
> Add support for the new Beam Schema to the Protobuf extension.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to