perjob yarn页面链接flink出错

2019-12-01 文章 pengchenglin
各位大佬:

有遇到过这种问题吗,perjob模式,4台机器,同时运行了多个任务,在yarn的管理页面上,点tracking 
ui,跳转到flink页面,都是同一个任务的flink页面。

flink配置如下:
high-availability.zookeeper.client.max-retry-attempts: 10
historyserver.web.address: 0.0.0.0
state.checkpoints.num-retained: 3
historyserver.web.port: 8082
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/xx/oomdump 
-XX:ReservedCodeCacheSize=2048M
high-availability.cluster-id: /v1.1/perjob-20191130
jobmanager.execution.failover-strategy: region
#jobmanager.rpc.address: xx
state.savepoints.dir: hdfs://xx:8020/flink/v1.1/mxm1130_savepoint
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.session-timeout: 30
taskmanager.registration.timeout: 20 min
high-availability.storageDir: hdfs://xx:8020/flink/v1.1/mxm_ha/
task.cancellation.timeout: 0
taskmanager.network.numberOfBuffers: 10240
parallelism.default: 8
taskmanager.numberOfTaskSlots: 8
akka.ask.timeout: 600 s
historyserver.archive.fs.dir: hdfs://xx:8020/flink/v1.1/completed-jobs-mxm1130/
jobmanager.heap.size: 2048m
jobmanager.archive.fs.dir: hdfs://xx:8020/flink/v1.1/mxm1130completed-jobs/
heartbeat.timeout: 30
restart-strategy.fixed-delay.attempts: 360
high-availability.zookeeper.client.connection-timeout: 6
historyserver.archive.fs.refresh-interval: 1
jobmanager.rpc.port: 6123
jobstore.expiration-time: 14400
#rest.port: 8983
high-availability.zookeeper.quorum: xx:,xx:,xx:
restart-strategy.fixed-delay.delay: 1 s
high-availability: zookeeper
state.backend: filesystem
restart-strategy: fixed-delay
taskmanager.heap.size: 8192m
akka.client.timeout: 600 s
state.checkpoints.dir: hdfs://xx:8020/flink/v1.1/mxm1130_checkpoint


连续keyBy后sum

2019-12-01 文章 hahaha sc
以下代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val list = new ListBuffer[Tuple3[String,Int,Int]]

val random = new Random()

for(x<- 0 to 100){
  if(random.nextBoolean()){list.append(("INSERT",2,1))}else{
list.append(("UPDATE",2,1))
  }
}


val data = env.fromElements(list).flatMap(_.toList)


val keyed=data.keyBy(0).sum(1)

keyed.print()


val reKeyed=keyed.keyBy(0).sum(2)
reKeyed.print()


按理说,reKeyed应该是在keyed的基础上,再进行相加,但是print出来的结果,reKeyed是在原始数据上进行的操作。

这是为什么呢?


yarn-cluster 提交任务的问题

2019-12-01 文章 李军
指定主程序类;报错: The program's entry point class 
'com.dora.job.stream.AliLogStreamingJob' was not found in the jar file.
但是运行同一个包下另一个主程序类却可以找到,可以运行;这个类就不可以





小白求助,FlinkSQL处理速度上不来

2019-12-01 文章 wydh...@gmail.com

各位大佬,我用flink  sql写了一些指标计算程序,消费kafka写到influxdb,发现夜晚kafka日志生产速度在10几k/min 
时,程序没有问题,但是到白天涨到100k/min后 就渐渐卡住消费不动了,用的是flink 1.9 ,现在觉得是Flink 
sql执行这一层有点慢,窗口是滚动5分钟,目前是用的两个solt,调大并行度试了一下也没效果,这个有什么解决方案吗?
代码如下:
val windowWidth = 5

//stream config
val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000 * 60*5)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(6*10)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
//   
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//table config
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(10))

tEnv.registerFunction("TimeFormatJava", new TimeFormatJava())
tEnv.registerFunction("TimeFormatUDF", TimeFormatUDF)

  
//Kafka Source
val kafkaProperties: Properties = new Properties
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaServers)
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId)
kafkaProperties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"5000")
kafkaProperties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"500")


var topics: util.List[String] = new util.ArrayList[String]

for (topic <- kafkaTopics.split(SPLIT)) {
  topics.add(topic)
}

val kafkaConsumer: FlinkKafkaConsumer011[String] = new 
FlinkKafkaConsumer011[String](topics, new SimpleStringSchema, kafkaProperties)

val driverSearchDstream: DataStream[DriverSearch] = 
env.addSource(kafkaConsumer.setStartFromLatest()).map(msg => {
  val info: String = msg.substring(msg.indexOf("{"), msg.length)
  val createTime = msg.substring(0, 19)
  val timeStamp = getLongTime(createTime)

  val json = JSON.parseObject(info)
  DriverSearch(
json.getString("driverId") + "_" + timeStamp,
json.getString("driverId"),
json.getIntValue("searchType"),
timeStamp
  )
}).setParallelism(2)

val driverSearchDstreamWithEventTime: DataStream[DriverSearch] = 
driverSearchDstream.assignTimestampsAndWatermarks(
  new 
BoundedOutOfOrdernessTimestampExtractor[DriverSearch](org.apache.flink.streaming.api.windowing.time.Time.seconds(10L))
 {
override def extractTimestamp(element: DriverSearch): Long = 
element.timestamp
  }
)


driverSearchDstream.map(info=>println(info+"time:"+System.currentTimeMillis()))

val table: Table = tEnv.fromDataStream(driverSearchDstreamWithEventTime, 
'rowKey, 'driverId, 'searchType, 'timestamp.rowtime as 'w)

val sql1: String =
  s"""
select TimeFormatJava(TUMBLE_END(w, INTERVAL 
'$windowWidth' MINUTE),8) as time_end,
searchType,
count(distinct driverId) as typeUv,
count(distinct rowKey) as typePv
from $table
group by TUMBLE(w, INTERVAL '$windowWidth' 
MINUTE),searchType
  """.stripMargin

val resultTable1: Table = tEnv.sqlQuery(sql1)

val typeMap= 
immutable.Map(1->"1-goWorkSearch",2->"2-offWorkSearch",3->"3-nearbySearch",4->"4-temporarySearch",5->"5-commonSearch",6->"6-multiplySearch")

val influxStream: DataStream[InfluxDBPoint] = 
tEnv.toAppendStream[Row](resultTable1).map {
  row => {
val typeName: String= typeMap(row.getField(1).asInstanceOf[Int])
val point = new InfluxDBPoint("Carpool_Search_Pv_Uv", 
row.getField(0).asInstanceOf[Long]) //udf +8hour
val fields = new util.HashMap[String,Object]()
val tags = new util.HashMap[String,String]()
fields.put("typeUv", row.getField(2))
fields.put("typePv",row.getField(3))
point.setFields(fields)
tags.put("typeName",typeName)
point.setTags(tags)
point
  }
}

influxStream.map{
  point=>{
println( println("influxPoint:"+point.getFields+"=="
  +point.getTags+"=="+point.getMeasurement
  +"=="+point.getTimestamp+"time:"+System.currentTimeMillis())
)
  }
}


val influxDBConfig = InfluxDBConfig.builder("http://host:8086;, "admin", 
"admin", "aimetric").build


influxStream.addSink(new InfluxDBSink(influxDBConfig))

env.execute()

  }


  def getLongTime(str:String) ={
val format = new SimpleDateFormat("/MM/dd HH:mm:ss")
val time: Long = format.parse(str).getTime
time
  






wydh...@gmail.com


Re: checkpoint、state

2019-12-01 文章 Congxian Qiu
Hi

Checkpoint 是所有本地 state 的一个快照,用于程序出现故障后,进行恢复使用。

Best,
Congxian


hahaha sc  于2019年11月29日周五 下午4:12写道:

>
> flink的每条数据既然都做了checkpoint,做成全局分布式一致性快照,那还需要本地state干啥呢?是否可以理解成,本地state是一致性快照的一部分而已?
>   昨天看了 社区的直播回放,听PMC的介绍,好像不是一回事。
>


Re: 回复: 回复: 本地checkpoint 文件190G了

2019-12-01 文章 Yun Tang
Hi 

为什么你知道本地checkpoint文件达到190GB了,具体是哪个目录撑到了190GB?

如果没有启用 state.backend.local-recovery:
* 使用FsSateBackend/Memory StateBackend, 
本地不应该有什么checkpoint文件残留,因为执行checkpoint时,直接写HDFS了
* 使用 RocksDB state backend,无论是否开启incremental 
checkpoint本地也不应该有任何checkpoint文件残留(因为会被及时清理掉),除非你的DB目录本身就达到了190GB

如果启用了 state.backend.local-recovery:
* 使用Memory StateBackend:与不启用时等效,因为Memory StateBackend不支持local recovery
* 
使用FsStateBackend/RocksDBStateBackend:存有本地checkpoint备份,目前只能等到有一个checkpoint完成才能及时清理无用的文件,需要等
 FLINK-8871 fix 才能及时清理本地无用文件

我所能怀疑的就是你的作业长时间checkpoint没有成功,导致本地的local recovery目录不断增大。
另外,retain checkpoint(默认为1)是分布式checkpoint store中保留几个checkpoint的概念,与task 
manager端本地的checkpoint保留几个无关。

祝好
唐云


On 11/30/19, 2:15 PM, "sun" <1392427...@qq.com> wrote:

感谢,我这样在生产上试试看-- 原始邮件 --
发件人: "tison"
发送时间: 2019年11月30日(星期六) 下午2:12
收件人: "user-zh";
主题: Re: 回复: 本地checkpoint 文件190G了


retain 调小是你的场景比较简单的方法,1 2 3 都行,你可以试试...

Best,
tison.


sun <1392427...@qq.com> 于2019年11月30日周六 下午2:08写道:

> 好的,我主要想知道,怎么定时清理那些我用不到的checkpoint 文,怎么让我的本地checkpoint
> 不会一直长大-- 原始邮 --
> 发人: "tangjunli...@huitongjy.com"
> 发送时间: 2019年11月30日(星期六) 下午2:06
> 收人: "user-zh";
> 主题: 回复: 回复: 本地checkpoint 文190G了
>
>
> 如果处理数据没有达到一定量级,建议state.backend.incremental 设为false
>
>
>
> tangjunli...@huitongjy.com
>
> 发人: sun
> 发送时间: 2019-11-30 14:05
> 收人: user-zh
> 主题: 回复: 本地checkpoint 文190G了
> rocksdb ,设置的true-- 原始邮 --
> 发人: "tangjunli...@huitongjy.com"
> 发送时间: 2019年11月30日(星期六) 下午2:03
> 收人: "user-zh";
> 主题: 回复: 本地checkpoint 文190G了
>
>
> 用的什么backend? state.backend.incremental  这个参数设置的什么?
>
>
>
> tangjunli...@huitongjy.com
> 发人: sun
> 发送时间: 2019-11-30 10:13
> 收人: flink; user-zh-subscribe
> 主题: 本地checkpoint 文190G了
> 求助,我的文夹一直在长大
>
>
>
>
> 发自我的iPhone