perjob yarn页面链接flink出错
各位大佬: 有遇到过这种问题吗,perjob模式,4台机器,同时运行了多个任务,在yarn的管理页面上,点tracking ui,跳转到flink页面,都是同一个任务的flink页面。 flink配置如下: high-availability.zookeeper.client.max-retry-attempts: 10 historyserver.web.address: 0.0.0.0 state.checkpoints.num-retained: 3 historyserver.web.port: 8082 env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/xx/oomdump -XX:ReservedCodeCacheSize=2048M high-availability.cluster-id: /v1.1/perjob-20191130 jobmanager.execution.failover-strategy: region #jobmanager.rpc.address: xx state.savepoints.dir: hdfs://xx:8020/flink/v1.1/mxm1130_savepoint high-availability.zookeeper.path.root: /flink high-availability.zookeeper.client.session-timeout: 30 taskmanager.registration.timeout: 20 min high-availability.storageDir: hdfs://xx:8020/flink/v1.1/mxm_ha/ task.cancellation.timeout: 0 taskmanager.network.numberOfBuffers: 10240 parallelism.default: 8 taskmanager.numberOfTaskSlots: 8 akka.ask.timeout: 600 s historyserver.archive.fs.dir: hdfs://xx:8020/flink/v1.1/completed-jobs-mxm1130/ jobmanager.heap.size: 2048m jobmanager.archive.fs.dir: hdfs://xx:8020/flink/v1.1/mxm1130completed-jobs/ heartbeat.timeout: 30 restart-strategy.fixed-delay.attempts: 360 high-availability.zookeeper.client.connection-timeout: 6 historyserver.archive.fs.refresh-interval: 1 jobmanager.rpc.port: 6123 jobstore.expiration-time: 14400 #rest.port: 8983 high-availability.zookeeper.quorum: xx:,xx:,xx: restart-strategy.fixed-delay.delay: 1 s high-availability: zookeeper state.backend: filesystem restart-strategy: fixed-delay taskmanager.heap.size: 8192m akka.client.timeout: 600 s state.checkpoints.dir: hdfs://xx:8020/flink/v1.1/mxm1130_checkpoint
连续keyBy后sum
以下代码: val env = StreamExecutionEnvironment.getExecutionEnvironment val list = new ListBuffer[Tuple3[String,Int,Int]] val random = new Random() for(x<- 0 to 100){ if(random.nextBoolean()){list.append(("INSERT",2,1))}else{ list.append(("UPDATE",2,1)) } } val data = env.fromElements(list).flatMap(_.toList) val keyed=data.keyBy(0).sum(1) keyed.print() val reKeyed=keyed.keyBy(0).sum(2) reKeyed.print() 按理说,reKeyed应该是在keyed的基础上,再进行相加,但是print出来的结果,reKeyed是在原始数据上进行的操作。 这是为什么呢?
yarn-cluster 提交任务的问题
指定主程序类;报错: The program's entry point class 'com.dora.job.stream.AliLogStreamingJob' was not found in the jar file. 但是运行同一个包下另一个主程序类却可以找到,可以运行;这个类就不可以
小白求助,FlinkSQL处理速度上不来
各位大佬,我用flink sql写了一些指标计算程序,消费kafka写到influxdb,发现夜晚kafka日志生产速度在10几k/min 时,程序没有问题,但是到白天涨到100k/min后 就渐渐卡住消费不动了,用的是flink 1.9 ,现在觉得是Flink sql执行这一层有点慢,窗口是滚动5分钟,目前是用的两个solt,调大并行度试了一下也没效果,这个有什么解决方案吗? 代码如下: val windowWidth = 5 //stream config val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(1000 * 60*5) env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) env.getCheckpointConfig.setCheckpointTimeout(6*10) env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION) // env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //table config val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) tEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(10)) tEnv.registerFunction("TimeFormatJava", new TimeFormatJava()) tEnv.registerFunction("TimeFormatUDF", TimeFormatUDF) //Kafka Source val kafkaProperties: Properties = new Properties kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers) kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId) kafkaProperties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"5000") kafkaProperties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"500") var topics: util.List[String] = new util.ArrayList[String] for (topic <- kafkaTopics.split(SPLIT)) { topics.add(topic) } val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema, kafkaProperties) val driverSearchDstream: DataStream[DriverSearch] = env.addSource(kafkaConsumer.setStartFromLatest()).map(msg => { val info: String = msg.substring(msg.indexOf("{"), msg.length) val createTime = msg.substring(0, 19) val timeStamp = getLongTime(createTime) val json = JSON.parseObject(info) DriverSearch( json.getString("driverId") + "_" + timeStamp, json.getString("driverId"), json.getIntValue("searchType"), timeStamp ) }).setParallelism(2) val driverSearchDstreamWithEventTime: DataStream[DriverSearch] = driverSearchDstream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[DriverSearch](org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)) { override def extractTimestamp(element: DriverSearch): Long = element.timestamp } ) driverSearchDstream.map(info=>println(info+"time:"+System.currentTimeMillis())) val table: Table = tEnv.fromDataStream(driverSearchDstreamWithEventTime, 'rowKey, 'driverId, 'searchType, 'timestamp.rowtime as 'w) val sql1: String = s""" select TimeFormatJava(TUMBLE_END(w, INTERVAL '$windowWidth' MINUTE),8) as time_end, searchType, count(distinct driverId) as typeUv, count(distinct rowKey) as typePv from $table group by TUMBLE(w, INTERVAL '$windowWidth' MINUTE),searchType """.stripMargin val resultTable1: Table = tEnv.sqlQuery(sql1) val typeMap= immutable.Map(1->"1-goWorkSearch",2->"2-offWorkSearch",3->"3-nearbySearch",4->"4-temporarySearch",5->"5-commonSearch",6->"6-multiplySearch") val influxStream: DataStream[InfluxDBPoint] = tEnv.toAppendStream[Row](resultTable1).map { row => { val typeName: String= typeMap(row.getField(1).asInstanceOf[Int]) val point = new InfluxDBPoint("Carpool_Search_Pv_Uv", row.getField(0).asInstanceOf[Long]) //udf +8hour val fields = new util.HashMap[String,Object]() val tags = new util.HashMap[String,String]() fields.put("typeUv", row.getField(2)) fields.put("typePv",row.getField(3)) point.setFields(fields) tags.put("typeName",typeName) point.setTags(tags) point } } influxStream.map{ point=>{ println( println("influxPoint:"+point.getFields+"==" +point.getTags+"=="+point.getMeasurement +"=="+point.getTimestamp+"time:"+System.currentTimeMillis()) ) } } val influxDBConfig = InfluxDBConfig.builder("http://host:8086;, "admin", "admin", "aimetric").build influxStream.addSink(new InfluxDBSink(influxDBConfig)) env.execute() } def getLongTime(str:String) ={ val format = new SimpleDateFormat("/MM/dd HH:mm:ss") val time: Long = format.parse(str).getTime time wydh...@gmail.com
Re: checkpoint、state
Hi Checkpoint 是所有本地 state 的一个快照,用于程序出现故障后,进行恢复使用。 Best, Congxian hahaha sc 于2019年11月29日周五 下午4:12写道: > > flink的每条数据既然都做了checkpoint,做成全局分布式一致性快照,那还需要本地state干啥呢?是否可以理解成,本地state是一致性快照的一部分而已? > 昨天看了 社区的直播回放,听PMC的介绍,好像不是一回事。 >
Re: 回复: 回复: 本地checkpoint 文件190G了
Hi 为什么你知道本地checkpoint文件达到190GB了,具体是哪个目录撑到了190GB? 如果没有启用 state.backend.local-recovery: * 使用FsSateBackend/Memory StateBackend, 本地不应该有什么checkpoint文件残留,因为执行checkpoint时,直接写HDFS了 * 使用 RocksDB state backend,无论是否开启incremental checkpoint本地也不应该有任何checkpoint文件残留(因为会被及时清理掉),除非你的DB目录本身就达到了190GB 如果启用了 state.backend.local-recovery: * 使用Memory StateBackend:与不启用时等效,因为Memory StateBackend不支持local recovery * 使用FsStateBackend/RocksDBStateBackend:存有本地checkpoint备份,目前只能等到有一个checkpoint完成才能及时清理无用的文件,需要等 FLINK-8871 fix 才能及时清理本地无用文件 我所能怀疑的就是你的作业长时间checkpoint没有成功,导致本地的local recovery目录不断增大。 另外,retain checkpoint(默认为1)是分布式checkpoint store中保留几个checkpoint的概念,与task manager端本地的checkpoint保留几个无关。 祝好 唐云 On 11/30/19, 2:15 PM, "sun" <1392427...@qq.com> wrote: 感谢,我这样在生产上试试看-- 原始邮件 -- 发件人: "tison" 发送时间: 2019年11月30日(星期六) 下午2:12 收件人: "user-zh"; 主题: Re: 回复: 本地checkpoint 文件190G了 retain 调小是你的场景比较简单的方法,1 2 3 都行,你可以试试... Best, tison. sun <1392427...@qq.com> 于2019年11月30日周六 下午2:08写道: > 好的,我主要想知道,怎么定时清理那些我用不到的checkpoint 文,怎么让我的本地checkpoint > 不会一直长大-- 原始邮 -- > 发人: "tangjunli...@huitongjy.com" > 发送时间: 2019年11月30日(星期六) 下午2:06 > 收人: "user-zh"; > 主题: 回复: 回复: 本地checkpoint 文190G了 > > > 如果处理数据没有达到一定量级,建议state.backend.incremental 设为false > > > > tangjunli...@huitongjy.com > > 发人: sun > 发送时间: 2019-11-30 14:05 > 收人: user-zh > 主题: 回复: 本地checkpoint 文190G了 > rocksdb ,设置的true-- 原始邮 -- > 发人: "tangjunli...@huitongjy.com" > 发送时间: 2019年11月30日(星期六) 下午2:03 > 收人: "user-zh"; > 主题: 回复: 本地checkpoint 文190G了 > > > 用的什么backend? state.backend.incremental 这个参数设置的什么? > > > > tangjunli...@huitongjy.com > 发人: sun > 发送时间: 2019-11-30 10:13 > 收人: flink; user-zh-subscribe > 主题: 本地checkpoint 文190G了 > 求助,我的文夹一直在长大 > > > > > 发自我的iPhone