ccciudatu commented on a change in pull request #13572: URL: https://github.com/apache/beam/pull/13572#discussion_r546413965
########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java ########## @@ -32,10 +34,10 @@ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class BeamKafkaProtoTable extends BeamKafkaTable { - private final Class<?> protoClass; + private final Class<? extends Message> protoClass; public BeamKafkaProtoTable( - Schema messageSchema, String bootstrapServers, List<String> topics, Class<?> protoClass) { + Schema messageSchema, String bootstrapServers, List<String> topics, Class<? extends Message> protoClass) { Review comment: This change breaks any clients that used to pass a mere `Class<?>` here! Is this acceptable? ########## File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java ########## @@ -129,68 +129,29 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) "unchecked" }) - public static <T> SimpleFunction<byte[], Row> getProtoBytesToRowFn(Class<T> clazz) { + public static <T extends Message> SimpleFunction<byte[], Row> getProtoBytesToRowFn( Review comment: This again breaks compatibility with `Class<?>`. 
########## File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java ########## @@ -129,68 +129,29 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) "unchecked" }) - public static <T> SimpleFunction<byte[], Row> getProtoBytesToRowFn(Class<T> clazz) { + public static <T extends Message> SimpleFunction<byte[], Row> getProtoBytesToRowFn( + Class<T> clazz) { checkForMessageType(clazz); - return new ProtoBytesToRowFn(clazz); - } - - private static class ProtoBytesToRowFn<T extends Message> extends SimpleFunction<byte[], Row> { - private final ProtoCoder<T> protoCoder; - private final SerializableFunction<T, Row> toRowFunction; - - public ProtoBytesToRowFn(Class<T> clazz) { - this.protoCoder = ProtoCoder.of(clazz); - this.toRowFunction = new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz)); - } - - @Override - public Row apply(byte[] bytes) { - try { - T message = protoCoder.getParser().parseFrom(bytes); - return toRowFunction.apply(message); - } catch (IOException e) { - throw new IllegalArgumentException("Could not decode row from proto payload.", e); - } - } + final ProtoCoder<T> protoCoder = ProtoCoder.of(clazz); + final SerializableFunction<T, Row> toRowFunction = + new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz)); + return new SqlRows.BytesToRowFn<>(protoCoder.getParser()::parseFrom, toRowFunction); } // Other modules are not allowed to use non-vendored Message class @SuppressWarnings({ "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) "unchecked" }) - public static <T> SimpleFunction<Row, byte[]> getRowToProtoBytesFn(Class<T> clazz) { + public static <T extends Message> SimpleFunction<Row, byte[]> getRowToProtoBytesFn( Review comment: `Class<?>` won't work anymore ########## File path: 
sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftCoder.java ########## @@ -72,15 +71,13 @@ protected ThriftCoder(Class<T> type, TProtocolFactory protocolFactory) { */ @Override public void encode(T value, OutputStream outStream) throws CoderException, IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos)); + TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(outStream)); Review comment: @chrlarsen Just trying to make sure I haven't missed anything: was there a reason why we're currently writing to a temporary in-memory stream and then copying that to the actual stream? I.e., do you see any potential pitfalls with the optimization that I just made? ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java ########## @@ -95,9 +97,9 @@ private static Schema inferAndVerifySchema(Class<?> protoClass, Schema messageSc /** A PTransform to convert {@link Row} to {@code KV<byte[], byte[]>}. */ private static class ProtoRecorderEncoder extends PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> { - private final Class<?> clazz; + private final Class<? extends Message> clazz; - public ProtoRecorderEncoder(Class<?> clazz) { + public ProtoRecorderEncoder(Class<? extends Message> clazz) { Review comment: Is this acceptable? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
