Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
If you want to define the SQL query via configuration, does that mean you
know the query when you launch the pipeline (since configuration is set at
launch time)? If so, you can also set the schema dynamically at pipeline
launch time.
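
For example (a rough, untested sketch -- it assumes the Confluent
CachedSchemaRegistryClient, the default "<topic>-value" subject naming, and a
getSchemaRegistryUrl() option you would have to add yourself), you could
resolve the Avro schema while constructing the pipeline and hand the resulting
Beam schema to setRowSchema:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.beam.sdk.schemas.Schema;
// Same AvroUtils as in your snippet; the import path depends on your Beam version.
import org.apache.beam.sdk.schemas.utils.AvroUtils;

public class SchemaAtLaunch {
    // Look up the latest Avro schema registered for the topic's value subject
    // and convert it to a Beam Row schema at pipeline-construction time.
    public static Schema resolveRowSchema(String registryUrl, String topic) throws Exception {
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        SchemaMetadata latest = client.getLatestSchemaMetadata(topic + "-value");
        org.apache.avro.Schema avroSchema =
                new org.apache.avro.Schema.Parser().parse(latest.getSchema());
        return AvroUtils.toBeamSchema(avroSchema);
    }
}

Then, instead of a hard-coded schema, the launch-time value is used:
features.setRowSchema(SchemaAtLaunch.resolveRowSchema(options.getSchemaRegistryUrl(), options.getSourceKafkaTopic()));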



Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Sigalit Eliazov
I'm trying to define a generic pipeline that reads messages from Kafka where
the topic is configurable.
We read a generic record, and the schema id is part of the consumed message,
so this part can be generalized.

Then we would like to run different queries on the stream.
I would like to be able to define the SQL query via configuration.
In addition, in our use case the Kafka message schema and the row schema are
pretty much the same, so I wonder if I could reuse it.
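
For illustration, the kind of configuration I have in mind is a simple options
interface roughly like this (the getter names are just placeholders, not
existing code):

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

// Hypothetical options: topic and query are supplied at launch time, e.g.
// --sourceKafkaTopic=events --sqlQuery="select X, Y, Z from PCOLLECTION"
public interface GenericSqlPipelineOptions extends PipelineOptions {
    @Description("Kafka topic to read from")
    @Validation.Required
    String getSourceKafkaTopic();
    void setSourceKafkaTopic(String value);

    @Description("SQL query to run on the decoded rows")
    @Validation.Required
    String getSqlQuery();
    void setSqlQuery(String value);
}

The pipeline would then call SqlTransform.query(options.getSqlQuery()) instead
of using a hard-coded statement.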

Thanks
Sigalit



Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
Can you explain the use case a bit more? In order to write a SQL statement
(at least one that doesn't use wildcard selection) you also need to know
the schema ahead of time. What are you trying to accomplish with these
dynamic schemas?
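
For example, a statement like the one in your snippet, "select X, Y, Z from
PCOLLECTION", already commits to fields X, Y and Z, so a schema covering it is
effectively known up front (the field types below are only illustrative; the
real ones would come from your Avro schema):

// Minimal Beam schema implied by the query above.
Schema querySchema = Schema.builder()
        .addStringField("X")
        .addStringField("Y")
        .addStringField("Z")
        .build();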

Reuven



usage of dynamic schema in BEAM SQL

2024-01-28 Thread Sigalit Eliazov
Hello, in the pipeline below we read Avro messages from Kafka using the
Confluent Schema Registry.

Our intention is to run SQL queries on the streaming data.

As far as I understand, since I am using the Flink runner, when creating the
features PCollection I must specify the row schema or a coder.

I would like to use the schema obtained from the message that was just read
(see ConvertToRow below).

Is it possible to accomplish this when running on the Flink runner?

I noticed that the Flink runner expects the row schema to be known at
pipeline deployment time.

Are there any potential solutions or workarounds for this situation?


public class BeamSqlTest {

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(options);

        // Read Avro-encoded messages from Kafka; payloads are decoded into
        // GenericRecords via the Confluent Schema Registry.
        PCollection<KafkaRecord<String, GenericRecord>> readAvroMessageFromKafka =
                pipeline.apply("readAvroMessageFromKafka",
                        KafkaTransform.readAvroMessageFromKafkaWithSchemaRegistry(
                                pipelineUtil.getBootstrapServers(),
                                options.getSourceKafkaTopic(),
                                PIPELINE_NAME));
        PCollection<KV<String, GenericRecord>> avroMessages =
                readAvroMessageFromKafka.apply("convertFromKafkaRecord",
                        ParDo.of(new ConvertFromKafkaRecord<>()));

        // The row schema must be supplied here, at pipeline-construction time --
        // this is the part I would like to take from the consumed message instead.
        PCollection<Row> features = avroMessages
                .apply(ParDo.of(new ConvertToRow()))
                .setRowSchema(XXX);
        final PCollection<Row> select_fields =
                features.apply("Select Fields", Select.fieldNames("X", "Y", "Z"));

        final PCollection<Row> windowRes = select_fields.apply("Windowing",
                Window.into(FixedWindows.of(Duration.standardMinutes(1))));
        PCollection<Row> outputStream =
                windowRes.apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));
        pipeline.run().waitUntilFinish();
    }

    @AllArgsConstructor
    public static class ConvertToRow extends DoFn<KV<String, GenericRecord>, Row> {
        @ProcessElement
        @SuppressWarnings({"ConstantConditions", "unused"})
        public void processElement(ProcessContext c) {
            GenericRecord record = c.element().getValue();
            // The Avro schema is available here, per element, but only at runtime.
            final org.apache.avro.Schema avroSchema = record.getSchema();
            Schema schema = AvroUtils.toBeamSchema(avroSchema);

            Object x = record.get("X");
            Object y = record.get("Y");
            Object z = record.get("Z");
            Row row = Row.withSchema(schema).addValues(x, y, z).build();
            c.output(row);
        }
    }
}


Thanks

Sigalit