Re:Re:Re: flink sql job 提交到yarn上报错

2020-06-15 文章 Zhou Zach
flink/lib/下的jar: flink-connector-hive_2.11-1.10.0.jar flink-dist_2.11-1.10.0.jar flink-jdbc_2.11-1.10.0.jar flink-json-1.10.0.jar flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar flink-sql-connector-kafka_2.11-1.10.0.jar flink-table_2.11-1.10.0.jar flink-table-blink_2.11-1.10.0.jar hbase-client-2.1.0.j

Re:Re:Re: flink sql job 提交到yarn上报错

2020-06-15 文章 Zhou Zach
high-availability: zookeeper 在 2020-06-16 14:48:43,"Zhou Zach" 写道: > > > > >high-availability.storageDir: hdfs:///flink/ha/ >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181 >state.backend: filesystem >state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/

Re:Re: flink sql job 提交到yarn上报错

2020-06-15 文章 Zhou Zach
high-availability.storageDir: hdfs:///flink/ha/ high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181 state.backend: filesystem state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints high-avai

Re: flink sql job 提交到yarn上报错

2020-06-15 文章 王松
你的配置文件中ha配置可以贴下吗 Zhou Zach 于2020年6月16日周二 下午1:49写道: > org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to > initialize the cluster entrypoint YarnJobClusterEntrypoint. > > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187) > >

Re:flink sql job 提交到yarn上报错

2020-06-15 文章 Zhou Zach
将flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar放在flink/lib目录下,或者打入fat jar都不起作用。。。 At 2020-06-16 13:49:27, "Zhou Zach" wrote: org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint. at org.apache.flink.r

flink sql job 提交到yarn上报错

2020-06-15 文章 Zhou Zach
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint. at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCl

pyflink连接elasticsearch5.4问题

2020-06-15 文章 jack
我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是 flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。 连接es的时候报错,findAndCreateTableSink failed。 是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。 Caused by Could not find a suitable factory for ‘org.apac

Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))

2020-06-15 文章 Kurt Young
table hint的语法是紧跟在你query中访问某张表的时候,所以我理解并不会有 ”这个动态参数作用在哪张表“ 上的疑问吧? Best, Kurt On Tue, Jun 16, 2020 at 10:02 AM Yichao Yang <1048262...@qq.com> wrote: > Hi > > > 1.2版本将会有like字句的支持,参考[1],不过也是通过定义一张表的方式,而不是直接在query内指定。 > > 个人理解在query内指定其实会涉及到很多因素,假设涉及到多张表的时候,涉及到同key属性时,你在query内指定的属性到底是赋予给哪张表的?这个其实是比

通过Kafka更新规则

2020-06-15 文章 Ruibin Xing
我们有一个Flink Job需要一些自定义的规则,希望能够动态添加、更新、删除。规则的数量在百到千条。目前设计的结构是RDB+ Kafka + Flink。 RDB存储规则的完整快照,以展示给Web应用作增删改查。改动通过Kafka发送消息至Flink,通过BroadcastState传播规则。 目前有一个问题没有解决:如何使用Kafka来传递状态。我想了一下,大概有几种方案: 1. 消息标记Add、Upadte、Delete类型,在Flink中写逻辑来处理状态以和RDB中状态保持一致。 目前的问题是,每次重启Job,都需要从头读Kafka,来回放状态的更新。Kafka中的状态消

Re: pyflink数据查询

2020-06-15 文章 jack
hi 感谢您的建议,我这边尝试一下自定义实现sink的方式。 Best, Jack 在 2020-06-15 18:08:15,"godfrey he" 写道: hi jack,jincheng Flink 1.11 支持直接将select的结果collect到本地,例如: CloseableIterator it = tEnv.executeSql("select ...").collect(); while(it.hasNext()) { it.next() } 但是 pyflink 还没有引入 collect() 接口。(后续

Re:Re: pyflink数据查询

2020-06-15 文章 jack
感谢您的建议,目前在学习使用pyflink,使用pyflink做各种有趣的尝试,包括udf函数做日志解析等,也看过 目前官方文档对于pyflink的文档和例子还是偏少,遇到问题了还是需要向各位大牛们多多请教。 Best, Jack 在 2020-06-15 16:13:32,"jincheng sun" 写道: >你好 Jack, > >> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果, >我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询 > >我理解你上面说的 【直接作为结

Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))

2020-06-15 文章 Benchao Li
补充一条,在1.12中,除了LIKE语句,还有Table Hints[1] 可以用来动态修改属性。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html Yichao Yang <1048262...@qq.com> 于2020年6月16日周二 上午10:02写道: > Hi > > > 1.2版本将会有like字句的支持,参考[1],不过也是通过定义一张表的方式,而不是直接在query内指定。 > > 个人理解在query内指定其实会涉及到很多因素,假设涉及到多张表的时候

?????? flink1.11 ??????(???? DDL ??????(???? Table ????))

2020-06-15 文章 Yichao Yang
Hi 1.2??like[1]query ??querykeyquery [1]https://ci.apache.org/projects/flink/flink-

?????? flink1.11 ??????(???? DDL ??????(???? Table ????))

2020-06-15 文章 kcz
tks --  -- ??: "Kurt Young"

Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))

2020-06-15 文章 Kurt Young
就是你DDL定义表的时候的WITH参数,有时候有个别参数写的不对或者需要调整,可以在query里直接修改,而不用重新定义一张新表。 Best, Kurt On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com> wrote: > 动态 Table 属性是指什么?可以举一个列子吗。

flink1.11 ??????(???? DDL ??????(???? Table ????))

2020-06-15 文章 kcz
Table

如何做Flink Stream的性能测试

2020-06-15 文章 aven . wu
各位好; 最近我想测试一下我的程序处理性能如何。请问有什么工具、或者应该通过什么方法来获得一个比较准确的测试结果。 我的场景包含从kafka读取,flink 处理(有查询es做维表关联),处理结果输出到ES 和 Kafka。 Best Aven

Re: flink sql sink hbase failed

2020-06-15 文章 Sun.Zhu
好像不需要改源码 'connector.version' = ‘1.4.3’ 也可以往2.x版本里写 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 On 06/15/2020 19:22,Zhou Zach wrote: 改了源码,可以了 在 2020-06-15 16:17:46,"Leonard Xu" 写道: Hi 在 2020年6月15日,15:36,Zhou Zach 写道: 'connector.version' expects '1.4.3', but is '2.1.0'

Re: flink sql 怎样将从hbase中取出的BYTES类型转换成Int

2020-06-15 文章 Leonard Xu
Hi, 看你有两个地方声明hbase的表, >|cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3)) 这种方式应该是ok的, > users.addColumn("cf", "age", classOf[Array[Byte]]) 你这里为什么声明 age 的data type 为什么声明 classOf[Array[Byte]] ? 是不是忘了修改了? 这里使用 users.addColumn("cf", "age", classOf[Integer]) 应该就行了。 通过DDL 或者 在TableEn

Re:Re:Re: flink sql 怎样将从hbase中取出的BYTES类型转换成Int

2020-06-15 文章 Zhou Zach
hbase中维表: streamTableEnv.sqlUpdate( """ | |CREATE TABLE user_hbase3( |rowkey string, |cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3)) |) WITH ( |'connector.type' = 'hbase', |'connector.version' = '2.1.0', |'connector.table-name' = 'user_

Re:Re: flink sql 怎样将从hbase中取出的BYTES类型转换成Int

2020-06-15 文章 Zhou Zach
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) val conf = new Configuration v

Re: flink sql 怎样将从hbase中取出的BYTES类型转换成Int

2020-06-15 文章 Leonard Xu
Hi, 看起来是你query的 schema 和 table (sink) 的schema 没有对应上,hbase中的数据都是bytes存储,在 flink sql 中一般不需要读取bytes,读取到的数据应该是 FLINK SQL对应的类型,如 int, bigint,string等,方便把你的 SQL 贴下吗? 祝好, Leonard Xu > 在 2020年6月15日,19:55,Zhou Zach 写道: > > > > > > Exception in thread "main" org.apache.flink.table.api.ValidationExcep

flink sql 怎样将从hbase中取出的BYTES类型转换成Int

2020-06-15 文章 Zhou Zach
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.user_cnt do not match. Query schema: [time: STRING, age: BYTES] Sink schema: [time: STRING, sum_age: INT]

Re: 关于多个来源,如何保证数据对齐

2020-06-15 文章 Benchao Li
Hi, 听起来你的需求应该就是做一个双流join,可以做一个基于事件时间的双流join[1] [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#interval-joins 阿华田 于2020年6月15日周一 下午6:31写道: > 建议使用缓存,因为b流会延迟20分钟到,所以将a流的数据缓存20分钟,时间到了在和b流进行关联,缓存推荐使用谷歌的cache, > com.google.common.cache; > > > | | > 阿华田 > | >

Re:Re: flink sql sink hbase failed

2020-06-15 文章 Zhou Zach
改了源码,可以了 在 2020-06-15 16:17:46,"Leonard Xu" 写道: >Hi > > >> 在 2020年6月15日,15:36,Zhou Zach 写道: >> >> 'connector.version' expects '1.4.3', but is '2.1.0' > >Hbase connector只支持1.4.3的版本,其他不支持,但之前看有社区用户用1.4.3的connector写入高版本的case,你可以试下。 > >祝好 >Leonard Xu

Re: Flink SQL窗口计算结果无法sink

2020-06-15 文章 Benchao Li
Hi, 我试了一下,TO_TIMESTAMP(FROM_UNIXTIME())这种方式不会有时区问题呀, 你可以说下你具体遇到的是什么问题么?比如怎么观察到的,以及问题的表现是什么。 王超 <1984chaow...@gmail.com> 于2020年6月15日周一 下午3:31写道: > Hello, > > 我遇到了类似https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html > 中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决,请教各位大神帮忙看一下。 > > pu

回复:关于多个来源,如何保证数据对齐

2020-06-15 文章 阿华田
建议使用缓存,因为b流会延迟20分钟到,所以将a流的数据缓存20分钟,时间到了在和b流进行关联,缓存推荐使用谷歌的cache, com.google.common.cache; | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月15日 14:41,steven chen 写道: hi: 1.项目中我们会汇集不同来源的消息的,然和合并进行统计并输出结果。 2. 有topic a 是所有的用户pv日志, topic b 是所有用户uv日志,现在1个job同时消费a,b2个消息,并将pv,uv的结果同时输出到下一级的kafk

Re: pyflink数据查询

2020-06-15 文章 godfrey he
hi jack,jincheng Flink 1.11 支持直接将select的结果collect到本地,例如: CloseableIterator it = tEnv.executeSql("select ...").collect(); while(it.hasNext()) { it.next() } 但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng) 但是1.11的TableResult#collect实现对流的query支持不完整(只支持append only的query),master已经完整支持。 可以参照 j

????????????????????????????????????

2020-06-15 文章 Yichao Yang
Hi watermark ??5??a??15?? b20?? Best, Yichao Ya

Re: flink sql sink hbase failed

2020-06-15 文章 Leonard Xu
Hi > 在 2020年6月15日,15:36,Zhou Zach 写道: > > 'connector.version' expects '1.4.3', but is '2.1.0' Hbase connector只支持1.4.3的版本,其他不支持,但之前看有社区用户用1.4.3的connector写入高版本的case,你可以试下。 祝好 Leonard Xu

Re: pyflink数据查询

2020-06-15 文章 jincheng sun
你好 Jack, > pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果, 我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景: 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我

Re: pyflink问题求助

2020-06-15 文章 Xingbo Huang
Hello, 雪魂 在1.10里面的batch模式(flink planner和blink planner)都是没法直接使用sql ddl的方式将jdbc作为sink的。 需要你注册使用JDBCAppendSink。 对于PyFlink的用户来说,需要wrapper一下这个类。我写了一个简单的wrapper,你可以参考一下 from pyflink.java_gateway import get_gateway from pyflink.table.types import _to_java_type from pyflink.util import utils class JDBCA

flink on yarn日志web前台动态展示问题

2020-06-15 文章 zjfpla...@hotmail.com
大家好, 我们这边想做flink on yarn日志web前台动态展示的功能。因为没在flink restful api里面找到日志相关的api,现在的想法是这样: 1.web前端编写flink脚本,点击运行调用web后端的执行接口 2.web后端生成此前端任务的taskId,并调用flink驱动包(Pom依赖方式),传入前端脚本+taskId作为入口传参 3.flink驱动包中: A>通过YarnClient启动flink on yarn任务,返回yarn app

flink sql sink hbase failed

2020-06-15 文章 Zhou Zach
flink version: 1.10.0 hbase version: 2.1.0 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding

关于多个来源,如何保证数据对齐

2020-06-15 文章 steven chen
hi: 1.项目中我们会汇集不同来源的消息的,然和合并进行统计并输出结果。 2. 有topic a 是所有的用户pv日志, topic b 是所有用户uv日志,现在1个job同时消费a,b2个消息,并将pv,uv的结果同时输出到下一级的kafka topic c中, 问题:当a 消息 提前到达,b 消息晚20分钟到达,job 在工作时如何保证2个topic 数据对齐,或者说2边数据进行关联整合? 相当于2条消息处理后合并成1条往下游sink ,如何保证数据数据a和b对应的上?

pyflink????????

2020-06-15 文章 ????
FlinkTraceback (most recent call last): File "/tmp/pyflink/db00a36e-521f-4109-b787-712687fcb3b4/2add2d51-8a74-40c4-9c42-69c7c12feb05pyflink.zip/pyflink/util/exceptions.py", line 147, in deco File "/tmp/pyflink/db00a36e-521f-4109-b7

Fwd: Flink SQL窗口计算结果无法sink

2020-06-15 文章 王超
Hello, 我遇到了类似https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html 中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决,请教各位大神帮忙看一下。 public static void main (String[] args) throws Exception { // set up the streaming execution environment ClientConfig clientConfig = ClientConfig.build