您好,请问每次任务重启后,jm节点是不一样的,你是如何获取到{cluster}的信息的呢?
-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi
自定义的 KeySelector[1] 能否满足呢?
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#keyed-datastream
Best,
Congxian
Peihui He 于2020年11月2日周一 下午2:56写道:
> Hi,
>
> 不好意思,我这边误导。
> 现在的情况是这样的
>
> 用这个方法测试
> KeyGroupRangeAssignment.assignKeyToParallelOperator(a
Hi,
不好意思,我这边误导。
现在的情况是这样的
用这个方法测试
KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id,
KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism ),
parallelism)
发现不是很均匀( job暂时并发度是103,这样的话默认maxParallelism 就256 了
),下游key的数量能够相差100多。后来通过设置 maxParallelism和parallelism相等比按默认的
KeyGroup
?? ??
where??where??pyflink??api??where??
?? 2020-11-02 10:15
user-zh
?? ?? pyflink??where??
??
??
那我们没有这样的计算平台该怎么办呢?
-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
是这样的,因为重试的时候
flink-jdbc-connector会把错误的堆栈打印出来的,然后再重连的,对任务没有影响,你的任务失败了吗?我的任务其实也有这个错误,但是任务没有失败,重新连接上mysql了。
你仔细 看下你的日志里有没有下面的日志:
JDBC executeBatch error, retry times = 1
发件人: 酷酷的浑蛋
发送时间: 2020年11月2日 3:33
收件人: user-zh@flink.apache.org
主题: 回复: flink1.11连接mysql问题
标题上写的
Hi
不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby 会更均匀的话,是不是直接把计算
md5 的逻辑改成计算 hashcode 的逻辑就行了
Best,
Congxian
Peihui He 于2020年11月2日周一 上午10:01写道:
> hi,
>
> 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
>
> Best Wishes.
>
> Zhang Yuxiao 于2020年10月31日周六 上午9:38写道:
>
> >
Hi
如果是空间满,可以看看都是什么文件(什么目录,文件名是否有某些格式等),这些文件的存在是否合理,如果你觉得不合理可以发到社区邮件列表讨论,或者创建
issue 进行跟进。如果是合理的那就只能想办法删除一些不需要的文件了
Best,
Congxian
赵一旦 于2020年10月30日周五 下午5:51写道:
> 磁盘满看是什么东西导致满,然后清理就是了。比如是flink日志满?那就清理flink日志。
>
> void <2030531...@qq.com> 于2020年10月29日周四 下午6:56写道:
>
> > hi all
> > flink跑批任务使用d
x27;,
> > 'transient_lastDdlTime'='160466')
> > Time taken: 0.252 seconds, Fetched: 25 row(s)
> >
> > 另外,下载了hive文件内容如下
> > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31
> 11:25:38<0x01>INSERT
> >
> &
平台层的意思就是说:假设你们有实时计算平台,那么可以考虑把这个想法做在你们的平台里面。
bradyMk 于2020年11月2日周一 上午11:40写道:
> zhisheng大佬好~我不是很理解您说的平台层具体是什么意思,指的是什么。。。
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
说一下我们平台的实现方式
1、自定义metricReporter,假如任务开启了checkpoint,reporter会自动的将最新完成的checkpoint路径进行上报
可参考https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#checkpointing
2、平台会有是否重试和是否基于checkpoint进行恢复的选项
3、假如上述两选项都开启了之后,可以对运行失败的任务基于最新的checkpoint进行拉起
--
Sent from: http://apac
我在使用API的时候导入的依赖如下,
org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}
com.alibaba.ververica
flink-connector-mysql-cdc
1.1.0
,
其中flink 版本是1.11.1,2.11,
*当我刚运行的时候是能读到数据的*,但是我一旦对mysql中的表执行了变更操作,程序就会抛出Enc
建议你发下完整的错误信息,然后不要发图片(看不到图片),这样让更多人看到后,问题就解决快了。😁
发件人: yangxusun9
发送时间: 2020年11月2日 3:44
收件人: user-zh@flink.apache.org
主题: Re: flink-cdc-mysql 使用时遇到的问题
应该不是的,我用的是root账户
--
Sent from: http://apache-flink.147419.n8.nabble.com/
感谢,使用最新的 release 1.11 之后没有再出现这样的问题。
祝好!
--
Sent from: http://apache-flink.147419.n8.nabble.com/
应该不是的,我用的是root账户
--
Sent from: http://apache-flink.147419.n8.nabble.com/
zhisheng大佬好~我不是很理解您说的平台层具体是什么意思,指的是什么。。。
-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
标题上写的就是flink1.11啊
在2020年11月2日 11:33,酷酷的浑蛋 写道:
你看历史的回复,用的就是flink1.11,最新的flink-1.11.2也试过了还是有这个问题,而且我是在flink sql中使用
在2020年11月2日 11:30,史 正超 写道:
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
1 语句保活连接。
你看历史的回复,用的就是flink1.11,最新的flink-1.11.2也试过了还是有这个问题,而且我是在flink sql中使用
在2020年11月2日 11:30,史 正超 写道:
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
1 语句保活连接。
发件人: 酷酷的浑蛋
发送时间: 2020年11月2日 2:28
收件人: user-z
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
1 语句保活连接。
发件人: 酷酷的浑蛋
发送时间: 2020年11月2日 2:28
收件人: user-zh@flink.apache.org
主题: Re:回复:flink1.11连接mysql问题
没有解决,隔一段时间就会报这个超时错误
在 2020-10-1
partition-commit.delay'='1 min',
> > > 'sink.partition-commit.policy.kind'='metastore,success-file',
> > > 'sink.partition-commit.trigger'='partition-time',
> > > 'transient_lastDdlTime'='1
zhisheng大佬好~我不太理解你说的平台层是什么。。。
-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
t.policy.kind'='metastore,success-file',
> > 'sink.partition-commit.trigger'='partition-time',
> > 'transient_lastDdlTime'='160466')
> > Time taken: 0.252 seconds, Fetched: 25 row(s)
> >
> > 另外,下载了hiv
没有解决,隔一段时间就会报这个超时错误
在 2020-10-14 17:33:30,"superainbower" 写道:
>HI
>链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
>| |
>superainbower
>|
>|
>superainbo...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2020年08月31日 15:57,酷酷的浑蛋 写道:
>关键是在sql中怎么设置,connector=jdbc
>
>
>
>
>在2020年08月31日 15:06,13580506953<13580506...@163.co
hi,
可以自己根据社区的代码进行重编译,改成自己公司的依赖名,推送自公司的 nexus。
Best
zhisheng
Yangze Guo 于2020年10月29日周四 下午4:00写道:
> 1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-18361
>
> Best,
> Yangze Guo
>
> On Thu, Oct 29, 2020 at 3:37 PM 赵帅 wrote:
> >
hi,
应该是可以继承 FlinkKafkaPartitioner 接口,自己重写 partition 方法实现 hash(key) 的功能
eg:
public class MyCustomPartitioner extends FlinkKafkaPartitioner> {
@Override
public int partition(Map map, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
String key = map.get(xxx).toStri
??
??where
??
---
Py4JJavaError
Traceback (most recent call last)
??
??where
??
---
Py4JJavaError
Traceback (most recent call last)
hi,
已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
Best Wishes.
Zhang Yuxiao 于2020年10月31日周六 上午9:38写道:
> 你好,
>
> 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
>
> 发件人: Peihui He
> 发送时间: 2020年10月30日 下午 07:23
> 收件人: user-zh@flink.apache.org
Hi,
你说的不行,指的是运行报错了(如果报错了,可以贴下错误的日志),还是出来的结果不符合预期(是不生效,还是啥的)。
Best,
Xingbo
洗你的头 <1264386...@qq.com> 于2020年11月1日周日 上午10:16写道:
> 尊敬的开发者您好:我想要在输出表中进行条件筛选,使用了where语句,结果不行
> 我的代码如下:
> # 处理流程
> t_env.from_path('mySource') \
> .select("pickup_datetime, dropoff_datetime,
> pickup_longitude, pickup_l
hi,
提供一个方案,平台层可以做到作业自动拉起,那么需要平台层有这些信息。
1、作业启动的时候保存一下作业的 jobid 信息
2、平台轮训检测作业的状态,如果作业挂了,直接从配置的 checkpoint 基础路径 + jobid 目录下去找最新的目录(里面含
_metadata)
eg: hdfs:/flink/checkpoints/9b4cddb385b0c5db96b0774769867673/chk-15109
然后平台层将作业从 checkpoint 拉起,当然这个是否自动拉起,拉起的次数可以让用户去选择。
Best!
zhisheng
bradyMk 于202
x27;)
> Time taken: 0.252 seconds, Fetched: 25 row(s)
>
> 另外,下载了hive文件内容如下
> 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT
>
> 还是查询不到结果
> hive> select * from team;
> OK
> Time taken: 0.326 seconds
>
> 陈帅 于2020
可能是某一个 C算子的某一个 subtask 的inPoolUsage 打满了。建议查看 subtask级别的
Metric。从原理上分析,如果B反压严重,至少有某一个 C算子的一个 subtask 的inPoolUsage 打满了。希望对你有所帮助👏
祝好
fanrui
---原始邮件---
发件人: "赵一旦"
没有人有这方面经验分享下嘛?
赵一旦 于2020年10月30日周五 下午5:59写道:
>
> 我看了下,inputQueueLength和outputQueueLength是有值的。xxxPoolUsage都是0(包括exclusive,floating)。numBuffersInLocal这种也有值。还有的指标本身有值,带个PerSecond没值,这都不合理呀,比如指标A的值在几千,但指标AperSecond却是0。
> 明显存在很多指标固定为0,但是现在混淆在一起,不清楚是指标统计了,值为0,还是就没统计。
>
>
> 赵一旦 于2020年10月30日周五 下午5:52写道:
我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
shell查不到数据。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformati
lt;0x01>2020-10-31 11:25:38<0x01>INSERT
>
> 还是查询不到结果
> hive> select * from team;
> OK
> Time taken: 0.326 seconds
>
> 陈帅 于2020年11月1日周日 下午5:10写道:
>
>>
>> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
>> 生
-31 11:25:38<0x01>INSERT
还是查询不到结果
hive> select * from team;
OK
Time taken: 0.326 seconds
陈帅 于2020年11月1日周日 下午5:10写道:
>
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> 生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr
之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
陈帅 于2020年11月1日周日 下午4:43写道:
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream
37 matches
Mail list logo