piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495865093
##########
File path:
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
/** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
@Rule public TestPipeline pipeline = TestPipeline.create();
- private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1,
1.0).build();
+ protected static final Schema BEAM_SQL_SCHEMA =
+ TestTableUtils.buildBeamSqlSchema(
+ Schema.FieldType.INT32,
+ "order_id",
+ Schema.FieldType.INT32,
+ "site_id",
+ Schema.FieldType.INT32,
+ "price");
- private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2,
2.0).build();
+ protected static final List<String> TOPICS = ImmutableList.of("topic1",
"topic2");
+
+ protected static final Schema SCHEMA = genSchema();
+
+ protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1,
1d).build();
+
+ protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2,
2d).build();
+
+ private static final Map<String, BeamSqlTable> tables = new HashMap<>();
- private static Map<String, BeamSqlTable> tables = new HashMap<>();
protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
+ protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+ String key, List<Object> values, long timestamp);
+
+ protected abstract KafkaTestTable getTable(int numberOfPartitions);
+
+ protected abstract PCollection<Row> createRecorderDecoder(TestPipeline
pipeline);
+
+ protected abstract PCollection<Row> createRecorderEncoder(TestPipeline
pipeline);
Review comment:
I've got rid of createRecorderDecoder and Encoder but I had to make
rowToBytesKV function to test the decoder.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]