flinksql insert????

2021-11-08 文章 ??????
sql :
String sql1="CREATE TABLE detal (\n" +
"   id INT,\n" +
"  produceId VARCHAR,\n"+
"  color VARCHAR,\n"+
"  size VARCHAR,\n"+
"  PRIMARY KEY (id) NOT ENFORCED\n"+
") WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
"  'username' = 'ark_admin', \n" +
"  'password' = 'ark_Admin@0927Da', \n" +
"  'table-name' = 'detal'\n" +
")\n";
tenv.executeSql(sql1).print();

String sql2 = "SELECT * FROM detal";
tenv.executeSql(sql2).print();

String sql3="CREATE TABLE shangping (\n" +
"   id INT,\n" +
"  orderId INT,\n"+
"  produceId VARCHAR,\n"+
"  PRIMARY KEY (id) NOT ENFORCED\n"+
") WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
"  'username' = 'ark_admin', \n" +
"  'password' = 'ark_Admin@0927Da', \n" +
"  'table-name' = 'shangping'\n" +
")\n";
tenv.executeSql(sql3).print();

String sql4 = "SELECT * FROM shangping";
tenv.executeSql(sql4).print();

String sql5="CREATE TABLE new_table (\n" +
"   id INT,\n" +
"  orderId INT,\n"+
"  produceId VARCHAR,\n"+
"  color VARCHAR,\n"+
"  size VARCHAR\n"+
   // "  PRIMARY KEY (id) NOT ENFORCED\n"+
") WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
"  'username' = 'ark_admin', \n" +
"  'password' = 'ark_Admin@0927Da', \n" +
"  'table-name' = 'new_table'\n" +
")\n";
tenv.executeSql(sql5).print();
String sql6 = "SELECT * FROM new_table";

tenv.executeSql(sql6).print();
String insertSql = "insert into new_table " +
"select * " +
"from detal";


:??sink










:

java.lang.IllegalArgumentException: open() failed.Table 'test_1.new_table' 
doesn't exist
    at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:215)
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 
'test_1.new_table' doesn't exist
    at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
    at com.mysql.jdbc.Util.getInstance(Util.java:381)
    at 
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1030)
    at 
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:956)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3558)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3490)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1959)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2109)
    at 
com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2643)
    at 
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2077)
    at 
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2228)
    at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:212)
    ... 4 common frames omitted


??


 

Re:Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 文章 chenxuying
谢谢, 明白了








在 2020-08-03 10:42:53,"Leonard Xu"  写道:
>如果 DB 支持 upsert 语法,执行的是update, 如果不支持 upsert语法, 则是 delete + insert,MySQL 和 PG 
>都支持 upsert, 底层对应的sql语句是
>
>Database   Upsert Grammar
>MySQL  INSERT .. ON DUPLICATE KEY UPDATE ..
>PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..
>
>MySQL connector 不支持 replace into, 用的是 on duplicate key update.
>
>祝好
>Leonard 
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#idempotent-writes
> 
>
>
>
>> 在 2020年8月3日,10:33,chenxuying  写道:
>> 
>> 你好,我这边只是做一个尝试 , 因为之前使用了insert into + 主键做到更新db记录的操作 , 然后现在看到INSERT 
>> OVERWRITE的语法就试了一下 , 原来OVERWRITE目前只支持 Filesystem connector 和 Hive table
>> 然后还想问下在使用insert into + 主键 时,如果主键存在 , 则在底层执行sql时是update语句是吗, 还是delete+update 
>> , 我们这边之前有个需求是update执行效率太低 , 然后需要用到replace into , 不知道flink是否支持
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-08-02 09:48:04,"Leonard Xu"  写道:
>>> Hi,
>>> 
>>> 这个错是因为JDBC connector 不支持INSERT OVERWRITE, 你看的文档是列出了目前 Flink SQL 
>>> 支持的INSERT语法,但是不是所有的 connector 都支持  INSERT OVERWRITE, 目前支持的只有 Filesystem 
>>> connector 和 Hive table, 这些表一般不会有主键。其他connector 如 JDBC\ES\HBase 目前不支持  
>>> INSERT OVERWRITE,现在 JDBC\ES\HBase connector都是支持upsert 插入的[1],
>>> 就是在connector 表上定义了PK,结果可以按照PK更新,对于DB类的系统应该都是可以满足业务需求的。 可以分享下需要INSERT 
>>> OVERWRITE到DB的场景吗?
>>> 
>>> Best
>>> Leonard
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>>  
>>> 
>>> 
 在 2020年8月1日,19:20,chenxuying  写道:
 
 Hello
 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert 
 overwrite mysqlsink select a,cast(b as varchar) b from mySource"时报如下错误
 Exception in thread "main" org.apache.flink.table.api.ValidationException: 
 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
 SupportsOverwrite interface.
 是得自定义connector吗,实现DynamicTableSink?
 
 
 祝好
 chenxuying
 [1] 
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>>> 
>


Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 文章 Leonard Xu
如果 DB 支持 upsert 语法,执行的是update, 如果不支持 upsert语法, 则是 delete + insert,MySQL 和 PG 
都支持 upsert, 底层对应的sql语句是

DatabaseUpsert Grammar
MySQL   INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL  INSERT .. ON CONFLICT .. DO UPDATE SET ..

MySQL connector 不支持 replace into, 用的是 on duplicate key update.

祝好
Leonard 
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#idempotent-writes
 



> 在 2020年8月3日,10:33,chenxuying  写道:
> 
> 你好,我这边只是做一个尝试 , 因为之前使用了insert into + 主键做到更新db记录的操作 , 然后现在看到INSERT 
> OVERWRITE的语法就试了一下 , 原来OVERWRITE目前只支持 Filesystem connector 和 Hive table
> 然后还想问下在使用insert into + 主键 时,如果主键存在 , 则在底层执行sql时是update语句是吗, 还是delete+update , 
> 我们这边之前有个需求是update执行效率太低 , 然后需要用到replace into , 不知道flink是否支持
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-08-02 09:48:04,"Leonard Xu"  写道:
>> Hi,
>> 
>> 这个错是因为JDBC connector 不支持INSERT OVERWRITE, 你看的文档是列出了目前 Flink SQL 
>> 支持的INSERT语法,但是不是所有的 connector 都支持  INSERT OVERWRITE, 目前支持的只有 Filesystem 
>> connector 和 Hive table, 这些表一般不会有主键。其他connector 如 JDBC\ES\HBase 目前不支持  INSERT 
>> OVERWRITE,现在 JDBC\ES\HBase connector都是支持upsert 插入的[1],
>> 就是在connector 表上定义了PK,结果可以按照PK更新,对于DB类的系统应该都是可以满足业务需求的。 可以分享下需要INSERT 
>> OVERWRITE到DB的场景吗?
>> 
>> Best
>> Leonard
>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>  
>> 
>> 
>>> 在 2020年8月1日,19:20,chenxuying  写道:
>>> 
>>> Hello
>>> 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert 
>>> overwrite mysqlsink select a,cast(b as varchar) b from mySource"时报如下错误
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
>>> SupportsOverwrite interface.
>>> 是得自定义connector吗,实现DynamicTableSink?
>>> 
>>> 
>>> 祝好
>>> chenxuying
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>> 



Re:Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 文章 chenxuying
你好,我这边只是做一个尝试 , 因为之前使用了insert into + 主键做到更新db记录的操作 , 然后现在看到INSERT 
OVERWRITE的语法就试了一下 , 原来OVERWRITE目前只支持 Filesystem connector 和 Hive table
然后还想问下在使用insert into + 主键 时,如果主键存在 , 则在底层执行sql时是update语句是吗, 还是delete+update , 
我们这边之前有个需求是update执行效率太低 , 然后需要用到replace into , 不知道flink是否支持

















在 2020-08-02 09:48:04,"Leonard Xu"  写道:
>Hi,
>
>这个错是因为JDBC connector 不支持INSERT OVERWRITE, 你看的文档是列出了目前 Flink SQL 
>支持的INSERT语法,但是不是所有的 connector 都支持  INSERT OVERWRITE, 目前支持的只有 Filesystem 
>connector 和 Hive table, 这些表一般不会有主键。其他connector 如 JDBC\ES\HBase 目前不支持  INSERT 
>OVERWRITE,现在 JDBC\ES\HBase connector都是支持upsert 插入的[1],
>就是在connector 表上定义了PK,结果可以按照PK更新,对于DB类的系统应该都是可以满足业务需求的。 可以分享下需要INSERT 
>OVERWRITE到DB的场景吗?
>
>Best
>Leonard
>[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> 
>
>
>> 在 2020年8月1日,19:20,chenxuying  写道:
>> 
>> Hello
>> 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert 
>> overwrite mysqlsink select a,cast(b as varchar) b from mySource"时报如下错误
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
>> SupportsOverwrite interface.
>> 是得自定义connector吗,实现DynamicTableSink?
>> 
>> 
>> 祝好
>> chenxuying
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>


Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-01 文章 Leonard Xu
Hi,

这个错是因为JDBC connector 不支持INSERT OVERWRITE, 你看的文档是列出了目前 Flink SQL 
支持的INSERT语法,但是不是所有的 connector 都支持  INSERT OVERWRITE, 目前支持的只有 Filesystem 
connector 和 Hive table, 这些表一般不会有主键。其他connector 如 JDBC\ES\HBase 目前不支持  INSERT 
OVERWRITE,现在 JDBC\ES\HBase connector都是支持upsert 插入的[1],
就是在connector 表上定义了PK,结果可以按照PK更新,对于DB类的系统应该都是可以满足业务需求的。 可以分享下需要INSERT 
OVERWRITE到DB的场景吗?

Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
 


> 在 2020年8月1日,19:20,chenxuying  写道:
> 
> Hello
> 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert 
> overwrite mysqlsink select a,cast(b as varchar) b from mySource"时报如下错误
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
> SupportsOverwrite interface.
> 是得自定义connector吗,实现DynamicTableSink?
> 
> 
> 祝好
> chenxuying
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax



flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-01 文章 chenxuying
Hello
请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert 
overwrite mysqlsink select a,cast(b as varchar) b from mySource"时报如下错误
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
SupportsOverwrite interface.
是得自定义connector吗,实现DynamicTableSink?


祝好
chenxuying
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax