I am using Flink 1.10, but I had added flink-json-1.9.1.jar and 
flink-sql-connector-kafka_2.11-1.9.1.jar to the lib directory.
After changing to flink-json-1.10.0.jar and 
flink-sql-connector-kafka_2.12-1.10.0.jar, it works.

But I have no idea why the YAML way works when I use flink-json-1.9.1.jar and 
flink-sql-connector-kafka_2.11-1.9.1.jar in a Flink 1.10 environment.

Thanks,
Lei



wangl...@geekplus.com.cn

 
From: wangl...@geekplus.com.cn
Date: 2020-03-11 14:51
To: Jark Wu
CC: Arvid Heise; user
Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Jark, 

I have tried the CREATE TABLE DDL approach.
First I started ./bin/sql-client.sh embedded, then created a table from a Kafka 
topic, and it told me the table had been created.
But when I query it with select * from tableName, there is an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps I need to add some jar to the lib directory.
But if I write the table configuration in the sql-client-defaults.yaml file, I 
can select the result correctly.

Thanks,
Lei
 



 
From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,

CREATE TABLE DDL [1][2] has been the recommended way to register a table since 
1.9, and the YAML way might be deprecated in the future. 
With DDL, a registered table can be used as both a source and a sink. 
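
As a rough sketch of what such a DDL might look like for the out_order topic 
discussed in this thread: the column names are taken from the YAML 
configuration quoted further down, while the broker addresses and the group id 
are placeholders. The property keys follow the Flink 1.10 Kafka connector 
documentation [2] and assume that the connector and format jars in the lib 
directory match the Flink version.

```sql
-- Sketch only: addresses and group id below are placeholders.
CREATE TABLE out_order (
  out_order_code STRING,
  input_date BIGINT,
  owner_code STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'testGroup',
  'update-mode' = 'append',
  'format.type' = 'json',
  -- derive the JSON format schema from the table schema
  'format.derive-schema' = 'true'
);
```

The same table can then be read with a SELECT or written to with an 
INSERT INTO statement.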

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn 
<wangl...@geekplus.com.cn> wrote:
Thanks, it works now. 

It seems that is because I added
   schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"

under the format label.
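
For readers following along, a sink entry with the schema placed under the 
format label might look roughly like the sketch below. It combines the 
structure of the documentation example quoted further down with the fields 
from the line above; the table name, topic name, and addresses are 
hypothetical, and which keys are accepted can depend on the Flink version and 
the jars in the lib directory.

```yaml
tables:
  - name: out_order_sink            # hypothetical table name
    type: sink-table
    update-mode: append
    schema:
      - name: out_order_code
        data-type: STRING
      - name: input_date
        data-type: BIGINT
      - name: owner_code
        data-type: STRING
      - name: status
        data-type: INT
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: out_order_result       # hypothetical topic name
      properties:
        zookeeper.connect: localhost:2181   # placeholder address
        bootstrap.servers: localhost:9092   # placeholder address
    format:
      property-version: 1
      type: json
      # explicit format schema, as described in the message above
      schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"
```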

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,

Yes, Kafka is supported as a sink, albeit only for appends (no deletions or 
updates yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
<wangl...@geekplus.com.cn> wrote:

I have configured a source table successfully using the following configuration:

  - name: out_order
    type: source
    update-mode: append
    schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: out_order
      startup-mode: latest-offset
      properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
    format:
      property-version: 1
      type: json
      schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei
