Hello,

In our pipeline we read Avro messages from Kafka using the Confluent Schema Registry.

We would like to run SQL queries on the streaming data.


As far as I understand, since I am using the Flink runner, I must specify
a row schema or a coder when creating the features PCollection.


I would like to use the schema obtained from the message that was just
read (see ConvertToRow below).

Is it possible to accomplish this when executing on the Flink runner?

I noticed that the Flink runner expects the row schema to be known at
pipeline-construction time, before deployment.


Are there any potential solutions or workarounds for this situation?
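One workaround I have been considering (a sketch only, not verified against my setup): since the Schema Registry is reachable at submission time, fetch the latest Avro schema for the topic from the registry before building the pipeline, convert it with AvroUtils.toBeamSchema, and pass the result to setRowSchema. The class and method below are illustrative names, and the subject-name convention "<topic>-value" assumes the default TopicNameStrategy:

```java
import org.apache.beam.sdk.schemas.Schema;
// In newer Beam releases this class lives in
// org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils instead.
import org.apache.beam.sdk.schemas.utils.AvroUtils;

public class ConstructionTimeSchema {

    // Build the Beam row schema from an Avro schema JSON string that is
    // resolved *before* the pipeline is deployed.
    public static Schema rowSchemaFor(String avroSchemaJson) {
        org.apache.avro.Schema avroSchema =
                new org.apache.avro.Schema.Parser().parse(avroSchemaJson);
        return AvroUtils.toBeamSchema(avroSchema);
    }

    public static void main(String[] args) {
        // In the real pipeline this JSON would come from the registry, e.g.
        //   new CachedSchemaRegistryClient(registryUrl, 100)
        //       .getLatestSchemaMetadata(topic + "-value").getSchema();
        // Here a literal schema stands in for that lookup.
        String avroJson = "{\"type\":\"record\",\"name\":\"Features\",\"fields\":["
                + "{\"name\":\"X\",\"type\":\"double\"},"
                + "{\"name\":\"Y\",\"type\":\"double\"},"
                + "{\"name\":\"Z\",\"type\":\"double\"}]}";
        Schema rowSchema = rowSchemaFor(avroJson);
        System.out.println(rowSchema.getFieldNames()); // [X, Y, Z]
        // ... then: .setRowSchema(rowSchema) on the features PCollection
    }
}
```

This would pin the schema at construction time, which seems to be what the Flink runner requires, at the cost of not picking up schema changes without redeploying.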


public class BeamSqlTest {

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(options); // was left uninitialized

        PCollection<KafkaRecord<String, GenericRecord>> readAvroMessageFromKafka =
                pipeline.apply("readAvroMessageFromKafka",
                        KafkaTransform.readAvroMessageFromKafkaWithSchemaRegistry(
                                pipelineUtil.getBootstrapServers(),
                                options.getSourceKafkaTopic(),
                                PIPELINE_NAME));

        PCollection<KV<String, GenericRecord>> avroMessages =
                readAvroMessageFromKafka.apply("convertFromKafkaRecord",
                        ParDo.of(new ConvertFromKafkaRecord<>()));

        // XXX is the open question: the schema is only known per element,
        // inside ConvertToRow, but setRowSchema() needs it here, at
        // pipeline-construction time.
        PCollection<Row> features =
                avroMessages.apply(ParDo.of(new ConvertToRow())).setRowSchema(XXX);

        final PCollection<Row> selectFields =
                features.apply("Select Fields", Select.fieldNames("X", "Y", "Z"));

        final PCollection<Row> windowRes =
                selectFields.apply("Windowing",
                        Window.into(FixedWindows.of(Duration.standardMinutes(1))));

        PCollection<Row> outputStream =
                windowRes.apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));

        pipeline.run().waitUntilFinish();
    }

    public static class ConvertToRow extends DoFn<KV<String, GenericRecord>, Row> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            GenericRecord record = c.element().getValue();
            // The Avro schema (and hence the Beam schema) only becomes
            // available here, once a message has been read.
            final org.apache.avro.Schema avroSchema = record.getSchema();
            Schema schema = AvroUtils.toBeamSchema(avroSchema);

            Row row = Row.withSchema(schema)
                    .addValues(record.get("X"), record.get("Y"), record.get("Z"))
                    .build();
            c.output(row);
        }
    }
}


Thanks

Sigalit
