Re: pyflink keyed stream checkpoint error
Hi,

Is it possible that the python process crashed or hung up? (probably
performing a snapshot)
Could you validate this by checking the OS logs for OOM killer
messages or process status?

Regards,
Roman

On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter wrote:
>
> Hi,
> I'm getting an error after enabling checkpointing in my pyflink application
> that uses a keyed stream and rocksdb state.
>
> Here is the error message:
>
> 2021-09-22 16:18:14,408 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - Closed RocksDB State Backend. Cleaning up RocksDB working directory /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task [] - KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed (1/1)#34.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
>     ... 19 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendo
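A minimal sketch of the two checks Roman suggests (OOM killer messages in the OS logs, and process status), assuming a Linux host. The function names and the default log path are hypothetical choices for illustration; the regular expression covers common kernel OOM messages and may need adjusting for a given distribution.

```python
import os
import re

# Phrases the Linux kernel commonly logs when the OOM killer acts.
# Assumption: the exact wording varies between kernel versions.
OOM_PATTERN = re.compile(
    r"invoked oom-killer|Out of memory: Kill|Killed process \d+",
    re.IGNORECASE,
)


def find_oom_lines(lines):
    """Return the log lines that look like OOM killer activity."""
    return [line.rstrip("\n") for line in lines if OOM_PATTERN.search(line)]


def pid_alive(pid: int) -> bool:
    """Return True if a process with this PID exists (POSIX only).

    Signal 0 performs the existence/permission check without delivering
    a signal. A zombie process still counts as existing.
    """
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


if __name__ == "__main__":
    log_path = "/var/log/messages"  # assumption: location differs per distro
    try:
        with open(log_path, errors="replace") as handle:
            for hit in find_oom_lines(handle):
                print(hit)
    except OSError as exc:
        print(f"could not read {log_path}: {exc}")
    print("this process alive:", pid_alive(os.getpid()))
```

Passing the PID of the Python worker to `pid_alive` would show whether it is still running after the task fails; on a YARN cluster the check has to run on the node that hosts the TaskManager.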
Re: pyflink keyed stream checkpoint error
I agree with Roman that it seems that the Python process has crashed.

Besides the suggestions from Roman, you could also try configuring the bundle size to a smaller value via "python.fn-execution.bundle.size".

Regards,
Dian

> On Sep 24, 2021, at 3:48 AM, Roman Khachatryan wrote:
>
> Is it possible that the python process crashed or hung up? (probably
> performing a snapshot)
Re: pyflink keyed stream checkpoint error
PS: there is more information about this configuration at
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/python_config/#python-fn-execution-bundle-size

> On Sep 24, 2021, at 10:07 AM, Dian Fu wrote:
>
> Besides the suggestions from Roman, I guess you could also try to configure
> the bundle size to smaller value via “python.fn-execution.bundle.size”.
Re: pyflink keyed stream checkpoint error
Guess my last reply didn't go through, so here goes again...

Possibly, but I don't think so. Since I submitted this, I have done some more testing. It works fine with file system or memory state backends, but not with rocksdb. I will try again and check the logs, though.

I've also tested rocksdb checkpointing on other jobs, and it works fine. But when I combine rocksdb with the keyed stream, it fails.

Thanks for the suggestions, I'll look into them.

On Thu, Sep 23, 2021 at 9:07 PM Dian Fu wrote:

> I agree with Roman that it seems that the Python process has crashed.
Re: pyflink keyed stream checkpoint error
Hi guys,
I'm still running into this problem. I checked the logs, and there is no evidence that the python process crashed. I checked the process IDs and they are still active after the error. No `killed process` messages in /var/log/messages.

I don't think it's necessarily related to checkpointing. I noticed https://issues.apache.org/jira/browse/FLINK-24123 and thought it was possibly related. I tried upgrading to Flink 1.14.0, but get the (mostly) same error, but now the error happens outside the context of performing the checkpointing operation.

I tried reducing python.fn-execution.bundle.size to 10,000 (default 100,000), and no luck there, either.

2021-10-13 13:39:19
java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:361)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:321)
    at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.processElement(AbstractOneInputPythonFunctionOperator.java:139)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.processElement(PythonKeyedProcessOperator.java:176)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:377)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:361)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$invokeFinishBundle$2(AbstractPythonFunctionOperator.java:340)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
    ... 7 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$Con
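When comparing traces like this one across Flink versions, it can help to pull out only the "Caused by:" chain. The following is a rough sketch, assuming the trace has one frame or one "Caused by:" entry per line, as Flink normally logs it. The helper names are hypothetical, and `SAMPLE` is abridged from the trace in this message with the frames left out.

```python
from typing import List, Optional

CAUSE_PREFIX = "Caused by:"


def cause_chain(trace: str) -> List[str]:
    """Return the text of every 'Caused by:' line, outermost first."""
    chain = []
    for raw in trace.splitlines():
        line = raw.strip()
        if line.startswith(CAUSE_PREFIX):
            chain.append(line[len(CAUSE_PREFIX):].strip())
    return chain


def root_cause(trace: str) -> Optional[str]:
    """Return the innermost cause, or None if the trace has no causes."""
    chain = cause_chain(trace)
    return chain[-1] if chain else None


# Abridged from the trace above; stack frames omitted.
SAMPLE = """\
java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
Caused by: java.lang.RuntimeException: Failed to close remote bundle
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
"""

if __name__ == "__main__":
    for depth, cause in enumerate(cause_chain(SAMPLE), start=1):
        print(depth, cause)
```

Because the trace quoted in the thread is cut off, the innermost cause found this way is only the last one visible in the excerpt, not necessarily the real root cause.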
Re: pyflink keyed stream checkpoint error
Hi Curt,

Could you check whether it works after reducing python.fn-execution.bundle.size to 1000 or 100?

Regards,
Dian

On Thu, Oct 14, 2021 at 2:47 AM Curt Buechter wrote:

> I tried reducing python.fn-execution.bundle.size to 10,000 (default
> 100,000), and no luck there, either.
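For reference, a small sketch of how the bundle sizes suggested here could be written as `flink-conf.yaml` entries or as `-D` options for `flink run`. Only the key `python.fn-execution.bundle.size` and the values 1000 and 100 come from the thread; the helper names are hypothetical. Whether a given deployment reads the option from the file or from the command line is an assumption to verify against the Flink version in use.

```python
BUNDLE_SIZE_KEY = "python.fn-execution.bundle.size"

# Values suggested in the thread, largest first.
CANDIDATE_SIZES = [1000, 100]


def conf_line(size: int) -> str:
    """Render one flink-conf.yaml entry for the given bundle size."""
    if size <= 0:
        raise ValueError("bundle size must be positive")
    return f"{BUNDLE_SIZE_KEY}: {size}"


def cli_option(size: int) -> str:
    """Render the same setting as a generic -D option for `flink run`."""
    if size <= 0:
        raise ValueError("bundle size must be positive")
    return f"-D{BUNDLE_SIZE_KEY}={size}"


if __name__ == "__main__":
    for size in CANDIDATE_SIZES:
        print(conf_line(size))
        print(cli_option(size))
```

Trying the sizes one at a time, largest first, shows whether the failure depends on how many elements are buffered per bundle.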