Re: Reply: Re: How to set the sink parallelism when executing SQL with the Table API

2020-08-07 Thread wldd
hi, Shengkai Fang, Cayden chen:


Thanks for the answers; that DISCUSS thread should solve my problem.










--

Best,
wldd





On 2020-08-07 16:56:30, "Cayden chen" <1193216...@qq.com> wrote:
>hi:
> You can convert the table right before the sink into a DataStream, then change the global parallelism to the sink parallelism you want and call dataStream.addSink(sink) (the sink operator picks up the global parallelism at that point),
> and afterwards change the global parallelism back. In theory this approach can give every operator its own parallelism.
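
Once the table has been converted to a DataStream as suggested above, the sink parallelism can also be set directly on the sink operator. A rough, untested sketch (assuming Flink 1.11's Table/DataStream bridge API; the query and mySink are illustrative, not from this thread):

    // Convert the table right before the sink into a DataStream, then give only the
    // sink its own parallelism instead of toggling the global default back and forth.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    Table result = tEnv.sqlQuery("SELECT id, name FROM source_table");  // the table right before the sink
    DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
    stream.addSink(mySink).setParallelism(2);  // only the sink runs with parallelism 2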
>
>
>
>
>-- Original message --
>From:
> "user-zh" 
>
>Sent: Friday, August 7, 2020, 4:35 PM
>To: "user-zh"
>Subject: Re: Re: How to set the sink parallelism when executing SQL with the Table API
>
>
>
>hi, at the moment only the global setting is supported; setting the parallelism of a single sink is not supported yet.
>Per-sink settings are being discussed in the community, see
>https://www.mail-archive.com/dev@flink.apache.org/msg40251.html
>wldd 
> hi:
> This corresponds to a setting in flink-conf.yaml; it is a global setting and cannot specify the sink parallelism.
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best,
> wldd
>
>
>
>
>
> On 2020-08-07 15:26:34, "Shengkai Fang" wrote: hi
> Not sure whether this can meet your needs:
> 
> tEnv.getConfig().addConfiguration(
>     new Configuration()
>         .set(CoreOptions.DEFAULT_PARALLELISM, 128)
> );
> 
> See the documentation:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
> 
> 
> wldd  
>  hi, all:
>  Could someone advise how to set the sink parallelism when TableEnvironment executes SQL?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>  --
> 
>  Best,
>  wldd
>


Re: Re: How to set the sink parallelism when executing SQL with the Table API

2020-08-07 Thread wldd
hi:
  This corresponds to a setting in flink-conf.yaml; it is a global setting and cannot specify the sink parallelism.













--

Best,
wldd





On 2020-08-07 15:26:34, "Shengkai Fang" wrote:
>hi
>Not sure whether this can meet your needs:
>
>tEnv.getConfig().addConfiguration(
>    new Configuration()
>        .set(CoreOptions.DEFAULT_PARALLELISM, 128)
>);
>
>See the documentation: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
>
>wldd  wrote on Friday, August 7, 2020 at 3:16 PM:
>
>> hi, all:
>>   Could someone advise how to set the sink parallelism when TableEnvironment executes SQL?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Best,
>> wldd


How to set the sink parallelism when executing SQL with the Table API

2020-08-07 Thread wldd
hi, all:
  Could someone advise how to set the sink parallelism when TableEnvironment executes SQL?














--

Best,
wldd

Re: Problem writing to Hive

2020-08-05 Thread wldd
hi:
1. Check whether the Hive version you set when configuring the hive catalog matches the Hive version you are actually running.
2. You can also try leaving the Hive version unset when configuring the hive catalog (both options are sketched below).
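
A minimal, untested Java sketch of both options (assuming the HiveCatalog constructors available around Flink 1.10/1.11; the database name, conf dir and version string are illustrative):

    // Option 1: pin the Hive version explicitly so it matches the Hive release on the cluster.
    HiveCatalog withVersion = new HiveCatalog("myhive1", "default", "/opt/hive/conf", "1.2.1");

    // Option 2: omit the version and let Flink infer it from the Hive jars on the classpath
    // (assumes the no-version constructor overload exists in your Flink release).
    HiveCatalog autoVersion = new HiveCatalog("myhive1", "default", "/opt/hive/conf");

    tableEnv.registerCatalog("myhive1", withVersion);
    tableEnv.useCatalog("myhive1");

The NoSuchMethodException in the quoted error is consistent with such a version mismatch: the shim chosen for the declared version calls a method that the Hive jars actually on the classpath do not provide.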













--

Best,
wldd





On 2020-08-05 15:38:26, "air23" wrote:
>Hello,
>15:33:59,781 INFO  org.apache.flink.table.catalog.hive.HiveCatalog 
>   - Created HiveCatalog 'myhive1'
>Exception in thread "main" 
>org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create 
>Hive Metastore client
>at 
>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:58)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
>at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
>at 
>org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
>at com.zt.kafka.KafkaTest4.main(KafkaTest4.java:73)
>Caused by: java.lang.NoSuchMethodException: 
>org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
>at java.lang.Class.getMethod(Class.java:1786)
>at 
>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:54)
>... 7 more
>
>
>
>
>What could be causing this? The Metastore has already been started.
>Thanks


Re: Heartbeat problem in Flink

2020-08-03 Thread wldd
You can check whether the TM (TaskManager) has crashed.













--

Best,
wldd





On 2020-08-04 10:12:12, "小学生" <201782...@qq.com> wrote:
>Hi all, a job running on a standalone Flink 1.11 instance failed with a Heartbeat problem: Caused by: 
>java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
>9ba7d203-0241-46cd-a1e7-d99f4666c7e6 timed out.
>How should I troubleshoot and fix this?


Re: Re: Re: Flink 1.11.0 force-casts MySQL decimal data to DECIMAL(38,18) when reading

2020-07-14 Thread wldd
Hi:
Batch mode execution result:
https://imgchr.com/i/UUqec6
Batch mode log:
https://imgchr.com/i/UUboX8
Streaming mode log:
https://imgchr.com/i/UUbYmF













--

Best,
wldd




At 2020-07-14 18:43:39, "wldd"  wrote:

Hi:
The images did not come through; they just showed a query result.


Regarding the error log, this is the DEBUG log in batch mode:
2020-07-14 18:33:23,180 DEBUG 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input 
splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,181 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,197 DEBUG 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input 
splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,198 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,201 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
iteration: 1
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2907:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2906,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2905:LogicalProject.NONE.any.[](input=HepRelVertex#2904,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize convert table references before rewriting sub-queries to semi-join 
cost 1 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2912:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2911,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2910:LogicalProject.NONE.any.[](input=HepRelVertex#2909,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize rewrite sub-queries to semi-join cost 0 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])








This is the DEBUG log in streaming mode:
2020-07-14 18:35:45,995 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,015 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Trying to connect to (mosh-data-1/192.168.0.29:6123) from local 
address mosh-data-1/192.168.0.29 with timeout 200
2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Using InetAddress.getLocalHost() immediately for the connecting 
address
2020-07-14 18:35:46,022 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
iteration: 1
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3047:LogicalLegacySink.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3046,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3045:LogicalProject.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3044,exprs=[CAST($0):DECIMAL(38, 18)])
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3040:LogicalTableScan.NONE.any.None: 
0.[NONE].[NONE](table=[mydb, test, test2])
2020-07-14 18:35:46,022 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize convert table references before rewriting sub-queries to semi-join 
cost 0 ms.
optimize result:
 LogicalLegacySink(name=[`m

Re: Re: Flink 1.11.0 force-casts MySQL decimal data to DECIMAL(38,18) when reading

2020-07-14 Thread wldd
 org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3052:LogicalLegacySink.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3051,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:35:46,023 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3050:LogicalProject.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3049,exprs=[CAST($0):DECIMAL(38, 18)])
2020-07-14 18:35:46,023 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3040:LogicalTableScan.NONE.any.None: 
0.[NONE].[NONE](table=[mydb, test, test2])
2020-07-14 18:35:46,023 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize rewrite sub-queries to semi-join cost 1 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[CAST($0):DECIMAL(38, 18)])
   +- LogicalTableScan(table=[[mydb, test, test2]])







The main difference is that in streaming mode:
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3045:LogicalProject.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3044,exprs=[CAST($0):DECIMAL(38, 18)])



--

Best,
wldd





On 2020-07-14 18:31:33, "Leonard Xu" wrote:
>Hi,
>
>The images in the earlier emails are all broken. In theory the SQL Client always does this forced cast; could you post image-hosting links or paste code that reproduces the issue?
>
>Best
>
> On Jul 14, 2020, at 18:21, wldd wrote:
>> 
>> Hi,
>> Doesn't batch mode use the legacy data types as well? Batch mode does not force-cast the decimal.
>> 
>> 
>> 
>> 
>> 
>> --
>> Best,
>> wldd
>> 
>> 
>> On 2020-07-14 18:08:41, "Leonard Xu" wrote:
>> >Hi,
>> >
>> >The part of the SQL client that reads MySQL effectively acts as a connector, and this connector only supports 
>> >DECIMAL(38,18); every DECIMAL(p, s) is converted to that type because the SQL Client still uses the legacy data types.
>> >You can read with another connector such as Filesystem or Kafka instead; the community already has an issue [1] tracking this.
>> >
>> >Best,
>> >Leonard Xu
>> >[1] https://issues.apache.org/jira/browse/FLINK-17948 
>> ><https://issues.apache.org/jira/browse/FLINK-17948>
>> >
>> > On Jul 14, 2020, at 17:58, wldd wrote:
>> >> 
>> >> sql-client
>> >
>> 
>> 
>>  
>


Re: Re: Flink 1.11.0 force-casts MySQL decimal data to DECIMAL(38,18) when reading

2020-07-14 Thread wldd
Hi,
Doesn't batch mode use the legacy data types as well? Batch mode does not force-cast the decimal.










--

Best,
wldd





On 2020-07-14 18:08:41, "Leonard Xu" wrote:
>Hi,
>
>The part of the SQL client that reads MySQL effectively acts as a connector, and this connector only supports 
>DECIMAL(38,18); every DECIMAL(p, s) is converted to that type because the SQL Client still uses the legacy data types.
>You can read with another connector such as Filesystem or Kafka instead; the community already has an issue [1] tracking this.
>
>Best,
>Leonard Xu
>[1] https://issues.apache.org/jira/browse/FLINK-17948 
><https://issues.apache.org/jira/browse/FLINK-17948>
>
> On Jul 14, 2020, at 17:58, wldd wrote:
>> 
>> sql-client
>


Flink 1.11.0 force-casts MySQL decimal data to DECIMAL(38,18) when reading

2020-07-14 Thread wldd
hi, all:
We have hit a problem: when reading MySQL data through the sql-client, decimal types get force-cast to DECIMAL(38,18).
MySQL DDL:
CREATE TABLE `test2` (
  `money` decimal(10,2) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


insert into test2 values(10.22);


Flink DDL:
CREATE TABLE test2 (
money decimal(10, 2)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/test',
'connector.table' = 'test2',
'connector.username' = 'root',
'connector.password' = 'root'
);
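
For reference, a minimal, untested Table API sketch of the same repro (assuming Flink 1.11's unified TableEnvironment), which makes it easy to check the type the planner actually assigns to the column:

    // Streaming-mode TableEnvironment (blink planner), matching the sql-client setup above.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Register the same JDBC table as in the DDL above.
    tEnv.executeSql(
            "CREATE TABLE test2 (money DECIMAL(10, 2)) WITH ("
            + " 'connector.type' = 'jdbc',"
            + " 'connector.url' = 'jdbc:mysql://localhost:3306/test',"
            + " 'connector.table' = 'test2',"
            + " 'connector.username' = 'root',"
            + " 'connector.password' = 'root')");

    // Under the legacy type mapping discussed in the replies, the column may be
    // reported as DECIMAL(38, 18) rather than DECIMAL(10, 2).
    tEnv.sqlQuery("SELECT * FROM test2").printSchema();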


Flink query result, streaming mode:
SQL: select * from test2;


Debug info:










--

Best,
wldd

Re: Re: Re: Problem reading and writing Hive with Flink

2020-05-26 Thread wldd
Have you tested writing data from Hive itself? According to the exception information you provided, this is an HDFS problem.










--

Best,
wldd





On 2020-05-26 17:49:56, "Enzo wang" wrote:
>Hi Wldd,
>
>Hive is accessible. I have pasted the contents of the two earlier gists into this email for you to look at; they include the results returned when querying data in Hive.
>
>Let me know if you need any more information.
>
>
>
>  flink insert into hive error 
>
>org.apache.flink.table.api.TableException: Exception in close
>   at 
> org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
>   at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.base/java.lang.Thread.run(Thread.java:830)
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>File 
>/user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
>could only be replicated to 0 nodes instead of minReplication (=1).
>There are 1 datanode(s) running and 1 node(s) are excluded in this
>operation.
>   at 
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
>
>   at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1413)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>   at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
>
> lib directory of Flink 1.10.0 
>
>mysql-connecto

Re: Re: Problem reading and writing Hive with Flink

2020-05-26 Thread wldd
Hi, Enzo wang
The images cannot be loaded and the GitHub address is not reachable either. Can you check whether Hive itself can read and write the table normally?













--

Best,
wldd




On 2020-05-26 17:01:32, "Enzo wang" wrote:

Hi Wldd,


Thanks for the reply.


1.  The datanode is available


❯ docker-compose exec namenode hadoop fs -ls /tmp
Found 1 items
drwx-wx-wx   - root supergroup  0 2020-05-26 05:40 /tmp/hive


It can also be seen in the namenode web UI:




2.  After setting set execution.type=batch;, the execution fails with the following error:
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
 could only be replicated to 0 nodes instead of minReplication (=1). There are 
1 datanode(s) running and 1 node(s) are excluded in this operation.


Full error:
https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673



On Tue, 26 May 2020 at 16:52, wldd  wrote:

Problem 1:

org.apache.hadoop.hdfs.BlockMissingException: you can use the hadoop fs command to check whether that datanode is reachable


Problem 2:
To write to Hive you need batch mode: set execution.type=batch;







On 2020-05-26 16:42:12, "Enzo wang" wrote:

Hi Flink group,


While going through the Flink-Hive integration today I ran into a few problems; could you help take a look?
Reference page: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html


Version and table schema information is here: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b


Problem 1: Flink SQL fails to read the Hive table pokes


Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat  
- Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: 
BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 
file=/user/hive/warehouse/pokes/kv1.txt







Problem 2: Flink SQL fails to write to the Hive table pokes


Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by 
AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.







Cheers,
Enzo

Re: Problem reading and writing Hive with Flink

2020-05-26 Thread wldd
Problem 1:

org.apache.hadoop.hdfs.BlockMissingException: you can use the hadoop fs command to check whether that datanode is reachable


Problem 2:
To write to Hive you need batch mode: set execution.type=batch; (a programmatic equivalent is sketched below)
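
A minimal, untested Java sketch of the equivalent batch-mode setup in the Table API (assuming Flink 1.10 with the blink planner; the catalog name, conf dir and Hive version are illustrative):

    // Batch-mode TableEnvironment, the programmatic counterpart of "set execution.type=batch;".
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Register and use the Hive catalog (names and paths are placeholders).
    tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive/conf", "2.3.4"));
    tEnv.useCatalog("myhive");

    // In batch mode the Hive sink accepts the insert instead of rejecting it as a stream table.
    tEnv.sqlUpdate("INSERT INTO pokes SELECT 12, 'tom'");
    tEnv.execute("insert-into-pokes");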







On 2020-05-26 16:42:12, "Enzo wang" wrote:

Hi Flink group,


While going through the Flink-Hive integration today I ran into a few problems; could you help take a look?
Reference page: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html


Version and table schema information is here: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b


Problem 1: Flink SQL fails to read the Hive table pokes


Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat  
- Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: 
BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 
file=/user/hive/warehouse/pokes/kv1.txt







Problem 2: Flink SQL fails to write to the Hive table pokes


Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by 
AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.







Cheers,
Enzo

Inconsistent data when the jdbc connector writes to MySQL

2020-04-16 Thread wldd
Scenario: read data from Hive, compute on it, and write the result to MySQL


Demo SQL:
insert into data_hotel_day
select order_date,play_date,company_code,company_name,company_region,device,
cast(coalesce(sum(current_amt),0) as decimal(38,2)) current_amt,
cast(coalesce(sum(order_amt),0) as decimal(38,2)) order_amt,
coalesce(sum(room_cnt),0) room_cnt,
cast(coalesce(sum(refund_amt),0) as decimal(38,2)) refund_amt,
coalesce(sum(budget_room_cnt),0) budget_room_cnt
from db.table where plate_type='hotel'
group by order_date,play_date,company_code,company_name,company_region,device;


Problem: after a GROUP BY, the jdbc connector uses an upsert sink by default,
and the upsert sink derives a unique key from the query, usually taking the combination of the GROUP BY fields as that key.
Because in my scenario the combination of the GROUP BY fields is not unique, the data written to MySQL
ends up inconsistent with what the query actually returns. Is there any solution or alternative?
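
For context, a hedged approximation (not from this thread) of the statement shape the MySQL dialect of the JDBC upsert sink executes for such a job; the column names are assumed to mirror the query above:

    // With MySQL, ON DUPLICATE KEY UPDATE is resolved against the unique/primary key defined
    // on data_hotel_day itself, so if that key differs from the GROUP BY columns Flink derived,
    // rows can be merged differently than the query result suggests.
    String upsert =
        "INSERT INTO data_hotel_day (order_date, play_date, company_code, company_name, "
      + "company_region, device, current_amt, order_amt, room_cnt, refund_amt, budget_room_cnt) "
      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
      + "ON DUPLICATE KEY UPDATE "
      + "current_amt=VALUES(current_amt), order_amt=VALUES(order_amt), room_cnt=VALUES(room_cnt), "
      + "refund_amt=VALUES(refund_amt), budget_room_cnt=VALUES(budget_room_cnt)";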