[ 
https://issues.apache.org/jira/browse/BEAM-10885?focusedWorklogId=491928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-491928
 ]

ASF GitHub Bot logged work on BEAM-10885:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Sep/20 11:27
            Start Date: 28/Sep/20 11:27
    Worklog Time Spent: 10m 
      Work Description: piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495869443



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -123,57 +144,37 @@ public void testAllLate() {
 
   @Test
   public void testEmptyPartitionsRate() {
-    KafkaCSVTestTable table = getTable(3);
+    KafkaTestTable table = getTable(3);
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
   @Test
   public void allTheRecordsSameTimeRate() {
-    KafkaCSVTestTable table = getTable(3);
-    for (int i = 0; i < 100; i++) {
-      table.addRecord(KafkaTestRecord.create("key" + i, i + ",1,2", "topic1", 1000));
+    KafkaTestTable table = getTable(3);
+    for (long i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, ImmutableList.of(i, 1, 2d), 1000L));
     }
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
-  private static class PrintDoFn extends DoFn<Row, Row> {
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      System.out.println("we are here");
-      System.out.println(c.element().getValues());
-    }
-  }
-
   @Test
-  public void testCsvRecorderDecoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of("1,\"1\",1.0", "2,2,2.0"))
-            .apply(ParDo.of(new String2KvBytes()))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), CSVFormat.DEFAULT));
-
+  public void testRecorderDecoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecorderEncoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of(ROW1, ROW2))
-            .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genSchema(), CSVFormat.DEFAULT))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), CSVFormat.DEFAULT));
-
+  public void testRecorderEncoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
-
     pipeline.run();
   }
 
-  private static Schema genSchema() {
+  protected static Schema genSchema() {

Review comment:
       I thought it was for some integration with SQL types in Beam Tables.
       I'll change it to the normal Schema builder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 491928)
    Time Spent: 2h 50m  (was: 2h 40m)

> Add Avro support to Kafka Table Provider
> ----------------------------------------
>
>                 Key: BEAM-10885
>                 URL: https://issues.apache.org/jira/browse/BEAM-10885
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>    Affects Versions: 2.25.0
>            Reporter: Piotr Szuberski
>            Assignee: Piotr Szuberski
>            Priority: P2
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
