Re: UDF:Type is not supported: ANY

2020-08-05 文章 Benchao Li
看起来写法没啥问题,我们就是这么用的。
你用的是哪个版本的Flink?然后是怎么注册的UDF呢?

zilong xiao  于2020年8月6日周四 下午12:06写道:

> 我这么写过,貌似不行,下面是我的代码,可否看下是否可行?
>
> public class Json2Map extends ScalarFunction {
>
>private static final Logger LOG =
> LoggerFactory.getLogger(Json2Map.class);
>
>private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>
>public Json2Map(){}
>
>public Map eval(String param) {
>   Map result = new HashMap<>();
>   try {
>  if (param == null) {
> return result;
>  }
>  result = OBJECT_MAPPER.readValue(param, Map.class);
>  // 遍历toString 貌似也不行
>  //for(Object obj : tmp.keySet()){
>  // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj)));
>  //}
>  LOG.info("result is: {}", result);
>   } catch (JsonProcessingException e){
>  LOG.error("failed to convert json to map, param is: {}", param,
> e);
>   }
>   return result;
>}
>
>
>@Override
>public TypeInformation>
> getResultType(Class[] signature) {
>   return Types.MAP(Types.STRING, Types.STRING);
>}
>
> }
>
>
> Benchao Li  于2020年8月6日周四 上午11:04写道:
>
> > 可以直接返回Map类型呀,比如:
> >
> > public class String2Map extends ScalarFunction {
> >
> >public Map eval(String param) throws Exception {
> >   Map map = new HashMap<>();
> >   // ...
> >   return map;
> >}
> >
> >@Override
> >public TypeInformation getResultType(Class[] signature) {
> >   return Types.MAP(Types.STRING, Types.STRING);
> >}
> >
> > }
> >
> >
> > zilong xiao  于2020年8月6日周四 上午10:24写道:
> >
> > >
> > >
> >
> 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值
> > >
> > > Benchao Li  于2020年8月5日周三 下午11:49写道:
> > >
> > > > Hi zilong,
> > > >
> > > > SQL里面的ARRAY类型,对应的legacy
> > > type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> > > > 其他类型的type information会被当做any类型来处理。
> > > > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> > > > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-18417
> > > >
> > > > zilong xiao  于2020年8月3日周一 下午8:23写道:
> > > >
> > > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> > > > >
> > > > > godfrey he  于2020年8月3日周一 下午7:50写道:
> > > > >
> > > > > > 你把Map换为Map试试
> > > > > >
> > > > > > zilong xiao  于2020年8月3日周一 下午4:56写道:
> > > > > >
> > > > > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > > > > >
> > > > > > > zilong xiao  于2020年8月3日周一 上午10:43写道:
> > > > > > >
> > > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > > > >
> Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> > > > `Type
> > > > > is
> > > > > > > not
> > > > > > > > supported:
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > > > > >
> > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > > > > Json2Map
> > > > > > > > udf应该怎么操作呢?求前辈指导
> > > > > > > >
> > > > > > > > udfd代码如下:
> > > > > > > >
> > > > > > > > public class Json2List extends ScalarFunction {
> > > > > > > >
> > > > > > > >private static final Logger LOG =
> > > > > > > LoggerFactory.getLogger(Json2List.class);
> > > > > > > >
> > > > > > > >private static final ObjectMapper OBJECT_MAPPER = new
> > > > > ObjectMapper()
> > > > > > > >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES,
> true)
> > > > > > > >
> >  .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > > > > true) ;
> > > > > > > >
> > > > > > > >public Json2List(){}
> > > > > > > >
> > > > > > > >public List eval(String param) {
> > > > > > > >   List result = new ArrayList<>();
> > > > > > > >   try {
> > > > > > > >  List> list =
> > > > > > OBJECT_MAPPER.readValue(param,
> > > > > > > List.class);
> > > > > > > >  for(Map map : list){
> > > > > > > >
>  result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > > > > >  }
> > > > > > > >  return result;
> > > > > > > >   } catch (JsonProcessingException e){
> > > > > > > >  LOG.error("failed to convert json to array, param
> is:
> > > {}",
> > > > > > > param, e);
> > > > > > > >   }
> > > > > > > >   return result;
> > > > > > > >}
> > > > > > > >
> > > > > > > >
> > > > > > > >@Override
> > > > > > > >public TypeInformation>
> > getResultType(Class[]
> > > > > > > signature) {
> > > > > > > >   return Types.LIST(Types.STRING);
> > > > > > > >}
> > > > > > > >
> > > > > > > > }
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
Hi
我这边没有看到相关的附件,不确定是邮件客户端的问题还是其他什么,你那边能否再确认下 附件 的发送情况呢?

Best,
Congxian


op <520075...@qq.com> 于2020年8月6日周四 上午10:36写道:

>感谢 ,  截图和配置在附件里面
>   我试试配置  RocksDB StateBackend
>
>
> -- 原始邮件 --
> *发件人:* "user-zh" ;
> *发送时间:* 2020年8月5日(星期三) 下午5:43
> *收件人:* "user-zh";
> *主题:* Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
> Hi
>   RocksDB StateBackend 只需要在 flink-conf 中进行一下配置就行了[1].
>
>   另外从你前面两份邮件看,我有些信息比较疑惑,你能否贴一下现在使用的 flink-conf,以及 checkpoint UI 的截图,以及 HDFS
> 上 checkpoint 目录的截图
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
>
> Best,
> Congxian
>
>
> op <520075...@qq.com> 于2020年8月5日周三 下午4:03写道:
>
> > 你好,ttl配置是
> > val settings =
> EnvironmentSettings.newInstance().inStreamingMode().build()
> > val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> > val tConfig = tableEnv.getConfig
> > tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
> >
> >
> >   1)目前是有3个任务都是这种情况
> >   2)目前集群没有RocksDB环境
> > 谢谢
> > --原始邮件--
> > 发件人:
> >   "user-zh"
> > <
> > qcx978132...@gmail.com;
> > 发送时间:2020年8月5日(星期三) 下午3:30
> > 收件人:"user-zh" >
> > 主题:Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> >
> >
> >
> > Hi op
> >  这个情况比较奇怪。我想确认下:
> >  1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
> >  2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢
> >
> >  另外,你 TTL 其他的配置是怎么设置的呢?
> >
> > 从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
> > Best,
> > Congxian
> >
> >
> > op <520075...@qq.com 于2020年8月5日周三 下午2:46写道:
> >
> >  nbsp; nbsp;
> > 
> >
> 你好,我使用的是FsStateBackendnbsp;状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
> >  nbsp;
> > nbsp;设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
> >  nbsp; nbsp;观察到的checkpoint shared 目录大小一直在增加,也确认过group
> >  by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
> >  nbsp; nbsp;运行5天能满足清理条件
> > 
> > 
> > 
> > 
> >  -- 原始邮件 --
> >  发件人:
> >
> 
> > "user-zh"
> >
> 
> > <
> >  qcx978132...@gmail.comgt;;
> >  发送时间:nbsp;2020年8月3日(星期一) 下午5:50
> >  收件人:nbsp;"user-zh" > 
> >  主题:nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> > 
> > 
> > 
> >  Hi
> >  nbsp;nbsp; 能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared
> >  目录的数据量看,有增长,后续基本持平。现在
> >  Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果
> > checkpoint
> >  之间,数据改动很多的话,这个值会变大
> > 
> >  [1]
> > 
> > 
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> > 
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> >;
> > Best,
> >  Congxian
> > 
> > 
> >  op <520075...@qq.comgt; 于2020年8月3日周一 下午2:18写道:
> > 
> >  gt; amp;nbsp; amp;nbsp;
> >  gt;
> > 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
> >  gt; 逻辑是按照 事件day 和 id 进行groupby
> >  gt; 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
> >  gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> >  gt; Time.minutes(1440+10))
> >  gt;
> >  gt;
> >  gt;
> >  gt;
> >  gt;
> > --amp;nbsp;原始邮件amp;nbsp;--
> >  gt; 发件人:
> > 
> >
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> >  nbsp; "user-zh"
> > 
> >
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> >  nbsp; <
> >  gt; 384939...@qq.comamp;gt;;
> >  gt; 发送时间:amp;nbsp;2020年8月3日(星期一) 中午1:50
> >  gt; 收件人:amp;nbsp;"user-zh" > amp;gt;;
> >  gt;
> >  gt; 主题:amp;nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和
> 时间窗口
> > 操作后 状态越来越大
> >  gt;
> >  gt;
> >  gt;
> >  gt; hi,您好:
> >  gt; 我改回增量模式重新收集了一些数据:
> >  gt; 1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
> >  gt; 2、checkpoint是interval设置的是5秒
> >  gt; 3、目前这个作业是每分钟一个窗口
> >  gt; 4、并行度设置的1,使用on-yarn模式
> >  gt;
> >  gt; 刚启动的时候,如下:
> >  gt; <
> > http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;gt;
> >  gt;
> >  gt; 18分钟后,如下:
> >  gt; <
> > http://apache-flink.147419.n8.nabble.com/file/t793/9.pngamp;gt;
> >  gt;
> >  gt; checkpoints设置:
> >  gt; <
> > http://apache-flink.147419.n8.nabble.com/file/t793/conf.pngamp;gt;
> >  gt;
> >  gt; hdfs上面大小:
> >  gt; <
> > http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pngamp;gt;
> >  gt;
> >  gt; 页面上看到的大小:
> >  gt; <
> > 
> >
> 

Re: UDF:Type is not supported: ANY

2020-08-05 文章 zilong xiao
我这么写过,貌似不行,下面是我的代码,可否看下是否可行?

public class Json2Map extends ScalarFunction {

   private static final Logger LOG = LoggerFactory.getLogger(Json2Map.class);

   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

   public Json2Map(){}

   public Map eval(String param) {
  Map result = new HashMap<>();
  try {
 if (param == null) {
return result;
 }
 result = OBJECT_MAPPER.readValue(param, Map.class);
 // 遍历toString 貌似也不行
 //for(Object obj : tmp.keySet()){
 // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj)));
 //}
 LOG.info("result is: {}", result);
  } catch (JsonProcessingException e){
 LOG.error("failed to convert json to map, param is: {}", param, e);
  }
  return result;
   }


   @Override
   public TypeInformation>
getResultType(Class[] signature) {
  return Types.MAP(Types.STRING, Types.STRING);
   }

}


Benchao Li  于2020年8月6日周四 上午11:04写道:

> 可以直接返回Map类型呀,比如:
>
> public class String2Map extends ScalarFunction {
>
>public Map eval(String param) throws Exception {
>   Map map = new HashMap<>();
>   // ...
>   return map;
>}
>
>@Override
>public TypeInformation getResultType(Class[] signature) {
>   return Types.MAP(Types.STRING, Types.STRING);
>}
>
> }
>
>
> zilong xiao  于2020年8月6日周四 上午10:24写道:
>
> >
> >
> 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值
> >
> > Benchao Li  于2020年8月5日周三 下午11:49写道:
> >
> > > Hi zilong,
> > >
> > > SQL里面的ARRAY类型,对应的legacy
> > type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> > > 其他类型的type information会被当做any类型来处理。
> > > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> > > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-18417
> > >
> > > zilong xiao  于2020年8月3日周一 下午8:23写道:
> > >
> > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> > > >
> > > > godfrey he  于2020年8月3日周一 下午7:50写道:
> > > >
> > > > > 你把Map换为Map试试
> > > > >
> > > > > zilong xiao  于2020年8月3日周一 下午4:56写道:
> > > > >
> > > > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > > > >
> > > > > > zilong xiao  于2020年8月3日周一 上午10:43写道:
> > > > > >
> > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> > > `Type
> > > > is
> > > > > > not
> > > > > > > supported:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > > > >
> > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > > > Json2Map
> > > > > > > udf应该怎么操作呢?求前辈指导
> > > > > > >
> > > > > > > udfd代码如下:
> > > > > > >
> > > > > > > public class Json2List extends ScalarFunction {
> > > > > > >
> > > > > > >private static final Logger LOG =
> > > > > > LoggerFactory.getLogger(Json2List.class);
> > > > > > >
> > > > > > >private static final ObjectMapper OBJECT_MAPPER = new
> > > > ObjectMapper()
> > > > > > >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > > > >
>  .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > > > true) ;
> > > > > > >
> > > > > > >public Json2List(){}
> > > > > > >
> > > > > > >public List eval(String param) {
> > > > > > >   List result = new ArrayList<>();
> > > > > > >   try {
> > > > > > >  List> list =
> > > > > OBJECT_MAPPER.readValue(param,
> > > > > > List.class);
> > > > > > >  for(Map map : list){
> > > > > > > result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > > > >  }
> > > > > > >  return result;
> > > > > > >   } catch (JsonProcessingException e){
> > > > > > >  LOG.error("failed to convert json to array, param is:
> > {}",
> > > > > > param, e);
> > > > > > >   }
> > > > > > >   return result;
> > > > > > >}
> > > > > > >
> > > > > > >
> > > > > > >@Override
> > > > > > >public TypeInformation>
> getResultType(Class[]
> > > > > > signature) {
> > > > > > >   return Types.LIST(Types.STRING);
> > > > > > >}
> > > > > > >
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


flink1.11 es connector

2020-08-05 文章 Dream-底限
hi
我们这面想用es做时态表查询,但是flink没有报漏es源连接器,需要自己实现一个,请问大家对es做时态表这件事感觉靠谱吗(ps:之所以不用hbase是因为hbase的rowkey设计以及可能维护的二级索引比较麻烦,但hbase也是一个调研方案)


?????? ????????flink??????????????????

2020-08-05 文章 ????????
cep
cepGroupPattern??within??wait??
cepcep??cep
?? cepflink1.7?? 
https://developer.aliyun.com/article/738451





----
??: 
   "user-zh"

https://blog.csdn.net/zhangjun5965/article/details/106573528

samuel@ubtrobot.com 

Re: UDF:Type is not supported: ANY

2020-08-05 文章 Benchao Li
可以直接返回Map类型呀,比如:

public class String2Map extends ScalarFunction {

   public Map eval(String param) throws Exception {
  Map map = new HashMap<>();
  // ...
  return map;
   }

   @Override
   public TypeInformation getResultType(Class[] signature) {
  return Types.MAP(Types.STRING, Types.STRING);
   }

}


zilong xiao  于2020年8月6日周四 上午10:24写道:

>
> 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值
>
> Benchao Li  于2020年8月5日周三 下午11:49写道:
>
> > Hi zilong,
> >
> > SQL里面的ARRAY类型,对应的legacy
> type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> > 其他类型的type information会被当做any类型来处理。
> > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18417
> >
> > zilong xiao  于2020年8月3日周一 下午8:23写道:
> >
> > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> > >
> > > godfrey he  于2020年8月3日周一 下午7:50写道:
> > >
> > > > 你把Map换为Map试试
> > > >
> > > > zilong xiao  于2020年8月3日周一 下午4:56写道:
> > > >
> > > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > > >
> > > > > zilong xiao  于2020年8月3日周一 上午10:43写道:
> > > > >
> > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> > `Type
> > > is
> > > > > not
> > > > > > supported:
> > > > > >
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > > >
> > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > > Json2Map
> > > > > > udf应该怎么操作呢?求前辈指导
> > > > > >
> > > > > > udfd代码如下:
> > > > > >
> > > > > > public class Json2List extends ScalarFunction {
> > > > > >
> > > > > >private static final Logger LOG =
> > > > > LoggerFactory.getLogger(Json2List.class);
> > > > > >
> > > > > >private static final ObjectMapper OBJECT_MAPPER = new
> > > ObjectMapper()
> > > > > >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > > >   .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > > true) ;
> > > > > >
> > > > > >public Json2List(){}
> > > > > >
> > > > > >public List eval(String param) {
> > > > > >   List result = new ArrayList<>();
> > > > > >   try {
> > > > > >  List> list =
> > > > OBJECT_MAPPER.readValue(param,
> > > > > List.class);
> > > > > >  for(Map map : list){
> > > > > > result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > > >  }
> > > > > >  return result;
> > > > > >   } catch (JsonProcessingException e){
> > > > > >  LOG.error("failed to convert json to array, param is:
> {}",
> > > > > param, e);
> > > > > >   }
> > > > > >   return result;
> > > > > >}
> > > > > >
> > > > > >
> > > > > >@Override
> > > > > >public TypeInformation> getResultType(Class[]
> > > > > signature) {
> > > > > >   return Types.LIST(Types.STRING);
> > > > > >}
> > > > > >
> > > > > > }
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: 请教:用flink实现实时告警的功能

2020-08-05 文章 Jun Zhang
可以使用广播,我自己写过一个文章,给你参考下,你可以把source换成每隔几秒钟去读mysql的配置

https://blog.csdn.net/zhangjun5965/article/details/106573528

samuel@ubtrobot.com  于2020年8月6日周四 上午10:26写道:

> 由于需要实时告警功能,经调研,用flink 来实现是比较合适,但有几个问题没搞清楚,请大神们指教,感谢!
>
> 告警有分两部分:
>一是告警规则的设置,数据存放在mysql,存储的格式是json
> {"times":5}  ---就是事件发生大于5次就发出告警;
> {"temperature": 80} ---就是温度大于80就告警;
>二是告警实现
>   1)上报的数据写入到kafka
>   2)flink读取kafka的数据,然后通过翻滚窗口进行计算,如果满足规则就生产告警。
>
>
> 现在遇到的问题是:
> 1. 当规则变更时,如何及时生效?
> 2.如果用flink CEP来是实现,对于同一数据源,能否在一个作业里让多个规则同时生效?
> 3.这一功能有最佳实践吗?
>
> 希望哪位解答一下,谢谢!
>
>
>
>


?????? flink1.10.1/1.11.1 ????sql ????group ?? ???????? ?????? ????????????

2020-08-05 文章 op
  ?? 
 ?? RocksDB StateBackend



----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend

Best,
Congxian


op <520075...@qq.com ??2020??8??5?? 4:03??

 ??ttl??
 val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
 val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
 val tConfig = tableEnv.getConfig
 tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))


 nbsp; nbsp; 1)3??
 nbsp; nbsp; 2)RocksDB
 
 --nbsp;nbsp;--
 ??:

 "user-zh"

 <
 qcx978132...@gmail.comgt;;
 :nbsp;2020??8??5??(??) 3:30
 ??:nbsp;"user-zh"https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
 gt
 


????????flink??????????????????

2020-08-05 文章 samuel....@ubtrobot.com
flink 
,??


   ??mysql??json
{"times":5}  ---5??
{"temperature": 80} ---80
   
  1)kafka
  
2)flinkkafka??


??
1. 
2.??flink CEP??
3.??


   

 


Re: UDF:Type is not supported: ANY

2020-08-05 文章 zilong xiao
感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值

Benchao Li  于2020年8月5日周三 下午11:49写道:

> Hi zilong,
>
> SQL里面的ARRAY类型,对应的legacy type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> 其他类型的type information会被当做any类型来处理。
> 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
>
> [1] https://issues.apache.org/jira/browse/FLINK-18417
>
> zilong xiao  于2020年8月3日周一 下午8:23写道:
>
> > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> >
> > godfrey he  于2020年8月3日周一 下午7:50写道:
> >
> > > 你把Map换为Map试试
> > >
> > > zilong xiao  于2020年8月3日周一 下午4:56写道:
> > >
> > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > >
> > > > zilong xiao  于2020年8月3日周一 上午10:43写道:
> > > >
> > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> `Type
> > is
> > > > not
> > > > > supported:
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > >
> STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > Json2Map
> > > > > udf应该怎么操作呢?求前辈指导
> > > > >
> > > > > udfd代码如下:
> > > > >
> > > > > public class Json2List extends ScalarFunction {
> > > > >
> > > > >private static final Logger LOG =
> > > > LoggerFactory.getLogger(Json2List.class);
> > > > >
> > > > >private static final ObjectMapper OBJECT_MAPPER = new
> > ObjectMapper()
> > > > >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > >   .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > true) ;
> > > > >
> > > > >public Json2List(){}
> > > > >
> > > > >public List eval(String param) {
> > > > >   List result = new ArrayList<>();
> > > > >   try {
> > > > >  List> list =
> > > OBJECT_MAPPER.readValue(param,
> > > > List.class);
> > > > >  for(Map map : list){
> > > > > result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > >  }
> > > > >  return result;
> > > > >   } catch (JsonProcessingException e){
> > > > >  LOG.error("failed to convert json to array, param is: {}",
> > > > param, e);
> > > > >   }
> > > > >   return result;
> > > > >}
> > > > >
> > > > >
> > > > >@Override
> > > > >public TypeInformation> getResultType(Class[]
> > > > signature) {
> > > > >   return Types.LIST(Types.STRING);
> > > > >}
> > > > >
> > > > > }
> > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Re: Re: FLINK SQL view的数据复用问题

2020-08-05 文章 godfrey he
目前sql-client还不支持。关于纯SQL文本statement set的支持,
目前社区已经达成语法的一致意见,应该后续会慢慢的支持。

kandy.wang  于2020年8月5日周三 下午10:43写道:

>
>
>
>
>
>
> @ godfrey
> 你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-04 19:36:56,"godfrey he"  写道:
> >调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用
> >
> >kandy.wang  于2020年8月4日周二 下午6:21写道:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> @ godfrey
> >> thanks。刚试了一下,source -> Deduplicate  ->
> >> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
> >> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
> >>
> >>
> >> 在 2020-08-04 17:26:02,"godfrey he"  写道:
> >> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
> >> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
> >> >
> >> >kandy.wang  于2020年8月4日周二 下午5:20写道:
> >> >
> >> >> FLINK SQL view相关问题:
> >> >> create view order_source
> >> >>
> >> >> as
> >> >>
> >> >> select order_id, order_goods_id, user_id,...
> >> >>
> >> >> from (
> >> >>
> >> >> ..  proctime,row_number() over(partition by order_id,
> >> >> order_goods_id order by proctime desc) as rownum
> >> >>
> >> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> >> properties.group.id'='flink_etl_kafka_hbase',
> >> >> 'scan.startup.mode'='latest-offset') */
> >> >>
> >> >> ) where  rownum = 1 and  price > 0;
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT),)
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_date as rowkey,
> >> >>
> >> >> sum(amount) as saleN,
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_date
> >> >>
> >> >> );
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT))
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_hour as rowkey,sum(amount) as saleN,
> >> >>
> >> >>
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_hour
> >> >>
> >> >> );
> >> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer
> group。
> >> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  ->
> sink
> >> >> 2
> >> >>
> >> >>
> >> >> 本意是想通过view  order_source
> >> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
> >> >>
> >> >>
> >>
>


?????? arm??centos7??????pyflink

2020-08-05 文章 ????
??


??






----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/python_shell.html

Best,
Xingbo

 <1129656...@qq.com ??2020??8??6?? 9:11??

 Hi:
 nbsp; nbsp; mvn clean package
 
-DskipTestflink-1.11.1-src.tgzflink-1.11.1/build-target/bin/pyflink-shell.shpyflink


 --nbsp;nbsp;--
 ??:

 ""

 <
 1129656...@qq.comgt;;
 :nbsp;2020??8??4??(??) 10:19
 ??:nbsp;"user-zh"

Re: arm,centos7下部署pyflink

2020-08-05 文章 Xingbo Huang
Hi,
肯定不是的,这个只是python
shell的启动脚本。你想想看你mvn编译的只是java的代码,里面都没有pyflink的python代码。你写的python job那些Import
的pyflink的包python都认识不了,肯定跑不起来呀。你如果想玩python shell,可以参考文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/python_shell.html

Best,
Xingbo

琴师 <1129656...@qq.com> 于2020年8月6日周四 上午9:11写道:

> Hi:
>   我用mvn clean package
> -DskipTest完整的编译了flink-1.11.1-src.tgz,我看到flink-1.11.1/build-target/bin/pyflink-shell.sh,请问这个是做什么用的,是不是用这个就可以相当于pyflink了?
>
>
> --原始邮件--
> 发件人:
>   "琴师"
>   <
> 1129656...@qq.com;
> 发送时间:2020年8月4日(星期二) 上午10:19
> 收件人:"user-zh"
> 主题:回复: arm,centos7下部署pyflink
>
>
>
> 确实,我尝试了编译pyarrow,但是没有完成,而且不止这一个第三方包。
> 真是遗憾。
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> hxbks...@gmail.com;
> 发送时间:2020年8月4日(星期二) 上午10:09
> 收件人:"user-zh"
> 主题:Re: arm,centos7下部署pyflink
>
>
>
> Hello,
>
> 我们pyflink会依赖pyarrow的包,而pyarrow并没有发布对应在arm下的wheel包,所以,你安装的话,会去下载pyarrow的源码包,然后尝试去编译源码包,所以出现你报错信息中的cmake失败的内容。
>
> 现在可行的方式是你得尝试自己手动编译安装pyarrow,不过pyarrow并没有在arm架构下做测试,而且在高版本的pyarrow包甚至不发布源码包了,所以我很怀疑你能否顺利编译安装成功pyarrow。
>
> Best,
> Xingbo
>
> 琴师 <1129656...@qq.com 于2020年8月4日周二 上午9:57写道:
>
>  你好:
>  nbsp; nbsp;
> 
> 我尝试在arm架构下centos系统内部署pyflink,python版本是3.5.9(3.7.1也尝试过),均不能完成部署,卡在pyarrow这一步,请问,这是系统不支持,还是我部署的问题?如果是我部署的问题,请问有解决方案么?
>  nbsp;ERROR: Command errored out with exit status 1:
>  nbsp; nbsp;command: /usr/local/python3/bin/python3.5
> 
> /usr/local/python3/lib/python3.5/site-packages/pip/_vendor/pep517/_in_process.py
>  build_wheel /tmp/tmpfda_qhew
>  nbsp; nbsp; nbsp; nbsp;cwd:
> /tmp/pip-install-r4b5m7u0/pyarrow
>  nbsp; Complete output (428 lines):
>  nbsp; running bdist_wheel
>  nbsp; running build
>  nbsp; running build_py
>  nbsp; creating build
>  nbsp; creating build/lib.linux-aarch64-3.5
>  nbsp; creating build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/jvm.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/benchmark.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/_generated_version.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/plasma.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/hdfs.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/cuda.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/orc.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/filesystem.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/parquet.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/flight.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/types.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/ipc.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/fs.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/compat.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/json.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/__init__.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/csv.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/util.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/pandas_compat.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/serialization.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; copying pyarrow/feather.py -gt;
> build/lib.linux-aarch64-3.5/pyarrow
>  nbsp; creating build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_jvm.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_deprecations.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_pandas.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_feather.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_gandiva.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_fs.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_json.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_ipc.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_plasma_tf_op.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_misc.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/test_scalars.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/conftest.py -gt;
>  build/lib.linux-aarch64-3.5/pyarrow/tests
>  nbsp; copying pyarrow/tests/strategies.py -gt;
>  

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 文章 jincheng sun
Hi David, Thank you for sharing the problems with the current document, and
I agree with you as I also got the same feedback from Chinese users. I am
often contacted by users to ask questions such as whether PyFlink supports
"Java UDF" and whether PyFlink supports "xxxConnector". The root cause of
these problems is that our existing documents are based on Java users (text
and API mixed part). Since Python is newly added from 1.9, many document
information is not friendly to Python users. They don't want to look for
Python content in unfamiliar Java documents. Just yesterday, there were
complaints from Chinese users about where is all the document entries of
 Python API. So, have a centralized entry and clear document structure,
which is the urgent demand of Python users. The original intention of FLIP
is do our best to solve these user pain points.

Hi Xingbo and Wei Thank you for sharing PySpark's status on document
optimization. You're right. PySpark already has a lot of Python user
groups. They also find that Python user community is an important position
for multilingual support. The centralization and unification of Python
document content will reduce the learning cost of Python users, and good
document structure and content will also reduce the Q & A burden of the
community, It's a once and for all job.

Hi Seth, I wonder if your concerns have been resolved through the previous
discussion?

Anyway, the principle of FLIP is that in python document should only
include Python specific content, instead of making a copy of the Java
content. And would be great to have you to join in the improvement for
PyFlink (Both PRs and Review PRs).

Best,
Jincheng


Wei Zhong  于2020年8月5日周三 下午5:46写道:

> Hi Xingbo,
>
> Thanks for your information.
>
> I think the PySpark's documentation redesigning deserves our attention. It
> seems that the Spark community has also begun to treat the user experience
> of Python documentation more seriously. We can continue to pay attention to
> the discussion and progress of the redesigning in the Spark community. It
> is so similar to our working that there should be some ideas worthy for us.
>
> Best,
> Wei
>
>
> 在 2020年8月5日,15:02,Xingbo Huang  写道:
>
> Hi,
>
> I found that the spark community is also working on redesigning pyspark
> documentation[1] recently. Maybe we can compare the difference between our
> document structure and its document structure.
>
> [1] https://issues.apache.org/jira/browse/SPARK-31851
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>
> Best,
> Xingbo
>
> David Anderson  于2020年8月5日周三 上午3:17写道:
>
>> I'm delighted to see energy going into improving the documentation.
>>
>> With the current documentation, I get a lot of questions that I believe
>> reflect two fundamental problems with what we currently provide:
>>
>> (1) We have a lot of contextual information in our heads about how Flink
>> works, and we are able to use that knowledge to make reasonable inferences
>> about how things (probably) work in cases we aren't so familiar with. For
>> example, I get a lot of questions of the form "If I use  will
>> I still have exactly once guarantees?" The answer is always yes, but they
>> continue to have doubts because we have failed to clearly communicate this
>> fundamental, underlying principle.
>>
>> This specific example about fault tolerance applies across all of the
>> Flink docs, but the general idea can also be applied to the Table/SQL and
>> PyFlink docs. The guiding principles underlying these APIs should be
>> written down in one easy-to-find place.
>>
>> (2) The other kind of question I get a lot is "Can I do  with ?"
>> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
>> very difficult to answer because it is frequently the case that one has to
>> reason about why a given feature doesn't seem to appear in the
>> documentation. It could be that I'm looking in the wrong place, or it could
>> be that someone forgot to document something, or it could be that it can in
>> fact be done by applying a general mechanism in a specific way that I
>> haven't thought of -- as in this case, where one can use a JDBC sink from
>> Python if one thinks to use DDL.
>>
>> So I think it would be helpful to be explicit about both what is, and
>> what is not, supported in PyFlink. And to have some very clear organizing
>> principles in the documentation so that users can quickly learn where to
>> look for specific facts.
>>
>> Regards,
>> David
>>
>>
>> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun 
>> wrote:
>>
>>> Hi Seth and David,
>>>
>>> I'm very happy to have your reply and suggestions. I would like to share
>>> my thoughts here:
>>>
>>> The main motivation we want to refactor the PyFlink doc is that we want
>>> to make sure that the Python users could find all they want starting from
>>> the PyFlink documentation mainpage. That’s, the PyFlink documentation

?????? arm??centos7??????pyflink

2020-08-05 文章 ????
Hi:
  mvn clean package 
-DskipTestflink-1.11.1-src.tgzflink-1.11.1/build-target/bin/pyflink-shell.shpyflink


----
??: 
   ""   
 <1129656...@qq.com;
:2020??8??4??(??) 10:19
??:"user-zh"

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 文章 Eleanore Jin
Hi Yang and Till,

Thanks a lot for the help! I have the similar question as Till mentioned,
if we do not fail Flink pods when the restart strategy is exhausted, it
might be hard to monitor such failures. Today I get alerts if the k8s pods
are restarted or in crash loop, but if this will no longer be the case, how
can we deal with the monitoring? In production, I have hundreds of small
flink jobs running (2-8 TM pods) doing stateless processing, it is really
hard for us to expose ingress for each JM rest endpoint to periodically
query the job status for each flink job.

Thanks a lot!
Eleanore

On Wed, Aug 5, 2020 at 4:56 AM Till Rohrmann  wrote:

> You are right Yang Wang.
>
> Thanks for creating this issue.
>
> Cheers,
> Till
>
> On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:
>
>> Actually, the application status shows in YARN web UI is not determined
>> by the jobmanager process exit code.
>> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
>> control the final status of YARN application.
>> So although jobmanager exit with zero code, it still could show failed
>> status in YARN web UI.
>>
>> I have created a ticket to track this improvement[1].
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>>
>>
>> Best,
>> Yang
>>
>>
>> Till Rohrmann  于2020年8月5日周三 下午3:56写道:
>>
>>> Yes for the other deployments it is not a problem. A reason why people
>>> preferred non-zero exit codes in case of FAILED jobs is that this is easier
>>> to monitor than having to take a look at the actual job result. Moreover,
>>> in the YARN web UI the application shows as failed if I am not mistaken.
>>> However, from a framework's perspective, a FAILED job does not mean that
>>> Flink has failed and, hence, the return code could still be 0 in my opinion.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>>>
 Hi Eleanore,

 Yes, I suggest to use Job to replace Deployment. It could be used
 to run jobmanager one time and finish after a successful/failed completion.

 However, using Job still could not solve your problem completely. Just
 as Till said, When a job exhausts the restart strategy, the jobmanager
 pod will terminate with non-zero exit code. It will cause the K8s
 restarting it again. Even though we could set the resartPolicy and
 backoffLimit,
 this is not a clean and correct way to go. We should terminate the
 jobmanager process with zero exit code in such situation.

 @Till Rohrmann  I just have one concern. Is it a
 special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
 terminating with
 non-zero exit code is harmless.


 Best,
 Yang

 Eleanore Jin  于2020年8月4日周二 下午11:54写道:

> Hi Yang & Till,
>
> Thanks for your prompt reply!
>
> Yang, regarding your question, I am actually not using k8s job, as I
> put my app.jar and its dependencies under flink's lib directory. I have 1
> k8s deployment for job manager, and 1 k8s deployment for task manager, and
> 1 k8s service for job manager.
>
> As you mentioned above, if flink job is marked as failed, it will
> cause the job manager pod to be restarted. Which is not the ideal
> behavior.
>
> Do you suggest that I should change the deployment strategy from using
> k8s deployment to k8s job? In case the flink program exit with non-zero
> code (e.g. exhausted number of configured restart), pod can be marked as
> complete hence not restarting the job again?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang 
> wrote:
>
>> @Till Rohrmann  In native mode, when a Flink
>> application terminates with FAILED state, all the resources will be 
>> cleaned
>> up.
>>
>> However, in standalone mode, I agree with you that we need to rethink
>> the exit code of Flink. When a job exhausts the restart
>> strategy, we should terminate the pod and do not restart again. After
>> googling, it seems that we could not specify the restartPolicy
>> based on exit code[1]. So maybe we need to return a zero exit code to
>> avoid restarting by K8s.
>>
>> [1].
>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>
>> Best,
>> Yang
>>
>> Till Rohrmann  于2020年8月4日周二 下午3:48写道:
>>
>>> @Yang Wang  I believe that we should
>>> rethink the exit codes of Flink. In general you want K8s to restart a
>>> failed Flink process. Hence, an application which terminates in state
>>> FAILED should not return a non-zero exit code because it is a valid
>>> termination state.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
>>> wrote:
>>>
 Hi Eleanore,

 I think you are using K8s resource "Job" to 

Re: UDF:Type is not supported: ANY

2020-08-05 文章 Benchao Li
Hi zilong,

SQL里面的ARRAY类型,对应的legacy type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
其他类型的type information会被当做any类型来处理。
这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
支持List作为ARRAY的数据,应该要在1.12才能支持[1]。

[1] https://issues.apache.org/jira/browse/FLINK-18417

zilong xiao  于2020年8月3日周一 下午8:23写道:

> 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
>
> godfrey he  于2020年8月3日周一 下午7:50写道:
>
> > 你把Map换为Map试试
> >
> > zilong xiao  于2020年8月3日周一 下午4:56写道:
> >
> > > 目前转List可以用数组代替,Map貌似没法成功运行
> > >
> > > zilong xiao  于2020年8月3日周一 上午10:43写道:
> > >
> > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type
> is
> > > not
> > > > supported:
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > Json2Map
> > > > udf应该怎么操作呢?求前辈指导
> > > >
> > > > udfd代码如下:
> > > >
> > > > public class Json2List extends ScalarFunction {
> > > >
> > > >private static final Logger LOG =
> > > LoggerFactory.getLogger(Json2List.class);
> > > >
> > > >private static final ObjectMapper OBJECT_MAPPER = new
> ObjectMapper()
> > > >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > >   .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > true) ;
> > > >
> > > >public Json2List(){}
> > > >
> > > >public List eval(String param) {
> > > >   List result = new ArrayList<>();
> > > >   try {
> > > >  List> list =
> > OBJECT_MAPPER.readValue(param,
> > > List.class);
> > > >  for(Map map : list){
> > > > result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > >  }
> > > >  return result;
> > > >   } catch (JsonProcessingException e){
> > > >  LOG.error("failed to convert json to array, param is: {}",
> > > param, e);
> > > >   }
> > > >   return result;
> > > >}
> > > >
> > > >
> > > >@Override
> > > >public TypeInformation> getResultType(Class[]
> > > signature) {
> > > >   return Types.LIST(Types.STRING);
> > > >}
> > > >
> > > > }
> > > >
> > > >
> > >
> >
>


-- 

Best,
Benchao Li


Re: Flink Mysql sink按时间分库分表

2020-08-05 文章 Leonard Xu
Hi

我理解这个除了指定表名,关键是要在数据库中自动建表吧,JDBC 这边之前有个相关issue我跟进过[2],不过代码还没进,暂时还没有好的办法。Es 
connector 是支持类似功能的,如果数据可以放在es可以使用下。

祝好
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-16294 



> 在 2020年8月5日,20:36,张健  写道:
> 
> 
> 
> 
> 大家好:
> 
> 
> 想问下目前 flink mysql sink想要实现按时间分库分表 有什么配置项可以使用嘛?
> 
> 
> 我目前的需求是数据源经过一个简单的etl处理写入到mysql中,供业务实时查询。由于业务场景只需要查询当天的数据(历史数据有另外的离线模块处理),所以想每天的数据单独写入一张表中(表名为xx_MMDD)这样。但目前Flink
>  JDBC sink是要指定表名的。请问下有什么简单的实现方案嘛?或者说对于我这样的场景有什么更好的实现方式嘛?
> 
> 
> 多谢。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> --
> 
> 张健



Re:Re: Re: FLINK SQL view的数据复用问题

2020-08-05 文章 kandy.wang






@ godfrey
你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。











在 2020-08-04 19:36:56,"godfrey he"  写道:
>调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用
>
>kandy.wang  于2020年8月4日周二 下午6:21写道:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> @ godfrey
>> thanks。刚试了一下,source -> Deduplicate  ->
>> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
>> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
>>
>>
>> 在 2020-08-04 17:26:02,"godfrey he"  写道:
>> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
>> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
>> >
>> >kandy.wang  于2020年8月4日周二 下午5:20写道:
>> >
>> >> FLINK SQL view相关问题:
>> >> create view order_source
>> >>
>> >> as
>> >>
>> >> select order_id, order_goods_id, user_id,...
>> >>
>> >> from (
>> >>
>> >> ..  proctime,row_number() over(partition by order_id,
>> >> order_goods_id order by proctime desc) as rownum
>> >>
>> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
>> properties.group.id'='flink_etl_kafka_hbase',
>> >> 'scan.startup.mode'='latest-offset') */
>> >>
>> >> ) where  rownum = 1 and  price > 0;
>> >>
>> >>
>> >>
>> >>
>> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
>> as
>> >> BIGINT),)
>> >>
>> >> from
>> >>
>> >> (
>> >>
>> >> select order_date as rowkey,
>> >>
>> >> sum(amount) as saleN,
>> >>
>> >> from order_source
>> >>
>> >> group by order_date
>> >>
>> >> );
>> >>
>> >>
>> >>
>> >>
>> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
>> as
>> >> BIGINT))
>> >>
>> >> from
>> >>
>> >> (
>> >>
>> >> select order_hour as rowkey,sum(amount) as saleN,
>> >>
>> >>
>> >>
>> >> from order_source
>> >>
>> >> group by order_hour
>> >>
>> >> );
>> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
>> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
>> >> 2
>> >>
>> >>
>> >> 本意是想通过view  order_source
>> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
>> >>
>> >>
>>


Flink Mysql sink按时间分库分表

2020-08-05 文章 张健



大家好:


想问下目前 flink mysql sink想要实现按时间分库分表 有什么配置项可以使用嘛?


我目前的需求是数据源经过一个简单的etl处理写入到mysql中,供业务实时查询。由于业务场景只需要查询当天的数据(历史数据有另外的离线模块处理),所以想每天的数据单独写入一张表中(表名为xx_MMDD)这样。但目前Flink
 JDBC sink是要指定表名的。请问下有什么简单的实现方案嘛?或者说对于我这样的场景有什么更好的实现方式嘛?


多谢。










--

张健

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 文章 Till Rohrmann
You are right Yang Wang.

Thanks for creating this issue.

Cheers,
Till

On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:

> Actually, the application status shows in YARN web UI is not determined by
> the jobmanager process exit code.
> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
> control the final status of YARN application.
> So although jobmanager exit with zero code, it still could show failed
> status in YARN web UI.
>
> I have created a ticket to track this improvement[1].
>
> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>
>
> Best,
> Yang
>
>
> Till Rohrmann  于2020年8月5日周三 下午3:56写道:
>
>> Yes for the other deployments it is not a problem. A reason why people
>> preferred non-zero exit codes in case of FAILED jobs is that this is easier
>> to monitor than having to take a look at the actual job result. Moreover,
>> in the YARN web UI the application shows as failed if I am not mistaken.
>> However, from a framework's perspective, a FAILED job does not mean that
>> Flink has failed and, hence, the return code could still be 0 in my opinion.
>>
>> Cheers,
>> Till
>>
>> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>>
>>> Hi Eleanore,
>>>
>>> Yes, I suggest to use Job to replace Deployment. It could be used to run
>>> jobmanager one time and finish after a successful/failed completion.
>>>
>>> However, using Job still could not solve your problem completely. Just
>>> as Till said, When a job exhausts the restart strategy, the jobmanager
>>> pod will terminate with non-zero exit code. It will cause the K8s
>>> restarting it again. Even though we could set the resartPolicy and
>>> backoffLimit,
>>> this is not a clean and correct way to go. We should terminate the
>>> jobmanager process with zero exit code in such situation.
>>>
>>> @Till Rohrmann  I just have one concern. Is it a
>>> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
>>> terminating with
>>> non-zero exit code is harmless.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Eleanore Jin  于2020年8月4日周二 下午11:54写道:
>>>
 Hi Yang & Till,

 Thanks for your prompt reply!

 Yang, regarding your question, I am actually not using k8s job, as I
 put my app.jar and its dependencies under flink's lib directory. I have 1
 k8s deployment for job manager, and 1 k8s deployment for task manager, and
 1 k8s service for job manager.

 As you mentioned above, if flink job is marked as failed, it will cause
 the job manager pod to be restarted. Which is not the ideal behavior.

 Do you suggest that I should change the deployment strategy from using
 k8s deployment to k8s job? In case the flink program exit with non-zero
 code (e.g. exhausted number of configured restart), pod can be marked as
 complete hence not restarting the job again?

 Thanks a lot!
 Eleanore

 On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:

> @Till Rohrmann  In native mode, when a Flink
> application terminates with FAILED state, all the resources will be 
> cleaned
> up.
>
> However, in standalone mode, I agree with you that we need to rethink
> the exit code of Flink. When a job exhausts the restart
> strategy, we should terminate the pod and do not restart again. After
> googling, it seems that we could not specify the restartPolicy
> based on exit code[1]. So maybe we need to return a zero exit code to
> avoid restarting by K8s.
>
> [1].
> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>
> Best,
> Yang
>
> Till Rohrmann  于2020年8月4日周二 下午3:48写道:
>
>> @Yang Wang  I believe that we should
>> rethink the exit codes of Flink. In general you want K8s to restart a
>> failed Flink process. Hence, an application which terminates in state
>> FAILED should not return a non-zero exit code because it is a valid
>> termination state.
>>
>> Cheers,
>> Till
>>
>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> I think you are using K8s resource "Job" to deploy the jobmanager.
>>> Please set .spec.template.spec.restartPolicy = "Never" and
>>> spec.backoffLimit = 0.
>>> Refer here[1] for more information.
>>>
>>> Then, when the jobmanager failed because of any reason, the K8s job
>>> will be marked failed. And K8s will not restart the job again.
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Eleanore Jin  于2020年8月4日周二 上午12:05写道:
>>>
 Hi Till,

 Thanks for the reply!

 I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
 Specifically, I build a custom docker image, which I copied the app jar
 (not uber jar) and 

Re: flink sql eos

2020-08-05 文章 Leonard Xu
Hi

> 目前仅有kafka实现了TwoPhaseCommitSinkFunction,但kafka的ddl中也没有属性去设
> 置Semantic为EXACTLY_ONCE

除了Kafka还有filesystem connector也是支持 EXACTLY ONCE的,kafka 的已经在1.12支持了[1]


> 当开启全局EXACTLY_ONCE并且所有使用的connector都支持EXACTLY_ONCE,是否整个应
> 用程序就可以做到端到端的精确一致性

是的。 

祝好
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-15221 


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 文章 Yang Wang
Actually, the application status shows in YARN web UI is not determined by
the jobmanager process exit code.
Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
control the final status of YARN application.
So although jobmanager exit with zero code, it still could show failed
status in YARN web UI.

I have created a ticket to track this improvement[1].

[1]. https://issues.apache.org/jira/browse/FLINK-18828


Best,
Yang


Till Rohrmann  于2020年8月5日周三 下午3:56写道:

> Yes for the other deployments it is not a problem. A reason why people
> preferred non-zero exit codes in case of FAILED jobs is that this is easier
> to monitor than having to take a look at the actual job result. Moreover,
> in the YARN web UI the application shows as failed if I am not mistaken.
> However, from a framework's perspective, a FAILED job does not mean that
> Flink has failed and, hence, the return code could still be 0 in my opinion.
>
> Cheers,
> Till
>
> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>
>> Hi Eleanore,
>>
>> Yes, I suggest to use Job to replace Deployment. It could be used to run
>> jobmanager one time and finish after a successful/failed completion.
>>
>> However, using Job still could not solve your problem completely. Just as
>> Till said, When a job exhausts the restart strategy, the jobmanager
>> pod will terminate with non-zero exit code. It will cause the K8s
>> restarting it again. Even though we could set the resartPolicy and
>> backoffLimit,
>> this is not a clean and correct way to go. We should terminate the
>> jobmanager process with zero exit code in such situation.
>>
>> @Till Rohrmann  I just have one concern. Is it a
>> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
>> terminating with
>> non-zero exit code is harmless.
>>
>>
>> Best,
>> Yang
>>
>> Eleanore Jin  于2020年8月4日周二 下午11:54写道:
>>
>>> Hi Yang & Till,
>>>
>>> Thanks for your prompt reply!
>>>
>>> Yang, regarding your question, I am actually not using k8s job, as I put
>>> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
>>> deployment for job manager, and 1 k8s deployment for task manager, and 1
>>> k8s service for job manager.
>>>
>>> As you mentioned above, if flink job is marked as failed, it will cause
>>> the job manager pod to be restarted. Which is not the ideal behavior.
>>>
>>> Do you suggest that I should change the deployment strategy from using
>>> k8s deployment to k8s job? In case the flink program exit with non-zero
>>> code (e.g. exhausted number of configured restart), pod can be marked as
>>> complete hence not restarting the job again?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>>>
 @Till Rohrmann  In native mode, when a Flink
 application terminates with FAILED state, all the resources will be cleaned
 up.

 However, in standalone mode, I agree with you that we need to rethink
 the exit code of Flink. When a job exhausts the restart
 strategy, we should terminate the pod and do not restart again. After
 googling, it seems that we could not specify the restartPolicy
 based on exit code[1]. So maybe we need to return a zero exit code to
 avoid restarting by K8s.

 [1].
 https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code

 Best,
 Yang

 Till Rohrmann  于2020年8月4日周二 下午3:48写道:

> @Yang Wang  I believe that we should
> rethink the exit codes of Flink. In general you want K8s to restart a
> failed Flink process. Hence, an application which terminates in state
> FAILED should not return a non-zero exit code because it is a valid
> termination state.
>
> Cheers,
> Till
>
> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
> wrote:
>
>> Hi Eleanore,
>>
>> I think you are using K8s resource "Job" to deploy the jobmanager.
>> Please set .spec.template.spec.restartPolicy = "Never" and
>> spec.backoffLimit = 0.
>> Refer here[1] for more information.
>>
>> Then, when the jobmanager failed because of any reason, the K8s job
>> will be marked failed. And K8s will not restart the job again.
>>
>> [1].
>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>
>>
>> Best,
>> Yang
>>
>> Eleanore Jin  于2020年8月4日周二 上午12:05写道:
>>
>>> Hi Till,
>>>
>>> Thanks for the reply!
>>>
>>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>>> Specifically, I build a custom docker image, which I copied the app jar
>>> (not uber jar) and all its dependencies under /flink/lib.
>>>
>>> So my question is more like, in this case, if the job is marked as
>>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>>> what are the suggestions for such scenario?
>>>
>>> 

Re: The bytecode of the class does not match the source code

2020-08-05 文章 Chesnay Schepler
Well of course these differ; on the left you have the decompiled 
bytecode, on the right the original source.


If these were the same you wouldn't need source jars.

On 05/08/2020 12:20, 魏子涵 wrote:
I'm sure the two versions match up. Following is the pic comparing 
codes in IDEA

https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70






At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:

Please make sure you have loaded the correct source jar, and
aren't by chance still using the 1.11.0 source jar.

On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
      I found  the
【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class
in【flink-runtime_2.11-1.11.1.jar】does not match the source code.
Is it a problem we need to fix(if it is, what should we do)? or
just let it go?










?????? flink-1.11 ????????

2020-08-05 文章 kcz
 




----
??: 
   "user-zh"



Re:Re: The bytecode of the class does not match the source code

2020-08-05 文章 魏子涵
I'm sure the two versions match up. Following is the pic comparing codes in IDEA
https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70
















At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:

Please make sure you have loaded the correct source jar, and aren't by chance 
still using the 1.11.0 source jar.



On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
  I found  the 【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 
class in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it a 
problem we need to fix(if it is, what should we do)? or just let it go?




 




Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 文章 Wei Zhong
Hi Xingbo,

Thanks for your information. 

I think the PySpark's documentation redesigning deserves our attention. It 
seems that the Spark community has also begun to treat the user experience of 
Python documentation more seriously. We can continue to pay attention to the 
discussion and progress of the redesigning in the Spark community. It is so 
similar to our working that there should be some ideas worthy for us.

Best,
Wei


> 在 2020年8月5日,15:02,Xingbo Huang  写道:
> 
> Hi,
> 
> I found that the spark community is also working on redesigning pyspark 
> documentation[1] recently. Maybe we can compare the difference between our 
> document structure and its document structure.
> 
> [1] https://issues.apache.org/jira/browse/SPARK-31851 
> 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>  
> 
> 
> Best,
> Xingbo
> 
> David Anderson mailto:da...@alpinegizmo.com>> 
> 于2020年8月5日周三 上午3:17写道:
> I'm delighted to see energy going into improving the documentation.
> 
> With the current documentation, I get a lot of questions that I believe 
> reflect two fundamental problems with what we currently provide:
> 
> (1) We have a lot of contextual information in our heads about how Flink 
> works, and we are able to use that knowledge to make reasonable inferences 
> about how things (probably) work in cases we aren't so familiar with. For 
> example, I get a lot of questions of the form "If I use  will I 
> still have exactly once guarantees?" The answer is always yes, but they 
> continue to have doubts because we have failed to clearly communicate this 
> fundamental, underlying principle. 
> 
> This specific example about fault tolerance applies across all of the Flink 
> docs, but the general idea can also be applied to the Table/SQL and PyFlink 
> docs. The guiding principles underlying these APIs should be written down in 
> one easy-to-find place. 
> 
> (2) The other kind of question I get a lot is "Can I do  with ?" E.g., 
> "Can I use the JDBC table sink from PyFlink?" These questions can be very 
> difficult to answer because it is frequently the case that one has to reason 
> about why a given feature doesn't seem to appear in the documentation. It 
> could be that I'm looking in the wrong place, or it could be that someone 
> forgot to document something, or it could be that it can in fact be done by 
> applying a general mechanism in a specific way that I haven't thought of -- 
> as in this case, where one can use a JDBC sink from Python if one thinks to 
> use DDL. 
> 
> So I think it would be helpful to be explicit about both what is, and what is 
> not, supported in PyFlink. And to have some very clear organizing principles 
> in the documentation so that users can quickly learn where to look for 
> specific facts.
> 
> Regards,
> David
> 
> 
> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun  > wrote:
> Hi Seth and David,
> 
> I'm very happy to have your reply and suggestions. I would like to share my 
> thoughts here:
> 
> The main motivation we want to refactor the PyFlink doc is that we want to 
> make sure that the Python users could find all they want starting from the 
> PyFlink documentation mainpage. That’s, the PyFlink documentation should have 
> a catalogue which includes all the functionalities available in PyFlink. 
> However, this doesn’t mean that we will make a copy of the content of the 
> documentation in the other places. It may be just a reference/link to the 
> other documentation if needed. For the documentation added under PyFlink 
> mainpage, the principle is that it should only include Python specific 
> content, instead of making a copy of the Java content.
> 
> >>  I'm concerned that this proposal duplicates a lot of content that will 
> >> quickly get out of sync. It feels like it is documenting PyFlink 
> >> separately from the rest of the project.
> 
> Regarding the concerns about maintainability, as mentioned above, The goal of 
> this FLIP is to provide an intelligible entrance of Python API, and the 
> content in it should only contain the information which is useful for Python 
> users. There are indeed many agenda items that duplicate the Java documents 
> in this FLIP, but it doesn't mean the content would be copied from Java 
> documentation. i.e, if the content of the document is the same as the 
> corresponding Java document, we will add a link to the Java document. e.g. 
> the "Built-in functions" and "SQL". We only create a page for the Python-only 
> content, and then redirect to the Java document if there is something shared 
> with Java. e.g. "Connectors" and "Catalogs". If the document is Python-only 
> and already exists, we will move it from the old python 

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
Hi
  RocksDB StateBackend 只需要在 flink-conf 中进行一下配置就行了[1].

  另外从你前面两份邮件看,我有些信息比较疑惑,你能否贴一下现在使用的 flink-conf,以及 checkpoint UI 的截图,以及 HDFS
上 checkpoint 目录的截图

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend

Best,
Congxian


op <520075...@qq.com> 于2020年8月5日周三 下午4:03写道:

> 你好,ttl配置是
> val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
> val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> val tConfig = tableEnv.getConfig
> tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
>
>
>   1)目前是有3个任务都是这种情况
>   2)目前集群没有RocksDB环境
> 谢谢
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com;
> 发送时间:2020年8月5日(星期三) 下午3:30
> 收件人:"user-zh"
> 主题:Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> Hi op
>  这个情况比较奇怪。我想确认下:
>  1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
>  2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢
>
>  另外,你 TTL 其他的配置是怎么设置的呢?
>
> 从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
> Best,
> Congxian
>
>
> op <520075...@qq.com 于2020年8月5日周三 下午2:46写道:
>
>  nbsp; nbsp;
> 
> 你好,我使用的是FsStateBackendnbsp;状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
>  nbsp;
> nbsp;设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
>  nbsp; nbsp;观察到的checkpoint shared 目录大小一直在增加,也确认过group
>  by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
>  nbsp; nbsp;运行5天能满足清理条件
> 
> 
> 
> 
>  -- 原始邮件 --
>  发件人:
> 
> "user-zh"
> 
> <
>  qcx978132...@gmail.comgt;;
>  发送时间:nbsp;2020年8月3日(星期一) 下午5:50
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> 
> 
> 
>  Hi
>  nbsp;nbsp; 能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared
>  目录的数据量看,有增长,后续基本持平。现在
>  Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果
> checkpoint
>  之间,数据改动很多的话,这个值会变大
> 
>  [1]
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> 
> ;
> Best,
>  Congxian
> 
> 
>  op <520075...@qq.comgt; 于2020年8月3日周一 下午2:18写道:
> 
>  gt; amp;nbsp; amp;nbsp;
>  gt;
> 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
>  gt; 逻辑是按照 事件day 和 id 进行groupby
>  gt; 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
>  gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440),
>  gt; Time.minutes(1440+10))
>  gt;
>  gt;
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  nbsp; "user-zh"
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  nbsp; <
>  gt; 384939...@qq.comamp;gt;;
>  gt; 发送时间:amp;nbsp;2020年8月3日(星期一) 中午1:50
>  gt; 收件人:amp;nbsp;"user-zh" amp;gt;;
>  gt;
>  gt; 主题:amp;nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口
> 操作后 状态越来越大
>  gt;
>  gt;
>  gt;
>  gt; hi,您好:
>  gt; 我改回增量模式重新收集了一些数据:
>  gt; 1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
>  gt; 2、checkpoint是interval设置的是5秒
>  gt; 3、目前这个作业是每分钟一个窗口
>  gt; 4、并行度设置的1,使用on-yarn模式
>  gt;
>  gt; 刚启动的时候,如下:
>  gt; <
> http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;gt;
>  gt;
>  gt; 18分钟后,如下:
>  gt; <
> http://apache-flink.147419.n8.nabble.com/file/t793/9.pngamp;gt;
>  gt;
>  gt; checkpoints设置:
>  gt; <
> http://apache-flink.147419.n8.nabble.com/file/t793/conf.pngamp;gt;
>  gt;
>  gt; hdfs上面大小:
>  gt; <
> http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pngamp;gt;
>  gt;
>  gt; 页面上看到的大小:
>  gt; <
> 
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;gt
> 
> ;
> ;
>  gt;
>  gt;
>  gt; Congxian Qiu wrote
>  gt; amp;gt; Hiamp;nbsp;amp;nbsp; 鱼子酱
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
> 能否把在使用增量 checkpoint
>  的模式下,截图看一下 checkpoint
>  gt; size 的走势呢?另外可以的话,也麻烦你在每次
>  gt; amp;gt; checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
>  另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
>  gt; amp;gt;
>  gt; amp;gt; Best,
>  gt; amp;gt; Congxian
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt; 鱼子酱 <
>  gt;
>  gt; amp;gt; 384939718@
>  gt;
>  gt; amp;gt;amp;gt; 于2020年7月30日周四 上午10:43写道:
>  gt; 

Re: The bytecode of the class does not match the source code

2020-08-05 文章 Chesnay Schepler
Please make sure you have loaded the correct source jar, and aren't by 
chance still using the 1.11.0 source jar.


On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
      I found  the 
【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class 
in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it 
a problem we need to fix(if it is, what should we do)? or just let it go?







Re:Re:写入hive 问题

2020-08-05 文章 air23



你好 谢谢。去掉版本号 确实可以了。我用的版本 和我安装的hive版本是一致的。不知道是什么原因导致的。


















在 2020-08-05 15:59:06,"wldd"  写道:
>hi:
>1.你可以看下你配置hive catalog时的hive版本和你当前使用的hive版本是否一致
>2.你也可以尝试在配置hive catalog的时候,不设置hive版本
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
>Best,
>wldd
>
>
>
>
>
>在 2020-08-05 15:38:26,"air23"  写道:
>>你好 
>>15:33:59,781 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>>- Created HiveCatalog 'myhive1'
>>Exception in thread "main" 
>>org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create 
>>Hive Metastore client
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:58)
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
>>at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
>>at 
>>org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
>>at 
>>org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
>>at com.zt.kafka.KafkaTest4.main(KafkaTest4.java:73)
>>Caused by: java.lang.NoSuchMethodException: 
>>org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
>>at java.lang.Class.getMethod(Class.java:1786)
>>at 
>>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:54)
>>... 7 mor
>>
>>
>>
>>
>>请问这个是什么问题 Metastore 也已经启动了。
>>谢谢


flink sql eos

2020-08-05 文章 sllence
大家好

   请问目前flink sql是不是不能没有开启全局端到端精确一致性(eos)的方
式,

目前仅有kafka实现了TwoPhaseCommitSinkFunction,但kafka的ddl中也没有属性去设
置Semantic为EXACTLY_ONCE

 

我们是否可以去支持更多的事务性connector,并可以在flink sql维度支持开启全局的
端到端一致性,并为每个connector是否支持EXACTLY_ONCE进行验证,

当开启全局EXACTLY_ONCE并且所有使用的connector都支持EXACTLY_ONCE,是否整个应
用程序就可以做到端到端的精确一致性



?????? flink1.10.1/1.11.1 ????sql ????group ?? ???????? ?????? ????????????

2020-08-05 文章 op
??ttl??
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
val tConfig = tableEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))


  1)3??
  2)RocksDB

----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
 Best,
 Congxian


 op <520075...@qq.comgt; ??2020??8??3?? 2:18??

 gt; amp;nbsp; amp;nbsp;
 gt; 
1.11.0hdfscheckpoint??checkpoint3??
 gt; ?? day ?? id groupby
 gt; 
7watermark??
 gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440),
 gt; Time.minutes(1440+10))
 gt;
 gt;
 gt;
 gt;
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:
 
gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 nbsp; "user-zh"
 
gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 nbsp; <
 gt; 384939...@qq.comamp;gt;;
 gt; :amp;nbsp;2020??8??3??(??) 1:50
 gt; 
??:amp;nbsp;"user-zh"http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;gt;
 gt;
 gt; 18??
 gt; 


Re:写入hive 问题

2020-08-05 文章 wldd
hi:
1.你可以看下你配置hive catalog时的hive版本和你当前使用的hive版本是否一致
2.你也可以尝试在配置hive catalog的时候,不设置hive版本













--

Best,
wldd





在 2020-08-05 15:38:26,"air23"  写道:
>你好 
>15:33:59,781 INFO  org.apache.flink.table.catalog.hive.HiveCatalog 
>   - Created HiveCatalog 'myhive1'
>Exception in thread "main" 
>org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create 
>Hive Metastore client
>at 
>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:58)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
>at 
>org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
>at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
>at 
>org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
>at com.zt.kafka.KafkaTest4.main(KafkaTest4.java:73)
>Caused by: java.lang.NoSuchMethodException: 
>org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
>at java.lang.Class.getMethod(Class.java:1786)
>at 
>org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:54)
>... 7 mor
>
>
>
>
>请问这个是什么问题 Metastore 也已经启动了。
>谢谢


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 文章 Till Rohrmann
Yes for the other deployments it is not a problem. A reason why people
preferred non-zero exit codes in case of FAILED jobs is that this is easier
to monitor than having to take a look at the actual job result. Moreover,
in the YARN web UI the application shows as failed if I am not mistaken.
However, from a framework's perspective, a FAILED job does not mean that
Flink has failed and, hence, the return code could still be 0 in my opinion.

Cheers,
Till

On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:

> Hi Eleanore,
>
> Yes, I suggest to use Job to replace Deployment. It could be used to run
> jobmanager one time and finish after a successful/failed completion.
>
> However, using Job still could not solve your problem completely. Just as
> Till said, When a job exhausts the restart strategy, the jobmanager
> pod will terminate with non-zero exit code. It will cause the K8s
> restarting it again. Even though we could set the resartPolicy and
> backoffLimit,
> this is not a clean and correct way to go. We should terminate the
> jobmanager process with zero exit code in such situation.
>
> @Till Rohrmann  I just have one concern. Is it a
> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
> terminating with
> non-zero exit code is harmless.
>
>
> Best,
> Yang
>
> Eleanore Jin  于2020年8月4日周二 下午11:54写道:
>
>> Hi Yang & Till,
>>
>> Thanks for your prompt reply!
>>
>> Yang, regarding your question, I am actually not using k8s job, as I put
>> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
>> deployment for job manager, and 1 k8s deployment for task manager, and 1
>> k8s service for job manager.
>>
>> As you mentioned above, if flink job is marked as failed, it will cause
>> the job manager pod to be restarted. Which is not the ideal behavior.
>>
>> Do you suggest that I should change the deployment strategy from using
>> k8s deployment to k8s job? In case the flink program exit with non-zero
>> code (e.g. exhausted number of configured restart), pod can be marked as
>> complete hence not restarting the job again?
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>>
>>> @Till Rohrmann  In native mode, when a Flink
>>> application terminates with FAILED state, all the resources will be cleaned
>>> up.
>>>
>>> However, in standalone mode, I agree with you that we need to rethink
>>> the exit code of Flink. When a job exhausts the restart
>>> strategy, we should terminate the pod and do not restart again. After
>>> googling, it seems that we could not specify the restartPolicy
>>> based on exit code[1]. So maybe we need to return a zero exit code to
>>> avoid restarting by K8s.
>>>
>>> [1].
>>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>>
>>> Best,
>>> Yang
>>>
>>> Till Rohrmann  于2020年8月4日周二 下午3:48写道:
>>>
 @Yang Wang  I believe that we should
 rethink the exit codes of Flink. In general you want K8s to restart a
 failed Flink process. Hence, an application which terminates in state
 FAILED should not return a non-zero exit code because it is a valid
 termination state.

 Cheers,
 Till

 On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:

> Hi Eleanore,
>
> I think you are using K8s resource "Job" to deploy the jobmanager.
> Please set .spec.template.spec.restartPolicy = "Never" and
> spec.backoffLimit = 0.
> Refer here[1] for more information.
>
> Then, when the jobmanager failed because of any reason, the K8s job
> will be marked failed. And K8s will not restart the job again.
>
> [1].
> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>
>
> Best,
> Yang
>
> Eleanore Jin  于2020年8月4日周二 上午12:05写道:
>
>> Hi Till,
>>
>> Thanks for the reply!
>>
>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>> Specifically, I build a custom docker image, which I copied the app jar
>> (not uber jar) and all its dependencies under /flink/lib.
>>
>> So my question is more like, in this case, if the job is marked as
>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>> what are the suggestions for such scenario?
>>
>> Thanks a lot!
>> Eleanore
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>>
>> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> how are you deploying Flink exactly? Are you using the application
>>> mode with native K8s support to deploy a cluster [1] or are you manually
>>> deploying a per-job mode [2]?
>>>
>>> I believe the problem might be that we terminate the Flink process
>>> with a non-zero exit code if the job reaches the 

写入hive 问题

2020-08-05 文章 air23
你好 
15:33:59,781 INFO  org.apache.flink.table.catalog.hive.HiveCatalog  
 - Created HiveCatalog 'myhive1'
Exception in thread "main" 
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create 
Hive Metastore client
at 
org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:58)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:71)
at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
at com.zt.kafka.KafkaTest4.main(KafkaTest4.java:73)
Caused by: java.lang.NoSuchMethodException: 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(org.apache.hadoop.hive.conf.HiveConf)
at java.lang.Class.getMethod(Class.java:1786)
at 
org.apache.flink.table.catalog.hive.client.HiveShimV120.getHiveMetastoreClient(HiveShimV120.java:54)
... 7 mor




请问这个是什么问题 Metastore 也已经启动了。
谢谢

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
Hi op
   这个情况比较奇怪。我想确认下:
   1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
   2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢

   另外,你 TTL 其他的配置是怎么设置的呢?

从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
Best,
Congxian


op <520075...@qq.com> 于2020年8月5日周三 下午2:46写道:

>  
> 你好,我使用的是FsStateBackend状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
>  设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
>  观察到的checkpoint shared 目录大小一直在增加,也确认过group
> by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
>  运行5天能满足清理条件
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com;
> 发送时间:2020年8月3日(星期一) 下午5:50
> 收件人:"user-zh"
> 主题:Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> Hi
>  能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared
> 目录的数据量看,有增长,后续基本持平。现在
> Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果 checkpoint
> 之间,数据改动很多的话,这个值会变大
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> Best,
> Congxian
>
>
> op <520075...@qq.com 于2020年8月3日周一 下午2:18写道:
>
>  nbsp; nbsp;
>  同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
>  逻辑是按照 事件day 和 id 进行groupby
>  然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
>  tConfig.setIdleStateRetentionTime(Time.minutes(1440),
>  Time.minutes(1440+10))
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
>  "user-zh"
> 
>  <
>  384939...@qq.comgt;;
>  发送时间:nbsp;2020年8月3日(星期一) 中午1:50
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> 
> 
> 
>  hi,您好:
>  我改回增量模式重新收集了一些数据:
>  1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
>  2、checkpoint是interval设置的是5秒
>  3、目前这个作业是每分钟一个窗口
>  4、并行度设置的1,使用on-yarn模式
> 
>  刚启动的时候,如下:
>   
>  18分钟后,如下:
>   
>  checkpoints设置:
>   
>  hdfs上面大小:
>   
>  页面上看到的大小:
>  <
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pnggt
> ;
> 
> 
>  Congxian Qiu wrote
>  gt; Hinbsp;nbsp; 鱼子酱
>  gt;nbsp;nbsp;nbsp;nbsp; 能否把在使用增量 checkpoint
> 的模式下,截图看一下 checkpoint
>  size 的走势呢?另外可以的话,也麻烦你在每次
>  gt; checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
>  gt;nbsp;nbsp;nbsp;nbsp;
> 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
>  gt;
>  gt; Best,
>  gt; Congxian
>  gt;
>  gt;
>  gt; 鱼子酱 <
> 
>  gt; 384939718@
> 
>  gt;gt; 于2020年7月30日周四 上午10:43写道:
>  gt;
>  gt;gt; 感谢!
>  gt;gt;
>  gt;gt; flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
>  gt;gt; 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
>  gt;gt; StateBackend backend =new
>  gt;gt;
>  gt;gt;
> 
> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>  gt;gt; StateBackend backend =new
>  gt;gt;
>  gt;gt;
> 
> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>  gt;gt;
>  gt;gt;
>  gt;gt; 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
>  gt;gt; RocksDBStateBackend:
>  gt;gt; amp;lt;
>  http://apache-flink.147419.n8.nabble.com/file/t793/444.pngamp;gt
> ;
>  gt;gt; FsStateBackend:
>  gt;gt; amp;lt;
>  http://apache-flink.147419.n8.nabble.com/file/t793/555.pngamp;gt
> ;
>  gt;gt;
>  gt;gt;
>  gt;gt;
>  gt;gt;
>  gt;gt; --
>  gt;gt; Sent from: http://apache-flink.147419.n8.nabble.com/
>  ; gt;gt;
> 
> 
> 
> 
> 
>  --
>  Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 文章 Yang Wang
Hi Eleanore,

Yes, I suggest to use Job to replace Deployment. It could be used to run
jobmanager one time and finish after a successful/failed completion.

However, using Job still could not solve your problem completely. Just as
Till said, When a job exhausts the restart strategy, the jobmanager
pod will terminate with non-zero exit code. It will cause the K8s
restarting it again. Even though we could set the resartPolicy and
backoffLimit,
this is not a clean and correct way to go. We should terminate the
jobmanager process with zero exit code in such situation.

@Till Rohrmann  I just have one concern. Is it a
special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
terminating with
non-zero exit code is harmless.


Best,
Yang

Eleanore Jin  于2020年8月4日周二 下午11:54写道:

> Hi Yang & Till,
>
> Thanks for your prompt reply!
>
> Yang, regarding your question, I am actually not using k8s job, as I put
> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
> deployment for job manager, and 1 k8s deployment for task manager, and 1
> k8s service for job manager.
>
> As you mentioned above, if flink job is marked as failed, it will cause
> the job manager pod to be restarted. Which is not the ideal behavior.
>
> Do you suggest that I should change the deployment strategy from using k8s
> deployment to k8s job? In case the flink program exit with non-zero code
> (e.g. exhausted number of configured restart), pod can be marked as
> complete hence not restarting the job again?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>
>> @Till Rohrmann  In native mode, when a Flink
>> application terminates with FAILED state, all the resources will be cleaned
>> up.
>>
>> However, in standalone mode, I agree with you that we need to rethink the
>> exit code of Flink. When a job exhausts the restart
>> strategy, we should terminate the pod and do not restart again. After
>> googling, it seems that we could not specify the restartPolicy
>> based on exit code[1]. So maybe we need to return a zero exit code to
>> avoid restarting by K8s.
>>
>> [1].
>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>
>> Best,
>> Yang
>>
>> Till Rohrmann  于2020年8月4日周二 下午3:48写道:
>>
>>> @Yang Wang  I believe that we should rethink the
>>> exit codes of Flink. In general you want K8s to restart a failed Flink
>>> process. Hence, an application which terminates in state FAILED should not
>>> return a non-zero exit code because it is a valid termination state.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:
>>>
 Hi Eleanore,

 I think you are using K8s resource "Job" to deploy the jobmanager.
 Please set .spec.template.spec.restartPolicy = "Never" and
 spec.backoffLimit = 0.
 Refer here[1] for more information.

 Then, when the jobmanager failed because of any reason, the K8s job
 will be marked failed. And K8s will not restart the job again.

 [1].
 https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup


 Best,
 Yang

 Eleanore Jin  于2020年8月4日周二 上午12:05写道:

> Hi Till,
>
> Thanks for the reply!
>
> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
> Specifically, I build a custom docker image, which I copied the app jar
> (not uber jar) and all its dependencies under /flink/lib.
>
> So my question is more like, in this case, if the job is marked as
> FAILED, which causes k8s to restart the pod, this seems not help at all,
> what are the suggestions for such scenario?
>
> Thanks a lot!
> Eleanore
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>
> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
> wrote:
>
>> Hi Eleanore,
>>
>> how are you deploying Flink exactly? Are you using the application
>> mode with native K8s support to deploy a cluster [1] or are you manually
>> deploying a per-job mode [2]?
>>
>> I believe the problem might be that we terminate the Flink process
>> with a non-zero exit code if the job reaches the ApplicationStatus.FAILED
>> [3].
>>
>> cc Yang Wang have you observed a similar behavior when running Flink
>> in per-job mode on K8s?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>> [3]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>
>> On Fri, Jul 31, 2020 at 6:26 PM 

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 文章 Xingbo Huang
Hi,

I found that the spark community is also working on redesigning pyspark
documentation[1] recently. Maybe we can compare the difference between our
document structure and its document structure.

[1] https://issues.apache.org/jira/browse/SPARK-31851
http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html

Best,
Xingbo

David Anderson  于2020年8月5日周三 上午3:17写道:

> I'm delighted to see energy going into improving the documentation.
>
> With the current documentation, I get a lot of questions that I believe
> reflect two fundamental problems with what we currently provide:
>
> (1) We have a lot of contextual information in our heads about how Flink
> works, and we are able to use that knowledge to make reasonable inferences
> about how things (probably) work in cases we aren't so familiar with. For
> example, I get a lot of questions of the form "If I use  will
> I still have exactly once guarantees?" The answer is always yes, but they
> continue to have doubts because we have failed to clearly communicate this
> fundamental, underlying principle.
>
> This specific example about fault tolerance applies across all of the
> Flink docs, but the general idea can also be applied to the Table/SQL and
> PyFlink docs. The guiding principles underlying these APIs should be
> written down in one easy-to-find place.
>
> (2) The other kind of question I get a lot is "Can I do  with ?"
> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
> very difficult to answer because it is frequently the case that one has to
> reason about why a given feature doesn't seem to appear in the
> documentation. It could be that I'm looking in the wrong place, or it could
> be that someone forgot to document something, or it could be that it can in
> fact be done by applying a general mechanism in a specific way that I
> haven't thought of -- as in this case, where one can use a JDBC sink from
> Python if one thinks to use DDL.
>
> So I think it would be helpful to be explicit about both what is, and what
> is not, supported in PyFlink. And to have some very clear organizing
> principles in the documentation so that users can quickly learn where to
> look for specific facts.
>
> Regards,
> David
>
>
> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun 
> wrote:
>
>> Hi Seth and David,
>>
>> I'm very happy to have your reply and suggestions. I would like to share
>> my thoughts here:
>>
>> The main motivation we want to refactor the PyFlink doc is that we want
>> to make sure that the Python users could find all they want starting from
>> the PyFlink documentation mainpage. That’s, the PyFlink documentation
>> should have a catalogue which includes all the functionalities available in
>> PyFlink. However, this doesn’t mean that we will make a copy of the content
>> of the documentation in the other places. It may be just a reference/link
>> to the other documentation if needed. For the documentation added under
>> PyFlink mainpage, the principle is that it should only include Python
>> specific content, instead of making a copy of the Java content.
>>
>> >>  I'm concerned that this proposal duplicates a lot of content that
>> will quickly get out of sync. It feels like it is documenting PyFlink
>> separately from the rest of the project.
>>
>> Regarding the concerns about maintainability, as mentioned above, The
>> goal of this FLIP is to provide an intelligible entrance of Python API, and
>> the content in it should only contain the information which is useful for
>> Python users. There are indeed many agenda items that duplicate the Java
>> documents in this FLIP, but it doesn't mean the content would be copied
>> from Java documentation. i.e, if the content of the document is the same as
>> the corresponding Java document, we will add a link to the Java document.
>> e.g. the "Built-in functions" and "SQL". We only create a page for the
>> Python-only content, and then redirect to the Java document if there is
>> something shared with Java. e.g. "Connectors" and "Catalogs". If the
>> document is Python-only and already exists, we will move it from the old
>> python document to the new python document, e.g. "Configurations". If the
>> document is Python-only and not exists before, we will create a new page
>> for it. e.g. "DataTypes".
>>
>> The main reason we create a new page for Python Data Types is that it is
>> only conceptually one-to-one correspondence with Java Data Types, but the
>> actual document content would be very different from Java DataTypes. Some
>> detailed difference are as following:
>>
>>
>>
>>   - The text in the Java Data Types document is written for JVM-based
>> language users, which is incomprehensible to users who only understand
>> python.
>>
>>   - Currently the Python Data Types does not support the "bridgedTo"
>> method, DataTypes.RAW, DataTypes.NULL and User Defined Types.
>>
>>   - The section "Planner 

?????? flink1.10.1/1.11.1 ????sql ????group ?? ???????? ?????? ????????????

2020-08-05 文章 op
  
FsStateBackend??5checkpoint??300ms
 
??1440minute??5
 checkpoint shared group 
by??key??
 5




--  --
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
Best,
Congxian


op <520075...@qq.com ??2020??8??3?? 2:18??

 nbsp; nbsp;
 
1.11.0hdfscheckpoint??checkpoint3??
 ?? day ?? id groupby
 
7watermark??
 tConfig.setIdleStateRetentionTime(Time.minutes(1440),
 Time.minutes(1440+10))




 --nbsp;nbsp;--
 ??:

  "user-zh"

  <
 384939...@qq.comgt;;
 :nbsp;2020??8??3??(??) 1:50
 ??:nbsp;"user-zh"http://apache-flink.147419.n8.nabble.com/file/t793/6.pnggt;

 18??