kennknowles commented on code in PR #35150: URL: https://github.com/apache/beam/pull/35150#discussion_r2135755770
########## sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoBeamConverter.java: ########## @@ -0,0 +1,818 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.UnknownInitialization; +import org.checkerframework.checker.nullness.qual.EnsuresNonNull; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Provides converts between Protobuf Message and Beam Row. + * + * <p>Read <a href="https://s.apache.org/beam-protobuf">https://s.apache.org/beam-protobuf</a> + */ +public class ProtoBeamConverter { + + /** Returns a conversion method from Beam Row to Protobuf Message. */ + public static SerializableFunction<@NonNull Row, @NonNull Message> toProto( + Descriptors.Descriptor descriptor) { + return new ToProto(descriptor); + } + + /** Returns a conversion method from Protobuf Message to Beam Row. */ + public static SerializableFunction<@NonNull Message, @NonNull Row> toRow(Schema schema) { + return new FromProto(schema); + } + + static BeamConverter<?, ?> createBeamConverter(Schema.FieldType fieldType) { + switch (fieldType.getTypeName()) { + case BYTE: + throw new UnsupportedOperationException(); + case INT16: + throw new UnsupportedOperationException(); + case INT32: + return new BeamPassThroughConverter<>(); + case INT64: + return new BeamPassThroughConverter<>(); + case DECIMAL: + throw new UnsupportedOperationException(); + case FLOAT: + return new BeamPassThroughConverter<>(); + case DOUBLE: + return new BeamPassThroughConverter<>(); + case STRING: + return new BeamPassThroughConverter<>(); + case DATETIME: + throw new UnsupportedOperationException(); + case BOOLEAN: + return new BeamPassThroughConverter<>(); + case BYTES: + return new BeamBytesConverter(); + case ARRAY: + case ITERABLE: + return new BeamListConverter<>(fieldType); + case MAP: + return new BeamMapConverter<>(fieldType); + case ROW: + return new BeamRowConverter(fieldType); + case LOGICAL_TYPE: + switch (Preconditions.checkNotNull(fieldType.getLogicalType()).getIdentifier()) { + case ProtoSchemaLogicalTypes.UInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.UInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed64.IDENTIFIER: + return new BeamPassThroughConverter<>(); + case NanosDuration.IDENTIFIER: + return new BeamNanosDurationConverter(); + case NanosInstant.IDENTIFIER: + return new BeamNanosInstantConverter(); + case EnumerationType.IDENTIFIER: + return new BeamEnumerationConverter(fieldType); + default: + throw new UnsupportedOperationException(); + } + default: + throw new UnsupportedOperationException(); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + if (fieldDescriptor.isRepeated()) { + if (fieldDescriptor.isMapField()) { + return new ProtoMapConverter<>(fieldDescriptor); + } else { + return new ProtoListConverter<>(fieldDescriptor); + } + } else { + return createToProtoSingularConverter(fieldDescriptor); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoSingularConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + switch (fieldDescriptor.getJavaType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case STRING: + return new ProtoPassThroughConverter<>(fieldDescriptor); + case BYTE_STRING: + return new ProtoByteStringConverter(fieldDescriptor); + case ENUM: + return new ProtoEnumConverter(fieldDescriptor); + case MESSAGE: + String fullName = fieldDescriptor.getMessageType().getFullName(); + switch (fullName) { + case "google.protobuf.Int32Value": + case "google.protobuf.UInt32Value": + case "google.protobuf.Int64Value": + case "google.protobuf.UInt64Value": + case "google.protobuf.FloatValue": + case "google.protobuf.DoubleValue": + case "google.protobuf.StringValue": + case "google.protobuf.BoolValue": + return new ProtoPassThroughConverter<>(fieldDescriptor); + case "google.protobuf.BytesValue": + return new ProtoByteStringConverter(fieldDescriptor); + case "google.protobuf.Timestamp": + return new ProtoTimestampConverter(fieldDescriptor); + case "google.protobuf.Duration": + return new ProtoDurationConverter(fieldDescriptor); + case "google.protobuf.Any": + throw new UnsupportedOperationException("google.protobuf.Any is not supported"); + default: + return new ProtoMessageConverter(fieldDescriptor); + } + default: + throw new UnsupportedOperationException( + "Unsupported proto type: " + fieldDescriptor.getJavaType()); + } + } + + interface FromProtoGetter<BeamT> { + @Nullable Review Comment: What is the meaning of the `@Nullable` here? Is it that the desired field may be missing? Note that `BeamT` itself can already be a nullable type, so you don't have to (and shouldn't) add a nullable annotation just to allow that case. See https://checkerframework.org/releases/1.1.2/checkers-manual.html#generics especially Section 16.1.3 ########## sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoBeamConverter.java: ########## @@ -0,0 +1,818 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.UnknownInitialization; +import org.checkerframework.checker.nullness.qual.EnsuresNonNull; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Provides converts between Protobuf Message and Beam Row. + * + * <p>Read <a href="https://s.apache.org/beam-protobuf">https://s.apache.org/beam-protobuf</a> + */ +public class ProtoBeamConverter { + + /** Returns a conversion method from Beam Row to Protobuf Message. */ + public static SerializableFunction<@NonNull Row, @NonNull Message> toProto( + Descriptors.Descriptor descriptor) { + return new ToProto(descriptor); + } + + /** Returns a conversion method from Protobuf Message to Beam Row. */ + public static SerializableFunction<@NonNull Message, @NonNull Row> toRow(Schema schema) { + return new FromProto(schema); + } + + static BeamConverter<?, ?> createBeamConverter(Schema.FieldType fieldType) { + switch (fieldType.getTypeName()) { + case BYTE: + throw new UnsupportedOperationException(); + case INT16: + throw new UnsupportedOperationException(); + case INT32: + return new BeamPassThroughConverter<>(); + case INT64: + return new BeamPassThroughConverter<>(); + case DECIMAL: + throw new UnsupportedOperationException(); + case FLOAT: + return new BeamPassThroughConverter<>(); + case DOUBLE: + return new BeamPassThroughConverter<>(); + case STRING: + return new BeamPassThroughConverter<>(); + case DATETIME: + throw new UnsupportedOperationException(); + case BOOLEAN: + return new BeamPassThroughConverter<>(); + case BYTES: + return new BeamBytesConverter(); + case ARRAY: + case ITERABLE: + return new BeamListConverter<>(fieldType); + case MAP: + return new BeamMapConverter<>(fieldType); + case ROW: + return new BeamRowConverter(fieldType); + case LOGICAL_TYPE: + switch (Preconditions.checkNotNull(fieldType.getLogicalType()).getIdentifier()) { + case ProtoSchemaLogicalTypes.UInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.UInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed64.IDENTIFIER: + return new BeamPassThroughConverter<>(); + case NanosDuration.IDENTIFIER: + return new BeamNanosDurationConverter(); + case NanosInstant.IDENTIFIER: + return new BeamNanosInstantConverter(); + case EnumerationType.IDENTIFIER: + return new BeamEnumerationConverter(fieldType); + default: + throw new UnsupportedOperationException(); + } + default: + throw new UnsupportedOperationException(); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + if (fieldDescriptor.isRepeated()) { + if (fieldDescriptor.isMapField()) { + return new ProtoMapConverter<>(fieldDescriptor); + } else { + return new ProtoListConverter<>(fieldDescriptor); + } + } else { + return createToProtoSingularConverter(fieldDescriptor); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoSingularConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + switch (fieldDescriptor.getJavaType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case STRING: + return new ProtoPassThroughConverter<>(fieldDescriptor); + case BYTE_STRING: + return new ProtoByteStringConverter(fieldDescriptor); + case ENUM: + return new ProtoEnumConverter(fieldDescriptor); + case MESSAGE: + String fullName = fieldDescriptor.getMessageType().getFullName(); + switch (fullName) { + case "google.protobuf.Int32Value": + case "google.protobuf.UInt32Value": + case "google.protobuf.Int64Value": + case "google.protobuf.UInt64Value": + case "google.protobuf.FloatValue": + case "google.protobuf.DoubleValue": + case "google.protobuf.StringValue": + case "google.protobuf.BoolValue": + return new ProtoPassThroughConverter<>(fieldDescriptor); + case "google.protobuf.BytesValue": + return new ProtoByteStringConverter(fieldDescriptor); + case "google.protobuf.Timestamp": + return new ProtoTimestampConverter(fieldDescriptor); + case "google.protobuf.Duration": + return new ProtoDurationConverter(fieldDescriptor); + case "google.protobuf.Any": + throw new UnsupportedOperationException("google.protobuf.Any is not supported"); + default: + return new ProtoMessageConverter(fieldDescriptor); + } + default: + throw new UnsupportedOperationException( + "Unsupported proto type: " + fieldDescriptor.getJavaType()); + } + } + + interface FromProtoGetter<BeamT> { + @Nullable + BeamT getFromProto(Message message); + } + + interface ToProtoSetter<BeamT> { + void setToProto(Message.Builder message, Schema.FieldType fieldType, BeamT beamFieldValue); + } + + abstract static class BeamConverter<ProtoT, BeamT> { + abstract @Nullable BeamT convert(@Nullable ProtoT protoValue); + } + + abstract static class BeamNoWrapConverter<ProtoT, BeamT> extends BeamConverter<ProtoT, BeamT> { + @Override + @Nullable + BeamT convert(@Nullable ProtoT protoValue) { + if (protoValue == null) { Review Comment: Just inline the null check to where you are converting to an Optional Beam schema type. ########## sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoBeamConverter.java: ########## @@ -0,0 +1,818 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.UnknownInitialization; +import org.checkerframework.checker.nullness.qual.EnsuresNonNull; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Provides converts between Protobuf Message and Beam Row. + * + * <p>Read <a href="https://s.apache.org/beam-protobuf">https://s.apache.org/beam-protobuf</a> + */ +public class ProtoBeamConverter { + + /** Returns a conversion method from Beam Row to Protobuf Message. */ + public static SerializableFunction<@NonNull Row, @NonNull Message> toProto( + Descriptors.Descriptor descriptor) { + return new ToProto(descriptor); + } + + /** Returns a conversion method from Protobuf Message to Beam Row. */ + public static SerializableFunction<@NonNull Message, @NonNull Row> toRow(Schema schema) { + return new FromProto(schema); + } + + static BeamConverter<?, ?> createBeamConverter(Schema.FieldType fieldType) { + switch (fieldType.getTypeName()) { + case BYTE: + throw new UnsupportedOperationException(); + case INT16: + throw new UnsupportedOperationException(); + case INT32: + return new BeamPassThroughConverter<>(); + case INT64: + return new BeamPassThroughConverter<>(); + case DECIMAL: + throw new UnsupportedOperationException(); + case FLOAT: + return new BeamPassThroughConverter<>(); + case DOUBLE: + return new BeamPassThroughConverter<>(); + case STRING: + return new BeamPassThroughConverter<>(); + case DATETIME: + throw new UnsupportedOperationException(); + case BOOLEAN: + return new BeamPassThroughConverter<>(); + case BYTES: + return new BeamBytesConverter(); + case ARRAY: + case ITERABLE: + return new BeamListConverter<>(fieldType); + case MAP: + return new BeamMapConverter<>(fieldType); + case ROW: + return new BeamRowConverter(fieldType); + case LOGICAL_TYPE: + switch (Preconditions.checkNotNull(fieldType.getLogicalType()).getIdentifier()) { + case ProtoSchemaLogicalTypes.UInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.UInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed64.IDENTIFIER: + return new BeamPassThroughConverter<>(); + case NanosDuration.IDENTIFIER: + return new BeamNanosDurationConverter(); + case NanosInstant.IDENTIFIER: + return new BeamNanosInstantConverter(); + case EnumerationType.IDENTIFIER: + return new BeamEnumerationConverter(fieldType); + default: + throw new UnsupportedOperationException(); + } + default: + throw new UnsupportedOperationException(); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + if (fieldDescriptor.isRepeated()) { + if (fieldDescriptor.isMapField()) { + return new ProtoMapConverter<>(fieldDescriptor); + } else { + return new ProtoListConverter<>(fieldDescriptor); + } + } else { + return createToProtoSingularConverter(fieldDescriptor); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoSingularConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + switch (fieldDescriptor.getJavaType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case STRING: + return new ProtoPassThroughConverter<>(fieldDescriptor); + case BYTE_STRING: + return new ProtoByteStringConverter(fieldDescriptor); + case ENUM: + return new ProtoEnumConverter(fieldDescriptor); + case MESSAGE: + String fullName = fieldDescriptor.getMessageType().getFullName(); + switch (fullName) { + case "google.protobuf.Int32Value": + case "google.protobuf.UInt32Value": + case "google.protobuf.Int64Value": + case "google.protobuf.UInt64Value": + case "google.protobuf.FloatValue": + case "google.protobuf.DoubleValue": + case "google.protobuf.StringValue": + case "google.protobuf.BoolValue": + return new ProtoPassThroughConverter<>(fieldDescriptor); + case "google.protobuf.BytesValue": + return new ProtoByteStringConverter(fieldDescriptor); + case "google.protobuf.Timestamp": + return new ProtoTimestampConverter(fieldDescriptor); + case "google.protobuf.Duration": + return new ProtoDurationConverter(fieldDescriptor); + case "google.protobuf.Any": + throw new UnsupportedOperationException("google.protobuf.Any is not supported"); + default: + return new ProtoMessageConverter(fieldDescriptor); + } + default: + throw new UnsupportedOperationException( + "Unsupported proto type: " + fieldDescriptor.getJavaType()); + } + } + + interface FromProtoGetter<BeamT> { + @Nullable + BeamT getFromProto(Message message); + } + + interface ToProtoSetter<BeamT> { + void setToProto(Message.Builder message, Schema.FieldType fieldType, BeamT beamFieldValue); + } + + abstract static class BeamConverter<ProtoT, BeamT> { + abstract @Nullable BeamT convert(@Nullable ProtoT protoValue); + } + + abstract static class BeamNoWrapConverter<ProtoT, BeamT> extends BeamConverter<ProtoT, BeamT> { + @Override + @Nullable + BeamT convert(@Nullable ProtoT protoValue) { + if (protoValue == null) { + return null; + } + + return convertNonNull(protoValue); + } + + abstract @NonNull BeamT convertNonNull(@NonNull ProtoT protoValue); + } + + abstract static class BeamWrapConverter<ProtoT, ProtoUnwrappedT, BeamT> + extends BeamConverter<ProtoT, BeamT> { + + @SuppressWarnings("unchecked") + @Override + public @Nullable BeamT convert(@Nullable ProtoT protoValue) { + if (protoValue == null) { + return null; + } + + @NonNull ProtoUnwrappedT unwrappedProtoValue; + if (protoValue instanceof Message) { + // A google protobuf wrapper + Message protoWrapper = (Message) protoValue; + Descriptors.FieldDescriptor wrapperValueFieldDescriptor = + protoWrapper.getDescriptorForType().findFieldByNumber(1); + unwrappedProtoValue = + (@NonNull ProtoUnwrappedT) + Preconditions.checkNotNull(protoWrapper.getField(wrapperValueFieldDescriptor)); + } else { + unwrappedProtoValue = (@NonNull ProtoUnwrappedT) protoValue; + } + return convertNonNullUnwrapped(unwrappedProtoValue); + } + + abstract @NonNull BeamT convertNonNullUnwrapped(@NonNull ProtoUnwrappedT protoValue); + } + + abstract static class ProtoConverter<BeamT, ProtoT> { + private transient Descriptors.FieldDescriptor fieldDescriptor; + + ProtoConverter(Descriptors.FieldDescriptor fieldDescriptor) { + this.fieldDescriptor = fieldDescriptor; + } + + public abstract @Nullable ProtoT convert(@Nullable BeamT beamValue); Review Comment: Another place you don't want `@Nullable`. ########## sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoBeamConverter.java: ########## @@ -0,0 +1,818 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.UnknownInitialization; +import org.checkerframework.checker.nullness.qual.EnsuresNonNull; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Provides converts between Protobuf Message and Beam Row. + * + * <p>Read <a href="https://s.apache.org/beam-protobuf">https://s.apache.org/beam-protobuf</a> + */ +public class ProtoBeamConverter { + + /** Returns a conversion method from Beam Row to Protobuf Message. */ + public static SerializableFunction<@NonNull Row, @NonNull Message> toProto( + Descriptors.Descriptor descriptor) { + return new ToProto(descriptor); + } + + /** Returns a conversion method from Protobuf Message to Beam Row. */ + public static SerializableFunction<@NonNull Message, @NonNull Row> toRow(Schema schema) { + return new FromProto(schema); + } + + static BeamConverter<?, ?> createBeamConverter(Schema.FieldType fieldType) { + switch (fieldType.getTypeName()) { + case BYTE: + throw new UnsupportedOperationException(); + case INT16: + throw new UnsupportedOperationException(); + case INT32: + return new BeamPassThroughConverter<>(); + case INT64: + return new BeamPassThroughConverter<>(); + case DECIMAL: + throw new UnsupportedOperationException(); + case FLOAT: + return new BeamPassThroughConverter<>(); + case DOUBLE: + return new BeamPassThroughConverter<>(); + case STRING: + return new BeamPassThroughConverter<>(); + case DATETIME: + throw new UnsupportedOperationException(); + case BOOLEAN: + return new BeamPassThroughConverter<>(); + case BYTES: + return new BeamBytesConverter(); + case ARRAY: + case ITERABLE: + return new BeamListConverter<>(fieldType); + case MAP: + return new BeamMapConverter<>(fieldType); + case ROW: + return new BeamRowConverter(fieldType); + case LOGICAL_TYPE: + switch (Preconditions.checkNotNull(fieldType.getLogicalType()).getIdentifier()) { + case ProtoSchemaLogicalTypes.UInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.UInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed64.IDENTIFIER: + return new BeamPassThroughConverter<>(); + case NanosDuration.IDENTIFIER: + return new BeamNanosDurationConverter(); + case NanosInstant.IDENTIFIER: + return new BeamNanosInstantConverter(); + case EnumerationType.IDENTIFIER: + return new BeamEnumerationConverter(fieldType); + default: + throw new UnsupportedOperationException(); + } + default: + throw new UnsupportedOperationException(); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + if (fieldDescriptor.isRepeated()) { + if (fieldDescriptor.isMapField()) { + return new ProtoMapConverter<>(fieldDescriptor); + } else { + return new ProtoListConverter<>(fieldDescriptor); + } + } else { + return createToProtoSingularConverter(fieldDescriptor); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoSingularConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + switch (fieldDescriptor.getJavaType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case STRING: + return new ProtoPassThroughConverter<>(fieldDescriptor); + case BYTE_STRING: + return new ProtoByteStringConverter(fieldDescriptor); + case ENUM: + return new ProtoEnumConverter(fieldDescriptor); + case MESSAGE: + String fullName = fieldDescriptor.getMessageType().getFullName(); + switch (fullName) { + case "google.protobuf.Int32Value": + case "google.protobuf.UInt32Value": + case "google.protobuf.Int64Value": + case "google.protobuf.UInt64Value": + case "google.protobuf.FloatValue": + case "google.protobuf.DoubleValue": + case "google.protobuf.StringValue": + case "google.protobuf.BoolValue": + return new ProtoPassThroughConverter<>(fieldDescriptor); + case "google.protobuf.BytesValue": + return new ProtoByteStringConverter(fieldDescriptor); + case "google.protobuf.Timestamp": + return new ProtoTimestampConverter(fieldDescriptor); + case "google.protobuf.Duration": + return new ProtoDurationConverter(fieldDescriptor); + case "google.protobuf.Any": + throw new UnsupportedOperationException("google.protobuf.Any is not supported"); + default: + return new ProtoMessageConverter(fieldDescriptor); + } + default: + throw new UnsupportedOperationException( + "Unsupported proto type: " + fieldDescriptor.getJavaType()); + } + } + + interface FromProtoGetter<BeamT> { + @Nullable + BeamT getFromProto(Message message); + } + + interface ToProtoSetter<BeamT> { + void setToProto(Message.Builder message, Schema.FieldType fieldType, BeamT beamFieldValue); + } + + abstract static class BeamConverter<ProtoT, BeamT> { Review Comment: Use `interface` not `abstract class`, and then it can also be annotated `@FunctionalInterface` and can accept lambdas (the annotation is not required for this but it adds some clarity) Naming for clarity: `ProtoToBeamConverter` to show which direction the conversion is meant to go. But of course this interface does not actually constrain the direction. Because it is isomorphic to `Function<ProtoT, BeamT>`. it is fine to make a new interface to make the type more clear, but also not needed in this case. ########## sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoBeamConverter.java: ########## @@ -0,0 +1,818 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.UnknownInitialization; +import org.checkerframework.checker.nullness.qual.EnsuresNonNull; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Provides converts between Protobuf Message and Beam Row. + * + * <p>Read <a href="https://s.apache.org/beam-protobuf">https://s.apache.org/beam-protobuf</a> + */ +public class ProtoBeamConverter { + + /** Returns a conversion method from Beam Row to Protobuf Message. */ + public static SerializableFunction<@NonNull Row, @NonNull Message> toProto( + Descriptors.Descriptor descriptor) { + return new ToProto(descriptor); + } + + /** Returns a conversion method from Protobuf Message to Beam Row. */ + public static SerializableFunction<@NonNull Message, @NonNull Row> toRow(Schema schema) { + return new FromProto(schema); + } + + static BeamConverter<?, ?> createBeamConverter(Schema.FieldType fieldType) { + switch (fieldType.getTypeName()) { + case BYTE: + throw new UnsupportedOperationException(); + case INT16: + throw new UnsupportedOperationException(); + case INT32: + return new BeamPassThroughConverter<>(); + case INT64: + return new BeamPassThroughConverter<>(); + case DECIMAL: + throw new UnsupportedOperationException(); + case FLOAT: + return new BeamPassThroughConverter<>(); + case DOUBLE: + return new BeamPassThroughConverter<>(); + case STRING: + return new BeamPassThroughConverter<>(); + case DATETIME: + throw new UnsupportedOperationException(); + case BOOLEAN: + return new BeamPassThroughConverter<>(); + case BYTES: + return new BeamBytesConverter(); + case ARRAY: + case ITERABLE: + return new BeamListConverter<>(fieldType); + case MAP: + return new BeamMapConverter<>(fieldType); + case ROW: + return new BeamRowConverter(fieldType); + case LOGICAL_TYPE: + switch (Preconditions.checkNotNull(fieldType.getLogicalType()).getIdentifier()) { + case ProtoSchemaLogicalTypes.UInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.UInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed64.IDENTIFIER: + return new BeamPassThroughConverter<>(); + case NanosDuration.IDENTIFIER: + return new BeamNanosDurationConverter(); + case NanosInstant.IDENTIFIER: + return new BeamNanosInstantConverter(); + case EnumerationType.IDENTIFIER: + return new BeamEnumerationConverter(fieldType); + default: + throw new UnsupportedOperationException(); + } + default: + throw new UnsupportedOperationException(); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + if (fieldDescriptor.isRepeated()) { + if (fieldDescriptor.isMapField()) { + return new ProtoMapConverter<>(fieldDescriptor); + } else { + return new ProtoListConverter<>(fieldDescriptor); + } + } else { + return createToProtoSingularConverter(fieldDescriptor); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoSingularConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + switch (fieldDescriptor.getJavaType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case STRING: + return new ProtoPassThroughConverter<>(fieldDescriptor); + case BYTE_STRING: + return new ProtoByteStringConverter(fieldDescriptor); + case ENUM: + return new ProtoEnumConverter(fieldDescriptor); + case MESSAGE: + String fullName = fieldDescriptor.getMessageType().getFullName(); + switch (fullName) { + case "google.protobuf.Int32Value": + case "google.protobuf.UInt32Value": + case "google.protobuf.Int64Value": + case "google.protobuf.UInt64Value": + case "google.protobuf.FloatValue": + case "google.protobuf.DoubleValue": + case "google.protobuf.StringValue": + case "google.protobuf.BoolValue": + return new ProtoPassThroughConverter<>(fieldDescriptor); + case "google.protobuf.BytesValue": + return new ProtoByteStringConverter(fieldDescriptor); + case "google.protobuf.Timestamp": + return new ProtoTimestampConverter(fieldDescriptor); + case "google.protobuf.Duration": + return new ProtoDurationConverter(fieldDescriptor); + case "google.protobuf.Any": + throw new UnsupportedOperationException("google.protobuf.Any is not supported"); + default: + return new ProtoMessageConverter(fieldDescriptor); + } + default: + throw new UnsupportedOperationException( + "Unsupported proto type: " + fieldDescriptor.getJavaType()); + } + } + + interface FromProtoGetter<BeamT> { + @Nullable + BeamT getFromProto(Message message); + } + + interface ToProtoSetter<BeamT> { + void setToProto(Message.Builder message, Schema.FieldType fieldType, BeamT beamFieldValue); + } + + abstract static class BeamConverter<ProtoT, BeamT> { + abstract @Nullable BeamT convert(@Nullable ProtoT protoValue); + } + + abstract static class BeamNoWrapConverter<ProtoT, BeamT> extends BeamConverter<ProtoT, BeamT> { + @Override + @Nullable + BeamT convert(@Nullable ProtoT protoValue) { + if (protoValue == null) { + return null; + } + + return convertNonNull(protoValue); + } + + abstract @NonNull BeamT convertNonNull(@NonNull ProtoT protoValue); Review Comment: Here also, `@NonNull` won't have the effect you want. I believe the nullability will be "contained within" the `BeamT`. ########## sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoBeamConverter.java: ########## @@ -0,0 +1,818 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.UnknownInitialization; +import org.checkerframework.checker.nullness.qual.EnsuresNonNull; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Provides converts between Protobuf Message and Beam Row. + * + * <p>Read <a href="https://s.apache.org/beam-protobuf">https://s.apache.org/beam-protobuf</a> + */ +public class ProtoBeamConverter { + + /** Returns a conversion method from Beam Row to Protobuf Message. */ + public static SerializableFunction<@NonNull Row, @NonNull Message> toProto( + Descriptors.Descriptor descriptor) { + return new ToProto(descriptor); + } + + /** Returns a conversion method from Protobuf Message to Beam Row. */ + public static SerializableFunction<@NonNull Message, @NonNull Row> toRow(Schema schema) { + return new FromProto(schema); + } + + static BeamConverter<?, ?> createBeamConverter(Schema.FieldType fieldType) { + switch (fieldType.getTypeName()) { + case BYTE: + throw new UnsupportedOperationException(); + case INT16: + throw new UnsupportedOperationException(); + case INT32: + return new BeamPassThroughConverter<>(); + case INT64: + return new BeamPassThroughConverter<>(); + case DECIMAL: + throw new UnsupportedOperationException(); + case FLOAT: + return new BeamPassThroughConverter<>(); + case DOUBLE: + return new BeamPassThroughConverter<>(); + case STRING: + return new BeamPassThroughConverter<>(); + case DATETIME: + throw new UnsupportedOperationException(); + case BOOLEAN: + return new BeamPassThroughConverter<>(); + case BYTES: + return new BeamBytesConverter(); + case ARRAY: + case ITERABLE: + return new BeamListConverter<>(fieldType); + case MAP: + return new BeamMapConverter<>(fieldType); + case ROW: + return new BeamRowConverter(fieldType); + case LOGICAL_TYPE: + switch (Preconditions.checkNotNull(fieldType.getLogicalType()).getIdentifier()) { + case ProtoSchemaLogicalTypes.UInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt32.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed32.IDENTIFIER: + case ProtoSchemaLogicalTypes.UInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SInt64.IDENTIFIER: + case ProtoSchemaLogicalTypes.Fixed64.IDENTIFIER: + case ProtoSchemaLogicalTypes.SFixed64.IDENTIFIER: + return new BeamPassThroughConverter<>(); + case NanosDuration.IDENTIFIER: + return new BeamNanosDurationConverter(); + case NanosInstant.IDENTIFIER: + return new BeamNanosInstantConverter(); + case EnumerationType.IDENTIFIER: + return new BeamEnumerationConverter(fieldType); + default: + throw new UnsupportedOperationException(); + } + default: + throw new UnsupportedOperationException(); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + if (fieldDescriptor.isRepeated()) { + if (fieldDescriptor.isMapField()) { + return new ProtoMapConverter<>(fieldDescriptor); + } else { + return new ProtoListConverter<>(fieldDescriptor); + } + } else { + return createToProtoSingularConverter(fieldDescriptor); + } + } + + static @NonNull ProtoConverter<?, ?> createToProtoSingularConverter( + Descriptors.FieldDescriptor fieldDescriptor) { + switch (fieldDescriptor.getJavaType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case STRING: + return new ProtoPassThroughConverter<>(fieldDescriptor); + case BYTE_STRING: + return new ProtoByteStringConverter(fieldDescriptor); + case ENUM: + return new ProtoEnumConverter(fieldDescriptor); + case MESSAGE: + String fullName = fieldDescriptor.getMessageType().getFullName(); + switch (fullName) { + case "google.protobuf.Int32Value": + case "google.protobuf.UInt32Value": + case "google.protobuf.Int64Value": + case "google.protobuf.UInt64Value": + case "google.protobuf.FloatValue": + case "google.protobuf.DoubleValue": + case "google.protobuf.StringValue": + case "google.protobuf.BoolValue": + return new ProtoPassThroughConverter<>(fieldDescriptor); + case "google.protobuf.BytesValue": + return new ProtoByteStringConverter(fieldDescriptor); + case "google.protobuf.Timestamp": + return new ProtoTimestampConverter(fieldDescriptor); + case "google.protobuf.Duration": + return new ProtoDurationConverter(fieldDescriptor); + case "google.protobuf.Any": + throw new UnsupportedOperationException("google.protobuf.Any is not supported"); + default: + return new ProtoMessageConverter(fieldDescriptor); + } + default: + throw new UnsupportedOperationException( + "Unsupported proto type: " + fieldDescriptor.getJavaType()); + } + } + + interface FromProtoGetter<BeamT> { + @Nullable + BeamT getFromProto(Message message); + } + + interface ToProtoSetter<BeamT> { + void setToProto(Message.Builder message, Schema.FieldType fieldType, BeamT beamFieldValue); + } + + abstract static class BeamConverter<ProtoT, BeamT> { + abstract @Nullable BeamT convert(@Nullable ProtoT protoValue); + } + + abstract static class BeamNoWrapConverter<ProtoT, BeamT> extends BeamConverter<ProtoT, BeamT> { + @Override + @Nullable + BeamT convert(@Nullable ProtoT protoValue) { + if (protoValue == null) { + return null; + } + + return convertNonNull(protoValue); + } + + abstract @NonNull BeamT convertNonNull(@NonNull ProtoT protoValue); + } + + abstract static class BeamWrapConverter<ProtoT, ProtoUnwrappedT, BeamT> + extends BeamConverter<ProtoT, BeamT> { + + @SuppressWarnings("unchecked") + @Override + public @Nullable BeamT convert(@Nullable ProtoT protoValue) { + if (protoValue == null) { + return null; + } + + @NonNull ProtoUnwrappedT unwrappedProtoValue; + if (protoValue instanceof Message) { + // A google protobuf wrapper + Message protoWrapper = (Message) protoValue; + Descriptors.FieldDescriptor wrapperValueFieldDescriptor = + protoWrapper.getDescriptorForType().findFieldByNumber(1); + unwrappedProtoValue = + (@NonNull ProtoUnwrappedT) + Preconditions.checkNotNull(protoWrapper.getField(wrapperValueFieldDescriptor)); + } else { + unwrappedProtoValue = (@NonNull ProtoUnwrappedT) protoValue; + } + return convertNonNullUnwrapped(unwrappedProtoValue); + } + + abstract @NonNull BeamT convertNonNullUnwrapped(@NonNull ProtoUnwrappedT protoValue); + } + + abstract static class ProtoConverter<BeamT, ProtoT> { + private transient Descriptors.FieldDescriptor fieldDescriptor; Review Comment: Just noticing that this is marked `transient` which means it also must be `@Nullable`. You probably don't want that, but actually just want to add `transient` at some higher level where it was accidentally serialized? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org