Thanks, works now. 

Seems it is because i added the  
   schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, 
status INT)"

under format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
<wangl...@geekplus.com.cn> wrote:

I have configured  source table successfully using the following configuration:

- name: out_order
    type: source
    update-mode: append
    schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: out_order
      startup-mode: latest-offset
      properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
      property-version: 1
      type: json
      schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can i configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei

Reply via email to