[ 
https://issues.apache.org/jira/browse/BEAM-10885?focusedWorklogId=491924&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-491924
 ]

ASF GitHub Bot logged work on BEAM-10885:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Sep/20 11:20
            Start Date: 28/Sep/20 11:20
    Worklog Time Spent: 10m 
      Work Description: piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495865093



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);
+
+  protected abstract PCollection<Row> createRecorderDecoder(TestPipeline pipeline);
+
+  protected abstract PCollection<Row> createRecorderEncoder(TestPipeline pipeline);

Review comment:
       I've got rid of createRecorderDecoder and createRecorderEncoder, but I had to add a rowToBytesKV function to test the decoder.
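
       For illustration only, a helper of this kind could look roughly like the sketch below. It is a hypothetical, dependency-free stand-in and not the code from the pull request: the real rowToBytesKV is not shown in this message and presumably works on Beam Row objects. The class name, the CSV encoding, and the empty key are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; names and encoding are assumptions, not the PR's code.
class RowToBytesKvSketch {

  // Joins the values into one CSV line and pairs the UTF-8 bytes with an
  // empty key, mimicking the (key bytes, value bytes) shape that a decoder
  // under test would receive from a Kafka source.
  static Map.Entry<byte[], byte[]> rowToBytesKV(List<Object> values) {
    String csv =
        values.stream().map(v -> String.valueOf(v)).collect(Collectors.joining(","));
    return new SimpleImmutableEntry<>(new byte[0], csv.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    // Same values as ROW1 in the diff above.
    Map.Entry<byte[], byte[]> kv = rowToBytesKV(Arrays.asList(1L, 1, 1d));
    System.out.println(new String(kv.getValue(), StandardCharsets.UTF_8));
  }
}
```

       Feeding such pairs into the decoder lets a test compare the decoded rows against ROW1 and ROW2 without a separate abstract factory method per format.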




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 491924)
    Time Spent: 2.5h  (was: 2h 20m)

> Add Avro support to Kafka Table Provider
> ----------------------------------------
>
>                 Key: BEAM-10885
>                 URL: https://issues.apache.org/jira/browse/BEAM-10885
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>    Affects Versions: 2.25.0
>            Reporter: Piotr Szuberski
>            Assignee: Piotr Szuberski
>            Priority: P2
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
