flink/lib/下的jar:
flink-connector-hive_2.11-1.10.0.jar
flink-dist_2.11-1.10.0.jar
flink-jdbc_2.11-1.10.0.jar
flink-json-1.10.0.jar
flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-table_2.11-1.10.0.jar
flink-table-blink_2.11-1.10.0.jar
hbase-client-2.1.0.j
high-availability: zookeeper
在 2020-06-16 14:48:43,"Zhou Zach" 写道:
>
>
>
>
>high-availability.storageDir: hdfs:///flink/ha/
>high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
>state.backend: filesystem
>state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
state.backend: filesystem
state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints
state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
high-avai
你的配置文件中ha配置可以贴下吗
Zhou Zach 于2020年6月16日周二 下午1:49写道:
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>
>
将flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar放在flink/lib目录下,或者打入fat jar都不起作用。。。
At 2020-06-16 13:49:27, "Zhou Zach" wrote:
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint YarnJobClusterEntrypoint.
at
org.apache.flink.r
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint YarnJobClusterEntrypoint.
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCl
我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是
flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。
连接es的时候报错,findAndCreateTableSink failed。
是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。
Caused by Could not find a suitable factory for
‘org.apac
table hint的语法是紧跟在你query中访问某张表的时候,所以我理解并不会有 ”这个动态参数作用在哪张表“ 上的疑问吧?
Best,
Kurt
On Tue, Jun 16, 2020 at 10:02 AM Yichao Yang <1048262...@qq.com> wrote:
> Hi
>
>
> 1.2版本将会有like字句的支持,参考[1],不过也是通过定义一张表的方式,而不是直接在query内指定。
>
> 个人理解在query内指定其实会涉及到很多因素,假设涉及到多张表的时候,涉及到同key属性时,你在query内指定的属性到底是赋予给哪张表的?这个其实是比
我们有一个Flink Job需要一些自定义的规则,希望能够动态添加、更新、删除。规则的数量在百到千条。目前设计的结构是RDB+ Kafka +
Flink。
RDB存储规则的完整快照,以展示给Web应用作增删改查。改动通过Kafka发送消息至Flink,通过BroadcastState传播规则。
目前有一个问题没有解决:如何使用Kafka来传递状态。我想了一下,大概有几种方案:
1. 消息标记Add、Upadte、Delete类型,在Flink中写逻辑来处理状态以和RDB中状态保持一致。
目前的问题是,每次重启Job,都需要从头读Kafka,来回放状态的更新。Kafka中的状态消
hi
感谢您的建议,我这边尝试一下自定义实现sink的方式。
Best,
Jack
在 2020-06-15 18:08:15,"godfrey he" 写道:
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
it.next()
}
但是 pyflink 还没有引入 collect() 接口。(后续
感谢您的建议,目前在学习使用pyflink,使用pyflink做各种有趣的尝试,包括udf函数做日志解析等,也看过
目前官方文档对于pyflink的文档和例子还是偏少,遇到问题了还是需要向各位大牛们多多请教。
Best,
Jack
在 2020-06-15 16:13:32,"jincheng sun" 写道:
>你好 Jack,
>
>> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
>我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
>
>我理解你上面说的 【直接作为结
补充一条,在1.12中,除了LIKE语句,还有Table Hints[1] 可以用来动态修改属性。
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html
Yichao Yang <1048262...@qq.com> 于2020年6月16日周二 上午10:02写道:
> Hi
>
>
> 1.2版本将会有like字句的支持,参考[1],不过也是通过定义一张表的方式,而不是直接在query内指定。
>
> 个人理解在query内指定其实会涉及到很多因素,假设涉及到多张表的时候
Hi
1.2??like[1]query
??querykeyquery
[1]https://ci.apache.org/projects/flink/flink-
tks
-- --
??: "Kurt Young"
就是你DDL定义表的时候的WITH参数,有时候有个别参数写的不对或者需要调整,可以在query里直接修改,而不用重新定义一张新表。
Best,
Kurt
On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com> wrote:
> 动态 Table 属性是指什么?可以举一个列子吗。
Table
各位好;
最近我想测试一下我的程序处理性能如何。请问有什么工具、或者应该通过什么方法来获得一个比较准确的测试结果。
我的场景包含从kafka读取,flink 处理(有查询es做维表关联),处理结果输出到ES 和 Kafka。
Best
Aven
好像不需要改源码
'connector.version' = ‘1.4.3’ 也可以往2.x版本里写
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制
On 06/15/2020 19:22,Zhou Zach wrote:
改了源码,可以了
在 2020-06-15 16:17:46,"Leonard Xu" 写道:
Hi
在 2020年6月15日,15:36,Zhou Zach 写道:
'connector.version' expects '1.4.3', but is '2.1.0'
Hi,
看你有两个地方声明hbase的表,
>|cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3))
这种方式应该是ok的,
> users.addColumn("cf", "age", classOf[Array[Byte]])
你这里为什么声明 age 的data type 为什么声明 classOf[Array[Byte]] ? 是不是忘了修改了?
这里使用 users.addColumn("cf", "age", classOf[Integer]) 应该就行了。
通过DDL 或者 在TableEn
hbase中维表:
streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_hbase3(
|rowkey string,
|cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3))
|) WITH (
|'connector.type' = 'hbase',
|'connector.version' = '2.1.0',
|'connector.table-name' = 'user_
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
blinkEnvSettings)
val conf = new Configuration
v
Hi,
看起来是你query的 schema 和 table (sink) 的schema 没有对应上,hbase中的数据都是bytes存储,在 flink sql
中一般不需要读取bytes,读取到的数据应该是 FLINK SQL对应的类型,如 int, bigint,string等,方便把你的 SQL 贴下吗?
祝好,
Leonard Xu
> 在 2020年6月15日,19:55,Zhou Zach 写道:
>
>
>
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationExcep
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Field types of query result and registered TableSink
default_catalog.default_database.user_cnt do not match.
Query schema: [time: STRING, age: BYTES]
Sink schema: [time: STRING, sum_age: INT]
Hi,
听起来你的需求应该就是做一个双流join,可以做一个基于事件时间的双流join[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#interval-joins
阿华田 于2020年6月15日周一 下午6:31写道:
> 建议使用缓存,因为b流会延迟20分钟到,所以将a流的数据缓存20分钟,时间到了在和b流进行关联,缓存推荐使用谷歌的cache,
> com.google.common.cache;
>
>
> | |
> 阿华田
> |
>
改了源码,可以了
在 2020-06-15 16:17:46,"Leonard Xu" 写道:
>Hi
>
>
>> 在 2020年6月15日,15:36,Zhou Zach 写道:
>>
>> 'connector.version' expects '1.4.3', but is '2.1.0'
>
>Hbase connector只支持1.4.3的版本,其他不支持,但之前看有社区用户用1.4.3的connector写入高版本的case,你可以试下。
>
>祝好
>Leonard Xu
Hi,
我试了一下,TO_TIMESTAMP(FROM_UNIXTIME())这种方式不会有时区问题呀,
你可以说下你具体遇到的是什么问题么?比如怎么观察到的,以及问题的表现是什么。
王超 <1984chaow...@gmail.com> 于2020年6月15日周一 下午3:31写道:
> Hello,
>
> 我遇到了类似https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html
> 中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决,请教各位大神帮忙看一下。
>
> pu
建议使用缓存,因为b流会延迟20分钟到,所以将a流的数据缓存20分钟,时间到了在和b流进行关联,缓存推荐使用谷歌的cache,
com.google.common.cache;
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年06月15日 14:41,steven chen 写道:
hi:
1.项目中我们会汇集不同来源的消息的,然和合并进行统计并输出结果。
2. 有topic a 是所有的用户pv日志, topic b
是所有用户uv日志,现在1个job同时消费a,b2个消息,并将pv,uv的结果同时输出到下一级的kafk
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
it.next()
}
但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)
但是1.11的TableResult#collect实现对流的query支持不完整(只支持append
only的query),master已经完整支持。
可以参照 j
Hi
watermark
??5??a??15??
b20??
Best,
Yichao Ya
Hi
> 在 2020年6月15日,15:36,Zhou Zach 写道:
>
> 'connector.version' expects '1.4.3', but is '2.1.0'
Hbase connector只支持1.4.3的版本,其他不支持,但之前看有社区用户用1.4.3的connector写入高版本的case,你可以试下。
祝好
Leonard Xu
你好 Jack,
> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
【直接作为结果】这句话就要描述一下,您想怎样作为结果?我
Hello, 雪魂
在1.10里面的batch模式(flink planner和blink planner)都是没法直接使用sql ddl的方式将jdbc作为sink的。
需要你注册使用JDBCAppendSink。
对于PyFlink的用户来说,需要wrapper一下这个类。我写了一个简单的wrapper,你可以参考一下
from pyflink.java_gateway import get_gateway
from pyflink.table.types import _to_java_type
from pyflink.util import utils
class JDBCA
大家好,
我们这边想做flink on yarn日志web前台动态展示的功能。因为没在flink restful
api里面找到日志相关的api,现在的想法是这样:
1.web前端编写flink脚本,点击运行调用web后端的执行接口
2.web后端生成此前端任务的taskId,并调用flink驱动包(Pom依赖方式),传入前端脚本+taskId作为入口传参
3.flink驱动包中:
A>通过YarnClient启动flink on yarn任务,返回yarn app
flink version: 1.10.0
hbase version: 2.1.0
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding
hi:
1.项目中我们会汇集不同来源的消息的,然和合并进行统计并输出结果。
2. 有topic a 是所有的用户pv日志, topic b
是所有用户uv日志,现在1个job同时消费a,b2个消息,并将pv,uv的结果同时输出到下一级的kafka topic c中,
问题:当a 消息 提前到达,b 消息晚20分钟到达,job 在工作时如何保证2个topic 数据对齐,或者说2边数据进行关联整合?
相当于2条消息处理后合并成1条往下游sink ,如何保证数据数据a和b对应的上?
FlinkTraceback
(most recent call last): File
"/tmp/pyflink/db00a36e-521f-4109-b787-712687fcb3b4/2add2d51-8a74-40c4-9c42-69c7c12feb05pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco File
"/tmp/pyflink/db00a36e-521f-4109-b7
Hello,
我遇到了类似https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html
中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决,请教各位大神帮忙看一下。
public static void main (String[] args) throws Exception {
// set up the streaming execution environment
ClientConfig clientConfig =
ClientConfig.build
37 matches
Mail list logo