RE: Could not stop job with a savepoint

2022-03-09 Thread Schwalbe Matthias
Hi Vinicius,

In your case, the task manager being actively killed by YARN was the other way this 
happened.

You are using RocksDBStateBackend, right?
I'm not certain, but I have a strong suspicion that this is related to the glibc 
allocator bug that seems to be at play here.
There is some documentation here [1] and a solution that has been implemented 
for k8s containers [2], which replaces the glibc allocator with libjemalloc.so.

However, we are not yet completely past our own encounter with the same problem.
Our intermediate solution is to reserve some unused extra memory, so the 
problem is delayed but not completely prevented (we restart our jobs daily by 
taking a savepoint):

flink-conf.yaml:
…
taskmanager.memory.managed.fraction: 0.2
#reserve 2GB extra unused space (out of 8GB per TM) in order to mitigate the 
glibc memory leakage problem
taskmanager.memory.task.off-heap.size: 2048mb
…

I'm not sure whether, and starting with which Flink version, libjemalloc.so is 
integrated by default into the Flink runtime
… Flink team to the rescue!

Hope this helps

Thias

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_trouble/#container-memory-exceeded
[2] https://issues.apache.org/jira/browse/FLINK-19125

From: Vinicius Peracini 
Sent: Wednesday, March 9, 2022 17:56
To: Schwalbe Matthias 
Cc: Dawid Wysakowicz ; user@flink.apache.org
Subject: Re: Could not stop job with a savepoint

So apparently the YARN container for Task Manager is running out of memory 
during the savepoint execution. Never had any problems with checkpoints though. 
Task Manager configuration:

"taskmanager.memory.process.size": "10240m",
"taskmanager.memory.managed.fraction": "0.6",
"taskmanager.memory.jvm-overhead.fraction": "0.07",
"taskmanager.memory.jvm-metaspace.size": "192mb",
"taskmanager.network.memory.buffer-debloat.enabled": "true",

On Wed, Mar 9, 2022 at 1:33 PM Vinicius Peracini <vinicius.perac...@zenvia.com> wrote:
Bom dia Schwalbe!

Thanks for the reply.

I'm using Flink 1.14.0. EMR is a managed cluster platform to run big data 
applications on AWS. This way Flink services are running on YARN. I tried to 
create another savepoint today and was able to retrieve the Job Manager log:

2022-03-09 15:42:10,294 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Triggering savepoint for job 6f9d71e57efba96dad7f5328ab9ac717.
2022-03-09 15:42:10,298 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 1378 (type=SAVEPOINT) @ 1646840530294 for job 
6f9d71e57efba96dad7f5328ab9ac717.
2022-03-09 15:45:19,636 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to 
[/172.30.0.169:57520] failed with 
java.io.IOException: Connection reset by peer
2022-03-09 15:45:19,648 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink@ip-172-30-0-169.ec2.internal:46639] has failed, address is 
now gated for [50] ms. Reason: [Disassociated]
2022-03-09 15:45:19,652 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink-metrics@ip-172-30-0-169.ec2.internal:41533] has failed, 
address is now gated for [50] ms. Reason: [Disassociated]
2022-03-09 15:45:19,707 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - 
LEFT_JOIN_MESSAGE_BULK -> Map (1/3) (866e32468227f9f0adac82e9b83b970a) switched 
from RUNNING to FAILED on container_1646341714746_0005_01_04 @ 
ip-172-30-0-165.ec2.internal (dataPort=40231).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'ip-172-30-0-169.ec2.internal/172.30.0.169:34413'. 
This might indicate that the remote task manager was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:186)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
 

Re: Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-09 Thread Jark Wu
Thanks Martijn for the reply and summary.

I totally agree with your plan and thank Yuxia for volunteering the Hive
tech debt issue.
I think we can create an umbrella issue for this and target version 1.16.
We can discuss
details and create subtasks there.

Regarding dropping old Hive versions, I'm also fine with that. But I would
like to investigate
some Hive users first to see whether it's acceptable at this point. My
first thought was we
can deprecate the old Hive versions in 1.15, and we can discuss dropping it
in 1.16 or 1.17.

Best,
Jark


On Thu, 10 Mar 2022 at 14:19, 罗宇侠(莫辞) 
wrote:

> Thanks Martijn for your insights.
>
> About the tech debt/maintenance with regards to Hive query syntax, I
> would like to chip-in and expect it can be resolved for Flink 1.16.
>
> Best regards,
>
> Yuxia
> ​
>
> -- Original Message --
> *From:* Martijn Visser 
> *Sent:* Thu Mar 10 04:03:34 2022
> *To:* User 
> *Subject:* Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax
>
>> (Forwarding this also to the User mailing list as I made a typo when
>> replying to this email thread)
>>
>> -- Forwarded message -
>> From: Martijn Visser 
>> Date: Wed, 9 Mar 2022 at 20:57
>> Subject: Re: [DISCUSS] Flink's supported APIs and Hive query syntax
>> To: dev , Francesco Guardiani <
>> france...@ververica.com>, Timo Walther , <
>> us...@flink.apache.org>
>>
>>
>> Hi everyone,
>>
>> Thank you all very much for your input. From my perspective, I consider
>> batch as a special case of streaming. So with Flink SQL, we can support
>> both batch and streaming use cases and I think we should use Flink SQL as
>> our target.
>>
>> To reply on some of the comments:
>>
>> @Jing on your remark:
>> > Since Flink has a clear vision of unified batch and stream processing,
>> supporting batch jobs will be one of the critical core features to help us
>> reach the vision and let Flink have an even bigger impact in the industry.
>>
>> I fully agree with that statement. I do think that having Hive syntax
>> support doesn't help in that unified batch and stream processing. We're
>> making it easier for batch users to run their Hive batch jobs on Flink, but
>> that doesn't fit the "unified" part since it's focussed on batch, while
>> Flink SQL focusses on batch and streaming. I would have rather invested
>> time in making batch improvements to Flink and Flink SQL vs investing in
>> Hive syntax support. I do understand from the given replies that Hive
>> syntax support is valuable for those that are already running batch
>> processing and would like to run these queries on Flink. I do think that's
>> limited to mostly Chinese companies at the moment.
>>
>> @Jark I think you've provided great input and are spot on with:
>> > Regarding the maintenance concern you raised, I think that's a good
>> point and they are in the plan. The Hive dialect has already been a plugin
>> and option now, and the implementation is located in hive-connector module.
>> We still need some work to make the Hive dialect purely rely on public
>> APIs, and the Hive connector should be decoupled from the table planner. At that
>> time, we can move the whole Hive connector into a separate repository (I
>> guess this is also in the externalize connectors plan).
>>
>> I'm looping in Francesco and Timo who can elaborate more in depth on the
>> current maintenance issues. I think we need to have a proper plan on how
>> this tech debt/maintenance can be addressed and to get commitment that this
>> will be resolved in Flink 1.16, since we indeed need to move out all
>> previously agreed connectors before Flink 1.16 is released.
>>
>> > From my perspective, Hive is still widely used and there exists many
>> running Hive SQL jobs, so why not to provide users a better experience to
>> help them migrate Hive jobs to Flink? Also, it doesn't conflict with Flink
>> SQL as it's just a dialect option.
>>
>> I do think there is a conflict with Flink SQL; you can't use both of them
>> at the same time, so you don't have access to all features in Flink. That
>> increases feature sparsity and user friction. It also puts a bigger burden
>> on the Flink community, because having both options available means more
>> maintenance work. For example, an upgrade of Calcite is more impactful. The
>> Flink codebase is already rather large and CI build times are already too
>> long. More code means more risk of bugs. If a user at some point wants to
>> change his Hive batch job to a streaming Flink SQL job, there's still
>> migration work for the user, it just needs to happen at a later stage.
>>
>> @Jingsong I think you have a good argument that migrating SQL for Batch
>> ETL is indeed an expensive effort.
>>
>> Last but not least, there was no one who has yet commented on the
>> supported Hive versions and security issues. I've reached out to the Hive
>> community, and from the info I've received so far, only Hive 3.1.x
>> and Hive 2.3.x are still supported. The older Hive 

Re: Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-09 Thread 罗宇侠(莫辞)
Thanks Martijn for your insights.

About the tech debt/maintenance with regards to Hive query syntax, I would like 
to chip-in and expect it can be resolved for Flink 1.16.

Best regards,

Yuxia
​

 -- Original Message --
From: Martijn Visser 
Sent: Thu Mar 10 04:03:34 2022
To: User 
Subject: Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax

(Forwarding this also to the User mailing list as I made a typo when replying 
to this email thread)

-- Forwarded message -
From: Martijn Visser 
Date: Wed, 9 Mar 2022 at 20:57
Subject: Re: [DISCUSS] Flink's supported APIs and Hive query syntax
To: dev , Francesco Guardiani , 
Timo Walther , 


Hi everyone,

Thank you all very much for your input. From my perspective, I consider batch 
as a special case of streaming. So with Flink SQL, we can support both batch 
and streaming use cases and I think we should use Flink SQL as our target. 

To reply on some of the comments:

@Jing on your remark:
> Since Flink has a clear vision of unified batch and stream processing, 
> supporting batch jobs will be one of the critical core features to help us 
> reach the vision and let Flink have an even bigger impact in the industry.

I fully agree with that statement. I do think that having Hive syntax support 
doesn't help in that unified batch and stream processing. We're making it 
easier for batch users to run their Hive batch jobs on Flink, but that doesn't 
fit the "unified" part since it's focussed on batch, while Flink SQL focusses 
on batch and streaming. I would have rather invested time in making batch 
improvements to Flink and Flink SQL vs investing in Hive syntax support. I do 
understand from the given replies that Hive syntax support is valuable for 
those that are already running batch processing and would like to run these 
queries on Flink. I do think that's limited to mostly Chinese companies at the 
moment. 

@Jark I think you've provided great input and are spot on with: 
> Regarding the maintenance concern you raised, I think that's a good point and 
> they are in the plan. The Hive dialect has already been a plugin and option 
> now, and the implementation is located in hive-connector module. We still 
> need some work to make the Hive dialect purely rely on public APIs, and the 
> Hive connector should be decoupled from the table planner. At that time, we can 
> move the whole Hive connector into a separate repository (I guess this is 
> also in the externalize connectors plan).

I'm looping in Francesco and Timo who can elaborate more in depth on the 
current maintenance issues. I think we need to have a proper plan on how this 
tech debt/maintenance can be addressed and to get commitment that this will be 
resolved in Flink 1.16, since we indeed need to move out all previously agreed 
connectors before Flink 1.16 is released.

> From my perspective, Hive is still widely used and there exists many running 
> Hive SQL jobs, so why not to provide users a better experience to help them 
> migrate Hive jobs to Flink? Also, it doesn't conflict with Flink SQL as it's 
> just a dialect option. 

I do think there is a conflict with Flink SQL; you can't use both of them at 
the same time, so you don't have access to all features in Flink. That 
increases feature sparsity and user friction. It also puts a bigger burden on 
the Flink community, because having both options available means more 
maintenance work. For example, an upgrade of Calcite is more impactful. The 
Flink codebase is already rather large and CI build times are already too long. 
More code means more risk of bugs. If a user at some point wants to change his 
Hive batch job to a streaming Flink SQL job, there's still migration work for 
the user, it just needs to happen at a later stage. 

@Jingsong I think you have a good argument that migrating SQL for Batch ETL is 
indeed an expensive effort. 

Last but not least, no one has yet commented on the supported 
Hive versions and security issues. I've reached out to the Hive community, and 
from the info I've received so far, only Hive 3.1.x and Hive 2.3.x are 
still supported. The older Hive versions are no longer maintained and also 
don't receive security updates. This is important because many companies scan 
the Flink project for vulnerabilities and won't allow using it when these types 
of vulnerabilities are included. 

My summary would be the following:
* Like Jark said, in the short term, Hive syntax compatibility is the ticket 
for us to have a seat in the batch processing. Having improved Hive syntax 
support for that in Flink can help in this. 
* In the long term, we can and should drop it and focus on Flink SQL itself 
both for batch and stream processing.
* The Hive maintainers/volunteers should come up with a plan on how the tech 
debt/maintenance with regards to Hive query syntax can be addressed and will be 
resolved for Flink 1.16. This includes stuff like using public APIs and 

Re: Re: io.network.netty.exception

2022-03-09 Thread yue ma
Hi, solving this problem requires addressing the actual cause. As the reply above 
also said, there are many possible causes, such as GC, network issues, and so on. I think 
you can first check the relevant logs to pinpoint the specific cause, and then look at how 
to solve it. For a GC problem, for example, we can increase memory or optimize the code.


潘明文 wrote on Tue, Mar 8, 2022 at 09:24:

> Hi,
>   Thanks. Is there a good solution to this problem?
>
>
>
>
>
>
>
>
>
>
>
> On 2022-03-08 02:20:57, "Zhilong Hong"  wrote:
> >Hi, 明文:
> >
>
> >This error actually means the TM was lost, which is usually caused by the TM being killed. You can check the TM's Flink logs and GC logs, the cluster-level NM logs (in a YARN environment), or the K8s logs to find out why the TM was killed. Typical causes are: GC taking too long so the TM heartbeat times out and the TM gets killed, TM memory overuse causing the container/pod to be killed, and so on.
> >
> >Best.
> >Zhilong
> >
> >On Mon, Mar 7, 2022 at 10:18 AM 潘明文  wrote:
> >
> >> Hi, a Flink job that reads from Kafka and writes to HBase and Kafka
> >> frequently fails with the error
> >>
> >> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> >> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
> >> This might indicate that the remote task manager was lost.
>


Unsubscribe

2022-03-09 Thread Minc

Unsubscribe

Minc
Email: xing...@126.com

Signature customized by 网易邮箱大师 (NetEase Mail Master)

Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-09 Thread Martijn Visser
(Forwarding this also to the User mailing list as I made a typo when
replying to this email thread)

-- Forwarded message -
From: Martijn Visser 
Date: Wed, 9 Mar 2022 at 20:57
Subject: Re: [DISCUSS] Flink's supported APIs and Hive query syntax
To: dev , Francesco Guardiani ,
Timo Walther , 


Hi everyone,

Thank you all very much for your input. From my perspective, I consider
batch as a special case of streaming. So with Flink SQL, we can support
both batch and streaming use cases and I think we should use Flink SQL as
our target.

To reply on some of the comments:

@Jing on your remark:
> Since Flink has a clear vision of unified batch and stream processing,
supporting batch jobs will be one of the critical core features to help us
reach the vision and let Flink have an even bigger impact in the industry.

I fully agree with that statement. I do think that having Hive syntax
support doesn't help in that unified batch and stream processing. We're
making it easier for batch users to run their Hive batch jobs on Flink, but
that doesn't fit the "unified" part since it's focussed on batch, while
Flink SQL focusses on batch and streaming. I would have rather invested
time in making batch improvements to Flink and Flink SQL vs investing in
Hive syntax support. I do understand from the given replies that Hive
syntax support is valuable for those that are already running batch
processing and would like to run these queries on Flink. I do think that's
limited to mostly Chinese companies at the moment.

@Jark I think you've provided great input and are spot on with:
> Regarding the maintenance concern you raised, I think that's a good point
and they are in the plan. The Hive dialect has already been a plugin and
option now, and the implementation is located in hive-connector module. We
still need some work to make the Hive dialect purely rely on public APIs,
and the Hive connector should be decoupled from the table planner. At that time,
we can move the whole Hive connector into a separate repository (I guess
this is also in the externalize connectors plan).

I'm looping in Francesco and Timo who can elaborate more in depth on the
current maintenance issues. I think we need to have a proper plan on how
this tech debt/maintenance can be addressed and to get commitment that this
will be resolved in Flink 1.16, since we indeed need to move out all
previously agreed connectors before Flink 1.16 is released.

> From my perspective, Hive is still widely used and there exists many
running Hive SQL jobs, so why not to provide users a better experience to
help them migrate Hive jobs to Flink? Also, it doesn't conflict with Flink
SQL as it's just a dialect option.

I do think there is a conflict with Flink SQL; you can't use both of them
at the same time, so you don't have access to all features in Flink. That
increases feature sparsity and user friction. It also puts a bigger burden
on the Flink community, because having both options available means more
maintenance work. For example, an upgrade of Calcite is more impactful. The
Flink codebase is already rather large and CI build times are already too
long. More code means more risk of bugs. If a user at some point wants to
change his Hive batch job to a streaming Flink SQL job, there's still
migration work for the user, it just needs to happen at a later stage.

@Jingsong I think you have a good argument that migrating SQL for Batch ETL
is indeed an expensive effort.

Last but not least, no one has yet commented on the supported
Hive versions and security issues. I've reached out to the Hive community,
and from the info I've received so far, only Hive 3.1.x and Hive
2.3.x are still supported. The older Hive versions are no longer maintained
and also don't receive security updates. This is important because many
companies scan the Flink project for vulnerabilities and won't allow using
it when these types of vulnerabilities are included.

My summary would be the following:
* Like Jark said, in the short term, Hive syntax compatibility is the
ticket for us to have a seat in the batch processing. Having improved Hive
syntax support for that in Flink can help in this.
* In the long term, we can and should drop it and focus on Flink SQL itself
both for batch and stream processing.
* The Hive maintainers/volunteers should come up with a plan on how the
tech debt/maintenance with regards to Hive query syntax can be addressed
and will be resolved for Flink 1.16. This includes stuff like using public
APIs and decoupling it from the planner. This is also extremely important
since we want to move out connectors with Flink 1.16 (next Flink release).
I'm hoping that those who can help out with this will chip-in.
* We follow the Hive versions that are still supported, which means we drop
support for Hive 1.*, 2.1.x and 2.2.x and upgrade Hive 2.3 and Hive 3.1 to
the latest version.

Thanks again for your input and looking forward to your thoughts on this.

Re: Flink sql calculate principle question

2022-03-09 Thread Martijn Visser
Hi JianWen,

I think this is explained in the documentation on Dynamic Tables [1]. Does that
answer your question?

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#query-restrictions
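
For what it's worth, the key point of that page is that a continuous GROUP BY
aggregation only keeps the per-key accumulator (here: the running sum per color)
in state and emits updates incrementally; it does not re-scan the whole stream T
for every new record. A minimal sketch (standard Table API calls, illustrative
values only, not taken from this thread) that makes the resulting changelog
visible:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.row;

public class GroupByChangelogSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Illustrative input rows (id, color); in a real job this would be a streaming source.
        Table t = tEnv.fromValues(row(1, "blue"), row(2, "red"), row(3, "blue"))
                      .as("id", "color");
        tEnv.createTemporaryView("T", t);

        Table result = tEnv.sqlQuery("SELECT color, SUM(id) AS total FROM T GROUP BY color");

        // The printed changelog contains insert (+I) and update (-U/+U) rows per key,
        // e.g. +I[blue, 1], +I[red, 2], -U[blue, 1], +U[blue, 4]: only the per-color
        // aggregate is kept in state, not the full history of T.
        tEnv.toChangelogStream(result).print();
        env.execute("groupby-changelog-sketch");
    }
}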

On Wed, 9 Mar 2022 at 18:32, JianWen Huang 
wrote:

> As we all know, Flink SQL will be translated into the streaming API.
> Consider a SQL query:
> SELECT color, sum(id)
>  FROM T
> GROUP BY color
>
> In the actual calculation, does Flink store the whole stream T in state,
> so that each incoming record triggers a recomputation over the full
> stream? Or does the state only hold the calculation results, so that when
> a new record arrives for the same GROUP BY key (color) the result can
> simply be added to or subtracted from? Is there a document or summary of
> this part of Flink's execution mechanism? Thanks.
>


Re: Left join query not clearing state after migrating from 1.9.0 to 1.14.3

2022-03-09 Thread Roman Khachatryan
Hi Prakhar,

Thanks for sharing this

>  Also, can you give us an idea of exactly what details you are looking for?
It would be helpful to know the sizes of different parts of the
checkpoint and the timings (e.g. sync/async phases, alignment
duration, etc.) [1]
Could you please share:
1. "Checkpoint Details", so that for each operator these details are
shown (summary of all operator subtasks)
2. "All Subtask Statistics" for the "Interval join operator" - as
shown in [2] (the last figure).
3. Job overview (with watermark progress), as shown in [3]. I think
watermarks can indeed cause the issue.


[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/monitoring/checkpoint_monitoring/#history-tab
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/monitoring/checkpoint_monitoring/#all-subtask-statistics
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/monitoring/back_pressure/#example

Regards,
Roman
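
In case it helps while you check the watermark suspicion: with the new
KafkaSource the watermark strategy is passed to env.fromSource(...), and an idle
partition can stall the overall watermark, which would keep the interval join
from ever cleaning up its state. A rough sketch of a bounded-out-of-orderness
strategy with idleness handling (topic, group id and types are placeholders, not
your actual job):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")            // placeholder
                .setTopics("table1-topic")                     // placeholder
                .setGroupId("left-join-job")                   // placeholder
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> events = env.fromSource(
                source,
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        // add .withTimestampAssigner(...) if the event time lives in the payload
                        // withIdleness keeps idle partitions from stalling the watermark
                        .withIdleness(Duration.ofMinutes(5)),
                "table1-source");

        events.print();
        env.execute("kafka-watermark-sketch");
    }
}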

On Wed, Mar 9, 2022 at 7:26 AM Prakhar Mathur  wrote:
>
> Hi Roman,
>
> Thanks for the reply, here is the screenshot of the latest failed checkpoint.
>
>
>
> I couldn't find the details for the last successful one as we only store the 
> last 10 checkpoints' details. Also, can you give us an idea of exactly what 
> details you are looking for?
>
> For the operator, the source operators for both the input streams look fine, 
> the Interval join operator seems to be having the issue of not clearing the 
> state it is holding.
>
> IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
> leftLowerBound=-540, leftUpperBound=0, leftTimeIndex=8, 
> rightTimeIndex=5], where=[((price_calculation_id = $f92) AND (rowtime0 >= 
> rowtime) AND (rowtime0 <= (rowtime + 540:INTERVAL MINUTE)))],
>
> We are currently doubting the way we are generating watermarks for the new 
> Kafka source, they might be creating a problem as the output of 
> CURRENT_WATERMARK(rowtime) is coming as null from the SQL Query.
>
> Thanks
> Prakhar
>
> On Tue, Mar 8, 2022 at 5:49 PM Roman Khachatryan  wrote:
>>
>> Hi Prakhar,
>>
>> Could you please share the statistics about the last successful and
>> failed checkpoints, e.g. from the UI.
>> Ideally, with detailed breakdown for the operators that seems problematic.
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 4, 2022 at 8:48 AM Prakhar Mathur  wrote:
>> >
>> > Hi,
>> >
>> > Can someone kindly help and take a look at this? It's a major blocker for 
>> > us.
>> >
>> > Thanks,
>> > Prakhar
>> >
>> > On Wed, Mar 2, 2022 at 2:11 PM Prakhar Mathur  wrote:
>> >>
>> >> Hello,
>> >>
>> >> We recently did a migration of our Flink jobs from version 1.9.0 to 
>> >> 1.14.3. These jobs consume from Kafka and produce to respective sinks. We 
>> >> are using MemoryStateBackend for our checkpointing and GCS as our remote 
>> >> fs. After migration, we found a few jobs that had left join in the SQL 
>> >> query started failing where their checkpoint size kept increasing. We 
>> >> haven't changed the SQL Query. Following is one of the queries that have 
>> >> started failing with the issue mentioned.
>> >>
>> >> SELECT
>> >> table1.field1,
>> >> table2.field2,
>> >> table2.field3,
>> >> table1.rowtime as estimate_timestamp,
>> >> table2.creation_time as created_timestamp,
>> >> CAST(table2.rowtime AS TIMESTAMP)
>> >> FROM
>> >> table1
>> >> LEFT JOIN table2 ON table1.field1 = coalesce(
>> >> nullif(table2.field4, ''),
>> >> table2.field5
>> >> )
>> >> AND table2.rowtime BETWEEN table1.rowtime
>> >> AND table1.rowtime + INTERVAL '90' MINUTE
>> >> WHERE
>> >> table2.field6 IS NOT TRUE
>> >>
>> >> Few data points:
>> >>
>> >> On version 1.9.0 it was running on parallelism of 20, now it is not even 
>> >> able to run on 40.
>> >> On version 1.9.0 the max checkpoint size was going up to 3.5 GB during 
>> >> peak hours. Now on 1.14.3, it just keeps on increasing and goes up to 30 
>> >> GB and eventually fails due to lack of resources.
>> >> Earlier in version 1.9.0, we were using 
>> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer and now in 
>> >> 1.14.3 we have moved to the new Kafka Source.
>> >>
>> >> Any help will be highly appreciated as these are production jobs.
>> >>
>> >> Thanks
>> >> Prakhar Mathur


Flink sql calculate principle question

2022-03-09 Thread JianWen Huang
As we all know, Flink SQL will be translated into the streaming API.
Consider a SQL query:
SELECT color, sum(id)
 FROM T
GROUP BY color

In the actual calculation, does Flink store the whole stream T in state,
so that each incoming record triggers a recomputation over the full
stream? Or does the state only hold the calculation results, so that when
a new record arrives for the same GROUP BY key (color) the result can
simply be added to or subtracted from? Is there a document or summary of
this part of Flink's execution mechanism? Thanks.


Re: Could not stop job with a savepoint

2022-03-09 Thread Vinicius Peracini
So apparently the YARN container for Task Manager is running out of memory
during the savepoint execution. Never had any problems with checkpoints
though. Task Manager configuration:

"taskmanager.memory.process.size": "10240m",
"taskmanager.memory.managed.fraction": "0.6",
"taskmanager.memory.jvm-overhead.fraction": "0.07",
"taskmanager.memory.jvm-metaspace.size": "192mb",
"taskmanager.network.memory.buffer-debloat.enabled": "true",

On Wed, Mar 9, 2022 at 1:33 PM Vinicius Peracini <
vinicius.perac...@zenvia.com> wrote:

> Bom dia Schwalbe!
>
> Thanks for the reply.
>
> I'm using Flink 1.14.0. EMR is a managed cluster platform to run big data
> applications on AWS. This way Flink services are running on YARN. I tried
> to create another savepoint today and was able to retrieve the Job Manager
> log:
>
> 2022-03-09 15:42:10,294 INFO  org.apache.flink.runtime.jobmaster.JobMaster
> [] - Triggering savepoint for job
> 6f9d71e57efba96dad7f5328ab9ac717.
> 2022-03-09 15:42:10,298 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 1378 (type=SAVEPOINT) @ 1646840530294 for job
> 6f9d71e57efba96dad7f5328ab9ac717.
> 2022-03-09 15:45:19,636 WARN  akka.remote.transport.netty.NettyTransport
> [] - Remote connection to [/172.30.0.169:57520] failed
> with java.io.IOException: Connection reset by peer
> 2022-03-09 15:45:19,648 WARN  akka.remote.ReliableDeliverySupervisor
> [] - Association with remote system
> [akka.tcp://flink@ip-172-30-0-169.ec2.internal:46639] has failed, address
> is now gated for [50] ms. Reason: [Disassociated]
> 2022-03-09 15:45:19,652 WARN  akka.remote.ReliableDeliverySupervisor
> [] - Association with remote system
> [akka.tcp://flink-metrics@ip-172-30-0-169.ec2.internal:41533] has failed,
> address is now gated for [50] ms. Reason: [Disassociated]
> 2022-03-09 15:45:19,707 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> LEFT_JOIN_MESSAGE_BULK -> Map (1/3) (866e32468227f9f0adac82e9b83b970a)
> switched from RUNNING to FAILED on container_1646341714746_0005_01_04 @
> ip-172-30-0-165.ec2.internal (dataPort=40231).
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager
> 'ip-172-30-0-169.ec2.internal/172.30.0.169:34413'. This might indicate
> that the remote task manager was lost.
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:186)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at

Re: Could not stop job with a savepoint

2022-03-09 Thread Vinicius Peracini
Bom dia Schwalbe!

Thanks for the reply.

I'm using Flink 1.14.0. EMR is a managed cluster platform to run big data
applications on AWS. This way Flink services are running on YARN. I tried
to create another savepoint today and was able to retrieve the Job Manager
log:

2022-03-09 15:42:10,294 INFO  org.apache.flink.runtime.jobmaster.JobMaster
[] - Triggering savepoint for job
6f9d71e57efba96dad7f5328ab9ac717.
2022-03-09 15:42:10,298 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Triggering checkpoint 1378 (type=SAVEPOINT) @ 1646840530294 for job
6f9d71e57efba96dad7f5328ab9ac717.
2022-03-09 15:45:19,636 WARN  akka.remote.transport.netty.NettyTransport
[] - Remote connection to [/172.30.0.169:57520] failed with
java.io.IOException: Connection reset by peer
2022-03-09 15:45:19,648 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://flink@ip-172-30-0-169.ec2.internal:46639] has failed, address
is now gated for [50] ms. Reason: [Disassociated]
2022-03-09 15:45:19,652 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://flink-metrics@ip-172-30-0-169.ec2.internal:41533] has failed,
address is now gated for [50] ms. Reason: [Disassociated]
2022-03-09 15:45:19,707 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
LEFT_JOIN_MESSAGE_BULK -> Map (1/3) (866e32468227f9f0adac82e9b83b970a)
switched from RUNNING to FAILED on container_1646341714746_0005_01_04 @
ip-172-30-0-165.ec2.internal (dataPort=40231).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager
'ip-172-30-0-169.ec2.internal/172.30.0.169:34413'. This might indicate that
the remote task manager was lost.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:186)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at

Re: replay kinesis events

2022-03-09 Thread Guoqin Zheng
Hi Danny,

Thanks for getting back to me. This is very helpful and makes a lot of
sense to me.

Thanks,
-Guoqin


On Wed, Mar 9, 2022 at 1:32 AM Danny Cranmer 
wrote:

> Hey Guoqin,
>
> In order to achieve this you would need to either:
> - Restart the job and resume from an old savepoint (taken before the
> events you want to replay), assuming the state is still compatible with
> your bugfix, or
> - Restart the job without any state and seed the consumer with the start
> position [1]. You can use AT_TIMESTAMP and specify a suitable time in the
> past
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kinesis/#configuring-starting-position
>
>
> Hope this helps,
> Thanks
>
>
> On Wed, Mar 9, 2022 at 12:15 AM Guoqin Zheng 
> wrote:
>
>> Hi Flink experts,
>>
>> Wondering if there is a built-in way to replay already-processed events
>> in an event queue. For example, if I have a flink app processing event
>> stream from Kinesis. Now if I find a bug in the flink app and make a fix.
>> And I would like to re-process events that are already processed in the
>> kinesis stream. Is there a simple mechanism to allow me to do this?
>>
>> Thanks,
>> -Guoqin
>>
>


Evolving Schemas with ParquetColumnarRowInputFormat

2022-03-09 Thread Kevin Lam
Hi all,

We're interested in using ParquetColumnarRowInputFormat or similar with
evolving Parquet schemas. Any advice or recommendations?

Specifically, the situation we are interested in is when the passed in
RowType projectedType contains a new field with Type.Repetition.OPTIONAL
that is not present in the Parquet file being read. In this case we want
that column to just be read as null.

Thanks in advance for your help!


Re: Flink Statefun Kafka Ingress Record Key Deserializer

2022-03-09 Thread Igal Shilman
Hello Xin Li,

Indeed, the built-in ingress that ships with StateFun requires that the key
part be a UTF-8 string. This string then becomes the id part of the
target address.
StateFun is extensible via the StatefulFunctionModule [1], and customizing
the Kafka ingress is also possible; take a look here [2].

Unfortunately a more generic solution is not currently available.
Previously we had an xpath based solution for Protobuf, but it wasn't used
by the community.

If you are interested in working on a contribution along similar lines (a
very simple xpath like key extraction) I'd be happy to guide you through.

Thanks,
Igal.


[1]
https://github.com/apache/flink-statefun/blob/master/statefun-sdk-embedded%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fstatefun%2Fsdk%2Fspi%2FStatefulFunctionModule.java
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-kafka-io%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fstatefun%2Fsdk%2Fkafka%2FKafkaIngressBuilder.java#L107

On Tue, Mar 8, 2022 at 8:33 AM Xin Li  wrote:

> *Hello Flink Team,*
>
> I am right now using Flink stateful function in my project, which are
> consuming avro serialized events(both key and value are serialized) from
> kafka, but it seems there is no configuration that users can customize for
> deserializing the kafka record's key, because I noticed that the key
> deserializer is fixed to be a UTF-8 String Deserializer here:
> RoutableKafkaIngressDeserializer.java.
>
> As a result, the deserialized key becomes garbled, and incorrect hash
> values are then generated from these garbled keys, which very likely leads
> to uneven record distribution and is prone to cause data skew.
>
> I wonder if the community will consider adding a configuration for users
> to customize the deserializer in the Flink stateful function kafka ingress ?
>
> Looking forward to hearing from you
>
> Best regards
>
> *Xin Li*
>
>


Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-09 Thread Igal Shilman
Hi Fil,
I've replied in the JIRA

Cheers,
Igal.

On Tue, Mar 8, 2022 at 6:08 PM Filip Karnicki 
wrote:

> Hi Roman, Igal (@ below)
>
> Thank you for your answer. I don't think I'll have access to flink's lib
> folder given it's a shared Cloudera cluster. The only thing I could think
> of is to not include com.google.protobuf in the
> classloader.parent-first-patterns.additional setting, and
> including protobuf-java 3.7.1 in the uber jar.
>
> I created a jira for this just now + a discuss thread on the dev group
> https://issues.apache.org/jira/browse/FLINK-26537
>
> Hi @Igal Shilman  , is the plugin solution
> outlined by Roman something that fits in better with Statefun than having
> the creators of uber .jars be responsible for using a statefun-compatible
> protobuf-java?
>
> Kind regards
> Fil
>
> On Tue, 8 Mar 2022 at 14:02, Roman Khachatryan  wrote:
>
>> Hi Filip,
>>
>> Have you tried putting protobuf-java 3.7.1 into the Flink's lib/ folder?
>> Or maybe re-writing the dependencies you mentioned to be loaded as
>> plugins? [1]
>>
>> I don't see any other ways to solve this problem.
>> Probably Chesnay or Seth will suggest a better solution.
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>>
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki 
>> wrote:
>> >
>> > Hi All!
>> >
>> > We're running a statefun uber jar on a shared cloudera flink cluster,
>> the latter of which launches with some ancient protobuf dependencies
>> because of reasons[1].
>> >
>> > Setting the following flink-config settings on the entire cluster
>> >
>> > classloader.parent-first-patterns.additional:
>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>> >
>> > causes these old protobuf dependencies to get loaded over statefun's
>> protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
>> >
>> > We hacked together a version of statefun that doesn't perform the check
>> whether the classloader settings contain the three patterns from above, and
>> as long as our job uses protobouf-java 3.7.1 and the com.google.protobuf
>> pattern is not present in the classloader.parent-first-patterns.additional
>> setting, then all is well.
>> >
>> > Aside from removing old hadoop from the classpath, which may not be
>> possible given that it's a shared cluster, is there anything we can do
>> other than adding a configurable override not to perform the config check
>> in StatefulFunctionsConfigValidator to an upcoming statefun core release?
>> >
>> > Many thanks
>> > Fil
>> >
>> >
>> > [1] We're still trying to find out if it's absolutely necessary to have
>> these on the classpath.
>>
>


Re: Flatmap operator in an Asynchronous call

2022-03-09 Thread Arvid Heise
You can use flatMap to flatten and have an asyncIO after it.
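
A minimal, self-contained sketch of that shape: a synchronous flatMap that splits
each "file" into records, followed by async I/O for the enrichment (the
CompletableFuture below is a toy stand-in for a real non-blocking client; none of
this is taken from Diwakar's actual job):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.Collector;

public class FlatMapThenAsyncSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for "one element per file": each element is a file's content.
        DataStream<String> fileContents = env.fromElements("a\nb", "c\nd\ne");

        // Synchronous flatMap: split a file's content into individual records.
        DataStream<String> records = fileContents
                .flatMap((String content, Collector<String> out) -> {
                    for (String line : content.split("\n")) {
                        out.collect(line);
                    }
                })
                .returns(String.class);

        // Asynchronous enrichment: asyncInvoke must not block, so the work is
        // handed to a future that completes the ResultFuture when done.
        DataStream<String> enriched = AsyncDataStream.unorderedWait(
                records,
                new RichAsyncFunction<String, String>() {
                    @Override
                    public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
                        CompletableFuture
                                .supplyAsync(() -> record + "-enriched")
                                .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
                    }
                },
                30, TimeUnit.SECONDS,   // timeout per request
                100);                   // max in-flight requests

        enriched.print();
        env.execute("flatmap-then-async-sketch");
    }
}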

On Wed, Mar 9, 2022 at 8:08 AM Diwakar Jha  wrote:

> Thanks Gen, I will look into customized Source and SpiltEnumerator.
>
> On Mon, Mar 7, 2022 at 10:20 PM Gen Luo  wrote:
>
>> Hi Diwakar,
>>
>> An asynchronous flatmap function without the support of the framework can
>> be problematic. You should not call collector.collect outside the main
>> thread of the task, i.e. outside the flatMap method.
>>
>> I'd suggest using a customized Source instead to process the files, which
>> uses a SplitEnumerator to discover the files and SourceReaders to read the
>> files. In this way checkpoints can be triggered between two calls of
>> pollNext, so you don't have to implement it asynchronously. It would be
>> better if the readers read the lines and the records are enriched in a map
>> function following.
>>
>>
>>
>> On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha 
>> wrote:
>>
>>> Hello Everyone,
>>>
>>> I'm running a streaming application using Flink 1.11 and EMR 6.01. My
>>> use case is reading files from a s3 bucket, filter file contents ( say
>>> record) and enrich each record. Filter records and output to a sink.
>>> I'm reading 6k files per 15 minutes and the total number of records is 3
>>> billion per 15 minutes. I'm using a flat map operator to convert the file into
>>> records and enrich records in a synchronous call.
>>>
>>> *Problem* : My application fails (Checkpoint timeout) to run if i add
>>> more filter criteria(operator). I suspect the system is not able to scale
>>> (CPU util as still 20%) because of the synchronous call. I want to convert
>>> this flat map to an asynchronous call using AsyncFunction. I was looking
>>> for something like an AsyncCollector.collect
>>> 
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
>>> to complement my current synchronous implementation using flatmap but it
>>> seems like this is not available in Flink 1.11.
>>>
>>> *Question* :
>>> Could someone please help me with converting this flatmap operation to
>>> an asynchronous call?
>>>
>>> Please let me know if you have any questions.
>>>
>>> Best,
>>>
>>


Re: Move savepoint to another s3 bucket

2022-03-09 Thread Lukáš Drbal
Hi Dawid,

I just tried the same steps on flink builded from git branch release-1.13
and everything works as expected!

Thank you all!

L.

On Wed, Mar 9, 2022 at 8:49 AM Dawid Wysakowicz 
wrote:

> Hi Lukas,
>
> I am afraid you're hitting this bug:
> https://issues.apache.org/jira/browse/FLINK-25952
>
> Best,
>
> Dawid
> On 08/03/2022 16:37, Lukáš Drbal wrote:
>
> Hello everyone,
>
> I'm trying to move a savepoint to another s3 account but the restore always
> fails with some weird 404 error.
>
> We are using lyft k8s operator [1] and flink 1.13.6 (in stacktrace you can
> see version 1.13.6-396a8d44-szn which is just internal build from flink
> commit b2ca390d478aa855eb0f2028d0ed965803a98af1)
>
> What I'm trying to do:
>
>1. create savepoint for pipeline via ./bin/flink savepoint 
>2. copy data under path configured in state.savepoints.dir from source
>s3 to new s3
>3. change all configuration and restore pipeline
>
> Are these steps correct, or am I doing something wrong or unsupported?
>
> All options related to s3 have valid values for new s3 account but restore
> failed with exception bellow. Error message includes original path
> (s3://flink/savepoints/activity-searched-query) which doesn't exists on new
> account so that 404 is expected. But I still don't understand why flink
> tries that path because related config options contains new bucket info.
> high-availability.storageDir:
> 's3:///ha/pipelines-runner-activity-searched-query'
>
> jobmanager.archive.fs.dir: 's3:///history'
>
> state.checkpoints.dir:
>> 's3:///checkpoints/activity-searched-query'
>
> state.savepoints.dir:
>> 's3:///savepoints/activity-searched-query'
>
>
> + valid values for s3.access-key and s3.secret-key
>
> I found the original path in the _metadata file in the savepoint data, but changing
> that (search) leads to some weird OOM. I hope this should not be
> needed and that those values should be ignored.
>
> state.backend is hashmap if it is important.
>
> Restoring from the source bucket works as expected.
>
> Thanks a lot!
>
> Regards,
> L.
>
> Stacktrace:
>
> 2022-03-08 15:39:25,838 [flink-akka.actor.default-dispatcher-4] INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph -
>> CombineToSearchedQuery -> (LateElementsCounter, TransformToStreamElement ->
>> Sink: SearchedQueryKafkaSink) (1/2) (0c0f108c393b9a5b58f861c1032671d0)
>> switched from INITIALIZING to FAILED on 10.67.158.155:45521-d8d19d @
>> 10.67.158.155 (dataPort=36341).
>> org.apache.flink.util.SerializedThrowable: Exception while creating
>> StreamOperatorStateContext.
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at java.lang.Thread.run(Thread.java:832) ~[?:?]
>> Caused by: org.apache.flink.util.SerializedThrowable: Could not restore
>> keyed state backend for
>> WindowOperator_bd2a73c53230733509ca171c6476fcc5_(1/2) from any of the 1
>> provided restore options.
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>> ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
>> at
>> 

Re: replay kinesis events

2022-03-09 Thread Danny Cranmer
Hey Guoqin,

In order to achieve this you would need to either:
- Restart the job and resume from an old savepoint (taken before the events
you want to replay), assuming the state is still compatible with your
bugfix, or
- Restart the job without any state and seed the consumer with the start
position [1]. You can use AT_TIMESTAMP and specify a suitable time in the
past

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kinesis/#configuring-starting-position
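
For the second option, a rough sketch of seeding the consumer start position via
AT_TIMESTAMP (stream name, region and timestamp are placeholders; this assumes
the DataStream FlinkKinesisConsumer):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class ReplayFromTimestampSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");  // placeholder
        // Start reading from a point in time before the events to be replayed.
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
                "2022-03-01T00:00:00.000-00:00");                                // placeholder

        DataStream<String> events = env.addSource(new FlinkKinesisConsumer<>(
                "my-stream",                  // placeholder stream name
                new SimpleStringSchema(),
                consumerConfig));

        events.print();
        env.execute("replay-from-timestamp-sketch");
    }
}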


Hope this helps,
Thanks


On Wed, Mar 9, 2022 at 12:15 AM Guoqin Zheng  wrote:

> Hi Flink experts,
>
> Wondering if there is a built-in way to replay already-processed events in
> an event queue. For example, if I have a flink app processing event stream
> from Kinesis. Now if I find a bug in the flink app and make a fix. And I
> would like to re-process events that are already processed in the kinesis
> stream. Is there a simple mechanism to allow me to do this?
>
> Thanks,
> -Guoqin
>


Re: k8s native session question

2022-03-09 Thread yu'an huang
Hi Yang,

I see that the issue you posted is related to ResourceProfileInfo not being serializable. After checking, it looks like it should have been fixed by these two issues:
https://issues.apache.org/jira/browse/FLINK-25732
https://issues.apache.org/jira/browse/FLINK-25837

The related PR is https://github.com/apache/flink/pull/18422

According to the issues, upgrading to 1.13.6, 1.14.4, or 1.15.0 (or later) should resolve the problem.



> On 9 Mar 2022, at 12:00 PM, Yang Wang  wrote:
> 
> Try it with a newer version; it looks like this has already been fixed.
> 
> https://issues.apache.org/jira/browse/FLINK-19212
> 
> Best,
> Yang
> 
> 崔深圳 wrote on Wed, Mar 9, 2022 at 10:31:
> 
>> 
>> 
>> 
>> The web UI reports an error when requesting /jobs/overview; it occasionally fails with: Exception on server
>> side:\norg.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException:
>> Failed to serialize the result for RPC call :
>> requestMultipleJobDetails.\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)\n\tat
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
>> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)\n\tat
>> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)\n\tat
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)\n\tat
>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\n\tat
>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\n\tat
>> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat
>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat
>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\n\tat
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
>> akka.actor.Actor.aroundReceive(Actor.scala:537)\n\tat
>> akka.actor.Actor.aroundReceive$(Actor.scala:535)\n\tat
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\n\tat
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)\n\tat
>> akka.actor.ActorCell.invoke(ActorCell.scala:548)\n\tat
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\n\tat
>> akka.dispatch.Mailbox.run(Mailbox.scala:231)\n\tat
>> akka.dispatch.Mailbox.exec(Mailbox.scala:243)\n\tat
>> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)\n\tat
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)\n\tat
>> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)\n\tat
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)\nCaused
>> by: java.io.NotSerializableException: java.util.HashMap$Values\n\tat
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)\n\tat
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)\n\tat
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)\n\tat
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)\n\tat
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)\n\tat
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)\n\tat
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)\n\t...
>> 29 more\n\nEnd of exception on server side"
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2022-03-09 09:56:21, "yu'an huang"  wrote:
>>> Hello, is routing to a non-master node actually a problem? When handling a request, a non-master node should find the active JobManager through the HA service,
>> then fetch the result from the active JobManager and return it to the client.
>>> 
 On 7 Mar 2022, at 7:46 PM, 崔深圳  wrote:
 
 In k8s native session mode, HA is configured and there are 3 JobManagers. The web UI, accessed through the REST
>> service, is always routed to a non-master node. Is there any way to make this stable?
>>> 
>>