Hi Jark,

Thanks very much. Will this work with Avro?

On Tue, Nov 17, 2020 at 07:44 Jark Wu <imj...@gmail.com> wrote:
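For reference, Jark's 1.12 DDL below could presumably be adapted for Avro by swapping the format options; this is a sketch, assuming the flink-avro artifact is on the classpath (and 'avro-confluent' would be the choice when a Confluent schema registry is involved):

```sql
-- Hypothetical Avro adaptation of the example below: the key.fields
-- mechanism is unchanged; only the serialization formats differ.
CREATE TABLE output_kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_partition_by_iid',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.fields' = 'item_id',
  'key.format' = 'raw',     -- the key format could also be 'avro'
  'value.format' = 'avro'   -- requires flink-avro on the classpath
);
```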
> Hi Slim,
>
> In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and
> set the class name in the 'sink.partitioner' option.
>
> In 1.12, you can re-partition the data by specifying the key fields (the
> Kafka producer partitions data by the message key by default). You can do
> this by adding some additional options in 1.12:
>
> CREATE TABLE output_kafkaTable (
>   user_id BIGINT,
>   item_id BIGINT,
>   category_id BIGINT,
>   behavior STRING,
>   ts TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior_partition_by_iid',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.fields' = 'item_id', -- specify which columns are written to the message key
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
>
> Best,
> Jark
>
> On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi,
>>
>> I'm pulling in some Flink SQL experts (in CC) to help you with this one :)
>>
>> Cheers,
>> Gordon
>>
>> On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra <slim.bougue...@gmail.com> wrote:
>>
>>> Hi,
>>> I am trying to author a SQL job that repartitions one Kafka SQL table
>>> into another Kafka SQL table. In the example below, the input and output
>>> tables have exactly the same SQL schema and data; the only difference is
>>> that the new Kafka stream needs to be repartitioned by a simple
>>> projection such as item_id (the input stream is partitioned by user_id).
>>> Is there a way to do this via SQL only, without using
>>> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner?
>>>
>>> In other words, how can we express the stream key (keyBy) via the SQL
>>> layer?
>>> For instance, Hive exposes system columns called __key and __partition
>>> that can be used to do this at the SQL layer (see
>>> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions).
>>>
>>> CREATE TABLE input_kafkaTable (
>>>   user_id BIGINT,
>>>   item_id BIGINT,
>>>   category_id BIGINT,
>>>   behavior STRING,
>>>   ts TIMESTAMP(3)
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'user_behavior_partition_by_uid',
>>>   'properties.bootstrap.servers' = 'localhost:9092'
>>> );
>>>
>>> CREATE TABLE output_kafkaTable (
>>>   user_id BIGINT,
>>>   item_id BIGINT,
>>>   category_id BIGINT,
>>>   behavior STRING,
>>>   ts TIMESTAMP(3)
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'user_behavior_partition_by_iid',
>>>   'properties.bootstrap.servers' = 'localhost:9092'
>>> );
>>>
>>> --
>>> B-Slim
>>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______

--
B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
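With the 1.12 approach described in this thread, the repartitioning job itself would reduce to a plain pass-through query; a minimal sketch, assuming the output table declares the key options from Jark's reply ('key.fields' = 'item_id', plus key and value formats):

```sql
-- The Kafka sink derives the message key from 'key.fields', and the
-- default Kafka producer partitioner hashes that key to pick the
-- target partition, so no custom FlinkKafkaPartitioner is needed.
INSERT INTO output_kafkaTable
SELECT user_id, item_id, category_id, behavior, ts
FROM input_kafkaTable;
```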