piotr-szuberski commented on a change in pull request #13572:
URL: https://github.com/apache/beam/pull/13572#discussion_r546382347



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
##########
@@ -64,15 +66,34 @@ public void testBuildBeamSqlAvroTable() {
 
   @Test
   public void testBuildBeamSqlProtoTable() {
-    Table table = mockTable("hello", "proto", KafkaMessages.SimpleMessage.class.getName());
+    Table table =
+        mockTable("hello", "proto", KafkaMessages.SimpleMessage.class.getName(), null, null);
     BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
 
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaProtoTable);
 
-    BeamKafkaProtoTable csvTable = (BeamKafkaProtoTable) sqlTable;
-    assertEquals("localhost:9092", csvTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+    BeamKafkaProtoTable protoTable = (BeamKafkaProtoTable) sqlTable;

Review comment:
       Good catch! Thanks!

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaThriftTable.java
##########
@@ -74,55 +92,81 @@ private static Schema thriftSchema(
 
   @Override
  protected PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
-    final @NonNull SchemaProvider schemaProvider = ThriftSchema.provider();
-    return new PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>() {
-      @Override
-      @SuppressWarnings("nullness")
-      public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
-        return input
-            .apply(Values.create())
-            .apply(MapElements.into(typeDescriptor).via(BeamKafkaThriftTable.this::decode))
-            .setSchema(
-                schema,
-                typeDescriptor,
-                schemaProvider.toRowFunction(typeDescriptor),
-                schemaProvider.fromRowFunction(typeDescriptor))
-            .apply(Convert.toRows());
-      }
-    };
+    return new InputTransformer(typeDescriptor, coder, schema);
   }
 
-  private T decode(byte[] bytes) {
-    try {
-      return thriftCoder.decode(new ByteArrayInputStream(bytes));
-    } catch (IOException e) {
-      throw new IllegalStateException(e);
+  private static class InputTransformer<T extends TBase<?, ?>>

Review comment:
       What I meant was to create a SerializableFunction or SimpleFunction outside of this package (e.g. in io/thrift) that would do the encoding/decoding. Right now we'd have to implement encode/decode in every table provider that supports the Thrift format. With such a helper you wouldn't need InputTransformer and OutputTransformer.
   
   What I have in mind is the same as what is done here:
   
https://github.com/apache/beam/blob/1466db958ca22222ae8badf1f21a860267d46d31/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java#L132
   
   And then here:
   
https://github.com/apache/beam/blob/1466db958ca22222ae8badf1f21a860267d46d31/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java#L83
   
   only for Thrift encoding/decoding.
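   Roughly something like the sketch below (names like `ThriftByteConverters`, `getThriftBytesToRowFn` and `getRowToThriftBytesFn` are made up for illustration; it assumes `ThriftSchema.provider()` and `ThriftCoder.of(Class, TProtocolFactory)` from the io/thrift extension):
   
```java
package org.apache.beam.sdk.io.thrift; // illustrative placement, per "e.g. in io/thrift"

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TProtocolFactory;

/** Hypothetical helper: byte[] <-> Row conversions for Thrift, reusable by any table provider. */
public final class ThriftByteConverters {

  /** Thrift-encoded bytes -> Row: ThriftCoder deserializes, ThriftSchema maps the object to a Row. */
  public static <T extends TBase<?, ?>> SimpleFunction<byte[], Row> getThriftBytesToRowFn(
      Class<T> clazz, TProtocolFactory protocolFactory) {
    SchemaProvider provider = ThriftSchema.provider();
    SerializableFunction<T, Row> toRow = provider.toRowFunction(TypeDescriptor.of(clazz));
    ThriftCoder<T> coder = ThriftCoder.of(clazz, protocolFactory);
    return new SimpleFunction<byte[], Row>() {
      @Override
      public Row apply(byte[] bytes) {
        try {
          return toRow.apply(coder.decode(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
          throw new IllegalStateException("Could not decode Thrift message", e);
        }
      }
    };
  }

  /** Row -> Thrift-encoded bytes: the inverse of the function above. */
  public static <T extends TBase<?, ?>> SimpleFunction<Row, byte[]> getRowToThriftBytesFn(
      Class<T> clazz, TProtocolFactory protocolFactory) {
    SchemaProvider provider = ThriftSchema.provider();
    SerializableFunction<Row, T> fromRow = provider.fromRowFunction(TypeDescriptor.of(clazz));
    ThriftCoder<T> coder = ThriftCoder.of(clazz, protocolFactory);
    return new SimpleFunction<Row, byte[]>() {
      @Override
      public byte[] apply(Row row) {
        try {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          coder.encode(fromRow.apply(row), out);
          return out.toByteArray();
        } catch (IOException e) {
          throw new IllegalStateException("Could not encode Thrift message", e);
        }
      }
    };
  }

  private ThriftByteConverters() {}
}
```
   
   BeamKafkaThriftTable could then just apply these functions with MapElements (plus setting the row schema on the decoded output), the same way BeamKafkaProtoTable reuses the helpers from ProtoMessageSchema.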
   



