Re: Re:Re: Scala REPL YARN 运行模式报 NoSuchMethodError

2020-12-16 文章 Jacob
感觉像是jline和Scala 某些包冲突所致,Scala我不太了解,你可以从以下方面做些尝试

1.在pom.xml或者其他相关文件中, 排除hadoop(以及其他涉及到jline的依赖)依赖中的jline子依赖,单独引入jline的依赖
我当时遇到的问题是,hadoop-common出现了版本冲突,在某个依赖中包含hadoop-common包,我在该依赖中排除了hadoop-common,然后在单独引入hadoop-common依赖,问题得以解决。

2. 改变(升级)Scala的版本



Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: pyflink 引用第三库的文件出现安装权限的问题

2020-12-16 文章 magichuang
Hi,




在使用pyflink  提交任务时,指定  -pyarch venv.zip -pyexec 
venv.zip/venv/bin/python,任务里面用到了udf  IPy是第三方库,之前直接安装报权限错误,不能安装,现在想用虚拟环境的方法来解决

set_python_requirements 图片地址:https://s3.ax1x.com/2020/12/17/r8J6AI.png




之前是使用默认Python环境,会报安装权限被拒绝的问题,我刚才使用指定Python环境的方式提交了一下,里面也加上了  
set_python_requirements,没有再报权限的错误,并被提交到了yarn上有applicationid,这是说明已经在虚拟环境中安装成功了吧?




但是出现了新的错误,Caused by: java.net.ConnectException: Connection refused

错误图片地址:https://s3.ax1x.com/2020/12/17/r8YJKg.png




我的cdh环境是,hadoop也是在这三台机器上部署的

cdh001  cdh002  cdh003
flink 版本  1.11 

集群配置:

master   cdh001:8081cdh002:8081   

wokers  cdh001 cdh002 cdh003




看报错是在连接  cdh002:31331  的时候出现了问题,当任务被提交到yarn集群时,我在cdh002的机器上查找这个端口,并没有发现有31331存在 
   netstat  -ntlp|grep  31331   是空的




这个是因为什么呀?




Best,

Magichuang




> -- 原始邮件 --
> 发 件 人:"Xingbo Huang" 
> 发送时间:2020-12-16 12:42:48
> 收 件 人:user-zh 
> 抄 送:
> 主 题:Re: pyflink 引用第三库的文件出现安装权限的问题
>
> Hi,
>
> 默认就是你每台机器的python指向的环境下,当然你也可以通过-pyexec指定不同的python环境
>
> Best,
> Xingbo
>
> magichuang 于2020年12月15日周二 下午8:02写道:
>
> > 我现在看看那个报错,flink是把requirements.txt 和 cached_dir 已经先上传到hdfs上了,因为
> > /yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/requirements.txt
> >
> >
> > /yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/cached_dir
> > 在提交的时候 去看机器上是存在的,只不过等程序挂了,这个
> > /yarn/nm/usercache/root/appcache/application_1608026509770_0001文件夹就没了,所以有感觉hdfs没有问题。。
> >
> > 现在想请教一下,flink在引入外部 python依赖时,在从离线包里面安装库的时候是安装到了哪里?
> >
> >
> >
> >
> > 我看报错信息: Error [Errno 13] Permission denied: '' while executing command
> > python setup.py egg_info
> >
> > 因为它是在 python setup.py 的时候报的权限问题
> >
> >
> >
> >
> > 求大家给看看~~ 感谢
> >
> >
> >
> >
> > -- 原始邮件 --
> >
> > 发 件 人:magichuang
> >
> > 发送时间:2020-12-15 14:15:04
> >
> > 收 件 人:user-zh
> >
> > 抄 送:
> >
> > 主 题:pyflink 引用第三库的文件出现安装权限的问题
> >
> >
> >
> >
> > 请教一下大家,在本地直接python demo.py是可以运行的,但是提交到集群就会报错
> >
> > flink 版本:1.11 flink on yarn集群模式部署, per-job模式提交,三台机器
> >
> >
> >
> >
> > 提交命令:flink run -m yarn-cluster -ynm demo -ys 2 -ytm 2048 -p 2 -py demo.py
> >
> >
> >
> >
> > 代码截图地址:https://s3.ax1x.com/2020/12/15/rKIwE6.png
> >
> >
> >
> >
> > 报错截图地址:https://s3.ax1x.com/2020/12/15/rKIlNT.png
> >
> >
> >
> >
> > requestments.txt: IPy==1.0 cache_dir: IPy-1.00.tar.gz
> >
> >
> >
> >
> > 自定义udf代码:
> >
> > @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> >
> > def judge_ip(ip):
> >
> > import IPy
> >
> > if ip in IPy.IP('192.168.112.0/28'):
> >
> > return 'in'
> >
> > return 'out'
> >
> >
> >
> >
> >
> >
> >
> > 祝好~
> >
> >
> >



--

Best,

MagicHuang




Re: pyflink 有没有方便的print方式?例如java api中的 .print() ?

2020-12-16 文章 Dian Fu
可以collect到client端[1],或者可以看看另外几种方式[2]:

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.table.html#pyflink.table.TableResult.collect
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/intro_to_table_api.html#emit-results
 



> 在 2020年12月17日,下午2:47,肖越 <18242988...@163.com> 写道:
> 
> 最近在尝试 pyflink 功能,只查到了定义connector 的输出方法,例如:
> sink_ddl = '''
>CREATE TABLE print_sink (
>ID DOUBLE,
>NAME STRING
>) WITH (
>  'connector' = 'print'
>)
>'''
> 每次都要事先定义好要输出的表格格式,是否有更加方便的输出方法?



Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 Dian Fu
看起来是jar的路径的问题,你需要看一下在windows上,jar的路径用URL表示应该是什么样的。另外也可以直接把jar包放到pyflink安装目录中的lib目录下。

> 在 2020年12月17日,下午3:04,肖越 <18242988...@163.com> 写道:
> 
> 您好,这是完整的报错信息:
> Traceback (most recent call last):
> 
>  File 
> "C:\projects\dataService-calculate-code-python\src\test\test_oracle_connector.py",
>  line 24, in 
> 
>
> "C:\projects\dataService-calculate-code-python\src\\test\\flink_connector-jdbc\\flink-connector-jdbc_2.11-1.12.0.jar")
> 
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\common\configuration.py",
>  line 72, in set_string
> 
>add_jars_to_context_class_loader(value.split(";"))
> 
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py",
>  line 114, in add_jars_to_context_class_loader
> 
>jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]
> 
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py",
>  line 114, in 
> 
>jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]
> 
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
>  line 1554, in __call__
> 
>answer, self._gateway_client, None, self._fqn)
> 
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>  line 147, in deco
> 
>return f(*a, **kw)
> 
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
>  line 328, in get_return_value
> 
>format(target_id, ".", name), value)
> 
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> None.java.net.URL.
> 
> : java.net.MalformedURLException: unknown protocol: c
> 
> at java.net.URL.(URL.java:617)
> 
> at java.net.URL.(URL.java:507)
> 
> at java.net.URL.(URL.java:456)
> 
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
> 
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
> 
> at 
> org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> 
> at 
> org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> 
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> 
> at java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-12-17 14:57:36,"Dian Fu"  写道:
>> 发一下完整的异常信息?
>> 
>>> 在 2020年12月17日,上午11:53,肖越 <18242988...@163.com> 写道:
>>> 
>>> 好的,非常感谢您的帮助,刚根据分享的连接,显示增加了jar包依赖,报错:py4j.protocol.Py4JJavaError: An error 
>>> occurred while calling None.java.net.URL,应该就是没有对应的Oracle connector处理的原因吧?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-12-17 10:44:56,"Dian Fu"  写道:
 1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
 2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
   a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 
 'jdbc',这个是老的使用方式
   b. JDBC 
 connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]
 
 [1] 
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
  
 
 [2] 
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
 
> 在 2020年12月17日,上午10:06,肖越 <18242988...@163.com> 写道:
> 
> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-12-17 09:55:08,"Leonard Xu"  写道:
>> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>> 
>> 祝好,
>> Leonard
>> 
>>> 在 2020年12月17日,09:47,肖越 <18242988...@163.com> 写道:
>>> 
>>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>>> 通过如下方式定义:
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> env = StreamTableEnvironment \
>>> .create(env, environment_settings=EnvironmentSettings
>>> .new_instance()
>>> .use_blink_planner().build())
>>> source_ddl1 = """
>>> CREATE TABLE source_table (id 

Re: 求教:pyflink的sink是否支持redis connector?

2020-12-16 文章 Dian Fu
嗯,需要打成Jar包,才可以在PyFlink里用:
1)需要创建一个fat jar,把依赖都shade到jar里面。现在默认的不是fat jar,需要修改一下pom文件,可以参考Kafka里的做法 [1]。
2)关于如何使用,可用的属性有这些 [2]

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-sql-connector-kafka/pom.xml#L46
 

[2] 
https://github.com/apache/bahir-flink/blob/f0b3e1e04930b79b277cfc7ebe3552db246578e9/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
 



> 在 2020年12月17日,上午11:55,magichuang  写道:
> 
> hi,
> 
> 想问一下您这个  
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis 
> 可以打包成jar包嘛,然后在pyflink里用
> 
> 对java不熟悉,我看这个页面里面只是对java和scala说了如何用
> 
> 
> 
> 
> 
> 
> 
> Best,
> 
> MagicHuang
> 
> 
> 
> 
>> -- 原始邮件 --
>> 发 件 人:"Dian Fu" 
>> 发送时间:2020-12-17 10:16:13
>> 收 件 人:user-zh ,hepei...@qq.com
>> 抄 送:
>> 主 题:Re: 求教:pyflink的sink是否支持redis connector?
>> 
>> 感谢Xingbo的回复,稍微补充一点:所有Table & SQL支持的connector都可以用在PyFlink中。
>> 
>> redis的connector没有直接在Flink的代码库里提供,这里有一个,应该也可以用:https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>> 
>> 关于如何在PyFlink中使用connector,可以参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>> 
>>> 在 2020年12月17日,上午9:52,Xingbo Huang 写道:
>>> 
>>> Hi,
>>> 
>>> 据我所知,flink没有提供对redis connector的官方支持[1],你需要根据官方提供的接口来自定义你的redis
>>> connector,关于如何自定义connector,你可以参考文档[2]
>>> 
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
>>> 
>>> Best,
>>> Xingbo
>>> 
>>> 
>>> 消息室 于2020年12月17日周四 上午9:33写道:
>>> 
 您好:
 
 
 我们项目组计划使用pyflink,有幸拜读了您的博客,我想请教一下当前1.12.0版本的pyflink的sink是否支持redis
 connector?感谢!
  如不支持,有何建议方式?
>> 
> 
> 
> 
> 



Re:Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 肖越
您好,这是完整的报错信息:
Traceback (most recent call last):

  File 
"C:\projects\dataService-calculate-code-python\src\test\test_oracle_connector.py",
 line 24, in 


"C:\projects\dataService-calculate-code-python\src\\test\\flink_connector-jdbc\\flink-connector-jdbc_2.11-1.12.0.jar")

  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\common\configuration.py",
 line 72, in set_string

add_jars_to_context_class_loader(value.split(";"))

  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py",
 line 114, in add_jars_to_context_class_loader

jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]

  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py",
 line 114, in 

jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]

  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1554, in __call__

answer, self._gateway_client, None, self._fqn)

  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco

return f(*a, **kw)

  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value

format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL.

: java.net.MalformedURLException: unknown protocol: c

at java.net.URL.(URL.java:617)

at java.net.URL.(URL.java:507)

at java.net.URL.(URL.java:456)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)

at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)

at 
org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)

at 
org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)

at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

















在 2020-12-17 14:57:36,"Dian Fu"  写道:
>发一下完整的异常信息?
>
>> 在 2020年12月17日,上午11:53,肖越 <18242988...@163.com> 写道:
>> 
>> 好的,非常感谢您的帮助,刚根据分享的连接,显示增加了jar包依赖,报错:py4j.protocol.Py4JJavaError: An error 
>> occurred while calling None.java.net.URL,应该就是没有对应的Oracle connector处理的原因吧?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-12-17 10:44:56,"Dian Fu"  写道:
>>> 1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
>>> 2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
>>>a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 
>>> 'jdbc',这个是老的使用方式
>>>b. JDBC 
>>> connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
>>>  
>>> 
>>> [2] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>>> 
 在 2020年12月17日,上午10:06,肖越 <18242988...@163.com> 写道:
 
 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 在 2020-12-17 09:55:08,"Leonard Xu"  写道:
> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
> 
> 祝好,
> Leonard
> 
>> 在 2020年12月17日,09:47,肖越 <18242988...@163.com> 写道:
>> 
>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>> 通过如下方式定义:
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env = StreamTableEnvironment \
>>  .create(env, environment_settings=EnvironmentSettings
>>  .new_instance()
>>  .use_blink_planner().build())
>> source_ddl1 = """
>>  CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>  tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>  ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>  is_valid INT,time_mark TIMESTAMP) WITH (
>>  'connector.type' = 'jdbc',
>>  'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>  'connector.table' = 'ts_pf_ac_yldrate',
>>  'connector.driver' = 

Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 Dian Fu
发一下完整的异常信息?

> 在 2020年12月17日,上午11:53,肖越 <18242988...@163.com> 写道:
> 
> 好的,非常感谢您的帮助,刚根据分享的连接,显示增加了jar包依赖,报错:py4j.protocol.Py4JJavaError: An error 
> occurred while calling None.java.net.URL,应该就是没有对应的Oracle connector处理的原因吧?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-12-17 10:44:56,"Dian Fu"  写道:
>> 1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
>> 2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
>>a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 
>> 'jdbc',这个是老的使用方式
>>b. JDBC 
>> connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
>>  
>> 
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>> 
>>> 在 2020年12月17日,上午10:06,肖越 <18242988...@163.com> 写道:
>>> 
>>> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-12-17 09:55:08,"Leonard Xu"  写道:
 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
 
 祝好,
 Leonard
 
> 在 2020年12月17日,09:47,肖越 <18242988...@163.com> 写道:
> 
> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
> 通过如下方式定义:
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env = StreamTableEnvironment \
>  .create(env, environment_settings=EnvironmentSettings
>  .new_instance()
>  .use_blink_planner().build())
> source_ddl1 = """
>  CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>  tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>  ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>  is_valid INT,time_mark TIMESTAMP) WITH (
>  'connector.type' = 'jdbc',
>  'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>  'connector.table' = 'ts_pf_ac_yldrate',
>  'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>  'connector.username' = 'xxx',
>  'connector.password' = 'xxx')
>  """
> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
> env.sql_update(source_ddl1)
> table = env.sql_query(sql)
> env.execute("flink_test")
> 报错信息:
>  raise java_exception
> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
> at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
> at 
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
> at 
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> 

pyflink 有没有方便的print方式?例如java api中的 .print() ?

2020-12-16 文章 肖越
最近在尝试 pyflink 功能,只查到了定义connector 的输出方法,例如:
sink_ddl = '''
CREATE TABLE print_sink (
ID DOUBLE,
NAME STRING
) WITH (
  'connector' = 'print'
)
'''
每次都要事先定义好要输出的表格格式,是否有更加方便的输出方法?

如何通过现实时间控制事件时间的窗口

2020-12-16 文章 guoliubi...@foxmail.com
Hi,
我们现在以eventTime作为时间标准,每3秒做一次TumbleWindow,数据假设如下
 系统时间
 与上一条间隔
 事件时间
 与上一条间隔
 9:00:01
 
 9:00:01
 
 9:00:11 
 10s
 9:00:02
 1s
 9:00:12
 1s
 9:00:12
 10s
从事件时间上看,第一条和第二条数据是归集到同一窗口的。
不过现在的处理需要在窗口有第一条数据后系统时间超时4s以上就强制关闭该窗口,即在系统时间9:00:05时关闭窗口,忽略掉第二条数据。
请问这种情况需要怎么生成watermark?
使用过
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L))
或者
WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L))
结果都把第一条和第二条数据归集到同一个窗口中了,
都没有达到预想的结果。
要如何设置才能在窗口中仅有一条数据而忽略第二条数据?


guoliubi...@foxmail.com


Re: flink1.12 docker 镜像啥时候有

2020-12-16 文章 Yang Wang
你直接clone下来,然后cd到1.12目录,这样build出来的镜像也是可以的

你用build的镜像启动Flink任务是报什么错呢,我这边试了一下是可以正常运行的

   - git clone https://github.com/apache/flink-docker.git
   - cd scala_2.11-java8-debian
   - sudo docker build -t flink:1.12.0 .
   - docker push


Best,
Yang

superainbower  于2020年12月17日周四 上午7:19写道:

> 请教下 git checkout dev-master./add-custom.sh -u
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
> -n flink-1.12.0 这是一条指令吗?感觉执行不了
> 另外直接
> git clone https://github.com/apache/flink-docker.git
> 在里面的1.12目录中选择2.11的进去,直接dock build -t flink:1.12.0 .可以吗,我尝试直接这样构建出来的镜像好像不能跑
>
> 在2020年12月16日 10:56,Yang Wang 写道:
> 目前社区在将镜像推到docker hub的过程中遇到了点问题,正在解决
> 具体你可以跟进一下这个PR https://github.com/docker-library/official-images/pull/9249
>
> 当前你也可以自己build一个镜像来使用,方法如下:
>
> git clone https://github.com/apache/flink-docker.git
> git checkout dev-master./add-custom.sh -u
>
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
> -n
> 
> flink-1.12.0cd dev/flink-1.12.0-debiandocker build . -t
> flink:flink-1.12.0docker push flink:flink-1.12.0
>
>
>
> jiangjiguang719  于2020年12月9日周三 下午5:09写道:
>
> > 请问啥时候 在docker hub中可以看到1.12版本的镜像?
>


window agg early-fire 不生效

2020-12-16 文章 kandy.wang
1min的滚动窗口:
table.exec.emit.early-fire.enabled=true;
table.exec.emit.early-fire.delay=10 s;
设置窗口定期trigger之后,参数不生效
查看执行计划:
 {
  "id": 6,
  "type": "GroupWindowAggregate(groupBy=[mid, code, floor_id], 
window=[TumblingGroupWindow('w$, log_ts, 6)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[mid, code, floor_id, COUNT(*) AS pv, start('w$) 
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime], emit=[early delay 1 millisecond])",
  "pact": "Operator",
  "contents": "GroupWindowAggregate(groupBy=[mid, code, floor_id], 
window=[TumblingGroupWindow('w$, log_ts, 6)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[mid, code, floor_id, COUNT(*) AS pv, start('w$) 
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime], emit=[early delay 1 millisecond])",
  "parallelism": 72,
  "chaining_strategy": "ALWAYS",
  "uid": "9tx/TSKD9GBbEnuTZOIRSA==",
  "predecessors": [
{
  "id": 4,
  "ship_strategy": "HASH",
  "side": "second"
}
  ]
},
可以确认已经设置成功,就不知道为啥没有定期trigger。
现在看起来像是watermark允许数据延迟设置的5min + 窗口 1min =6min之后才能看到结果

Fwd: flink1.12.0中使用LISTAGG报错

2020-12-16 文章 huang huang
-- Forwarded message -
发件人: huang huang 
Date: 2020年12月17日周四 下午12:47
Subject: flink1.12.0中使用LISTAGG报错
To: 


各位好:

使用pyflink执行sql时,LISTAGG出现了错误

请问目前版本的LISTAGG不支持排序么,有人遇到过这种情况么?


*FLINK版本:*1.12.0


*SQL代码:*

SELECT session_id, LISTAGG(page_id, ',') WITHIN GROUP(ORDER BY
action_time) as user_hit_log FROM user_action group by session_id

*报错信息:*


Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Aggregate
expression 'LISTAGG' must not contain a within group clause

如果有写法不对,请指正,感谢!


根据查到的文档LISTAGG的用法,没看出来问题:

Listagg is an *ordered set function*, which require the within group clause
to specify an order. The minimal syntax is:

LISTAGG(, ) WITHIN GROUP(ORDER BY …)


参考链接:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/systemFunctions.html#aggregate-functions

https://modern-sql.com/feature/listagg


flink1.12.0中使用LISTAGG报错

2020-12-16 文章 huang huang
各位好:

使用pyflink执行sql时,LISTAGG出现了错误

请问目前版本的LISTAGG不支持排序么,有人遇到过这种情况么?


*FLINK版本:*1.12.0


*SQL代码:*

SELECT session_id, LISTAGG(page_id, ',') WITHIN GROUP(ORDER BY
action_time) as user_hit_log FROM user_action group by session_id

*报错信息:*


Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Aggregate
expression 'LISTAGG' must not contain a within group clause

如果有写法不对,请指正,感谢!


根据查到的文档LISTAGG的用法,没看出来问题:

Listagg is an *ordered set function*, which require the within group clause
to specify an order. The minimal syntax is:

LISTAGG(, ) WITHIN GROUP(ORDER BY …)


参考链接:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/systemFunctions.html#aggregate-functions

https://modern-sql.com/feature/listagg


Re: Re: 求教:pyflink的sink是否支持redis connector?

2020-12-16 文章 magichuang
hi,

想问一下您这个  
https://github.com/apache/bahir-flink/tree/master/flink-connector-redis 
可以打包成jar包嘛,然后在pyflink里用

对java不熟悉,我看这个页面里面只是对java和scala说了如何用







Best,

MagicHuang




> -- 原始邮件 --
> 发 件 人:"Dian Fu" 
> 发送时间:2020-12-17 10:16:13
> 收 件 人:user-zh ,hepei...@qq.com
> 抄 送:
> 主 题:Re: 求教:pyflink的sink是否支持redis connector?
>
> 感谢Xingbo的回复,稍微补充一点:所有Table & SQL支持的connector都可以用在PyFlink中。
>
> redis的connector没有直接在Flink的代码库里提供,这里有一个,应该也可以用:https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>
> 关于如何在PyFlink中使用connector,可以参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>
> > 在 2020年12月17日,上午9:52,Xingbo Huang 写道:
> >
> > Hi,
> >
> > 据我所知,flink没有提供对redis connector的官方支持[1],你需要根据官方提供的接口来自定义你的redis
> > connector,关于如何自定义connector,你可以参考文档[2]
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
> > [2]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
> >
> > Best,
> > Xingbo
> >
> >
> > 消息室 于2020年12月17日周四 上午9:33写道:
> >
> >> 您好:
> >>
> >>  
> >> 我们项目组计划使用pyflink,有幸拜读了您的博客,我想请教一下当前1.12.0版本的pyflink的sink是否支持redis
> >> connector?感谢!
> >>   如不支持,有何建议方式?
>






Re:Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 肖越
好的,非常感谢您的帮助,刚根据分享的连接,显示增加了jar包依赖,报错:py4j.protocol.Py4JJavaError: An error 
occurred while calling None.java.net.URL,应该就是没有对应的Oracle connector处理的原因吧?

















在 2020-12-17 10:44:56,"Dian Fu"  写道:
>1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
>2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
> a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 
> 'jdbc',这个是老的使用方式
> b. JDBC 
> connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
> 
>
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>
>> 在 2020年12月17日,上午10:06,肖越 <18242988...@163.com> 写道:
>> 
>> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-12-17 09:55:08,"Leonard Xu"  写道:
>>> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>>> 
>>> 祝好,
>>> Leonard
>>> 
 在 2020年12月17日,09:47,肖越 <18242988...@163.com> 写道:
 
 pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
 通过如下方式定义:
 env = StreamExecutionEnvironment.get_execution_environment()
 env.set_parallelism(1)
 env = StreamTableEnvironment \
   .create(env, environment_settings=EnvironmentSettings
   .new_instance()
   .use_blink_planner().build())
 source_ddl1 = """
   CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
   tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
   ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
   is_valid INT,time_mark TIMESTAMP) WITH (
   'connector.type' = 'jdbc',
   'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
   'connector.table' = 'ts_pf_ac_yldrate',
   'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
   'connector.username' = 'xxx',
   'connector.password' = 'xxx')
   """
 sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
 env.sql_update(source_ddl1)
 table = env.sql_query(sql)
 env.execute("flink_test")
 报错信息:
   raise java_exception
 pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
 at 
 org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
 at 
 org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
 at 
 org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 at 
 org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 at 
 org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
 at 
 org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
 at 
 org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
 at 
 org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
 at 
 org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
 at 
 org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
 at 
 org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
 

Re: flink sql es写入时,用户名密码认证不支持

2020-12-16 文章 HunterXHunter
是的,需要上传certificate文件,1.12好像没有上传文件的配置



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Scala REPL YARN 运行模式报 NoSuchMethodError

2020-12-16 文章 卢国庆
谢谢回复。


我已经采用 `hadoop classpath` 方式完成了 Hadoop 的集成。当前的问题是在 CDH 5.16.2 + Flink 环境下遇到的


补充下丢失的截图信息


使用 Scala REPL Yarn 运行模式报 NoSuchMethodError,详细错误信息如下:
$ ./bin/start-scala-shell.sh yarn
|
scala> Exception in thread "main" java.lang.NoSuchMethodError: 
jline.console.completer.CandidateListCompletionHandler.setPrintSpaceAfterFullCompletion(Z)V
at 
scala.tools.nsc.interpreter.jline.JLineConsoleReader.initCompletion(JLineReader.scala:139)
at 
scala.tools.nsc.interpreter.jline.InteractiveReader.postInit(JLineReader.scala:54)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$25.apply(ILoop.scala:899)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$25.apply(ILoop.scala:897)
at 
scala.tools.nsc.interpreter.SplashReader.postInit(InteractiveReader.scala:130)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply$mcV$sp(ILoop.scala:926)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply(ILoop.scala:908)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply(ILoop.scala:908)
at scala.tools.nsc.interpreter.ILoop$$anonfun$mumly$1.apply(ILoop.scala:189)
at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:221)
at scala.tools.nsc.interpreter.ILoop.mumly(ILoop.scala:186)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$startup$1$1.apply(ILoop.scala:979)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:990)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:891)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:891)
at 
scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:891)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:187)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:131)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
|







环境说明
CDH 5.16.2
测试 Flink 1.10.2 和 1.11.2 都能复现该问题


已分析内容
使用 Arthas 查看已加载类,加载的是 CDH 相关依赖
|
$ sc *CandidateListCompletionHandler
jline.console.completer.CandidateListCompletionHandler
Affect(row-cnt:1) cost in 113 ms.
[arthas@23856]$ sc -d jline.console.completer.CandidateListCompletionHandler
 class-infojline.console.completer.CandidateListCompletionHandler   

  
 code-source   
/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/jars/jline-2.11.jar   

   
 name  jline.console.completer.CandidateListCompletionHandler   

  
 isInterface   false

  
 isAnnotation  false

  
 isEnumfalse

  
 isAnonymousClass  false

  
 isArray   false

  
 isLocalClass  false

  
 isMemberClass false

  
 isPrimitive   false
 

Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数

2020-12-16 文章 Yun Tang
Hi

write buffer的指标可以看 cur-size-all-mem-tables,由于1.9没有block 
cache的指标,如果不自行将代码[1]pick回去的话,暂时没办法通过内置的方式监控了。


[1] https://issues.apache.org/jira/browse/FLINK-15387

祝好
唐云

From: bradyMk 
Sent: Wednesday, December 16, 2020 12:03
To: user-zh@flink.apache.org 
Subject: flink1.9.1 如何配置RocksDB的block-cache-usage参数

Hi~想请教一下大家:

最近使用flink1.9版本用RocksDB做增量ck,我想配置如下两个内容的指标来监控任务的内存情况:
  ①block-cache-usage
  ②write buffer

但是在官网[1]并没有找到相关指标,通过查阅资料得知:
  write buffer对应的指标为:state.backend.rocksdb.metrics.cur-size-all-mem-tables
  而block-cache-usage的指标是1.10版本之后才有的,1.9版本没有这个指标;

问:
①write buffer是否对应这个指标 ->
state.backend.rocksdb.metrics.cur-size-all-mem-tables
②如果1.9没有监控block-cache-usage的直接指标,那么该如何监控block-cache-usage呢?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics







-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 求教:pyflink的sink是否支持redis connector?

2020-12-16 文章 Dian Fu
您好,关于JDBC connector的问题,已经回复您了,我们在另外一个thread里讨论吧。

> 在 2020年12月17日,上午10:25,肖越 <18242988...@163.com> 写道:
> 
> 请问 可以连接oracle数据库么?自己尝试了定义connector,但报错findAndCreateTableSource failed 
> ,并没有找到pyflink关于oracle connector的定义示例代码
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-12-17 10:16:13,"Dian Fu"  写道:
>> 感谢Xingbo的回复,稍微补充一点:所有Table & SQL支持的connector都可以用在PyFlink中。
>> 
>> redis的connector没有直接在Flink的代码库里提供,这里有一个,应该也可以用:https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>>  
>> 
>> 关于如何在PyFlink中使用connector,可以参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>> 
>>> 在 2020年12月17日,上午9:52,Xingbo Huang  写道:
>>> 
>>> Hi,
>>> 
>>> 据我所知,flink没有提供对redis connector的官方支持[1],你需要根据官方提供的接口来自定义你的redis
>>> connector,关于如何自定义connector,你可以参考文档[2]
>>> 
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
>>> 
>>> Best,
>>> Xingbo
>>> 
>>> 
>>> 消息室  于2020年12月17日周四 上午9:33写道:
>>> 
 您好:
 
   
 我们项目组计划使用pyflink,有幸拜读了您的博客,我想请教一下当前1.12.0版本的pyflink的sink是否支持redis
 connector?感谢!
如不支持,有何建议方式?
>> 



Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 Dian Fu
1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
 a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 
'jdbc',这个是老的使用方式
 b. JDBC 
connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars

> 在 2020年12月17日,上午10:06,肖越 <18242988...@163.com> 写道:
> 
> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-12-17 09:55:08,"Leonard Xu"  写道:
>> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>> 
>> 祝好,
>> Leonard
>> 
>>> 在 2020年12月17日,09:47,肖越 <18242988...@163.com> 写道:
>>> 
>>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>>> 通过如下方式定义:
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> env = StreamTableEnvironment \
>>>   .create(env, environment_settings=EnvironmentSettings
>>>   .new_instance()
>>>   .use_blink_planner().build())
>>> source_ddl1 = """
>>>   CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>>   tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>>   ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>>   is_valid INT,time_mark TIMESTAMP) WITH (
>>>   'connector.type' = 'jdbc',
>>>   'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>>   'connector.table' = 'ts_pf_ac_yldrate',
>>>   'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>>   'connector.username' = 'xxx',
>>>   'connector.password' = 'xxx')
>>>   """
>>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>>> env.sql_update(source_ddl1)
>>> table = env.sql_query(sql)
>>> env.execute("flink_test")
>>> 报错信息:
>>>   raise java_exception
>>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>>> at 
>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>>> at 
>>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>>> at 
>>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>>> at 
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>>> at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>>> at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>>> at 
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at 
>>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> at 
>>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>> at 
>>> 

Re:Re: 求教:pyflink的sink是否支持redis connector?

2020-12-16 文章 肖越
请问 可以连接oracle数据库么?自己尝试了定义connector,但报错findAndCreateTableSource failed 
,并没有找到pyflink关于oracle connector的定义示例代码

















在 2020-12-17 10:16:13,"Dian Fu"  写道:
>感谢Xingbo的回复,稍微补充一点:所有Table & SQL支持的connector都可以用在PyFlink中。
>
>redis的connector没有直接在Flink的代码库里提供,这里有一个,应该也可以用:https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
> 
>
>关于如何在PyFlink中使用connector,可以参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>
>> 在 2020年12月17日,上午9:52,Xingbo Huang  写道:
>> 
>> Hi,
>> 
>> 据我所知,flink没有提供对redis connector的官方支持[1],你需要根据官方提供的接口来自定义你的redis
>> connector,关于如何自定义connector,你可以参考文档[2]
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
>> 
>> Best,
>> Xingbo
>> 
>> 
>> 消息室  于2020年12月17日周四 上午9:33写道:
>> 
>>> 您好:
>>> 
>>>   
>>> 我们项目组计划使用pyflink,有幸拜读了您的博客,我想请教一下当前1.12.0版本的pyflink的sink是否支持redis
>>> connector?感谢!
>>>如不支持,有何建议方式?
>


Re: 求教:pyflink的sink是否支持redis connector?

2020-12-16 文章 Dian Fu
感谢Xingbo的回复,稍微补充一点:所有Table & SQL支持的connector都可以用在PyFlink中。

redis的connector没有直接在Flink的代码库里提供,这里有一个,应该也可以用:https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
 

关于如何在PyFlink中使用connector,可以参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html

> 在 2020年12月17日,上午9:52,Xingbo Huang  写道:
> 
> Hi,
> 
> 据我所知,flink没有提供对redis connector的官方支持[1],你需要根据官方提供的接口来自定义你的redis
> connector,关于如何自定义connector,你可以参考文档[2]
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
> 
> Best,
> Xingbo
> 
> 
> 消息室  于2020年12月17日周四 上午9:33写道:
> 
>> 您好:
>> 
>>   
>> 我们项目组计划使用pyflink,有幸拜读了您的博客,我想请教一下当前1.12.0版本的pyflink的sink是否支持redis
>> connector?感谢!
>>如不支持,有何建议方式?



Re:Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 肖越
谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?

















在 2020-12-17 09:55:08,"Leonard Xu"  写道:
>目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>
>祝好,
>Leonard
>
>> 在 2020年12月17日,09:47,肖越 <18242988...@163.com> 写道:
>> 
>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>> 通过如下方式定义:
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env = StreamTableEnvironment \
>>.create(env, environment_settings=EnvironmentSettings
>>.new_instance()
>>.use_blink_planner().build())
>> source_ddl1 = """
>>CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>is_valid INT,time_mark TIMESTAMP) WITH (
>>'connector.type' = 'jdbc',
>>'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>'connector.table' = 'ts_pf_ac_yldrate',
>>'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>'connector.username' = 'xxx',
>>'connector.password' = 'xxx')
>>"""
>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>> env.sql_update(source_ddl1)
>> table = env.sql_query(sql)
>> env.execute("flink_test")
>> 报错信息:
>>raise java_exception
>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>> at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>> at 
>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>> at 
>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>> at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>> at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>> at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>> at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>> at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>> at 
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at 
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at 
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> at 
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at 
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at 
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.lang.Thread.run(Thread.java:748)


Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 Leonard Xu
目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。

祝好,
Leonard

> 在 2020年12月17日,09:47,肖越 <18242988...@163.com> 写道:
> 
> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
> 通过如下方式定义:
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env = StreamTableEnvironment \
>.create(env, environment_settings=EnvironmentSettings
>.new_instance()
>.use_blink_planner().build())
> source_ddl1 = """
>CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>is_valid INT,time_mark TIMESTAMP) WITH (
>'connector.type' = 'jdbc',
>'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>'connector.table' = 'ts_pf_ac_yldrate',
>'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>'connector.username' = 'xxx',
>'connector.password' = 'xxx')
>"""
> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
> env.sql_update(source_ddl1)
> table = env.sql_query(sql)
> env.execute("flink_test")
> 报错信息:
>raise java_exception
> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
> at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
> at 
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
> at 
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)



Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 hhywcl
pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
通过如下方式定义:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env = StreamTableEnvironment \
.create(env, environment_settings=EnvironmentSettings
.new_instance()
.use_blink_planner().build())
source_ddl1 = """
CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
is_valid INT,time_mark TIMESTAMP) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
'connector.table' = 'ts_pf_ac_yldrate',
'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
'connector.username' = 'xxx',
'connector.password' = 'xxx')
"""
sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
env.sql_update(source_ddl1)
table = env.sql_query(sql)
env.execute("flink_test")
报错信息:
raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
 at 
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
 at 
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
 at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)

Re: 求教:pyflink的sink是否支持redis connector?

2020-12-16 文章 Xingbo Huang
Hi,

据我所知,flink没有提供对redis connector的官方支持[1],你需要根据官方提供的接口来自定义你的redis
connector,关于如何自定义connector,你可以参考文档[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html

Best,
Xingbo


消息室  于2020年12月17日周四 上午9:33写道:

> 您好:
>
>   
> 我们项目组计划使用pyflink,有幸拜读了您的博客,我想请教一下当前1.12.0版本的pyflink的sink是否支持redis
> connector?感谢!
>如不支持,有何建议方式?


Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

2020-12-16 文章 肖越
pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
通过如下方式定义:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env = StreamTableEnvironment \
.create(env, environment_settings=EnvironmentSettings
.new_instance()
.use_blink_planner().build())
source_ddl1 = """
CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
is_valid INT,time_mark TIMESTAMP) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
'connector.table' = 'ts_pf_ac_yldrate',
'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
'connector.username' = 'xxx',
'connector.password' = 'xxx')
"""
sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
env.sql_update(source_ddl1)
table = env.sql_query(sql)
env.execute("flink_test")
报错信息:
raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
 at 
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
 at 
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
 at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)

Re: Re: 直接内存溢出

2020-12-16 文章 Xintong Song
日志文件开头会打环境信息,包括 JVM 参数。

Thank you~

Xintong Song



On Wed, Dec 16, 2020 at 10:01 PM aven  wrote:

> 感谢回复,我尝试一下这两个参数。
> 我还有一个问题,flink的内存配置参数在启动,在运行时是否有办法查看。
> 或者在启动的时候是可以通过日志打印出来吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best!
> Aven
>
>
>
>
>
> 在 2020-12-16 19:22:19,"Xintong Song"  写道:
> >可以使用这两个参数。
> >
> >   - containerized.heap-cutoff-ratio
> >   - containerized.heap-cutoff-min
> >
> >cutoff 的含义是从 container 内存中额外留出一部分,不作为 flink 的 heap/network/managed
> >内存。这部分内存通常是用于 JVM、用户代码、第三方依赖的对外内存开销。Flink 计算 MaxDirectMemorySize 参数时也会把
> >cutoff 算进去,因此调大 cutoff 也可以起到放宽直接内存上限的效果。
> >
> >
> >另外,Flink 1.9 及以前的内存模型是比较混乱的,建议有条件的话尽快升级到新版本。
> >
> >
> >Thank you~
> >
> >Xintong Song
> >
> >
> >
> >On Wed, Dec 16, 2020 at 7:07 PM 巫旭阳  wrote:
> >
> >> 报错信息如下
> >> Caused by: java.lang.OutOfMemoryError: Direct buffer memory at
> >> java.nio.Bits.reserveMemory(Bits.java:693)
> >>
> >>  at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) at
> >> java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> >>
> >>  at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> >>
> >>  at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> >>
> >>  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
> >>
> >>  at
> >> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
> >>
> >>  at
> >>
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
> >>
> >>
> >>  at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
> >>
> >>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >>
> >>  at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >>
> >>
> >>  at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
> >>
> >>
> >>  at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>
> >>
> >>
> >> 版本
> >>   Flink:1.9.1
> >>   kafka-client:0.10.0.1
> >> 环境
> >>   on yarn
> >> JVM参数
> >>   -Xms14336m
> >>   -Xmx14336m
> >>   -XX:MaxDirectMemorySize=6144m
> >> flink-conf.yml
> >>  使用的是默认的参数
> >>  Stream任务,并且没有使用RocksDB
> >>
> >> 目前初步怀疑是Flink 的堆外内存占用过大导致kafka consumer 无法申请堆外内存导致OOM。但根据官方文档的配置
> >> taskmanager.memory.fraction=0.7 这个应该在我的程序中不生效
> >> taskmanager.network.memory.fraction=0.1
> >>
> >>
> >> 这样的配置下来应该用户代码可使用的堆外内存为6144m*0.9=5529m
> >> 我的问题是
> >> 在我当前的环境下是否还有我没注意到的Flink堆外内存配置,或者Flink需要占用的堆外内存是我所不了解的。
> >> 除了控制kafka comsumer 的流量以外有没有什么其他的调整方式?
> >>
> >>
> >> Best
> >> Aven
> >>
> >>
>


??????pyflink??sink????????redis connector?

2020-12-16 文章 ??????
??

   
??pyflink??1.12.0??pyflink??sinkredis
 connector???
   

回复:flink1.12 docker 镜像啥时候有

2020-12-16 文章 superainbower
请教下 git checkout dev-master./add-custom.sh -u 
https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
 -n flink-1.12.0 这是一条指令吗?感觉执行不了
另外直接
git clone https://github.com/apache/flink-docker.git
在里面的1.12目录中选择2.11的进去,直接dock build -t flink:1.12.0 .可以吗,我尝试直接这样构建出来的镜像好像不能跑

在2020年12月16日 10:56,Yang Wang 写道:
目前社区在将镜像推到docker hub的过程中遇到了点问题,正在解决
具体你可以跟进一下这个PR https://github.com/docker-library/official-images/pull/9249

当前你也可以自己build一个镜像来使用,方法如下:

git clone https://github.com/apache/flink-docker.git
git checkout dev-master./add-custom.sh -u
https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
-n flink-1.12.0cd dev/flink-1.12.0-debiandocker build . -t
flink:flink-1.12.0docker push flink:flink-1.12.0



jiangjiguang719  于2020年12月9日周三 下午5:09写道:

> 请问啥时候 在docker hub中可以看到1.12版本的镜像?


Re:Re: 直接内存溢出

2020-12-16 文章 aven
感谢回复,我尝试一下这两个参数。
我还有一个问题,flink的内存配置参数在启动,在运行时是否有办法查看。
或者在启动的时候是可以通过日志打印出来吗?













--

Best!
Aven





在 2020-12-16 19:22:19,"Xintong Song"  写道:
>可以使用这两个参数。
>
>   - containerized.heap-cutoff-ratio
>   - containerized.heap-cutoff-min
>
>cutoff 的含义是从 container 内存中额外留出一部分,不作为 flink 的 heap/network/managed
>内存。这部分内存通常是用于 JVM、用户代码、第三方依赖的对外内存开销。Flink 计算 MaxDirectMemorySize 参数时也会把
>cutoff 算进去,因此调大 cutoff 也可以起到放宽直接内存上限的效果。
>
>
>另外,Flink 1.9 及以前的内存模型是比较混乱的,建议有条件的话尽快升级到新版本。
>
>
>Thank you~
>
>Xintong Song
>
>
>
>On Wed, Dec 16, 2020 at 7:07 PM 巫旭阳  wrote:
>
>> 报错信息如下
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory at
>> java.nio.Bits.reserveMemory(Bits.java:693)
>>
>>  at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) at
>> java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>>
>>  at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
>>
>>  at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>>
>>  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>>
>>  at
>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
>>
>>  at
>> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>>
>>  at
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>>
>>  at
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
>>
>>  at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
>>
>>  at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
>>
>>
>>  at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>>
>>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>
>>  at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>
>>
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>>
>>
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>
>>
>>
>> 版本
>>   Flink:1.9.1
>>   kafka-client:0.10.0.1
>> 环境
>>   on yarn
>> JVM参数
>>   -Xms14336m
>>   -Xmx14336m
>>   -XX:MaxDirectMemorySize=6144m
>> flink-conf.yml
>>  使用的是默认的参数
>>  Stream任务,并且没有使用RocksDB
>>
>> 目前初步怀疑是Flink 的堆外内存占用过大导致kafka consumer 无法申请堆外内存导致OOM。但根据官方文档的配置
>> taskmanager.memory.fraction=0.7 这个应该在我的程序中不生效
>> taskmanager.network.memory.fraction=0.1
>>
>>
>> 这样的配置下来应该用户代码可使用的堆外内存为6144m*0.9=5529m
>> 我的问题是
>> 在我当前的环境下是否还有我没注意到的Flink堆外内存配置,或者Flink需要占用的堆外内存是我所不了解的。
>> 除了控制kafka comsumer 的流量以外有没有什么其他的调整方式?
>>
>>
>> Best
>> Aven
>>
>>


Flink SQL中with子句中的配置最终是传给了外部数据源对应的配置中,如果有些配置不能出现在SQL的wtih子句中而要设置一些额外的配置参数(SQL级别不是全局级别的配置参数)给外部数据源的配置中,可以怎么传递?

2020-12-16 文章 邮件帮助中心



Re: 直接内存溢出

2020-12-16 文章 Xintong Song
可以使用这两个参数。

   - containerized.heap-cutoff-ratio
   - containerized.heap-cutoff-min

cutoff 的含义是从 container 内存中额外留出一部分,不作为 flink 的 heap/network/managed
内存。这部分内存通常是用于 JVM、用户代码、第三方依赖的对外内存开销。Flink 计算 MaxDirectMemorySize 参数时也会把
cutoff 算进去,因此调大 cutoff 也可以起到放宽直接内存上限的效果。


另外,Flink 1.9 及以前的内存模型是比较混乱的,建议有条件的话尽快升级到新版本。


Thank you~

Xintong Song



On Wed, Dec 16, 2020 at 7:07 PM 巫旭阳  wrote:

> 报错信息如下
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory at
> java.nio.Bits.reserveMemory(Bits.java:693)
>
>  at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) at
> java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>
>  at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
>
>  at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>
>  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>
>  at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
>
>  at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>
>  at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>
>  at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
>
>  at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
>
>  at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
>
>
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>
>  at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>
>
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>
>
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>
>
>
> 版本
>   Flink:1.9.1
>   kafka-client:0.10.0.1
> 环境
>   on yarn
> JVM参数
>   -Xms14336m
>   -Xmx14336m
>   -XX:MaxDirectMemorySize=6144m
> flink-conf.yml
>  使用的是默认的参数
>  Stream任务,并且没有使用RocksDB
>
> 目前初步怀疑是Flink 的堆外内存占用过大导致kafka consumer 无法申请堆外内存导致OOM。但根据官方文档的配置
> taskmanager.memory.fraction=0.7 这个应该在我的程序中不生效
> taskmanager.network.memory.fraction=0.1
>
>
> 这样的配置下来应该用户代码可使用的堆外内存为6144m*0.9=5529m
> 我的问题是
> 在我当前的环境下是否还有我没注意到的Flink堆外内存配置,或者Flink需要占用的堆外内存是我所不了解的。
> 除了控制kafka comsumer 的流量以外有没有什么其他的调整方式?
>
>
> Best
> Aven
>
>


Re: flink-shaded-hadoop-2-uber版本如何选择

2020-12-16 文章 Yang Wang
如果是在K8s上面访问hdfs,还是需要把flink-shaded-hadoop放到lib目录下,因为目前hadoop的FileSystem并不支持plugin加载

Best,
Yang

superainbower  于2020年12月16日周三 下午6:19写道:

> 借楼请问下,部署到K8S上怎么访问HDFS呢,目前我还是把shaded的jar打到镜像里面去
> 在2020年12月16日 10:53,Yang Wang  写道:
>
> 以flink-shaded-hadoop-2-uber的2.8.3-10.0为例
>
> 2.8.3指的hadoop的版本,10.0指定的flink-shaded[1]的版本
>
> 社区从1.10开始不再推荐使用flink-shaded-hadoop的方式,而且通过设置HADOOP_CLASSPATH环境变量来提交[2],
> 这样可以让Flink变得hadoop free,从而同时支持hadoop2和hadoop3
>
> 如果你还坚持使用flink-shaded-hadoop,那就建议使用最新的版本就可以了2.8.3-10.0
>
>
> [1]. https://github.com/apache/flink-shaded
> [2].
>
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
>
> Best,
> Yang
>
> 赢峰  于2020年12月11日周五 上午8:45写道:
>
> > flink-shaded-hadoop-2-uber版本如何选择?
> >
> >
> > xxx-xxx 分别表示什么意思?
> >
> >
> >
>
>


直接内存溢出

2020-12-16 文章 巫旭阳
报错信息如下
Caused by: java.lang.OutOfMemoryError: Direct buffer memory at 
java.nio.Bits.reserveMemory(Bits.java:693) 

 at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) at 
java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) 

 at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) 

 at sun.nio.ch.IOUtil.read(IOUtil.java:195) 

 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

 at 
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)

 at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)

 at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)

 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169) 

 at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150) 

 at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355) 

 at org.apache.kafka.common.network.Selector.poll(Selector.java:303) 

 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) 

 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
 

 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
 

 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)



版本 
  Flink:1.9.1 
  kafka-client:0.10.0.1
环境 
  on yarn
JVM参数
  -Xms14336m
  -Xmx14336m
  -XX:MaxDirectMemorySize=6144m
flink-conf.yml
 使用的是默认的参数
 Stream任务,并且没有使用RocksDB
 
目前初步怀疑是Flink 的堆外内存占用过大导致kafka consumer 无法申请堆外内存导致OOM。但根据官方文档的配置 
taskmanager.memory.fraction=0.7 这个应该在我的程序中不生效
taskmanager.network.memory.fraction=0.1


这样的配置下来应该用户代码可使用的堆外内存为6144m*0.9=5529m
我的问题是
在我当前的环境下是否还有我没注意到的Flink堆外内存配置,或者Flink需要占用的堆外内存是我所不了解的。 
除了控制kafka comsumer 的流量以外有没有什么其他的调整方式?


Best
Aven



Re: Scala REPL YARN 运行模式报 NoSuchMethodError

2020-12-16 文章 Jacob
hi,
你的截图好像没有上传成功,通过你的描述,大概是NoSuchMethod之类的错误,我前几天在升级flink版本时候也遇到过类似问题,后来的解决方案是
导入hadoop classpath (export HADOOP_CLASSPATH=`hadoop
classpath`)解决的,如果没有解决你的问题,尝试把flink-shaded-hadoop-2-uber*-*.jar放在 flink/lib下面




Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复:flink-shaded-hadoop-2-uber版本如何选择

2020-12-16 文章 superainbower
借楼请问下,部署到K8S上怎么访问HDFS呢,目前我还是把shaded的jar打到镜像里面去

在2020年12月16日 10:53,Yang Wang 写道:
以flink-shaded-hadoop-2-uber的2.8.3-10.0为例

2.8.3指的hadoop的版本,10.0指定的flink-shaded[1]的版本

社区从1.10开始不再推荐使用flink-shaded-hadoop的方式,而且通过设置HADOOP_CLASSPATH环境变量来提交[2],
这样可以让Flink变得hadoop free,从而同时支持hadoop2和hadoop3

如果你还坚持使用flink-shaded-hadoop,那就建议使用最新的版本就可以了2.8.3-10.0


[1]. https://github.com/apache/flink-shaded
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation

Best,
Yang

赢峰  于2020年12月11日周五 上午8:45写道:

> flink-shaded-hadoop-2-uber版本如何选择?
>
>
> xxx-xxx 分别表示什么意思?
>
>
>


Re: Flink 新版本的WEB UI taskmanager没有logs

2020-12-16 文章 Jacob
是的,我看到项目中有logback.xml配置,在pom中也有logback-classic依赖
  
ch.qos.logback
logback-classic
1.2.3


我移除这个依赖后,在UI中可以看到相关日志了!

谢谢!




Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Scala REPL YARN 运行模式报 NoSuchMethodError

2020-12-16 文章 卢国庆
各位好,辛苦帮忙看个问题


使用 Scala REPL Yarn 运行模式报 NoSuchMethodError,截图如下:
$ ./bin/start-scala-shell.sh yarn   


环境说明
CDH 5.16.2
测试 Flink 1.10.2 和 1.11.2 都能复现该问题


已分析内容
使用 Arthas 查看已加载类,加载的是 CDH 相关依赖



删除这个 CDH 依赖 jline-2.11.jar,不再报 NoSuchMethodError。但 Arthas 没有找到 
jline.console.completer.CandidateListCompletionHandler,而是找到 
scala.tools.jline_embedded.console.completer.CandidateListCompletionHandler,详见下面截图





Re: Flink 新版本的WEB UI taskmanager没有logs

2020-12-16 文章 zilong xiao
看样子slf4j最终是绑定到了logback实现,你的任务配置是用的logback吗?如果不是,需要把logback的依赖排除掉

Jacob <17691150...@163.com> 于2020年12月16日周三 下午5:38写道:

> 谢谢回复!
>
> 1. 在jobmanager.err中发现如下日志绑定,存在冲突。
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/***/appcache/application_1603495749855_62368/filecache/24/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/***/appcache/application_1603495749855_62368/filecache/32/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
>
> 这个多绑定会影响吗?
>
> 2. 该版本使用的配置如下:
>
> env.java.home: /usr/java/jdk1.8.0_162
> yarn.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162
> containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_162
> containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162
>
>
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.memory.process.size: 3072m
> taskmanager.memory.process.size: 3072m
> taskmanager.numberOfTaskSlots: 4
>
> yarn.application-attempts: 10
> state.backend: filesystem
> state.checkpoints.dir: hdfs://nameservice1/prd/website/flink_checkpoint
> state.savepoints.dir: hdfs://nameservice1/prd/website/flink_checkpoint
> state.backend.incremental: false
> state.backend.fs.memory-threshold: 1024
> state.checkpoints.num-retained: 3
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 1000
> restart-strategy.fixed-delay.delay: 30 s
>
> jobmanager.execution.failover-strategy: region
>
>
> classloader.resolve-order: parent-first
>
> 3. job运行方式:on yarn
>
> 4. hadoop版本:2.6
>
> Thanks!
> Jacob
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 新版本的WEB UI taskmanager没有logs

2020-12-16 文章 Jacob
谢谢回复!

1. 在jobmanager.err中发现如下日志绑定,存在冲突。

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/***/appcache/application_1603495749855_62368/filecache/24/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/***/appcache/application_1603495749855_62368/filecache/32/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[ch.qos.logback.classic.util.ContextSelectorStaticBinder]

这个多绑定会影响吗?

2. 该版本使用的配置如下:

env.java.home: /usr/java/jdk1.8.0_162
yarn.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162
containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_162
containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162


jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 3072m
taskmanager.memory.process.size: 3072m
taskmanager.numberOfTaskSlots: 4

yarn.application-attempts: 10
state.backend: filesystem
state.checkpoints.dir: hdfs://nameservice1/prd/website/flink_checkpoint
state.savepoints.dir: hdfs://nameservice1/prd/website/flink_checkpoint
state.backend.incremental: false
state.backend.fs.memory-threshold: 1024
state.checkpoints.num-retained: 3

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1000
restart-strategy.fixed-delay.delay: 30 s

jobmanager.execution.failover-strategy: region


classloader.resolve-order: parent-first

3. job运行方式:on yarn

4. hadoop版本:2.6

Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 新版本的WEB UI taskmanager没有logs

2020-12-16 文章 zilong xiao
Hi Jacob
1.可否发现使用的配置?
2.检查下jobmanager.err日志,看下日志的绑定是否正确

Jacob <17691150...@163.com> 于2020年12月16日周三 下午4:01写道:

> 
>
> Hello everyone!
>
>
> 如上图所示,升级后的flink,为什么看不到taskmanager的日志了。在Stdout中能看自己代码中打的log,但flink自身的log以及springboot相关的log等,都无法看到,不知何因?升级后日志系统需要重新配置吗?
>
>
> Thanks!
> Jacob
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


env.getCheckpointConfig().enableUnalignedCheckpoints();

2020-12-16 文章 ??????
env.getCheckpointConfig().enableUnalignedCheckpoints();??,

Flink 新版本的WEB UI taskmanager没有logs

2020-12-16 文章 Jacob
 

Hello everyone!

如上图所示,升级后的flink,为什么看不到taskmanager的日志了。在Stdout中能看自己代码中打的log,但flink自身的log以及springboot相关的log等,都无法看到,不知何因?升级后日志系统需要重新配置吗?


Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/