on
/warehouse/rt_ods/ed_cell_num_info/pt=20210311/.part-80f4307d-b910-42fa-8500-2c1226c5a879-0-57.inprogress.94c24d6c-e6f7-4387-b2e2-e667a44b23f6
(inode 2792145632): File does not exist. [Lease. Holder:
DFSClient_NONMAPREDUCE_-1421245761_79, pending creates: 2
on
/warehouse/rt_ods/ed_cell_num_info/pt=20210311/.part-80f4307d-b910-42fa-8500-2c1226c5a879-0-57.inprogress.94c24d6c-e6f7-4387-b2e2-e667a44b23f6
(inode 2792145632): File does not exist. [Lease. Holder:
DFSClient_NONMAPREDUCE_-1421245761_79, pending creates: 2
Flink sql中如何插入null值,有人了解吗?目前,insert 语句values中直接写null在zeppelin上报错了。
|
Best,
Jimmy
|
Signature is customized by Netease Mail Master
on
/warehouse/rt_ods/ed_cell_num_info/pt=20210311/.part-80f4307d-b910-42fa-8500-2c1226c5a879-0-57.inprogress.94c24d6c-e6f7-4387-b2e2-e667a44b23f6
(inode 2792145632): File does not exist. [Lease. Holder:
DFSClient_NONMAPREDUCE_-1421245761_79, pending creates: 2
Zeppelin 支持加载UDF jar的,可以参考下面的代码,不过架构上可能与你们的原有架构会有所差别
https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2#8iONE
https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L469
chenxyz 于2021年3月12日周五 上午9:42写道:
>
目前这种方法不可行,在公司的平台化系统里提交flink任务,自己能掌控的只有代码这块。
在 2021-03-11 16:39:24,"silence" 写道:
>启动时通过-C加到classpath里试试
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
默认的 shard assigner
public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER =
(shard, subtasks) -> shard.hashCode();
如何shard 的数量 大于 并发度 很容易造成分布不均。
想着用这种方法,在主类使用
static ConcurrentHashMap map = new ConcurrentHashMap<>();
static AtomicInteger counter = new AtomicInteger(0);
public static
建云,
之前我也遇到了savepoint 起作业失败的问题,是我们升级pulsar客户端以后,从2.2升级到2.5.2,我-s
启动作业的时候。因为作业也不是很重要,当时手头有其他任务,我就没有关注这个问题。你看看pulsar source那儿是不是做了什么。
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制
在2021年03月11日 22:43,Kezhu Wang 写道:
有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。
确实是这样的,checkpoint 把
> 有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。
确实是这样的,checkpoint 把 serializer 也 snapshot 了。
重新看了下 stack,应该是 deserialize `MessageId` 的时候出错的。你可以看下,pulsar
的版本是不是也有变动?有的话,这两个版本之间的某个 `MessageId` 实现是不是有字段变动?感觉你们应该用
`MessageId.toByteArray`。
On March 11, 2021 at 20:26:15, 赵 建云
你好,我参考StatefulSinkWriterOperator重写了迁移方法,进行测试,还是遇到了上面的错误。
这个错误似乎发生在source的initializeState之前。有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。请问有什么办法能帮忙定位吗?
感谢~
2021年3月11日 上午11:36,Kezhu Wang mailto:kez...@gmail.com>> 写道:
新的集群使用的是更新之后的 pulsar connector ?我看了下 pulsar-flink 的代码,这个更新对 state 是破坏性的。
+
建议看下集群剩余的内存情况,看是不是 140 个 TAskManager 内存不够了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
退订
--
发自我的网易邮箱手机智能版
退订
StatementSet inserts = tableEnv.createStatementSet();
inserts.addInsertSql("insert into xxx select * from xxx") // topic1
-》topic2任务
inserts.addInsertSql("insert into xxx select * from xxx") // topic2
-》Postgre 任务
inserts.execute();
--
Sent from: http://apache-flink.147419.n8.nabble.com/
请user-zh 不要再发邮件了
--
发件人:silence
发送时间:2021年3月11日(星期四) 16:39
收件人:user-zh
主 题:Re: flink sql如何从远程加载jar包中的udf
启动时通过-C加到classpath里试试
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink 变量在 operator 之间传递是需要序列话的。如果 DataStream<> 泛型通过基类引用,到后面的 operator
上能保留子类的完整信息并强制转换吗?
比如:
DataStream stream = source.from(SubClass);
stream.keyBy( ) {
这里的代码能判断并强制转换吗。
SubClass subObj = (SubClass) baseObj;
}
谢谢,
王磊
启动时通过-C加到classpath里试试
--
Sent from: http://apache-flink.147419.n8.nabble.com/
1:当开启state.backend.incremental 后 Checkpointed Data Size 会不断变大
我10分钟一次checkpoint,每次都增大2M,两天增大到400M,但其实我的实际应该只有20M(只做一个窗口计算)(我做savepoint之后也才20M)。
已设置了 ttl。
2:当我关闭state.backend.incremental 后 。每次checkpoint也就20M左右,不会变大了。
按我的理解:state.backend.incremental 开启后,Checkpointed Data
18 matches
Mail list logo