PS: There is more information about this configuration at
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/python_config/#python-fn-execution-bundle-size
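
For reference, this is roughly how the option looks in flink-conf.yaml (the value below is only an illustrative starting point; the documented default in Flink 1.13 is 100000):

```yaml
# flink-conf.yaml — cap how many elements the Python worker buffers per bundle;
# a smaller bundle means less data to flush when a checkpoint barrier arrives,
# at some cost in throughput.
python.fn-execution.bundle.size: 10000
```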

> On Sep 24, 2021, at 10:07 AM, Dian Fu <dian0511...@gmail.com> wrote:
> 
> I agree with Roman that the Python process seems to have crashed.
> 
> Besides Roman's suggestions, you could also try configuring the bundle size 
> to a smaller value via “python.fn-execution.bundle.size”.
> 
> Regards,
> Dian
> 
>> On Sep 24, 2021, at 3:48 AM, Roman Khachatryan <ro...@apache.org> wrote:
>> 
>> Hi,
>> 
>> Is it possible that the Python process crashed or hung (probably while
>> performing a snapshot)?
>> Could you validate this by checking the OS logs for OOM killer
>> messages or process status?
>> 
>> Regards,
>> Roman
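
One way to check the OOM-killer hypothesis from a task manager host (the grep patterns are illustrative; reading the kernel log may require elevated privileges):

```shell
# Search the kernel log for OOM-killer activity; timestamps from `dmesg -T`
# can be correlated with the failure time in the Flink task manager log.
dmesg -T 2>/dev/null | grep -iE 'out of memory|killed process' || true

# On systemd hosts the kernel journal carries the same messages:
journalctl -k --no-pager 2>/dev/null | grep -i 'oom' || true
```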
>> 
>> On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter <tricksho...@gmail.com> wrote:
>>> 
>>> Hi,
>>> I'm getting an error after enabling checkpointing in my PyFlink application, 
>>> which uses a keyed stream and RocksDB state.
>>> 
>>> Here is the error message:
>>> 
>>> 2021-09-22 16:18:14,408 INFO 
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - 
>>> Closed RocksDB State Backend. Cleaning up RocksDB working directory 
>>> /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
>>> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task [] - 
>>> KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) 
>>> switched from RUNNING to FAILED with failure cause: java.io.IOException: 
>>> Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed 
>>> (1/1)#34.
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>>   at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>>>   at 
>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
>>>   at 
>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
>>>   at 
>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
>>>   at 
>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
>>>   ... 19 more
>>> Caused by: java.util.concurrent.ExecutionException: 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
>>> CANCELLED: cancelled before receiving half close
>>>   at 
>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>   at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>>   at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>>>   at 
>>> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>>>   at 
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>>>   at 
>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
>>>   ... 28 more
>>> Caused by: 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
>>> CANCELLED: cancelled before receiving half close
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>>   at 
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>>   at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>   at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>   ... 1 more
>>> 
>>> 2021-09-22 16:18:14,410 INFO org.apache.flink.runtime.taskmanager.Task [] - 
>>> Freeing task resources for KEYED PROCESS -> Sink: Unnamed (1/1)#34 
>>> (8f4fd40e863dd058822060dc3cf98831).
>>> 2021-09-22 16:18:14,411 INFO 
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
>>> and sending final execution state FAILED to JobManager for task KEYED 
>>> PROCESS -> Sink: Unnamed (1/1)#34 8f4fd40e863dd058822060dc3cf98831.
>>> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task [] - 
>>> Attempting to cancel task Source: Custom Source -> 
>>> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
>>> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task [] - 
>>> Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 
>>> (290905523bfbf344381b7a04a8d36831) switched from RUNNING to CANCELING.
>>> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task [] - 
>>> Triggering cancellation of task code Source: Custom Source -> 
>>> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
>>> 2021-09-22 16:18:14,428 INFO org.apache.flink.runtime.taskmanager.Task [] - 
>>> Ignoring checkpoint aborted notification for non-running task Source: 
>>> Custom Source -> _stream_key_by_map_operator (1/1)#34.
>>> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics [] - 
>>> Metrics scheduler closed
>>> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics [] - 
>>> Closing reporter org.apache.kafka.common.metrics.JmxReporter
>>> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics [] - 
>>> Metrics reporters closed
>>> 
>>> 
>>> And, here is some source code:
>>> 
>>> class DedupeFunction(FlatMapFunction):
>>>   def __init__(self, schema):
>>>       super().__init__()
>>>       self.logger = None
>>>       self.state = None
>>>       self.my_state = None
>>>       self.schema = schema
>>>       self.metric_columns = [c.column_name for c in schema.columns if c.is_metric]
>>> 
>>>   def open(self, runtime_context: RuntimeContext):
>>>       self.logger = logging
>>>       self.logger.info('Opening the FlatMapFunction')
>>>       descriptor = MapStateDescriptor("my_map_state_descriptor", Types.PICKLED_BYTE_ARRAY(), Types.PICKLED_BYTE_ARRAY())
>>>       self.state = runtime_context.get_map_state(descriptor)
>>> 
>>>   def flat_map(self, value):
>>>       try:
>>>           if not self.state.is_empty():
>>>               # self.logger.info('key in state')
>>>               previous_dict = {}
>>>               for k, v in self.state.items():
>>>                   # reverse the metric columns
>>>                   if k in self.metric_columns:
>>>                       if v:
>>>                           v = -v
>>>                   previous_dict[k] = v
>>>               yield Row(**previous_dict)
>>>           new_dict = value.as_dict()
>>>           self.state.put_all(new_dict.items())
>>>           yield value
>>>       except Exception as ex:
>>>           self.logger.error(f'ERROR in dedupe_datastream: {str(ex)}')
>>> 
>>> class PrimaryKeySelector(KeySelector):
>>> 
>>>   def __init__(self, primary_key):
>>>       self.__primary_key__ = primary_key
>>> 
>>>   def get_key(self, kv_obj):
>>>       return kv_obj.as_dict().get(self.__primary_key__)
>>> 
>>> 
>>> 
>>> backend = RocksDBStateBackend(self.__conf__.get('CHECKPOINT_DIR'))
>>> 
>>> self.__env__.set_state_backend(backend)
>>> 
>>> 
>>> input_ds = input_ds.key_by(PrimaryKeySelector(self.__primary_key__), key_type_info=primary_key_type_info)
>>> 
>>> deduped_ds = input_ds.flat_map(DedupeFunction(self.__schema__), output_type=type_info)
>>> 
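
As a side note, the reversal logic in flat_map can be exercised outside Flink; this is a minimal sketch with a plain dict standing in for the keyed MapState (the record dicts and metric column names are made up for illustration):

```python
def dedupe_records(records, metric_columns):
    """Mirror the flat_map above: before emitting a new record for a key,
    emit the previously stored record with its metric columns negated."""
    state = {}  # key -> last record dict (plays the role of MapState)
    for key, record in records:
        previous = state.get(key)
        if previous is not None:
            # Reverse the metric columns of the previous record.
            yield {k: (-v if k in metric_columns and v else v)
                   for k, v in previous.items()}
        state[key] = dict(record)
        yield dict(record)

out = list(dedupe_records(
    [(1, {"id": 1, "amount": 10}), (1, {"id": 1, "amount": 15})],
    metric_columns={"amount"},
))
# out == [{"id": 1, "amount": 10}, {"id": 1, "amount": -10}, {"id": 1, "amount": 15}]
```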
>>> 
>>> This program works fine if checkpointing is not enabled. Any advice here?
>>> 
>>> 
>>> Thanks
> 
