Hello,

In the pipeline below we read Avro messages from Kafka using the Confluent Schema Registry.
We would like to run SQL queries on the streaming data.
As far as I understand, since I am using the Flink runner, I must specify a
row schema or a coder when creating the features PCollection.
I would like to use the schema of the message that was just read
(see ConvertToRow below).
Is this possible when running on the Flink runner?
From what I have seen, the Flink runner expects the row schema to be
known when the pipeline is deployed.
Are there any workarounds for this situation?
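The only workaround I can think of is to resolve the schema once, at pipeline construction time, by asking the Schema Registry for the latest schema of the topic's value subject and converting it with AvroUtils. A rough, untested sketch (the registry URL and the "<topic>-value" subject naming are assumptions about our setup):

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;

// Fetch the latest Avro schema registered for the topic and convert it to a
// Beam row schema, so it is known before the pipeline is submitted to Flink.
static Schema resolveRowSchema(String schemaRegistryUrl, String topic) throws Exception {
    CachedSchemaRegistryClient client =
        new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
    // Assumes the default TopicNameStrategy, i.e. the subject is "<topic>-value".
    String schemaJson = client.getLatestSchemaMetadata(topic + "-value").getSchema();
    org.apache.avro.Schema avroSchema =
        new org.apache.avro.Schema.Parser().parse(schemaJson);
    return AvroUtils.toBeamSchema(avroSchema);
}

// Then in main(), instead of a per-element schema:
//   features = avroMessages.apply(ParDo.of(new ConvertToRow()))
//       .setRowSchema(resolveRowSchema(schemaRegistryUrl, options.getSourceKafkaTopic()));
```

This only works if all messages on the topic use the latest registered schema; I am not sure how to handle a schema that changes while the pipeline is running.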
public class BeamSqlTest {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(options); // options created elsewhere

        PCollection<KafkaRecord<String, GenericRecord>> readAvroMessageFromKafka =
            pipeline.apply("readAvroMessageFromKafka",
                KafkaTransform.readAvroMessageFromKafkaWithSchemaRegistry(
                    pipelineUtil.getBootstrapServers(),
                    options.getSourceKafkaTopic(), PIPELINE_NAME));

        PCollection<KV<String, GenericRecord>> avroMessages =
            readAvroMessageFromKafka.apply("convertFromKafkaRecord",
                ParDo.of(new ConvertFromKafkaRecord<>()));

        PCollection<Row> features =
            avroMessages.apply(ParDo.of(new ConvertToRow())).setRowSchema(XXX);

        final PCollection<Row> selectFields =
            features.apply("Select Fields", Select.fieldNames("X", "Y", "Z"));

        final PCollection<Row> windowRes =
            selectFields.apply("Windowing",
                Window.into(FixedWindows.of(Duration.standardMinutes(1))));

        PCollection<Row> outputStream =
            windowRes.apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));

        pipeline.run().waitUntilFinish();
    }

    @AllArgsConstructor
    public static class ConvertToRow extends DoFn<KV<String, GenericRecord>, Row> {
        @ProcessElement
        @SuppressWarnings({"ConstantConditions", "unused"})
        public void processElement(ProcessContext c) {
            GenericRecord record = c.element().getValue();
            final org.apache.avro.Schema avroSchema = record.getSchema();
            Schema schema = AvroUtils.toBeamSchema(avroSchema);
            Object x = record.get("X");
            Object y = record.get("Y");
            Object z = record.get("Z");
            Row row = Row.withSchema(schema).addValues(x, y, z).build();
            c.output(row);
        }
    }
}
Thanks
Sigalit