Hi Lei,

CREATE TABLE DDL [1][2] is the recommended way to register a table since
Flink 1.9, and the YAML way might be deprecated in the future.
A table registered via DDL can be used as both a source and a sink.
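
For example, a minimal DDL sketch in 1.10 syntax (the topic, field names,
and connection addresses below just mirror the source table from this
thread; see [1][2] for the full set of options):

CREATE TABLE out_order (
  out_order_code STRING,
  input_date BIGINT,
  owner_code STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order',
  -- adjust the connection properties to your environment
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json'
);

The same table can then be read with SELECT or written with INSERT INTO.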

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Thanks, it works now.
>
> It seems the problem was that I had added
>    schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code
> STRING, status INT)"
>
> under the format label.
>
> *From:* Arvid Heise <ar...@ververica.com>
> *Date:* 2020-03-10 20:51
> *To:* wangl...@geekplus.com.cn
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: Does flink-1.10 sql-client support kafka sink?
> Hi Lei,
>
> yes, Kafka as a sink is supported, albeit only for appends (no
> deletions/updates yet) [1].
>
> An example is a bit hidden in the documentation [2]:
>
> tables:
>   - name: MyTableSink
>     type: sink-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: "0.11"
>       topic: OutputTopic
>       properties:
>         zookeeper.connect: localhost:2181
>         bootstrap.servers: localhost:9092
>         group.id: testGroup
>     format:
>       property-version: 1
>       type: json
>       derive-schema: true
>     schema:
>       - name: rideId
>         data-type: BIGINT
>       - name: lon
>         data-type: FLOAT
>       - name: lat
>         data-type: FLOAT
>       - name: rideTime
>         data-type: TIMESTAMP(3)
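>
> With the sink registered, you can then write to it from the SQL client
> with an INSERT INTO statement [2]. E.g., assuming a source table with
> matching columns (hypothetically named MyTableSource here):
>
> INSERT INTO MyTableSink
> SELECT rideId, lon, lat, rideTime FROM MyTableSource;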
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries
>
> On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> I have configured a source table successfully using the following
>> configuration:
>>
>>   - name: out_order
>>     type: source
>>     update-mode: append
>>     schema:
>>     - name: out_order_code
>>       type: STRING
>>     - name: input_date
>>       type: BIGINT
>>     - name: owner_code
>>       type: STRING
>>     connector:
>>       property-version: 1
>>       type: kafka
>>       version: universal
>>       topic: out_order
>>       startup-mode: latest-offset
>>       properties:
>>       - key: zookeeper.connect
>>         value: 172.19.78.32:2181
>>       - key: bootstrap.servers
>>         value: 172.19.78.32:9092
>>       - key: group.id
>>     format:
>>       property-version: 1
>>       type: json
>>       schema: "ROW(out_order_code STRING,owner_code STRING,input_date
>> BIGINT)"
>>
>> How can I configure a sink table? I haven't found any useful docs for
>> this.
>>
>> Thanks,
>> Lei
>>
>
