piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495869443



##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -123,57 +144,37 @@ public void testAllLate() {
 
   @Test
   public void testEmptyPartitionsRate() {
-    KafkaCSVTestTable table = getTable(3);
+    KafkaTestTable table = getTable(3);
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
   @Test
   public void allTheRecordsSameTimeRate() {
-    KafkaCSVTestTable table = getTable(3);
-    for (int i = 0; i < 100; i++) {
-      table.addRecord(KafkaTestRecord.create("key" + i, i + ",1,2", "topic1", 
1000));
+    KafkaTestTable table = getTable(3);
+    for (long i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, ImmutableList.of(i, 1, 
2d), 1000L));
     }
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
-  private static class PrintDoFn extends DoFn<Row, Row> {
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      System.out.println("we are here");
-      System.out.println(c.element().getValues());
-    }
-  }
-
   @Test
-  public void testCsvRecorderDecoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of("1,\"1\",1.0", "2,2,2.0"))
-            .apply(ParDo.of(new String2KvBytes()))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), 
CSVFormat.DEFAULT));
-
+  public void testRecorderDecoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecorderEncoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of(ROW1, ROW2))
-            .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genSchema(), 
CSVFormat.DEFAULT))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), 
CSVFormat.DEFAULT));
-
+  public void testRecorderEncoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
-
     pipeline.run();
   }
 
-  private static Schema genSchema() {
+  protected static Schema genSchema() {

Review comment:
       I thought it was for some integration with SQL types in Beam Tables. 
I'll change it to the normal Schema builder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to