Re: pyflink keyed stream checkpoint error

2021-09-23 Roman Khachatryan
Hi,

Is it possible that the Python process crashed or hung (probably while
performing a snapshot)?
Could you verify this by checking the OS logs for OOM-killer
messages or by checking the process status?
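A minimal sketch of the log check suggested above, assuming a Linux host whose kernel messages reach /var/log/messages. The helper name `find_oom_kills` is hypothetical, and the pattern only covers the common `Killed process <pid> (<name>)` wording, which varies somewhat across kernel versions.

```python
import re
from typing import Iterable, List, Tuple

# Matches OOM-killer lines such as
#   "Out of memory: Killed process 4242 (python) total-vm:..."  (newer kernels)
#   "Killed process 4242 (python) total-vm:..."                 (older kernels)
# Some older kernels first log "Kill process ..."; that line is deliberately
# not matched, so each kill is reported once.
OOM_KILL_PATTERN = re.compile(r"[Kk]illed process (\d+) \(([^)]*)\)")


def find_oom_kills(lines: Iterable[str]) -> List[Tuple[int, str]]:
    """Return (pid, process name) for every OOM-killer entry in the log lines."""
    kills = []
    for line in lines:
        match = OOM_KILL_PATTERN.search(line)
        if match:
            kills.append((int(match.group(1)), match.group(2)))
    return kills
```

For example, `with open("/var/log/messages", errors="replace") as log: print(find_oom_kills(log))` (reading that file usually requires root); the output of `dmesg` or `journalctl -k` can be fed in the same way. An empty result is consistent with, but does not prove, the absence of an OOM kill, since logs may have been rotated.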

Regards,
Roman

On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter  wrote:
>
> Hi,
> I'm getting an error after enabling checkpointing in my PyFlink application
> that uses a keyed stream and RocksDB state.
>
> Here is the error message:
>
> 2021-09-22 16:18:14,408 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - Closed RocksDB State Backend. Cleaning up RocksDB working directory /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task [] - KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed (1/1)#34.
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
> at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
> at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
> ... 19 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendo

Re: pyflink keyed stream checkpoint error

2021-09-23 Dian Fu
I agree with Roman that the Python process seems to have crashed.

Besides Roman's suggestions, you could also try configuring a smaller
bundle size via “python.fn-execution.bundle.size”.
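To illustrate why the bundle size matters, the toy model below buffers elements destined for the Python worker and flushes them as one bundle either when `bundle_size` elements have accumulated or when a checkpoint barrier arrives. The class and method names are hypothetical and this is not Flink's implementation; the checkpoint trigger merely mirrors the prepareSnapshotPreBarrier, invokeFinishBundle and finishBundle frames in the trace above, which is where "Failed to close remote bundle" surfaced. In this model, a smaller bundle size bounds how much work is pending at each flush.

```python
from typing import Callable, List


class ToyBundleBuffer:
    """Hypothetical model of bundle-based batching; not Flink's actual code.

    Elements are buffered and handed to ``flush_fn`` as one bundle when either
    ``bundle_size`` elements have accumulated or a checkpoint barrier arrives.
    """

    def __init__(self, bundle_size: int, flush_fn: Callable[[List[object]], None]) -> None:
        if bundle_size < 1:
            raise ValueError("bundle_size must be at least 1")
        self.bundle_size = bundle_size
        self.flush_fn = flush_fn
        self.buffer: List[object] = []

    def process_element(self, element: object) -> None:
        self.buffer.append(element)
        # Count-based trigger: finish the bundle once it is full.
        if len(self.buffer) >= self.bundle_size:
            self.finish_bundle()

    def on_checkpoint_barrier(self) -> None:
        # Checkpoint trigger: flush pending elements before the snapshot is taken.
        self.finish_bundle()

    def finish_bundle(self) -> None:
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []


# Usage: 7 elements with bundle_size=3, then a checkpoint barrier.
flushed_sizes: List[int] = []
buf = ToyBundleBuffer(bundle_size=3, flush_fn=lambda bundle: flushed_sizes.append(len(bundle)))
for i in range(7):
    buf.process_element(i)
buf.on_checkpoint_barrier()
print(flushed_sizes)  # [3, 3, 1]
```

The last, partial bundle is only flushed because of the barrier, which is why a failure in closing the bundle can show up as a checkpoint error even when the underlying problem lies in the Python worker.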

Regards,
Dian

> On Sep 24, 2021, at 3:48 AM, Roman Khachatryan wrote:

Re: pyflink keyed stream checkpoint error

2021-09-23 Dian Fu
PS: there is more information about this configuration option at
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/python_config/#python-fn-execution-bundle-size

> On Sep 24, 2021, at 10:07 AM, Dian Fu wrote:

Re: pyflink keyed stream checkpoint error

2021-09-23 Curt Buechter
Guess my last reply didn't go through, so here goes again...

Possibly, but I don't think so. Since I submitted this, I have done some
more testing. It works fine with the filesystem or memory state backends,
but not with RocksDB. I will try again and check the logs, though.
I've also tested RocksDB checkpointing on other jobs, and it works fine.
But when I combine RocksDB with the keyed stream, it fails.

Thanks for the suggestions, I'll look into them.

On Thu, Sep 23, 2021 at 9:07 PM Dian Fu  wrote:

Re: pyflink keyed stream checkpoint error

2021-10-13 Curt Buechter
Hi guys,
I'm still running into this problem. I checked the logs, and there is no
evidence that the Python process crashed. I checked the process IDs, and
they are still active after the error. There are no `killed process`
messages in /var/log/messages.
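A minimal sketch of the process-status check described above, assuming a POSIX TaskManager host (the /proc part is Linux-only); the helper names are hypothetical. A live PID only rules out a crash: a hung worker, the other possibility raised earlier in the thread, would still pass `is_process_alive`. The /proc state can at least reveal a zombie (`Z`) or a process stuck in uninterruptible sleep (`D`), although a deadlocked worker can look like an ordinary sleeping (`S`) one.

```python
import os


def is_process_alive(pid: int) -> bool:
    """Return True if a process with this PID exists (POSIX only).

    Signal 0 performs the existence and permission checks without
    actually delivering a signal.
    """
    if pid <= 0:
        # 0 and negative values address process groups, not a single process.
        raise ValueError("pid must be a positive integer")
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True


def linux_process_state(pid: int) -> str:
    """Return the 'State:' value from /proc/<pid>/status, e.g. 'S (sleeping)'.

    Linux only; raises FileNotFoundError if the process is gone or /proc
    is unavailable.
    """
    with open(f"/proc/{pid}/status") as status:
        for line in status:
            if line.startswith("State:"):
                return line.split(":", 1)[1].strip()
    return "unknown"
```

For each Python worker PID you found, `is_process_alive(pid)` answers the crash question, and `linux_process_state(pid)` adds a coarse hint about whether it is stuck.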

I don't think it's necessarily related to checkpointing. I noticed
https://issues.apache.org/jira/browse/FLINK-24123 and thought it was
possibly related. I tried upgrading to Flink 1.14.0, but I get mostly the
same error, except that it now happens outside the context of a
checkpoint.

I also tried reducing python.fn-execution.bundle.size to 10,000 (the
default is 100,000), with no luck there either.

2021-10-13 13:39:19
java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:361)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:321)
at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.processElement(AbstractOneInputPythonFunctionOperator.java:139)
at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.processElement(PythonKeyedProcessOperator.java:176)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:377)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:361)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$invokeFinishBundle$2(AbstractPythonFunctionOperator.java:340)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
... 7 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$Con

Re: pyflink keyed stream checkpoint error

2021-10-14 Dian Fu
Hi Curt,

Could you check whether it works if you reduce
python.fn-execution.bundle.size to 1000 or 100?

Regards,
Dian

On Thu, Oct 14, 2021 at 2:47 AM Curt Buechter  wrote:
