SQL Client Kafka (UPSERT?) Sink for confluent-avro

2022-03-24 Thread Georg Heiler
Hi,

how can I get Flink's SQL Client to sink data to Kafka using either the
regular kafka or the upsert-kafka connector?

I have a table/topic with dummy data:
CREATE TABLE metrics_brand_stream (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  brand STRING,
  duration INT,
  rating INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'commercials_avro',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
  'properties.group.id' = 'flink-test-001',
  'properties.bootstrap.servers' = 'localhost:9092'
);

And the following aggregation:

SELECT brand,
       COUNT(*) AS cnt,
       AVG(duration) AS duration_mean,
       AVG(rating) AS rating_mean
FROM metrics_brand_stream
GROUP BY brand;

When trying to define an output table:

CREATE TABLE metrics_per_brand (
  brand STRING,
  cnt BIGINT,
  duration_mean DOUBLE,
  rating_mean DOUBLE
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'metrics_per_brand',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
  'properties.group.id' = 'flink-test-001',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'avro-confluent',
  'value.format' = 'avro-confluent'
);

And trying to INSERT some result data:

INSERT INTO metrics_per_brand
SELECT brand,
       COUNT(*) AS cnt,
       AVG(duration) AS duration_mean,
       AVG(rating) AS rating_mean
FROM metrics_brand_stream
GROUP BY brand;

The query fails with:

org.apache.flink.table.api.ValidationException: One or more required
options are missing.

Missing required options are:

url

But neither
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
seems to list the right configuration (or I am misreading the
documentation).


How can I sink data to Kafka after some arbitrary computation using the
Flink SQL Client with either the kafka or upsert-kafka connector, where
the input is Avro with a schema from the Confluent Schema Registry and
the output should register its schema there as well (and serialize using
Avro)?


Best,
Georg


Re: SQL Client Kafka (UPSERT?) Sink for confluent-avro

2022-03-28 Thread Ingo Bürk

Hi Georg,

which Flink version are you using? The missing property belongs to the
avro-confluent format, and if I recall correctly, how these options are
passed has changed in recent versions, so it'd be good to double-check
that you are using the documentation for the version you are running.
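
As a sketch (assuming you are on 1.14 or later, where the format option
was renamed from 'schema-registry.url' to just 'url' -- which would
explain the error you are seeing): for upsert-kafka the registry URL has
to be given per format, prefixed with 'key.' and 'value.', roughly:

  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8081',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8081'

On 1.13 the equivalent keys would be
'key.avro-confluent.schema-registry.url' and
'value.avro-confluent.schema-registry.url'.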



Best
Ingo



Re: SQL Client Kafka (UPSERT?) Sink for confluent-avro

2022-03-29 Thread Georg Heiler
I got it working now: the schema registry URL needs to be specified
separately for both the key and the value format.
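
For the record, this is roughly the sink DDL that works for me now (a
sketch; option names as of Flink 1.14+, where the format option is 'url',
matching the error above; note that upsert-kafka also requires a primary
key):

CREATE TABLE metrics_per_brand (
  brand STRING,
  cnt BIGINT,
  duration_mean DOUBLE,
  rating_mean DOUBLE,
  PRIMARY KEY (brand) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'metrics_per_brand',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8081',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8081'
);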



thanks
