Re: flinksql报错Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 文章 Robin Zhang
Hi,奔跑的小飞袁 目前没试过flink集成es,所以细节方面没办法深究太多,但是,可以给你提供个思路: 1. 查看pom中es的dependency是否设置了scope,导致依赖没有成功引入; 2. 如果依赖成功引入了,但是还不行,相反,在lib下放置相同的jar却可以正常执行,基本可以确定就是依赖冲突,具体什么类导致的,这个目前无法确定,期待更好地思路。 Best, Robin 奔跑的小飞袁 wrote > 现在我的lib下没有ElasticSearch相关的connector,在pom中引用,这样会产生冲突吗,还有这种现象有可能是在哪块冲突了 >

Re: flinksql报错Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 文章 奔跑的小飞袁
现在我的lib下没有ElasticSearch相关的connector,在pom中引用,这样会产生冲突吗,还有这种现象有可能是在哪块冲突了 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于flink-sql count/sum 数据如何每天重新计算

2020-10-19 文章 Robin Zhang
Hi, 夜思流年梦 我理解按照日期分组就可以解决你的需求,流数据属于哪一天就只算当天的,不影响其他date的数据; 按天分组的数据都计算出来了,再汇总一下就是一个月的 Best, Robin 夜思流年梦 wrote > 现有此场景: > 计算每天员工的业绩(只计算当天的) > > > 现在我用flink-sql 的方式,insert into select current_date, count(1) ,worker from > XX where writeTime>=current_date group by worker; > 把数据按天分区的方式先把数据

Re: flinksql报错Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 文章 Robin Zhang
Hi, 奔跑的小飞袁 Flink的class加载原则是child first,所以,尽量避免在pom中自己引入flink相关的依赖,避免跟Flink集群环境造成冲突,建议将安装包放在lib下,由flink去加载。 Best, Robin 奔跑的小飞袁 wrote > hello > 我在使用flinksql连接器时当我将flink-sql-connector-elasticsearch6_2.11-1.11.1.jar放在lib下,程序正常执行,但是当我在pom中进行配置时会产生如下报错,同样的问题会产生在hbase、jdbc的connector中,请问下这可能是什么造成的

Re: 单任务多条流的逻辑报错

2020-10-19 文章 Robin Zhang
Hi, 根据报错内容,定位到你的代码在 at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.j

flinksql报错Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 文章 奔跑的小飞袁
hello 我在使用flinksql连接器时当我将flink-sql-connector-elasticsearch6_2.11-1.11.1.jar放在lib下,程序正常执行,但是当我在pom中进行配置时会产生如下报错,同样的问题会产生在hbase、jdbc的connector中,请问下这可能是什么造成的 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default

Re: 回复:请问现在Flink支持动态扩缩容吗?

2020-10-19 文章 Yun Tang
Hi Flink-1.8 之前支持通过rest命令进行扩缩容 [1],不过后来在重构时该功能被disable了[2]。当然这个功能距离动态扩缩容还是有差距的,可以理解成是从外部进行扩缩容的基础。 目前在阿里巴巴的企业版中,有名为libra的动态扩缩容插件 [3] 提供相关功能。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-rescaling [2] https://issues.apache.org/jira/browse/FL

Re: 退订

2020-10-19 文章 Congxian Qiu
Hi 退订请发邮件到 user-zh-unsubscr...@flink.apache.org 更多详细情况可以参考[1] [1] https://flink.apache.org/community.html#mailing-lists Best, Congxian 费文杰 <15171440...@163.com> 于2020年10月20日周二 下午1:51写道: > hi: > 退订!

退订

2020-10-19 文章 费文杰
hi: 退订!

单任务多条流的逻辑报错

2020-10-19 文章 freeza1...@outlook.com
Hi all: 请问我用flink1.10.2版本,写了1个代码,这个代码本地可以跑起来,但是以任务方式发布到flink中,启动就报错,异常如下, 请问是什么原因? org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269) at org.

关于flink-sql count/sum 数据如何每天重新计算

2020-10-19 文章 夜思流年梦
现有此场景: 计算每天员工的业绩(只计算当天的) 现在我用flink-sql 的方式,insert into select current_date, count(1) ,worker from XX where writeTime>=current_date group by worker; 把数据按天分区的方式先把数据sink到mysql 但是发现落地到mysql的数据把前几天的数据都给算进来了,如何只算今天的数据? 另外还有一个疑惑,如何既计算当天数据,又包含了本月的所有数据?

flink更换scala版本问题

2020-10-19 文章 赵一旦
如题,更换scala版本的话,不清楚检查点等是否兼容呢?线上操作。 更换原因主要是准备用flinksql,但zeppelin貌似对scala支持为2.11,之前我用的2.12都是。 想着一次性都换成2.11吧。

pyflink1.11.0 kafka connector如果有访问权限

2020-10-19 文章 whh_960101
CREATETABLEkafkaTable(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3))WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','format'='csv','scan.startup.mode'='earliest-offset')你好,如果使用sql语句来创建ka

答复: Flink 1.11里如何parse出未解析的执行计划

2020-10-19 文章 刘首维
Hi, 我之前跟你有相同的需求,实现方式也跟你的思路基本类似, mock一个env 然后反射获取calciteParserSupplier 目前在生产环境运行良好 FYI 发件人: 马阳阳 发送时间: 2020年10月19日 17:57:47 收件人: Flink中文邮件列表 主题: Flink 1.11里如何parse出未解析的执行计划 Flink 1.11里的org.apache.flink.table.planner.ParserImpl的parse方法里包含了对Planner相关方法的调用,这导致

Re: flink table转datastream失败

2020-10-19 文章 Dream-底限
hi、 我查看了一下,join条件类型是一样的,我这面下游sink使用jdbc时候是可以运行的,但是转换为datastream时候失败了,下面是程序及异常: streamTableEnv.executeSql(kafkaDDL);//ddl语句见下面日志 Table table = streamTableEnv.sqlQuery("SELECT cast(t1.id as bigint) as register_id,cast(t1.uid as bigint) as asi_uid,cast(null as bigint) as person_uuid,cast(t1.app_id

回复:请问现在Flink支持动态扩缩容吗?

2020-10-19 文章 熊云昆
目前还不支持吧 | | 熊云昆 | | 邮箱:xiongyun...@163.com | 签名由 网易邮箱大师 定制 在2020年10月19日 18:22,林影 写道: 请问现在Flink支持动态扩缩容吗,或者说社区在这方面有什么计划吗?

flink1.10.0 batch模式写入hive失败,找不到TableSinkFactory

2020-10-19 文章 Allen
请问照片中的报错怎么解决? 发自我的iPhone

Re:flink table转datastream失败

2020-10-19 文章 hailongwang
Hi Dream, 可以分享下你完整的程序吗,我感觉这个是因为 JOIN ON 条件上类型不一致引起的,可以分享下你完整的程序看下。 Best, Hailong Wang 在 2020-10-19 09:50:33,"Dream-底限" 写道: >hi、我这面正在将flinktable转换为datastream,现在抛出如下异常,貌似是个bug。。。 > >table.printSchema(); >streamTableEnv.toRetractStream(table, >Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table

Re:pyflink下同时读查mysql优化求助

2020-10-19 文章 hailongwang
Hi, 我理解主要是在写入或者查询的网络IO上。对于写入的化,可以批量写入,减少网络IO。 查询的话,如果数据特点比较适合做缓存的话,可以增加 LRU 缓存,异步查询等。 Best, Hailong Wang 在 2020-10-19 12:19:34,"小学生" <201782...@qq.com> 写道: >各位大佬,我现在通过pyflink实时将kafka的消息获取,一条线直接进入mysql,另一条线通过udf去查该mysql的历史数据进行计算,当数据量达到1百万的时候,性能就下降了。怎么优化呢

Re: flinkSQL1.11写出数据到jdbc fleld type do not match

2020-10-19 文章 Benchao Li
你的source跟sink的字段数量都不一样多,你需要让insert的语句的query的table schema跟sink表的schema相同才可以。 比如可以用下面的SQL来写入: ```SQL insert into cloud_behavior_sink select operation, operation_channel, ip, lat, lng, user_id, device_id from cloud_behavior_source; ``` 奔跑的小飞袁 于2020年10月19日周一 下午4:29写道: > h

?????? ????????????????????????????????????????

2020-10-19 文章 x
??KeyedProcessFunctionProcessWindowFunction. --  -- ??: "user-zh"

请问现在Flink支持动态扩缩容吗?

2020-10-19 文章 林影
请问现在Flink支持动态扩缩容吗,或者说社区在这方面有什么计划吗?

Re: 关于内存大小设置以及预测

2020-10-19 文章 Xintong Song
事前估算是比较难的,不同作业差别可能会很大。 如果只是 heap oom 的话,没必要调大整个 JM/TM 的内存,可以只针对 heap 部分进行调整。 可以参考一下这篇文档 [1]。 Thank you~ Xintong Song [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/memory/mem_tuning.html On Sun, Oct 18, 2020 at 8:54 PM guangyong yang wrote: > 可以使用jvm自带命令jstat或通过 Mana

Flink 1.11里如何parse出未解析的执行计划

2020-10-19 文章 马阳阳
Flink 1.11里的org.apache.flink.table.planner.ParserImpl的parse方法里包含了对Planner相关方法的调用,这导致在某些前置sql(例如insert into用到的表的create table语句)没有执行之前,这个parse方法会报错。如果只是想调用Calcite的相关的功能去parse sql语句,有什么办法可以做到吗?能想到的一个办法是通过反射拿到ParserImpl里面的calciteParserSupplier。想知道Flink有没有提供直接的接口或者方法去做纯的sql parsing。 谢谢~

Re: SQL interval join 问题

2020-10-19 文章 Benchao Li
Hi Mic, 感谢关注这个issue,这个issue当前还在讨论中。 我认为问题已经定位清楚了,抄送了其他的committer同学进一步讨论确认。 Mic 于2020年10月19日周一 下午3:51写道: > 搜了一下,目前是有一个 issue 看起来相关,https://issues.apache.org/jira/browse/FLINK-18996 > 不知道处理进度如何? > 在 2020-10-19 15:03:54,"Mic" 写道: > >现有 SQL 语句如下: > >create table source1( > > id varchar PRIMARY KE

flinkSQL1.11写出数据到jdbc fleld type do not match

2020-10-19 文章 奔跑的小飞袁
hello 我在使用flinksql1.11写出数据到jdbc是遇到了field type类型不匹配的问题,是我类型设置有问题吗? 下面是我的异常日志以及sql文件 SET stream.enableCheckpointing=1000*60; SET stream.setParallelism=3; -- Kafka cdbp zdao source 表 create TABLE cloud_behavior_source( operation STRING, operation_channel STRING, `time` STRING, ip STR

Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
Hi,zilong 的确是这个问题,感谢帮助。 Best, Robin zilong xiao wrote > Hi Robin Zhang > 你应该是遇到了这个issue报告的问题:https://issues.apache.org/jira/browse/FLINK-16626 > ,可以看下这个issue描述,祝好~ > > Robin Zhang < > vincent2015qdlg@ > > 于2020年10月19日周一 下午3:42写道: > >> 普通的source -> map -> filter-> sink 测试应用。 >> >> 触发savepo

Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
Hi,Congxian 感谢提供思路,看了一下,JM端没有暴露日志,只能查看到ck正常的日志 Best, Robin Congxian Qiu wrote > Hi > 你可以看下 JM log 中这个 savepoint 失败是什么原因导致的,如果是 savepoint 超时了,就要看哪个 task > 完成的慢,(savepoint 可能比 checkpoint 要慢) > Best, > Congxian > > > Robin Zhang < > vincent2015qdlg@ > > 于2020年10月19日周一 下午3:42写道: > >> 普

Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 zilong xiao
Hi Robin Zhang 你应该是遇到了这个issue报告的问题:https://issues.apache.org/jira/browse/FLINK-16626 ,可以看下这个issue描述,祝好~ Robin Zhang 于2020年10月19日周一 下午3:42写道: > 普通的source -> map -> filter-> sink 测试应用。 > > 触发savepoint的脚本 : > ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID} > 具体报错信息: > > org.apache.flink.util.F

Re:SQL interval join 问题

2020-10-19 文章 Mic
搜了一下,目前是有一个 issue 看起来相关,https://issues.apache.org/jira/browse/FLINK-18996不知道处理进度如何? 在 2020-10-19 15:03:54,"Mic" 写道: >现有 SQL 语句如下: >create table source1( > id varchar PRIMARY KEY, > a varchar, > proctime AS PROCTIME() >) with ( >'connector' = 'kafka' >... >); >create table source2( > i

Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Congxian Qiu
Hi 你可以看下 JM log 中这个 savepoint 失败是什么原因导致的,如果是 savepoint 超时了,就要看哪个 task 完成的慢,(savepoint 可能比 checkpoint 要慢) Best, Congxian Robin Zhang 于2020年10月19日周一 下午3:42写道: > 普通的source -> map -> filter-> sink 测试应用。 > > 触发savepoint的脚本 : > ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID} > 具体报错信息: > > or

flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
普通的source -> map -> filter-> sink 测试应用。 触发savepoint的脚本 : ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID} 具体报错信息: org.apache.flink.util.FlinkException: Could not stop with a savepoint job "81990282a4686ebda3d04041e3620776". at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliF

??????????????????????????????????????

2020-10-19 文章 x
1224 --  -- ??: "user-zh"

Re: 求助:如何处理数据不连续导致状态无法清理

2020-10-19 文章 Congxian Qiu
Hi 或许你可以使用 timer 来进行兜底,注册一个未来某个时间的 timer,然后 timer 触发的时候把 state 清理掉 Best, Congxian x <35907...@qq.com> 于2020年10月19日周一 下午2:55写道: > 版本为v1.10.1 > 使用AggregateFunction+ProcessWindowFunction的方式,进行实时统计,ProcessWindowFunction中涉及状态的累计运算,使用事件时间,按维度+日期分区,按分钟开窗,跨天需要将状态清除,避免状态越来越大。状态清除的逻辑,覆盖ProcessWindowF

??????????????????????????????????????

2020-10-19 文章 x
1224   -- ??: "user-zh"

Re:Re: flink sql 更新mysql字段

2020-10-19 文章 Michael Ran
我们是自定义SQL。 但是不同SQL 更新部分字段,会有锁冲突,是能单条减少冲突量。 批量会死锁 在 2020-09-28 21:36:11,"Leonard Xu" 写道: >Hi > >Insert 到指定字段是个通用的需求,社区已经有一个issue[1] 在跟踪了,你可以关注下 > > >祝好 >Leonard >[1] https://issues.apache.org/jira/browse/FLINK-18726 > > >> 在 2020年9月28日,17:46,lem

请问现在Flink支持动态扩缩容吗?

2020-10-19 文章 林影
请问现在Flink支持动态扩缩容吗,或者说社区在这方面有什么计划吗?

SQL interval join 问题

2020-10-19 文章 Mic
现有 SQL 语句如下: create table source1( id varchar PRIMARY KEY, a varchar, proctime AS PROCTIME() ) with ( 'connector' = 'kafka' ... ); create table source2( id varchar PRIMARY KEY, a varchar, proctime AS PROCTIME() ) with ( 'connector' = 'kafka' ... ); select case when