Hi Jark,
Thanks very much. Will this work with Avro?

On Tue, Nov 17, 2020 at 07:44 Jark Wu <imj...@gmail.com> wrote:

> Hi Slim,
>
> In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and
> set its fully qualified class name in the 'sink.partitioner' option.
>
> In 1.12, you can re-partition the data by specifying key fields (the Kafka
> producer partitions records by the message key by default). You can do
> this by adding a few additional options:
>
> CREATE TABLE output_kafkaTable (
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior_partition_by_iid',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'key.fields' = 'item_id',  -- columns to write into the message key
>  'key.format' = 'raw',
>  'value.format' = 'json'
> );
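>
> (With either table definition, the repartitioning job itself is then just:
> INSERT INTO output_kafkaTable SELECT * FROM input_kafkaTable;)
>
> For the 1.11 route, the custom partitioner is wired in through the
> 'sink.partitioner' option. A minimal sketch, where
> 'com.example.MyItemIdPartitioner' is a placeholder for your own
> FlinkKafkaPartitioner implementation:
>
> CREATE TABLE output_kafkaTable (
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior_partition_by_iid',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'format' = 'json',
>  -- placeholder class name; substitute your FlinkKafkaPartitioner subclass
>  'sink.partitioner' = 'com.example.MyItemIdPartitioner'
> );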
>
>
> Best,
> Jark
>
>
>
> On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi,
>>
>> I'm pulling in some Flink SQL experts (in CC) to help you with this one :)
>>
>> Cheers,
>> Gordon
>>
>> On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra <slim.bougue...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am trying to author a SQL job that repartitions one Kafka SQL table
>>> into another Kafka SQL table.
>>> As an example, the input and output tables have exactly the same SQL
>>> schema (see below) and the same data; the only difference is that the
>>> new Kafka stream needs to be repartitioned by a simple projection such
>>> as item_id (the input stream is partitioned by user_id).
>>> Is there a way to do this via SQL only, without using
>>> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner?
>>>
>>> In other words, how can we express the stream key (i.e. keyBy) via the
>>> SQL layer?
>>>
>>> For instance, Hive exposes system columns called __key and __partition
>>> that can be used to do this via the SQL layer (see
>>> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions
>>> ).
>>>
>>> CREATE TABLE input_kafkaTable (
>>>  user_id BIGINT,
>>>  item_id BIGINT,
>>>  category_id BIGINT,
>>>  behavior STRING,
>>>  ts TIMESTAMP(3)
>>> ) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'user_behavior_partition_by_uid',
>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>  'format' = 'json'
>>> )
>>>
>>> CREATE TABLE output_kafkaTable (
>>>  user_id BIGINT,
>>>  item_id BIGINT,
>>>  category_id BIGINT,
>>>  behavior STRING,
>>>  ts TIMESTAMP(3)
>>> ) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'user_behavior_partition_by_iid',
>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>  'format' = 'json'
>>> )
>>>
>>>
>>>
>>> --
>>>
>>> B-Slim
>>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>>>
