Re: Re: Re: How to set the sink parallelism when executing SQL with the Table API
hi, Shengkai Fang, Cayden chen:
Thanks for the answers; the DISCUSS thread should solve my problem.

--
Best,
wldd

On 2020-08-07 16:56:30, "Cayden chen" <1193216...@qq.com> wrote:
>hi:
> You can convert the table just before the sink into a DataStream, change the global parallelism to the value you want for the sink, and then call dataStream.addSink(sink) (the sink operator picks up the global parallelism there); afterwards, set the global parallelism back. In theory this approach can give every operator its own parallelism.
>
>------ Original message ------
>From: "user-zh"
>Sent: Friday, 2020-08-07 16:35
>To: "user-zh"
>Subject: Re: Re: How to set the sink parallelism when executing SQL with the Table API
>
>hi, only the global setting is supported at the moment; the parallelism of an individual sink cannot be configured yet. Per-sink parallelism is being discussed in the community, see
>https://www.mail-archive.com/dev@flink.apache.org/msg40251.html
>
>wldd wrote:
>> hi:
>> That should be the corresponding flink-conf.yaml option; it is a global setting and cannot specify the parallelism of the sink.
>>
>> --
>> Best,
>> wldd
>>
>> On 2020-08-07 15:26:34, "Shengkai Fang" wrote:
>> hi, not sure whether this meets your needs:
>>
>> tEnv.getConfig().addConfiguration(
>>     new Configuration()
>>         .set(CoreOptions.DEFAULT_PARALLELISM, 128)
>> );
>>
>> See the docs:
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>
>> wldd wrote:
>> hi all:
>> How can I set the parallelism of the sink when TableEnvironment executes SQL?
>>
>> --
>> Best,
>> wldd
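A minimal Java sketch of the workaround Cayden chen describes, assuming the Flink 1.11 bridge API; the query and the PrintSinkFunction are placeholders for the real ones. Setting the parallelism on the sink operator directly has the same effect as temporarily changing the global default around addSink():

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class SinkParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Placeholder query; "my_table" is assumed to be registered already.
            Table result = tEnv.sqlQuery("SELECT * FROM my_table");

            // Convert the table just before the sink into a DataStream ...
            DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);

            // ... then pin the parallelism on the sink operator alone, instead of
            // toggling the global default back and forth around addSink().
            stream.addSink(new PrintSinkFunction<>())  // stand-in for the real sink
                  .setParallelism(2);

            env.execute("sink-parallelism-sketch");
        }
    }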
Re: Re: How to set the sink parallelism when executing SQL with the Table API
hi:
That should be the corresponding flink-conf.yaml option; it is a global setting and cannot specify the parallelism of the sink.

--
Best,
wldd

On 2020-08-07 15:26:34, "Shengkai Fang" wrote:
>hi
>not sure whether this meets your needs:
>
>tEnv.getConfig().addConfiguration(
>    new Configuration()
>        .set(CoreOptions.DEFAULT_PARALLELISM, 128)
>);
>
>See the docs: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
>wldd wrote on Friday, 2020-08-07 at 15:16:
>> hi all:
>> How can I set the parallelism of the sink when TableEnvironment executes SQL?
>>
>> --
>> Best,
>> wldd
How to set the sink parallelism when executing SQL with the Table API
hi all:
How can I set the parallelism of the sink when TableEnvironment executes SQL?

--
Best,
wldd
Re: Problem writing to Hive
hi:
1. Check whether the Hive version you set when configuring the hive catalog matches the Hive version you are actually running.
2. You can also try leaving the Hive version unset when configuring the hive catalog.

--
Best,
wldd

On 2020-08-05 15:38:26, "air23" wrote:
>Hello,
>15:33:59,781 INFO org.apache.flink.table.catalog.hive.HiveCatalog - Created HiveCatalog 'myhive1'
>Exception in thread "main" org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
>at org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:58)
>at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
>at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
>at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
>at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
>at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
>at com.zt.kafka.KafkaTest4.main(KafkaTest4.java:73)
>Caused by: java.lang.NoSuchMethodException: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
>at java.lang.Class.getMethod(Class.java:1786)
>at org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:54)
>... 7 more
>
>What is this problem? The Metastore is already up and running.
>Thanks
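For reference, a sketch of both suggestions with the Table API; the conf dir and version strings are placeholders, and the three-argument constructor is the "no version" variant that lets Flink auto-detect the Hive version from the jars on the classpath:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // Option 1: pin the Hive version explicitly; it must match the cluster
    // (a mismatch surfaces as a NoSuchMethodException inside the hive shims).
    HiveCatalog explicitVersion =
            new HiveCatalog("myhive1", "default", "/opt/hive/conf", "2.3.6");

    // Option 2: omit the version and let Flink detect it automatically.
    HiveCatalog autoDetected =
            new HiveCatalog("myhive1", "default", "/opt/hive/conf");

    tEnv.registerCatalog("myhive1", autoDetected);
    tEnv.useCatalog("myhive1");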
Re: Heartbeat problem in Flink
You can check whether the TaskManager has gone down.

--
Best,
wldd

On 2020-08-04 10:12:12, "小学生" <201782...@qq.com> wrote:
>Hi all, I'm running a Flink 1.11 job on a single machine and it fails with a heartbeat error: Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 9ba7d203-0241-46cd-a1e7-d99f4666c7e6 timed out.
>How should I troubleshoot and fix this?
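If the TaskManager process is still alive but was stalled (long GC pauses are a common cause on a single machine), one mitigation, assuming the defaults are still in place, is to raise the heartbeat timeout in flink-conf.yaml; the value below is only an illustration, not a recommendation:

    # flink-conf.yaml: defaults are heartbeat.interval: 10000 and
    # heartbeat.timeout: 50000 (both in milliseconds)
    heartbeat.timeout: 180000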
Re: Re: Re: Flink 1.11.0 force-casts DECIMAL columns read from MySQL to DECIMAL(38,18)
Hi:
Batch-mode execution result: https://imgchr.com/i/UUqec6
Batch-mode log: https://imgchr.com/i/UUboX8
Streaming-mode log: https://imgchr.com/i/UUbYmF

--
Best,
wldd

At 2020-07-14 18:43:39, "wldd" wrote:
Hi:
The images did not come through; they only showed a query result. This is the batch-mode DEBUG log:

2020-07-14 18:33:23,180 DEBUG org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,181 DEBUG org.apache.calcite.sql2rel [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])
2020-07-14 18:33:23,197 DEBUG org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,198 DEBUG org.apache.calcite.sql2rel [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])
2020-07-14 18:33:23,201 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - iteration: 1
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#2907:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2906,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#2905:LogicalProject.NONE.any.[](input=HepRelVertex#2904,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - optimize convert table references before rewriting sub-queries to semi-join cost 1 ms.
optimize result:
LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#2912:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2911,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#2910:LogicalProject.NONE.any.[](input=HepRelVertex#2909,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - optimize rewrite sub-queries to semi-join cost 0 ms.
optimize result:
LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])

And this is the streaming-mode DEBUG log:

2020-07-14 18:35:45,995 DEBUG org.apache.calcite.sql2rel [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])
2020-07-14 18:35:46,015 DEBUG org.apache.calcite.sql2rel [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])
2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils [] - Trying to connect to (mosh-data-1/192.168.0.29:6123) from local address mosh-data-1/192.168.0.29 with timeout 200
2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils [] - Using InetAddress.getLocalHost() immediately for the connecting address
2020-07-14 18:35:46,022 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - iteration: 1
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#3047:LogicalLegacySink.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3046,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#3045:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3044,exprs=[CAST($0):DECIMAL(38, 18)])
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#3040:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[mydb, test, test2])
2020-07-14 18:35:46,022 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - optimize convert table references before rewriting sub-queries to semi-join cost 0 ms. optimize result: LogicalLegacySink(name=[`m
Re: Re: Flink 1.11.0 force-casts DECIMAL columns read from MySQL to DECIMAL(38,18)
org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#3052:LogicalLegacySink.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3051,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:35:46,023 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#3050:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3049,exprs=[CAST($0):DECIMAL(38, 18)])
2020-07-14 18:35:46,023 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#3040:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[mydb, test, test2])
2020-07-14 18:35:46,023 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - optimize rewrite sub-queries to semi-join cost 1 ms.
optimize result:
LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[CAST($0):DECIMAL(38, 18)])
   +- LogicalTableScan(table=[[mydb, test, test2]])

The main difference is this line in streaming mode:
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner [] - For final plan, using rel#3045:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3044,exprs=[CAST($0):DECIMAL(38, 18)])

--
Best,
wldd

On 2020-07-14 18:31:33, "Leonard Xu" wrote:
>Hi,
>
>The images in the earlier mails are all broken. In theory the SQL Client always applies the cast; could you upload the images to an image host, or paste code that reproduces the problem?
>
>Best
>
>> On 2020-07-14 18:21, wldd wrote:
>>
>> Hi,
>> Doesn't batch mode use the legacy data types as well? Batch mode does not force-cast the decimal.
>>
>> --
>> Best,
>> wldd
>>
>> On 2020-07-14 18:08:41, "Leonard Xu" wrote:
>> >Hi,
>> >
>> >The part of the SQL client that reads MySQL is effectively a connector, and that connector only supports DECIMAL(38,18): every DECIMAL(p, s) is converted to that type, because the SQL Client still uses the legacy data types. You can read with another connector such as Filesystem or Kafka instead; the community already has an issue [1] tracking this.
>> >
>> >Best,
>> >Leonard Xu
>> >[1] https://issues.apache.org/jira/browse/FLINK-17948
>> ><https://issues.apache.org/jira/browse/FLINK-17948>
>> >
>> >> On 2020-07-14 17:58, wldd wrote:
>> >>
>> >> sql-client
Re: Re: Flink 1.11.0 force-casts DECIMAL columns read from MySQL to DECIMAL(38,18)
Hi,
Doesn't batch mode use the legacy data types as well? Batch mode does not force-cast the decimal.

--
Best,
wldd

On 2020-07-14 18:08:41, "Leonard Xu" wrote:
>Hi,
>
>The part of the SQL client that reads MySQL is effectively a connector, and that connector only supports DECIMAL(38,18): every DECIMAL(p, s) is converted to that type, because the SQL Client still uses the legacy data types. You can read with another connector such as Filesystem or Kafka instead; the community already has an issue [1] tracking this.
>
>Best,
>Leonard Xu
>[1] https://issues.apache.org/jira/browse/FLINK-17948
><https://issues.apache.org/jira/browse/FLINK-17948>
>
>> On 2020-07-14 17:58, wldd wrote:
>>
>> sql-client
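An untested stopgap until FLINK-17948 is resolved: cast the column back to its declared precision in the query itself. The scan still widens to DECIMAL(38,18) internally, so this only restores the result type; tEnv is assumed to be a TableEnvironment with the same JDBC table registered:

    import org.apache.flink.table.api.Table;

    // Cast back to the declared precision; only the query's result type changes.
    Table fixed = tEnv.sqlQuery(
            "SELECT CAST(money AS DECIMAL(10, 2)) AS money FROM test2");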
Flink 1.11.0 force-casts DECIMAL columns read from MySQL to DECIMAL(38,18)
hi all:
I have run into a problem: when reading MySQL data through sql-client, DECIMAL types are force-cast to DECIMAL(38,18).

MySQL DDL:
CREATE TABLE `test2` (
  `money` decimal(10,2) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into test2 values(10.22);

Flink DDL:
CREATE TABLE test2 (
  money decimal(10, 2)
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/test',
  'connector.table' = 'test2',
  'connector.username' = 'root',
  'connector.password' = 'root'
);

Flink query result, streaming mode:
sql: select * from test2;
debug info:

--
Best,
wldd
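One way to confirm the widening without screenshots, as a sketch assuming the same DDL is registered through a Java TableEnvironment named tEnv:

    import org.apache.flink.table.api.Table;

    // In streaming mode this prints money as DECIMAL(38, 18) instead of the
    // declared DECIMAL(10, 2); in batch mode the declared type is kept.
    Table t = tEnv.sqlQuery("SELECT * FROM test2");
    System.out.println(t.getSchema());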
Re: Re: Re: Problems reading and writing Hive from Flink
Did you test writing to Hive as well? Based on the exception you provided, this looks like an HDFS problem.

--
Best,
wldd

On 2020-05-26 17:49:56, "Enzo wang" wrote:
>Hi Wldd,
>
>Hive is reachable; I have pasted the contents of the two earlier gists into this mail, including the results of querying data in Hive.
>
>Let me know if you need anything else.
>
> flink insert into hive error
>
>org.apache.flink.table.api.TableException: Exception in close
> at org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
> at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
> at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.base/java.lang.Thread.run(Thread.java:830)
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
> at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> at org.apache.hadoop.ipc.Client.call(Client.java:1413)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
>
> Flink 1.10.0 lib directory
>
>mysql-connecto
Re: Re: Problems reading and writing Hive from Flink
Hi, Enzo wang:
The images fail to load and the GitHub address is unreachable from here. Can you first check whether Hive itself can read and write the table normally?

--
Best,
wldd

On 2020-05-26 17:01:32, "Enzo wang" wrote:

Hi Wldd,

Thanks for the reply.

1. The datanode is available:

❯ docker-compose exec namenode hadoop fs -ls /tmp
Found 1 items
drwx-wx-wx - root supergroup 0 2020-05-26 05:40 /tmp/hive

The namenode web UI shows it as well.

2. After setting "set execution.type=batch;", execution fails with the error below:

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.

Full error: https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673

On Tue, 26 May 2020 at 16:52, wldd wrote:

Problem 1: org.apache.hadoop.hdfs.BlockMissingException: use the hadoop fs command to check whether that datanode is reachable.
Problem 2: Writing to Hive requires batch mode: set execution.type=batch;

On 2020-05-26 16:42:12, "Enzo wang" wrote:

Hi Flink group,

I ran into a few problems today while going through the Flink/Hive integration; could you help take a look?

Reference page: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
Versions and table schemas: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b

Problem 1: Flink SQL fails to read the Hive table pokes

Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO org.apache.hadoop.mapred.FileInputFormat - Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 file=/user/hive/warehouse/pokes/kv1.txt

Problem 2: Flink SQL fails to write the Hive table pokes

Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

Cheers,
Enzo
Re: Problems reading and writing Hive from Flink
Problem 1: org.apache.hadoop.hdfs.BlockMissingException: use the hadoop fs command to check whether that datanode is reachable.
Problem 2: Writing to Hive requires batch mode: set execution.type=batch;

On 2020-05-26 16:42:12, "Enzo wang" wrote:

Hi Flink group,

I ran into a few problems today while going through the Flink/Hive integration; could you help take a look?

Reference page: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
Versions and table schemas: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b

Problem 1: Flink SQL fails to read the Hive table pokes

Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO org.apache.hadoop.mapred.FileInputFormat - Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 file=/user/hive/warehouse/pokes/kv1.txt

Problem 2: Flink SQL fails to write the Hive table pokes

Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

Cheers,
Enzo
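The "set execution.type=batch;" advice has a Table API counterpart; a minimal sketch using the blink planner API of that Flink generation:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Create the TableEnvironment in batch mode so that Hive tables can be
    // written; this mirrors "set execution.type=batch;" in the SQL CLI.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);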
Data inconsistency when the JDBC connector writes to MySQL
Scenario: read data from Hive, aggregate it, and write the result to MySQL.

Demo SQL:
insert into data_hotel_day
select order_date, play_date, company_code, company_name, company_region, device,
  cast(coalesce(sum(current_amt), 0) as decimal(38,2)) current_amt,
  cast(coalesce(sum(order_amt), 0) as decimal(38,2)) order_amt,
  coalesce(sum(room_cnt), 0) room_cnt,
  cast(coalesce(sum(refund_amt), 0) as decimal(38,2)) refund_amt,
  coalesce(sum(budget_room_cnt), 0) budget_room_cnt
from db.table
where plate_type = 'hotel'
group by order_date, play_date, company_code, company_name, company_region, device;

Problem: with a GROUP BY in the query, the JDBC connector uses an upsert sink by default. The upsert sink derives a unique key from the query, usually the combination of the GROUP BY columns. In my scenario that column combination is not unique, so the data written to MySQL does not match the actual query result. Is there any solution, or an alternative approach?
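One possible fix, based on how the legacy JDBC upsert sink emits INSERT ... ON DUPLICATE KEY UPDATE statements (an assumption, not verified against this schema): give the MySQL table a unique key that matches the GROUP BY columns exactly, so that every upsert resolves to the intended row:

    -- Hypothetical MySQL DDL: align the unique key with the GROUP BY columns
    -- so the connector's ON DUPLICATE KEY UPDATE hits the right row.
    ALTER TABLE data_hotel_day
      ADD UNIQUE KEY uk_dim (order_date, play_date, company_code,
                             company_name, company_region, device);

Alternatively, since this is a batch job, clearing the affected rows in MySQL before each run and letting the job append avoids the upsert key question entirely.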