Re: Re: flink 1.10 error when reading from Kafka

2020-05-13 Post by pengchenglin
Solved it last night, although I haven't found the root cause yet. The workaround is to replace the createTemporaryTable call with the now-deprecated registerTableSource method; nothing else needs to change.
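For anyone hitting the same error, the change amounts to swapping the final call on the connect descriptor. This is only a sketch: the `...` stands for the unchanged Kafka/format/schema setup from the code further down, and it relies on the Flink 1.10 descriptor API, where registerTableSource is deprecated but still available:

```java
// Fails with "Window aggregate can only be defined over a time attribute column":
//   tableEnv.connect(...).withFormat(...).withSchema(schema).inAppendMode()
//           .createTemporaryTable("sourceTable");

// Workaround that kept the proctime attribute usable in window aggregates:
//   tableEnv.connect(...).withFormat(...).withSchema(schema).inAppendMode()
//           .registerTableSource("sourceTable");
```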



pengchengl...@163.com
 
From: PCL
Sent: 2020-05-12 23:39
To: user-zh
Subject: Re: Re: flink 1.10 error when reading from Kafka
 
 
 
Thanks for the reply!
The strange thing is that just running sqlQuery works fine:
/*Table tb1 = tableEnv.sqlQuery("select sum(amount), TUMBLE_END(proctime, INTERVAL '5' SECOND)"
        + " from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");
tb1.printSchema();*/
With the comment removed, the printed schema is:
root
|-- EXPR$0: DOUBLE
|-- EXPR$1: TIMESTAMP(3)

At 2020-05-12 22:36:17, "忝忝向��" <153488...@qq.com> wrote:
>Isn't the TIMESTAMP(3) time format the problem?
>
>
>-- Original message --
>From: "PCL"  Sent: Tuesday, May 12, 2020, 9:43 PM
>To: "user-zh"
>Subject: flink 1.10 error when reading from Kafka
>
>
>
>Hi all:
> Has anyone run into this error: Window aggregate can only be defined over a time attribute
>column, but TIMESTAMP(3) encountered.
>It occurs with both event time and processing time, and the flink and blink planners report almost the same error.
>Versions as follows:
>Code as follows:
>// get the execution environment
>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>//EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
>// create a TableEnvironment
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>
>Schema schema = new Schema()
>        //.field("id", "VARCHAR").from("id")
>        .field("id", "STRING")
>        //.field("name", "VARCHAR")
>        .field("amount", "DOUBLE")
>        .field("proctime", Types.SQL_TIMESTAMP).proctime();
>        //.field("rowtime", Types.SQL_TIMESTAMP)
>        //.rowtime(new Rowtime()
>        //        .timestampsFromField("eventtime")
>        //        .watermarksPeriodicBounded(2000))
>
>// Kafka versions: "0.8", "0.9", "0.10", "0.11", and "universal"
>tableEnv.connect(new Kafka().version("universal")
>                .topic("source0511")
>                .property("zookeeper.connect", "172.16.44.28:7758")
>                .property("bootstrap.servers", "172.16.44.28:9096")
>                .property("group.id", "source0511-group")
>                .startFromEarliest())
>        .withFormat(new Csv())
>        .withSchema(schema)
>        .inAppendMode()
>        .createTemporaryTable("sourceTable");
>
>tableEnv.connect(new Kafka()
>                .version("universal") // "0.8", "0.9", "0.10", "0.11", and "universal"
>                .topic("sink0511")
>                .property("acks", "all")
>                .property("retries", "0")
>                .property("batch.size", "16384")
>                .property("linger.ms", "10")
>                .property("zookeeper.connect", "172.16.44.28:7758")
>                .property("bootstrap.servers", "172.16.44.28:9096")
>                .sinkPartitionerFixed())
>        .inAppendMode()
>        .withFormat(new Json())
>        .withSchema(new Schema()
>                .field("totalamount", "DOUBLE")
>                //.field("total", "INT")
>                .field("time", Types.SQL_TIMESTAMP))
>        .createTemporaryTable("sinkTable");
>
>tableEnv.sqlUpdate("insert into sinkTable"
>        + " select sum(amount), TUMBLE_END(proctime, INTERVAL '5' SECOND) "
>        + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");
>//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
>//FROM user_actions
>//GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
>env.execute("test");


Re: Re: Flink memory allocation question

2020-01-01 Post by pengchenglin
That is a JDK parameter; it can be configured through Flink's env.java.opts setting.
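For example, in flink-conf.yaml (the -XX value here is only illustrative, matching the number quoted below):

```yaml
# Extra JVM options passed to all Flink processes; process-specific variants
# (env.java.opts.taskmanager / env.java.opts.jobmanager) also exist.
env.java.opts: -XX:NewSize=2442764288
```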
 
From: Xintong Song
Sent: 2020-01-02 09:54
To: user-zh
Subject: Re: Flink memory allocation question
I'm not sure where you saw -XX:NewSize=2442764288; as far as I know, Flink does not set that parameter. Also, which Flink
version are you using?
 
Thank you~
 
Xintong Song
 
 
 
On Tue, Dec 31, 2019 at 8:52 PM cs <58683...@qq.com> wrote:
 
> Thanks for the answer. One more question:
> once TM memory exceeds a certain size, -XX:NewSize stays the same.
> For example, with the TM set to 15G, -XX:NewSize=2442764288;
> with the TM set to 20G, the young generation is still -XX:NewSize=2442764288.
> Why is that?
>
>
>
>
> -- Original message --
> From: "Xintong Song"  Sent: Tuesday, December 31, 2019, 6:10 PM
> To: "user-zh"
> Subject: Re: Flink memory allocation question
>
>
>
> Flink TM uses a lot of off-heap memory. Besides the usual JVM overhead (thread stacks, method area, etc.), this includes network buffers, batch caches, RocksDB, and more.
>
> The default configuration is relatively conservative, so that enough off-heap memory is reserved in most cases. Whether it is set too large depends on the specific job; you can try tuning 'containerized.heap-cutoff-ratio'.
>
> Also, the upcoming Flink 1.10 release reworks TM memory accounting: instead of a cutoff, it exposes more fine-grained options by purpose. Feel free to try it out.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Dec 31, 2019 at 5:53 PM cs <58683...@qq.com> wrote:
>
> > The taskmanager memory is set to 15G, but the actual heap is only 10G.
> > From the TM memory-allocation source code: 1. compute the cutoff (15GB * 0.25); 2. compute the heap size (the heap
> calculation takes 15GB - cutoff as its input);
> > 3. compute the off-heap size (off-heap = 15GB - heap).
> > The off-heap size becomes the final -XX:MaxDirectMemorySize.
> > Is it really necessary to set MaxDirectMemorySize that large?
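The arithmetic of the three steps above can be checked directly. A sketch assuming the default containerized.heap-cutoff-ratio of 0.25 and ignoring the further deductions (e.g. network buffers) that the real code applies, which is why the observed heap (~10G) is smaller than the value computed here:

```java
// Rough arithmetic check of the TM memory split quoted above, for a 15 GB TM.
public class TmMemorySplit {
    public static void main(String[] args) {
        long totalMb = 15 * 1024;                 // TM container memory: 15 GB
        long cutoffMb = (long) (totalMb * 0.25);  // step 1: cutoff = 15 GB * 0.25
        long heapMb = totalMb - cutoffMb;         // step 2: heap input = 15 GB - cutoff
        long offHeapMb = totalMb - heapMb;        // step 3: off-heap = 15 GB - heap
        // off-heap is what ends up as -XX:MaxDirectMemorySize
        System.out.println("cutoff=" + cutoffMb + " MB, heap=" + heapMb
                + " MB, offHeap=" + offHeapMb + " MB");
    }
}
```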


Re: Flink historyserver has no completed jobs

2019-12-25 Post by pengchenglin

flink-conf.yaml needs these settings:
historyserver.web.port: 8082
historyserver.web.address: 0.0.0.0
historyserver.archive.fs.refresh-interval: 1
historyserver.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
jobmanager.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
# after how many seconds a finished job is submitted to the history
jobstore.expiration-time: 14400
jobmanager.archive.fs.dir just needs to match historyserver.archive.fs.dir.
Then start the server with bin/historyserver.sh start.
Visit ip:8082; you need to run a job and wait out the jobstore.expiration-time interval before any data shows up.
 
From: 起子
Sent: 2019-12-25 15:57
To: user-zh
Subject: Flink historyserver has no completed jobs
Hi all,
I started Flink's historyserver, but it contains no completed jobs.
Configuration as follows:

Result page as follows:
hdfs as follows:
Any guidance would be appreciated.
 Department / Data Platform
 Name / 起子
 Mobile: 159 8810 1848
 WeChat: 159 8810 1848
 Email: q...@dian.so
 Address: 5#705, No. 998 Wenyi West Road, Yuhang District, Hangzhou, Zhejiang



jobmanager exception log

2019-12-15 Post by pengchenglin
Hi all,
I saw the following in the jobmanager log of a standalone cluster. The taskmanager on the .153 host went down, and I am not sure whether the error below caused it. Does anyone know what this Error means?

2019-12-15 17:15:21.999 [flink-metrics-379] ERROR akka.remote.Remoting flink-metrics-akka.remote.default-remote-dispatcher-20 - Association to [akka.tcp://flink-metr...@xx.xx.xx.153:35929] with UID [1617823256] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Re: flink 1.9.0 standalone-mode high-availability configuration question

2019-12-06 Post by pengchenglin
I just tested this: after starting jobmanager.sh ip port on both nodes and killing one of them, the other node's web UI reports that leader election is in progress, which shows HA is in effect.

From: pengchenglin
Sent: 2019-12-06 19:47
To: user-zh
Subject: flink 1.9.0 standalone-mode high-availability configuration question
Hi all:
Flink 1.9.0, two machines, identical flink-conf.yaml (high-availability.zookeeper.path.root and high-availability.cluster-id are also the same).
First run bin/jobmanager.sh start ip1 port on machine 1,
then run bin/jobmanager.sh start ip2 port on machine 2.
Both ip1:port and ip2:port are reachable at the same time; unlike 1.7.2, there is no redirect to ip1:port.

Is this a configuration problem, or is this simply how HA behaves in 1.9.0?




Re: per-job mode: YARN page links to the wrong Flink UI

2019-12-06 Post by pengchenglin
In per-job mode with multiple jobs running, if you configure high-availability.zookeeper.path.root: /flink
and high-availability.cluster-id: /v1.1/perjob-20191130,
then, since each job is its own cluster, every cluster uses the same ZooKeeper directory and YARN ends up linking every application to the same Flink UI. For per-job mode, removing those two settings is recommended.
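A sketch of the suggested flink-conf.yaml change for per-job deployments (the values are the ones from the original mail):

```yaml
# Per-job mode: comment these out so each YARN application gets its own
# HA cluster-id instead of all jobs sharing one ZooKeeper directory.
#high-availability.cluster-id: /v1.1/perjob-20191130
#high-availability.zookeeper.path.root: /flink
```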
 
From: pengchenglin
Sent: 2019-12-02 15:54
To: user-zh@flink.apache.org
Subject: per-job mode: YARN page links to the wrong Flink UI
Hi all:

Has anyone run into this? Per-job mode, 4 machines, multiple jobs running at the same time. On the YARN management page, clicking "tracking ui" for any application jumps to the Flink UI of the same single job.

Flink configuration:
high-availability.zookeeper.client.max-retry-attempts: 10
historyserver.web.address: 0.0.0.0
state.checkpoints.num-retained: 3
historyserver.web.port: 8082
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/xx/oomdump 
-XX:ReservedCodeCacheSize=2048M
high-availability.cluster-id: /v1.1/perjob-20191130
jobmanager.execution.failover-strategy: region
#jobmanager.rpc.address: xx
state.savepoints.dir: hdfs://xx:8020/flink/v1.1/mxm1130_savepoint
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.session-timeout: 30
taskmanager.registration.timeout: 20 min
high-availability.storageDir: hdfs://xx:8020/flink/v1.1/mxm_ha/
task.cancellation.timeout: 0
taskmanager.network.numberOfBuffers: 10240
parallelism.default: 8
taskmanager.numberOfTaskSlots: 8
akka.ask.timeout: 600 s
historyserver.archive.fs.dir: hdfs://xx:8020/flink/v1.1/completed-jobs-mxm1130/
jobmanager.heap.size: 2048m
jobmanager.archive.fs.dir: hdfs://xx:8020/flink/v1.1/mxm1130completed-jobs/
heartbeat.timeout: 30
restart-strategy.fixed-delay.attempts: 360
high-availability.zookeeper.client.connection-timeout: 6
historyserver.archive.fs.refresh-interval: 1
jobmanager.rpc.port: 6123
jobstore.expiration-time: 14400
#rest.port: 8983
high-availability.zookeeper.quorum: xx:,xx:,xx:
restart-strategy.fixed-delay.delay: 1 s
high-availability: zookeeper
state.backend: filesystem
restart-strategy: fixed-delay
taskmanager.heap.size: 8192m
akka.client.timeout: 600 s
state.checkpoints.dir: hdfs://xx:8020/flink/v1.1/mxm1130_checkpoint




Re: Flink local cache files deleted

2019-10-10 Post by pengchenglin
Hi, does your job use RocksDB to store checkpoints? Does the job fail with this error on every start, or only after running for a while?
 
From: 戴嘉诚
Sent: 2019-10-11 11:00
To: user-zh@flink.apache.org
Subject: Flink local cache files deleted
Hi all:
After migrating my jobs to a Flink 1.9 on YARN session, a job that reads Kafka messages, keys the stream, aggregates in one-second windows, and asynchronously writes batches to HBase keeps failing with this exception:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 more
Caused by: java.nio.file.NoSuchFileException: /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst -> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
at java.nio.file.Files.createLink(Files.java:1086)
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 12 more
 
 
This session runs multiple jobs, but they all write to HBase.
It looks like a directory on the local machine was deleted? But I never deleted that file...