ccciudatu commented on a change in pull request #13572:
URL: https://github.com/apache/beam/pull/13572#discussion_r546413965



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -32,10 +34,10 @@
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class BeamKafkaProtoTable extends BeamKafkaTable {
-  private final Class<?> protoClass;
+  private final Class<? extends Message> protoClass;
 
   public BeamKafkaProtoTable(
-      Schema messageSchema, String bootstrapServers, List<String> topics, 
Class<?> protoClass) {
+      Schema messageSchema, String bootstrapServers, List<String> topics, 
Class<? extends Message> protoClass) {

Review comment:
       This change breaks any clients that used to pass a mere `Class<?>` here!
   Is this acceptable?

##########
File path: 
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
##########
@@ -129,68 +129,29 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> 
targetClass, Schema sche
     "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
     "unchecked"
   })
-  public static <T> SimpleFunction<byte[], Row> getProtoBytesToRowFn(Class<T> 
clazz) {
+  public static <T extends Message> SimpleFunction<byte[], Row> 
getProtoBytesToRowFn(

Review comment:
       This again breaks compatibility with `Class<?>`.

##########
File path: 
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
##########
@@ -129,68 +129,29 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> 
targetClass, Schema sche
     "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
     "unchecked"
   })
-  public static <T> SimpleFunction<byte[], Row> getProtoBytesToRowFn(Class<T> 
clazz) {
+  public static <T extends Message> SimpleFunction<byte[], Row> 
getProtoBytesToRowFn(
+      Class<T> clazz) {
     checkForMessageType(clazz);
-    return new ProtoBytesToRowFn(clazz);
-  }
-
-  private static class ProtoBytesToRowFn<T extends Message> extends 
SimpleFunction<byte[], Row> {
-    private final ProtoCoder<T> protoCoder;
-    private final SerializableFunction<T, Row> toRowFunction;
-
-    public ProtoBytesToRowFn(Class<T> clazz) {
-      this.protoCoder = ProtoCoder.of(clazz);
-      this.toRowFunction = new 
ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
-    }
-
-    @Override
-    public Row apply(byte[] bytes) {
-      try {
-        T message = protoCoder.getParser().parseFrom(bytes);
-        return toRowFunction.apply(message);
-      } catch (IOException e) {
-        throw new IllegalArgumentException("Could not decode row from proto 
payload.", e);
-      }
-    }
+    final ProtoCoder<T> protoCoder = ProtoCoder.of(clazz);
+    final SerializableFunction<T, Row> toRowFunction =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
+    return new SqlRows.BytesToRowFn<>(protoCoder.getParser()::parseFrom, 
toRowFunction);
   }
 
   // Other modules are not allowed to use non-vendored Message class
   @SuppressWarnings({
     "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
     "unchecked"
   })
-  public static <T> SimpleFunction<Row, byte[]> getRowToProtoBytesFn(Class<T> 
clazz) {
+  public static <T extends Message> SimpleFunction<Row, byte[]> 
getRowToProtoBytesFn(

Review comment:
       `Class<?>` won't work anymore

##########
File path: 
sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftCoder.java
##########
@@ -72,15 +71,13 @@ protected ThriftCoder(Class<T> type, TProtocolFactory 
protocolFactory) {
    */
   @Override
   public void encode(T value, OutputStream outStream) throws CoderException, 
IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    TProtocol protocol = protocolFactory.getProtocol(new 
TIOStreamTransport(baos));
+    TProtocol protocol = protocolFactory.getProtocol(new 
TIOStreamTransport(outStream));

Review comment:
       @chrlarsen Just trying to make sure I haven't missed anything: is there 
a reason why we're currently writing to a temporary in-memory stream and then 
copying that to the actual stream? I.e., do you see any potential pitfalls with the 
optimization that I just made?

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -95,9 +97,9 @@ private static Schema inferAndVerifySchema(Class<?> 
protoClass, Schema messageSc
   /** A PTransform to convert {@link Row} to {@code KV<byte[], byte[]>}. */
   private static class ProtoRecorderEncoder
       extends PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> {
-    private final Class<?> clazz;
+    private final Class<? extends Message> clazz;
 
-    public ProtoRecorderEncoder(Class<?> clazz) {
+    public ProtoRecorderEncoder(Class<? extends Message> clazz) {

Review comment:
       Is this acceptable?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to