Re: Flink作业运行失败

2020-10-15 文章 Jeff Zhang
你是hadoop2 吗?我记得这个情况只有hadoop3才会出现 gangzi <1139872...@qq.com> 于2020年10月16日周五 上午11:22写道: > TM > 的CLASSPATH确实没有hadoop-mapreduce-client-core.jar。这个难道是hadoop集群的问题吗?还是一定要shade-hadoop包,官方不推荐shade-hadoop包了。 > > > 2020年10月16日 上午10:50,Jeff Zhang 写道: > > > > 你看看TM的log,里面有CLASSPATH的 > > > > gangzi

回复: flink 自定义udf注册后不能使用

2020-10-15 文章 史 正超
Hi, 从日志上看 是说 匹配不到 imei_encrypt的UDF,有可能是sql里传的字段和imei_encrypt的参数不匹配, 能看下你的具体代码和udf的声明吗 发件人: 奔跑的小飞袁 发送时间: 2020年10月16日 3:30 收件人: user-zh@flink.apache.org 主题: flink 自定义udf注册后不能使用 hello 我在使用flinkSQL注册udf时,发生了以下错误,这是我定义有问题还是flink的bug

flink 自定义udf注册后不能使用

2020-10-15 文章 奔跑的小飞袁
hello 我在使用flinkSQL注册udf时,发生了以下错误,这是我定义有问题还是flink的bug org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt() at

Re: Flink作业运行失败

2020-10-15 文章 gangzi
TM 的CLASSPATH确实没有hadoop-mapreduce-client-core.jar。这个难道是hadoop集群的问题吗?还是一定要shade-hadoop包,官方不推荐shade-hadoop包了。 > 2020年10月16日 上午10:50,Jeff Zhang 写道: > > 你看看TM的log,里面有CLASSPATH的 > > gangzi <1139872...@qq.com> 于2020年10月16日周五 上午10:11写道: > >> 我按照flink官方文档的做法,在hadoop集群每个节点上都:export HADOOP_CLASSPATH

Re: Flink作业运行失败

2020-10-15 文章 Jeff Zhang
你看看TM的log,里面有CLASSPATH的 gangzi <1139872...@qq.com> 于2020年10月16日周五 上午10:11写道: > 我按照flink官方文档的做法,在hadoop集群每个节点上都:export HADOOP_CLASSPATH =`hadoop > classpath`,但是报:java.lang.NoClassDefFoundError: > org/apache/hadoop/mapred/JobConf > >

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
我摘取了plan其中一部分 在过滤数据这里 == Abstract Syntax Tree == +- LogicalFilter(condition=[error_exist($1)]) == Optimized Logical Plan == +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
== Abstract Syntax Tree == LogicalProject(_c0=[log_get($1)], _c1=[_UTF-16LE''], _c2=[_UTF-16LE''], _c3=[_UTF-16LE'ERROR'], _c4=[_UTF-16LE'Asia/Shanghai'], _c5=[_UTF-16LE'@timestamp'], kubernetes$container$name=[$3.container.name], clusterName=[$2]) +-

Re: Flink作业运行失败

2020-10-15 文章 gangzi
我按照flink官方文档的做法,在hadoop集群每个节点上都:export HADOOP_CLASSPATH =`hadoop classpath`,但是报:java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf

Re:Flink作业运行失败

2020-10-15 文章 Shubin Ruan
尝试在集群的各个节点上执行下述命令: export HADOOP_CLASSPATH= 然后执行任务提交。 在 2020-10-15 22:05:43,"gangzi" <1139872...@qq.com> 写道: >请教一下,flink-1.11.1 yarn per job提交作业后,抛出了如下异常: >java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf >at java.lang.Class.getDeclaredMethods0(Native

??????????????????????????????????????

2020-10-15 文章 ??????
??aggregateFunction?? | | ?? | | ??xiongyun...@163.com | ?? ??2020??10??15?? 15:47?? ?? Hi,All

Flink作业运行失败

2020-10-15 文章 gangzi
请教一下,flink-1.11.1 yarn per job提交作业后,抛出了如下异常: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf at java.lang.Class.getDeclaredMethods0(Native Method) at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) at java.lang.Class.getDeclaredMethod(Class.java:2128) at

Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 Dian Fu
可以把plan打印出来看一下?打印plan可以参考这个:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables > 在 2020年10月15日,下午7:02,whh_960101 写道: > > hi, > 我刚才改了一下你的例子[1],通过from_elements构建一个source表 > 然后使用我的udf >

回复: kafka topic字段 不全的统计场景

2020-10-15 文章 史 正超
@Kyle Zhang 谢谢答复,现在差不多就是你说的这种方式做的。 发送自 Windows 10 版邮件应用 发件人: Kyle Zhang 发送时间: Thursday, October 15, 2020 6:56:08 PM 收件人: user-zh@flink.apache.org 主题: Re: kafka topic字段 不全的统计场景 group

Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 amen...@163.com
追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗? 那这种设置env的方式有可能还会造成其他什么问题? best, amenhub 发件人: amen...@163.com 发送时间: 2020-10-15 19:22 收件人: user-zh 主题: Re: Re: flink1.11加载外部jar包进行UDF注册 非常感谢您的回复! 对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗? 因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF

Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 amen...@163.com
非常感谢您的回复! 对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗? 因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。 期待您的回复,谢谢~ best, amenhub 发件人: cxydeve...@163.com 发送时间: 2020-10-15 17:46 收件人: user-zh 主题: Re: flink1.11加载外部jar包进行UDF注册

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
hi, 我刚才改了一下你的例子[1],通过from_elements构建一个source表 然后使用我的udf source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result() 打印出来的结果能够很好的筛选出我想要的数据 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法 source =

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
hi, 我刚才改了一下你的例子[1],通过from_elements构建一个source表 然后使用我的udf source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result() 打印出来的结果能够很好的筛选出我想要的数据 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法 source =

Re: kafka topic字段 不全的统计场景

2020-10-15 文章 Kyle Zhang
group by id应该就可以了吧,其他几个字段用last value或者first value[1],还有就是考虑迟到的数据怎么处理 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html On Thu, Oct 15, 2020 at 5:01 PM 史 正超 wrote: > 大佬们,现在我有个场景: > 一个kafka 主题 有 4个字段 , id, field2, field3, field4,其中id 是唯一标识,

flink sql state queryable ?

2020-10-15 文章 kandy.wang
想了解一下flink sql state里的东西,是否可以用datastream里的queryable api 查询 ? 怎么查询呢,是需要知道key 才可以查询么。 诉求就是想知道state里到底存的啥

Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 cxydeve...@163.com
我们用方法是通过反射设置env的配置,增加pipeline.classpaths 具体代码如下 public static void main(final String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings =

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
@udf(input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) def udf1(msg): #udf1就是简单的筛选log中的error关键字 if msg is None: return '' msg_dic = json.loads(msg.strip()) log = msg_dic.get('log').lower() if 'error' in log or 'fail' in log:

kafka topic字段 不全的统计场景

2020-10-15 文章 史 正超
大佬们,现在我有个场景: 一个kafka 主题 有 4个字段 , id, field2, field3, field4,其中id 是唯一标识, 但是有个问题是,并不是每个消息都会带上全量的字段消息,只有id是固有的字段。然后需要把id, field2, field3, field4 作为一个维度 统计, 比如有如下 kafka消息: {"id": 1, "field2":"b"} {"id": 1, "field3":"c", "field4":"d"} 那么 按照维度 count(1) (group by id, field2, field3, field4)

Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 Xingbo Huang
Hi, 我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的 [1] https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67 Best, Xingbo whh_960101 于2020年10月15日周四 下午2:30写道: > 您好,我使用pyflink时的代码如下,有如下问题: > > > source = st_env.from_path('source') >

Re: pyflink Table object如何打印出其中内容方便调试

2020-10-15 文章 Xingbo Huang
Hi, 你想要输出table的结果,可以有两种方便的方式, 1. table.to_pandas() 2. 使用print connector,可以参考[1] 然后你如果对pyflink感兴趣,可以看看这个doc[2],可以帮助你快速上手 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html [2]

pyflink Table object如何打印出其中内容方便调试

2020-10-15 文章 whh_960101
您好,我使用pyflink时的代码如下,有如下问题: source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 #udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) table = source.select("msg").where(udf1(msg)=True) 这样单步调试print(table)出来的结果是 pyflink有没有将Table转化成可打印格式的方法

Re: kafka table connector保留多久的数据

2020-10-15 文章 Xiao Xu
flink 是不会保留数据的, 数据都是落盘在 kafka 里, flink 根据 offset 去读 kafka 里的数据, 可以设置 kafka 里留存的时间 marble.zh...@coinflex.com.INVALID 于2020年10月14日周三 下午4:37写道: > 你好, 用kafka table > > connector接过来的数据,在flink这边会保留多久,在参数列表里没有看到有这个设置,如果保留太久,内存会撑暴,比如我只想保留半个小时,之前的数据可以清除。 > > > > -- > Sent from:

????????????????????????????????

2020-10-15 文章 ????????????
Hi,All ??flink??1??12:00~12:59??flink ?? env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); .timeWindow(Time.hours(1))

Re: blob server相关,文件找不到

2020-10-15 文章 janick
1.9.3 也遇到同样问题,lz 解决了吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/

pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
您好,我使用pyflink时的代码如下,有如下问题: source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) table =