groupBy和keyBy的使用方式不同吗?

2021-02-25 Thread
case class Student(name: String, age: Int,teacher:Teacher)
case class Teacher(name:String,room:(Int,Int,Int),salary:Int)

def main(args: Array[String]): Unit = {
  val teacher = Teacher("teacher-w",(1,2,3),99)
  val students = List(Student("a",11,teacher),Student("b",22,teacher))
  val benv = ExecutionEnvironment.getExecutionEnvironment
  benv.fromElements(students:_*).groupBy("name").sum("teacher.salary").print()
}

以上代码会报错:

Fields 'teacher.salary' are not valid for
'com.lx.list.List1$Student(name: String, age: Integer, teacher:
com.lx.list.List1$Teacher(name: String, room: scala.Tuple3(_1:
Integer, _2: Integer, _3: Integer), salary: Integer))'.


如果把上面的groupBy换成StreamingAip 的 keyBy就可以运行通过
这是为什么?而且据我观察,好像不是groupBy的问题,而是sum方法不认嵌套类型


flink的算子没有类似于spark的cache操作吗?

2021-01-07 Thread
HI , 请问当一个算子会被多次使用时,怎么把他缓存住,类似于spark的cache操作

val env = getBatchEnv
val ds = env.fromElements("a","b","c")

val ds2 = ds.map(x=>{
  println("map op")
  x.charAt(0).toInt+1
})

//此操作会打印三遍map op
ds2.print()

//此操作又会打印三遍map op
ds2.filter(_>100).print()


Re: flink作业通过grafana监控,若想发出报警该如何选择指标的问题

2021-01-06 Thread
可以尝试用yarn application -list 去定期查找你的任务来判断任务是否挂掉

bradyMk  于2021年1月6日周三 下午4:35写道:

> Hi,请教大家一个问题:
>
> 目前使用grafana监控flink的作业,想实现一个任务挂掉就报警的功能,初步想法是:监控checkpoint
> size的指标,一旦这个指标为0,就认为任务挂掉,但实际操作后,发现了两个问题:
>
> ① 如果kill掉任务,grafana上的flink所有指标都会一直保持最后接收到的值不变;
> ② 如果cancel掉任务,grafana上的flink所有指标都会突然中断;
>
> 所以,我上面说的想法永远都不会出发告警,因为这个checkpoint size的指标在任务挂掉也不会归为0值;
>
>
> 我又尝试了用一分钟前的job_uptime减去一分钟后的job_uptime,但是这样报警并不优雅,在任务刚启动时会有误报,因为任务刚启动时,一分钟前是没有数据的。
>
> 所以现在很疑惑,请教一下大家如果用grafana监控flink作业的,该选用什么样的指标和用什么规则,可以优雅的报警呢?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/