flink standalone 模式运行任务的问题

2021-03-10 Thread Lei Wang
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size'

按照日志中的建议,我把 metaspace size 从 256M 调整到了 512M,但还是出现了这个错误。
经过我的观察,在新提交任务的时候比较容易出现这个问题,任务一直正常跑很少出现这个错误。
我 flink 是 standAlone 模式部署的。就两个机器, 每台机器 一个 taskManager,总共运行了 9 个 job,所有 job
都打在了一个 jar 中,jar 大小为 42M.

我自己的猜想是程序正常运行时 metaspace 基本已经满了,再新提交一个任务导致又重新初始化 jar 中所有类的 符号
引用空间不够导致了这个错误。不知道这个想法对不对。

但我还有一个疑问,standalone 模式不同  job 实际上跑在了相同的 TaskMgr 进程上,只有一个 JVM,怎么实现代码隔离呢?
比如下面的例子:

job1 和  job2 打在了同一个 jar 中,都用到了代码中的一个 static 变量。
kill 掉 job1, 更改了 这个 static 变种的值,再提交 job1,那更改后的static 变量值 对 job2 会生效吗?

谢谢,
王磊


Re: 回复: Re:回复: flink-1.11.2 执行checkpoint失败

2021-03-10 Thread smallwong
任务会一直重启吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-10 Thread HunterXHunter
1.12.1



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-10 Thread HunterXHunter
但是看情况好像是只有在:DataStream发生Keyby或者 setParallelism的时候才会发生



--
Sent from: http://apache-flink.147419.n8.nabble.com/


如何定义时态表

2021-03-10 Thread superainbower


Hi,请教下大家,关于Temporal Tables,官方文档中的定义方法是
-- 
定义一张版本表CREATETABLEproduct_changelog(product_idSTRING,product_nameSTRING,product_priceDECIMAL(10,4),update_timeTIMESTAMP(3)METADATAFROM'value.source.timestamp'VIRTUAL,PRIMARYKEY(product_id)NOTENFORCED,--
 (1) 定义主键约束WATERMARKFORupdate_timeASupdate_time-- (2) 通过 watermark 定义事件时间   
   
)WITH('connector'='kafka','topic'='products','scan.startup.mode'='earliest-offset','properties.bootstrap.servers'='localhost:9092','value.format'='debezium-json');
这里是debezium-json,是否可以不用debzium做cdc,利用canal呢?
我尝试了替换'value.format'='canal-json’ 会提示value.source.timestamp 
这个在metadata中没有,只有timestamp
-- 
定义一张版本表CREATETABLEproduct_changelog(product_idSTRING,product_nameSTRING,product_priceDECIMAL(10,4),update_timeTIMESTAMP(3)METADATAFROM'timestamp'VIRTUAL,PRIMARYKEY(product_id)NOTENFORCED,--
 (1) 定义主键约束WATERMARKFORupdate_timeASupdate_time-- (2) 通过 watermark 定义事件时间   
   
)WITH('connector'='kafka','topic'='products','scan.startup.mode'='earliest-offset','properties.bootstrap.servers'='localhost:9092','value.format'='canal-json');
| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制



Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-10 Thread HunterXHunter
试了 1.12.2,还是一样问题。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 如何定义时态表

2021-03-10 Thread HunterXHunter
把格式调整下,很乱看不明白



--
Sent from: http://apache-flink.147419.n8.nabble.com/


退订

2021-03-10 Thread Lyon
退订!

????

2021-03-10 Thread 512348363


如何使用flink对时间跨度为1年的历史数据流进行处理,快速得到结果

2021-03-10 Thread Hongyuan Ma
大佬们,如果我有时间跨度为1年的历史数据,我想知道这些历史数据经flink处理后的输出是什么,我应该这么做?
我的业务代码中有用到state和window.
我尝试过根据历史数据,减去历史数据的起始时间戳,再加上当前时间戳(比如历史数据的时间戳timestamp属于[100秒~150秒],当前系统时间是第200秒, 
那就timestamp-100+200=>[200秒,250秒])。再使用定时器定时发送数据到kafka,模拟生成数据流,最后把flink输出结果的时间戳再减回去。
但如果历史数据时间跨度很长的话(比如一年),我这样就要等特别久,有没有办法对时间跨度为1年的历史数据,快速进行处理,得到结果?


向大佬们问好,
马宏元

flink sql sink多数据源问题

2021-03-10 Thread casel.chen
请教一下flink sql多条数据sink用 statement set 语句时,
1. 如果其中一条sink条发生背压或故障,会影响其他sink流吗?
2. 在flink sql cdc 消费同一张mysql表sink进多种数据源场景下,例如 mysql -> fink cdc -> mongodb & 
polardb 建议是启多个作业分别etl,还是分两段 mysql -> flink cdc -> kafka -> flink -> mongodb & 
polardb ... 呢?关系数据库端接入同时多个cdc会不会影响性能?

flink cdc遇到数据源大事务怎么处理?

2021-03-10 Thread casel.chen
flink cdc对接上游的mysql或pg业务库时遇到业务库大批量修数或schema变更是怎么处理的?
会不会瞬间产生很多changelog records打爆flink应用?如果会的话应该要如何避免呢?谢谢!

提交两个SQL任务,其中一个不生效。

2021-03-10 Thread eriendeng
大家好,我通过Yarn Per
Job模式提交了两个任务,后一个提交的任务会变成前一个一样的任务,而且好像不产生实际作用(?),看起来像是在给前一个做HA,不知道是不是什么配置没搞好呢?
两个任务分别是 Kafka Topic1 -> Kafka Topic2 和 Kafka Topic2 ->
Postgre,两个任务中的Topic2用的是同一个。
之前也尝试过,Topic1同时写入Topic2和Postgre,好像也同样不会生效,这是为什么呢?
谢谢大家。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

提交两个SQL任务,其中一个不生效。

2021-03-10 Thread eriendeng
提交两个SQL,后面的SQL不生效,这是为什么呢?后面的看起来会变成前一个的HA。
Job1:Kafka Topic1 -> Kafka Topic2
Job2:Kafka Topic2 -> Postgre

不是很明白这个原因,是不是有哪里没配置好呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 提交两个SQL任务,其中一个不生效。

2021-03-10 Thread silence
多个insert的话要用statementset去提交



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: 回复:MapState 无法更新问题

2021-03-10 Thread chaos
感谢回复,问题已解决。

解决方式:

参照官网的一个例子将状态的获取放在 processElement 内部。

  private val eFenceMapStateDesc = new MapStateDescriptor[String,
Boolean]("carEfenceState", classOf[String], classOf[Boolean])
  private val DbIdMapStateDesc = new MapStateDescriptor[String,
Long]("eFenceCarDbIdState", classOf[String], classOf[Long])


override def processElement(...){
val eFenceMapState = getRuntimeContext.getMapState(eFenceMapStateDesc)
val dbIdMapState = getRuntimeContext.getMapState(DbIdMapStateDesc)


}




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink savepoint迁移问题

2021-03-10 Thread 赵 建云
社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink 
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any 
of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.EOFException: No more bytes left.
at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
at 
com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
at 
com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
at 
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more

请问题大佬们可以提供排查问题的办法或者解决方案吗?


Jianyun8023
2021-3-11


flink sql如何从远程加载jar包中的udf

2021-03-10 Thread chenxyz
我们将开发的udf放在远程服务器,需要动态地加载jar包。Flink版本1.10,代码如下
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment exeEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSet = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment stEnv = StreamTableEnvironment.create(exeEnv, 
envSet);
String udfPath = "http://www.xxx.com/udf.jar";;
loadClassPath(udfPath);
stEnv.sqlUpdate("create function test as 'com.xx.TestUdf' ");
stEnv.sqlUpdate("create table ");
exeEnv.execute("remote_udf_sql_job");
}


/** 通过类加载器加载class,此处省略代码 **/
public static void loadClassPath(String jar) {
}


目前生成任务执行拓扑图是没有问题的,提交任务后运行会报找不到com.xx.TestUdf类。猜测是JobManager生成执行拓扑图的时候本地通过loadClassPath方法成功加载了class,但是到了TM执行的是算子中的逻辑,不会执行main方法里面的加载类的步骤,所以出现ClassNotFoundException。
目前试了几种方法:
1. 在jobGraph中增加依赖的jar包,但是exeEnv.execute(streamGraph)会重新生成jobGraph,导致增加的jars被清空。
StreamGraph streamGraph = exeEnv.getStreamGraph("remote_udf_sql_job", false);
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.adddJars(jar);
exeEnv.execute(streamGraph);
2. 按照网上的一个方法https://blog.csdn.net/weixin_28893597/article/details/112467465  
,增加配置pipeline.classpaths和pipeline.jars均无效。


flink报错信息:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: com.xx.TestUdf
ClassLoader info: URL ClassLoader:
file: '/flink-1.10.2/lib/slf4j-api-1.7.30.jar' (valid JAR)
file: '/flink-1.10.2/lib/logback-core-1.2.3.jar' (valid JAR)
file: '/flink-1.10.2/lib/logback-classic-1.2.3.jar' (valid JAR)
file: '/flink-1.10.2/lib/log4j-over-slf4j-1.7.30.jar' (valid JAR)
file: '/flink-1.10.2/lib/jcl-over-slf4j-1.7.30.jar' (valid JAR)
file: 
'/tmp/io_tmp_dirs/blobStore-b767d164-c9df-4dd6-a04f-c53982967372/job_35484161537386467514242760035484/blob_p-635c9dc04420693880fbf0f9505979d4e1a9c976-e1d64094d470046aec598c075fde0c30'
 (valid JAR)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:419)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:419)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:144)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:442)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:789)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:594)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.xx.TestUdf
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:85)
at 
org.apache.flink.util.ParentFirstClassLoader.loadClassWithoutExceptionHandling(ParentFirstClassLoader.java:64)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:71)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStre

退订

2021-03-10 Thread 512348363
退订

Re:提交两个SQL任务,其中一个不生效。

2021-03-10 Thread Michael Ran
StatementSet 用用
在 2021-03-11 09:37:27,"eriendeng"  写道:
>提交两个SQL,后面的SQL不生效,这是为什么呢?后面的看起来会变成前一个的HA。
>Job1:Kafka Topic1 -> Kafka Topic2
>Job2:Kafka Topic2 -> Postgre
>
>不是很明白这个原因,是不是有哪里没配置好呢?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink savepoint迁移问题

2021-03-10 Thread Kezhu Wang
新的集群使用的是更新之后的 pulsar connector ?我看了下 pulsar-flink 的代码,这个更新对 state 是破坏性的。

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

解决方法 :?
1. 尝试通过 state-processor-api 重写下 state ?
2. 1.9 的 pulsar-flink connector 可以跑在 1.11 的 flink 上吗?

感觉后面还有不兼容的更新

 new ListStateDescriptor<>(
 OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
 })));


不兼容的 state 更新,如果不好做 schema migration,可以试试 StatefulSinkWriterOperator
的方法:同时读取新旧 state + 只写新 state。

可以等 streamnative 的人确认下。

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)

... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)

at
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more
Caused by: java.io.EOFException: No more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)

at 
com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)

at 
com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)

at
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)

at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)

at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)

at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)

at
org.apache.flink.runtime.state.OperatorStateR

????

2021-03-10 Thread ??


退订

2021-03-10 Thread 岳坤
退订 ​

Re: Flink savepoint迁移问题

2021-03-10 Thread 赵 建云
现在是我在维护pulsar-flink 
connector,是存在不兼容的升级。还是个很坑的改动。我现在尝试旧的迁移新的字段上方法,会报这个错误。我对1.11支持的代码进行修改,将state的数据结构改成旧版本的形式,同样也是这个错误。你说的StatefulSinkWriterOperator我研究下怎么使用。

2021年3月11日 上午11:36,Kezhu Wang mailto:kez...@gmail.com>> 写道:

StatefulSinkWriterOperator



Re: flink sql如何从远程加载jar包中的udf

2021-03-10 Thread HunterXHunter
通过 createTemporarySystemFunction 试试看呢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


将每个tm的slot数从2降低到1,任务反而无法启动

2021-03-10 Thread lzwang
您好:


任务的拓扑图如下,parallelism的设置是140,但是中间有个操作的并行度设置成了50。

集群剩余的slot总数是195。



如果将每个tm的slot数设置为2,任务能够正常启动,并且分配了70个tm和140个slot,符合预期。



可如果将每个tm的slot数设置为1,便只分配了115个slot。任务会卡在creating状态,并且几分钟后,会抛出异常,“Could not 
allocate all requires slots within timeout of 30 ms. Slots required: 470, 
slots allocated: 388”



这里面有几个问题:
1. 将slot数设置为1后,异常中提示“Slots required: 470”,这个470似乎完全没有考虑slot 
share(我们并没有手动设置SlotSharingGroup)。这是为啥?
2. 将slot数设置为1后,异常中提示“slots allocated: 388”,而整个集群剩余的slot其实只有195个,这个388怎么来的?
3. 最大的并行度应是140,为何只分配了115个slot呢?


我们使用的flink版本是1.6.2。

期待你们的回复~

---
科大讯飞股份有限公司
AI营销平台  汪李之
Tel:15209882175
QQ/WeChat:992424538
Add:安徽省合肥市望江西路666号



回复:将每个tm的slot数从2降低到1,任务反而无法启动

2021-03-10 Thread xuhaiLong
hi


flink 1 slot != 1 core
可以看下 yarn.containers.vcores 这个参数设置为多少。
如果该值为1,tm slot为2,那么每启动一个tm容器就会占用1core,但是每个tm 会有两个slot,反之,如果该值为1,每个tm slot 
也为1,就会需要max parallelism core 数量。




在2021年3月11日 14:34,lzwang 写道:
您好:




任务的拓扑图如下,parallelism的设置是140,但是中间有个操作的并行度设置成了50。
集群剩余的slot总数是195。




如果将每个tm的slot数设置为2,任务能够正常启动,并且分配了70个tm和140个slot,符合预期。




可如果将每个tm的slot数设置为1,便只分配了115个slot。任务会卡在creating状态,并且几分钟后,会抛出异常,“Could not 
allocate all requires slots within timeout of 30 ms. Slots required: 470, 
slots allocated: 388”




这里面有几个问题:
1. 将slot数设置为1后,异常中提示“Slots required: 470”,这个470似乎完全没有考虑slot 
share(我们并没有手动设置SlotSharingGroup)。这是为啥?
2. 将slot数设置为1后,异常中提示“slots allocated: 388”,而整个集群剩余的slot其实只有195个,这个388怎么来的?
3. 最大的并行度应是140,为何只分配了115个slot呢?




我们使用的flink版本是1.6.2。


期待你们的回复~


---
科大讯飞股份有限公司
AI营销平台  汪李之
Tel:15209882175
QQ/WeChat:992424538
Add:安徽省合肥市望江西路666号



Re: flink sql如何从远程加载jar包中的udf

2021-03-10 Thread chenxyz
1.10应该是registerFunction吧,当前jar包中没有这个类(这个类在远程jar包中),这种方法没办法实例化TableFunction。

> 2021年3月11日 上午11:21,HunterXHunter <1356469...@qq.com> 写道:
> 
> 通过 createTemporarySystemFunction 试试看呢
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/