使用FileInputFormat
递归读取hdfs文件,并添加过滤器。程序执行没有报错但是很快就执行完成也没有读取到数据,本地测试可以过滤并读取到数据,yarn集群上执行出现上述情况。
代码:
//初始化任务参数
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new
Path("hdfs://arc/success_fid_flow"));
fileInputFormat.se
你传的第二个参数是string,这样试一下?
select("drop_fields(message, array('x'))")
不太确定我是否理解了你的问题(如果上面不行的话,建议你发一下exception)
> 在 2020年6月1日,下午1:59,jack 写道:
>
>
>
>
>
>
>
> 是的,对应参数没有填写正确,感谢;
> 另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-01 11:01:34,"Dian Fu
??jenkinsflink
??jenkensflink
是的,对应参数没有填写正确,感谢;
另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。
在 2020-06-01 11:01:34,"Dian Fu" 写道:
>The input types should be as following:
>
>input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>
>Regards,
>Dian
>
>> 在 2020年6月1日,上午10:49,刘亚坤 写道:
>>
>> 目前在
org.apache.flink.configuration.IllegalConfigurationException: The Flink config
file
'/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08/flink-conf.yaml'
(/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_15
这些问题一两句话也说不清楚,建议看看 Flink 官网的文档和博客。
在 2020-06-01 11:08:27,"xyq" 写道:
>hello 您好,
>打扰了请教几个问题,:
>
>1.flink窗口的延时数据怎么处理 ,假如我的数据写入kafka或clickhouse,侧输出流可以做到吗?
>
>2.flink怎么做到端到端恰好一次,是不是sink的组件本身得支持恰好一次,clickhouse支持恰好一次吗?
>
>3.flink突然发现之前跑的数据有异常,怎么从之前恢复数据?
>
>4.flink不借助外部组件怎么算日活跃人数(假
hello 您好,
打扰了请教几个问题,:
1.flink窗口的延时数据怎么处理 ,假如我的数据写入kafka或clickhouse,侧输出流可以做到吗?
2.flink怎么做到端到端恰好一次,是不是sink的组件本身得支持恰好一次,clickhouse支持恰好一次吗?
3.flink突然发现之前跑的数据有异常,怎么从之前恢复数据?
4.flink不借助外部组件怎么算日活跃人数(假设数据量还很大)?
5.kafka分区是6个,之前并行度一直给3,最近我数据变大好多,我调整到6,报内存溢出,我把taskmanager的内存也调大2倍,还是不行,居然报了我代码中有些问题,
The input types should be as following:
input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
Regards,
Dian
> 在 2020年6月1日,上午10:49,刘亚坤 写道:
>
> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def drop_f
目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def drop_fields(message, *fields):
import json
message = json.loads(message)
for field in fields:
message.pop(field)
return json.dumps(messa
我怀疑是JM没有把flink-conf.yaml注册为local resource,最根本的原因应该还是你的提交方式
有问题,导致有些文件没有ship过去。如果你可以把JM的log以及launch_container.sh脚本发出来,
应该可以看出来原因
Best,
Yang
Leonard Xu 于2020年6月1日周一 上午9:39写道:
> 邮件里的图片经常看不到,可以用图床工具,放链接。
>
> > 在 2020年6月1日,09:27,wangweigu...@stevegame.cn 写道:
> >
> >
> > 这个邮件好像图片都看不到啊,你们能看到不?
> >
>
Hi
1.??30s??1s
2.Congxian??
3.MaxOutOfOrderness =
100ms??Flink?
邮件里的图片经常看不到,可以用图床工具,放链接。
> 在 2020年6月1日,09:27,wangweigu...@stevegame.cn 写道:
>
>
> 这个邮件好像图片都看不到啊,你们能看到不?
>
>
>
> 发件人: 程龙
> 发送时间: 2020-05-30 19:20
> 收件人: user-zh
> 主题: Re:re:提交flink代码 无法创建taskmanager flink web界面一致停留在created状态 ,日志报错如下
>
>
> 是用代码提交的jobmanager 是可以加载的 就是启动taskmanager 这个目录就没有创建, 界面
这个邮件好像图片都看不到啊,你们能看到不?
发件人: 程龙
发送时间: 2020-05-30 19:20
收件人: user-zh
主题: Re:re:提交flink代码 无法创建taskmanager flink web界面一致停留在created状态 ,日志报错如下
是用代码提交的jobmanager 是可以加载的 就是启动taskmanager 这个目录就没有创建, 界面如下 ,错误日志就是我下面贴出来的那个
在 2020-05-30 19:16:57,"462329521" <462329...@qq.com> 写道:
>你的提交命令是什么呢 看样子是加载不到配
14 matches
Mail list logo