Re: Re: PartitionNotFoundException

2023-04-09 Thread Weihua Hu
Hi,

Could you share the logs of the JobManager and the relevant TaskManagers?

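Generally speaking, PartitionNotFoundException only occurs while a job is starting up and establishing connections.
From your description, this is a streaming job consuming from Kafka, so it should not normally throw
PartitionNotFoundException after running for a week.

Please check whether there are any other exceptions.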

Best,
Weihua


On Mon, Apr 10, 2023 at 9:51 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> taskmanager.network.max-num-tcp-connections: 4
>
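> I had already adjusted both of these parameters. Previously I had only raised connections from 1 to 2, but after running for a week the PartitionNotFoundException appeared again.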
>
>
>
> From: Shammon FY
> Date: 2023-04-10 09:46
> To: user-zh
> Subject: Re: Re: PartitionNotFoundException
> As mentioned above, for streaming jobs you can set taskmanager.network.tcp-connection.enable-reuse-across-jobs:
> false; this generally has no impact on the job.
>
> Best,
> Shammon FY
>
> On Mon, Apr 10, 2023 at 9:27 AM zhan...@eastcom-sw.com <
> zhan...@eastcom-sw.com> wrote:
>
> > hi, after I adjusted these two parameters last week, the job ran normally for almost a week and then [PartitionNotFoundException] reappeared...
> >
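> > I had only set taskmanager.network.max-num-tcp-connections to 2, which may be too small; today I changed it to 4 and will keep watching.
> > Alternatively, would upgrading Flink to 1.17 fix this problem?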
> >
> > From: yidan zhao
> > Date: 2023-04-03 10:45
> > To: user-zh
> > Subject: Re: PartitionNotFoundException
> > Set taskmanager.network.tcp-connection.enable-reuse-across-jobs to
> > false, and set taskmanager.network.max-num-tcp-connections to a larger value.
> > As I recall, there was a bug that caused this problem; I'm not sure whether it has been fixed in 1.16.
> >
> > zhan...@eastcom-sw.com wrote on Mon, Apr 3, 2023 at 10:08:
> > >
> > >
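> > > hi, since I recently upgraded from 1.14 to 1.16, Kafka consumption intermittently fails with
> > > [org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> > > Partition *** not found.]
> > > The job then restarts automatically, throws the same exception again and keeps restarting; consumption only returns to normal after I manually cancel the job and start it again.
> > >
> > > This never happened on the 1.14 cluster; it only started after upgrading to 1.16. Is there any configuration that can mitigate or avoid this exception?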
> >
>


Re: Quick question about flink document.

2023-04-09 Thread Hang Ruan
Hi, Dongwoo,

I don't think there is a problem with this part. It describes snapshotting
operator state, i.e. checkpointing. Checkpoints are stored by the
JobManager using the configured checkpoint storage.

Best,
Hang


Re: Quick question about flink document.

2023-04-09 Thread Shammon FY
Hi Dongwoo

I think there are two state-related configurations: one is the state backend and
the other is the snapshot storage. Flink creates a snapshot of each state
once the stateful operator has received all of its checkpoint barriers.

As @Feng mentioned above, users can configure a different state backend with the
option state.backend.

The snapshot of the state can be stored in the JobManager. When the state is large,
Flink supports storing the snapshot in distributed storage with the option
state.checkpoints.dir.

Best,
Shammon FY
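To make the split concrete, here is a sketch of flink-conf.yaml with both settings side by side, assuming Flink 1.13 or later. The state backend decides where working state lives while the job runs; the checkpoint storage decides where snapshots are written. state.backend and state.checkpoints.dir come from the message above; state.checkpoint-storage and execution.checkpointing.interval are two further options added here for completeness, and the HDFS path and 60s interval are hypothetical placeholders.

```yaml
# flink-conf.yaml (sketch): state backend and checkpoint storage are configured independently.

# Where working state lives: "hashmap" (TaskManager heap) or "rocksdb".
state.backend: rocksdb

# Where checkpoint snapshots are written: "jobmanager" (JobManager heap) or "filesystem".
state.checkpoint-storage: filesystem

# Target directory for filesystem checkpoint storage (hypothetical path).
state.checkpoints.dir: hdfs:///flink/checkpoints

# Snapshots are only taken when checkpointing is enabled (example interval).
execution.checkpointing.interval: 60s
```

Swapping state.backend between hashmap and rocksdb leaves the checkpoint location unchanged, and vice versa, which is the distinction the documentation passage under discussion blurs.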


Re: Re: PartitionNotFoundException

2023-04-09 Thread Shammon FY
As mentioned above, for streaming jobs you can set taskmanager.network.tcp-connection.enable-reuse-across-jobs:
false; this generally has no impact on the job.

Best,
Shammon FY
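For reference, a minimal flink-conf.yaml sketch of the two TaskManager network options discussed in this thread. The values are the ones tried above (false, and 4 connections), not tuned recommendations; the defaults noted in the comments reflect my understanding of the 1.16 configuration reference and are worth double-checking. Since these are cluster-level TaskManager options, they presumably only take effect after the TaskManagers are restarted.

```yaml
# flink-conf.yaml (sketch using the values tried in this thread)

# Do not reuse TCP connections for partition requests across jobs.
# As I understand the 1.16 reference, the default is true.
taskmanager.network.tcp-connection.enable-reuse-across-jobs: false

# Maximum number of TCP connections between two TaskManagers for data exchange.
# The default is 1; this thread went from 1 to 2 and then to 4.
taskmanager.network.max-num-tcp-connections: 4
```

As Weihua notes in this thread, if the exception still comes back after these changes, the JobManager and TaskManager logs are the next place to look for other exceptions.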


Unsubscribe

2023-04-09 Thread 柒朵
Unsubscribe

Re: Quick question about flink document.

2023-04-09 Thread Feng Jin
Hi Dongwoo

This can be quite confusing.
Before Flink 1.13, Flink's state backend was actually a hybrid concept: it determined both where
the working state lived and where snapshots were stored. There were three types of state backends:
*MemoryStateBackend*, *FsStateBackend*, and *RocksDBStateBackend*.

The default *MemoryStateBackend* keeps the working state on the heap and stores its snapshots
in the JobManager's memory.

You can refer to this migration document for more information:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#migrating-from-legacy-backends


Best
Feng
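The legacy-to-new mapping described in that migration document can be sketched as flink-conf.yaml entries, assuming Flink 1.13 or later. Only the first pair is active; the commented-out pairs show the equivalents of the other two legacy backends, and the HDFS path is a hypothetical placeholder.

```yaml
# Legacy MemoryStateBackend (the old default):
# working state on the TaskManager heap, snapshots in JobManager memory.
state.backend: hashmap
state.checkpoint-storage: jobmanager

# Legacy FsStateBackend: heap state, snapshots on a file system.
# state.backend: hashmap
# state.checkpoint-storage: filesystem
# state.checkpoints.dir: hdfs:///flink/checkpoints

# Legacy RocksDBStateBackend: RocksDB state, snapshots on a file system.
# state.backend: rocksdb
# state.checkpoint-storage: filesystem
# state.checkpoints.dir: hdfs:///flink/checkpoints
```

Under this split, the "JobManager's memory" mentioned in the documentation passage corresponds to the checkpoint storage setting, not to where the working state is kept.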


Quick question about flink document.

2023-04-09 Thread Dongwoo Kim
Hi community, I’m new to Flink and am trying to learn its concepts in preparation 
for migrating a Heron application to Flink.
I have a quick question about this Flink document:
(https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/#snapshotting-operator-state)

What I understood is that state is stored in the configured state backend, which can be 
either the TaskManager’s heap or RocksDB,
and that checkpoint snapshots are stored by default in the JobManager’s heap, but in 
most cases in a distributed file system.
But the document says the following, which confuses me. Isn’t the second sentence 
talking about checkpoint storage (a checkpoint backend) rather than the state 
backend? Thanks in advance, and enjoy your weekend!

“Because the state of a snapshot may be large, it is stored in a configurable 
state backend. By default, this is the JobManager’s memory, but for production use a 
distributed reliable storage should be configured (such as HDFS).”