关于FlinkSQL的窗口和触发

2020-08-31 Thread BenChen
Hi all,
在FlinkSQL中,我知道可以通过group by window去做窗口处理,但是如果触发时间和窗口时间不一致,如何去表达呢?
比如说,统计一天的PV和UV,每隔10S更新数据,在StreamingApi里面,可以通过timeWindow(1Day) + 
trigger(10Seconds)去实现,但是到FlinkSQL上要如何实现呢?


感激不尽。


| |
BenChen
|
|
haibin...@163.com
|
签名由网易邮箱大师定制



Re: flink checkpoint导致反压严重

2020-08-31 Thread Congxian Qiu
Hi
如果我理解没错的话,这种 单 key 热点的问题,需要算 中位数(无法像 sum/count
这样分步计算的),只能通过现在你写的这种方法,先分布聚合,然后最终再计算中位数。不过或许可以找找数学方法,看有没有近似的算法
Best,
Congxian


赵一旦  于2020年9月1日周二 上午10:15写道:

> (1)url理论上足够多,也足够随机。而并行度比如是30,url理论上是万、十万、百万、千万级别,理论上不会出现数据倾斜吧。
> (2)如果的确有倾斜,那么你那个方法我看不出有啥用,我看你好像是全缓存下来?这没啥用吧。
> (3)我的思路,考虑到你是要求1分钟窗口,每个url维度的,response的中位数。所以本质需要url+time维度的全部response数据排序。
>  由于url数量可能比较少(比如和并行度类似),导致了数据倾斜。所以key不能仅用url,需要分步。
>
>
>  
> 分步方法:如果url的访问量总体极大,则response的值应该有很大重复,比如url1对应response=2ms的有1000个,对应3ms的有500个等这种量级。这样的话直接url+response作为key作为第一级统计也可以降低很大压力,同时加了response后应该就够分散了。第2级别拿到的是url级别的不同response+出现次数的数据。根据这些是可以计算中位数的,同时第二步压力也低,因为url1可能有1w流量,但response的不同值可能是100个。
>  前提背景:每个url的流量 >> 该url的response不同值(即具有相同response+url的流量不少)。
>
> JasonLee <17610775...@163.com> 于2020年8月31日周一 下午7:31写道:
>
> > hi
> >
> > 我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀.
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink json ddl解析

2020-08-31 Thread zilong xiao
like this:  ARRAY>>

Dream-底限  于2020年9月1日周二 上午11:40写道:

> hi
>
> 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
> array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常
>
>
> private static TypeInformation convertArray(String location,
> JsonNode node, JsonNode root) {
>// validate items
>if (!node.has(ITEMS)) {
>   throw new IllegalArgumentException(
>  "Arrays must specify an '" + ITEMS + "' property in node: " +
> location);
>}
>final JsonNode items = node.get(ITEMS);
>
>// list (translated to object array)
>if (items.isObject()) {
>   final TypeInformation elementType = convertType(
>  location + '/' + ITEMS,
>  items,
>  root);
>   // result type might either be ObjectArrayTypeInfo or
> BasicArrayTypeInfo for Strings
>   return Types.OBJECT_ARRAY(elementType);
>}
>// tuple (translated to row)
>else if (items.isArray()) {
>   final TypeInformation[] types = convertTypes(location + '/' +
> ITEMS, items, root);
>
>   // validate that array does not contain additional items
>   if (node.has(ADDITIONAL_ITEMS) &&
> node.get(ADDITIONAL_ITEMS).isBoolean() &&
> node.get(ADDITIONAL_ITEMS).asBoolean()) {
>  throw new IllegalArgumentException(
> "An array tuple must not allow additional items in node: "
> + location);
>   }
>
>   return Types.ROW(types);
>}
>throw new IllegalArgumentException(
>   "Invalid type for '" + ITEMS + "' property in node: " + location);
> }
>


flink json ddl解析

2020-08-31 Thread Dream-底限
hi
我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常


private static TypeInformation convertArray(String location,
JsonNode node, JsonNode root) {
   // validate items
   if (!node.has(ITEMS)) {
  throw new IllegalArgumentException(
 "Arrays must specify an '" + ITEMS + "' property in node: " +
location);
   }
   final JsonNode items = node.get(ITEMS);

   // list (translated to object array)
   if (items.isObject()) {
  final TypeInformation elementType = convertType(
 location + '/' + ITEMS,
 items,
 root);
  // result type might either be ObjectArrayTypeInfo or
BasicArrayTypeInfo for Strings
  return Types.OBJECT_ARRAY(elementType);
   }
   // tuple (translated to row)
   else if (items.isArray()) {
  final TypeInformation[] types = convertTypes(location + '/' +
ITEMS, items, root);

  // validate that array does not contain additional items
  if (node.has(ADDITIONAL_ITEMS) &&
node.get(ADDITIONAL_ITEMS).isBoolean() &&
node.get(ADDITIONAL_ITEMS).asBoolean()) {
 throw new IllegalArgumentException(
"An array tuple must not allow additional items in node: "
+ location);
  }

  return Types.ROW(types);
   }
   throw new IllegalArgumentException(
  "Invalid type for '" + ITEMS + "' property in node: " + location);
}


Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询

2020-08-31 Thread Qishang
Hi.
我们也遇到一样的场景,现在您是否有一些具体实施和思考可以交流一下吗?

USERNAME  于2020年8月13日周四 下午3:27写道:

>
>
> 任务流程:
> OGG->KAFKA->FLINK->HIVE
>
>
> KAFKA数据样例:
> 其中会有多个
> "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
> {
> "table": "SCOOT.TABLENAME",
> "op_type": "U",
> "op_ts": "2020-08-11 07:53:40.008001",
> "current_ts": "2020-08-11T15:56:41.233000",
> "pos": "980119769930",
> "before": {
> "C1": 4499000,
> "C2": null,
> "C3": null,
> "C4": null,
> "C5": null
> },
> "after": {
> "C1": 4499000,
> "C2": null,
> "C3": "",
> "C4": "",
> "C5": "通过"
> }
> }
> 问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
> 看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?
>
>
> 例如 样例数据在hive中建表
> create table TABLENAME
> (
> op_type  STRING,
> op_ts  STRING,
> current_ts   STRING,
> pos STRING,
> "C1" STRING,
> "C2" STRING,
> "C3" STRING,
> "C4" STRING,
> "C5" STRING
> )
> 理解的难点,
> 1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
> 2.同一FLINK任务会有新增的表,需自动适配
> 3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等
>
>
> 或者只能采用通过表结构
> create table TABLENAME
> (
> table   STRING,
> op_type  STRING,
> op_ts  STRING,
> current_ts   STRING,
> pos STRING,
> "before"  STRING,
> "after" STRING
> )
> 然后剩下的在HIVE中解决。
>
>
> 或者有其他更好的方案?
>
>


使用orc写动态分区表时,文件大小滚动策略不起作用

2020-08-31 Thread wei.wei
当一个task同时写多个bucket的时候,会因为FIXED_PATH覆盖之前注册的writer callback,导致checkMemory不能回调;

https://issues.apache.org/jira/browse/FLINK-18915



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink checkpoint导致反压严重

2020-08-31 Thread 赵一旦
(1)url理论上足够多,也足够随机。而并行度比如是30,url理论上是万、十万、百万、千万级别,理论上不会出现数据倾斜吧。
(2)如果的确有倾斜,那么你那个方法我看不出有啥用,我看你好像是全缓存下来?这没啥用吧。
(3)我的思路,考虑到你是要求1分钟窗口,每个url维度的,response的中位数。所以本质需要url+time维度的全部response数据排序。
 由于url数量可能比较少(比如和并行度类似),导致了数据倾斜。所以key不能仅用url,需要分步。

 
分步方法:如果url的访问量总体极大,则response的值应该有很大重复,比如url1对应response=2ms的有1000个,对应3ms的有500个等这种量级。这样的话直接url+response作为key作为第一级统计也可以降低很大压力,同时加了response后应该就够分散了。第2级别拿到的是url级别的不同response+出现次数的数据。根据这些是可以计算中位数的,同时第二步压力也低,因为url1可能有1w流量,但response的不同值可能是100个。
 前提背景:每个url的流量 >> 该url的response不同值(即具有相同response+url的流量不少)。

JasonLee <17610775...@163.com> 于2020年8月31日周一 下午7:31写道:

> hi
>
> 我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: flink1.11连接mysql问题

2020-08-31 Thread amen...@163.com
如果是mysql5.x以上的版本,url中autoReconnect参数会无效吧,

可以尝试下修改配置文件wait_timeout/interactive_out参数

best,
amenhub
 
发件人: 酷酷的浑蛋
发送时间: 2020-08-31 20:48
收件人: user-zh@flink.apache.org
主题: 回复: flink1.11连接mysql问题
 
 
下面是我连接mysql的配置,用的flink-1.11.1,还是报那个错误
CREATE TABLE xx(
  `xx` varchar,
  `xx` varchar
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx/xx?autoReconnect=true=false',
'table-name' = ‘xx',
'driver' = 'com.mysql.jdbc.Driver',
'username' = ‘xx',
'password' = ‘xx',
'scan.partition.column' = 'id',
'scan.partition.num' = '50',
'scan.partition.lower-bound' = '500',
'scan.partition.upper-bound' = '1000',
'scan.fetch-size' = '100',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '10s'
);
在2020年08月31日 17:33,Leonard Xu 写道:
 
 
在 2020年8月28日,15:02,酷酷的浑蛋  写道:
 
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem.
 
 
 
 
flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用
 
 
 
Hi
 
超时断开问题在1.11应该已经修复[1],你是怎么使用的?可以提供更多的信息吗
 
Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16681 

 


Re: flink-1.11.1 Table API /SQL 无法写入hive orc表

2020-08-31 Thread amen...@163.com
hi Jian Wang,

根据我的理解,在flink 
lib目录下导入官方的flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar是指hive[2.0.0-2.2.0]版本都可以使用此依赖。

关于你的问题我曾经遇到过,hive也是2.1.1,我的demo参考[1]可以运行成功,而不需要额外导入flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar,
只需要把[1]中的依赖改成provided并把其jar包导入flink/lib即可。

希望能帮到你,

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#program-maven

best,
amenhub

 
发件人: Jian Wang
发送时间: 2020-08-31 21:55
收件人: user-zh
主题: flink-1.11.1 Table API /SQL 无法写入hive orc表
Hi all,
 
我基于flink 1.11 + hadoop 3.0.0 + hive 2.1.1 , flink on yarn模式,在streaming 
job上的Table API上执行flink sql实时写入hive表。
 
根据文档 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/ 
 
去配置,现在遇到flink和hive的依赖问题。
 
 
在项目内的pom上,所有相关hive相关依赖都是provided,在flink 
lib下放进flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar, 
提交任务的时候,会出现hive-exec.jar的冲突导致java.lang.NoClassDefFoundError: Could not 
initialize class org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 
(因为我的hive是2.1.1版本,flink没有提供flink-sql-connector-hive的2.1.1版本,所以我用的和它最近的2.2.0)。  
 
我又尝试了根据我的hive版本2.1.1, 
去根据flink源码,把hive-exec改成2.1.1去手动打包flink-sql-connector-hive-2.1.1_2.11-1.11.1.jar放到flink
 lib下, 
但是发现flink-sql-connector-hive里面的orc-core-1.4.3和hive-exec-2.1.1冲突,java.lang.NoSuchMethodError:
 
org.apache.orc.TypeDescription.fromString(Ljava/lang/String;)Lorg/apache/orc/TypeDescription;
 
我看到在flink提供的flink-sql-connector-hive中确实没有2.1.1的版本,是这个版本无法和flink兼容吗?或 有flink 
1.11和hive 2.1.1集成成功的范例参考吗?
 
谢谢
 
 
王剑 


Re: 请教 hive streaming 报错

2020-08-31 Thread liangck
遇到同样的问题,请问解决了吗。我是flink-connector-hive和hive-exec打进jar包里提交的。但是
flink-connector-hive里有个org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder类,引用了streaming-java包里的org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl。估计是因为类加载器不同导致无法引用报错。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink-1.11.1 Table API /SQL 无法写入hive orc表

2020-08-31 Thread Jian Wang
Hi all,

我基于flink 1.11 + hadoop 3.0.0 + hive 2.1.1 , flink on yarn模式,在streaming 
job上的Table API上执行flink sql实时写入hive表。

根据文档 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/ 
 
去配置,现在遇到flink和hive的依赖问题。


在项目内的pom上,所有相关hive相关依赖都是provided,在flink 
lib下放进flink-sql-connector-hive-2.2.0_2.11-1.11.1.jar, 
提交任务的时候,会出现hive-exec.jar的冲突导致java.lang.NoClassDefFoundError: Could not 
initialize class org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 
(因为我的hive是2.1.1版本,flink没有提供flink-sql-connector-hive的2.1.1版本,所以我用的和它最近的2.2.0)。  

我又尝试了根据我的hive版本2.1.1, 
去根据flink源码,把hive-exec改成2.1.1去手动打包flink-sql-connector-hive-2.1.1_2.11-1.11.1.jar放到flink
 lib下, 
但是发现flink-sql-connector-hive里面的orc-core-1.4.3和hive-exec-2.1.1冲突,java.lang.NoSuchMethodError:
 
org.apache.orc.TypeDescription.fromString(Ljava/lang/String;)Lorg/apache/orc/TypeDescription;

我看到在flink提供的flink-sql-connector-hive中确实没有2.1.1的版本,是这个版本无法和flink兼容吗?或 有flink 
1.11和hive 2.1.1集成成功的范例参考吗?

谢谢


王剑 

Re: SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-31 Thread Averell
Hello Robert, Arvid,

As I am running on EMR, and currently AWS only supports version 1.10.
I tried both solutions that you suggested ((i) copying a SAXParser
implementation to the plugins folder and (ii) using the S3FS Plugin from
1.10.1), and both worked - I could have successful checkpoints.

However, intermittenly my checkpoints still fail (about 10%). And whenever
it fails, there are non-completed files left in S3 (attached screenshot
below).
I'm not sure whether those uncompleted files are expected, or is that a bug?

Thanks and regards,
Averell

 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Exception on s3 committer

2020-08-31 Thread Ivan Yang
Hi Yun,

Thank you so much for you suggestion.

(1) The job couldn’t restore from the last checkpoint. The exception is in my 
original email.
(2) No, I didn’t change any multipart upload settings. 
(3) The file is gone. I have another batch process that reads Flink output s3 
bucket and pushes objects to another bucket. Upon success read and write, The 
batch job will delete the file. What’s puzzling me is if Flink hasn’t 
successfully commit the multipart file, it should not be visible to the batch 
job. It looks the situation is while Flink tried to commit the multipart file, 
it crashed and restarted. The file is committed on s3 successfully, but not 
acknowledge recorded on Flink side. In between, the batch job consumed the 
file. I don’t know if that’s possible.

Thanks
Ivan

> On Aug 30, 2020, at 11:10 PM, Yun Gao  wrote:
> 
> 
> Hi Ivan,
> 
>I think there might be some points to check:
> 
>1. Is the job restored from the latest successful checkpoint after restart 
> ? 
>2. Have you ever changed the timeout settings for uncompleted multipart 
> upload ?
>3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 
> exist or not ?
> 
> Best,
>  Yun
> 
> --Original Mail --
> Sender:Ivan Yang 
> Send Date:Sat Aug 29 12:43:28 2020
> Recipients:user 
> Subject:Exception on s3 committer
> Hi all,
> 
> We got this exception after a job restart. Does anyone know what may lead to 
> this situation? and how to get pass this Checkpoint issue? Prior to this, the 
> job failed due to “Checkpoint expired before completing.” We are s3 heavy, 
> writing out 10K files to s3 every 10 minutes using 
> StreamingFileSink/BulkFormat to various s3 prefixes. Thanks in advance. -Ivan
> 
> 2020-08-28 15:17:58
> java.io .IOException: Recovering commit failed for object 
> cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object 
> does not exist and MultiPart Upload 
> 3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw-
>  is not valid.
> at 
> org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:122)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.(StreamingFileSinkHelper.java:74)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io 

Re: Debezium Flink EMR

2020-08-31 Thread Marta Paes Moreira
Hey, Rex!

This is likely due to the tombstone records that Debezium produces for
DELETE operations (i.e. a record with the same key as the deleted row and a
value of null). These are markers for Kafka to indicate that log compaction
can remove all records for the given key, and the initial implementation of
the debezium-format can't handle them. This issue is already documented
(and solved) in [1].

In the meantime, can you try adding "tombstones.on.delete":false" to the
configuration of your Debezium MySQL connector? Marta
[1] https://issues.apache.org/jira/browse/FLINK-18705

On Tue, Sep 1, 2020 at 1:36 AM Rex Fenley  wrote:

> Hi, getting so close but ran into another issue:
>
> Flink successfully reads changes from Debezium/Kafka and writes them to
> Elasticsearch, but there's a problem with deletions. When I DELETE a row
> from MySQL the deletion makes it successfully all the way to Elasticsearch
> which is great, but then the taskmanager suddenly dies with a null pointer
> exception. Inserts and Updates do not have the same problem. This seems
> very odd. Any help would be much appreciated. Thanks!
>
> flink-taskmanager_1| 2020-08-31 23:30:33,684 WARN
>  org.apache.flink.runtime.taskmanager.Task[] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
> flink-taskmanager_1| java.lang.NullPointerException: null
> flink-taskmanager_1| at java.lang.String.(String.java:566)
> ~[?:1.8.0_265]
> flink-taskmanager_1| at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-taskmanager_1| at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1| at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1| at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1| at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1| at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1| at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1| at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1| 2020-08-31 23:30:33,720 INFO
>  org.apache.flink.runtime.taskmanager.Task[] - Freeing
> task resources for Source: TableSourceScan(table=[[default_catalog,
> default_database, topic_addresses]], fields=[id, customer_id, street, city,
> state, zip, type]) -> Sink:
> Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
> customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8).
> flink-taskmanager_1| 2020-08-31 23:30:33,728 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
> Un-registering task and sending final execution state FAILED to JobManager
> for task Source: TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> 2b79917cb528f37fad7f636740d2fdd8.
> flink-jobmanager_1 | 2020-08-31 23:30:33,770 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
> flink-jobmanager_1 | java.lang.NullPointerException: null
> flink-jobmanager_1 | at java.lang.String.(String.java:566)
> ~[?:1.8.0_265]
> 

Re: Debezium Flink EMR

2020-08-31 Thread Rex Fenley
Hi, getting so close but ran into another issue:

Flink successfully reads changes from Debezium/Kafka and writes them to
Elasticsearch, but there's a problem with deletions. When I DELETE a row
from MySQL the deletion makes it successfully all the way to Elasticsearch
which is great, but then the taskmanager suddenly dies with a null pointer
exception. Inserts and Updates do not have the same problem. This seems
very odd. Any help would be much appreciated. Thanks!

flink-taskmanager_1| 2020-08-31 23:30:33,684 WARN
 org.apache.flink.runtime.taskmanager.Task[] - Source:
TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
flink-taskmanager_1| java.lang.NullPointerException: null
flink-taskmanager_1| at java.lang.String.(String.java:566)
~[?:1.8.0_265]
flink-taskmanager_1| at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-taskmanager_1| at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1| at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1| at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1| at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1| at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1| at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1| at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1| 2020-08-31 23:30:33,720 INFO
 org.apache.flink.runtime.taskmanager.Task[] - Freeing
task resources for Source: TableSourceScan(table=[[default_catalog,
default_database, topic_addresses]], fields=[id, customer_id, street, city,
state, zip, type]) -> Sink:
Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8).
flink-taskmanager_1| 2020-08-31 23:30:33,728 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Un-registering task and sending final execution state FAILED to JobManager
for task Source: TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
2b79917cb528f37fad7f636740d2fdd8.
flink-jobmanager_1 | 2020-08-31 23:30:33,770 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
flink-jobmanager_1 | java.lang.NullPointerException: null
flink-jobmanager_1 | at java.lang.String.(String.java:566)
~[?:1.8.0_265]
flink-jobmanager_1 | at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[?:?]
flink-jobmanager_1 | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[?:?]
flink-jobmanager_1 | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[?:?]
flink-jobmanager_1 | at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)

Re: Flink SQL Streaming Join Creates Duplicates

2020-08-31 Thread Austin Cawley-Edwards
Hey Arvid,

Yes, I was able to self-answer this one. Was just confused on the
non-deterministic behavior of the FULL OUTER join statement. Thinking
through it and took a harder read through the Dynamic Tables doc section[1]
where "Result Updating" is hinted at, and the behavior makes total sense in
a streaming env.

Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/dynamic_tables.html

On Mon, Aug 31, 2020 at 5:16 AM Arvid Heise  wrote:

> Hi Austin,
>
> Do I assume correctly, that you self-answered your question? If not, could
> you please update your current progress?
>
> Best,
>
> Arvid
>
> On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Ah, I think the "Result Updating" is what got me -- INNER joins do the
>> job!
>>
>> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> oops, the example query should actually be:
>>>
>>> SELECT table_1.a, table_1.b, table_2.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> and duplicate results should actually be:
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = "data b 1", c = null)
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = "data b 2", c = null)
>>>
>>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey all,

 I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
 reading from a few CSV files and joins some records across them into a
 couple of data streams (yes, this could be a batch job won't get into why
 we chose streams unless it's relevant). These joins are producing some
 duplicate records, one with the joined field present and one with the
 joined field as `null`, though this happens only ~25% of the time. Reading
 the docs on joins[1], I thought this could be caused by too strict Idle
 State Retention[2], so I increased that to min, max (15min, 24h) but that
 doesn't seem to have an effect, and the problem still occurs when testing
 on a subset of data that finishes processing in under a minute.

 The query roughly looks like:

 table_1 has fields a, b
 table_2 has fields b, c

 SELECT table_1.a, table_1.b, table_1.c
 FROM table_1
 LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

 Correct result:
 Record(a = "data a 1", b = "data b 1", c = "data c 1")
 Record(a = "data a 2", b = "data b 2", c = "data c 2")

 Results seem to be anywhere between all possible dups and the correct
 result.

 Record(a = "data a 1", b = "data b 1", c = "data c 1")
 Record(a = "data a 1", b = null, c = "data c 1")
 Record(a = "data a 2", b = "data b 2", c = "data c 2")
 Record(a = "data a 2", b = null, c = "data c 2")

 The CSV files are registered as Flink Tables with the following:

 tableEnv.connect(
 new FileSystem()
 .path(path)
 )
 .withFormat(
 new Csv()
 .quoteCharacter('"')
 .ignoreParseErrors()
 )
 .withSchema(schema)
 .inAppendMode()
 .createTemporaryTable(tableName);


 I'm creating my table environment like so:

 EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
 .useBlinkPlanner()
 .build();

 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
 tableEnvSettings);

 TableConfig tConfig = tEnv.getConfig();
 tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));


 Is there something I'm misconfiguring or have misunderstood the docs?

 Thanks,
 Austin

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
 [2]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time

>>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Debezium Flink EMR

2020-08-31 Thread Rex Fenley
Ah, my bad, thanks for pointing that out Arvid!

On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise  wrote:

> Hi Rex,
>
> you still forgot
>
> 'debezium-json.schema-include' = true
>
> Please reread my mail.
>
>
> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley  wrote:
>
>> Thanks for the input, though I've certainly included a schema as is
>> reflected earlier in this thread. Including here again
>> ...
>> tableEnv.executeSql("""
>> CREATE TABLE topic_addresses (
>> -- schema is totally the same to the MySQL "addresses" table
>> id INT,
>> customer_id INT,
>> street STRING,
>> city STRING,
>> state STRING,
>> zip STRING,
>> type STRING,
>> PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'dbserver1.inventory.addresses',
>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>> 'properties.group.id' = 'testGroup',
>> 'format' = 'debezium-json' -- using debezium-json as the format
>> )
>> """)
>>
>> val table = tableEnv.from("topic_addresses").select($"*")
>> ...
>>
>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise  wrote:
>>
>>> Hi Rex,
>>>
>>> the connector expects a value without a schema, but the message contains
>>> a schema. You can tell Flink that the schema is included as written in the
>>> documentation [1].
>>>
>>> CREATE TABLE topic_products (
>>>   -- schema is totally the same to the MySQL "products" table
>>>   id BIGINT,
>>>   name STRING,
>>>   description STRING,
>>>   weight DECIMAL(10, 2)) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'products_binlog',
>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>  'properties.group.id' = 'testGroup',
>>>  'format' = 'debezium-json',
>>>  'debezium-json.schema-include' = true)
>>>
>>> @Jark Wu  , it would be probably good to make the
>>> connector more robust and catch these types of misconfigurations.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>
>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley  wrote:
>>>
 Awesome, so that took me a step further. When running i'm receiving an
 error however. FYI, my docker-compose file is based on the Debezium mysql
 tutorial which can be found here
 https://debezium.io/documentation/reference/1.2/tutorial.html

 Part of the stack trace:

 flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt
 Debezium JSON message
 '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
 

Re: Debezium Flink EMR

2020-08-31 Thread Arvid Heise
Hi Rex,

you still forgot

'debezium-json.schema-include' = true

Please reread my mail.


On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley  wrote:

> Thanks for the input, though I've certainly included a schema as is
> reflected earlier in this thread. Including here again
> ...
> tableEnv.executeSql("""
> CREATE TABLE topic_addresses (
> -- schema is totally the same to the MySQL "addresses" table
> id INT,
> customer_id INT,
> street STRING,
> city STRING,
> state STRING,
> zip STRING,
> type STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'dbserver1.inventory.addresses',
> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'debezium-json' -- using debezium-json as the format
> )
> """)
>
> val table = tableEnv.from("topic_addresses").select($"*")
> ...
>
> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> the connector expects a value without a schema, but the message contains
>> a schema. You can tell Flink that the schema is included as written in the
>> documentation [1].
>>
>> CREATE TABLE topic_products (
>>   -- schema is totally the same to the MySQL "products" table
>>   id BIGINT,
>>   name STRING,
>>   description STRING,
>>   weight DECIMAL(10, 2)) WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'products_binlog',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'properties.group.id' = 'testGroup',
>>  'format' = 'debezium-json',
>>  'debezium-json.schema-include' = true)
>>
>> @Jark Wu  , it would be probably good to make the
>> connector more robust and catch these types of misconfigurations.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>
>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley  wrote:
>>
>>> Awesome, so that took me a step further. When running i'm receiving an
>>> error however. FYI, my docker-compose file is based on the Debezium mysql
>>> tutorial which can be found here
>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>
>>> Part of the stack trace:
>>>
>>> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt
>>> Debezium JSON message
>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>> cool street","city":"Big
>>> 

Re: Debezium Flink EMR

2020-08-31 Thread Rex Fenley
Thanks for the input, though I've certainly included a schema as is
reflected earlier in this thread. Including here again
...
tableEnv.executeSql("""
CREATE TABLE topic_addresses (
-- schema is totally the same to the MySQL "addresses" table
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.addresses',
'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json' -- using debezium-json as the format
)
""")

val table = tableEnv.from("topic_addresses").select($"*")
...

On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise  wrote:

> Hi Rex,
>
> the connector expects a value without a schema, but the message contains a
> schema. You can tell Flink that the schema is included as written in the
> documentation [1].
>
> CREATE TABLE topic_products (
>   -- schema is totally the same to the MySQL "products" table
>   id BIGINT,
>   name STRING,
>   description STRING,
>   weight DECIMAL(10, 2)) WITH (
>  'connector' = 'kafka',
>  'topic' = 'products_binlog',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'debezium-json',
>  'debezium-json.schema-include' = true)
>
> @Jark Wu  , it would be probably good to make the
> connector more robust and catch these types of misconfigurations.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>
> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley  wrote:
>
>> Awesome, so that took me a step further. When running i'm receiving an
>> error however. FYI, my docker-compose file is based on the Debezium mysql
>> tutorial which can be found here
>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>
>> Part of the stack trace:
>>
>> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt Debezium
>> JSON message
>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>> cool street","city":"Big
>> City","state":"California","zip":"9","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.10","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>> flink-jobmanager_1 | 

Editing Rowtime for SQL Table

2020-08-31 Thread Satyam Shekhar
Hello,

I use Flink for continuous evaluation of SQL queries on streaming data. One
of the use cases requires us to run recursive SQL queries. I am unable to
find a way to edit rowtime time attribute of the intermediate result table.

For example, let's assume that there is a table T0 with schema -
root
 |-- str1: STRING
 |-- int1: BIGINT
 |-- utime: TIMESTAMP(3)
 |-- itime: TIMESTAMP(3) *ROWTIME*

Now, let's create a view V0 -
var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");

I wish to change the rowtime of V0 from itime to utime. I tried doing -

V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

but ran into the following exception -

org.apache.flink.table.api.ValidationException: Window properties can only
be used on windowed tables.
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459)
~[flink-table-api-java-1.11.1.jar:1.11.1]

Any guidance on how to address this?

Regards,
Satyam


Re: flink-sql-gateway还会更新吗

2020-08-31 Thread zongsforce
请教一下,我在用flink-sql-gateway的1.11.1版本的SET语法设置hive dialect时(SET
table.sql-dialect=hive),flink-sql-gateway出现了报错,同样的语法在sql-client是支持的,那如果我想在session级切换hive
dialect我应该怎么做呢?  多谢
我的环境如下:
flink-sql-gateway:1.11.1
flink:1.11.1
hive:3.1.2
hadoop:3.0.0 
日志如下:
2020-08-31 20:39:56,051 INFO 
com.ververica.flink.table.gateway.rest.session.Session   [] - Session:
dc5b8e66111bd59a4c7abe0acc625275, runstatement: set table.sql-dialect=hive
2020-08-31 20:39:56,051 ERROR
com.ververica.flink.table.gateway.rest.session.Session   [] - Session:
dc5b8e66111bd59a4c7abe0acc625275, Failed to parse statement: set
table.sql-dialect=hive
2020-08-31 20:39:56,052 ERROR
com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler [] -
Exception occurred in REST handler.
org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to parse
statement.
at
com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:91)
~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
...
Caused by: com.ververica.flink.table.gateway.utils.SqlGatewayException:
Failed to parse statement.
at
com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:102)
~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
at
com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
... 43 more
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
"table" at line 1, column 5.
Was expecting one of:
 ...
 ...
 ...
 ...
 ...
...
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered
"table" at line 1, column 5.
Was expecting one of:
 ...
 ...
 ...
 ...
 ...

at
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
~[flink-table_2.11-1.11.1.jar:1.11.1]
...



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-sql-gateway还会更新吗

2020-08-31 Thread Tio Planto
请教一下,我在用flink-sql-gateway的1.11.1版本的SET语法设置hive dialect时(SET
table.sql-dialect=hive),flink-sql-gateway出现了报错,同样的语法在sql-client是支持的,那如果我想在session级切换hive
dialect我应该怎么做呢?  多谢
我的环境如下:

flink-sql-gateway:1.11.1

flink:1.11.1

hive:3.1.2

hadoop:3.0.0

日志如下:

2020-08-31 20:39:56,051 INFO
 com.ververica.flink.table.gateway.rest.session.Session   [] - Session:
dc5b8e66111bd59a4c7abe0acc625275, runstatement: set table.sql-dialect=hive
2020-08-31 20:39:56,051 ERROR
com.ververica.flink.table.gateway.rest.session.Session   [] - Session:
dc5b8e66111bd59a4c7abe0acc625275, Failed to parse statement: set
table.sql-dialect=hive
2020-08-31 20:39:56,052 ERROR
com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler [] -
Exception occurred in REST handler.
org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to parse
statement.
at
com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:91)
~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
at
com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
at
com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
at
com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at

flink-1.11连接hive或filesystem问题

2020-08-31 Thread 酷酷的浑蛋
1. Create hive表(...)with(...)
我发现写入hive只能根据checkpoint去提交分区?可以按照文件大小或者间隔时间来生成吗?


2. Create table (connector=filesystem,format=json) with(…)
这种方式format只能等于json? 我怎么按照分隔符写入hdfs?

Re: FileSystemHaServices and BlobStore

2020-08-31 Thread Khachatryan Roman
+ dev

Blob store is used for jars, serialized job, and task information and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun  wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> --
> *From:* Alexey Trenikhun 
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> --
> *From:* Khachatryan Roman 
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun  wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> BlobStore survive JobManager crash. I see that ZookeeperHaServices use 
> FileSystemBlobStore,
> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>


回复: flink1.11连接mysql问题

2020-08-31 Thread 酷酷的浑蛋


下面是我连接mysql的配置,用的flink-1.11.1,还是报那个错误
CREATE TABLE xx(
  `xx` varchar,
  `xx` varchar
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx/xx?autoReconnect=true=false',
'table-name' = ‘xx',
'driver' = 'com.mysql.jdbc.Driver',
'username' = ‘xx',
'password' = ‘xx',
'scan.partition.column' = 'id',
'scan.partition.num' = '50',
'scan.partition.lower-bound' = '500',
'scan.partition.upper-bound' = '1000',
'scan.fetch-size' = '100',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '10s'
);
在2020年08月31日 17:33,Leonard Xu 写道:


在 2020年8月28日,15:02,酷酷的浑蛋  写道:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem.




flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用



Hi

超时断开问题在1.11应该已经修复[1],你是怎么使用的?可以提供更多的信息吗

Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16681 




Re: flink checkpoint导致反压严重

2020-08-31 Thread JasonLee
hi

我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

[ANNOUNCE] Weekly Community Update 2020/35

2020-08-31 Thread Konstantin Knauf
Dear community,

happy to share a brief community update for the past week with configurable
memory sharing between Flink and its Python "side car", stateful Python
UDFs, an introduction of our GSoD participants and a little bit more.

Flink Development
==

* [datastream api] Dawid has started a vote to remove DataStream#fold and
DataStream#split (both already deprecated) in Flink 1.12. [1]

* [runtime] Xintong has started a discussion thread for "Intra-Slot
Management Memory Sharing", which lets users configure the fraction of
managed memory that should be used by Flink internally (RocksDB or Batch
algorithms) on the one side and the Python process on the other side. [2]

* [python] Wei Zhon has started discussion for FLIP-139, which aims to add
support for *stateful *Python UDFs for the Table API/SQL. So far, only
stateless functions are supported. [3]

* [documentation] Kartik Khare and Mohammad Haseeb Asif will work with the
Apache Flink Community to improve the documentation of Flink SQL as part of
their participation in Google Season of Docs 2020. [4]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Remove-deprecated-DataStream-fold-and-DataStream-split-in-1-12-tp44229.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-tp44146.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-139-General-Python-User-Defined-Aggregate-Function-on-Table-API-tp44139.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Introducing-the-GSoD-2020-Participants-tp44144.html

flink-packages.org
==

* fabricalab has published a DynamoDB streaming source on flink-packages.org.
[5]

[5] https://flink-packages.org/packages/streaming-flink-dynamodb-connector

Notable Bugs
==

* [FLINK-18934] [1.11.1] Flink's mechanism to deal with idle
sources/partitions introduced in Flink 1.11 [6] does not currently work
with co-functions, union or joins. [7]

[6]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[7] https://issues.apache.org/jira/browse/FLINK-18934

Events, Blog Posts, Misc
===

* Dian Fu is now part of the Apache Flink PMC. Congratulations! [8]

[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-PMC-member-Dian-Fu-tp44170p44240.html

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Packaging multiple Flink jobs from a single IntelliJ project

2020-08-31 Thread Manas Kale
Guess I figured out a solution for the first question as well - I am
packaging multiple main() classes in the same JAR and specifying entrypoint
classes when submitting the JAR. Most of my issues stemmed from an
improperly configured POM file and a mismatch in Flink runtime versions.
I'll assume this is the recommended way to go about doing this, thanks for
reading and have a great day!

On Mon, Aug 31, 2020 at 12:03 PM Manas Kale  wrote:

> Hi,
> I solved my second issue - I was not following Maven's convention for
> placing source code (I had not placed my source in src/main/java).
> However, I still would like some help with my first question - what is the
> recommended way to set a project with multiple main() classes? At the end,
> I would like to be able to run each main() class as a separate job. Should
> I create a single JAR and specify different entrypoint classes each time or
> should I create separate JARs for each main() class?
>
> On Mon, Aug 31, 2020 at 11:13 AM Manas Kale  wrote:
>
>> Hi,
>> I have an IntelliJ project that has multiple classes with main()
>> functions. I want to package this project as a JAR that I can submit to the
>> Flink cluster and specify the entry class when I start the job. Here are my
>> questions:
>>
>>- I am not really familiar with Maven and would appreciate some
>>pointers/examples. From what I understand, I will need to use some sort of
>>transformer in the Maven shade plugin to merge all of the classes. *If
>>this is correct, can I see a small example? *
>>- Also, I can't get a single main class working:
>>
>>
>> http://maven.apache.org/POM/4.0.0; 
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>4.0.0
>>
>>flink_summarization
>>flink_summarization
>>0.1
>>jar
>>
>>Flink Quickstart Job
>>http://www.myorganization.org
>>
>>
>>   UTF-8
>>   1.10.1
>>   1.8
>>   2.11
>>   ${java.version}
>>   ${java.version}
>>
>>
>>
>>   
>>  apache.snapshots
>>  Apache Development Snapshot Repository
>>  
>> https://repository.apache.org/content/repositories/snapshots/
>>  
>> false
>>  
>>  
>> true
>>  
>>   
>>
>>
>>
>>   
>>   
>>   
>>   
>>  org.apache.flink
>>  flink-java
>>  ${flink.version}
>>  provided
>>   
>>   
>>  org.apache.flink
>>  
>> flink-streaming-java_${scala.binary.version}
>>  ${flink.version}
>>  provided
>>   
>>
>>   
>>  org.apache.flink
>>  flink-connector-kafka_2.11
>>  ${flink.version}
>>   
>>
>>   
>>  org.apache.flink
>>  flink-state-processor-api_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>>   
>>  org.apache.flink
>>  flink-connector-jdbc_2.11
>>  1.11.0
>>   
>>
>>   
>>   
>>   
>>  org.slf4j
>>  slf4j-log4j12
>>  1.7.7
>>  runtime
>>   
>>   
>>  log4j
>>  log4j
>>  1.2.17
>>  runtime
>>   
>>
>>   
>>   
>>  org.apache.flink
>>  flink-test-utils_${scala.binary.version}
>>  ${flink.version}
>>  test
>>   
>>   
>>  org.apache.flink
>>  flink-runtime_2.11
>>  ${flink.version}
>>  test
>>  tests
>>   
>>   
>>  org.apache.flink
>>  flink-streaming-java_2.11
>>  ${flink.version}
>>  test
>>  tests
>>   
>>   
>>  org.assertj
>>  assertj-core
>>  
>>  3.16.1
>>  test
>>   
>>
>>
>>
>>
>>
>>   
>>
>>  
>>  
>> org.apache.maven.plugins
>> maven-compiler-plugin
>> 3.1
>> 
>>${java.version}
>>${java.version}
>> 
>>  
>>
>>  
>>  
>>  
>> org.apache.maven.plugins
>> maven-shade-plugin
>> 3.0.0
>> 
>> 
>>false
>> 
>> 
>>
>>
>>   package
>>   
>>  shade
>>   
>>   
>>  
>> 
>>org.apache.flink:force-shading
>>com.google.code.findbugs:jsr305
>>org.slf4j:*
>>log4j:*
>> 
>>  
>>  
>> 
>>
>>*:*
>>
>>  

Re: runtime memory management

2020-08-31 Thread Xintong Song
Well, that's a long story. In general, there are 2 steps.

   1. *Which operators are deployed in the same slot?* Operators are first
   *chained*[1] together, then a *slot sharing strategy*[2] is applied by
   default.
   2. *Which task managers are slots allocated from?*
  1. For active deployments (Kubernetes, Yarn, Mesos), task
  managers are launched on demand. That means ideally you should
not have too
  many empty slots.
  2. For the standalone deployment, by default slots are allocated
  randomly from all registered task managers. You can configure[3] the
  cluster to allocate slots evenly across task managers.


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html#tasks-and-operator-chains
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/internals/job_scheduling.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#cluster-evenly-spread-out-slots

On Mon, Aug 31, 2020 at 4:31 PM lec ssmi  wrote:

> Thanks.
> When the program starts, how is each operator allocated in taskmanager?
> For example, if I have 2 taskmanagers and 10 operators, 9 operators  are
> allocated to tm-A and the remaining one is placed in tm-B, the utilization
> of resources will be very low.
>
> Xintong Song  于2020年8月31日周一 下午2:45写道:
>
>> Hi,
>>
>> For a complex streaming job, is there any way to tilt the memory towards
>>> stateful operators?
>>
>> If streaming jobs are interested, the quick answer is no. Memory is
>> fetched on demand for all operators.
>>
>> Currently, only managed memory for batch jobs are pre-planned for each
>> operator.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Aug 31, 2020 at 1:33 PM lec ssmi  wrote:
>>
>>> HI:
>>>   Generally speaking, when we submitting the flink program, the number
>>> of taskmanager and the memory of each tn will be specified. And the
>>> smallest real execution unit of flink should be operator.
>>>Since the calculation logic corresponding to each operator is
>>> different, some need to save the state, and some don't.  Therefore, the
>>> memory size required by each operator should be different. How does the
>>> flink program allocate taskmanager memory to the operator by default?
>>>   In our production practice, with the increase of traffic, some
>>> operators (mainly stateful such as join and groupby) often have
>>> insufficient memory, resulting in slower calculations. The usual approach
>>> is to increase the entire taskmanager memory. But will this part of the
>>> increased memory be allocated to the map-like operator, or that the memory
>>> itself is fetched on demand  in the same taskmanager  whoever needs the
>>> memory will fetch it until the memory is used up,  in other words, there is
>>> no preset memory allocation ratio. For a complex streaming job, is there
>>> any way to tilt the memory towards stateful operators?
>>>
>>>  Thanks.
>>>
>>>
>>>
>>>


Re: Security vulnerabilities of dependencies in Flink 1.11.1

2020-08-31 Thread Arvid Heise
Hi Shravan,

we periodically bump version numbers, especially for major releases and
basic dependencies such as netty.

However, running a simple scan over dependencies is not that useful without
also checking whether the reported issues are actually triggered by code.
For example, we are not using jackson to process YAML, so that this
vulnerability is not triggered at all. If you are not ingesting Json
through table API, then the outdated jackson-databind is actually not a
security issue as well.

Nevertheless, the respective teams will take a closer look at the report
though. If we see that the vulnerabilities are actively used, then we will
bump soonish.

How do these potential vulnerabilities affect your operations? I'd assume
that most users run isolated Flink clusters if not isolated applications.
Then, the netty vulnerability could never be exploited because netty ports
should not be exposed. On the other hand, if your Flink cluster is fully
exposed, then you may have bigger problems then the dependencies.

Best,

Arvid

On Mon, Aug 31, 2020 at 9:13 AM shravan 
wrote:

> issues.docx
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2439/issues.docx>
>
>
> Hello,
>
> We are using Apache Flink 1.11.1 version and our security scans report the
> following issues.
> Please let us know your comments on these security vulnerabilities and fix
> plans for them.
>
> PFA a word document with details in regard to CVE numbers, details, and
> it's
> severity.
>
> Issues in a nutshell,
> 1. Flink-shaded-netty, has netty 4.1.39 which is vulnerable
> 2. Flink-shaded-jackson, has snakeyaml 1.24 which is vulnerable
> 3. Flink-table, has vulnerable version of Jackson-databind in table APIs
>
> Looking forward on a response.
>
> Thanks,
> Shravan
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Idle stream does not advance watermark in connected stream

2020-08-31 Thread Dawid Wysakowicz
Hey Arvid,

The problem is that the StreamStatus.IDLE is set on the Task level. It
is not propagated to the operator. Combining of the Watermark for a
TwoInputStreamOperator happens in the AbstractStreamOperator:

    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

    public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

There we do not know that e.g. the whole input 1 is idle. Therefore if
we do not receive any Watermarks from it (it became IDLE) we do not
progress the Watermark starting from any two input operator. We are
missing similar handling of the IDLE status from the task level which
works well for one input operators and multiple parallel upstream instances.

Best,

Dawid

On 31/08/2020 11:05, Arvid Heise wrote:
> Hi Aljoscha,
>
> I don't quite follow your analysis. If both sources are configured
> with idleness, they should send a periodic watermark on timeout.
> So the code that you posted would receive watermarks on the idle
> source and thus advance watermarks periodically.
>
> If an idle source does not emit a watermark at all, then I'm curious
> why it's not mapped to StreamStatus.IDLE [1], which would trigger the
> desired behavior.
>
> [1]
> https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd791669898b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79
>
> On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek  > wrote:
>
> Yes, I'm afraid this analysis is correct. The StreamOperator,
> AbstractStreamOperator to be specific, computes the combined
> watermarks
> from both inputs here:
> 
> https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573.
>
> The operator layer is not aware of idleness so it will never
> notice. The
> idleness only works on the level of inputs but is never forwarded
> to an
> operator itself.
>
> To fix this we would have to also make operators aware of idleness
> such
> that they can take this into account when computing the combined
> output
> watermark.
>
> Best,
> Aljoscha
>
> On 26.08.20 10:02, Dawid Wysakowicz wrote:
> > Hi Kien,
> >
> > I am afraid this is a valid bug. I am not 100% sure but the way I
> > understand the code the idleness mechanism applies to input
> channels,
> > which means e.g. when multiple parallell instances shuffle its
> results
> > to downstream operators.
> >
> > In case of a two input operator, combining the watermark of two
> > different upstream operators happens inside of the operator itself.
> > There we do not have the idleness status. We do not have a
> status that a
> > whole upstream operator became idle. That's definitely a
> bug/limitation.
> >
> > I'm also cc'ing Aljoscha who could maybe confirm my analysis.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/08/2020 16:00, Truong Duc Kien wrote:
> >> Hi all,
> >> We are testing the new Idleness detection feature in Flink 1.11,
> >> however, it does not work as we expected:
> >> When we connect two data streams, of which one is idle, the output
> >> watermark CoProcessOperator does not increase, hence the program
> >> cannot progress.
> >>
> >> I've made a small project to illustrate the problem. The watermark
> >> received by the sink does not increase at all until the idle
> source is
> >> stopped.
> >>
> >> https://github.com/kien-truong/flink-idleness-testing
> >>
> >> Is this a bug or does the idleness detection not support this
> use case ?
> >>
> >> Regards.
> >> Kien
> >
>
>
>
> -- 
>
> Arvid Heise| Senior Java Developer
>
> 
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward - The Apache
> FlinkConference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbHRegistered at 

Re: flink checkpoint导致反压严重

2020-08-31 Thread zhanglachun
感谢大佬,现在基本可以确定是数据倾斜导致ck缓慢和反压严重
理由:
1.在webui查看计算子subtasks信息,3个subtask中其中一个的数据量只有其他两个的1/3
2.将key添加随机数后,计算性能直线上升(当然只是为了测试,结算结果显然不是预期的)

之前我做过分布聚合来解决缓解数据倾斜问题

需求:比如有两个字段:url,respontse_time,按url
keyby,一分钟时间窗口,计算该url的响应时间(respontse_time)中位数
这里明显几个首页url的访问量会非常大,有些详情页url可能就访问量很小,这就肯定会有数据倾斜

我之前的分布聚合步骤是:
第一步聚合中:在url后添加1~50的整型随机数,keyby后使用RoaringBitmap缓存respontse_time.
结果类似于
(www.baidu.com1,[1,2,3])
(www.baidu.com2,[4,5,6])
第二步聚合中:将url后缀去除,再次keyby后,将第一轮的RoaringBitmap缓存的数据整合
结果类似于
(www.baidu.com,[1,2,3,4,5,6])
最后将RoaringBitmap里的全窗口数据计算最终中位数结果
www.baidu.com  (3+4)/2=3.5

现在看来应该提升不佳,因为是计算中位数,就我目前对中位数的原理理解,无论如何都必须将全窗口数据缓存到窗口结束才能触发结果计算,也就是我上面的步骤,虽然有分步,但实际上只是分步缓存,而不是分步计算,不能像sum和count计算,可以在分步聚合中,逐步累加中间结果

对于这类有不能增量累加的数据倾斜场景,不知大佬有没有比较好的解决知道



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Issues with Flink Batch and Hadoop dependency

2020-08-31 Thread Arvid Heise
Hi Dan,

Your approach in general is good. You might want to use the bundled hadoop
uber jar [1] to save some time if you find the appropriate version. You can
also build your own version and include it then in lib/.

In general, I'd recommend moving away from sequence files. As soon as you
change your records minimally, everything falls apart. Going with
established binary formats like Avro or Parquet is usually desired also
because of the additional tooling and pays quickly off in the long run.

[1] https://flink.apache.org/downloads.html#additional-components

On Sat, Aug 29, 2020 at 10:50 PM Dan Hill  wrote:

> I was able to get a basic version to work by including a bunch of hadoop
> and s3 dependencies in the job jar and hacking in some hadoop config
> values.  It's probably not optimal but it looks like I'm unblocked.
>
> On Fri, Aug 28, 2020 at 12:11 PM Dan Hill  wrote:
>
>> I'm assuming I have a simple, common setup problem.  I've spent 6 hours
>> debugging and haven't been able to figure it out.  Any help would be
>> greatly appreciated.
>>
>>
>> *Problem*
>> I have a Flink Streaming job setup that writes SequenceFiles in S3.  When
>> I try to create a Flink Batch job to read these Sequence files, I get the
>> following error:
>>
>> NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
>>
>> It fails on this readSequenceFile.
>>
>> env.createInput(HadoopInputs.readSequenceFile(Text.class,
>> ByteWritable.class, INPUT_FILE))
>>
>> If I directly depend on org-apache-hadoop/hadoop-mapred when building the
>> job, I get the following error when trying to run the job:
>>
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>> FileSystem for scheme "s3"
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
>> at
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
>> at
>> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
>> at
>> org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
>> at
>> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
>> at
>> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
>> at
>> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:257)
>>
>>
>> *Extra context*
>> I'm using this Helm chart 
>> for creating Flink.  I'm using v1.10.1.
>>
>>
>> *Questions*
>> Are there any existing projects that read batch Hadoop file formats from
>> S3?
>>
>> I've looked at these instructions for Hadoop Integration
>> .
>> I'm assuming my configuration is wrong.  I'm also assuming I need the
>> hadoop dependency properly setup in the jobmanager and taskmanager (not in
>> the job itself).  If I use this Helm chart, do I need to download a hadoop
>> common jar into the Flink images for jobmanager and taskmanager?  Are there
>> pre-built images which I can use that already have the dependencies setup?
>>
>>
>> - Dan
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Debezium Flink EMR

2020-08-31 Thread Arvid Heise
Hi Rex,

the connector expects a value without a schema, but the message contains a
schema. You can tell Flink that the schema is included as written in the
documentation [1].

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'debezium-json',
 'debezium-json.schema-include' = true)

@Jark Wu  , it would be probably good to make the
connector more robust and catch these types of misconfigurations.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format

On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley  wrote:

> Awesome, so that took me a step further. When running i'm receiving an
> error however. FYI, my docker-compose file is based on the Debezium mysql
> tutorial which can be found here
> https://debezium.io/documentation/reference/1.2/tutorial.html
>
> Part of the stack trace:
>
> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt Debezium
> JSON message
> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
> cool street","city":"Big
> City","state":"California","zip":"9","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.10","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
> flink-jobmanager_1 | at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-jobmanager_1 | at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[?:?]
> flink-jobmanager_1 | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[?:?]
> flink-jobmanager_1 | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[?:?]
> flink-jobmanager_1 | at
> 

Re: flink1.11连接mysql问题

2020-08-31 Thread Leonard Xu


> 在 2020年8月28日,15:02,酷酷的浑蛋  写道:
> 
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
> successfully received from the server was 52,445,041 milliseconds ago. The 
> last packet sent successfully to the server was 52,445,045 milliseconds ago. 
> is longer than the server configured value of'wait_timeout'. You should 
> consider either expiring and/or testing connection validity before use in 
> your application, increasing the server configured values for client 
> timeouts, orusing the Connector/J connection property 'autoReconnect=true' to 
> avoid this problem.
> 
> 
> 
> 
> flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用
> 


Hi

超时断开问题在1.11应该已经修复[1],你是怎么使用的?可以提供更多的信息吗

Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16681 




Re: Flink Migration

2020-08-31 Thread Arvid Heise
Hi Navneeth,

if everything worked before and you just experience later issues, it would
be interesting to know if your state size grew over time. An application
over time usually needs gradually more resources. If the user base of your
company grows, so does the amount of messages (be it click stream, page
impressions, or any kind of transactions). Often time, also the operator
state grows. Sometimes, it's just that the events themselves become more
complex and thus you need more overall bandwidth. This means that from time
to time, you need to increase the memory of Flink (for state) or the number
of compute nodes (to handle more events). In the same way, you need to make
sure that your sink scales as well.

If you fail to keep up with the demand, the application gradually becomes
more unstable (for example by running out of memory repeatedly). I'm
suspecting that this may happen in your case.

First, it's important to understand what the bottleneck is. Web UI should
help to narrow it down quickly. You can also share your insights and we can
discuss further strategies.

If nothing works out, I also recommend an upgrade. Your best migration path
would be to use Flink 1.7, which should allow a smoother transition for
state [1]. I'd guess that afterwards, you should be able to migrate to 1.11
with almost no code changes.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17

On Fri, Aug 28, 2020 at 1:43 PM Yun Tang  wrote:

> Hi Navneeth
>
> First of all, I suggest to upgrade Flink version to latest version.
> And you could refer here [1] for the savepoint compatibility when
> upgrading Flink.
>
> For the problem that cannot connect address, you could login your pod and
> run 'nslookup jobmanager' to see whether the host could be resolved.
> You can also check the service of 'jobmanager' whether work as expected
> via 'kubectl get svc' .
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>
> Best
> Yun Tang
>
> --
> *From:* Navneeth Krishnan 
> *Sent:* Friday, August 28, 2020 17:00
> *To:* user 
> *Subject:* Flink Migration
>
> Hi All,
>
> We are currently on a very old version of flink 1.4.0 and it has worked
> pretty well. But lately we have been facing checkpoint timeout issues. We
> would like to minimize any changes to the current pipelines and go ahead
> with the migration. With that said our first pick was to migrate to 1.5.6
> and later migrate to a newer version.
>
> Do you guys think a more recent version like 1.6 or 1.7 might work? We did
> try 1.8 but it requires some changes in the pipelines.
>
> When we tried 1.5.6 with docker compose we were unable to get the task
> manager attached to jobmanager. Are there some specific configurations
> required for newer versions?
>
> Logs:
>
> 8-28 07:36:30.834 [main] INFO
> org.apache.flink.runtime.util.LeaderRetrievalUtils  - TaskManager will
> try to connect for 1 milliseconds before falling back to heuristics
>
> 2020-08-28 07:36:30.853 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Retrieved new target
> address jobmanager/172.21.0.8:6123.
>
> 2020-08-28 07:36:31.279 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Trying to connect to
> address jobmanager/172.21.0.8:6123
>
> 2020-08-28 07:36:31.280 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.281 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.281 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.282 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/127.0.0.1': Invalid argument (connect failed)
>
> 2020-08-28 07:36:31.283 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.284 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/127.0.0.1': Invalid argument (connect failed)
>
> 2020-08-28 07:36:31.684 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Trying to connect to
> address jobmanager/172.21.0.8:6123
>
> 2020-08-28 07:36:31.686 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.687 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/172.21.0.9': Connection refused (Connection 

Re: 来自李国鹏的邮件

2020-08-31 Thread Leonard Xu


> 在 2020年8月31日,15:55,李国鹏  写道:
> 
> 退订


Hi
是指取消订阅邮件吗?
可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org取消订阅来自 
user-zh@flink.apache.org  邮件列表的邮件

邮件列表的订阅管理,可以参考[1]

祝好,
Leonard Xu
[1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list 


Re: Flink SQL Streaming Join Creates Duplicates

2020-08-31 Thread Arvid Heise
Hi Austin,

Do I assume correctly, that you self-answered your question? If not, could
you please update your current progress?

Best,

Arvid

On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Ah, I think the "Result Updating" is what got me -- INNER joins do the
> job!
>
> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> oops, the example query should actually be:
>>
>> SELECT table_1.a, table_1.b, table_2.c
>> FROM table_1
>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>
>> and duplicate results should actually be:
>>
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 1", b = "data b 1", c = null)
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>> Record(a = "data a 2", b = "data b 2", c = null)
>>
>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
>>> reading from a few CSV files and joins some records across them into a
>>> couple of data streams (yes, this could be a batch job won't get into why
>>> we chose streams unless it's relevant). These joins are producing some
>>> duplicate records, one with the joined field present and one with the
>>> joined field as `null`, though this happens only ~25% of the time. Reading
>>> the docs on joins[1], I thought this could be caused by too strict Idle
>>> State Retention[2], so I increased that to min, max (15min, 24h) but that
>>> doesn't seem to have an effect, and the problem still occurs when testing
>>> on a subset of data that finishes processing in under a minute.
>>>
>>> The query roughly looks like:
>>>
>>> table_1 has fields a, b
>>> table_2 has fields b, c
>>>
>>> SELECT table_1.a, table_1.b, table_1.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> Correct result:
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>
>>> Results seem to be anywhere between all possible dups and the correct
>>> result.
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = null, c = "data c 1")
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = null, c = "data c 2")
>>>
>>> The CSV files are registered as Flink Tables with the following:
>>>
>>> tableEnv.connect(
>>> new FileSystem()
>>> .path(path)
>>> )
>>> .withFormat(
>>> new Csv()
>>> .quoteCharacter('"')
>>> .ignoreParseErrors()
>>> )
>>> .withSchema(schema)
>>> .inAppendMode()
>>> .createTemporaryTable(tableName);
>>>
>>>
>>> I'm creating my table environment like so:
>>>
>>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>> .useBlinkPlanner()
>>> .build();
>>>
>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
>>> tableEnvSettings);
>>>
>>> TableConfig tConfig = tEnv.getConfig();
>>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>>
>>>
>>> Is there something I'm misconfiguring or have misunderstood the docs?
>>>
>>> Thanks,
>>> Austin
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>>
>>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Idle stream does not advance watermark in connected stream

2020-08-31 Thread Arvid Heise
Hi Aljoscha,

I don't quite follow your analysis. If both sources are configured with
idleness, they should send a periodic watermark on timeout.
So the code that you posted would receive watermarks on the idle source and
thus advance watermarks periodically.

If an idle source does not emit a watermark at all, then I'm curious why
it's not mapped to StreamStatus.IDLE [1], which would trigger the desired
behavior.

[1]
https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd791669898b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79

On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek 
wrote:

> Yes, I'm afraid this analysis is correct. The StreamOperator,
> AbstractStreamOperator to be specific, computes the combined watermarks
> from both inputs here:
>
> https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573.
>
> The operator layer is not aware of idleness so it will never notice. The
> idleness only works on the level of inputs but is never forwarded to an
> operator itself.
>
> To fix this we would have to also make operators aware of idleness such
> that they can take this into account when computing the combined output
> watermark.
>
> Best,
> Aljoscha
>
> On 26.08.20 10:02, Dawid Wysakowicz wrote:
> > Hi Kien,
> >
> > I am afraid this is a valid bug. I am not 100% sure but the way I
> > understand the code the idleness mechanism applies to input channels,
> > which means e.g. when multiple parallell instances shuffle its results
> > to downstream operators.
> >
> > In case of a two input operator, combining the watermark of two
> > different upstream operators happens inside of the operator itself.
> > There we do not have the idleness status. We do not have a status that a
> > whole upstream operator became idle. That's definitely a bug/limitation.
> >
> > I'm also cc'ing Aljoscha who could maybe confirm my analysis.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/08/2020 16:00, Truong Duc Kien wrote:
> >> Hi all,
> >> We are testing the new Idleness detection feature in Flink 1.11,
> >> however, it does not work as we expected:
> >> When we connect two data streams, of which one is idle, the output
> >> watermark CoProcessOperator does not increase, hence the program
> >> cannot progress.
> >>
> >> I've made a small project to illustrate the problem. The watermark
> >> received by the sink does not increase at all until the idle source is
> >> stopped.
> >>
> >> https://github.com/kien-truong/flink-idleness-testing
> >>
> >> Is this a bug or does the idleness detection not support this use case ?
> >>
> >> Regards.
> >> Kien
> >
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Implementation of setBufferTimeout(timeoutMillis)

2020-08-31 Thread Pankaj Chand
Thank you so much, Yun! It is exactly what I needed.

On Mon, Aug 31, 2020 at 1:50 AM Yun Gao  wrote:

> Hi Pankaj,
>
> I think it should be in
> org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.
>
> Best,
>  Yun
>
>
>
> --
> Sender:Pankaj Chand
> Date:2020/08/31 02:40:15
> Recipient:user
> Theme:Implementation of setBufferTimeout(timeoutMillis)
>
> Hello,
>
> The documentation gives the following two sample lines for setting the
> buffer timeout for the streaming environment or transformation.
>
>
>
> *env.setBufferTimeout(timeoutMillis);env.generateSequence(1,10).map(new
> MyMapper()).setBufferTimeout(timeoutMillis);*
>
> I have been trying to find where (file and method) in the Flink source
> code are the buffers being flushed by iteratively referring to the value of
> timeoutMillis (or the default value), but have been unsuccessful. Please
> help.
>
> Thanks,
>
> Pankaj
>
>


Re: runtime memory management

2020-08-31 Thread lec ssmi
Thanks.
When the program starts, how is each operator allocated in taskmanager?
For example, if I have 2 taskmanagers and 10 operators, 9 operators  are
allocated to tm-A and the remaining one is placed in tm-B, the utilization
of resources will be very low.

Xintong Song  于2020年8月31日周一 下午2:45写道:

> Hi,
>
> For a complex streaming job, is there any way to tilt the memory towards
>> stateful operators?
>
> If streaming jobs are interested, the quick answer is no. Memory is
> fetched on demand for all operators.
>
> Currently, only managed memory for batch jobs are pre-planned for each
> operator.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 31, 2020 at 1:33 PM lec ssmi  wrote:
>
>> HI:
>>   Generally speaking, when we submitting the flink program, the number of
>> taskmanager and the memory of each tn will be specified. And the smallest
>> real execution unit of flink should be operator.
>>Since the calculation logic corresponding to each operator is
>> different, some need to save the state, and some don't.  Therefore, the
>> memory size required by each operator should be different. How does the
>> flink program allocate taskmanager memory to the operator by default?
>>   In our production practice, with the increase of traffic, some
>> operators (mainly stateful such as join and groupby) often have
>> insufficient memory, resulting in slower calculations. The usual approach
>> is to increase the entire taskmanager memory. But will this part of the
>> increased memory be allocated to the map-like operator, or that the memory
>> itself is fetched on demand  in the same taskmanager  whoever needs the
>> memory will fetch it until the memory is used up,  in other words, there is
>> no preset memory allocation ratio. For a complex streaming job, is there
>> any way to tilt the memory towards stateful operators?
>>
>>  Thanks.
>>
>>
>>
>>


回复:flink1.11连接mysql问题

2020-08-31 Thread 酷酷的浑蛋
关键是在sql中怎么设置,connector=jdbc




在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
这个问题本质是连接活性问题,
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)


建议使用连接池druid进行连接活性保持


原始邮件
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接mysql问题


com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用

来自李国鹏的邮件

2020-08-31 Thread 李国鹏
退订

来自李国鹏的邮件

2020-08-31 Thread 李国鹏
Unsubscribe

Security vulnerabilities of dependencies in Flink 1.11.1

2020-08-31 Thread shravan
issues.docx

  

Hello,

We are using Apache Flink 1.11.1 version and our security scans report the
following issues.  
Please let us know your comments on these security vulnerabilities and fix
plans for them.

PFA a word document with details in regard to CVE numbers, details, and it's
severity.

Issues in a nutshell,
1. Flink-shaded-netty, has netty 4.1.39 which is vulnerable
2. Flink-shaded-jackson, has snakeyaml 1.24 which is vulnerable
3. Flink-table, has vulnerable version of Jackson-databind in table APIs

Looking forward on a response.

Thanks,
Shravan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


回复:flink1.11连接mysql问题

2020-08-31 Thread 13580506953
这个问题本质是连接活性问题, 
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)


建议使用连接池druid进行连接活性保持


 原始邮件 
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接mysql问题


com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用

Re: runtime memory management

2020-08-31 Thread Xintong Song
Hi,

For a complex streaming job, is there any way to tilt the memory towards
> stateful operators?

If streaming jobs are interested, the quick answer is no. Memory is fetched
on demand for all operators.

Currently, only managed memory for batch jobs are pre-planned for each
operator.

Thank you~

Xintong Song



On Mon, Aug 31, 2020 at 1:33 PM lec ssmi  wrote:

> HI:
>   Generally speaking, when we submitting the flink program, the number of
> taskmanager and the memory of each tn will be specified. And the smallest
> real execution unit of flink should be operator.
>Since the calculation logic corresponding to each operator is
> different, some need to save the state, and some don't.  Therefore, the
> memory size required by each operator should be different. How does the
> flink program allocate taskmanager memory to the operator by default?
>   In our production practice, with the increase of traffic, some operators
> (mainly stateful such as join and groupby) often have insufficient memory,
> resulting in slower calculations. The usual approach is to increase the
> entire taskmanager memory. But will this part of the increased memory be
> allocated to the map-like operator, or that the memory itself is fetched on
> demand  in the same taskmanager  whoever needs the memory will fetch it
> until the memory is used up,  in other words, there is no preset memory
> allocation ratio. For a complex streaming job, is there any way to tilt the
> memory towards stateful operators?
>
>  Thanks.
>
>
>
>


Re: 如何设置FlinkSQL并行度

2020-08-31 Thread 赵一旦
啥情况,你是调整了sql部分实现嘛。有示例嘛。

zilong xiao  于2020年8月29日周六 下午5:19写道:

> SQL 算子并行度设置可以自己实现,可以私下交流下,正好在做这块,基本能工作了
>
> JasonLee <17610775...@163.com> 于2020年8月23日周日 下午2:07写道:
>
> > hi
> > checkpoint savepoint的问题可以看下这个
> > https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-31 Thread zilong xiao
可以用程序来完成的,flink-conf.yaml里可以先用占位符,例如 `env.java.opts:
-Djob.name={{job_name}}`  在你提交作业之前,先读到这个模板文件,在代码里去replace该占位符就好,不需要手动去改

Jim Chen  于2020年8月31日周一 下午1:33写道:

> 我也是flink1.10.1的版本的,如果按照你的方法,每次启动一个任务,都要在flink-conf.yaml中修改一下`env.java.opts:
> -Djob.name=xxx`吗?这样的话,是不是太麻烦了
>
> zilong xiao  于2020年8月31日周一 下午12:08写道:
>
> > 想问下你用的flink哪个版本呢?
> > 如果是Flink 1.10-版本,可以在shell脚本中加上 -yD
> > jobName=xxx,然后在logback自定义PatternLayout中用环境变量`_DYNAMIC_PROPERTIES`获取
> > 如果是Flink 1.10+版本,则上述方式不可行,因为1.10+版本在作业启动执行 launch_container.sh
> > <
> >
> http://dn-rt199.jja.bigo:8042/node/containerlogs/container_e19_1597907464753_1954_01_01/zengkejie/launch_container.sh/?start=-4096
> > >脚本时,脚本中不再`export
> >  _DYNAMIC_PROPERTIES`变量,所以无法从环境变量获取,那么可以在flink-conf.yaml中添加
> > `env.java.opts: -Djob.name=xxx`,然后在 PatternLayout中获取启动参数即可
> >
> > 以上是我个人的实现方式,目前可正常运行,如有描述不正确的地方,欢迎探讨~
> >
> > Jim Chen  于2020年8月31日周一 上午11:33写道:
> >
> > > 我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到
> > >
> > >
> > > zilong xiao  于2020年8月27日周四 下午7:24写道:
> > >
> > > > 如果是用CLI方式提交作业的话是可以做到的
> > > >
> > > > Jim Chen  于2020年8月27日周四 下午6:13写道:
> > > >
> > > > > 如果是自动以PatternLayout的话,我有几点疑问:
> > > > >
> > > > >
> > > >
> > >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> > > > >
> > > > > 如果使用env的话
> > > > > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > > > > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > > > > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > > > >
> > >
> -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> > > > >
> > > > > zilong xiao  于2020年8月25日周二 下午5:32写道:
> > > > >
> > > > > >
> > > >
> > 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > > > > 2:这些属性有办法可以从环境变量中获取
> > > > > >
> > > > > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > > > > >
> > > > > > > 大家好:
> > > > > > >
> >  我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > > > >
> > > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > > > > 这个配置文件,是整个项目的,是基于Yarn的per
> > > job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Packaging multiple Flink jobs from a single IntelliJ project

2020-08-31 Thread Manas Kale
Hi,
I solved my second issue - I was not following Maven's convention for
placing source code (I had not placed my source in src/main/java).
However, I still would like some help with my first question - what is the
recommended way to set a project with multiple main() classes? At the end,
I would like to be able to run each main() class as a separate job. Should
I create a single JAR and specify different entrypoint classes each time or
should I create separate JARs for each main() class?

On Mon, Aug 31, 2020 at 11:13 AM Manas Kale  wrote:

> Hi,
> I have an IntelliJ project that has multiple classes with main()
> functions. I want to package this project as a JAR that I can submit to the
> Flink cluster and specify the entry class when I start the job. Here are my
> questions:
>
>- I am not really familiar with Maven and would appreciate some
>pointers/examples. From what I understand, I will need to use some sort of
>transformer in the Maven shade plugin to merge all of the classes. *If
>this is correct, can I see a small example? *
>- Also, I can't get a single main class working:
>
>
> http://maven.apache.org/POM/4.0.0; 
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>4.0.0
>
>flink_summarization
>flink_summarization
>0.1
>jar
>
>Flink Quickstart Job
>http://www.myorganization.org
>
>
>   UTF-8
>   1.10.1
>   1.8
>   2.11
>   ${java.version}
>   ${java.version}
>
>
>
>   
>  apache.snapshots
>  Apache Development Snapshot Repository
>  
> https://repository.apache.org/content/repositories/snapshots/
>  
> false
>  
>  
> true
>  
>   
>
>
>
>   
>   
>   
>   
>  org.apache.flink
>  flink-java
>  ${flink.version}
>  provided
>   
>   
>  org.apache.flink
>  flink-streaming-java_${scala.binary.version}
>  ${flink.version}
>  provided
>   
>
>   
>  org.apache.flink
>  flink-connector-kafka_2.11
>  ${flink.version}
>   
>
>   
>  org.apache.flink
>  flink-state-processor-api_2.11
>  ${flink.version}
>  provided
>   
>
>   
>  org.apache.flink
>  flink-connector-jdbc_2.11
>  1.11.0
>   
>
>   
>   
>   
>  org.slf4j
>  slf4j-log4j12
>  1.7.7
>  runtime
>   
>   
>  log4j
>  log4j
>  1.2.17
>  runtime
>   
>
>   
>   
>  org.apache.flink
>  flink-test-utils_${scala.binary.version}
>  ${flink.version}
>  test
>   
>   
>  org.apache.flink
>  flink-runtime_2.11
>  ${flink.version}
>  test
>  tests
>   
>   
>  org.apache.flink
>  flink-streaming-java_2.11
>  ${flink.version}
>  test
>  tests
>   
>   
>  org.assertj
>  assertj-core
>  
>  3.16.1
>  test
>   
>
>
>
>
>
>   
>
>  
>  
> org.apache.maven.plugins
> maven-compiler-plugin
> 3.1
> 
>${java.version}
>${java.version}
> 
>  
>
>  
>  
>  
> org.apache.maven.plugins
> maven-shade-plugin
> 3.0.0
> 
> 
>false
> 
> 
>
>
>   package
>   
>  shade
>   
>   
>  
> 
>org.apache.flink:force-shading
>com.google.code.findbugs:jsr305
>org.slf4j:*
>log4j:*
> 
>  
>  
> 
>
>*:*
>
>   META-INF/*.SF
>   META-INF/*.DSA
>   META-INF/*.RSA
>
> 
>  
>  
>  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>
> iu.feature_summarization.basic_features.pre.BasicPreProcessJob
> 
>
>  
>   
>
> 
>  
>   
>
>   
>  
>
> 
> 
>

Re: flink stream sink hive

2020-08-31 Thread Yun Gao
社区的邮件列表应该不支持图片,现在图看不到,要不直接把stack贴上来吧,或者用个图床。



 --Original Mail --
Sender:liya...@huimin100.cn 
Send Date:Thu Aug 27 19:09:51 2020
Recipients:user-zh 
Subject:flink stream sink hive

flink1.11.1 往hive2.1.1 的orc表写数据报的异常,在网上查不到,只能来这里了,麻烦大佬们帮我看看


liya...@huimin100.cn

Re: Exception on s3 committer

2020-08-31 Thread Yun Gao

Hi Ivan,

   I think there might be some points to check:

   1. Is the job restored from the latest successful checkpoint after restart ? 
   2. Have you ever changed the timeout settings for uncompleted multipart 
upload ?
   3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 
exist or not ?

Best,
 Yun


 --Original Mail --
Sender:Ivan Yang 
Send Date:Sat Aug 29 12:43:28 2020
Recipients:user 
Subject:Exception on s3 committer
Hi all,

We got this exception after a job restart. Does anyone know what may lead to 
this situation? and how to get pass this Checkpoint issue? Prior to this, the 
job failed due to “Checkpoint expired before completing.” We are s3 heavy, 
writing out 10K files to s3 every 10 minutes using StreamingFileSink/BulkFormat 
to various s3 prefixes. Thanks in advance. -Ivan

2020-08-28 15:17:58
java.io.IOException: Recovering commit failed for object 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object does 
not exist and MultiPart Upload 
3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw-
 is not valid.
at 
org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:122)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.(StreamingFileSinkHelper.java:74)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Completing multipart commit on 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804: 
com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does 
not exist. The upload ID may be invalid, or the upload may have been aborted or 
completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; 
Request ID: 9A99AFAD80A8F202; S3 Extended Request ID: 
fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=), 
S3 Extended Request ID: 
fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=:NoSuchUpload
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at