Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has no spike; how can this be controlled?

2021-05-31 Thread HunterXHunter
The issue I ran into was that after enabling incremental checkpoints, the checkpoints kept getting bigger; after turning it off, the checkpoint size went back to normal.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


A question about Flink SQL 1.11.3

2021-05-31 Thread yinghua...@163.com
We use Nifi to collect data into kafka and then process the kafka data with Flink. If Nifi writes multiple records as a single kafka message (the data written to kafka is in csv or json format), i.e. one offset contains multiple records, Flink only processes one of them. Is there a way to make Flink process all the records contained in a single offset?



yinghua...@163.com


Re: Flink 1.11.2 SQL consuming Kafka and writing to Hive reports an error

2021-05-31 Thread Jacob
The problem has been solved.

The kafka connector jar needs to be added to the lib directory under the Flink home.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has no spike; how can this be controlled?

2021-05-31 Thread yujianbo
Thanks a lot for the reply! From now on I will discuss issues on the mailing list first and only open a JIRA issue once the problem has been identified!
My idleStateRetention is indeed set to 3600 seconds; I will run some tests first and see.
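
(For reference only, and not the poster's actual code: a minimal sketch of how idle state retention is typically configured on the TableConfig, assuming the pre-1.13 API and Flink 1.11+ package names.)

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Keep idle state for at least 1 hour and clean it up after at most 2 hours.
        // With RocksDB, expired entries are only physically removed once compaction
        // rewrites the files that contain them.
        tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));
    }
}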




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Can flink sql cdc keep the before and after fields when collecting the mysql binlog?

2021-05-31 Thread 董建
Can flink sql cdc keep the before and after fields when collecting the mysql binlog?
Following the official example, after defining the table schema, do the columns only carry the latest field values?
Is it possible to keep both before and after at the same time?

Re: Data loss after using datastream union on the data from multiple topics

2021-05-31 Thread 13631283359



This has been solved: remove the loop, handle each kafka topic separately, and then union them.













On 2021-06-01 08:54:42, "13631283359" <13631283...@163.com> wrote:

Hi all,
I recently used datastream union to merge the data from several topics and some data is lost; the result contains far fewer records than when each topic is consumed separately.
The code is as follows:

/**
 * Merge multiple streams into one for processing.
 *
 * @param topics the topics and consumer group IDs passed in from the config file
 * @param env    the Flink execution environment
 * @return a single stream that unions the data streams of all topics
 */
def getUnionConsumer(topics: List[String], env: StreamExecutionEnvironment): DataStream[String] = {
  var total: DataStream[String] = null
  for (str <- topics) {
    val topicName = str.split(":")(0)
    val groupId = str.split(":")(1)
    val source_data = getSourceData(topicName, groupId, env)
    if (total != null) {
      total = total.union(source_data)
    } else {
      total = source_data
    }
  }
  total
}





 

Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has no spike; how can this be controlled?

2021-05-31 Thread Yun Tang
Hi,

First, to confirm: your idleStateRetention is 3600 seconds? Next, to see whether all the retained data is still needed, you can use
Checkpoints.loadCheckpointMeta [1] to load the _metadata file under each retained checkpoint directory
and compare the files it references with the files currently in the checkpoint directory, to check whether there are many old files that were never deleted.

It is hard to judge based only on your description and a piece of SQL.
Possible causes include:

  1.  A single checkpoint produces too many files, and the JobManager (a single point) cannot delete them fast enough.
  2.  The overall checkpoint size is small, so RocksDB compactions run infrequently; old files that still contain expired data are therefore not removed in time and stay in the checkpoint directory (retaining multiple checkpoints amplifies this problem).

So it is still best to first analyze the metadata files of the existing checkpoints and compare them with the actual situation.
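
For reference, a rough sketch of how such an analysis could start: loading a checkpoint's _metadata with Checkpoints.loadCheckpointMetadata and printing the registered state size per operator (this assumes the three-argument signature available since Flink 1.11; the path is a placeholder):

import java.io.DataInputStream;

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;

public class InspectCheckpointMeta {
    public static void main(String[] args) throws Exception {
        // Placeholder: point this at the _metadata file of one retained checkpoint.
        Path metadata = new Path("hdfs:///user/flink/checkpoints/<job-id>/chk-100/_metadata");
        try (DataInputStream in =
                new DataInputStream(metadata.getFileSystem().open(metadata))) {
            CheckpointMetadata meta = Checkpoints.loadCheckpointMetadata(
                    in, InspectCheckpointMeta.class.getClassLoader(), metadata.toString());
            // Comparing the state handles referenced here with the files under
            // .../checkpoint/shared/ shows whether old files are still referenced
            // or simply have not been cleaned up yet.
            for (OperatorState op : meta.getOperatorStates()) {
                System.out.println(op.getOperatorID() + " -> " + op.getStateSize() + " bytes");
            }
        }
    }
}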

Also, the Flink JIRA is not really a good place to ask questions directly, especially for issues that are not yet well understood. The more appropriate way is to discuss on the mailing list first and only create a detailed ticket once the root cause has been pinpointed.

[1] 
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

Best,
Yun Tang

From: yujianbo <15205029...@163.com>
Sent: Tuesday, June 1, 2021 10:51
To: user-zh@flink.apache.org 
Subject: Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has no spike; how can this be controlled?

Is there no better way?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11.2 SQL consuming Kafka and writing to Hive reports an error

2021-05-31 Thread Jacob
Thank you for your reply!

Is the kafka connector you mentioned the *flink-connector-kafka_2.11*
dependency? That one is used by the DataStream API; I have already added the *flink-sql-connector-kafka_2.11* dependency to my pom.
I also tried adding *flink-connector-kafka_2.11*, but the error still occurs.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has no spike; how can this be controlled?

2021-05-31 Thread yujianbo
Is there no better way? That only treats the symptom, not the root cause; my jobs with large state would still run into many problems.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has no spike; how can this be controlled?

2021-05-31 Thread yujianbo
Is there no better way?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has no spike; how can this be controlled?

2021-05-31 Thread HunterXHunter
Disable incremental checkpointing.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11.2 SQL consuming Kafka and writing to Hive reports an error

2021-05-31 Thread LakeShen
Hi Jacob,

Maybe you miss the kafka connector dependency in your pom,
you could refer to this url :
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
LakeShen

Jacob <17691150...@163.com> wrote on Tue, Jun 1, 2021 at 9:54 AM:

> Dear All,
>
> I get an error when using Flink SQL to consume Kafka and write into Hive; the code and error message are below.
>
> The Flink version is 1.11.2 and the kafka connector is flink-sql-connector-kafka.
> I have searched for a long time without finding a solution. The same SQL statement works in the flink sql client, but when submitted from Java code to the hadoop cluster it fails with the error below:
>
> Please advise.
>
> *Java Code*
>
> TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS
> user_behavior_kafka_table");
> tableResult.print();
> TableResult tableResult2 = tableEnvironment.executeSql(
> "CREATE TABLE user_behavior_kafka_table
> (\r\n" +
> "   `user_id` STRING,\r\n" +
> "   `item_id` STRING\r\n" +
> " ) WITH (\r\n" +
> "   'connector' = 'kafka',\r\n" +
> "   'topic' = 'TestTopic',\r\n" +
> "   'properties.bootstrap.servers' =
> 'localhost:9092',\r\n" +
> "   'properties.group.id' =
> 'consumerTest',\r\n" +
> "   'scan.startup.mode' =
> 'earliest-offset',\r\n" +
> "   'format' = 'json'\r\n" +
> ")");
> tableResult2.print();
>
>
> // 数据写入
> tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnvironment.executeSql(
> "INSERT INTO user_behavior_hive_table SELECT user_id,
> item_id FROM user_behavior_kafka_table");
>
>
> *POM File*
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-json</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_2.11</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-shaded-hadoop-2-uber</artifactId>
>     <version>2.7.5-10.0</version>
>     <scope>provided</scope>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-hive_2.11</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.hive</groupId>
>     <artifactId>hive-exec</artifactId>
>     <version>${hive.version}</version>
>     <scope>provided</scope>
> </dependency>
>
>
> *Error Message*
>
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover
> a
> connector using option ''connector'='kafka''.
> at
>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81)
> ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> 

Flink 1.11.2 SQL consuming Kafka and writing to Hive reports an error

2021-05-31 Thread Jacob
Dear All,

I get an error when using Flink SQL to consume Kafka and write into Hive; the code and error message are below.

The Flink version is 1.11.2 and the kafka connector is flink-sql-connector-kafka.
I have searched for a long time without finding a solution. The same SQL statement works in the flink sql client, but when submitted from Java code to the hadoop cluster it fails with the error below:

Please advise.

*Java Code*

TableResult tableResult = tableEnvironment.executeSql(
        "DROP TABLE IF EXISTS user_behavior_kafka_table");
tableResult.print();

TableResult tableResult2 = tableEnvironment.executeSql(
        "CREATE TABLE user_behavior_kafka_table (\r\n" +
        "   `user_id` STRING,\r\n" +
        "   `item_id` STRING\r\n" +
        " ) WITH (\r\n" +
        "   'connector' = 'kafka',\r\n" +
        "   'topic' = 'TestTopic',\r\n" +
        "   'properties.bootstrap.servers' = 'localhost:9092',\r\n" +
        "   'properties.group.id' = 'consumerTest',\r\n" +
        "   'scan.startup.mode' = 'earliest-offset',\r\n" +
        "   'format' = 'json'\r\n" +
        ")");
tableResult2.print();

// write the data out
tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnvironment.executeSql(
        "INSERT INTO user_behavior_hive_table SELECT user_id, item_id FROM user_behavior_kafka_table");


*POM File*


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-10.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>



*Error Message*

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
connector using option ''connector'='kafka''.
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81)
~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)

Re: Flink SQL: the /checkpoint/shared/ folder keeps growing although the source data has no spike; how can this be controlled?

2021-05-31 Thread yujianbo
Could anyone help take a look?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Data loss after using datastream union on the data from multiple topics

2021-05-31 Thread 13631283359
Hi all,
I recently used datastream union to merge the data from several topics and some data is lost; the result contains far fewer records than when each topic is consumed separately.
The code is as follows:

/**
 * Merge multiple streams into one for processing.
 *
 * @param topics the topics and consumer group IDs passed in from the config file
 * @param env    the Flink execution environment
 * @return a single stream that unions the data streams of all topics
 */
def getUnionConsumer(topics: List[String], env: StreamExecutionEnvironment): DataStream[String] = {
  var total: DataStream[String] = null
  for (str <- topics) {
    val topicName = str.split(":")(0)
    val groupId = str.split(":")(1)
    val source_data = getSourceData(topicName, groupId, env)
    if (total != null) {
      total = total.union(source_data)
    } else {
      total = source_data
    }
  }
  total
}

Re: S3 + Parquet credentials issue

2021-05-31 Thread Svend
Hi Angelo,

I've not worked exactly in the situation you described, although I've had to 
configure S3 access from a Flink application recently and here are a couple of 
things I learnt along the way:

* You should normally not need to include flink-s3-fs-hadoop nor 
hadoop-mapreduce-client-core in your classpath but should rather make 
flink-s3-fs-hadoop available to Flink by putting it into the plugins folder. 
The motivation for that is that this jar is a fat jar containing a lot of 
hadoop and aws classes, s.t. including it in your classpath quickly leads to 
conflicts. The plugins folder is associated with a separate classpath, which 
helps avoid those conflicts.

* Under the hood, Flink is using the hadoop-aws library to connect to s3 => the 
documentation regarding how to configure it, and especially security accesses, 
is available in [1]

* Ideally, when running on AWS, your code should not be using 
BasicAWSCredentialsProvider, but instead the application should assume a role, 
which you associate with some IAM permission.  If that's your case, the 
specific documentation for that situation is in [2]. If you're running some 
test locally on your laptop, BasicAWSCredentialsProvider with some key id and 
secret pointing to a dev account may be appropriate.

* As I understand it, any configuration entry in flink.yaml that starts with 
"fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in [3]) => by 
reading documentation in [1] and [2] you might be able to figure out which 
parameters are relevant to your case, which you can then set with the mechanism 
just mentioned. For example, in my case, I simply add this to flink.yaml:

fs.s3a.aws.credentials.provider: 
"com.amazonaws.auth.WebIdentityTokenCredentialsProvider"

* You can debug the various operations that are attempted on S3 by setting this 
logger to DEBUG level:  org.apache.hadoop.fs.s3a


Good luck :) 

Svend



[1] 
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
[2] 
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
[3] 
https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-


On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
> Hello,
> 
> Trying to read a parquet file located in S3 leads to an AWS credentials 
> exception. Switching to another format (raw, for example) works ok as far as 
> file access is concerned.
> 
> This is a snippet of code to reproduce the issue:
> 
> static void parquetS3Error() {
> 
> EnvironmentSettings settings = 
> EnvironmentSettings.*newInstance*().inBatchMode().useBlinkPlanner().build();
> 
> TableEnvironment t_env = TableEnvironment.*create*(settings);
> 
> // parquet format gives error:
> // Caused by: java.net.SocketTimeoutException: doesBucketExist on 
> bucket-prueba-medusa: com.amazonaws.AmazonClientException:
> // No AWS Credentials provided by BasicAWSCredentialsProvider 
> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : 
> // com.amazonaws.SdkClientException: Failed to connect to service 
> endpoint:
> t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) 
> WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 
> 'parquet')");
> 
> // other formats (i.e. raw) work properly:
> // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
> //++
> //|url |
> //++
> //| [80, 65, 82, 49, 21, 0, 21,... |
> //| [0, 0, 0, 50, 48, 50, 49, 4... |
> t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH ( 'connector' 
> = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
> 
> Table t1 = t_env.from("backup");
> 
> t1.execute().print();
> 
> }
> Flink version is 1.12.2.
> 
> Please find attached the pom with dependencies and version numbers.
> 
> What would be a suitable workaround for this?
> 
> Thank you very much.
> 
> Angelo.
> 
> 
>  
> 
> *Attachments:*
>  * pom.xml


Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-31 Thread Timothy Bess
Hi Igal,

Thanks for the help! I'll switch over to that. I ended up defaulting null
to empty string in that deserializer and deploying my own jar to get
production going again. The thing that makes this case tricky is that my
code was publishing an empty string, not null, and that is apparently
interpreted by Kafka as null. So then it's read back in and halts
processing because of the null. I think it might make sense to have a
property/setting that defaults the ID or skips the event. Otherwise it
becomes a poison pill.

Thanks,

Tim


On Mon, May 31, 2021, 7:59 AM Igal Shilman  wrote:

> Hi Tim,
> It is unfortunate that the error message was so minimal, we'll definitely
> improve that (FLINK-22809).
>
> Skipping NULL keys is a bit problematic, although technically possible,
> I'm not sure that this is how we should handle this.
> Let me follow up on that.
>
> The way you can customize the behaviour of that connector without having
> to fork StateFun, is to define an ingress with a different deserializer.
> You would have to use the StatefulFunctionModule [1][2] and bind an
> ingress, you can use the KafkaIngressBuilder [3] and set
> KafkaIngressBuilder::withDeserializer()
> You would also have to define a router to route these messages to target
> functions.
>
> I've prepared a minimal example for you here: [4]
>
> I hope this helps,
> Igal.
>
>
> [1]
> https://github.com/apache/flink-statefun/blob/release-2.2/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java
> [2]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/index.html
> [3]
> https://github.com/apache/flink-statefun/blob/release-2.2/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
> [4] https://github.com/igalshilman/custom-ingress
>
>
>
>
>
> On Fri, May 28, 2021 at 8:19 PM Timothy Bess  wrote:
>
>> Ok so after digging into it a bit it seems that the exception was thrown
>> here:
>>
>> https://github.com/apache/flink-statefun/blob/release-2.2/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java#L48
>>
>> I think it'd be useful to have a configuration to prevent null keys from
>> halting processing.
>> It looks like we are occasionally publishing with a key string that is
>> sometimes empty, and that is interpreted by Kafka as null. Then when it's
>> read back in, the ingress chokes on the null value.
>>
>> I'm trying to keep from having to edit statefun and use my own jar, any
>> thoughts?
>>
>> Thanks,
>>
>> Tim
>>
>> On Fri, May 28, 2021 at 10:33 AM Timothy Bess  wrote:
>>
>>> Oh wow that Harness looks cool, I'll have to take a look at that.
>>>
>>> Unfortunately the JobManager UI seems to just show this:
>>> [image: image.png]
>>>
>>> Though it does seem that maybe the source function is where the failure
>>> is happening according to this?
>>> [image: image.png]
>>>
>>> Still investigating, but I do see a lot of these logs:
>>> 2021-05-28 14:25:09,199 WARN
>>>  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>>> [] - Transaction KafkaTransactionState [transactionalId=feedback-union ->
>>> functions -> Sink:
>>> bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-39,
>>> producerId=2062, epoch=2684] has been open for 55399128 ms. This is close
>>> to or even exceeding the transaction timeout of 90 ms.
>>>
>>> Seems like it's restoring some old kafka transaction? Not sure. I like
>>> Arvid's idea of attaching a debugger, I'll definitely give that a try.
>>>
>>> On Fri, May 28, 2021 at 7:49 AM Arvid Heise  wrote:
>>>
 If logs are not helping, I think the remaining option is to attach a
 debugger [1]. I'd probably add a breakpoint to
 LegacySourceFunctionThread#run and see what happens. If the issue is in
 recovery, you should add a breakpoint to StreamTask#beforeInvoke.

 [1]
 https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters

 On Fri, May 28, 2021 at 1:11 PM Igal Shilman 
 wrote:

> Hi Tim,
> Any additional logs from before are highly appreciated, this would
> help us to trace this issue.
> By the way, do you see something in the JobManager's UI?
>
> On Fri, May 28, 2021 at 9:06 AM Tzu-Li (Gordon) Tai <
> tzuli...@apache.org> wrote:
>
>> Hi Timothy,
>>
>> It would indeed be hard to figure this out without any stack traces.
>>
>> Have you tried changing to debug level logs? Maybe you can also try
>> using the StateFun Harness to restore and run your job in the IDE - in 
>> that
>> case you should be able to see which code exactly is throwing this
>> exception.
>>
>> Cheers,
>> Gordon
>>
>> On Fri, May 28, 2021 at 12:39 PM Timothy Bess 
>> wrote:
>>
>>> Hi,
>>>
>>> Just checking to see if anyone has experienced this error. Might

Re: Flink in k8s operators list

2021-05-31 Thread Svend
Hi Ilya,

Thanks for the kind feed-back.

We hit the first issue you mention related to K8s 1.18+, we then updated the 
controller-gen version to 0.2.4 in the makefile as described in the ticket you 
linked, and then ran "make deploy", which worked around the issue for us.

I'm not aware of the 2nd issue you refer to related to in-progress job? In case 
that helps, we access the Flink-UI by simply opening a port-forward on port 
8081 on the job manager, which among other things shows the currently running 
jobs.

Svend


On Mon, 31 May 2021, at 12:00 PM, Ilya Karpov wrote:
> Hi Svend,
> 
> thank you so much for sharing your experience! The GCP k8s operator looks 
> promising (currently I'm trying to build it and run the helm chart. An issue 
> with k8s version 1.18+ is a road block right now, but I see that there is a 
> solution), and it also seems like the flink team refers to this implementation.
> 
> In your setup did you solve the problem of visualising list of in-progress 
> jobs?
> 
> > One worrying point though is that the maintainers of the repo seem to have 
> > become silent in March this year.
> lyft's implementation (haven't tried it yet) seems to be even more abandoned 
> (last release 20/04/2020).
> 
>> On 29 May 2021, at 11:23, Svend wrote:
>> 
>> Hi Ilya,
>> 
>> At my company we're currently using the GCP k8s operator (2nd on your list). 
>> Our usage is very moderate, but so far it works great for us.
>> 
>> We appreciate that when upgrading the application, it triggers automatically 
>> a savepoint during shutdown and resumes from it when restarting. It also 
>> allows to take savepoints at regular intervals (we take one per day 
>> currently).
>> 
>> We're using it with Flink 1.12.4 and AWS EKS.
>> 
>> Getting the Flink metrics and logs exported to our monitoring system worked 
>> out of the box. 
>> 
>> Configuring IAM roles and K8s service account for saving checkpoints and 
>> savepoints to S3 required a bit more fiddling although we got it working. 
>> 
>> Happy to share code snippet about any of that if that's useful :)
>> 
>> It was last updated with Flink 1.11 in mind, so there is currently no 
>> built-in support for the reactive scaling mode recently added in Flink 1.13.
>> 
>> One worrying point though is that the maintainers of the repo seem to have 
>> become silent in March this year. There is a small and active community 
>> around it though and issues and PRs keep on arriving and are waiting for 
>> feed-back. It's all free and OSS, so who are we to complain? Though it's 
>> still an important attention point.
>> 
>> Hope this helps,
>> 
>> Svend
>> 
>> 
>> 
>> 
>> 
>> On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
>>> Hi there,
>>> 
>>> I'm doing a little research about the easiest way to deploy a flink job to a 
>>> k8s cluster and manage its lifecycle by a *k8s operator*. The list of 
>>> solutions is below:
>>> - https://github.com/fintechstudios/ververica-platform-k8s-operator
>>> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>>> - https://kudo.dev/docs/examples/apache-flink.html
>>> - https://github.com/wangyang0918/flink-native-k8s-operator
>>> 
>>> If you are using smth that is not listed above please share! Any share 
>>> about how specific solution works is greatly appreciated.
>>> 
>>> Thanks in advance


Re: StreamingFileSink only writes data to MINIO during savepoint

2021-05-31 Thread David Anderson
The StreamingFileSink requires that you have checkpointing enabled. I'm
guessing that you don't have checkpointing enabled, since that would
explain the behavior you are seeing.

The relevant section of the docs [1] explains:

Checkpointing needs to be enabled when using the StreamingFileSink. Part
> files can only be finalized on successful checkpoints. If checkpointing is
> disabled, part files will forever stay in the in-progress or the pending
> state, and cannot be safely read by downstream systems.


Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/streamfile_sink/#streaming-file-sink
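
For illustration, a minimal sketch of a job where the StreamingFileSink can actually finalize part files because checkpointing is enabled (the source, the target path and the interval are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class FileSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without this, part files never leave the in-progress/pending state.
        env.enableCheckpointing(60_000); // placeholder: checkpoint every 60 s

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"), // placeholder path
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.socketTextStream("localhost", 9999) // placeholder source
           .addSink(sink);

        env.execute("streaming-file-sink-sketch");
    }
}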

On Fri, May 28, 2021 at 5:26 PM Robert Cullen  wrote:

> On my kubernetes cluster when I set the StreamingFileSink to write to a
> local instance of S3 (MINIO - 500 GB) it only writes the data after I
> execute a savepoint
>
> The expected behavior is to write the data in real-time. I'm guessing the
> memory requirements have not been met or a configuration in MINIO is
> missing?  Any ideas?
>
> --
> Robert Cullen
> 240-475-4490
>


Flink state processor API with Avro data type

2021-05-31 Thread Min Tan
Hi all,

I am using Flink 1.10.1 and trying to read flink state from a savepoint with the Flink State Processor API.
It works well when the state type is a plain Java type or a Java POJO.

When an Avro-generated Java class is used as the state type, it does not work.
Are additional serializers required in this Avro class case?

Thanks,
Min Tan

Re: Parallelism with onTimer() in connectedStream

2021-05-31 Thread Maminspapin
Any ideas, guys? Can timers work correctly with parallelism? Maybe it's my
fault that it works this way.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-31 Thread Igal Shilman
Hi Tim,
It is unfortunate that the error message was so minimal, we'll definitely
improve that (FLINK-22809).

Skipping NULL keys is a bit problematic, although technically possible, I'm
not sure that this is how we should handle this.
Let me follow up on that.

The way you can customize the behaviour of that connector without having to
fork StateFun, is to define an ingress with a different deserializer.
You would have to use the StatefulFunctionModule [1][2] and bind an
ingress, you can use the KafkaIngressBuilder [3] and set
KafkaIngressBuilder::withDeserializer()
You would also have to define a router to route these messages to target
functions.

I've prepared a minimal example for you here: [4]

I hope this helps,
Igal.


[1]
https://github.com/apache/flink-statefun/blob/release-2.2/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/index.html
[3]
https://github.com/apache/flink-statefun/blob/release-2.2/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
[4] https://github.com/igalshilman/custom-ingress
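
For illustration, a rough sketch of such a module. This is not the project's actual code: the namespace, topic, broker address and function name are placeholders, it assumes the StateFun 2.2 Java SDK, and the module still has to be registered under META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class CustomIngressModule implements StatefulFunctionModule {

  static final IngressIdentifier<String> INGRESS =
      new IngressIdentifier<>(String.class, "my-namespace", "my-ingress");

  static final FunctionType TARGET_FN = new FunctionType("my-namespace", "my-fn");

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    binder.bindIngress(
        KafkaIngressBuilder.forIdentifier(INGRESS)
            .withKafkaAddress("kafka:9092")
            .withConsumerGroupId("my-group")
            .withTopic("my-topic")
            .withDeserializer(TolerantDeserializer.class)
            .build());

    // Route every ingested value to the target function; the value itself is used
    // as the function id here purely for illustration.
    binder.bindIngressRouter(INGRESS, (Router<String>) (message, downstream) ->
        downstream.forward(TARGET_FN, message, message));
  }

  public static final class TolerantDeserializer implements KafkaIngressDeserializer<String> {
    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> input) {
      // A record with a null or empty key can be handled here however the
      // application prefers (e.g. substituting a default id) instead of failing.
      byte[] value = input.value();
      return value == null ? "" : new String(value, StandardCharsets.UTF_8);
    }
  }
}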





On Fri, May 28, 2021 at 8:19 PM Timothy Bess  wrote:

> Ok so after digging into it a bit it seems that the exception was thrown
> here:
>
> https://github.com/apache/flink-statefun/blob/release-2.2/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java#L48
>
> I think it'd be useful to have a configuration to prevent null keys from
> halting processing.
> It looks like we are occasionally publishing with a key string that is
> sometimes empty, and that is interpreted by Kafka as null. Then when it's
> read back in, the ingress chokes on the null value.
>
> I'm trying to keep from having to edit statefun and use my own jar, any
> thoughts?
>
> Thanks,
>
> Tim
>
> On Fri, May 28, 2021 at 10:33 AM Timothy Bess  wrote:
>
>> Oh wow that Harness looks cool, I'll have to take a look at that.
>>
>> Unfortunately the JobManager UI seems to just show this:
>> [image: image.png]
>>
>> Though it does seem that maybe the source function is where the failure
>> is happening according to this?
>> [image: image.png]
>>
>> Still investigating, but I do see a lot of these logs:
>> 2021-05-28 14:25:09,199 WARN
>>  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>> [] - Transaction KafkaTransactionState [transactionalId=feedback-union ->
>> functions -> Sink:
>> bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-39,
>> producerId=2062, epoch=2684] has been open for 55399128 ms. This is close
>> to or even exceeding the transaction timeout of 90 ms.
>>
>> Seems like it's restoring some old kafka transaction? Not sure. I like
>> Arvid's idea of attaching a debugger, I'll definitely give that a try.
>>
>> On Fri, May 28, 2021 at 7:49 AM Arvid Heise  wrote:
>>
>>> If logs are not helping, I think the remaining option is to attach a
>>> debugger [1]. I'd probably add a breakpoint to
>>> LegacySourceFunctionThread#run and see what happens. If the issue is in
>>> recovery, you should add a breakpoint to StreamTask#beforeInvoke.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
>>>
>>> On Fri, May 28, 2021 at 1:11 PM Igal Shilman  wrote:
>>>
 Hi Tim,
 Any additional logs from before are highly appreciated, this would help
 us to trace this issue.
 By the way, do you see something in the JobManager's UI?

 On Fri, May 28, 2021 at 9:06 AM Tzu-Li (Gordon) Tai <
 tzuli...@apache.org> wrote:

> Hi Timothy,
>
> It would indeed be hard to figure this out without any stack traces.
>
> Have you tried changing to debug level logs? Maybe you can also try
> using the StateFun Harness to restore and run your job in the IDE - in 
> that
> case you should be able to see which code exactly is throwing this
> exception.
>
> Cheers,
> Gordon
>
> On Fri, May 28, 2021 at 12:39 PM Timothy Bess 
> wrote:
>
>> Hi,
>>
>> Just checking to see if anyone has experienced this error. Might just
>> be a Flink thing that's irrelevant to statefun, but my job keeps failing
>> over and over with this message:
>>
>> 2021-05-28 03:51:13,001 INFO
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] -
>> Starting FlinkKafkaInternalProducer (10/10) to produce into default
>> topic __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
>> 2021-05-28 03:51:13,001 INFO
>> org.apache.flink.streaming.connectors.kafka.internal.
>> FlinkKafkaInternalProducer [] - Attempting to resume transaction
>> feedback-union -> functions -> Sink:
>> bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45
>> with producerId 31 and epoch 3088
>> 2021-05-28 

Reading Flink states from savepoint using State Processor API

2021-05-31 Thread Tan, Min
Hi,

I am using Flink 1.10.1 and try to read the flink states from a savepoint using 
Flink state processor API.
It works well when state types are the normal Java type or Java POJOs.

When Avro generated Java classes are used as the state type, it does not read 
any states anymore.

Are any additional custom serializers required in this situation?

Regards,
Min




S3 + Parquet credentials issue

2021-05-31 Thread Angelo G.
Hello,

Trying to read a parquet file located in S3 leads to an AWS credentials
exception. Switching to another format (raw, for example) works ok as far as
file access is concerned.

This is a snippet of code to reproduce the issue:

static void parquetS3Error() {

EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

TableEnvironment t_env = TableEnvironment.create(settings);

// parquet format gives error:
// Caused by: java.net.SocketTimeoutException: doesBucketExist on
bucket-prueba-medusa: com.amazonaws.AmazonClientException:
// No AWS Credentials provided by BasicAWSCredentialsProvider
EnvironmentVariableCredentialsProvider
InstanceProfileCredentialsProvider :
// com.amazonaws.SdkClientException: Failed to connect to service endpoint:
t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value`
INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../',
'format' = 'parquet')");

// other formats (i.e. raw) work properly:
// Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
//++
//|url |
//++
//| [80, 65, 82, 49, 21, 0, 21,... |
//| [0, 0, 0, 50, 48, 50, 49, 4... |
t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH (
'connector' = 'filesystem', 'path' = 's3a://.../', 'format' =
'raw')");

Table t1 = t_env.from("backup");

t1.execute().print();

}

Flink version is 1.12.2.

Please find attached the pom with dependencies and version numbers.

What would be a suitable workaround for this?

Thank you very much.

Angelo.

<project xmlns="http://maven.apache.org/POM/4.0.0"
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-s3</artifactId>
	<version>1.0-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>Flink Quickstart Job</name>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.12.2</flink.version>
		<target.java.version>1.8</target.java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.12.1</log4j.version>
	</properties>

	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-s3-fs-hadoop</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>3.1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-parquet_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>${target.java.version}</source>
					<target>${target.java.version}</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.1.1</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>org.apache.logging.log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>org.apache.flink.StreamingJob</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

		<pluginManagement>
			<plugins>
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-shade-plugin</artifactId>
										<versionRange>[3.1.1,)</versionRange>
										<goals>
											<goal>shade</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-compiler-plugin</artifactId>
										<versionRange>[3.1,)</versionRange>
										<goals>
											<goal>testCompile</goal>
											<goal>compile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>
</project>


Re: Reply: Setting the state ttl in Flink sql

2021-05-31 Thread LakeShen
Perhaps you can refer to this:
[image: image.png]

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/
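
For reference, a rough sketch of setting the option programmatically via the TableConfig; it applies to the whole TableEnvironment rather than to a single table, and the TTL value is a placeholder:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StateTtlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Equivalent to setting table.exec.state.ttl in the configuration; it
        // affects all stateful operators of queries run by this environment.
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "1 h");
    }
}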

Best,
LakeShen

chenchencc <1353637...@qq.com> wrote on Fri, May 28, 2021 at 4:30 PM:

> I'd like to ask: can the state ttl be set for a single table?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Question about the maximum number of retained checkpoints with the rocksdb state backend

2021-05-31 Thread LakeShen
With incremental checkpoints, you can roughly think of the state as living almost entirely in the shared
directory under the checkpoint directory. So even when a checkpoint is cleaned up, the reference count of
each file handle referenced by that checkpoint is only decremented by 1; a file is actually deleted only
when no checkpoint references it anymore.

Best,
LakeShen.

刘建刚 wrote on Fri, May 28, 2021 at 7:03 PM:

> Incremental snapshots work by sharing sst files. The system automatically manages the references to the
> sst files for you, similar to Java references; the actual data is not deleted just because one snapshot
> is deleted. So the situation you describe will not happen.
>
> tison wrote on Fri, May 28, 2021 at 1:47 AM:
>
> > That is not how rocksdb incremental checkpoints work; in short, recovery will not fail. For the reasons,
> > see the material below:
> >
> > -
> > https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> > the official blog post
> > - https://www.bilibili.com/video/BV1db411e7x2 Dr. Shi's talk, starting at roughly minute 24
> >
> > Best,
> > tison.
> >
> >
> > casel.chen wrote on Thu, May 27, 2021 at 11:35 PM:
> >
> > > The job uses the incremental rocksdb state backend. If the maximum number of retained checkpoints is
> > > also configured, could that cause the rocksdb state restore to fail? For example, suppose a full state
> > > restore needs the last 10 chk, but the maximum number of retained checkpoints is set to 5; would the
> > > state then be unrecoverable?
> >
>


Re: Stream-to-stream left join

2021-05-31 Thread LakeShen
Hi, perhaps a Flink SQL interval join can satisfy your requirement.
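
For illustration, a rough sketch of what such an interval left join could look like (the table names, columns and the one-hour bound are made-up placeholders; both inputs are assumed to be append-only tables registered elsewhere, each with a time attribute order_time / pay_time):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IntervalJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Because the join condition bounds the time difference between the two
        // streams, the join state can be cleaned up once the interval has passed.
        tableEnv.executeSql(
            "SELECT o.order_id, o.order_time, p.pay_time " +
            "FROM Orders o " +
            "LEFT JOIN Payments p " +
            "  ON o.order_id = p.order_id " +
            " AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '1' HOUR")
            .print();
    }
}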

Best,
LakeShen.

Shuo Cheng wrote on Mon, May 31, 2021 at 12:10 PM:

> The state ttl can only be set at the global operator level: table.exec.state.ttl
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Seeking advice: handling dynamic fields

2021-05-31 Thread LakeShen
Check which Flink version you are on; if it is a recent version, the community provides an HBase sink for the DataStream API.

Best,
LakeShen.

Zorro wrote on Mon, May 31, 2021 at 2:41 PM:

> Since your DDL changes over time and you cannot know all fields in advance, the first thing that is
> certain is that this scenario cannot be solved with Flink SQL.
>
> Solving it with the DataStream API is feasible; the only potential problem is that the community does not
> currently provide a DataStream HBase sink. If you need an HBase sink in the DataStream API, you may have
> to implement one yourself or adapt the community's HBase SQL connector. Those changes would have to be
> made at the Java code level.
>
> The rest of the processing logic can easily be rewritten with pyFlink.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Connecting to MINIO Operator/Tenant via SSL

2021-05-31 Thread Nico Kruber
Just a hunch:
Your command to start the job is only submitting the Flink job to an existing 
cluster. Did you also configure the certificates on the cluster's machines 
(because they would ultimately do these checks, not your local machine 
submitting the job)?
-> You can specify additional JVM parameters for TMs and JMs as shown in [1]


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/
deployment/config/#env-java-opts

On Tuesday, 18 May 2021 15:13:45 CEST Robert Cullen wrote:
> The new MINIO operator/tenant model requires connection over SSL. I’ve
> added the 3 public certs that MINIO provides using keytool to the
> truststore and passing the JVM params via command line to flink as follows:
> 
> root@flink-client:/opt/flink# ./bin/flink run --detached --target
> kubernetes-session -Dkubernetes.cluster-id=flink-jobmanager
> -Dkubernetes.namespace=flink -Djavax.net.ssl.trustStore=$JAVA
> _HOME/lib/security/cacerts -Djavax.net.ssl.trustStorePassword=changeit
> ./usrlib/flink-job.jar
> 
> But there is still an SSL validation error:
> 
> 
> 2021-05-18 13:01:53,635 DEBUG com.amazonaws.auth.AWS4Signer
> [] - AWS4 String to Sign: '"AWS4-HMAC-SHA256
> 20210518T130153Z
> 20210518/us-east-1/s3/aws4_request
> b38391c7efd22a9ed0bceb93d460732bdd632f2acccf7e9d2d1baa30be69ced2"
> 2021-05-18 13:01:53,636 DEBUG
> com.amazonaws.http.conn.ssl.SdkTLSSocketFactory  [] -
> connecting to /10.42.0.133:9000
> 2021-05-18 13:01:53,636 DEBUG
> com.amazonaws.http.conn.ssl.SdkTLSSocketFactory  [] -
> Connecting socket to /10.42.0.133:9000 with timeout 5000
> 2021-05-18 13:01:53,636 DEBUG
> com.amazonaws.http.conn.ssl.SdkTLSSocketFactory  [] -
> Enabled protocols: [TLSv1.2]
> 2021-05-18 13:01:53,636 DEBUG
> com.amazonaws.http.conn.ssl.SdkTLSSocketFactory  [] -
> Enabled cipher suites:[TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
> TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
> TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
> TLS_RSA_WITH_AES_256_GCM_SHA384,
> TLS_ECDH_ECDSA_WITH_AES_256_GCM_SHA384,
> TLS_ECDH_RSA_WITH_AES_256_GCM_SHA384,
> TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,
> TLS_DHE_DSS_WITH_AES_256_GCM_SHA384,
> TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
> TLS_RSA_WITH_AES_128_GCM_SHA256,
> TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256,
> TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256,
> TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,
> TLS_DHE_DSS_WITH_AES_128_GCM_SHA256,
> TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384,
> TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384,
> TLS_RSA_WITH_AES_256_CBC_SHA256,
> TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384,
> TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384,
> TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,
> TLS_DHE_DSS_WITH_AES_256_CBC_SHA256,
> TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
> TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA,
> TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA,
> TLS_ECDH_RSA_WITH_AES_256_CBC_SHA, TLS_DHE_RSA_WITH_AES_256_CBC_SHA,
> TLS_DHE_DSS_WITH_AES_256_CBC_SHA,
> TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,
> TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,
> TLS_RSA_WITH_AES_128_CBC_SHA256,
> TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256,
> TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256,
> TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,
> TLS_DHE_DSS_WITH_AES_128_CBC_SHA256,
> TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
> TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_128_CBC_SHA,
> TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA,
> TLS_ECDH_RSA_WITH_AES_128_CBC_SHA, TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
> TLS_DHE_DSS_WITH_AES_128_CBC_SHA, TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
> 2021-05-18 13:01:53,636 DEBUG
> com.amazonaws.http.conn.ssl.SdkTLSSocketFactory  [] -
> socket.getSupportedProtocols(): [TLSv1.3, TLSv1.2, TLSv1.1, TLSv1,
> SSLv3, SSLv2Hello], socket.getEnabledProtocols(): [TLSv1.2]
> 2021-05-18 13:01:53,636 DEBUG
> com.amazonaws.http.conn.ssl.SdkTLSSocketFactory  [] - TLS
> protocol enabled for SSL handshake: [TLSv1.2, TLSv1.1, TLSv1]
> 2021-05-18 13:01:53,636 DEBUG
> com.amazonaws.http.conn.ssl.SdkTLSSocketFactory  [] -
> Starting handshake
> 2021-05-18 13:01:53,638 DEBUG
> com.amazonaws.http.conn.ClientConnectionManagerFactory   [] -
> java.lang.reflect.InvocationTargetException: null
> at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImp
> l.java:43) ~[?:1.8.0_292]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_292]
> at
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(Clien
> tConnectionManagerFactory.java:76) ~[flink-s3-fs-hadoop-1.13.0.jar:1.13.0]
> at com.amazonaws.http.conn.$Proxy49.connect(Unknown Source) ~[?:1.13.0]
> at
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec
> .java:393) ~[flink-s3-fs-hadoop-1.13.0.jar:1.13.0]
> at
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:2
> 36) ~[flink-s3-fs-hadoop-1.13.0.jar:1.13.0]
> 

Re: [ANNOUNCE] Apache Flink 1.13.1 released

2021-05-31 Thread Till Rohrmann
Thanks for the great work Dawid and to everyone who has contributed to this
release.

Cheers,
Till

On Mon, May 31, 2021 at 10:25 AM Yangze Guo  wrote:

> Thanks, Dawid for the great work, thanks to everyone involved.
>
> Best,
> Yangze Guo
>
> On Mon, May 31, 2021 at 4:14 PM Youngwoo Kim (김영우) 
> wrote:
> >
> > Got it.
> > Thanks Dawid for the clarification.
> >
> > - Youngwoo
> >
> > On Mon, May 31, 2021 at 4:50 PM Dawid Wysakowicz 
> wrote:
> >>
> >> Hi Youngwoo,
> >>
> >> Usually we publish the docker images a day after the general release, so
> >> that the artifacts are properly distributed across Apache mirrors. You
> >> should be able to download the docker images from apache/flink now. It
> >> may take a few extra days to have the images published as the official
> >> image, as it depends on the maintainers of docker hub.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 31/05/2021 08:01, Youngwoo Kim (김영우) wrote:
> >> > Great work! Thank you Dawid and all of the contributors.
> >> > I'm eager to adopt the new release, however can't find docker images
> for
> >> > that from https://hub.docker.com/_/flink
> >> >
> >> > Hope it'll be available soon.
> >> >
> >> > Thanks,
> >> > Youngwoo
> >> >
> >> >
> >> > On Sat, May 29, 2021 at 1:49 AM Dawid Wysakowicz <
> dwysakow...@apache.org>
> >> > wrote:
> >> >
> >> >> The Apache Flink community is very happy to announce the release of
> Apache
> >> >> Flink 1.13.1, which is the first bugfix release for the Apache Flink
> 1.13
> >> >> series.
> >> >>
> >> >> Apache Flink® is an open-source stream processing framework for
> >> >> distributed, high-performing, always-available, and accurate data
> streaming
> >> >> applications.
> >> >>
> >> >> The release is available for download at:
> >> >> https://flink.apache.org/downloads.html
> >> >>
> >> >> Please check out the release blog post for an overview of the
> improvements
> >> >> for this bugfix release:
> >> >> https://flink.apache.org/news/2021/05/28/release-1.13.1.html
> >> >>
> >> >> The full release notes are available in Jira:
> >> >>
> >> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058
> >> >>
> >> >> We would like to thank all contributors of the Apache Flink
> community who
> >> >> made this release possible!
> >> >>
> >> >> Regards,
> >> >> Dawid Wysakowicz
> >> >>
> >>
>


Different programs report the same exception during the same time period

2021-05-31 Thread mq sun
Hi all,
  Recently in production, two flink programs from different project teams both reported the exception below during the same period of time:
ERROR org.apache.flink.runtime.blob.BlobServerConnection  - Error while
executing Blob connection
.
.
.
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
: Adjusted frame length exceeds 10485760: 1347375960 - discarded

When only one of the programs reported this error at first, we suspected it was caused by overly large state. But later another program reported the same error, so now we are not sure what is causing it. What could the possible reason be?


Re: flink sink kafka from checkpoint run failed

2021-05-31 Thread tianxy
I ran into this as well. Have you solved it?




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink in k8s operators list

2021-05-31 Thread Ilya Karpov
Hi Svend,

thank you so much for sharing your experience! The GCP k8s operator looks promising 
(currently I'm trying to build it and run the helm chart. An issue with 
k8s version 1.18+ is a road block right now, but I see that there is a solution), 
and it also seems like the flink team refers to this implementation.

In your setup did you solve the problem of visualising list of in-progress jobs?

> One worrying point though is that the maintainers of the repo seem to have 
> become silent in March this year.
lyft's implementation (haven't tried it yet) seems to be even more abandoned 
(last release 20/04/2020).

> On 29 May 2021, at 11:23, Svend wrote:
> 
> Hi Ilya,
> 
> At my company we're currently using the GCP k8s operator (2nd on your list). 
> Our usage is very moderate, but so far it works great for us.
> 
> We appreciate that when upgrading the application, it triggers automatically 
> a savepoint during shutdown and resumes from it when restarting. It also 
> allows to take savepoints at regular intervals (we take one per day 
> currently).
> 
> We're using it with Flink 1.12.4 and AWS EKS.
> 
> Getting the Flink metrics and logs exported to our monitoring system worked 
> out of the box. 
> 
> Configuring IAM roles and K8s service account for saving checkpoints and 
> savepoints to S3 required a bit more fiddling although we got it working. 
> 
> Happy to share code snippet about any of that if that's useful :)
> 
> It was last updated with Flink 1.11 in mind, so there is currently no 
> built-in support for the reactive scaling mode recently added in Flink 1.13.
> 
> One worrying point though is that the maintainers of the repo seem to have 
> become silent in March this year. There is a small and active community 
> around it though and issues and PRs keep on arriving and are waiting for 
> feed-back. It's all free and OSS, so who are we to complain? Though it's 
> still an important attention point.
> 
> Hope this helps,
> 
> Svend
> 
> 
> 
> 
> 
> On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
>> Hi there,
>> 
>> I'm doing a little research about the easiest way to deploy a flink job to a k8s 
>> cluster and manage its lifecycle by a k8s operator. The list of solutions is 
>> below:
>> - https://github.com/fintechstudios/ververica-platform-k8s-operator 
>> 
>> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator 
>> 
>> - https://kudo.dev/docs/examples/apache-flink.html 
>> 
>> - https://github.com/wangyang0918/flink-native-k8s-operator 
>> 
>> 
>> If you are using smth that is not listed above please share! Any share about 
>> how specific solution works is greatly appreciated.
>> 
>> Thanks in advance



How to disable operatorChaining

2021-05-31 Thread McClone
Flink version: 1.11.2

EnvironmentSettings build = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(build);

recover from savepoint

2021-05-31 Thread 周瑞
Hi:
   When "sink.semantic = exactly-once", the following exception is thrown when recovering from a savepoint:


    public static final String KAFKA_TABLE_FORMAT =
            "CREATE TABLE " + TABLE_NAME + " (\n" +
            "  " + COLUMN_NAME + " STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = '%s',\n" +
            "  'properties.bootstrap.servers' = '%s',\n" +
            "  'sink.semantic' = 'exactly-once',\n" +
            "  'properties.transaction.timeout.ms' = '90',\n" +
            "  'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
            "  'format' = 'dbz-json'\n" +
            ")\n";

[] - Source: TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data])
- Sink: Sink(table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1)#859
(075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to FAILED with failure cause:
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted
an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the
producer's transaction has been expired by the broker.
 at 
org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
 at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
 at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
 at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
 at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
 at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
 at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
 at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
 at java.lang.Thread.run(Thread.java:748)

savepoint fail

2021-05-31 Thread 周瑞
Hi:
   When "sink.semantic = exactly-once", the following exception is thrown when recovering from a savepoint:


    public static final String KAFKA_TABLE_FORMAT =
            "CREATE TABLE " + TABLE_NAME + " (\n" +
            "  " + COLUMN_NAME + " STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = '%s',\n" +
            "  'properties.bootstrap.servers' = '%s',\n" +
            "  'sink.semantic' = 'exactly-once',\n" +
            "  'properties.transaction.timeout.ms' = '90',\n" +
            "  'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
            "  'format' = 'dbz-json'\n" +
            ")\n";
[] - Source: TableSourceScan(table=[[default_catalog, 
default_database, debezium_source]], fields=[data]) - Sink: 
Sink(table=[default_catalog.default_database.KafkaTable], fields=[data]) 
(1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to 
FAILED with failure cause: org.apache.kafka.common.KafkaException: Unexpected 
error in InitProducerIdResponse; Producer attempted an operation with an old 
epoch. Either there is a newer producer with the same transactionalId, or the 
producer's transaction has been expired by the broker.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at java.lang.Thread.run(Thread.java:748)

Re: [ANNOUNCE] Apache Flink 1.13.1 released

2021-05-31 Thread Yangze Guo
Thanks, Dawid for the great work, thanks to everyone involved.

Best,
Yangze Guo

On Mon, May 31, 2021 at 4:14 PM Youngwoo Kim (김영우)  wrote:
>
> Got it.
> Thanks Dawid for the clarification.
>
> - Youngwoo
>
> On Mon, May 31, 2021 at 4:50 PM Dawid Wysakowicz  
> wrote:
>>
>> Hi Youngwoo,
>>
>> Usually we publish the docker images a day after the general release, so
>> that the artifacts are properly distributed across Apache mirrors. You
>> should be able to download the docker images from apache/flink now. It
>> may take a few extra days to have the images published as the official
>> image, as it depends on the maintainers of docker hub.
>>
>> Best,
>>
>> Dawid
>>
>> On 31/05/2021 08:01, Youngwoo Kim (김영우) wrote:
>> > Great work! Thank you Dawid and all of the contributors.
>> > I'm eager to adopt the new release, however can't find docker images for
>> > that from https://hub.docker.com/_/flink
>> >
>> > Hope it'll be available soon.
>> >
>> > Thanks,
>> > Youngwoo
>> >
>> >
>> > On Sat, May 29, 2021 at 1:49 AM Dawid Wysakowicz 
>> > wrote:
>> >
>> >> The Apache Flink community is very happy to announce the release of Apache
>> >> Flink 1.13.1, which is the first bugfix release for the Apache Flink 1.13
>> >> series.
>> >>
>> >> Apache Flink® is an open-source stream processing framework for
>> >> distributed, high-performing, always-available, and accurate data 
>> >> streaming
>> >> applications.
>> >>
>> >> The release is available for download at:
>> >> https://flink.apache.org/downloads.html
>> >>
>> >> Please check out the release blog post for an overview of the improvements
>> >> for this bugfix release:
>> >> https://flink.apache.org/news/2021/05/28/release-1.13.1.html
>> >>
>> >> The full release notes are available in Jira:
>> >>
>> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058
>> >>
>> >> We would like to thank all contributors of the Apache Flink community who
>> >> made this release possible!
>> >>
>> >> Regards,
>> >> Dawid Wysakowicz
>> >>
>>


Re: [ANNOUNCE] Apache Flink 1.13.1 released

2021-05-31 Thread 김영우
Got it.
Thanks Dawid for the clarification.

- Youngwoo

On Mon, May 31, 2021 at 4:50 PM Dawid Wysakowicz 
wrote:

> Hi Youngwoo,
>
> Usually we publish the docker images a day after the general release, so
> that the artifacts are properly distributed across Apache mirrors. You
> should be able to download the docker images from apache/flink now. It
> may take a few extra days to have the images published as the official
> image, as it depends on the maintainers of docker hub.
>
> Best,
>
> Dawid
>
> On 31/05/2021 08:01, Youngwoo Kim (김영우) wrote:
> > Great work! Thank you Dawid and all of the contributors.
> > I'm eager to adopt the new release, however can't find docker images for
> > that from https://hub.docker.com/_/flink
> >
> > Hope it'll be available soon.
> >
> > Thanks,
> > Youngwoo
> >
> >
> > On Sat, May 29, 2021 at 1:49 AM Dawid Wysakowicz  >
> > wrote:
> >
> >> The Apache Flink community is very happy to announce the release of
> Apache
> >> Flink 1.13.1, which is the first bugfix release for the Apache Flink
> 1.13
> >> series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data
> streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> improvements
> >> for this bugfix release:
> >> https://flink.apache.org/news/2021/05/28/release-1.13.1.html
> >>
> >> The full release notes are available in Jira:
> >>
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who
> >> made this release possible!
> >>
> >> Regards,
> >> Dawid Wysakowicz
> >>
>
>


Re: Re: In flink on yarn mode, a resource-manager switchover in the yarn cluster causes the flink application to restart, and it does not recover from the last checkpoint

2021-05-31 Thread Yang Wang
With HA, the counter and address of the last successful checkpoint are recorded in ZooKeeper; without HA enabled, the job simply restores from the savepoint specified on the command line.
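For reference, a minimal flink-conf.yaml sketch of enabling ZooKeeper HA for a YARN deployment; the quorum address, storage path and attempt count are placeholder values rather than anything taken from this thread:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
# Allow the YARN ApplicationMaster to be restarted after a failover.
yarn.application-attempts: 10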


Best,
Yang

刘建刚  wrote on Fri, May 28, 2021 at 6:51 PM:

> Then the snapshot information was most likely lost after the master failover; HA should solve this problem.
>
> 董建 <62...@163.com> wrote on Fri, May 28, 2021 at 6:24 PM:
>
> > It reproduces reliably.
> > Checkpoints are generated normally; I can confirm them both in the web UI and in the HDFS directory.
> > Our jobmanager does not have HA enabled; I wonder whether that is the cause.
> > The logs show the job restoring from the checkpoint specified with -s; when -s is not specified, the restart does not use the latest checkpoint file either.
> > This problem has been bothering me for a long time and I have no good idea yet; the next step is to set up HA and try again.
> > >> org.apache.flink.configuration.GlobalConfiguration   [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On 2021-05-28 18:15:38, "刘建刚"  wrote:
> > >This behavior is not expected. Can it be reproduced reliably with the following steps?
> > >1. Restore from a savepoint;
> > >2. The job then takes periodic savepoints;
> > >3. The job fails over.
> > >If so, you may need to check whether the checkpoint files exist and whether ZooKeeper has been updated.
> > >If the problem persists, it will have to be investigated through the logs.
> > >
> > >董建 <62...@163.com> wrote on Fri, May 28, 2021 at 5:37 PM:
> > >
> > >> The symptoms of the problem I am running into are as follows:
> > >>
> > >>
> > >>
> > >>
> > >> 1. Flink version flink-1.12.2; the launch command is below. -s is specified because the job had been cancelled and is being restarted here.
> > >>
> > >>
> > >>
> > >>
> > >> flink run -d -s
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> > >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> > >>
> > >>
> > >>
> > >>
> > >> 2. flink-conf.yaml
> > >>
> > >>
> > >>
> > >>
> > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> > >>
> > >>
> > >>
> > >>
> > >> 3. Checkpoint settings in the code
> > >>
> > >>
> > >>
> > >>
> > >>StreamExecutionEnvironment env =
> > >> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>
> > >>
> > >>
> > >>
> > >>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> > >> 10));
> > >>
> > >>
> > >>
> > >>
> > >>CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >>
> > >>
> > >>
> > >>
> > >>env.enableCheckpointing(1 * 60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setTolerableCheckpointFailureNumber(100);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setCheckpointTimeout(60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setMaxConcurrentCheckpoints(1);
> > >>
> > >>
> > >>
> > >>
> > >> 4. Observed behavior
> > >>
> > >>
> > >>
> > >>
> > >> a) When the ops team switches the yarn resource-manager, my Flink job also
> > >> restarts (the application-id and job-id do not change after the restart),
> > >> but the jobmanager and taskmanager move to different machines.
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> b) My Flink job takes a checkpoint every minute. Suppose the job has been running for a while since the last restart and the checkpoints have reached chk-200.
> > >>
> > >>
> > >>
> > >>
> > >> c) The cluster team switches the resource-manager again. The expectation is that
> > >> the Flink job automatically restores from chk-200, but the logs show it still restores from chk-100.
> > >>
> > >>
> > >>
> > >>
> > >> d) The job essentially consumes the binlog via flink-mysql-cdc: DebeziumSourceFunction
> > >> sourceMilApplysLogStream = MySQLSource.builder()
> > >>
> > >>
> > >>
> > >>
> > >>   The restart caused consumption to resume from the binlog position of chk-100, i.e. the binlog between chk-100 and chk-200 was consumed again.
> > >>
> > >>
> > >>
> > >>
> > >> e) The log prints the following. Is it because I used -s the last time I started the Flink job that it does not automatically pick up the latest checkpoint on restart?
> > >>
> > >>
> > >>
> > >>
> > >> 2021-05-24 16:49:50,398 INFO
> > >> org.apache.flink.configuration.GlobalConfiguration   [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >>
> > >>
> > >>
> > >> The expectation is that after a restart the job automatically resumes from the latest checkpoint, i.e. continues consuming from chk-200.
> > >>
> > >>
> > >>
> > >>
> > >> The current problem is that after a restart the job always restores from the checkpoint specified with flink -s.
> >
>


Re: flink 1.13 k8s native startup cannot find KubernetesSessionClusterEntrypoint

2021-05-31 Thread Yang Wang
You can describe the failed JM pod and share the output, to check whether the generated start command is correct.
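For example (namespace and pod name below are placeholders):

kubectl -n <namespace> get pods
kubectl -n <namespace> describe pod <jobmanager-pod-name>
kubectl -n <namespace> logs <jobmanager-pod-name>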


Best,
Yang

fz  wrote on Fri, May 28, 2021 at 10:09 PM:

> Image: flink:1.13.0-scala_2.11
>
> sed: cannot rename /opt/flink/conf/sed1yRdDY: Device or resource busy
> sed: cannot rename /opt/flink/conf/sed03zP3W: Device or resource busy
> /docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only
> file system
> sed: cannot rename /opt/flink/conf/sedFtORA0: Device or resource busy
> mv: cannot move '/opt/flink/conf/flink-conf.yaml.tmp' to
> '/opt/flink/conf/flink-conf.yaml': Device or resource busy
> + /usr/local/openjdk-8/bin/java -classpath '/opt/flink/lib/*' -Xms30720m
> -Xmx30720m -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
> -Dlog4j.configuration=file:/opt/flink/conf/log4j.properties
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> Error: Could not find or load main class
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: JM cannot recover with Kubernetes HA

2021-05-31 Thread Yang Wang
When the APIServer or etcd of your K8s cluster is under heavy load,
the fabric8 Kubernetes client
might time out when watching/renewing/getting the ConfigMap.

I think you could increase the read/connect timeout (default is 10s) of the HTTP
client and give it a try.
env.java.opts: "-Dkubernetes.connection.timeout=3
-Dkubernetes.request.timeout=3"

You could also increase the leader election timeouts.
high-availability.kubernetes.leader-election.lease-duration: 30s
high-availability.kubernetes.leader-election.renew-deadline: 30s


After you apply these configurations, I think the Flink cluster could
tolerate the "not-very-good" network environment.
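For reference, a flink-conf.yaml sketch that spells these settings out in one place; the 30000 ms / 30 s values are only assumptions to illustrate the idea and should be tuned for your cluster:

# fabric8 HTTP client timeouts (milliseconds), passed as JVM system properties.
env.java.opts: "-Dkubernetes.connection.timeout=30000 -Dkubernetes.request.timeout=30000"
# Give leader election more slack when the APIServer/etcd responds slowly.
high-availability.kubernetes.leader-election.lease-duration: 30s
high-availability.kubernetes.leader-election.renew-deadline: 30s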


Moreover, if you could share the failed JobManager logs, it would be easier
for the community to debug the issue.


Best,
Yang

Matthias Pohl  wrote on Fri, May 28, 2021 at 11:37 PM:

> Hi Enrique,
> thanks for reaching out to the community. I'm not 100% sure what problem
> you're facing. The log messages you're sharing could mean that the Flink
> cluster still behaves as normal having some outages and the HA
> functionality kicking in.
>
> The behavior you're seeing with leaders for the different actors (i.e.
> RestServer, Dispatcher, ResourceManager) being located on different hosts
> is fine and no indication for something going wrong as well.
>
> It might help to share the entire logs with us if you need assistance in
> investigating your issue.
>
> Best,
> Matthias
>
> On Thu, May 27, 2021 at 12:42 PM Enrique  wrote:
>
>> To add to my post, instead of using POD IP for the
>> `jobmanager.rpc.address`
>> configuration we start each JM pod with the Fully Qualified Name `--host
>> ..ns.svc:8081`  and this address gets
>> persisted
>> to the ConfigMaps. In some scenarios, the leader address in the ConfigMaps
>> might differ.
>>
>> For example, let's assume I have 3 JMs:
>>
>> jm-0.jm-statefulset.ns.svc:8081 <-- Leader
>> jm-1.jm-statefulset.ns.svc:8081
>> jm-2.jm-statefulset..ns.svc:8081
>>
>> I have seen the ConfigMaps in the following state:
>>
>> RestServer Configmap Address: jm-0.jm-statefulset.ns.svc:8081
>> DispatchServer Configmap Address: jm-1.jm-statefulset.ns.svc:8081
>> ResourceManager ConfigMap Address: jm-0.jm-statefulset.ns.svc:8081
>>
>> Is this the correct behaviour?
>>
>> I then have seen that the TM pods fail to connect due to
>>
>> ```
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
>> token
>> not set: Ignoring message
>> RemoteFencedMessage(b870874c1c590d593178811f052a42c9,
>> RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time)))
>> sent to
>> akka.tcp://fl...@jm-1.jm-statefulset.ns.svc
>> :6123/user/rpc/resourcemanager_0
>> because the fencing token is null.
>> ```
>>
>> This is explained by Till
>>
>> https://issues.apache.org/jira/browse/FLINK-18367?focusedCommentId=17141070=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17141070
>>
>> Has anyone else seen this?
>>
>> Thanks!
>>
>> Enrique
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


The /checkpoint/shared/ folder of a Flink SQL job keeps growing in size although the source data has no spike; how can this be controlled?

2021-05-31 Thread yujianbo
I. Environment:
1. Version: 1.12.0
2. Flink SQL
3. setIdleStateRetention is already set to 1 hour
4. The state backend is RocksDB in incremental mode
5. There is no spike in the source data, and the job has been running for two days

II. Details
The exact SQL is in section III; it is just an ordinary group-by aggregation, with setIdleStateRetention(3600) configured. After observing for two days, the size of the shared folder under the checkpoint directory keeps growing; the files inside it are constantly being rotated, and the oldest files do disappear.
My group-by dimensions include a concrete minute field, so one hour later there can be no data with exactly the same dimensions, and the expired data should normally be cleaned up. Does the continuously growing /checkpoint/shared/ folder indicate that the expired old data has not been cleaned up yet?
How should this situation be handled?

III. The SQL

CREATE TABLE user_behavior (
   `request_ip` STRING,
   `request_time` BIGINT,
   `header` STRING ,
-- this converts the timestamp to minute granularity
   `t_min` as cast(`request_time`-(`request_time` + 2880)%6 as
BIGINT),
   `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'-MM-dd
HH:mm:ss')),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) 
with (
   'connector' = 'kafka',
    
);


CREATE TABLE blackhole_table (
   `cnt` BIGINT,
   `lists` STRING
) WITH (
 'connector' = 'blackhole'
);


insert into blackhole_table 
select 
count(*) as cnt, 
LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING)))
as lists
from user_behavior 
group by `request_ip`,`header`,`t_min`;
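For completeness, a minimal sketch of how the settings described above (incremental RocksDB backend, one-minute checkpoints, one-hour idle state retention) are typically wired together in 1.12-style code; the checkpoint path is a placeholder, and the two-argument retention call is the pre-1.13 API, assumed here to be equivalent to the setIdleStateRetention(3600) mentioned above:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RetentionSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Incremental RocksDB state backend (checkpoint path is a placeholder).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints/job", true));
        env.enableCheckpointing(60_000);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Minimum 1h / maximum 2h idle state retention.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));

        // ... register the Kafka source table and the blackhole sink, then run the INSERT above.
    }
}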





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [ANNOUNCE] Apache Flink 1.13.1 released

2021-05-31 Thread Dawid Wysakowicz
Hi Youngwoo,

Usually we publish the docker images a day after the general release, so
that the artifacts are properly distributed across Apache mirrors. You
should be able to download the docker images from apache/flink now. It
may take a few extra days to have the images published as the official
image, as it depends on the maintainers of docker hub.
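For anyone who wants to try it right away, pulling from the apache/flink repository looks roughly like this; the exact tag name is an assumption, so check the tag list on Docker Hub first:

docker pull apache/flink:1.13.1-scala_2.12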

Best,

Dawid

On 31/05/2021 08:01, Youngwoo Kim (김영우) wrote:
> Great work! Thank you Dawid and all of the contributors.
> I'm eager to adopt the new release, however can't find docker images for
> that from https://hub.docker.com/_/flink
>
> Hope it'll be available soon.
>
> Thanks,
> Youngwoo
>
>
> On Sat, May 29, 2021 at 1:49 AM Dawid Wysakowicz 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink 1.13.1, which is the first bugfix release for the Apache Flink 1.13
>> series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements
>> for this bugfix release:
>> https://flink.apache.org/news/2021/05/28/release-1.13.1.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Dawid Wysakowicz
>>





Re: Running multiple CEP pattern rules

2021-05-31 Thread Dawid Wysakowicz
I am afraid there is not much active development going on in the
CEP library. I would not expect new features there in the near future.

On 28/05/2021 22:00, Tejas wrote:
> Hi Dawid,
> Do you have any plans to bring this functionality in flink CEP in future ?
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Apache Flink - A question about Tables API and SQL interfaces

2021-05-31 Thread Ingo Bürk
Hi everyone,

there is also [1], which proposes introducing a CURRENT_WATERMARK function in SQL
that can help with handling late events. Maybe that is interesting here as well.

[1] https://issues.apache.org/jira/browse/FLINK-22737
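If that proposal lands as described, it would enable queries along these lines; the table and column names are made up for illustration and the exact semantics may still change:

SELECT *
FROM my_events
-- keep only rows that are not late relative to the current watermark
WHERE CURRENT_WATERMARK(row_time) IS NULL
   OR row_time > CURRENT_WATERMARK(row_time);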


Regards
Ingo

On Sun, May 30, 2021 at 5:31 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Mans,
>
> Regarding to your first question: I bookmarked the following mailing list
> discussion a while ago [1].
>
> Fabian Hueske as one of the major contributors to Flink answered that
> there aren't yet any trigger semantics in Flink SQL, but linked a great
> idea with a SQL extension of "EMIT".
>
> I read each Flink release notes and hope this idea is going to be
> implemented, but as far as I know, there wasn't any progress on this over
> the last years.
>
> Best regards
> Theo
>
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Do-Not-Support-Custom-Trigger-td20932.html
>
> - Original Message -
> From: "张静" 
> To: "Austin Cawley-Edwards" 
> CC: "M Singh" , "user" 
> Sent: Friday, May 14, 2021 06:06:33
> Subject: Re: Apache Flink - A question about Tables API and SQL interfaces
>
> Hi Mans,
>  +1 for Austin's reply.
>  I would like to add something about "allow lateness".
>  After the introduction of windowing table-valued functions in Flink 1.13,
> users can choose between two SQL solutions for window aggregation, and the
> 'allow lateness' behavior differs between the two.
> 1. With the windowing TVF window aggregate [2], 'allow lateness'
> is not supported yet.
> 2. With the legacy grouped window functions [1], 'allow lateness'
> is supported. However, you should use the feature with caution since
> it depends on the state retention configuration (`table.exec.state.ttl`
> [3]), especially if a job contains many operators besides the window
> aggregate. Please see FLINK-21301 [4]; it may be resolved in
> Flink 1.14.
>
> Best,
> beyond1920
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/#group-window-aggregation-deprecated
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf/
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/
> [4]:https://issues.apache.org/jira/browse/FLINK-21301
>
> Austin Cawley-Edwards  wrote on Thu, May 13, 2021 at 9:57 PM:
> >
> > Hi Mans,
> >
> > There are currently no public APIs for doing so, though if you're
> willing to deal with some breaking changes there are some experimental
> config options for late events in the Table API and SQL, seen in the
> WindowEmitStrategy class [1].
> >
> > Best,
> > Austin
> >
> > [1]:
> https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L173-L211
> >
> > On Wed, May 12, 2021 at 5:12 PM M Singh  wrote:
> >>
> >> Thanks Austin for your helpful references.
> >>
> >> I did take a look at [2]/[3] - but did not find anything relevant on
> searching for string 'late' (for allowed lateness etc) or side output.  So
> from my understanding the late events will be dropped if I am using Table
> API or SQL and the only option is to use datastream interface.  Please let
> me know if I missed anything.
> >>
> >> Thanks again.
> >>
> >>
> >> On Wednesday, May 12, 2021, 04:36:06 PM EDT, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
> >>
> >>
> >> Hi Mans,
> >>
> >> I don't believe there are explicit triggers/evictors/timers in the
> Table API/ SQL, as that is abstracted away from the lower-level DataStream
> API. If you need to get into the fine-grained details, Flink 1.13 has made
> some good improvements in going from the Table API to the DataStream API,
> and back again. [1]
> >>
> >> For working with time and lateness with Table API and SQL, some good
> places to look are the GroupBy Window Aggregation section of the Table API
> docs[2], as well as the SQL cookbook[3] and Ververica's SQL training
> wiki[4].
> >>
> >> Hope that helps,
> >> Austin
> >>
> >> [1]:
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html#improved-interoperability-between-datastream-api-and-table-apisql
> >> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#groupby-window-aggregation
> >> [3]:
> https://github.com/ververica/flink-sql-cookbook#aggregations-and-analytics
> >> [4]: https://github.com/ververica/sql-training/wiki/Queries-and-Time
> >>
> >> On Wed, May 12, 2021 at 1:30 PM M Singh  wrote:
> >>
> >> Hey Folks:
> >>
> >> I have the following questions regarding Table API/SQL in streaming
> mode:
> >>
> >> 1. Is there is a notion triggers/evictors/timers when using Table API
> or SQL interfaces ?
> >> 2. Is there anything like side outputs and ability to define allowed
> lateness when dealing with the Table API or SQL interfaces ?
> >>
> >> If 

Re: Idle source configuration per topic with the Kafka Table API connector

2021-05-31 Thread Svend
Awesome,  thanks a lot for clarifications Jing Zhang, it's very useful.

Best,

Svend

On Sun, 30 May 2021, at 6:27 AM, JING ZHANG wrote:
> Hi Svend,
> Your solution could work well in Flink 1.13.0 and later because those
> versions provide many related improvements.
> 
> > as per [1]
> Yes, "table.exec.source.idle-timeout" is not a table-level parameter but a
> global one. It applies to all table sources that have a watermark clause
> but do not use SOURCE_WATERMARK().
> > as per [2]
> Yes.
> > If that is correct, I guess I can simply use the DataStream connector for 
> > that specific topic and then convert it to a Table.
> Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a Table, like
> the following demo:
> Table table =
> tableEnv.fromDataStream(
> dataStream,
> Schema.newBuilder()
> ... // other schema logic
> .watermark("columnName", "SOURCE_WATERMARK()")
> .build());
> I would like to invite Jark and Timo to double-check; they are more familiar
> with the issue.
> 
> Best,
> JING ZHANG
> 
> 
Svend  wrote on Sat, May 29, 2021 at 3:34 PM:
>> __
>> Hi everyone,
>> 
>> My Flink streaming application consumes several Kafka topics, one of which 
>> receiving traffic in burst once per day.
>> 
>> I would like that topic not to hold back the progress of the watermark.
>> 
>> Most of my code is currently using the SQL API and in particular the Table 
>> API Kafka connector.
>> 
>> I have read about the idle source configuration mechanism, could you please 
>> confirm my understanding that:
>> 
>> * as per [1]: when I'm using the Table API Kafka connector, we currently do 
>> not have the possibility to specify the idle source parameter specifically 
>> for each topic, although we can set it globally on the 
>> StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter
>> 
>> * as per [2]: when using the DataStream Kafka connector, we can set the idle 
>> source parameter specifically for each topic by specifying ".withIdleness()" 
>> to the WatermarkStrategy.
>> 
>> If that is correct, I guess I can simply use the DataStream connector for 
>> that specific topic and then convert it to a Table.
>> 
>> Thanks a lot!
>> 
>> Svend
>> 
>> 
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks
>> 
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
>> 
>> 
>> 
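As a follow-up for the archives, a minimal sketch of the per-topic idleness approach discussed above, using the DataStream Kafka connector; the broker address, group id and topic name are placeholders:

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BurstyTopicIdlenessSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
        props.setProperty("group.id", "bursty-topic-reader");   // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("bursty-topic", new SimpleStringSchema(), props);

        // This topic stops holding back the watermark after one minute of silence.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withIdleness(Duration.ofMinutes(1)));

        DataStream<String> bursty = env.addSource(consumer);
        // ... convert to a Table with SOURCE_WATERMARK() as discussed above and
        // combine it with the rest of the SQL pipeline.

        env.execute("bursty-topic-idleness-sketch");
    }
}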


Re: Got exception when running the localhost cluster

2021-05-31 Thread 김영우
Hi Lingfeng,

I believe Java 8 or 11 is appropriate for the Flink cluster at this point.
I'm not sure that Flink 1.13 supports Java 16 officially.

Thanks,
Youngwoo
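For example, on Fedora something along these lines should work; the JDK path and package name are assumptions depending on how Java 11 was installed:

# e.g. after `sudo dnf install java-11-openjdk-devel` (package name may differ)
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
# or set it only for Flink in conf/flink-conf.yaml:
#   env.java.home: /usr/lib/jvm/java-11-openjdk
./bin/stop-cluster.sh
./bin/start-cluster.sh
./bin/flink run examples/streaming/WordCount.jar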

On Mon, May 31, 2021 at 2:49 PM Lingfeng Pu  wrote:

> Hi,
>
> I'm new to Flink. I got a problem when running the local cluster on my
> computer. Some key software information as follows:
>
> 1. Flink version: 1.13.0 for Scala 2.11;
> 2. OS: Fedora 34;
> 3. Java version: 16;
> 4. Scala version: 2.11.12.
>
> When I started up the local cluster from the command line, everything seemed fine,
> BUT I could not access localhost:8081; the page failed to open. Furthermore, an
> exception comes up when I run the Flink example; please see all the details below:
>
> [root@localhost flink-1.13.0]# ./bin/start-cluster.sh
> Starting cluster.
> Starting standalonesession daemon on host fedora.
> Starting taskexecutor daemon on host fedora.
> [root@localhost flink-1.13.0]# ./bin/flink run
> examples/streaming/WordCount.jar
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Unable to make field private final byte[]
> java.lang.String.value accessible: module java.base does not "opens
> java.lang" to unnamed module @2baf3d81
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make
> field private final byte[] java.lang.String.value accessible: module
> java.base does not "opens java.lang" to unnamed module @2baf3d81
> at
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
> at
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
> at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
> at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2053)
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
> at
> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
> at
> org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
> at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ... 8 more
>
> I tried searching for solutions online, but found nothing useful so far. I
> urgently need some specific advice on how to solve this issue! I'll be
> grateful for that :)
>
>


Re: Asking for advice: handling dynamic fields

2021-05-31 Thread Zorro
Since your DDL changes and you cannot know all the fields in advance, the first thing that is certain is that this scenario cannot be solved with Flink SQL.

Solving it with the DataStream API is feasible; the only potential problem is that the community currently does not provide a DataStream HBase sink.
If you need an HBase sink in a DataStream job, you may have to implement a custom HBase sink yourself, or make some changes based on the community's HBase SQL
connector. These changes would have to be made at the Java code level, though.

The rest of the processing logic can easily be rewritten with PyFlink.
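For what it's worth, a rough sketch of such a custom DataStream HBase sink; it assumes each record carries a row key plus a map of dynamic field names to values, and the ZooKeeper quorum, table name and column family are placeholders:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Map;

/** Minimal custom HBase sink for records with a dynamic set of columns. */
public class DynamicHBaseSink extends RichSinkFunction<DynamicHBaseSink.DynamicRow> {

    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration conf = org.apache.hadoop.hbase.HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk-host:2181");          // placeholder
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf("my_table"));  // placeholder
    }

    @Override
    public void invoke(DynamicRow row, Context context) throws Exception {
        Put put = new Put(Bytes.toBytes(row.rowKey));
        // Write whatever columns this particular record happens to carry.
        for (Map.Entry<String, String> e : row.fields.entrySet()) {
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(e.getKey()), Bytes.toBytes(e.getValue()));
        }
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) table.close();
        if (connection != null) connection.close();
    }

    /** Hypothetical record type: a row key plus a field-to-value map. */
    public static class DynamicRow {
        public String rowKey;
        public Map<String, String> fields;
    }
}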



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink job exception

2021-05-31 Thread day
You can take a look at the history server:

https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/deployment/advanced/historyserver/
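A minimal flink-conf.yaml sketch for the history server based on the linked docs; the paths and port are placeholders:

# Where the JobManager archives completed jobs, and where the history server reads them from.
jobmanager.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.refresh-interval: 10000

# Start it with: ./bin/historyserver.sh start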







Re: [ANNOUNCE] Apache Flink 1.13.1 released

2021-05-31 Thread 김영우
Great work! Thank you Dawid and all of the contributors.
I'm eager to adopt the new release, however can't find docker images for
that from https://hub.docker.com/_/flink

Hope it'll be available soon.

Thanks,
Youngwoo


On Sat, May 29, 2021 at 1:49 AM Dawid Wysakowicz 
wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.13.1, which is the first bugfix release for the Apache Flink 1.13
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/05/28/release-1.13.1.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Dawid Wysakowicz
>