Reply: Re: flink 1.10 error when reading from Kafka
Solved it last night. I have not tracked down the root cause yet; the workaround is to replace the createTemporaryTable method with the previously deprecated registerTableSource method, nothing else needs to change.

pengchengl...@163.com

From: PCL
Sent: 2020-05-12 23:39
To: user-zh
Subject: Re: Reply: flink 1.10 error when reading from Kafka

Thanks for the reply! The strange thing is that running sqlQuery works fine:

/*Table tb1 = tableEnv.sqlQuery("select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND)"
        + " from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND) ");
tb1.printSchema();*/

After uncommenting it, the printed schema is:

root
 |-- EXPR$0: DOUBLE
 |-- EXPR$1: TIMESTAMP(3)

On 2020-05-12 22:36:17, "忝忝向��" <153488...@qq.com> wrote:
>Isn't the TIMESTAMP(3) time format wrong?
>
>
>-- Original Message --
>From: "PCL"
>Sent: Tuesday, May 12, 2020, 9:43 PM
>To: "user-zh"
>Subject: flink 1.10 error when reading from Kafka
>
>
>
>Hi all:
>    Has anyone run into this problem: "Window aggregate can only be defined over a time attribute
>column, but TIMESTAMP(3) encountered."
>The error occurs with both event time and processing time; the flink planner and the blink planner report roughly the same error.
>The version is as follows:
>The code is as follows:
>//get the execution environment
>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>//EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
>//create a tableEnvironment
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>
>Schema schema = new Schema()
>        //.field("id", "VARCHAR").from("id")
>        .field("id", "STRING")
>        //.field("name", "VARCHAR")
>        .field("amount", "DOUBLE")
>        .field("proctime", Types.SQL_TIMESTAMP).proctime()
>        //.field("rowtime", Types.SQL_TIMESTAMP)
>        //    .rowtime(
>        //        new Rowtime()
>        //            .timestampsFromField("eventtime")
>        //            .watermarksPeriodicBounded(2000))
>        ;
>
>// "0.8", "0.9", "0.10", "0.11", and "universal"
>tableEnv.connect(new Kafka().version("universal")
>        .topic("source0511")
>        .property("zookeeper.connect", "172.16.44.28:7758")
>        .property("bootstrap.servers", "172.16.44.28:9096")
>        .property("group.id", "source0511-group")
>        .startFromEarliest()
>        )
>        .withFormat(new Csv())
>        .withSchema(schema)
>        .inAppendMode()
>        .createTemporaryTable("sourceTable");
>
>tableEnv.connect(
>        new Kafka()
>        .version("universal") // "0.8", "0.9", "0.10", "0.11", and "universal"
>        .topic("sink0511")
>        .property("acks", "all")
>        .property("retries", "0")
>        .property("batch.size", "16384")
>        .property("linger.ms", "10")
>        .property("zookeeper.connect", "172.16.44.28:7758")
>        .property("bootstrap.servers", "172.16.44.28:9096")
>        .sinkPartitionerFixed())
>        .inAppendMode()
>        .withFormat(new Json())
>        .withSchema(
>            new Schema().field("totalamount", "DOUBLE")
>            //.field("total", "INT")
>            .field("time", Types.SQL_TIMESTAMP)
>        )
>        .createTemporaryTable("sinkTable");
>
>tableEnv.sqlUpdate("insert into sinkTable"
>        + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) "
>        + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");
>//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
>//  FROM user_actions
>//  GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
>env.execute("test");
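For anyone hitting the same error, here is a minimal sketch of the workaround described above, assuming the same Flink 1.10 connect() descriptor API and reusing the tableEnv and schema objects from the quoted code; only the final call on the source descriptor changes:

// Workaround sketch: register the source via the deprecated registerTableSource()
// instead of createTemporaryTable(). With createTemporaryTable(), proctime ended up
// being treated as a plain TIMESTAMP(3) here rather than a time attribute, which
// triggered the "Window aggregate can only be defined over a time attribute column" error.
tableEnv.connect(new Kafka().version("universal")
        .topic("source0511")
        .property("zookeeper.connect", "172.16.44.28:7758")
        .property("bootstrap.servers", "172.16.44.28:9096")
        .property("group.id", "source0511-group")
        .startFromEarliest())
        .withFormat(new Csv())
        .withSchema(schema)
        .inAppendMode()
        .registerTableSource("sourceTable"); // was: .createTemporaryTable("sourceTable")

Per the reply at the top, nothing else needs to change, so the sink registration can be left as it is.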
Re: Re: Question about Flink memory allocation
This is a JDK parameter; you can set it through Flink's env.java.opts.

From: Xintong Song
Sent: 2020-01-02 09:54
To: user-zh
Subject: Re: Question about Flink memory allocation

I am not sure where you got -XX:NewSize=2442764288 from; as far as I know, Flink does not set this parameter. Also, which Flink version are you using?

Thank you~

Xintong Song

On Tue, Dec 31, 2019 at 8:52 PM cs <58683...@qq.com> wrote:

> Thanks for your answer. I have one more question:
> once the TM memory exceeds a certain size, -XX:NewSize stays the same.
> For example, with the TM set to 15G, -XX:NewSize=2442764288;
> with the TM set to 20G, the new generation is still -XX:NewSize=2442764288.
> Why is that?
>
>
>
>
> -- Original Message --
> From: "Xintong Song"
> Sent: Tuesday, December 31, 2019, 6:10 PM
> To: "user-zh"
> Subject: Re: Question about Flink memory allocation
>
>
>
> The Flink TM uses a lot of off-heap memory: besides the usual JVM off-heap overhead
> such as thread stacks and the method area, it also includes network buffers, batch
> caches, RocksDB, and so on.
>
> The default configuration is relatively conservative, so that enough off-heap memory
> is reserved in most cases. Whether it is set too large depends on the jobs you actually
> run; you can try tuning it via the 'containerized.heap-cutoff-ratio' option.
>
> Also, the upcoming Flink 1.10 release reworks the TM memory calculation: instead of a
> cutoff, it exposes more specific options per memory component. You are welcome to try it.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Dec 31, 2019 at 5:53 PM cs <58683...@qq.com> wrote:
>
> > The taskmanager memory is set to 15G, but the actual heap is only 10G.
> > Looking at the TM memory allocation source code: 1. compute the cutoff (15GB * 0.25);
> > 2. compute the heap size (the input to the heap calculation is 15GB - cutoff);
> > 3. compute the off-heap size (off-heap = 15GB - heap size).
> > The off-heap size ends up as the final -XX:MaxDirectMemorySize.
> > Does MaxDirectMemorySize really need to be set that large?
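A minimal sketch of the suggestion at the top of this thread, i.e. passing the JDK flag through env.java.opts in flink-conf.yaml; the value shown is just the number quoted in the question, not a recommendation:

# Flink itself does not set -XX:NewSize, so JVM flags like this go through env.java.opts.
env.java.opts: -XX:NewSize=2442764288
# To apply it to TaskManagers only (assumption: your Flink version supports this key):
# env.java.opts.taskmanager: -XX:NewSize=2442764288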
Re: Question about the Flink historyserver having no completed jobs
flink-conf.yaml needs the following settings:

historyserver.web.port: 8082
historyserver.web.address: 0.0.0.0
historyserver.archive.fs.refresh-interval: 1
historyserver.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
jobmanager.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
# after how many seconds a completed job is handed over to the history
jobstore.expiration-time: 14400

jobmanager.archive.fs.dir and historyserver.archive.fs.dir just need to point to the same directory.
Then start it with bin/historyserver.sh start.
Open ip:8082; you need to run a job and wait for the jobstore.expiration-time interval before any data shows up.

From: 起子
Sent: 2019-12-25 15:57
To: user-zh
Subject: Question about the Flink historyserver having no completed jobs

Hi all:
I started the Flink historyserver, but it shows no completed jobs.
The configuration is as follows:
The resulting page is as follows:
HDFS is as follows:
Please advise.

Department / Data Platform
Alias / 起子
Mobile: 159 8810 1848
WeChat: 159 8810 1848
Email: q...@dian.so
Address: 5#705, 998 Wenyi West Road, Yuhang District, Hangzhou, Zhejiang
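A short sketch of how to verify the setup above; the HDFS path comes from the configuration in this thread, and running the check on the HistoryServer host itself is an assumption about your environment:

# Start the HistoryServer, then check that archived jobs appear.
bin/historyserver.sh start
# The JobManager should write archives here once a job finishes and the expiration time passes:
hdfs dfs -ls hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
# The HistoryServer REST API should then list them:
curl http://localhost:8082/jobs/overview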
JobManager error log
Hi all:
I saw the following in the log of a standalone JobManager node. The TaskManager on .153 went down, and I am not sure whether the error below caused it. Does anyone know what this error means?

2019-12-15 17:15:21.999 [flink-metrics-379] ERROR akka.remote.Remoting flink-metrics-akka.remote.default-remote-dispatcher-20 - Association to [akka.tcp://flink-metr...@xx.xx.xx.153:35929] with UID [1617823256] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
    at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Reply: flink 1.9.0 standalone mode high-availability configuration question
I just tested this for you. After starting jobmanager.sh ip port on both nodes and killing one of them, the other node's web UI reports that leader election is in progress, which shows the cluster is indeed in high-availability mode. A sketch of the relevant configuration follows after this message.

From: pengchenglin
Sent: 2019-12-06 19:47
To: user-zh
Subject: flink 1.9.0 standalone mode high-availability configuration question

Hi all:
With Flink 1.9.0 on two machines and identical flink-conf.yaml files (high-availability.zookeeper.path.root and high-availability.cluster-id are also the same), I first run bin/jobmanager.sh start ip1 port on machine 1 and then bin/jobmanager.sh start ip2 port on machine 2.
Both ip1:port and ip2:port stay reachable at the same time; unlike 1.7.2, it does not redirect to ip1:port.
Is this a misconfiguration, or is this simply how high availability works in 1.9.0?
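For context, a minimal sketch of the ZooKeeper HA settings the question refers to, with placeholder hosts and paths (assumptions, not taken from this thread); both JobManager machines would carry the same flink-conf.yaml. As the reply confirms, both web UIs staying reachable at the same time does not mean HA is broken in 1.9.

# Identical on both JobManager machines (placeholder values).
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /standalone-cluster-1
high-availability.storageDir: hdfs://namenode:8020/flink/ha/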
Reply: per-job mode YARN page links to the wrong Flink UI
In per-job mode, when multiple jobs are running and both high-availability.zookeeper.path.root: /flink and high-availability.cluster-id: /v1.1/perjob-20191130 are configured, every job is its own cluster, so all of these clusters share the same ZooKeeper path, and YARN ends up linking them all to the same Flink UI. For per-job mode it is recommended to remove those two settings; see the sketch after this message.

From: pengchenglin
Sent: 2019-12-02 15:54
To: user-zh@flink.apache.org
Subject: per-job mode YARN page links to the wrong Flink UI

Hi all:
Has anyone run into this? Per-job mode, 4 machines, several jobs running at the same time. On the YARN admin page, clicking the tracking UI jumps to a Flink page, but it is always the Flink page of the same job.
The Flink configuration is as follows:

high-availability.zookeeper.client.max-retry-attempts: 10
historyserver.web.address: 0.0.0.0
state.checkpoints.num-retained: 3
historyserver.web.port: 8082
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/xx/oomdump -XX:ReservedCodeCacheSize=2048M
high-availability.cluster-id: /v1.1/perjob-20191130
jobmanager.execution.failover-strategy: region
#jobmanager.rpc.address: xx
state.savepoints.dir: hdfs://xx:8020/flink/v1.1/mxm1130_savepoint
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.session-timeout: 30
taskmanager.registration.timeout: 20 min
high-availability.storageDir: hdfs://xx:8020/flink/v1.1/mxm_ha/
task.cancellation.timeout: 0
taskmanager.network.numberOfBuffers: 10240
parallelism.default: 8
taskmanager.numberOfTaskSlots: 8
akka.ask.timeout: 600 s
historyserver.archive.fs.dir: hdfs://xx:8020/flink/v1.1/completed-jobs-mxm1130/
jobmanager.heap.size: 2048m
jobmanager.archive.fs.dir: hdfs://xx:8020/flink/v1.1/mxm1130completed-jobs/
heartbeat.timeout: 30
restart-strategy.fixed-delay.attempts: 360
high-availability.zookeeper.client.connection-timeout: 6
historyserver.archive.fs.refresh-interval: 1
jobmanager.rpc.port: 6123
jobstore.expiration-time: 14400
#rest.port: 8983
high-availability.zookeeper.quorum: xx:,xx:,xx:
restart-strategy.fixed-delay.delay: 1 s
high-availability: zookeeper
state.backend: filesystem
restart-strategy: fixed-delay
taskmanager.heap.size: 8192m
akka.client.timeout: 600 s
state.checkpoints.dir: hdfs://xx:8020/flink/v1.1/mxm1130_checkpoint
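A sketch of the recommendation above for per-job mode; the alternative at the end is my own assumption, not something suggested in the thread:

# Remove these two lines from flink-conf.yaml so that each per-job cluster
# gets its own ZooKeeper path instead of all sharing /flink/v1.1/perjob-20191130:
#   high-availability.zookeeper.path.root: /flink
#   high-availability.cluster-id: /v1.1/perjob-20191130
#
# Alternative (assumption): keep them in the file but override the cluster-id
# per submission with a dynamic property, for example:
#   flink run -m yarn-cluster -yD high-availability.cluster-id=/v1.1/perjob-<job-name> ...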
Re: Question about Flink local cache files being deleted
Hi, does your job use RocksDB to store its checkpoints? Does the error appear every time the job starts, or only after it has been running for a while?

From: 戴嘉诚
Sent: 2019-10-11 11:00
To: user-zh@flink.apache.org
Subject: Question about Flink local cache files being deleted

Hi all:
I recently migrated my jobs to Flink 1.9 on a YARN session. One of them reads Kafka messages, keys the stream, aggregates over a one-second window, and then writes batches asynchronously to HBase. It now frequently fails with the following exception:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    ... 6 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 8 more
Caused by: java.nio.file.NoSuchFileException: /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst -> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
    at java.nio.file.Files.createLink(Files.java:1086)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
    ... 12 more

Several jobs run in this session, but all of them write into HBase.
It looks as if a local directory or file on the machine was deleted? But I never deleted that file…