Hi Lei,

Yes, Kafka is supported as a sink, albeit only for appends (no
deletions/updates yet) [1].

An example is a bit hidden in the documentation [2]:

tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
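
Once the sink table is registered, you can write to it from the SQL Client
with a detached INSERT INTO query, roughly like this (the source table
"Rides" is just a placeholder for whatever table you select from):

INSERT INTO MyTableSink
SELECT rideId, lon, lat, rideTime
FROM Rides;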

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> I have configured a source table successfully using the following
> configuration:
>
> - name: out_order
>   type: source
>   update-mode: append
>   schema:
>     - name: out_order_code
>       type: STRING
>     - name: input_date
>       type: BIGINT
>     - name: owner_code
>       type: STRING
>   connector:
>     property-version: 1
>     type: kafka
>     version: universal
>     topic: out_order
>     startup-mode: latest-offset
>     properties:
>       - key: zookeeper.connect
>         value: 172.19.78.32:2181
>       - key: bootstrap.servers
>         value: 172.19.78.32:9092
>       - key: group.id
>   format:
>     property-version: 1
>     type: json
>     schema: "ROW(out_order_code STRING, owner_code STRING, input_date BIGINT)"
>
> How can I configure a sink table? I haven't found any useful docs for this.
>
> Thanks,
> Lei
>
