Re:Re:flink的sql gateway支持自定义的UDF吗?

2023-11-19 Thread RS
Hi,
这种ADD JAR的方式测试了也可以用,谢谢了老哥
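顺便贴一下我这边的用法示意(jar 路径、函数名、类名都是假设的),在 sql-gateway / sql-client 的会话里大致是:

ADD JAR '/opt/flink/udf/my-udf.jar';   -- 示例路径,按实际替换
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper';   -- 示例类名
SELECT my_upper('abc');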


Thanks





在 2023-11-01 17:34:48,"Xuyang"  写道:
>Hi, 
>你指的是sql gateway上 ADD JAR这种方式来上传自定义UDF的jar包[1]么?
>
>
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/jar/
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2023-11-01 14:21:04,"RS"  写道:
>>Hi
>>flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?


Re:Re:flink的sql gateway支持自定义的UDF吗?

2023-11-19 Thread RS
Hi,
是的,自定义的UDF比较多,或者实现方式不同,所以加载的时候想单独加载。
sql-client 有个参数 -j 就可以支持,
sql gateway 为什么不提供呢?


Thanks





在 2023-11-01 17:34:48,"Xuyang"  写道:
>Hi, 
>你指的是sql gateway上 ADD JAR这种方式来上传自定义UDF的jar包[1]么?
>
>
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/jar/
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2023-11-01 14:21:04,"RS"  写道:
>>Hi
>>flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?


flink的sql gateway支持自定义的UDF吗?

2023-11-01 Thread RS
Hi
flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?

Re:flink 1.18.0的sql gateway支持提交作业到k8s上运行吗?

2023-11-01 Thread RS
Hi,
提交到“本地”其实就是提交到 flink 配置文件里配置的 jobmanager 地址,所以 jobmanager 部署在 K8S 上的话,作业肯定也是提交到 K8S 的。
yarn的不太清楚。





在 2023-10-30 14:36:23,"casel.chen"  写道:
>想问一下目前flink 1.18.0的sql gateway只支持提交作业到本地运行吗?能否支持提交作业到yarn或k8s上运行呢?


Re:[急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-18 Thread RS
Hi,
connector 里面配置的主键,控制的是写入存储时的行为:有些存储在写入数据的时候,能根据主键自动更新去重。
所以我感觉,你这里想要的应该是在计算阶段(写入之前)按主键做 shuffle,那你应该需要先执行一个 group by 主键,然后再执行 insert into。
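大概像下面这样(表名、字段名都是假设的,仅示意先按主键聚合再写入):

INSERT INTO sink_table
SELECT pk, LAST_VALUE(col1) AS col1, LAST_VALUE(col2) AS col2   -- sink_table/source_table/pk/col1/col2 均为示例名
FROM source_table
GROUP BY pk;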


Thanks



在 2023-02-17 15:56:51,"casel.chen"  写道:
>作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner 
>join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink 
>Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>
>
>请问:
>flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>我理解flink 
>sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>


Re:Re:Re:flink sql connector options如何支持Map数据类型?

2022-12-27 Thread RS
Hi,
这个看你的需求啊,用户想自定义哪些Header,怎么定义?


比如用户想在Header中添加上报的时间戳,那么这种是随时间变化的,就无法在options里面定义了
比如用户想在Header中添加上报数据的元信息,数据大小,数据字段个数等,那么这个也是和数据强相关的,无法在options里面定义


所以要看用户想要什么,你们想给用户开放到哪个程度?
至于是不是可以像flink sql kafka connector定义 `properties.*` 
,这个是具体实现的方式,现在都不清楚你要做什么,先确定目标,再考虑实现。
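如果最后决定走 options 这条路,形式上大概类似下面这样(connector 名、url、header key 都是假设的,仅示意):

CREATE TABLE http_sink (
  id STRING,
  body STRING
) WITH (
  'connector' = 'http',                          -- 假设的自定义 connector
  'url' = 'https://example.com/api',             -- 假设的参数
  'headers.Content-Type' = 'application/json',
  'headers.X-Token' = 'xxx'
);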


Thanks

在 2022-12-27 13:24:38,"casel.chen"  写道:
>
>
>遇到用户添加自定义请求头Headers问题
>
>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
>>> 是不是可以像flink sql kafka connector定义 `properties.*` 那样定义 `headers.*` 呢?
>如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了
>>> 是说有一些公用headers吗?例如 Content-Type 之类的,对应的flink sql kafka connector中的 
>>> properties.group.id 和 properties.bootstrap.servers
>
>在 2022-12-26 11:12:57,"RS"  写道:
>>Hi,
>>
>>
>>> 遇到用户添加自定义请求头Headers问题
>>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
>>如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了
>>
>>
>>> 如何在connector options中支持Map数据类型呢?
>>options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map
>>
>>
>>
>>
>>Thanks
>>
>>在 2022-12-17 10:20:29,"casel.chen"  写道:
>>>我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector 
>>>options中支持Map数据类型呢?


Re:如何查看Flink on Native K8s模式下失败作业的日志

2022-12-25 Thread RS
Hi,


我的也是on K8S,是session模式的,得看你的模式是什么
我的流作业失败了,配置了checkpoint,会自动重试,jm和tm都还在,可以直接看到作业异常信息。


Thanks




在 2022-12-22 23:18:27,"hjw"  写道:
>Flink On Native K8s 
>模式下,如果流作业因异常失败了,作业的JobManager和TaskManager所在Pod都会消失,就无法查看作业日志。
>请问在K8s模式下,在查看日志方面有没有相关解决方案。
>目前我只想到将作业Jm和Tm打印的日志通过pv-pvc方式挂载NFS做持久化。这样做日志就可以保存下来方便查看。
>
>
>
>
>--
>
>Best,
>Hjw


Re:Re: 提交任务不能指定第三方jar

2022-12-25 Thread RS
Hi,


试试 -C,--classpath,我都是用这个提交UDF的
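命令大概长这样(路径和入口类是假设的,-C 指定的 URL 需要集群所有节点都能访问到):

./bin/flink run -C file:///opt/flink/udf/my-udf.jar -c com.example.MainJob /opt/flink/jobs/job.jar   # 路径仅示例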


另外邮件列表发图片是看不到了,别再发截图了


Thanks,










在 2022-12-13 18:13:47,"melin li"  写道:

类似: spark-submit 支持--jars,更灵活方便,




melin li  于2022年12月8日周四 11:09写道:

如果是作业依赖的jar,是可以打一个flat jar。有两种场景:

1、sql作业中,用户依赖某个connector jar,但平台不提供这个connector,需要用户上传,
2、自定义udf 管理,依赖的jar 需要和任务一起提交。


yuxia  于2022年12月8日周四 10:06写道:

为啥说 不能提交依赖的第三方jar?用户的 job 把这些包打进去不就好了吗? 还是说你指的是 sql 作业?

Best regards,
Yuxia


发件人: "melin li" 
收件人: "user-zh" 
发送时间: 星期四, 2022年 12 月 08日 上午 9:46:45
主题: 提交任务不能指定第三方jar

客户端提交flink job 不能提交依赖的第三方jar,例如自定的函数jar,sql 里面的依赖connector 
jar,需要提前放置好。如果基于flink 平台化,需要动态的添加jar。目前可能的做法,就是把依赖的jar, 动态的添加作业jar 的lib目录下。 
getJobJarAndDependencies 就是从jar 中获取依赖的jar。不是很方便。 是可以添加一个参数,指定依赖的jar, flink 
设计各种诡异。



Re:flink sql connector options如何支持Map数据类型?

2022-12-25 Thread RS
Hi,


> 遇到用户添加自定义请求头Headers问题
如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了


> 如何在connector options中支持Map数据类型呢?
options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map
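举个例子(option 名是假设的):要么把整个 Map 编码成一个 JSON 字符串放进一个 option,要么用固定前缀把每个 key 展开成一个 option:

'headers' = '{"Content-Type": "application/json", "X-Token": "xxx"}'
或者
'headers.Content-Type' = 'application/json'
'headers.X-Token' = 'xxx'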




Thanks

在 2022-12-17 10:20:29,"casel.chen"  写道:
>我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector 
>options中支持Map数据类型呢?


Re:flink如何调用py模型

2022-12-25 Thread RS
Hi,
我考虑过的几种方式:
1. 做成http服务,和flink分离,py模型本来跑起来也不快,做成http可以动态扩容,无状态的话
2. 用pyflink来跑任务,可以嵌python代码,就是任务启动非常慢,要复制虚拟环境,模型可以写成pandas的输入输出,这样模型也是可以独立开发的
3. Java调python的udf,py必须要能封装成函数,写udf毕竟麻烦
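比如第 3 种方式,把模型封装成 Python UDF 注册好之后,SQL 里大概这样用(模块名、函数名、字段名都是假设的):

CREATE TEMPORARY FUNCTION predict AS 'my_model.predict' LANGUAGE PYTHON;   -- my_model.predict 为示例
SELECT id, predict(features) FROM source_table;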


Thanks

在 2022-12-19 16:51:33,"kcz" <573693...@qq.com.INVALID> 写道:
>flink调用py模型时候,大是采取什么方案,直接跟flink集成嘛? 还是py模型做成web服务。
>如果是web服务的话,是不是会出现无法支撑flink的大量调用呢?
>
>
>kcz
>573693...@qq.com
>
>
>
>


Re:Re:Re: Flink启动python任务异常:undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread RS
Hi,
我单独测试了下python的虚拟环境使用,发现目标机器必须存在python的环境,不然复制了虚拟环境过去也是无法执行的,
还以为虚拟环境复制过去就可以跑起来了,和我想的不太一样,哈哈~


3.6版本报错
# ./python
./python: error while loading shared libraries: libpython3.6m.so.1.0: cannot 
open shared object file: No such file or directory


3.9版本报错

# ./venv/bin/python

Could not find platform independent libraries <prefix>

Could not find platform dependent libraries <exec_prefix>

Consider setting $PYTHONHOME to <prefix>[:<exec_prefix>]

Python path configuration:

  PYTHONHOME = (not set)

  PYTHONPATH = (not set)

  program name = './venv/bin/python'

  isolated = 0

  environment = 1

  user site = 1

  import site = 1

  sys._base_executable = '/root/tmp/venv/bin/python'

  sys.base_prefix = '/usr/local'

  sys.base_exec_prefix = '/usr/local'

  sys.platlibdir = 'lib'

  sys.executable = '/root/tmp/venv/bin/python'

  sys.prefix = '/usr/local'

  sys.exec_prefix = '/usr/local'

  sys.path = [

'/usr/local/lib/python39.zip',

'/usr/local/lib/python3.9',

'/usr/local/lib/lib-dynload',

  ]

Fatal Python error: init_fs_encoding: failed to get the Python codec of the 
filesystem encoding

Python runtime state: core initialized

ModuleNotFoundError: No module named 'encodings'




Current thread 0x7ff010275740 (most recent call first):






Thanks





在 2022-11-23 09:25:04,"RS"  写道:
>Hi,
>我这边使用的python命令构建的,没有用conda,这个应该没有影响吧
>python3 -m venv jxy_venv
>
>
>我启动了一个单点的flink测试,本机启动,有python环境,测试是可以运行成功的
>
>
>
>Thanks
>
>
>
>
>
>在 2022-11-22 15:39:48,"Xingbo Huang"  写道:
>>Hi RS,
>>
>>你是使用conda构建的venv吗,可以参考PyFlink 准备环境的文档[1]
>>
>>Best,
>>Xingbo
>>
>>[1]
>>https://pyflink.readthedocs.io/en/latest/getting_started/installation/prepare.html#create-a-virtual-environment-using-conda
>>
>>RS  于2022年11月22日周二 15:14写道:
>>
>>> Hi,
>>> Flink版本:1.15.1
>>>
>>>
>>> A机器:在A机器上创建python虚拟环境,版本3.6.8,安装flink等python包,然后打包ZIP,jxy_venv.zip,上传到HDFS上
>>> B机器:在B机器上,主机上没有Python环境,Flink运行在K8S的docker中,docker里面也没有python环境,
>>> C机器:在C机器上,有flink的client,存在python环境,负责启动任务
>>>
>>>
>>> 启动命令:
>>> ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n
>>> -py /xxx/main.py -pyfs hdfs://xxx/config.py -pyarch
>>> hdfs://xxx/jxy_venv.zip#venv -pyclientexec venv/jxy_venv/bin/python
>>>
>>>
>>> 报错信息:
>>> ...
>>> venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python:
>>> undefined symbol: _Py_LegacyLocaleDetected
>>> org.apache.flink.client.program.ProgramAbortException:
>>> java.lang.RuntimeException: Python process exits with code: 127
>>> at
>>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
>>> at
>>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
>>> Caused by: java.lang.RuntimeException: Python process exits with code: 127
>>> at
>>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>>> ... 16 more
>>>
>>>
>>> 请问下,这种情况需要怎么处理 ?
>>> flink的环境中,一定要安装python命令吗?
>>>
>>>
>>> Thanks
>>>
>>>


Re:flink作业提交运行后如何监听作业状态发生变化?

2022-11-22 Thread RS
Hi,


Flink的Metric了解下,里面应该有作业的状态
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#reporter


配置不同的Metric方式,有的是拉取,有的是推送的机制,
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/



Thanks




在 2022-11-23 08:32:11,"casel.chen"  写道:
>请问flink作业提交运行后如何监听作业状态发生变化以便在控台上实时显示作业状态变更?目前我们的做法是轮询,但效率低,有没有listener可以进行注册的方法呢?


Re:Re: Flink启动python任务异常:undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread RS
Hi,
我这边使用的python命令构建的,没有用conda,这个应该没有影响吧
python3 -m venv jxy_venv


我启动了一个单点的flink测试,本机启动,有python环境,测试是可以运行成功的



Thanks





在 2022-11-22 15:39:48,"Xingbo Huang"  写道:
>Hi RS,
>
>你是使用conda构建的venv吗,可以参考PyFlink 准备环境的文档[1]
>
>Best,
>Xingbo
>
>[1]
>https://pyflink.readthedocs.io/en/latest/getting_started/installation/prepare.html#create-a-virtual-environment-using-conda
>
>RS  于2022年11月22日周二 15:14写道:
>
>> Hi,
>> Flink版本:1.15.1
>>
>>
>> A机器:在A机器上创建python虚拟环境,版本3.6.8,安装flink等python包,然后打包ZIP,jxy_venv.zip,上传到HDFS上
>> B机器:在B机器上,主机上没有Python环境,Flink运行在K8S的docker中,docker里面也没有python环境,
>> C机器:在C机器上,有flink的client,存在python环境,负责启动任务
>>
>>
>> 启动命令:
>> ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n
>> -py /xxx/main.py -pyfs hdfs://xxx/config.py -pyarch
>> hdfs://xxx/jxy_venv.zip#venv -pyclientexec venv/jxy_venv/bin/python
>>
>>
>> 报错信息:
>> ...
>> venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python:
>> undefined symbol: _Py_LegacyLocaleDetected
>> org.apache.flink.client.program.ProgramAbortException:
>> java.lang.RuntimeException: Python process exits with code: 127
>> at
>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
>> Caused by: java.lang.RuntimeException: Python process exits with code: 127
>> at
>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>> ... 16 more
>>
>>
>> 请问下,这种情况需要怎么处理 ?
>> flink的环境中,一定要安装python命令吗?
>>
>>
>> Thanks
>>
>>


Flink启动python任务异常:undefined symbol: _Py_LegacyLocaleDetected

2022-11-21 Thread RS
Hi,
Flink版本:1.15.1


A机器:在A机器上创建python虚拟环境,版本3.6.8,安装flink等python包,然后打包ZIP,jxy_venv.zip,上传到HDFS上
B机器:在B机器上,主机上没有Python环境,Flink运行在K8S的docker中,docker里面也没有python环境,
C机器:在C机器上,有flink的client,存在python环境,负责启动任务


启动命令:
./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n -py 
/xxx/main.py -pyfs hdfs://xxx/config.py -pyarch hdfs://xxx/jxy_venv.zip#venv 
-pyclientexec venv/jxy_venv/bin/python


报错信息:
...
venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python: 
undefined symbol: _Py_LegacyLocaleDetected
org.apache.flink.client.program.ProgramAbortException: 
java.lang.RuntimeException: Python process exits with code: 127
at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
Caused by: java.lang.RuntimeException: Python process exits with code: 127
at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
... 16 more


请问下,这种情况需要怎么处理 ?
flink的环境中,一定要安装python命令吗?


Thanks



Re:flinksql join

2022-11-14 Thread RS
Hi,
我的理解是后插入的维表数据,关联不到是正常现象,
如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据,


Thanks






在 2022-11-11 11:10:03,"Jason_H"  写道:
>
>
>hi,大家好
>我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
>kakfa输入:
>账号 金额 笔数
> 100 1  -> 未匹配
> 100 1  -> 未匹配
> 100 1  -> 匹配上
>
>维表 
>账号  企业
>  
>     -> 后插入的账号信息
>实际输出结果
>企业  金额  笔数
> 100   1
>
>
>我想要的结果:
>企业  金额  笔数
> 300   3
>
>
>
>
>
>sql如下:
>String sql2 =  "insert into dws_b2b_trade_year_index\n" +
>   "WITH temp AS (\n" +
>   "select \n" +
>   "  ta.gmtStatistical as gmtStatistical,\n" +
>   "  ta.paymentMethod as paymentMethod,\n" +
>   "  tb.CORP_ID as outCorpId,\n" +
>   "  tc.CORP_ID as inCorpId,\n" +
>   "  sum(ta.tradeAmt) as tranAmount,\n" +
>   "  sum(ta.tradeCnt) as tranNum \n" +
>   "from dws_a2a_trade_year_index ta \n" +
>   "left join dob_dim_account for system_time as of ta.proc as tb 
> on ta.outAcctCode = tb.ACCT_CODE \n" +
>   "left join dob_dim_account for system_time as of ta.proc as tc 
> on ta.inAcctCode = tc.ACCT_CODE \n" +
>   "group by \n" +
>   " ta.gmtStatistical, \n" +
>   " ta.paymentMethod, \n" +
>   " tb.CORP_ID, \n" +
>   " tc.CORP_ID \n" +
>   ") \n" +
>   "SELECT \n" +
>   "   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as 
> gmtUpdate, \n" +
>   "   gmtStatistical, \n" +
>   "   paymentMethod, \n" +
>   "   outCorpId, \n" +
>   "   inCorpId, \n" +
>   "   tranAmount, \n" +
>   "   tranNum \n" +
>   "FROM temp";
>
>| |
>Jason_H
>|
>|
>hyb_he...@163.com
>|


Re:Re: OutOfMemoryError: Direct buffer memory

2022-10-10 Thread RS
Hi,
调大 taskmanager.memory.task.off-heap.size 应该能解决部分问题,
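比如在 flink-conf.yaml 里调整下面两个配置(数值只是示例,要按实际负载来定):

taskmanager.memory.task.off-heap.size: 256mb       # 示例值
taskmanager.memory.framework.off-heap.size: 256mb  # 示例值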
我这里还有些疑问,部署的session集群,每次集群只跑这一个任务,执行结束才开始下一个任务,如果是前面的任务read/write申请的堆外内存,执行结束的时候,会立即释放吗?
执行几次任务之后,才出现这种异常,前面任务都是成功的,后面任务就异常了,感觉有内存泄漏的现象。Flink 
taskmanager的off-heap内存管理有更多的介绍吗?(官网的看过了)
Thanks

在 2022-10-10 12:34:55,"yanfei lei"  写道:
>从报错看是Direct memory不够导致的,可以将taskmanager.memory.task.off-heap.size调大试试看。
>
>Best,
>Yanfei
>
>allanqinjy  于2022年10月8日周六 21:19写道:
>
>>
>> 看堆栈信息是内存不够,调大一些看看。我之前在读取hdfs上的一个获取地理位置的离线库,也是内存溢出,通过调整内存大小解决的。用的streamingapi开发的作业,1.12.5版本。
>>
>>
>> | |
>> allanqinjy
>> |
>> |
>> allanqi...@163.com
>> |
>> 签名由网易邮箱大师定制
>>
>>
>> On 10/8/2022 21:00,RS wrote:
>> Hi,
>>
>>
>> 版本:Flink-1.15.1
>>
>>
>> 有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink
>> SQL定义执行,source是connector=filesystem,format=raw,path=
>>
>>
>> 执行任务的时候,有时候能成功,有时候失败了然后就一直失败,重启集群好像可以解决问题,这种情况如何是什么原因导致的?
>>
>>
>> 集群的off-heap都是默认配置,
>> taskmanager.memory.task.off-heap.size=0
>> taskmanager.memory.framework.off-heap.size=128MB
>>
>>
>> 报错堆栈:
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.nio.Bits.reserveMemory(Bits.java:695)
>> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>> at
>> org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
>> at
>> org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
>> at
>> org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
>> at
>> org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:704)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:765)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:825)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:415)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:98)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:122)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:348)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileRead

Re:Re: Re: table store 和connector-kafka包冲突吗?

2022-10-10 Thread RS
Hi,


去除掉后运行是没有问题的,所以这种lib的冲突问题,能在官网上加个说明吗,避免后续其他人也遇到这种问题。


Thanks

在 2022-10-10 12:50:33,"yanfei lei"  写道:
>Hi, 从table store的pom中看,table store的dist包shade了一份connector-kafka。
>https://repo1.maven.org/maven2/org/apache/flink/flink-table-store-dist/0.2.0/flink-table-store-dist-0.2.0.pom
>把flink-connector-kafka-1.15.1.jar 去掉再试试?
>
>
>RS  于2022年10月8日周六 17:19写道:
>
>> Hi,
>> 报错如下:
>>
>>
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.ValidationException: Multiple factories for
>> identifier 'kafka' that implement
>> 'org.apache.flink.table.factories.DynamicTableFactory' found in the
>> classpath.
>>
>>
>> Ambiguous factory classes are:
>>
>>
>> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
>> org.apache.flink.table.store.kafka.KafkaLogStoreFactory
>>
>> org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>> 在 2022-10-08 13:38:20,"Shammon FY"  写道:
>> >Hi RS
>> >你这边能提供一下具体的冲突错误栈吗?
>> >
>> >On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:
>> >
>> >> Hi,
>> >>
>> >>
>> >> 版本:flink-1.15.1
>> >> 使用table
>> >>
>> store,需要在lib下放置flink-table-store-dist-0.2.0.jar,之前集群的lib下有一个flink-connector-kafka-1.15.1.jar,使用sql-client,定义kafka源表的时候,发现connector冲突了
>> >>
>> >>
>> 是不是lib下有了flink-table-store-dist-0.2.0.jar,就不能有flink-connector-kafka-1.15.1.jar?
>> >>
>> >>
>> >> Thanks
>>


OutOfMemoryError: Direct buffer memory

2022-10-08 Thread RS
Hi,


版本:Flink-1.15.1


有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink 
SQL定义执行,source是connector=filesystem,format=raw,path=


执行任务的时候,有时候能成功,有时候失败了然后就一直失败,重启集群好像可以解决问题,这种情况是什么原因导致的?


集群的off-heap都是默认配置,
taskmanager.memory.task.off-heap.size=0
taskmanager.memory.framework.off-heap.size=128MB


报错堆栈:
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct 
out-of-memory error has occurred. This can mean two things: either job(s) 
require(s) a larger size of JVM direct memory or there is a direct memory leak. 
The direct memory can be allocated by user code or some of its dependencies. In 
this case 'taskmanager.memory.task.off-heap.size' configuration option should 
be increased. Flink framework and its dependencies also consume the direct 
memory, mostly for network communication. The most of network memory is managed 
by Flink and should not result in out-of-memory error. In certain special 
cases, in particular for jobs with high parallelism, the framework may require 
more direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies which has to be investigated and fixed. 
The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at 
org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
at 
org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
at 
org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:704)
at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:765)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:825)
at java.io.DataInputStream.read(DataInputStream.java:149)
at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:415)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:98)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:122)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:348)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)


Thanks

Re:Re: table store 和connector-kafka包冲突吗?

2022-10-08 Thread RS
Hi,
报错如下:


[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Multiple factories for 
identifier 'kafka' that implement 
'org.apache.flink.table.factories.DynamicTableFactory' found in the classpath.


Ambiguous factory classes are:


org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.table.store.kafka.KafkaLogStoreFactory
org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory


Thanks





在 2022-10-08 13:38:20,"Shammon FY"  写道:
>Hi RS
>你这边能提供一下具体的冲突错误栈吗?
>
>On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:
>
>> Hi,
>>
>>
>> 版本:flink-1.15.1
>> 使用table
>> store,需要在lib下放置flink-table-store-dist-0.2.0.jar,之前集群的lib下有一个flink-connector-kafka-1.15.1.jar,使用sql-client,定义kafka源表的时候,发现connector冲突了
>>
>> 是不是lib下有了flink-table-store-dist-0.2.0.jar,就不能有flink-connector-kafka-1.15.1.jar?
>>
>>
>> Thanks


table store 和connector-kafka包冲突吗?

2022-10-07 Thread RS
Hi,


版本:flink-1.15.1
使用table 
store,需要在lib下放置flink-table-store-dist-0.2.0.jar,之前集群的lib下有一个flink-connector-kafka-1.15.1.jar,使用sql-client,定义kafka源表的时候,发现connector冲突了
是不是lib下有了flink-table-store-dist-0.2.0.jar,就不能有flink-connector-kafka-1.15.1.jar?


Thanks

Re:如何处理Flink KafkaSource的异常的数据

2022-10-05 Thread RS
Hi,


当前的SQL是不支持的,需要的话,可以自己实现一个connector或者UDF,把错误数据输出到其他地方


Thanks




在 2022-09-29 10:02:34,"Summer"  写道:
>
>你好,我想问一下,如果来源于Kakfka的一条数据出现错误,会导致任务执行失败,日志抛出该条错误数据。
>
>
>为保证任务执行,需要在*** WITH内加'value.json.ignore-parse-errors' = 'true',  
>'value.json.fail-on-missing-field' = 'false'
>
>
>
>
>那么之后如果出现异常的数据,我应该怎么感知到呢??
>
>
>
>
>
>
>
>


Re:如何实现flink作业失败实时通知告警?

2022-09-30 Thread RS
Hi,
个人推荐方式二,
1. 部分场景下,有些异常可以自动恢复,任务异常会自动重启,继续运行
2. 告警通知到介入处理,如果是人来介入处理的话,20s通常时间不是问题,到分钟级都可以
3. failure之前调用某个hook去通知相关方,应该是要修改jobmanager的代码,具体就要请教大佬们了。


在 2022-09-30 13:50:56,"casel.chen"  写道:
>当flink作业失败时如何第一时间发通知告警到相关方?现有方式
>方式一:flink作业本身提供的rest 
>api需要client不断去请求,不是实时不说还浪费资源,而且受网络抖动影响有时候还会超时获取不到,但不代表作业有问题。
>方式二:通过作业暴露指标给promemtheus,因为prometheus是周期性(10s~20s) 来pull指标的,所以也达不到实时性要求。
>
>
>flink作业能否在failure之前调用某个hook去通知相关方呢?如果要自己改的话,是要动哪个类呢?谢谢!


Re:Hive提交分区异常,Caused by: java.io.FileNotFoundException: /tmp/... (No such file or directory)

2022-09-30 Thread RS
Hi,
之前的问题还是没有搞定,不过现象更明晰了点,


版本:flink-1.15.1
场景:写hive数据的时候,写完提交分区,会异常


错误日志:

Caused by: java.io.FileNotFoundException: 
/tmp/jm_253c182f914fb67750844d2e71864a5a/blobStorage/job_615800b00c211de674f17e46938daeb7/blob_p-a813f094892f1c71b7884d0aec7972edbeae08e3-65d1205985504738577e6a7d90385f17
 (没有那个文件或目录)
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:228)
at java.util.zip.ZipFile.<init>(ZipFile.java:157)
at java.util.jar.JarFile.<init>(JarFile.java:171)
at java.util.jar.JarFile.<init>(JarFile.java:108)
at sun.net.www.protocol.jar.URLJarFile.<init>(URLJarFile.java:93)
at sun.net.www.protocol.jar.URLJarFile.getJarFile(URLJarFile.java:69)
at sun.net.www.protocol.jar.JarFileFactory.get(JarFileFactory.java:99)
at sun.net.www.protocol.jar.JarURLConnection.connect(JarURLConnection.java:122)
at 
sun.net.www.protocol.jar.JarURLConnection.getInputStream(JarURLConnection.java:152)
at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2943)
at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3034)
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
... 54 more




这里应该是要访问hive-site,jobmanager访问本地路径/tmp/jm_253c182f914fb67750844d2e71864a5a/blobStorage/job_615800b00c211de674f17e46938daeb7/blob_p-a813f094892f1c71b7884d0aec7972edbeae08e3-65d1205985504738577e6a7d90385f17找hive-site,但是这个路径是不存在的,导致异常




这个路径曾经存在过,路径中的job 
id=615800b00c211de674f17e46938daeb7是历史执行的一次任务,执行完之后,job_615800b00c211de674f17e46938daeb7这个目录就没了




但是后面新启动的任务全部都还是在这个路径下查找hive配置,导致异常




如果重启集群的话,同样的任务提交,不会报错,看起来是个概率事件,所以这个问题可能是什么原因导致的呢?




Thanks






在 2022-07-21 14:52:51,"RS"  写道:
>Hi,
>
>
>环境:
>flink-1.15.1 on K8S session集群
>hive3
>flink写hive任务,配置了定时提交分区
>
>
>现象:
>1. checkpoint是30s一次
>2. HDFS上有数据文件产生
>3. hive里面没有分区信息
>4. 任务异常,自动重启后下次ck的时候还是异常
>5. 写hive的任务有的一直正常运行,有的有这种异常
>6. 任务停掉,重新创建后恢复正常
>
>
>异常日志如下:
>java.lang.RuntimeException: java.io.FileNotFoundException: 
>/tmp/tm_10.244.25.164:6122-5b9301/blobStorage/job_ac640bf7276279f0452642918561670e/blob_p-d9755afa943119c325c059ddc70a45c904d9e4bd-d3a2ca82ca3a2429095708d4908ae184
> (No such file or directory)
>
>at 
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:3021)
>
>at 
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2973)
>
>at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
>
>at org.apache.hadoop.conf.Configuration.get(Configuration.java:1460)
>
>at org.apache.hadoop.hive.conf.HiveConf.getVar(HiveConf.java:5001)
>
>at org.apache.hadoop.hive.conf.HiveConf.getVar(HiveConf.java:5074)
>
>at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:5161)
>
>at org.apache.hadoop.hive.conf.HiveConf.<init>(HiveConf.java:5114)
>
>at 
> org.apache.flink.connectors.hive.util.HiveConfUtils.create(HiveConfUtils.java:38)
>
>at 
> org.apache.flink.connectors.hive.HiveTableMetaStoreFactory$HiveTableMetaStore.<init>(HiveTableMetaStoreFactory.java:72)
>
>at 
> org.apache.flink.connectors.hive.HiveTableMetaStoreFactory$HiveTableMetaStore.<init>(HiveTableMetaStoreFactory.java:64)
>
>at 
> org.apache.flink.connectors.hive.HiveTableMetaStoreFactory.createTableMetaStore(HiveTableMetaStoreFactory.java:61)
>
>at 
> org.apache.flink.connectors.hive.HiveTableMetaStoreFactory.createTableMetaStore(HiveTableMetaStoreFactory.java:43)
>
>at 
> org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:159)
>
>at 
> org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(PartitionCommitter.java:145)
>
>at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>
>at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>
>at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>
>at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>
>at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>
>at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>
>at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task

Re:Re:Flink1.15.1读取Tidb6,sql-client执行select count异常 java.sql.SQLSyntaxErrorException

2022-09-01 Thread RS
Hi,


1. sql client的日志附录最后
2. 用的是flink-connector-jdbc-1.15.1.jar
3. origin_object_data_61是里面的表,外层是t1,类似这样,
create view t1 as
select 
aaa,bbb,ccc 
from 
origin_object_data_61;


sql client相关错误日志:
Caused by: java.lang.IllegalArgumentException: open() failed.You have an error 
in your SQL syntax; check the manual that corresponds to your TiDB version for 
the right syntax to use line 1 column 12 near "FROM `origin_object_data_61`" 
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:207)
 ~[flink-connector-jdbc-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) 
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
 ~[flink-dist-1.15.1.jar:1.15.1]
Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL 
syntax; check the manual that corresponds to your TiDB version for the right 
syntax to use line 1 column 12 near "FROM `origin_object_data_61`" 
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) 
~[mysql-connector-java-8.0.28.jar:8.0.28]
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
 ~[mysql-connector-java-8.0.28.jar:8.0.28]
at 
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
 ~[mysql-connector-java-8.0.28.jar:8.0.28]
at 
com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1009)
 ~[mysql-connector-java-8.0.28.jar:8.0.28]
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:204)
 ~[flink-connector-jdbc-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) 
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
 ~[flink-dist-1.15.1.jar:1.15.1]



在 2022-09-01 22:52:21,"Xuyang"  写道:
>Hi,可以贴一下log目录下的sql 
>client日志吗?另外问一下,你是用的jdbc的connector嘛?你select的表名是`origin_object_data_61`?
>在 2022-09-01 20:18:05,"RS"  写道:
>>Hi,
>>环境:
>>flink-1.15.1
>>TiDB-v6.1.0
>>
>>
>>现象:
>>Flink SQL> select count(*) from t1;
>>
>>[ERROR] Could not execute SQL statement. Reason:
>>
>>java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check 
>>the manual that corresponds to your TiDB version for the right syntax to use 
>>line 1 column 12 near "FROM `origin_object_data_61`" 
>>
>>执行失败了
>>
>>
>>Flink SQL> select * from t1 limit 3;
>>执行成功了,有结果返回
>>
>>
>>请教下各位,为什么count不能执行,select字段就可以执行??
>>
>>
>>Thanks
>>
>>
>>


Flink1.15.1读取Tidb6,sql-client执行select count异常 java.sql.SQLSyntaxErrorException

2022-09-01 Thread RS
Hi,
环境:
flink-1.15.1
TiDB-v6.1.0


现象:
Flink SQL> select count(*) from t1;

[ERROR] Could not execute SQL statement. Reason:

java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check 
the manual that corresponds to your TiDB version for the right syntax to use 
line 1 column 12 near "FROM `origin_object_data_61`" 

执行失败了


Flink SQL> select * from t1 limit 3;
执行成功了,有结果返回


请教下各位,为什么count不能执行,select字段就可以执行??


Thanks





Re:Client启动有异常: SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.

2022-08-12 Thread RS
Hi,
1. lib下只需要一个flink-sql-connector-hive的包,不再需要flink-connector-hive和hive-exec
2. flink-sql-connector-hive包版本不对,需要和flink的1.13.3版本一致


先试试看,不行的话,就是代码的问题了。



Thanks




At 2022-08-12 10:31:35, "Summer"  wrote:
>
>
>版本:1.13.3
>
>
>lib目录下:
>-rw-r--r-- 1 root root   7759243 Aug 12 10:12 
>flink-connector-hive_2.12-1.13.3.jar
>-rw-r--r-- 1 root root 92313 Aug 12 10:12 flink-csv-1.13.3.jar
>-rw-r--r-- 1 root root 106535831 Aug 12 10:12 flink-dist_2.12-1.13.3.jar
>-rw-r--r-- 1 root root 78644 Aug 12 10:12 
>flink-hadoop-compatibility_2.12-1.13.3.jar
>-rw-r--r-- 1 root root148127 Aug 12 10:12 flink-json-1.13.3.jar
>-rw-r--r-- 1 root root  43317025 Aug 12 10:12 
>flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>-rwxr-xr-x 1 root root   7709740 Aug 12 10:12 flink-shaded-zookeeper-3.4.14.jar
>-rw-r--r-- 1 root root  47172843 Aug 12 10:12 
>flink-sql-connector-hive-3.1.2_2.11-1.12-SNAPSHOT.jar
>-rw-r--r-- 1 root root  35051553 Aug 12 10:13 flink-table_2.12-1.13.3.jar
>-rw-r--r-- 1 root root  38613339 Aug 12 10:13 flink-table-blink_2.12-1.13.3.jar
>-rw-r--r-- 1 root root  40623959 Aug 12 10:13 hive-exec-3.1.2.jar
>-rw-r--r-- 1 root root  31227259 Aug 11 18:08 
>hudi-flink-bundle_2.11-0.10.1-rc1.jar
>-rwxr-xr-x 1 root root 67114 Aug 12 10:13 log4j-1.2-api-2.12.1.jar
>-rwxr-xr-x 1 root root276771 Aug 12 10:13 log4j-api-2.12.1.jar
>-rwxr-xr-x 1 root root   1674433 Aug 12 10:13 log4j-core-2.12.1.jar
>-rwxr-xr-x 1 root root 23518 Aug 12 10:13 log4j-slf4j-impl-2.12.1.jar
>
>
>在JobManager节点启动Sql-Client时有问题,异常如下:
>2022-08-12 10:19:33,434 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: env.ssh.opts, -p 8808
>2022-08-12 10:19:33,436 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: env.hadoop.conf.dir, /opt/flink-1.13.3/conf/hadoop
>2022-08-12 10:19:33,436 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: rest.flamegraph.enabled, true
>2022-08-12 10:19:33,437 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: jobmanager.rpc.address, 172.17.2.27
>2022-08-12 10:19:33,437 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: jobmanager.rpc.port, 6123
>2022-08-12 10:19:33,437 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: jobmanager.memory.process.size, 10480m
>2022-08-12 10:19:33,437 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: jobmanager.memory.jvm-metaspace.size, 1024m
>2022-08-12 10:19:33,437 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: taskmanager.memory.process.size, 10480m
>2022-08-12 10:19:33,437 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: taskmanager.numberOfTaskSlots, 16
>2022-08-12 10:19:33,437 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: parallelism.default, 1
>2022-08-12 10:19:33,437 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: high-availability, zookeeper
>2022-08-12 10:19:33,438 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: high-availability.storageDir, 
>hdfs://emr-cluster/flink/v1/ha
>2022-08-12 10:19:33,438 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: high-availability.zookeeper.quorum, 
>172.17.2.26:2181,172.17.2.27:2181,172.17.2.29:2181
>2022-08-12 10:19:33,438 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: high-availability.zookeeper.path.root, /flink
>2022-08-12 10:19:33,438 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: high-availability.cluster-id, /v1
>2022-08-12 10:19:33,438 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: state.backend, filesystem
>2022-08-12 10:19:33,438 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: state.checkpoints.dir, 
>hdfs://emr-cluster/flink/v1/flink-checkpoints/
>2022-08-12 10:19:33,438 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: state.savepoints.dir, 
>hdfs://emr-cluster/flink/v1/flink-savepoints/
>2022-08-12 10:19:33,438 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
>configuration property: jobmanager.execution.failover-strategy, region
>2022-08-12 10:19:33,439 INFO  
>org.apache.flink.configuration.GlobalConfiguration   [] - Loading 

Flink SQL 如何描述 ES(ElasticSearch)的nested字段类型?

2022-07-30 Thread RS
Hi,


flink sql如何写es的nested数组数据?


原始示例数据:
{
"id": "123",
"field1":[
{
"k1":1
},
{
"k1":1,
"k2":2
},
{
"k3":"3"
}
]
}


filed1是一个数组,里面的原始是字典,字典内的字段名是动态的,我不知道里面有多少个key


现在准备将数据从kafka消费入库到es中,
已经在es中定义字段 field1mapping,为nested类型, "field1":{"type":"nested"},


flink sql类似如下:
create table source_kafka (
`id` string,
`field1` ARRAY)
with (...)


create table sink_es(
`id` string,
`field1` ARRAY)
with (...)


select id, field1 from source_kafka;
这个可以查看到数据


insert into sink_es select field1 from source_kafka;
这样会报错,
Caused by: [CIRCULAR REFERENCE: ElasticsearchException[Elasticsearch exception 
[type=mapper_parsing_exception, reason=object mapping for [field1] tried to 
parse field [null] as object, but found a concrete value]]]


看错误提示感觉是因为null导致的,但是field1不存在null的数据,所以请教下各位这个问题是什么,如何解决?


Thanks



Re:咨询 Flink 在 OLAP、即席查询场景下的应用问题

2022-07-14 Thread RS
Hi,


打算通过Flink查询HDFS中的数据,对查询实效性要求高,查询平均时延要求在秒级。
=
这种高实时性的要求,是不适合Presto或者Flink引擎的。


如果是数据量不大,查询逻辑不复杂,实时性要求高,建议数据同步到数据库中,使用数据库引擎来查询;
如果是数据量大,查询逻辑复杂,实时性要求不高,Flink或者Presto是可以的;
如果是数据量大,查询逻辑复杂,实时性要求高,那什么都拯救不了你


Thanks



在 2022-07-14 11:54:00,"barbzhang(张博)"  写道:

您好,我目前在调研Flink对于即席查询场景的支持程度,打算通过Flink查询HDFS中的数据,对查询实效性要求高,查询平均时延要求在秒级。

我调研了Flink集群的多种部署模式,发现Standalone on k8s 模式下的 Flink 
Session集群最满足这种需求,因此搭建了该种模式的Flink集群,打算通过我们自研的Java项目集成Flink API提交查询SQL到Flink集群执行。

 

目前我发现通过Java项目往Flink提交SQL有两种方式:

方式一:通过Flink Table API的方式

这种方式需要将集成Flink Table 
API的代码打成jar包,放在我们Java项目服务的服务器上,然后在Java项目内通过调用启动脚本的方式往Flink集群提交任务,类似:flink run 
-m {host}:{port} xxx.jar。

这种方式的缺点是main()方法在客户端执行,而且涉及到客户端往JobManager、JobManager往TaskManager分发jar包的过程,时延较高,一般至少需要十秒以上,不太满足即席查询对时延的要求。

方式二:采用类似SQL客户端的方式

这种方式没有分发jar包的过程,相对第一种方式而言时延较低,问题就在于Java项目该如何集成SQL客户端?我研究了相关代码,打算通过我们自研的Java项目直接调用Flink
 SqlClient的相关方法,类似:SQL客户端提交SQL demo。但是这种方式我们接收到的返回内容是字符串,而不是结构化的Java对象,不像Table 
API封装的那么好,需要自行做反序列化处理,而且我个人觉得这种方式不太合适。

 

综上,我想请教下您两个问题:

问题一:Flink Standalone集群其实就是常驻进程了,类似Presto这种引擎,上述方式一有没有可能Java项目集成Flink Table 
API时,直接在Java项目内运行这段代码,相当于该Java服务作为客户端,直接往Flink集群提交SQL,而不是绕了一次,先打好jar包再通过 flink 
run提交jar包的方式提交SQL。

我想让这段代码直接在Java项目提供的服务内直接运行,不知道目前能不能做到。

问题二:除了问题一的解决方案,还有没有其他方式能满足目前我们这种需求?

 

抱歉打扰您了,万分感谢!

IF IS NULL 函数运行报错:NullPointerException

2022-07-13 Thread RS
Hi,


版本:Flink-1.15.1 
模式:单机


s1为kafka的source表,已定义,
g_name为s1表的字段,string类型


select `g_name` from s1;
这个运行正常,有数据输出


然后加上 IF 语句,则会报错:
Flink SQL> select if(`g_name` is null, 'no', `g_name`) from s1;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
运行报错


堆栈里面的Unknown Source,感觉有问题。
at StreamExecCalc$1247.processElement_split196(Unknown Source) ~[?:?]


报错堆栈:
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
 ~[flink-connector-files-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_302]
Caused by: java.lang.NullPointerException
at StreamExecCalc$1247.processElement_split196(Unknown Source) ~[?:?]
at StreamExecCalc$1247.processElement(Unknown Source) ~[?:?]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 

Re:请教:关于如何释放 Flink Job 中某个对象持有的资源

2022-07-12 Thread RS
Hi,


如果是访问ES的话,Flink里面自带ES的connector,你可以直接使用,或者参考源码,source和sink接口都有对应的方法


资源是否在一个线程里面,这个取决与你代码逻辑,如果在不同的线程或者进程的话,设计上,就不要用同一个EsClientHolder,各个不同阶段各自去new和close对象,


Thanks


在 2022-07-12 12:35:31,"Bruce Zu"  写道:
> Flink team好,
> 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。
>
> 我是 Flink 的新用户。我搜索了很多,但没有找到相关文件。但我确信有一个标准的方法来解决它。
>
>我的Flink 应用程序中需要访问 Elasticsearch 服务器。我们使用从
>org.elasticsearch.client.RestHighLevelClient 扩展而来的类 EsClient 来完成查询工作,
>一旦不再使用它就需要调用它的`close`方法来释放资源。
>
>所以我需要找到合适的地方来确保资源总是可以被释放,即使在调用的某个地方发生了一些异常
>
>我现在能想到的是使用 `ThreadLocal` 将生成的 EsClient 对象保留在main class的 main 方法的开头,并且
>在 main 方法结束时释放资源。
>
>类似这样的伪代码:
>```java
>public class EsClientHolder {
>  private static final ThreadLocal<EsClient> local = new InheritableThreadLocal<>();
>
>  public static void createAndSetEsClient(EsClient esClient) {
>    local.set(esClient);
>  }
>
>  public static void createAndSetEsClientBy(EsClientConfig esClientConfig) {
>    EsClient instance = new EsClient(esClientConfig);
>    createAndSetEsClient(instance);
>  }
>
>  public static EsClient get() {
>    EsClient c = local.get();
>    if (c == null) {
>      throw new RuntimeException("确保在使用前创建并设置 EsClient 实例");
>    }
>    return c;
>  }
>
>  public static void close() throws IOException {
>    EsClient o = local.get();
>    if (o != null) {
>      o.close();
>    }
>  }
>}
>
>// 在 Flink 应用程序代码中的用法
>public class MainClass {
>  public static void main(String[] args) throws IOException {
>    try {
>      Properties prop = null;
>      EsClientConfig config = getEsClientConfig(prop);
>      EsClientHolder.createAndSetEsClientBy(config);
>      // …
>      new SomeClass().method1();
>      new OtherClass().method2();
>      // ...
>    } finally {
>      EsClientHolder.close();
>    }
>  }
>}
>
>class SomeClass {
>  public void method1() {
>    // 1. 在其他类的任意调用方法中使用 EsClient:
>    EsClient esClient = EsClientHolder.get();
>    // …
>  }
>}
>
>class OtherClass {
>  public void method2() {
>    // 2. 在 fork 出来的子线程中使用 EsClient:
>    new Thread(
>        () -> {
>          EsClient client = EsClientHolder.get();
>          // …
>        })
>        .start();
>    // …
>  }
>}
>
>```
>
>我知道 TaskManager 是一个 Java JVM 进程,而 Task 是由一个 java Thread 执行的。
>
>但我不知道 Flink 如何创建作业图,以及 Flink 最终如何将任务分配给线程以及这些线程之间的关系。
>
>比如 Flink 把 SomeClass 的 method1 和 OtherClass 的 method2 分配到一个和运行 MainClass
>的线程不一样的线程,
>那么运行method1和mehod2的线程就没有办法拿到EsClient了。
>这里我假设 MainClass 中的 main 方法将在一个线程中执行。如果不是,比如将 set() 和 close() 拆分为在不同的线程中运行,则就
>没有办法释放资源。
>
>谢谢!


Re:Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 Thread RS
(SqlToRelConverter.java:2050)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 ~[?:?]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197)
 ~[?:?]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189)
 ~[?:?]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240)
 ~[?:?]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188)
 ~[?:?]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345)
 ~[?:?]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
 ~[?:?]
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) 
~[?:?]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172)
 ~[flink-sql-client-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88)
 ~[flink-sql-client-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
 ~[flink-sql-client-1.15.1.jar:1.15.1]
... 11 more


Thanks

在 2022-07-11 19:45:04,"Weihua Hu"  写道:
>Hi,
>
>有更详细的日志吗?看起来是类加载冲突的,需要明确下是哪个类冲突了
>
>Best,
>Weihua
>
>
>On Wed, Jul 6, 2022 at 1:53 PM RS  wrote:
>
>> Hi,
>>
>>
>> 通过sql-client执行flink sql,connector选择filesystem,会出现如下报错
>> java.lang.ClassNotFoundException:
>> org.apache.flink.table.planner.delegation.ParserFactory
>>
>>
>> Flink SQL> CREATE TABLE t1 (
>> > a STRING,
>> > b INT
>> > )WITH(
>> > 'connector'='filesystem',
>> > 'path'='/tmp/qwe',
>> > 'format'='csv',
>> > 'csv.ignore-parse-errors' = 'true',
>> > 'csv.allow-comments' = 'true'
>> > );
>> [INFO] Execute statement succeed.
>> Flink SQL> select * from t1;
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.ClassNotFoundException:
>> org.apache.flink.table.planner.delegation.ParserFactory
>>
>>
>> 我测试了下,是因为我的lib目录下,有
>> flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar,放lib下是因为还要其他任务需要读写hive
>> 如果lib下没有flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar,则没有这个报错
>>
>>
>> 请教下,这个问题如何解决呢?
>>
>>
>> Thanks


Re:Re:Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 Thread RS
Hi,


只有 flink-table-planner-loader-1.15.1.jar,没有flink-table-planner_2.12-1.15.1.jar 
也是不行的,并不是去掉一个就可以了


只能使用flink-table-planner_2.12-1.15.1.jar,所以感到奇怪


Thanks


在 2022-07-11 20:19:01,"jiangjiguang719"  写道:
>hi,
>你这个问题是,
>
>flink-table-planner-loader-1.15.1.jar  和 flink-table-planner_2.12-1.15.1.jar  
>冲突了 去掉一个就可以了
>
>
>
>
>
>
>
>在 2022-07-11 19:45:04,"Weihua Hu"  写道:
>>Hi,
>>
>>有更详细的日志吗?看起来是类加载冲突的,需要明确下是哪个类冲突了
>>
>>Best,
>>Weihua
>>
>>
>>On Wed, Jul 6, 2022 at 1:53 PM RS  wrote:
>>
>>> Hi,
>>>
>>>
>>> 通过sql-client执行flink sql,connector选择filesystem,会出现如下报错
>>> java.lang.ClassNotFoundException:
>>> org.apache.flink.table.planner.delegation.ParserFactory
>>>
>>>
>>> Flink SQL> CREATE TABLE t1 (
>>> > a STRING,
>>> > b INT
>>> > )WITH(
>>> > 'connector'='filesystem',
>>> > 'path'='/tmp/qwe',
>>> > 'format'='csv',
>>> > 'csv.ignore-parse-errors' = 'true',
>>> > 'csv.allow-comments' = 'true'
>>> > );
>>> [INFO] Execute statement succeed.
>>> Flink SQL> select * from t1;
>>> [ERROR] Could not execute SQL statement. Reason:
>>> java.lang.ClassNotFoundException:
>>> org.apache.flink.table.planner.delegation.ParserFactory
>>>
>>>
>>> 我测试了下,是因为我的lib目录下,有
>>> flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar,放lib下是因为还要其他任务需要读写hive
>>> 如果lib下没有flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar,则没有这个报错
>>>
>>>
>>> 请教下,这个问题如何解决呢?
>>>
>>>
>>> Thanks


Re:flink sql解析kafka数据

2022-07-06 Thread RS
Hi,


你可以在定义表ccc_test_20220630_2字段的时候,结构如果固定,可以指定字段类型为ARRAY+ROW,例如 abc ARRAY<ROW<a STRING, b INT>>;如果里面是动态结构,可以定义为 MAP<STRING, STRING>。
结构如果比较复杂,或者字段不明确,就自定义UDF解决。
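给个简化的建表和取数示例(items 字段名和内部字段是假设的,仅示意 ARRAY+ROW 的写法):

CREATE TABLE ccc_test_20220630_2 (
    trans_number STRING,
    items ARRAY<ROW<commodity_type STRING, return_flag STRING>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'yh_rme_soc_stream_prod-tlog_header',
    'properties.bootstrap.servers' = '...',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);

-- 展开数组后取 ROW 里面的字段
SELECT t.trans_number, item.commodity_type
FROM ccc_test_20220630_2 AS t
CROSS JOIN UNNEST(t.items) AS item(commodity_type, return_flag);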


Thanks




在 2022-06-30 15:02:55,"小昌同学"  写道:

各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
   trans_number   STRING,
   end_timestamp  STRING,
   return_flagSTRING,
   commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '6',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '165373920',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fight...@163.com
|

Re:请教下flink的提交方式

2022-07-06 Thread RS
Hi,


通过命令行的方式提交,可以捕获flink run的标准输出,里面包含job id,然后正则匹配或者字符串截取就可以提取到job id了
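一个简单的截取示例(基于 flink run 输出里 "Job has been submitted with JobID xxx" 这一行,jar 路径是占位):

JOB_ID=$(./bin/flink run -d /path/to/your-job.jar 2>&1 \
  | grep -oE "JobID [0-9a-f]{32}" | head -n1 | awk '{print $2}')
echo "submitted job: ${JOB_ID}"
# 拿到 JOB_ID 之后可以走 REST API 监控,例如 curl http://jobmanager:8081/jobs/${JOB_ID}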


Thanks





在 2022-07-04 17:50:07,"sherlock zw"  写道:
>目前我需要去监控已经提交的flink任务, 
>但是通过命令行方式提交的话拿不到任务id,只能通过INFO级别的日志过滤出来,但是我们的环境里面的日志界别是WARN,看不到任务id的日志输出,所以想问下除了命令行的方式提交任务还有其他方式吗,例如有和Spark类似的SparkLaunch一样的jar提交的方式吗?希望大佬指点下,谢谢。
>
>


filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-05 Thread RS
Hi,


通过sql-client执行flink sql,connector选择filesystem,会出现如下报错
java.lang.ClassNotFoundException: 
org.apache.flink.table.planner.delegation.ParserFactory


Flink SQL> CREATE TABLE t1 (
> a STRING,
> b INT
> )WITH(
> 'connector'='filesystem',
> 'path'='/tmp/qwe',
> 'format'='csv',
> 'csv.ignore-parse-errors' = 'true',
> 'csv.allow-comments' = 'true'
> );
[INFO] Execute statement succeed.
Flink SQL> select * from t1;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: 
org.apache.flink.table.planner.delegation.ParserFactory


我测试了下,是因为我的lib目录下,有 
flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar,放lib下是因为还要其他任务需要读写hive
如果lib下没有flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar,则没有这个报错


请教下,这个问题如何解决呢?


Thanks

Flink-1.15.0 消费kafka提交offset失败?

2022-06-26 Thread RS
Hi,
请教下各位,Flink-1.15.0,消费Kafka发现下面个问题,offset提交失败的情况,有的任务应该是一直提交失败的,数据消费了,但是offset不变,这种情况如何处理?


现象如下:
1. 任务没有异常,
2. 数据能正常消费处理,不影响数据使用
3. 任务有配置checkpoint,几分钟一次,理论上执行checkpoint的时候会提交offset
4. 部分任务的从Kafka的offset提交失败,部分正常


WARN日志如下:
2022-06-27 01:07:42,725 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=11398 (max part counter=1).
2022-06-27 01:07:42,830 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
received completion notification for checkpoint with id=11398.
2022-06-27 01:07:43,820 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=11476 (max part counter=0).
2022-06-27 01:07:43,946 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
received completion notification for checkpoint with id=11476.
2022-06-27 01:07:45,218 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=11521 (max part counter=47).
2022-06-27 01:07:45,290 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
received completion notification for checkpoint with id=11521.
2022-06-27 01:07:45,521 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 11443
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
2022-06-27 01:07:45,990 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 11398
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.


Thanks~

Re:Re: sql-client pyexec参数生效疑问

2022-06-08 Thread RS
Hi,
是这个问题了,成功了,不清楚为什么要把UDF的解释器分开配置
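把最终可用的启动命令记录一下(路径沿用前面例子里的):

./bin/sql-client.sh \
  -pyfs file:///xxx/p.py \
  -pyexec /xxx/bin/python3 \
  -pyclientexec /xxx/bin/python3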



Thx


在 2022-06-08 13:29:48,"Dian Fu"  写道:
>有两个参数指定Python解释器:
>1)-pyexec,指定的是作业执行过程中,用来运行Python UDF的Python解释器路径
>2)-pyclientexec,指定客户端编译作业的时候,用到的Python解释器路径,具体信息可以看一下:
>https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-interpreter-of-client
>
>可以把这个参数-pyclientexec 也加上试试。
>
>On Tue, Jun 7, 2022 at 11:24 AM RS  wrote:
>
>> Hi,
>>
>>
>> 环境:
>> - flink-1.14.3, 单机集群
>> - 服务器上默认python2,也存在python3.6.8
>> - /xxx/bin/python3是python3生成的虚拟环境
>>
>>
>> 使用sql-client测试pyflink的udf,自定义了一个函数f1,/xxx/p.py
>> 启动命令:
>> ./bin/sql-client.sh -pyfs file:///xxx/p.py -pyexec /xxx/bin/python3
>> 配置pyexec指定了使用的python为python3
>>
>>
>> 执行命令报错,报错信息如下:
>> Flink SQL> create temporary function fun1 as 'p.f1' language python;
>> [INFO] Execute statement succeed.
>> Flink SQL> select fun1('a',1,'s');
>> Traceback (most recent call last):
>>   File "/usr/lib64/python2.7/runpy.py", line 151, in _run_module_as_main
>> mod_name, loader, code, fname = _get_module_details(mod_name)
>>   File "/usr/lib64/python2.7/runpy.py", line 101, in _get_module_details
>> loader = get_loader(mod_name)
>>   File "/usr/lib64/python2.7/pkgutil.py", line 464, in get_loader
>> return find_loader(fullname)
>>   File "/usr/lib64/python2.7/pkgutil.py", line 474, in find_loader
>> for importer in iter_importers(fullname):
>>   File "/usr/lib64/python2.7/pkgutil.py", line 430, in iter_importers
>> __import__(pkg)
>>   File "/home/flink-1.14.3/opt/python/pyflink.zip/pyflink/__init__.py",
>> line 26, in 
>> RuntimeError: Python versions prior to 3.6 are not supported for PyFlink
>> [sys.version_info(major=2, minor=7, micro=5, releaselevel='final',
>> serial=0)].
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.IllegalStateException: Instantiating python function 'p.f1'
>> failed.
>>
>>
>> 报错提示中使用到的是python2,不是参数里面配置的python3,如何让pyexec生效?
>>
>>
>> Thx


sql-client pyexec参数生效疑问

2022-06-06 Thread RS
Hi,


环境:
- flink-1.14.3, 单机集群
- 服务器上默认python2,也存在python3.6.8
- /xxx/bin/python3是python3生成的虚拟环境


使用sql-client测试pyflink的udf,自定义了一个函数f1,/xxx/p.py
启动命令:
./bin/sql-client.sh -pyfs file:///xxx/p.py -pyexec /xxx/bin/python3 
配置pyexec指定了使用的python为python3


执行命令报错,报错信息如下:
Flink SQL> create temporary function fun1 as 'p.f1' language python;
[INFO] Execute statement succeed.
Flink SQL> select fun1('a',1,'s');
Traceback (most recent call last):
  File "/usr/lib64/python2.7/runpy.py", line 151, in _run_module_as_main
mod_name, loader, code, fname = _get_module_details(mod_name)
  File "/usr/lib64/python2.7/runpy.py", line 101, in _get_module_details
loader = get_loader(mod_name)
  File "/usr/lib64/python2.7/pkgutil.py", line 464, in get_loader
return find_loader(fullname)
  File "/usr/lib64/python2.7/pkgutil.py", line 474, in find_loader
for importer in iter_importers(fullname):
  File "/usr/lib64/python2.7/pkgutil.py", line 430, in iter_importers
__import__(pkg)
  File "/home/flink-1.14.3/opt/python/pyflink.zip/pyflink/__init__.py", line 
26, in 
RuntimeError: Python versions prior to 3.6 are not supported for PyFlink 
[sys.version_info(major=2, minor=7, micro=5, releaselevel='final', serial=0)].
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'p.f1' failed.


报错提示中使用到的是python2,不是参数里面配置的python3,如何让pyexec生效?


Thx

Re:kafka数据落地到Hive/Filesystem(orc/parquet格式)的疑问

2022-06-06 Thread RS
Hi,
每次checkpoint都会生成文件的,在一个checkpoint期间内,可以配置文件大小合并等,
不同checkpoint生成的文件无法合并,所以文件数量最少是和checkpoint时间间隔相关的,
如果checkpoint时间间隔比较短,就需要自己去合并小文件了
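如果落地用的是 filesystem connector,可以先试下 sink 自带的滚动和小文件合并参数(合并只发生在单个 checkpoint 内,参数名以所用版本文档为准,下面只是示意):

CREATE TABLE fs_sink (
    a STRING,
    b INT,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs:///tmp/fs_sink',
    'format' = 'parquet',
    'auto-compaction' = 'true',                    -- checkpoint 内自动合并小文件
    'compaction.file-size' = '128MB',              -- 合并后的目标文件大小
    'sink.rolling-policy.file-size' = '128MB',
    'sink.rolling-policy.rollover-interval' = '30 min'
);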

在 2022-06-06 16:44:51,"谭家良"  写道:
>大家好,关于kafka数据消费到hive/filesystem(orc/parquet)我有个疑问。orc/parquet如何调整落地的文件大小?是根据checkpoint时间来的吗?在落地到hive/filesystem
> connector有什么优化建议吗?
>
>
>best,
>tanjialiang.
>
>
>| |
>谭家良
>|
>|
>tanjl_w...@126.com
>|


Re:Re: pyflink报错:Java gateway process exited before sending its port number

2022-05-23 Thread RS
Hi,
感谢,查询pyflink目录下,里面确实存在多个版本的jar包,我清理了下,可以运行起来了,
看来是PyCharm的bug了,安装新版本的时候没有成功清理旧的版本
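排查方法也记录一下(命令只是示意,版本号按实际安装的调整):

# 找到 pyflink 自带的 lib 目录,确认里面是否混有不同版本的 flink jar
python -c "import pyflink, os; print(os.path.join(os.path.dirname(pyflink.__file__), 'lib'))"
# 如果确实混了多个版本,直接重装一个干净的版本最省事
pip uninstall -y apache-flink
pip install apache-flink==1.15.0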


Thanks~



在 2022-05-23 19:27:42,"Dian Fu"  写道:
>>> java.lang.NoSuchMethodError:
>org.apache.flink.util.NetUtils.getAvailablePort()I
>
>你的环境是不是不太干净?可以检查一下 PyFlink 安装目录下(site-packages/pyflink/lib) 的那些 jar 包的版本。
>
>
>
>On Mon, May 23, 2022 at 4:22 PM RS  wrote:
>
>> Hi,
>> 在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码
>> 参考官方文档:
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/
>> 报错如下:
>> 
>> Exception in thread "main" java.util.concurrent.ExecutionException:
>> java.lang.NoSuchMethodError:
>> org.apache.flink.util.NetUtils.getAvailablePort()I
>> at
>> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>> at
>> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>> at
>> org.apache.flink.client.python.PythonEnvUtils.startGatewayServer(PythonEnvUtils.java:387)
>> at
>> org.apache.flink.client.python.PythonGatewayServer.main(PythonGatewayServer.java:46)
>> Caused by: java.lang.NoSuchMethodError:
>> org.apache.flink.util.NetUtils.getAvailablePort()I
>> at
>> org.apache.flink.client.python.PythonEnvUtils.lambda$startGatewayServer$3(PythonEnvUtils.java:366)
>> at java.base/java.lang.Thread.run(Thread.java:834)
>> Traceback (most recent call last):
>>   File "C:/flink/ex2.py", line 132, in 
>> word_count(known_args.input, known_args.output)
>>   File "C:/flink/ex2.py", line 50, in word_count
>> t_env =
>> TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
>>   File
>> "C:\code\py\venv\3.6\lib\site-packages\pyflink\table\environment_settings.py",
>> line 221, in in_streaming_mode
>> get_gateway().jvm.EnvironmentSettings.inStreamingMode())
>>   File "C:\code\py\venv\3.6\lib\site-packages\pyflink\java_gateway.py",
>> line 62, in get_gateway
>> _gateway = launch_gateway()
>>   File "C:\code\py\venv\3.6\lib\site-packages\pyflink\java_gateway.py",
>> line 112, in launch_gateway
>> raise Exception("Java gateway process exited before sending its port
>> number")
>> Exception: Java gateway process exited before sending its port number
>> 
>>
>>
>> 环境:java11,python3.6,apache-flink=1.15.0
>> 网上搜索解决方案:比如配置JAVA_HOME,但是还是一样的报错
>> import os
>> os.environ['JAVA_HOME'] = 'C:\Program Files\Java\jdk-11.0.15.1'
>>
>>
>> 请教下大佬们,这种情况下如何解决?
>>
>>
>> Thanks
>>
>>


pyflink报错:Java gateway process exited before sending its port number

2022-05-23 Thread RS
Hi,
在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码
参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/
报错如下:

Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getAvailablePort()I
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at 
org.apache.flink.client.python.PythonEnvUtils.startGatewayServer(PythonEnvUtils.java:387)
at 
org.apache.flink.client.python.PythonGatewayServer.main(PythonGatewayServer.java:46)
Caused by: java.lang.NoSuchMethodError: 
org.apache.flink.util.NetUtils.getAvailablePort()I
at 
org.apache.flink.client.python.PythonEnvUtils.lambda$startGatewayServer$3(PythonEnvUtils.java:366)
at java.base/java.lang.Thread.run(Thread.java:834)
Traceback (most recent call last):
  File "C:/flink/ex2.py", line 132, in 
word_count(known_args.input, known_args.output)
  File "C:/flink/ex2.py", line 50, in word_count
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  File 
"C:\code\py\venv\3.6\lib\site-packages\pyflink\table\environment_settings.py", 
line 221, in in_streaming_mode
get_gateway().jvm.EnvironmentSettings.inStreamingMode())
  File "C:\code\py\venv\3.6\lib\site-packages\pyflink\java_gateway.py", line 
62, in get_gateway
_gateway = launch_gateway()
  File "C:\code\py\venv\3.6\lib\site-packages\pyflink\java_gateway.py", line 
112, in launch_gateway
raise Exception("Java gateway process exited before sending its port 
number")
Exception: Java gateway process exited before sending its port number



环境:java11,python3.6,apache-flink=1.15.0
网上搜索解决方案:比如配置JAVA_HOME,但是还是一样的报错
import os
os.environ['JAVA_HOME'] = 'C:\Program Files\Java\jdk-11.0.15.1'


请教下大佬们,这种情况下如何解决?


Thanks



Re:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-16 Thread RS
Hi,
cancel的时候要加savepoint,然后启动的时候指定savepoint应该就不会丢数据了,直接cancel的话是可能丢数据的,
checkpoint的作用和你想到可能不一样,你再看看
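命令大概是下面这样(savepoint 路径和 jobId 都是占位,1.14 里建议用 stop 的方式优雅停止):

# 停止作业并触发 savepoint
./bin/flink stop --savepointPath hdfs:///flink/savepoints <jobId>
# 旧一点的写法:cancel 的同时触发 savepoint
./bin/flink cancel -s hdfs:///flink/savepoints <jobId>
# 重新部署时从 savepoint 恢复
./bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxx-yyyy your-job.jar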

Thx







在 2022-05-12 10:38:33,"徐战辉"  写道:

 hello, 请教下,如何设置flink配置及作业参数,在取消作业重新部署、flink作业失败重跑情况下,保证不丢失数据。

目前有一份作业,开启checkpoint,  cancel 后重新启动,发现数据会丢失1小部分。




1. flink.conf


execution.checkpointing.interval: 1
execution.checkpointing.externalized-checkpoint-retention: 
RETAIN_ON_CANCELLATION
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

state.backend: filesystem
state.checkpoints.dir: hdfs://**:8020/flink/checkpoints
state.savepoints.dir: hdfs://:8020/flink/savepoints


2. source table
CREATE TABLE source_kafka_nginxlog (
 ts BIGINT,
 ..
 pt AS PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog',
-- 有将flink 1.15针对的补丁(FLINK-24697)打上

 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 

 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'format'='json'
);


3. sink table



CREATE TABLE sink_kafka_nginxlog_statistic (
 ts BIGINT,
  ..
 clt_rq BIGINT not null
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog-statistic-flink',
 'sink.parallelism' = '20',
 'sink.delivery-guarantee' = 'exactly-once',
 'sink.transactional-id-prefix' = 'nginxlog-statistic-flink',
 'properties.transaction.timeout.ms' = '360',
 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest',
 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'value.format' = 'csv'
)




| |
Jerry Guo
|
|
wangyixuhongm...@163.com
|

Re:flink-sql,对于kafka 的一些额外参数如何配置

2022-03-22 Thread RS
Hi,
partition.discovery.interval.ms 这个是Flink connector 
kafka里面加上的,KafkaSourceOptions里面定义的,
看下你的kafka-client的版本,官方的是 2.4.1,如果版本一样,那只能先忽略了。





在 2022-03-22 17:10:52,"Michael Ran"  写道:
>dear all :
> 目前用flink1.4   table api +kafka 的情况下,有各种警告,比如:
> The configuration 'partition.discovery.interval.ms' was supplied 
> but isn't a known config.
> 这些额外的参数,在SQL WITH参数里面没定义,不知道各位时在哪个位置加入配置的?
> 有什么建议吗?
>感谢!


Re:Re: Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-21 Thread RS
Hi,
你这个例子中,捕获到B的变更CDC,若最终结果表支持部分字段更新,就直接更新结果表就行,都不需要关联,
只要你的B的CDC处理 晚于 A流的join处理就行
如果一定要全部关联的话,ttl又不可行,那你这个数据量会无限增大,后面就无法关联了的,设计肯定得改

在 2022-03-22 09:01:30,"JianWen Huang"  写道:
>是的。其实我想到的也是将维度表和事实表都通过Cdc方式做成流,然后regular
>join,实现这样的需求好像只能把双流数据都得存到状态里才可以实现,但是状态会不断增大且业务上不能接受数据不准确的结果,配ttl又不可行。所以来请教大家有没有碰到过这种场景。
>
>casel.chen  于2022年3月22日周二 08:43写道:
>>
>> 用cdc join也需要将事实表缓存下来才能实现吧,这就是普通的regular 
>> join,优点是双流驱动,缺点是需要缓存两边的数据,状态会变得很大,建议使用带ssd的rocksdb增量状态后端。
>> 业务上如果可以接受超过一定时间范围不用关联的话,还可以设置state ttl 进一步使状态大小可控。
>>
>>
>>
>>
>>
>>
>>
>> 在 2022-03-21 17:00:31,"zns" <865094...@qq.com.INVALID> 写道:
>> >Cdc join
>> >
>> >> 2022年3月21日 14:01,JianWen Huang  写道:
>> >>
>> >> 事实表流A需关联维度表B做数据打宽。需求是当纬度表B发生变化时,关联结果需全部发生变化更新到最新。
>> >> 例子:
>> >> 变化前:
>> >> A流:
>> >> name  gender
>> >> a male
>> >> b male
>> >> c female
>> >>
>> >> 纬度表B:
>> >> nameage
>> >> a   16
>> >> b17
>> >>
>> >> 结果:
>> >> name   gender   age
>> >> a   male  16
>> >> b   male 17
>> >>
>> >> 发生变化后:
>> >> 纬度表B:
>> >> nameage
>> >> a   16->17
>> >> b17->18
>> >>
>> >> 结果:
>> >> name   gender   age
>> >> a   male  17
>> >> b   male  18
>> >>
>> >> 目前我想到一个做法是将维度表做成流然后关联事实表,最后根据更新时间取top1最新sink到存储里。请问大家有别的更好做法吗


Re:Table Api Connectors kafka bounded

2022-03-21 Thread RS
Hi,
参考官方文档的话,目前应该是不支持的,kafka的connector, source只支持Unbounded Scan, 不支持Bounded Scan



Name         | Version | Source         | Sink
Apache Kafka | 0.10+   | Unbounded Scan | Streaming Sink, Batch Sink


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/#supported-connectors


需要的话,你得自己实现一个source了,支持配置from/to timestamp/offset
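另外补充一个变通思路(示意代码,topic/时间戳都是占位):如果可以接受 DataStream 和 Table API 混用,DataStream 的 KafkaSource 本身支持 setBounded,读到指定位置后作业会自然结束,再用 fromDataStream 转成表继续用 SQL 处理。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BoundedKafkaToTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("bounded-demo")
                // 起始消费点:按时间戳(也可以换成 OffsetsInitializer.offsets(...) 指定 offset)
                .setStartingOffsets(OffsetsInitializer.timestamp(1647590400000L))
                // 结束消费点:读到该时间戳对应的 offset 后结束
                .setBounded(OffsetsInitializer.timestamp(1647594000000L))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka");
        Table t = tEnv.fromDataStream(stream);
        tEnv.createTemporaryView("kafka_slice", t);
        tEnv.executeSql("SELECT * FROM kafka_slice").print();
    }
}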


在 2022-03-18 18:51:02,"李航飞"  写道:
>现在有一个需求是,创建一个任务,消费kafka,仅消费一个片段 即设定起始消费点和结束消费位置
>我看到DataStream Connectors kafka 中有一个setBounded (setUnbounded)属性 ,可以满足需求。
>问题;
>我想使用 Table API 完成上面的需求,该怎么办?
>Table API 是否有相关属性? 
>有其他办法满足这个需求吗?
>流处理批处理都行。


Re:回复:hive 进行 overwrite 合并数据后文件变大?

2022-02-24 Thread RS
感谢,确定了下是压缩格式的问题,
原hive文件的压缩是SNAPPY压缩,使用Flink SQL合并小文件之后,默认不压缩,导致文件变大了。
Flink默认没有继承原文件的压缩算法。。。
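后来想到一个可能的规避方式(没有完整验证,仅供参考):把压缩算法直接声明在 Hive 表属性里,写入端如果按表属性取压缩配置就能对得上;ORC 和 Parquet 对应的属性名不一样:

ALTER TABLE ods.table1 SET TBLPROPERTIES ('orc.compress' = 'SNAPPY');            -- ORC 表
-- ALTER TABLE ods.table1 SET TBLPROPERTIES ('parquet.compression' = 'SNAPPY');  -- Parquet 表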




在 2022-02-22 12:08:39,"‪junjie.m...@goupwith.com‬"  
写道:

检查下数据格式和压缩格式是否和之前的不一致


 原始邮件 ----
发件人: RS 
日期: 2022年2月22日周二 09:35
收件人: user-zh@flink.apache.org
主 题: hive 进行 overwrite 合并数据后文件变大?
Hi,
flink写hive任务,checkpoint周期配置的比较短,生成了很多小文件,一天一个目录,
然后我调用flink sql合并之前的数据,跑完之后,发现存储变大了,请教下这个是什么原因导致的?
合并之前是很多小part文件,overwrite之后文件减少了,但是存储变大了,从274MB变大成2.9GB了?


hive表table1的分区字段是`date`
insert overwrite aw_topic_compact select * from `table1` where 
`date`='2022-02-21';


合并前:
514.0 M  1.5 G/user/hive/warehouse/ods.db/table1/date=2022-02-20
274.0 M  822.1 M  /user/hive/warehouse/ods.db/table1/date=2022-02-21
48.1 M   144.2 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



合并后:
514.0 M  1.5 G/user/hive/warehouse/ods.db/table1/date=2022-02-20
2.9 G8.7 G/user/hive/warehouse/ods.db/table1/date=2022-02-21
47.6 M   142.9 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



hive 进行 overwrite 合并数据后文件变大?

2022-02-21 Thread RS
Hi,
flink写hive任务,checkpoint周期配置的比较短,生成了很多小文件,一天一个目录,
然后我调用flink sql合并之前的数据,跑完之后,发现存储变大了,请教下这个是什么原因导致的?
合并之前是很多小part文件,overwrite之后文件减少了,但是存储变大了,从274MB变大成2.9GB了?


hive表table1的分区字段是`date`
insert overwrite aw_topic_compact select * from `table1` where 
`date`='2022-02-21';


合并前:
514.0 M  1.5 G/user/hive/warehouse/ods.db/table1/date=2022-02-20
274.0 M  822.1 M  /user/hive/warehouse/ods.db/table1/date=2022-02-21
48.1 M   144.2 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



合并后:
514.0 M  1.5 G/user/hive/warehouse/ods.db/table1/date=2022-02-20
2.9 G8.7 G/user/hive/warehouse/ods.db/table1/date=2022-02-21
47.6 M   142.9 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



Re:flink 不触发checkpoint

2022-02-18 Thread RS
1. 图片挂了,看不到,尽量用文字,或者用图床等工具
2. 启动任务有配置checkpoint吗?
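另外补充一个可能相关的点(不确定是不是你这个场景,仅供参考):1.12/1.13 里只要有 task 进入 FINISHED 状态,整个作业就不会再触发 checkpoint;1.14 开始提供了下面这个开关,需要升级后在 flink-conf.yaml 打开:

execution.checkpointing.checkpoints-after-tasks-finish.enabled: true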










在 2022-02-17 11:40:04,"董少杰"  写道:

flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
flink版本1.12.2。
谢谢!






| |
董少杰
|
|
eric21...@163.com
|

Re:Re: flink sql 如何提高下游并发度?

2022-01-11 Thread RS
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?



在 2022-01-11 11:10:41,"Caizhi Weng"  写道:
>Hi!
>
>可以设置 parallelism.default 为需要的并发数。
>
>Jeff  于2022年1月9日周日 19:44写道:
>
>> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?


Re:Re: 咨询个Flink SQL的问题,如何去除null的字段

2022-01-06 Thread RS
Hi,
感谢回复,我也测试过这类方法,
我给json format加了个参数,在序列化的时候,row里面去除null,但是这个要修改代码,单独更新flink-json的jar包了,后期维护可能会有问题
这种很适合写ES和写文件,不会有冗余的字段
如果社区能新增这个功能或者合并进去就方便了

在 2022-01-06 21:18:37,"Benchao Li"  写道:
>我们内部是给json format加了一个功能,允许不把null字段进行序列化。主要解决的也是es这个写入的场景。
>你们也可以试一下。
>
>RS  于2021年12月29日周三 16:41写道:
>
>> Hi,
>> 使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
>>
>>
>> 比如:源数据有3个字段,a,b,c
>> insert into table2
>> select
>> a,b,c
>> from table1
>> 当b=null的时候,只希望写入a和c
>> 当c=null的时候,只希望写入a和b
>>
>>
>
>-- 
>
>Best,
>Benchao Li


Re:Re: Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 Thread RS
Hi,
你好,是这样的,从kafka消费的话,如果表定义了 a,b,c三个字段,如果kafka的数据少了一个a,那么在flink sql里面,读出来的就是 
a=null,写入ES的话,就会有个a=null


比如从ES查询数据的话
期望 没有a的时候,查询结果类似 {b=1,c=2}
如果写了a=null进去,查询结果类似 {a=null,b=1,c=2}
这样结果就和期望的不一样了,所以期望是Flink SQL insert的时候 ,不写数值为null字段

在 2021-12-31 10:15:41,"Caizhi Weng"  写道:
>Hi!
>
>我不太熟悉 es,如果某一个字段不写的话,是会写入一个默认值吗?如果是的话,可以使用 coalesce 函数。coalesce(a, b, c,
>...) 会返回第一个非 null 的值,因此只要把默认值放在最后一个,如果前面都是 null 就会写默认值。
>
>RS  于2021年12月30日周四 17:06写道:
>
>> 有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL?
>>
>>
>>
>>
>>
>> 在 2021-12-30 11:36:21,"Xuyang"  写道:
>> >可以使用case when试一下
>> >在 2021-12-29 16:40:39,"RS"  写道:
>> >>Hi,
>> >>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
>> >>
>> >>
>> >>比如:源数据有3个字段,a,b,c
>> >>insert into table2
>> >>select
>> >>a,b,c
>> >>from table1
>> >>当b=null的时候,只希望写入a和c
>> >>当c=null的时候,只希望写入a和b
>> >>
>>


Re:Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 Thread RS
有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL?





在 2021-12-30 11:36:21,"Xuyang"  写道:
>可以使用case when试一下
>在 2021-12-29 16:40:39,"RS"  写道:
>>Hi,
>>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
>>
>>
>>比如:源数据有3个字段,a,b,c
>>insert into table2
>>select
>>a,b,c
>>from table1
>>当b=null的时候,只希望写入a和c
>>当c=null的时候,只希望写入a和b
>>


咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 Thread RS
Hi,
使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?


比如:源数据有3个字段,a,b,c
insert into table2
select
a,b,c
from table1
当b=null的时候,只希望写入a和c
当c=null的时候,只希望写入a和b



Re:请教flink sql作业链路延迟监控如何实现

2021-12-22 Thread RS
我是直接监控kafka的lag,如果lag数值较大或持续上升,肯定就有延迟了。收到告警后,再查看下plan,有个busy指标,红色的节点就是有问题的
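如果 lag 是用 kafka_exporter + Prometheus 采集的,告警规则大概可以这样写(指标名以实际部署的 exporter 版本为准,阈值是示意):

# 消费组总 lag 超过阈值就告警
sum(kafka_consumergroup_lag{consumergroup="your_group"}) by (topic) > 100000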








在 2021-12-23 08:36:33,"casel.chen"  写道:
>想问一下flink sql作业链路延迟监控如何实现?
>我们的flink 
>sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储
>想监控如下三种延迟,目前有什么办法实现吗?会有相应的metrics暴露出来吗?目前我们在用的flink版本是1.13.2
>1. 端到端的延迟
>2. kafka本身的延迟
>3. flink处理的延迟


Re:Re: Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-22 Thread RS
只有一条SQL,只是数据量比较大,使用的BATCH模式。
SELECT price

FROM hive.data.data1

ORDER BY price DESC



在 2021-12-22 18:35:13,"刘建刚"  写道:
>你的SQL是怎么写的?两个独立的SQL吗?Flink中有个参数table.dml-sync
>,决定是否多条SQL语句顺序执行,默认是false,也就是多条语句是同时执行的。
>
>RS  于2021年12月22日周三 09:25写道:
>
>> 跑了10几个小时终于跑完了,测试发现BATCH模式下,只有Source把所有数据消费完,后面的SortLimit plan才会创建,和流模式不太一样
>>
>>
>>
>>
>> 在 2021-12-21 20:06:08,"RS"  写道:
>> >slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?
>> >
>> >
>> >
>> >
>> >
>> >在 2021-12-21 17:57:21,"刘建刚"  写道:
>>
>> >>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。
>> >>
>> >>RS  于2021年12月21日周二 16:53写道:
>> >>
>> >>> hi,
>> >>>
>> >>> 版本:flink1.14
>> >>>
>> >>> 模式:batch
>> >>>
>> >>> 测试场景:消费hive大量数据,计算某个字段的 top 10
>> >>>
>> >>>
>> >>>
>> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
>> >>>
>> >>> 请问下,SortLimit状态一直为CREATED是正常现象吗?
>> >>>
>> >>> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> 测试SQL:
>> >>>
>> >>> SELECT price
>> >>>
>> >>> FROM hive.data.data1
>> >>>
>> >>> ORDER BY price DESC
>> >>>
>> >>> LIMIT 10;
>>


Re:Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-21 Thread RS
跑了10几个小时终于跑完了,测试发现BATCH模式下,只有Source把所有数据消费完,后面的SortLimit plan才会创建,和流模式不太一样




在 2021-12-21 20:06:08,"RS"  写道:
>slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?
>
>
>
>
>
>在 2021-12-21 17:57:21,"刘建刚"  写道:
>>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。
>>
>>RS  于2021年12月21日周二 16:53写道:
>>
>>> hi,
>>>
>>> 版本:flink1.14
>>>
>>> 模式:batch
>>>
>>> 测试场景:消费hive大量数据,计算某个字段的 top 10
>>>
>>>
>>> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
>>>
>>> 请问下,SortLimit状态一直为CREATED是正常现象吗?
>>>
>>> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
>>>
>>>
>>>
>>>
>>> 测试SQL:
>>>
>>> SELECT price
>>>
>>> FROM hive.data.data1
>>>
>>> ORDER BY price DESC
>>>
>>> LIMIT 10;


Re:实时读取hive参数不生效

2021-12-21 Thread RS
set table.dynamic-table-options.enabled=true;

sql-client的话这样配置,不要引号
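看堆栈像是通过自己封装的提交程序跑的(不是 sql-client),这种情况也可以在代码里直接设置,写法仅示意:

// 在构造好的 TableEnvironment 上打开 OPTIONS hint 支持
tableEnv.getConfig().getConfiguration()
        .setBoolean("table.dynamic-table-options.enabled", true);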


在 2021-12-21 20:20:11,"Fei Han"  写道:
>@all:
>大家好!
>  我在实时读取hive的时候动态参数不生效,另外flink是否可以通过流读方式读取hive的普通表呢?
>版本如下:
>Flink版本1.13.3
>   Hive版本hive2.1.1-CDH6.2.0
>设置的参数是set 'table.dynamic-table-options.enabled'='true'
>报错如下:
>INSERT INTO qhc_catalog.qhc_hms.qhc_ods_assassin_dept
>select * from qhc_catalog.qhc_assassin_ods.assassin_dept /*+ 
>OPTIONS('streaming-source.enable'='true', 
>'streaming-source.partition-order'='create-time') */
>2021-12-21 19:56:45,198 ERROR com.flink.streaming.core.JobApplication  
>[] - 任务执行失败:
>org.apache.flink.table.api.ValidationException: The 'OPTIONS' hint is allowed 
>only when the config option 'table.dynamic-table-options.enabled' is set to 
>true.
>   at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createFinalCatalogTable(CatalogSourceTable.java:104)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:79)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2140)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>  ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51)
>  ~[flink-table_2.12-1.13.3.jar:1.13.3]
>   at 
> com.flink.streaming.core.execute.ExecuteSql.exeSql(ExecuteSql.java:38) 
> ~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
>   at com.flink.streaming.core.JobApplication.main(JobApplication.java:80) 
> ~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:1.8.0_211]
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_211]
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_211]
>   at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211]
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>  ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>  ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) 
> 

Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-21 Thread RS
slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?





在 2021-12-21 17:57:21,"刘建刚"  写道:
>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。
>
>RS  于2021年12月21日周二 16:53写道:
>
>> hi,
>>
>> 版本:flink1.14
>>
>> 模式:batch
>>
>> 测试场景:消费hive大量数据,计算某个字段的 top 10
>>
>>
>> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
>>
>> 请问下,SortLimit状态一直为CREATED是正常现象吗?
>>
>> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
>>
>>
>>
>>
>> 测试SQL:
>>
>> SELECT price
>>
>> FROM hive.data.data1
>>
>> ORDER BY price DESC
>>
>> LIMIT 10;


batch模式下任务plan的状态一直为CREATED

2021-12-21 Thread RS
hi,

版本:flink1.14

模式:batch

测试场景:消费hive大量数据,计算某个字段的 top 10

使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。

请问下,SortLimit状态一直为CREATED是正常现象吗? 

数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?




测试SQL:

SELECT price

FROM hive.data.data1

ORDER BY price DESC

LIMIT 10;

Re:Re:Re: flink sql collect函数使用问题

2021-12-03 Thread RS
SELECT class_no, collect(info)

FROM (

SELECT class_no, ROW(student_no, name, age) AS info

FROM source_table

)

GROUP BY class_no;


从SQL层面想到比较接近的方法,但multiset无法转array
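如果一定要输出 ARRAY,可以考虑自己写个很简单的 UDAF 把字符串收集成数组(示意代码,类名和类型都是假设的,按需调整):

import org.apache.flink.table.functions.AggregateFunction;
import java.util.ArrayList;
import java.util.List;

// 把同一分组内的字符串收集成数组输出
public class CollectList extends AggregateFunction<String[], List<String>> {
    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    public void accumulate(List<String> acc, String value) {
        if (value != null) {
            acc.add(value);
        }
    }

    public void merge(List<String> acc, Iterable<List<String>> others) {
        for (List<String> other : others) {
            acc.addAll(other);
        }
    }

    @Override
    public String[] getValue(List<String> acc) {
        return acc.toArray(new String[0]);
    }
}

注册成函数之后,把学生信息先 CONCAT 成一个字符串再交给它收集就可以了,例如:
SELECT class_no, collect_list(CONCAT(CAST(student_no AS STRING), ':', name)) FROM source_table GROUP BY class_no;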


从你的需求描述看,mongodb目标表的这种班级设计平时可能不太需要,如果是为了查某个班所有的学生的话,在查询的时候加个where条件即可,没有必要把明细数据再放到一个数组里面
感觉可能是你定义表结构和实际使用方面的问题,可以换个角度思考下

在 2021-12-03 08:36:57,"casel.chen"  写道:
>可我要的最终结果不是string,最好是通用的Row类型,这样的话下次聚合其他维度就不用重复开发UDF了。
>类似我这样的需求应该其他人也会遇到吧?
>功能:collect出一个Multiset即map,key是数据本身,value是数据出现的次数,可以按出现次数排序等。
>   输出可以是去重或不去重的Array(按出现次数排序或不排序),也可以就是map本身
>
>
>目前collect函数可以输出一个Multiset即map,但要怎么按value即出现次数排序并只输出排序后的keyset,用flink sql要怎么写呢?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-12-02 09:58:28,"cyril cui"  写道:
>>af里acc为个list,merge的时候合并,输出的时候 list拼成string即可
>>
>>casel.chen  于2021年12月2日周四 上午9:46写道:
>>
>>> 使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group
>>> by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql?
>>> 如果不能的话要怎么写UDAF,有例子参考吗?谢谢!
>>>
>>> kafka源表:
>>> 班级 学号  姓名  年龄
>>> 1 20001张三   15
>>> 2 20011李四   16
>>> 1 20002王五   16
>>> 2 20012吴六   15
>>>
>>> create table source_table (
>>>class_no: INT,
>>>student_no: INT,
>>>name: STRING,
>>>age: INT
>>> ) with (
>>>'connector' = 'kafka',
>>>...
>>> );
>>>
>>>
>>>
>>> 通过flink sql处理输出 ==>
>>>
>>>
>>> mongodb目标表:
>>> 班级 学生信息
>>> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>>> 20002, "name":"王五", "age": 16}]
>>> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>>> 20012, "name":"吴六", "age": 15}]
>>>
>>> create table sink_table (
>>>   class_no INT,
>>>   students: ARRAY>
>>> ) with (
>>>   'connector' = 'mongodb',
>>>   ...
>>> );
>>>
>>>


Re:回复: flink远程调用时环境变量问题

2021-11-29 Thread RS
试试 flink-conf.yaml里面配置 env.hadoop.conf.dir: /xxx/hadoop

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/





在 2021-11-26 11:14:17,"王健" <13166339...@163.com> 写道:
>
>
>您好,ssh远程调用,/etc/profile配置是不起作用的呢
>
>
>| |
>王健
>|
>|
>13166339...@163.com
>|
>签名由网易邮箱大师定制
>在2021年11月26日 11:12,Mabin 写道:
>在/etc/profile里面配的
>
>发自我的iPhone
>
>在 2021年11月26日,上午11:07,王健 <13166339...@163.com> 写道:
>
>
>
>大佬们:
>远程调用flink启动任务,如何解决hadoop的环境变量问题呢,像java,hbase其他的环境变量都可以通过在flink-conf.yaml配置文件里配置,但是hadoop配置env.hadoop.conf.dir不起作用。
>可能是需要增加export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop 
>classpath`,但是这个是不能在flink-conf.yaml配置吧
>
>
>急求解决,万分感谢
>
>
>
>| |
>王健
>|
>|
>13166339...@163.com
>|
>签名由网易邮箱大师定制


关于flink plugins目录不生效的疑问

2021-11-22 Thread RS
Hi,

环境:flink-1.14.0,单节点standalone



https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/

参考官方文档,执行下面命令:

mkdir plugins/s3-fs-hadoop

cp opt/flink-s3-fs-hadoop-1.14.0.jar plugins/s3-fs-hadoop/



在flink-conf中配置了hadoop的路径(s3使用了hadoop的配置文件)

env.hadoop.conf.dir: /data/hadoop/s3

然后启动集群,成功启动后,执行./bin/sql-client.sh,用SQL测试读取s3数据, 出现连接超时报错



但是把flink-s3-fs-hadoop-1.14.0.jar挪到flink的lib下,重启集群,重新执行同样的测试,就可以读取到数据了,其余的配置都没有修改
所以感觉这个plugins目录没有生效?这个plugins和lib目录的区别在哪里,应该如何使用??


附上报错信息:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.util.SerializedThrowable: connect timed out


Caused by: com.amazonaws.SdkClientException: Failed to connect to service 
endpoint: 
at 
com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:100)
at 
com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:70)
at 
com.amazonaws.internal.InstanceMetadataServiceResourceFetcher.readResource(InstanceMetadataServiceResourceFetcher.java:75)
at 
com.amazonaws.internal.EC2ResourceFetcher.readResource(EC2ResourceFetcher.java:66)
at 
com.amazonaws.auth.InstanceMetadataServiceCredentialsFetcher.getCredentialsEndpoint(InstanceMetadataServiceCredentialsFetcher.java:58)
at 
com.amazonaws.auth.InstanceMetadataServiceCredentialsFetcher.getCredentialsResponse(InstanceMetadataServiceCredentialsFetcher.java:46)
at 
com.amazonaws.auth.BaseCredentialsFetcher.fetchCredentials(BaseCredentialsFetcher.java:112)
at 
com.amazonaws.auth.BaseCredentialsFetcher.getCredentials(BaseCredentialsFetcher.java:68)
at 
com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:165)
at 
org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
... 49 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at 
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
at 
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
at 
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
at 
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
at 
com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52)
at 
com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80)
... 58 more
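补充一个后来的排查方向(仅供参考):从堆栈看它在请求 EC2 的 instance metadata 去拿凭证,说明 access key 之类的配置没有被 s3 插件读到。flink-s3-fs-hadoop 支持直接在 flink-conf.yaml 里配 s3. 前缀的参数(会转发成底层的 fs.s3a.* 配置),示例如下(地址和密钥都是占位):

s3.endpoint: http://s3-server:9000
s3.path.style.access: true
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>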



flink1.12 请教下如何配置多hadoop参数,s3使用问题

2021-11-22 Thread RS
hi,


环境:
1. flink-1.12,版本可以升级
2. flink-conf中配置了env.hadoop.conf.dir,路径下有hdfs集群的core-site.xml和hdfs-site.xml, 
state.backend保存在该HDFS上
3. flink的部署模式是K8S+session


需求:
需要从一个s3协议的分布式文件系统中读取文件,处理完写到mysql中


问题:
s3配置采用hadoop的配置方式,保存为一个新的core-site.xml文件,参考的 
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A


按照官网说明文档中,需要 修改hadoop的环境变量,但是就和以前的core-site.xml冲突了,无法同时配置2个hadoop路径
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/


或者 在flink-conf.yaml中添加一堆s3配置,这样又写死了,再新增一个s3集群的时候如何处理?


所以请教下如何解决这类问题(可以修改代码)?如何配置多个hadoop配置(比如从第一个文件系统(s3协议)读数据,写到第二个文件系统中(s3协议))?
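补充:后来注意到 hadoop-aws 支持 per-bucket 配置,按 bucket 名区分 endpoint/密钥,写在同一个 core-site.xml 里,也许能解决多个 s3 集群的问题(未验证,bucket 名和值都是占位,secret.key 同理):

<property>
  <name>fs.s3a.bucket.bucket-a.endpoint</name>
  <value>http://s3-cluster-a:9000</value>
</property>
<property>
  <name>fs.s3a.bucket.bucket-a.access.key</name>
  <value>KEY_A</value>
</property>
<property>
  <name>fs.s3a.bucket.bucket-b.endpoint</name>
  <value>http://s3-cluster-b:9000</value>
</property>
<property>
  <name>fs.s3a.bucket.bucket-b.access.key</name>
  <value>KEY_B</value>
</property>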





Re:回复:flink1.13.1 sql client connect hivecatalog 报错

2021-11-21 Thread RS
图片看不到的,尽量不要发图片,你可以复制文字出来并说明下,
















在 2021-11-22 13:14:13,"zhiyuan su"  写道:

我使用的是上面的jar 包。从1.13的文档处获取的,但维标注flink 版本,我理解应该是flink1.13版本编译的。



这个是yaml文件,我直接在sql 客户端,通过DDL 的方式去编写的话,也是如下报错:
Caused by: java.util.ServiceConfigurationError: 
org.apache.flink.table.factories.Factory: 
org.apache.flink.table.module.hive.HiveModuleFactory not a subtype


Re:Re: flinksql 写 hive ,orc格式,应该支持下压缩。

2021-11-17 Thread RS
1. 文件名是不带.zlib后缀的
2. ORC格式默认是配置了ZLIB压缩的,并且默认是开启的,你可以配置'orc.compress'='NONE'测试下,看下不压缩的大小,没有压缩的文件应该是更大的
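如果是 filesystem connector + orc format,建表时大概这样配(表结构和路径是示意):

CREATE TABLE orc_sink (
    a STRING,
    b INT
) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs:///tmp/orc_out',
    'format' = 'orc',
    'orc.compress' = 'SNAPPY'   -- 可选 NONE / ZLIB / SNAPPY 等
);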


在 2021-11-16 17:29:17,"yidan zhao"  写道:
>我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。
>其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩?
>
>RS  于2021年11月15日周一 上午9:55写道:
>
>> 官网里面有介绍这个,你是要这个吧
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/orc/
>>
>>
>> Orc format also supports table properties from Table properties. For
>> example, you can configure orc.compress=SNAPPY to enable snappy compression.
>>
>>
>> 在 2021-11-11 12:37:31,"yidan zhao"  写道:
>> >如题,有支持压缩的方法吗当前,看文档没找到应该。
>>


Re:flinksql 写 hive ,orc格式,应该支持下压缩。

2021-11-14 Thread RS
官网里面有介绍这个,你是要这个吧
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/orc/


Orc format also supports table properties from Table properties. For example, 
you can configure orc.compress=SNAPPY to enable snappy compression.


在 2021-11-11 12:37:31,"yidan zhao"  写道:
>如题,有支持压缩的方法吗当前,看文档没找到应该。


Re:请求帮助

2021-11-14 Thread RS
查看下client的日志,一般在flink的logs目录下




在 2021-11-12 20:59:59,"sky"  写道:
>我使用的事flink on yarn。在执行命令时:  flink run -m yarn-cluster 
>./examples/batch/WordCount.jar  结果却报错了:
>
>The program finished with the following exception:
>
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: org.apache.flink.runtime.rest.util.RestClientException: 
>[org.apache.flink.runtime.rest.handler.RestHandlerException: 
>org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
>Flink job (397a081a0313f462818575fc725b3582)
> at 
>org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:94)
> at 
>org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:84)
> at 
>java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>
> ...
>麻烦告知是什么原因呢,我配置文件是这样的:
>#===
>high-availability: zookeeper
>high-availability.storageDir: hdfs://mycluster/flink/ha/
>high-availability.zookeeper.quorum: 
>hadoop201:2181,hadoop202:2181,hadoop203:2181
>high-availability.zookeeper.path.root: /flink
>high-availability.cluster-id: /default_one # important: customize per cluster
>#设置ck的状态后端
>state.backend: filesystem
>state.checkpoints.dir: hdfs://mycluster/flink/checkpoints
>#设置默认的savepoint的保存位置
>state.savepoints.dir: hdfs://mycluster/flink/savepoints
># 集群名称不能写错
>jobmanager.archive.fs.dir: hdfs://mycluster/flink/completed-jobs/
>historyserver.archive.fs.dir: hdfs://mycluster/flink/completed-jobs/
>#===
>
>谢谢!


Re:flink 1.13.1 通过yarn-application运行批应用,处理mysql源一亿条数据到hive,发现需要配置16G+的Taskmangaer内存

2021-11-04 Thread RS
看看任务并行度是多少,可能是并发太大导致的内存占用?? 
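如果 mysql 源是用 jdbc connector 读的,也可以检查下有没有配分区扫描和 fetch size,不配的话单并发全量拉取很容易占内存(表名、列名和参数值都是示意):

CREATE TABLE mysql_source (
    id BIGINT,
    name STRING,
    price DOUBLE
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://host:3306/db',
    'table-name' = 'big_table',
    'username' = '...',
    'password' = '...',
    'scan.partition.column' = 'id',
    'scan.partition.num' = '50',
    'scan.partition.lower-bound' = '1',
    'scan.partition.upper-bound' = '100000000',
    'scan.fetch-size' = '1000'
);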

















在 2021-11-04 15:52:14,"Asahi Lee" <978466...@qq.com.INVALID> 写道:
>hi!
>我通过flink sql,将mysql的一亿条数据传输到hive库中,通过yarn-application方式运行,结果配置16G的内存,执行失败!


Re:回复:如何监控kafka延迟

2021-08-17 Thread RS
1. metric指标每次都会清0的
2. 数据对账的话, 可以将每次的统计数据按时间点保存起来, 然后查询时间范围的时候, 做sum求和来对账
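针对第2点,如果指标是走 Prometheus 采集的,也可以直接用 increase/sum 在查询的时间范围上累计,counter 因重启清零的情况 increase 会自动处理(指标名是示意,以实际暴露出来的为准):

sum(increase(flink_taskmanager_job_task_operator_numRecordsOut{job_name="your_job"}[1h]))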
在 2021-08-09 09:51:43,"Jimmy Zhang"  写道:
>您好,看到你们在用kafka相关metrics,我想咨询一个问题。你们是否遇见过在重启一个kafka sink 
>job后,相关指标清零的情况?这样是不是就无法持续的进行数据想加?我们想做一个数据对账,查询不同时间段的输出量统计,这样可能中间归零就有问题,所以想咨询下,任何的回复都非常感谢!
>
>
>
>
>|
>Best,
>Jimmy
>|
>
>Signature is customized by Netease Mail Master
>
>在2021年07月28日 17:58,jie mei 写道:
>hi,all
>
>我们是通过 grafana 对采集到的 flink kafka 的
>metrics(taskmanager_job_task_operator_KafkaConsumer_records) 配置报警规则来报警的。
>
>xuhaiLong  于2021年7月28日周三 下午5:46写道:
>
>> 参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。
>>
>>
>> 在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道:
>> Hi comsir
>>
>> kafka的控制台能力比较弱,想知道延迟只能自己维护。
>>
>> 维护方式:
>>
>> 1. 每个服务的topic的offset 减去 groupid的offset
>>
>> 2. 尽量可以计算出各种消费速度
>>
>> 3. rocketmq控制台,可看到消费进度,可以参照下。
>>
>>
>> 在 2021/7/28 上午11:02, 龙逸尘 写道:
>> Hi comsir,
>> 采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。
>> group  id 需要自己维护。
>>
>> comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道:
>>
>> hi all
>> 以kafka为source的flink任务,各位都是如何监控kafka的延迟情况??
>> 监控这个延迟的目的:1.大盘展示,2.延迟后报警
>> 小问题:
>> 1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标?
>> 2.怎么获取groupId呢,多个group消费的话,如何区分呀?
>> 3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗?
>> 4.有比较优雅的实现方式吗?
>> 非常感谢 期待解答 感谢感谢
>>
>
>
>--
>
>*Best Regards*
>*Jeremy Mei*


Re:Re:Re: 请教下Flink时间戳问题

2021-08-16 Thread RS
T只是时间格式显示问题, 数据格式都是timestamp(3), 这个和T应该无关的

在 2021-08-16 13:45:12,"Geoff nie"  写道:
在 2021-08-16 13:45:12,"Geoff nie"  写道:
>谢谢你!第二个问题确实是我版本太低问题,我flink版本是1.12.1。
>第一个问题,是因为我通过flink写入iceberg 
>表中,然后通过presto查询iceberg表,其他字段的表都可以查询,但是当写入的是含有TIMESTAMP 类型的表时,presto查询如下报错:
>
>Query failed (#20210816_020321_00011_wa8bs) in your-presto: Cannot convert 
>from Iceberg type 'timestamp' (TIMESTAMP) to Presto type
>
>所以,我怀疑要么flink TIMESTAMP 类型中间带T问题,要么presto(0.258版本)问题。如果这个问题您也知道答复下,感激不尽。
>
>
>
>
>
>
>
>
>
>
>
>在 2021-08-16 12:26:27,"Leonard Xu"  写道:
>>Hi,
>>你贴的图都挂了,需要传图可以用下图床工具,代码少可以直接贴代码。
>>TIMESTAMP 类型中 显示的T 没有任何含义,只是 format 一个时间戳时的一个分割符,你最终把 TIMESTAMP 
>>写入到你的sink,你自己的sink(比如mysql)会有其自己的format。
>>第二个问题,看不到你的图,你看下你flink的版本,1.13后这个TIMESTAMP_LTZ类型支持才完善的。
>>
>>祝好,
>>Leonard
>>
>>
>>> 在 2021年8月16日,10:27,Geoff nie  写道:
>>> 
>>> 问题一:flink timestamp时间戳为何中间多了个T,怎么才能少去中间T呢?
>>> 
>>


Re:关于 Flink on K8S Deploy Job Cluster 部署问题

2020-11-21 Thread RS
taskmanager-job-deployment.yaml 和 jobmanager-job.yaml 部署的时候只用启动一次服务,

后续启动实际job的时候,就占用slot,available slot就少了 
当job执行完之后,slot资源就释放了,available的slot又恢复了,可以给下一次的job提供资源 
如果你的slot用完了的话,那就是资源不够了,需要重新配置taskmanager-job-deployment.yaml












在 2020-11-20 15:20:58,"WeiXubin" <18925434...@163.com> 写道:
>我们打算采用 Flink on K8S Job
>Cluster(perjob)的部署方式。我们使用taskmanager-job-deployment.yaml
>在K8S启动taskmananger,副本数为2,每个taskMananger的solt为8。我们把TaskMananger理解为资源池,当有一个Job启动时,会根据任务情况自动分配一定数量的TaskMananger给它,当它用完时把TaskMananger归还。
>
>当我们使用 jobmanager-job.yaml
>启动Job(Job只需要一个solt)时候,发现该Job会占用这两个TaskMananger,即使其并不需要那么多solt。这导致第二个Job启动时没有可用的TaskMananger,导致资源浪费。
>
>问题:
>是否pre-job模式每次启动都是需要创建 taskmanager-job-deployment.yaml 和
>jobmanager-job.yaml,然后这部分taskmananger归属于这个job,当运行完需要销毁掉
>taskmanager?但这样就会导致每次都要创建和销毁taskmanager
>
>Thanks,
>Bin
>
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:关于flink任务挂掉报警的监控指标选择

2020-11-05 Thread RS
可以配置任务重启告警, flink任务挂掉之后会自动尝试重启
如果是固定任务数量的话, 还可以配置slot数量告警
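重启告警如果也是走 Prometheus 的话,大概可以基于 numRestarts 这类指标来做(指标名以实际暴露的为准,规则只是示意):

# 最近5分钟内发生过重启就告警
increase(flink_jobmanager_job_numRestarts[5m]) > 0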



在 2020-11-05 10:15:01,"bradyMk"  写道:
>请问各位大佬,我基于grafana+prometheus构建的Flink监控,现在想实现flink任务挂掉后,grafana就发出报警的功能,但是目前不知道该用什么指标去监控,我之前想监控flink_jobmanager_job_uptime这个指标,设置的监控规则是:max_over_time(flink_jobmanager_job_uptime[1m])
>-
>min_over_time(flink_jobmanager_job_uptime[1m])的差小于等于0就报警,但是任务刚启动,会有误报,想请教下有没有更好的办法
>
>
>
>-
>Best Wishes
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


sql-client 连接hive报错 TTransportException

2020-10-26 Thread RS
Hi, 请教下
我尝试使用sql-client连接hive,  hive正常, 使用beeline -u jdbc:hive2://x.x.x.x:1 可以正常连接


sql-client-defaults.yaml配置内容:
tables: []
functions: []
catalogs:
- name: myhive
  type: hive
  hive-conf-dir: /home/hive/flink-1.11.1/conf
  default-database: default
execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 100
  parallelism: 1
  max-parallelism: 128 
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  restart-strategy:
type: fallback
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0


然后启动sql-client报错
$./bin/sql-client.sh embedded


最后的报错信息:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed 
to determine whether database default exists or not
at 
org.apache.flink.table.catalog.hive.HiveCatalog.databaseExists(HiveCatalog.java:335)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:227)
at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:627)
at java.util.HashMap.forEach(HashMap.java:1289)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:183)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:136)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
... 3 more
Caused by: org.apache.thrift.transport.TTransportException
at 
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at 
org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_database(ThriftHiveMetastore.java:1135)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_database(ThriftHiveMetastore.java:1122)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:1511)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:1506)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
at com.sun.proxy.$Proxy28.getDatabase(Unknown Source)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.getDatabase(HiveMetastoreClientWrapper.java:107)
at 
org.apache.flink.table.catalog.hive.HiveCatalog.databaseExists(HiveCatalog.java:330)
... 15 more




附录完整错误信息:
Searching for '/home/hive/flink-1.11.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: 
file:/home/hive/flink-1.11.1/conf/sql-client-defaults.yaml
No session environment specified.
2020-10-27 09:48:14,533 INFO  org.apache.hadoop.hive.conf.HiveConf  
   [] - Found configuration file 
file:/home/hive/flink-1.11.1/conf/hive-site.xml
2020-10-27 09:48:15,144 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to 
connect to metastore with URI thrift://x.x.x.x:1
2020-10-27 09:48:15,168 INFO  

Re:启动任务异常, Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1

2020-08-30 Thread RS
测试出来了, rowtime参数需要是最后一个参数, $(timeField).rowtime()

但是这个报错也太隐晦了吧 .
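把调整后的写法也记录一下(示意,变量沿用上面代码里的):

// rowtime 属性放在最后一个位置,普通字段放在前面
tableEnv.createTemporaryView(
        table_name,
        source,
        $("cpu"),
        $(timeField).rowtime());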





在 2020-08-30 14:54:15,"RS"  写道:
>Hi, 请教下
>
>
>启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下
>这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理
>DataStreamSource source = env.addSource(consumer);
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>tableEnv.createTemporaryView(table_name, source, 
> $(timeField).rowtime(), $("cpu"));
>tableEnv.from(table_name).window(
>Tumble.over(lit(1).minutes())
>.on($(timeField))
>.as(table_name + "Window")
>);
>tableEnv.executeSql(sql1);  // CREATE TABLE t_out (`ts` TIMESTAMP(3), 
> `count` BIGINT) WITH ('connector' = 'print')   没有报错
>tableEnv.executeSql(sql2);  //  INSERT INTO t_out SELECT 
> TUMBLE_START(`ts`, INTERVAL '1' MINUTE), COUNT(1) as `count` FROM t1 GROUP BY 
> TUMBLE(`ts`, INTERVAL '1' MINUTE)  抛异常
>
>
>异常堆栈:
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: Index: 1, Size: 1
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>
>at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>
>at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>
>at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>
>at 
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>
>at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>
>at java.security.AccessController.doPrivileged(Native Method)
>
>at javax.security.auth.Subject.doAs(Subject.java:422)
>
>at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
>
>at 
>org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
>Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
>
>at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>
>at java.util.ArrayList.get(ArrayList.java:433)
>
>at java.util.Collections$UnmodifiableList.get(Collections.java:1311)
>
>at 
>org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:682)
>
>at 
>org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:665)
>
>at 
>org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:561)
>
>at 
>org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1(ExprCodeGenerator.scala:184)
>
>at 
>org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(ExprCodeGenerator.scala:158)
>
>at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
>at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>
>at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>
>at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
>
>at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>
>at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>
>at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>
>at 
>org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateConverterResultExpression(ExprCodeGenerator.scala:158)
>
>at 
>org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:103)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:126)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:55)
>
>at 
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>
>at 
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlan(StreamExecDataStreamScan.scala:55)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExec

启动任务异常, Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1

2020-08-30 Thread RS
Hi, 请教下


启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下
这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理
DataStreamSource source = env.addSource(consumer);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
tableEnv.createTemporaryView(table_name, source, 
$(timeField).rowtime(), $("cpu"));
tableEnv.from(table_name).window(
Tumble.over(lit(1).minutes())
.on($(timeField))
.as(table_name + "Window")
);
tableEnv.executeSql(sql1);  // CREATE TABLE t_out (`ts` TIMESTAMP(3), 
`count` BIGINT) WITH ('connector' = 'print')   没有报错
tableEnv.executeSql(sql2);  //  INSERT INTO t_out SELECT 
TUMBLE_START(`ts`, INTERVAL '1' MINUTE), COUNT(1) as `count` FROM t1 GROUP BY 
TUMBLE(`ts`, INTERVAL '1' MINUTE)  抛异常


异常堆栈:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Index: 1, Size: 1

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)

at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1

at java.util.ArrayList.rangeCheck(ArrayList.java:657)

at java.util.ArrayList.get(ArrayList.java:433)

at java.util.Collections$UnmodifiableList.get(Collections.java:1311)

at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:682)

at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:665)

at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:561)

at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1(ExprCodeGenerator.scala:184)

at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(ExprCodeGenerator.scala:158)

at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)

at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)

at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)

at scala.collection.TraversableLike.map(TraversableLike.scala:233)

at scala.collection.TraversableLike.map$(TraversableLike.scala:226)

at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)

at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateConverterResultExpression(ExprCodeGenerator.scala:158)

at 
org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:103)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:126)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:55)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlan(StreamExecDataStreamScan.scala:55)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:76)

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)

at 

Re: Question: implementing real-time alerting with Flink

2020-08-11 Thread RS
Hi,
This is exactly what CEP is for: "more than N occurrences" and "greater than some value" combined to decide an event.
1. If a rule changes, just restart the job; since the rule itself changed, restarting the job is not a big deal.
2. CEP supports combining rules and time windows.
3. The introduction on the official site is a good fit as a best-practice reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/cep.html
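
For reference, a minimal hedged sketch in Java of what one such rule could look like with CEP (the MetricEvent type, field names and the stand-in source are made up for illustration; it needs the flink-cep dependency on the classpath). Several rules can be active in one job by building one Pattern per rule and calling CEP.pattern on the same keyed stream for each of them:

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

public class CepAlertSketch {

    // Hypothetical event type; a real job would map the Kafka JSON onto it.
    public static class MetricEvent {
        public String deviceId;
        public double temperature;

        public MetricEvent() {}

        public MetricEvent(String deviceId, double temperature) {
            this.deviceId = deviceId;
            this.temperature = temperature;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source; replace with the Kafka source of the reported data.
        DataStream<MetricEvent> events = env.fromElements(
                new MetricEvent("dev-1", 85.0),
                new MetricEvent("dev-1", 90.0));

        // Rule {"temperature": 80} combined with {"times": 5}: temperature above 80,
        // five times within one minute, evaluated per device.
        Pattern<MetricEvent, ?> overheat = Pattern.<MetricEvent>begin("overheat")
                .where(new SimpleCondition<MetricEvent>() {
                    @Override
                    public boolean filter(MetricEvent e) {
                        return e.temperature > 80;
                    }
                })
                .times(5)
                .within(Time.minutes(1));

        PatternStream<MetricEvent> matches =
                CEP.pattern(events.keyBy(e -> e.deviceId), overheat);

        matches.select(new PatternSelectFunction<MetricEvent, String>() {
            @Override
            public String select(Map<String, List<MetricEvent>> pattern) {
                return "ALERT: " + pattern.get("overheat").size() + " overheat events in one minute";
            }
        }).print();  // a real job would write alerts to a sink instead of printing

        env.execute("cep-alert-sketch");
    }
}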

On 2020-08-06 10:26:19, "samuel@ubtrobot.com"  wrote:
>Since we need a real-time alerting feature, after some research Flink looks like a good fit, but a few things are still unclear. Any guidance from the experts is appreciated, thanks!
>
>Alerting has two parts:
>   1. Alert rule configuration: the rules are stored in MySQL as JSON, e.g.
>{"times":5}  --- raise an alert when an event occurs more than 5 times;
>{"temperature": 80} --- raise an alert when the temperature exceeds 80;
>   2. Alert evaluation:
>  1) the reported data is written to Kafka;
>  2) Flink reads from Kafka and aggregates over tumbling windows; if a rule is satisfied, an alert is produced.
>
>
>Current questions:
>1. When a rule changes, how can it take effect promptly?
>2. With Flink CEP, can multiple rules be active at the same time on the same source within one job?
>3. Is there a best practice for this kind of feature?
>
>Thanks in advance to anyone who can answer!
>   
>
> 


Re:Re: Re: How to configure Hadoop for Flink 1.11.1 on k8s

2020-08-10 Thread RS
Hi
Yep, tried it again and this approach works; the earlier failure was my own mistake. Thanks~
Thx

On 2020-08-10 13:36:36, "Yang Wang"  wrote:
>Did you build a new image yourself with flink-shaded-hadoop-2-uber-2.8.3-10.0.jar placed under lib?
>If so, this problem should not occur.
>
>Best,
>Yang
>
>RS  wrote on Mon, Aug 10, 2020 at 12:04 PM:
>
>> Hi,
>> I downloaded flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, put it under lib, and restarted the cluster,
>> but submitting a job still fails with:
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Could not find a file system implementation for scheme 'hdfs'. The scheme
>> is not directly supported by Flink and no Hadoop file system to support
>> this scheme could be loaded. For a full list of supported file systems,
>> please see
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>> at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:491)
>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:64)
>> at
>> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:501)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:465)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:301)
>> ... 22 more
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Hadoop is not in the classpath/dependencies.
>> at
>> org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
>> at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
>> ... 28 more
>>
>>
>> lib下有这些jar包
>> $ ls lib/
>> avro-1.8.2.jar
>> flink-avro-1.11.1-sql-jar.jar
>> flink-connector-jdbc_2.12-1.11.1.jar
>> flink-csv-1.11.1.jar
>> flink-dist_2.12-1.11.1.jar
>> flink-json-1.11.1.jar
>> flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>> flink-shaded-zookeeper-3.4.14.jar
>> flink-sql-connector-kafka_2.12-1.11.1.jar
>> flink-table_2.12-1.11.1.jar
>> flink-table-blink_2.12-1.11.1.jar
>> kafka-clients-2.5.0.jar
>> log4j-1.2-api-2.12.1.jar
>> log4j-api-2.12.1.jar
>> log4j-core-2.12.1.jar
>> log4j-slf4j-impl-2.12.1.jar
>> mysql-connector-java-5.1.49.jar
>>
>>
>>
>> On 2020-08-10 10:13:44, "Yang Wang"  wrote:
>> >Matt Wang is right.
>> >
>> >The Flink binary release and the official image currently do not ship flink-shaded-hadoop, so you need to add one more layer on top of the official image
>> >and put flink-shaded-hadoop[1] into /opt/flink/lib.
>> >
>> >FROM flink
>> >COPY /path/of/flink-shaded-hadoop-2-uber-*.jar $FLINK_HOME/lib/
>> >
>> >
>> >[1].
>> >
>> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber
>> >
>> >
>> >Best,
>> >Yang
>> >
>> >Matt Wang  wrote on Fri, Aug 7, 2020 at 5:22 PM:
>> >
>> >> The official image only contains Flink itself; if you need to access HDFS, you have to bake the Hadoop jars and configuration into the image.
>> >>
>> >>
>> >> --
>> >>
>> >> Best,
>> >> Matt Wang
>> >>
>> >>
>> >> On 2020-08-07 12:49, caozhen wrote:
>> >> By the way, here is the Hadoop integration wiki for Flink 1.11.1:
>> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
>> >>
>> >> According to the docs, flink-shaded-hadoop-2-uber is no longer provided, and two alternatives are suggested:
>> >>
>> >> 1. Use HADOOP_CLASSPATH to load the Hadoop dependencies (recommended);
>> >> 2. Or put the Hadoop dependencies into the Flink client's lib directory.
>> >>
>> >> *When running Flink 1.11.1 on
>> >> yarn I used the second approach: download the Hadoop distribution and copy the commonly used dependencies into lib. (This may cause class conflicts with your main jar and needs some debugging.)
>> >>
>> >> I don't think this is a good long-term approach, it only works around the problem for now. There should still be a flink-shaded-hadoop package; I am trying to build one myself and some issues are not fully solved yet.
>> >> *
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>>


Re:Re: How to configure Hadoop for Flink 1.11.1 on k8s

2020-08-09 Thread RS
Hi,
I downloaded flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, put it under lib, and restarted the cluster, but submitting a job still fails with:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 'hdfs'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded. For a full list of supported file systems, please see 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:491)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:64)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:501)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:465)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:301)
... 22 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
at 
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
... 28 more


The lib directory contains these jars:
$ ls lib/
avro-1.8.2.jar
flink-avro-1.11.1-sql-jar.jar
flink-connector-jdbc_2.12-1.11.1.jar
flink-csv-1.11.1.jar
flink-dist_2.12-1.11.1.jar
flink-json-1.11.1.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-kafka_2.12-1.11.1.jar
flink-table_2.12-1.11.1.jar
flink-table-blink_2.12-1.11.1.jar
kafka-clients-2.5.0.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
mysql-connector-java-5.1.49.jar



On 2020-08-10 10:13:44, "Yang Wang"  wrote:
>Matt Wang is right.
>
>The Flink binary release and the official image currently do not ship flink-shaded-hadoop, so you need to add one more layer on top of the official image
>and put flink-shaded-hadoop[1] into /opt/flink/lib.
>
>FROM flink
>COPY /path/of/flink-shaded-hadoop-2-uber-*.jar $FLINK_HOME/lib/
>
>
>[1].
>https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber
>
>
>Best,
>Yang
>
>Matt Wang  wrote on Fri, Aug 7, 2020 at 5:22 PM:
>
>> The official image only contains Flink itself; if you need to access HDFS, you have to bake the Hadoop jars and configuration into the image.
>>
>>
>> --
>>
>> Best,
>> Matt Wang
>>
>>
>> On 2020-08-07 12:49, caozhen wrote:
>> By the way, here is the Hadoop integration wiki for Flink 1.11.1:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
>>
>> According to the docs, flink-shaded-hadoop-2-uber is no longer provided, and two alternatives are suggested:
>>
>> 1. Use HADOOP_CLASSPATH to load the Hadoop dependencies (recommended);
>> 2. Or put the Hadoop dependencies into the Flink client's lib directory.
>>
>> *When running Flink 1.11.1 on
>> yarn I used the second approach: download the Hadoop distribution and copy the commonly used dependencies into lib. (This may cause class conflicts with your main jar and needs some debugging.)
>>
>> I don't think this is a good long-term approach, it only works around the problem for now. There should still be a flink-shaded-hadoop package; I am trying to build one myself and some issues are not fully solved yet.
>> *
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-jdbc_2.11:1.11.1 dependency cannot be found

2020-08-07 Thread RS
Hi,
Look for it under the new artifact name, e.g. flink-connector-jdbc_2.12-1.11.1.jar

















On 2020-08-07 14:47:29, "lydata"  wrote:
>The dependency flink-jdbc_2.11:1.11.1 cannot be found on https://mvnrepository.com/ . Was it not published?


How to configure Hadoop for Flink 1.11.1 on k8s

2020-08-06 Thread RS
Hi,
I want to run Flink 1.11.1 on K8S using the flink:1.11.1-scala_2.12 image. Following the official docs I deployed a session
cluster, and the jobmanager and taskmanager both started successfully.
But submitting a job fails with:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
at 
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)


It says the Hadoop dependency cannot be found. According to the docs, starting from 1.11 flink-shaded-hadoop-2-uber is no longer recommended
and HADOOP_CLASSPATH should be used instead.
But it is still unclear to me how to combine that with a K8S deployment; the docs are rather brief. Does anyone have a working setup they can share?


Thx
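
For reference, the approach that is later confirmed to work in this archive (see the 2020-08-10 reply above) is to extend the official image and drop the shaded Hadoop uber jar into lib; a minimal sketch, with the jar version only as an example:

# build with: docker build -t my-flink-hadoop:1.11.1 .
FROM flink:1.11.1-scala_2.12
# shaded Hadoop uber jar downloaded from Maven Central beforehand
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/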



Flink 1.11.1 consuming Kafka with SASL fails: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config

2020-08-02 Thread RS
Hi,
I am trying to consume from a Kafka cluster that uses SASL.


Contents of jaas.conf:
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin001"
  password="123456";
};


Commands executed:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf"
./bin/sql-client.sh embedded



CREATE TABLE t1

(vendor STRING)

WITH (

'connector' = 'kafka',

'topic' = 'test',

'properties.bootstrap.servers' = '127.0.0.1:9092',

'properties.group.id' = 'g1',

'properties.sasl.mechanisms'='PLAIN',

'properties.sasl.username'='admin001',

'properties.sasl.password'='123456',

'properties.security.protocol'='SASL_PLAINTEXT',

'format' = 'json',

'scan.startup.mode' = 'earliest-offset',

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true'

);



Then the error message is:
Flink SQL> select * from t1;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: No serviceName defined in either JAAS or 
Kafka config


Could someone advise how to fix this?


Thx
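
A note on the options above: sasl.mechanisms, sasl.username and sasl.password are librdkafka-style property names; the Java Kafka client used by the Flink connector expects sasl.mechanism (singular) and sasl.jaas.config instead, so the mechanism set here is effectively ignored and the client likely falls back to the default GSSAPI (Kerberos) mechanism, which is where the "No serviceName" error comes from. A hedged sketch of the DDL with the Java-client property names (untested, credentials are placeholders):

CREATE TABLE t1 (
  vendor STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'properties.group.id' = 'g1',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin001" password="123456";',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);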



Kafka sink with avro format fails: NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder

2020-07-27 Thread RS
Hi,
Version: Flink-1.11.1
Deployment mode: standalone
The job's maven build includes flink-avro and is packaged as jar-with-dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.11.1</version>
</dependency>
The compiled jar does contain this class.


The docs say "Flink has extensive built-in support for Apache Avro", so I assumed Avro works out of the box.
1. Starting the job directly fails with:  Caused by: org.apache.flink.table.api.ValidationException: Could 
not find any factory for identifier 'avro' that implements 
'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath.
Available factory identifiers are:
canal-json
csv
debezium-json
json
2. After downloading flink-avro-1.11.1.jar and dropping it into flink/lib, it fails with: Caused by: 
java.lang.ClassNotFoundException: org.apache.avro.generic.GenericRecord
3. After putting avro-1.10.0.jar into flink/lib as well, it fails with: Caused by: java.lang.NoClassDefFoundError: 
Could not initialize class org.apache.avro.SchemaBuilder
What else is needed to get Avro support working?


Thanks
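
One observation from the lib listing in the k8s thread above (dated 2020-08-10): a working 1.11.1 setup has flink-avro-1.11.1-sql-jar.jar together with avro-1.8.2.jar in lib. Flink 1.11.x was built against Avro 1.8.2, so the avro-1.10.0.jar added in step 3 is likely the mismatch behind the SchemaBuilder failure; a hedged sketch of pinning the matching Avro version in the fat jar instead:

<!-- assumption: align Avro with the version Flink 1.11.x ships with -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>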



Re:Re: Adapting the JSON format for kafka-connect?

2020-07-27 Thread RS
Hi,
Ah, I realize something is off: `schema` needs to be an object (a dict), not a STRING. How can that be defined in SQL?


Thanks
On 2020-07-27 17:49:18, "Jark Wu"  wrote:
>Hi,
>
>You need to add schema and payload in both the DDL and the query:
>
>CREATE TABLE print_table \
>(`schema` STRING, `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(6)>) \
>WITH (\
>'connector' = 'kafka', \
>'topic' = 'test_out', \
>'properties.bootstrap.servers' = '127.0.0.1:9092', \
>'sink.partitioner' = 'round-robin', \
>'format' = 'json')
>
>-- In the DML you can hard-code the schema as a constant and wrap the payload with the ROW function
>INSERT INTO output
>SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,
>update_time) as payload
>FROM ...
>
>
>Btw, out of curiosity: why does it have to be kafka-connect-jdbc syncing to MySQL? Wouldn't it be
>easier to write to MySQL directly with Flink SQL?
>
>Best,
>Jark
>
>
>On Mon, 27 Jul 2020 at 17:33, RS  wrote:
>
>> hi,
>> kafka -> Flink -> kafka -> mysql
>> Flink processes the data with SQL and writes it to Kafka as JSON; kafka-connect-jdbc then exports it to MySQL.
>> kafka-connect is used so the data can also be exported to other storage systems at the same time.
>>
>>
>>
>> Flink output table definition:
>>
>> CREATE TABLE print_table \
>>
>> (total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
>>
>> WITH (\
>>
>> 'connector' = 'kafka', \
>>
>> 'topic' = 'test_out', \
>>
>> 'properties.bootstrap.servers' = '127.0.0.1:9092', \
>>
>> 'sink.partitioner' = 'round-robin', \
>>
>> 'format' = 'json')
>>
>>
>>
>>
>> Example of the output data format:
>>
>> {"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}
>>
>>
>>
>>
>> However, the kafka-connect-jdbc JSON format requires a schema and payload, for example:
>>
>> {
>>
>>   "schema": {
>>
>> "type": "struct",
>>
>> "fields": [
>>
>>   {
>>
>> "type": "int64",
>>
>> "optional": false,
>>
>> "field": "id"
>>
>>   },
>>
>>   {
>>
>> "type": "string",
>>
>> "optional": true,
>>
>> "field": "name"
>>
>>   }
>>
>> ],
>>
>> "optional": true,
>>
>> "name": "user"
>>
>>   },
>>
>>   "payload": {
>>
>> "id": 1,
>>
>> "name": "admin"
>>
>>   }
>>
>> }
>>
>>
>>
>>
>> 请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式?
>>
>> 当前Flink处理sql:
>>
>> INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS
>> total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as
>> update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1'
>> MINUTE)
>>
>>
>>


Re: Re: Problem parsing the MySQL binlog from Kafka

2020-07-27 Thread RS
Hi,
Attachments probably don't get through on this list, including images and the like.
Only plain text works, so paste the code; if you really need an image, upload it to some other site and share the link.





On 2020-07-27 19:21:51, "air23"  wrote:

Let me upload it one more time.


On 2020-07-27 18:55, Jark Wu wrote:
Hi,
It seems your attachment was not uploaded.

On Mon, 27 Jul 2020 at 18:17, air23  wrote:

> *Hello. This is my parsing SQL. I want to read the binlog's data field and the table field. Why can I get table but not data?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'order_source'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
>
>
> See the attachment for details, there is printed output.
>
>
>
>
>


Re:Re: Adapting the JSON format for kafka-connect?

2020-07-27 Thread RS
Hi,
After changing the SQL I ran into a new problem:
Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported 
cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, `EXPR$2` TIMESTAMP(3) 
*ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, `username` STRING, 
`update_time` TIMESTAMP(6)>'.


The time field in the SELECT is defined as: TUMBLE_START(update_time, INTERVAL '1' MINUTE) as update_time) 
as payload


After changing TIMESTAMP(6) to TIMESTAMP(3) the error went away, so is the time precision of windows in Flink only 3 digits?


Thanks
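
Putting Jark's suggestion below and the TIMESTAMP(3) observation above together, a hedged end-to-end sketch (table and field names follow this thread; the exact Kafka Connect schema string, in particular the type used for update_time, is an assumption and depends on the JDBC sink connector's converter settings):

CREATE TABLE test_out (
  `schema` STRING,
  `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(3)>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_out',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'sink.partitioner' = 'round-robin',
  'format' = 'json'
);

INSERT INTO test_out
SELECT
  '{"type":"struct","fields":[{"type":"int64","optional":true,"field":"total_count"},{"type":"string","optional":true,"field":"username"},{"type":"string","optional":true,"field":"update_time"}],"optional":true,"name":"test_out"}' AS `schema`,
  ROW(count(1), username, TUMBLE_START(update_time, INTERVAL '1' MINUTE)) AS `payload`
FROM table1
GROUP BY username, TUMBLE(update_time, INTERVAL '1' MINUTE);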
On 2020-07-27 17:49:18, "Jark Wu"  wrote:
>Hi,
>
>You need to add schema and payload in both the DDL and the query:
>
>CREATE TABLE print_table \
>(`schema` STRING, `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(6)>) \
>WITH (\
>'connector' = 'kafka', \
>'topic' = 'test_out', \
>'properties.bootstrap.servers' = '127.0.0.1:9092', \
>'sink.partitioner' = 'round-robin', \
>'format' = 'json')
>
>-- In the DML you can hard-code the schema as a constant and wrap the payload with the ROW function
>INSERT INTO output
>SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,
>update_time) as payload
>FROM ...
>
>
>Btw, out of curiosity: why does it have to be kafka-connect-jdbc syncing to MySQL? Wouldn't it be
>easier to write to MySQL directly with Flink SQL?
>
>Best,
>Jark
>
>
>On Mon, 27 Jul 2020 at 17:33, RS  wrote:
>
>> hi,
>> kafka->Flink->kafka->mysql
>> Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。
>> 使用kafka-connect是方便数据同时导出到其他存储
>>
>>
>>
>> Flink定义输出表结构:
>>
>> CREATE TABLE print_table \
>>
>> (total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
>>
>> WITH (\
>>
>> 'connector' = 'kafka', \
>>
>> 'topic' = 'test_out', \
>>
>> 'properties.bootstrap.servers' = '127.0.0.1:9092', \
>>
>> 'sink.partitioner' = 'round-robin', \
>>
>> 'format' = 'json')
>>
>>
>>
>>
>> 输出的数据格式示例:
>>
>> {"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}
>>
>>
>>
>>
>> 但是kafka-connect-jdbc的json格式需要schema和payload,示例:
>>
>> {
>>
>>   "schema": {
>>
>> "type": "struct",
>>
>> "fields": [
>>
>>   {
>>
>> "type": "int64",
>>
>> "optional": false,
>>
>> "field": "id"
>>
>>   },
>>
>>   {
>>
>> "type": "string",
>>
>> "optional": true,
>>
>> "field": "name"
>>
>>   }
>>
>> ],
>>
>> "optional": true,
>>
>> "name": "user"
>>
>>   },
>>
>>   "payload": {
>>
>> "id": 1,
>>
>> "name": "admin"
>>
>>   }
>>
>> }
>>
>>
>>
>>
>> 请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式?
>>
>> 当前Flink处理sql:
>>
>> INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS
>> total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as
>> update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1'
>> MINUTE)
>>
>>
>>


Adapting the JSON format for kafka-connect?

2020-07-27 Thread RS
hi,
kafka -> Flink -> kafka -> mysql
Flink processes the data with SQL and writes it to Kafka as JSON; kafka-connect-jdbc then exports it to MySQL.
kafka-connect is used so the data can also be exported to other storage systems at the same time.



Flink output table definition:

CREATE TABLE print_table \

(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \

WITH (\

'connector' = 'kafka', \

'topic' = 'test_out', \

'properties.bootstrap.servers' = '127.0.0.1:9092', \

'sink.partitioner' = 'round-robin', \

'format' = 'json')




Example of the output data format:

{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}




However, the kafka-connect-jdbc JSON format requires a schema and payload, for example:

{

  "schema": {

"type": "struct",

"fields": [

  {

"type": "int64",

"optional": false,

"field": "id"

  },

  {

"type": "string",

"optional": true,

"field": "name"

  }

],

"optional": true,

"name": "user"

  },

  "payload": {

"id": 1,

"name": "admin"

  }

}




How should this be handled in Flink (adding the schema and payload?) so that the JSON matches what kafka connect expects?

The current Flink SQL is:

INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS 
total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as 
update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1' 
MINUTE)




Re:Re: Re: Could not find any factory for identifier 'kafka'

2020-07-27 Thread RS
Hi,
1. OK, good to know, thanks.
2. Indeed, after switching some of the Flink dependencies to provided, the packaged job still runs fine; but packages like flink-walkthrough-common_2.11 are not in Flink's lib, so they still end up in the fat jar.
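
For completeness: instead of copying the SPI files under META-INF/services by hand, the usual way to keep them intact in a fat jar is to build it with maven-shade-plugin and its ServicesResourceTransformer (this is what the transformer link in Benchao's earlier reply points at), while marking the Flink dependencies that are already in Flink's lib as provided. A sketch, with the plugin version omitted:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- merges all META-INF/services files so the connector factories stay discoverable -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>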




On 2020-07-27 11:42:50, "Caizhi Weng"  wrote:
>Hi,
>
>Flink's TableFactory relies on Java's service discovery (SPI) mechanism, which is why those two files are needed. You have to make sure the jar-with-dependencies
>build actually packages those resource files.
>
>Also, why do you need to bundle the Flink dependencies into the fat jar at all? They are already on Flink's classpath, so a user jar submitted to Flink
>does not need to include them.
>
>RS  wrote on Fri, Jul 24, 2020 at 8:30 PM:
>
>> hi,
>> Thanks for the reply. After several more attempts it turns out this is probably not a dependency problem.
>>
>>
>> I added a new directory to my project: resources/META-INF/services
>> and copied two files from the Flink source:
>> org.apache.flink.table.factories.TableFactory and org.apache.flink.table.factories.Factory
>> With that the error is gone. I don't fully understand why, but it does fix the problem.
>>
>>
>> On 2020-07-24 20:16:18, "JasonLee" <17610775...@163.com> wrote:
>> >hi
>> >Only the -sql and -json jars are needed.
>> >
>> >
>> >| |
>> >JasonLee
>> >|
>> >|
>> >邮箱:17610775...@163.com
>> >|
>> >
>> >Signature is customized by Netease Mail Master
>> >
>> >On 07/24/2020 17:02, RS wrote:
>> >hi,
>> >Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
>> >编译的jar包是jar-with-dependencies的
>> >
>> >
>> >代码片段:
>> >   public String ddlSql = String.format("CREATE TABLE %s (\n" +
>> >   "  number BIGINT,\n" +
>> >   "  msg STRING,\n" +
>> >   "  username STRING,\n" +
>> >   "  update_time TIMESTAMP(3)\n" +
>> >   ") WITH (\n" +
>> >   " 'connector' = 'kafka',\n" +
>> >   " 'topic' = '%s',\n" +
>> >   " 'properties.bootstrap.servers' = '%s',\n" +
>> >   " 'properties.group.id' = '%s',\n" +
>> >   " 'format' = 'json',\n" +
>> >   " 'json.fail-on-missing-field' = 'false',\n" +
>> >   " 'json.ignore-parse-errors' = 'true'\n" +
>> >   ")\n", tableName, topic, servers, group);
>> >
>> >
>> >   StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >   StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(env);
>> >   tableEnv.executeSql(ddlSql);
>> >
>> >
>> >报错信息:
>> >Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>> >Available factory identifiers are:
>> >datagen
>> >at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> >at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> >... 33 more
>> >
>> >
>> >参考了这个
>> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>> >补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>> >
>> >
>> >附上pom依赖:
>> >
>> >   
>> >   org.apache.flink
>> >   flink-java
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-table-api-java-bridge_2.12
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-table-api-java
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-connector-kafka_2.12
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-sql-connector-kafka_2.12
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-json
>> >   ${flink.version}
>> >   
>> >   
>> >
>> >
>> >感谢各位~
>>


Re: Flink aggregation job restart problem

2020-07-27 Thread RS
Could you share some pseudocode? Also check the JDBC sink configuration: does it support deleting records? Maybe the old rows are being deleted on update.


On 2020-07-27 11:33:31, "郑斌斌"  wrote:
>hi all :
>
> A question: I pull Kafka messages in my program and register them as a Flink streaming table, then run the SQL "select user_id, count(*) cnt
> from the_stream_table" and write the result into a MySQL aggregate table (the sink is the Flink 1.11 JdbcUpsertTableSink).
>The problem is that every time the job restarts, the previous results in the MySQL aggregate table get wiped. I have enabled checkpointing and the RocksDB state backend.
>
>Thanks
>
>


Re:Re: flink 1.11 writes query results into MySQL at a very low rate per second

2020-07-24 Thread RS
Check how long the INSERT SQL takes to execute and whether the bottleneck is on the MySQL side, e.g. large rows, slow index maintenance, or something else.

Or run the same SQL writes manually and compare the speed.
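
If MySQL itself is not the bottleneck, another thing worth checking is the JDBC sink's batching; the flush options below control how many rows are written per batch. A hedged sketch of a Flink 1.11 JDBC sink table (connection settings and column names are placeholders):

CREATE TABLE mysql_sink (
  username STRING,
  total_count BIGINT,
  PRIMARY KEY (username) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/testdb',
  'table-name' = 't_result',
  'username' = 'root',
  'password' = '******',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '1s'
);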














On 2020-07-25 10:20:35, "小学生" <201782...@qq.com> wrote:
>Hello, thanks for the advice, but after adding it the way you described, the rate into MySQL became roughly 8 rows per second, which is still far from what is needed.


Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread RS
hi,
Thanks for the reply. After several more attempts it turns out this is probably not a dependency problem.


I added a new directory to my project: resources/META-INF/services
and copied two files from the Flink source:
org.apache.flink.table.factories.TableFactory and org.apache.flink.table.factories.Factory
With that the error is gone. I don't fully understand why, but it does fix the problem.


On 2020-07-24 20:16:18, "JasonLee" <17610775...@163.com> wrote:
>hi
>Only the -sql and -json jars are needed.
>
>
>| |
>JasonLee
>|
>|
>邮箱:17610775...@163.com
>|
>
>Signature is customized by Netease Mail Master
>
>On 07/24/2020 17:02, RS wrote:
>hi,
>With Flink-1.11.1, I am trying to run SQL DDL that reads data from Kafka, and executing the CREATE statement fails.
>The compiled jar is a jar-with-dependencies fat jar.
>
>
>代码片段:
>   public String ddlSql = String.format("CREATE TABLE %s (\n" +
>   "  number BIGINT,\n" +
>   "  msg STRING,\n" +
>   "  username STRING,\n" +
>   "  update_time TIMESTAMP(3)\n" +
>   ") WITH (\n" +
>   " 'connector' = 'kafka',\n" +
>   " 'topic' = '%s',\n" +
>   " 'properties.bootstrap.servers' = '%s',\n" +
>   " 'properties.group.id' = '%s',\n" +
>   " 'format' = 'json',\n" +
>   " 'json.fail-on-missing-field' = 'false',\n" +
>   " 'json.ignore-parse-errors' = 'true'\n" +
>   ")\n", tableName, topic, servers, group);
>
>
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>   tableEnv.executeSql(ddlSql);
>
>
>报错信息:
>Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
>factory for identifier 'kafka' that implements 
>'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
>Available factory identifiers are:
>datagen
>at 
>org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>at 
>org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>... 33 more
>
>
>I referred to this thread:
>http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.
>
>
>Here are the pom dependencies:
>
>   
>   org.apache.flink
>   flink-java
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-table-api-java-bridge_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-table-api-java
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-connector-kafka_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-sql-connector-kafka_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-json
>   ${flink.version}
>   
>   
>
>
>Thanks, everyone~


Re:Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread RS
The previous mail got garbled, so re-sending.


I build the job into a fat jar and run it directly on the server; I have not run it from IDEA.

> flink run xxx

I am not using the shade-plugin.

Maven build configuration:

<properties>
    <jdk.version>1.8</jdk.version>
    <flink.version>1.11.1</flink.version>
</properties>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>



Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread RS
I build the job into a fat jar and run it directly on the server (bin/flink run xxx); I have never run it from IDEA. The maven build does not configure the shade-plugin; the maven build configuration is as follows:

<properties>
    <jdk.version>1.8</jdk.version>
    <flink.version>1.11.1</flink.version>
</properties>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>

thx
On 2020-07-24 17:36:46, "Benchao Li"  wrote:
>It is probably related to how you package the job. Does the program run fine when started directly from IDEA?
>
>If it runs in IDEA but the packaged jar cannot be submitted and run, it is very likely an SPI file issue.
>If you use the shade plugin, have a look at this transformer[1]
>
>[1]
>https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer
>
>RS  wrote on Fri, Jul 24, 2020 at 5:02 PM:
>
>> hi,
>> Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
>> 编译的jar包是jar-with-dependencies的
>>
>>
>> 代码片段:
>> public String ddlSql = String.format("CREATE TABLE %s (\n" +
>> "  number BIGINT,\n" +
>> "  msg STRING,\n" +
>> "  username STRING,\n" +
>> "  update_time TIMESTAMP(3)\n" +
>> ") WITH (\n" +
>> " 'connector' = 'kafka',\n" +
>> " 'topic' = '%s',\n" +
>> " 'properties.bootstrap.servers' = '%s',\n" +
>> " 'properties.group.id' = '%s',\n" +
>> " 'format' = 'json',\n" +
>> " 'json.fail-on-missing-field' = 'false',\n" +
>> " 'json.ignore-parse-errors' = 'true'\n" +
>> ")\n", tableName, topic, servers, group);
>>
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(env);
>> tableEnv.executeSql(ddlSql);
>>
>>
>> 报错信息:
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>> Available factory identifiers are:
>> datagen
>> at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> ... 33 more
>>
>>
>> 参考了这个
>> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>> 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>>
>>
>> 附上pom依赖:
>> 
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-api-java-bridge_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-api-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-connector-kafka_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-sql-connector-kafka_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-json
>> ${flink.version}
>> 
>> 
>>
>>
>> 感谢各位~
>
>
>
>-- 
>
>Best,
>Benchao Li


Could not find any factory for identifier 'kafka'

2020-07-24 Thread RS
hi,
With Flink-1.11.1, I am trying to run SQL DDL that reads data from Kafka, and executing the CREATE statement fails.
The compiled jar is a jar-with-dependencies fat jar.


Code snippet:
public String ddlSql = String.format("CREATE TABLE %s (\n" +
"  number BIGINT,\n" +
"  msg STRING,\n" +
"  username STRING,\n" +
"  update_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '%s',\n" +
" 'properties.bootstrap.servers' = '%s',\n" +
" 'properties.group.id' = '%s',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")\n", tableName, topic, servers, group);


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(ddlSql);


Error message:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


I referred to this thread:
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.


Here are the pom dependencies:


<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>



Thanks, everyone~