Re: pyflink keyed stream checkpoint error

2021-10-14 Thread Dian Fu
Hi Curt,

Could you check whether reducing python.fn-execution.bundle.size to 1000 or
100 helps?
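
A minimal sketch of the values to try, assuming the setting goes into flink-conf.yaml as a plain "key: value" line. The helper names are hypothetical; the key and the 100,000 default are the ones quoted in this thread.

```python
# Default bundle size as quoted in this thread (100,000).
DEFAULT_BUNDLE_SIZE = 100_000


def candidate_bundle_sizes(default=DEFAULT_BUNDLE_SIZE, floor=100):
    """Return the default divided by successive powers of ten, down to `floor`."""
    sizes = []
    size = default // 10
    while size >= floor:
        sizes.append(size)
        size //= 10
    return sizes


def conf_line(size):
    """Format one flink-conf.yaml entry for the bundle size (hypothetical helper)."""
    return f"python.fn-execution.bundle.size: {size}"


# Print one candidate config line per experiment, largest value first.
for size in candidate_bundle_sizes():
    print(conf_line(size))
```

Trying one value per run makes it easier to see whether the failure tracks the bundle size at all.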

Regards,
Dian

On Thu, Oct 14, 2021 at 2:47 AM Curt Buechter wrote:

> Hi guys,
> I'm still running into this problem. I checked the logs, and there is no
> evidence that the python process crashed. I checked the process IDs and
> they are still active after the error. No `killed process` messages in
> /var/log/messages.
>
> I don't think it's necessarily related to checkpointing. I noticed
> https://issues.apache.org/jira/browse/FLINK-24123 and thought it was
> possibly related. I tried upgrading to Flink 1.14.0 and still get (mostly)
> the same error, except that it now happens outside the context of a
> checkpointing operation.
>
> I tried reducing python.fn-execution.bundle.size to 10,000 (default
> 100,000), and no luck there, either.
>
> 2021-10-13 13:39:19
> java.lang.RuntimeException: Error while waiting for
> BeamPythonFunctionRunner flush
> at org.apache.flink.streaming.api.operators.python.
> AbstractPythonFunctionOperator.invokeFinishBundle(
> AbstractPythonFunctionOperator.java:361)
> at org.apache.flink.streaming.api.operators.python.
> AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(
> AbstractPythonFunctionOperator.java:321)
> at org.apache.flink.streaming.api.operators.python.
> AbstractOneInputPythonFunctionOperator.processElement(
> AbstractOneInputPythonFunctionOperator.java:139)
> at org.apache.flink.streaming.api.operators.python.
> PythonKeyedProcessOperator.processElement(PythonKeyedProcessOperator.java:
> 176)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
> .java:233)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput
> .processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput
> .emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:496)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:203)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:809)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:761)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
> Task.java:958)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:
> 937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> at org.apache.flink.streaming.api.runners.python.beam.
> BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:377)
> at org.apache.flink.streaming.api.runners.python.beam.
> BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:361)
> at org.apache.flink.streaming.api.operators.python.
> AbstractPythonFunctionOperator.lambda$invokeFinishBundle$2(
> AbstractPythonFunctionOperator.java:340)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
> .java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
> .java:624)
> ... 1 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> CANCELLED: cancelled before receiving half close
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture
> .java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
> at org.apache.beam.runners.fnexecution.control.
> SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:
> 504)
> at org.apache.beam.runners.fnexecution.control.
> DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(
> DefaultJobBundleFactory.java:555)
> at org.apache.flink.streaming.api.runners.python.beam.
> BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
> ... 7 more
> Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.
> StatusRuntimeException: CANCELLED: cancelled before receiving half close
> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(
> Status.java:524)
> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.
> ServerCalls$StreamingServerCallHandler$StreamingServerCallListener
> .onCancel(ServerCalls.java:275)
> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.
> PartialForwardingServerCallListener.onCancel(
> 

Re: pyflink keyed stream checkpoint error

2021-10-13 Thread Curt Buechter
Hi guys,
I'm still running into this problem. I checked the logs, and there is no
evidence that the python process crashed. I checked the process IDs and
they are still active after the error. No `killed process` messages in
/var/log/messages.

I don't think it's necessarily related to checkpointing. I noticed
https://issues.apache.org/jira/browse/FLINK-24123 and thought it was
possibly related. I tried upgrading to Flink 1.14.0 and still get (mostly)
the same error, except that it now happens outside the context of a
checkpointing operation.

I tried reducing python.fn-execution.bundle.size to 10,000 (default
100,000), and no luck there, either.

2021-10-13 13:39:19
java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner
flush
at org.apache.flink.streaming.api.operators.python.
AbstractPythonFunctionOperator.invokeFinishBundle(
AbstractPythonFunctionOperator.java:361)
at org.apache.flink.streaming.api.operators.python.
AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(
AbstractPythonFunctionOperator.java:321)
at org.apache.flink.streaming.api.operators.python.
AbstractOneInputPythonFunctionOperator.processElement(
AbstractOneInputPythonFunctionOperator.java:139)
at org.apache.flink.streaming.api.operators.python.
PythonKeyedProcessOperator.processElement(PythonKeyedProcessOperator.java:
176)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput
.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput
.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at org.apache.flink.streaming.api.runners.python.beam.
BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:377)
at org.apache.flink.streaming.api.runners.python.beam.
BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:361)
at org.apache.flink.streaming.api.operators.python.
AbstractPythonFunctionOperator.lambda$invokeFinishBundle$2(
AbstractPythonFunctionOperator.java:340)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
.java:624)
... 1 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
CANCELLED: cancelled before receiving half close
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:
357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at org.apache.beam.runners.fnexecution.control.
SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:
504)
at org.apache.beam.runners.fnexecution.control.
DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(
DefaultJobBundleFactory.java:555)
at org.apache.flink.streaming.api.runners.python.beam.
BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
... 7 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.
StatusRuntimeException: CANCELLED: cancelled before receiving half close
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(
Status.java:524)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.
ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(
ServerCalls.java:275)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.
PartialForwardingServerCallListener.onCancel(
PartialForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener
.onCancel(ForwardingServerCallListener.java:23)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.
ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(
ForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.

Re: pyflink keyed stream checkpoint error

2021-09-23 Thread Curt Buechter
Guess my last reply didn't go through, so here goes again...

Possibly, but I don't think so. Since I submitted this, I have done some
more testing. It works fine with file system or memory state backends, but
not with rocksdb. I will try again and check the logs, though.
I've also tested rocksdb checkpointing on other jobs, and it works fine.
But when I combine rocksdb with the keyed stream, it fails.

Thanks for the suggestions, I'll look into them.

On Thu, Sep 23, 2021 at 9:07 PM Dian Fu wrote:

> I agree with Roman that it seems that the Python process has crashed.
>
> Besides the suggestions from Roman, I guess you could also try to
> configure the bundle size to a smaller value via
> “python.fn-execution.bundle.size”.
>
> Regards,
> Dian
>
> > On Sep 24, 2021, at 3:48 AM, Roman Khachatryan wrote:
> >
> > Hi,
> >
> > Is it possible that the Python process crashed or hung (probably while
> > performing a snapshot)?
> > Could you validate this by checking the OS logs for OOM killer
> > messages or process status?
> >
> > Regards,
> > Roman
> >
> > On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter wrote:
> >>
> >> Hi,
> >> I'm getting an error after enabling checkpointing in my pyflink
> application that uses a keyed stream and rocksdb state.
> >>
> >> Here is the error message:
> >>
> >> 2021-09-22 16:18:14,408 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] -
> Closed RocksDB State Backend. Cleaning up RocksDB working directory
> /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
> >> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task
> [] - KEYED PROCESS -> Sink: Unnamed (1/1)#34
> (8f4fd40e863dd058822060dc3cf98831) switched from RUNNING to FAILED with
> failure cause: java.io.IOException: Could not perform checkpoint 2 for
> operator KEYED PROCESS -> Sink: Unnamed (1/1)#34.
> >>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
> >>at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
> >>at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
> >>at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
> >>at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
> >>at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> >>at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
> >>at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
> >>at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
> >>at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> >>at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> >>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> >>at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> >>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> >>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> >>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> >>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> >>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> >>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> >>at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> >>at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
> >>at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
> >>at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
> >>at
> 

Re: pyflink keyed stream checkpoint error

2021-09-23 Thread Dian Fu
PS: There is more information about this configuration at
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/python_config/#python-fn-execution-bundle-size

> On Sep 24, 2021, at 10:07 AM, Dian Fu wrote:
> 
> I agree with Roman that it seems that the Python process has crashed.
> 
> Besides the suggestions from Roman, I guess you could also try to configure 
> the bundle size to a smaller value via “python.fn-execution.bundle.size”.
> 
> Regards,
> Dian
> 
>> On Sep 24, 2021, at 3:48 AM, Roman Khachatryan wrote:
>> 
>> Hi,
>> 
>> Is it possible that the Python process crashed or hung (probably while
>> performing a snapshot)?
>> Could you validate this by checking the OS logs for OOM killer
>> messages or process status?
>> 
>> Regards,
>> Roman
>> 
>> On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter wrote:
>>> 
>>> Hi,
>>> I'm getting an error after enabling checkpointing in my pyflink application 
>>> that uses a keyed stream and rocksdb state.
>>> 
>>> Here is the error message:
>>> 
>>> 2021-09-22 16:18:14,408 INFO 
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - 
>>> Closed RocksDB State Backend. Cleaning up RocksDB working directory 
>>> /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
>>> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task [] - 
>>> KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) 
>>> switched from RUNNING to FAILED with failure cause: java.io.IOException: 
>>> Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed 
>>> (1/1)#34.
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>>   at 
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>>   at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>>>   at 
>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
>>>   at 
>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
>>>   at 
>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
>>>   at 
>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
>>>   at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
>>>   at 
>>> 

Re: pyflink keyed stream checkpoint error

2021-09-23 Thread Dian Fu
I agree with Roman that it seems that the Python process has crashed.

Besides the suggestions from Roman, I guess you could also try to configure the 
bundle size to a smaller value via “python.fn-execution.bundle.size”.

Regards,
Dian

> On Sep 24, 2021, at 3:48 AM, Roman Khachatryan wrote:
> 
> Hi,
> 
> Is it possible that the Python process crashed or hung (probably while
> performing a snapshot)?
> Could you validate this by checking the OS logs for OOM killer
> messages or process status?
> 
> Regards,
> Roman
> 
> On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter wrote:
>> 
>> Hi,
>> I'm getting an error after enabling checkpointing in my pyflink application 
>> that uses a keyed stream and rocksdb state.
>> 
>> Here is the error message:
>> 
>> 2021-09-22 16:18:14,408 INFO 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - 
>> Closed RocksDB State Backend. Cleaning up RocksDB working directory 
>> /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
>> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task [] - 
>> KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) 
>> switched from RUNNING to FAILED with failure cause: java.io.IOException: 
>> Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed 
>> (1/1)#34.
>>at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>>at 
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>>at 
>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>>at 
>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>>at 
>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>>at 
>> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>>at 
>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>>at 
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>>at 
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>>at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>>at 
>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
>>at 
>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
>>at 
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
>>at 
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
>>at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
>>at 
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
>>at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
>>at 
>> 

Re: pyflink keyed stream checkpoint error

2021-09-23 Thread Roman Khachatryan
Hi,

Is it possible that the Python process crashed or hung (probably while
performing a snapshot)?
Could you validate this by checking the OS logs for OOM killer
messages or process status?
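
A rough sketch of both checks, assuming a Linux host. The function names and the sample log lines are made up for illustration; the matched phrases are the ones the kernel typically writes when the OOM killer acts.

```python
import os
import re

# Phrases the Linux kernel typically logs when the OOM killer acts.
OOM_PATTERN = re.compile(r"out of memory|killed process|oom-killer", re.IGNORECASE)


def find_oom_lines(log_lines):
    """Return the log lines that look like OOM-killer messages."""
    return [line.rstrip("\n") for line in log_lines if OOM_PATTERN.search(line)]


def process_is_alive(pid):
    """Return True if a process with this PID exists (POSIX only)."""
    try:
        os.kill(pid, 0)  # signal 0 only checks existence, nothing is delivered
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


# Hypothetical sample lines, not taken from a real host.
sample = [
    "Oct 13 13:39:01 host kernel: Out of memory: Killed process 4242 (python)",
    "Oct 13 13:39:02 host systemd: Started Session 1 of user hadoop.",
]
print(find_oom_lines(sample))
print(process_is_alive(os.getpid()))
```

On a real host, find_oom_lines can be given an open file such as /var/log/messages, and process_is_alive the PID of the Python worker.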

Regards,
Roman

On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter wrote:
>
> Hi,
> I'm getting an error after enabling checkpointing in my pyflink application 
> that uses a keyed stream and rocksdb state.
>
> Here is the error message:
>
> 2021-09-22 16:18:14,408 INFO 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - Closed 
> RocksDB State Backend. Cleaning up RocksDB working directory 
> /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task [] - 
> KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) 
> switched from RUNNING to FAILED with failure cause: java.io.IOException: 
> Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed 
> (1/1)#34.
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
> at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
> at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
> at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
> at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
> at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
> at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
> at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
> at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
> at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
> at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
> at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
> ... 19 more
> Caused by: java.util.concurrent.ExecutionException: 
> 

pyflink keyed stream checkpoint error

2021-09-22 Thread Curt Buechter
Hi,
I'm getting an error after enabling checkpointing in my pyflink application
that uses a keyed stream and rocksdb state.

Here is the error message:

2021-09-22 16:18:14,408 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] -
Closed RocksDB State Backend. Cleaning up RocksDB working directory
/mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task [] -
KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831)
switched from RUNNING to FAILED with failure cause: java.io.IOException:
Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed
(1/1)#34.
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
at
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
... 19 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
CANCELLED: cancelled before receiving half close
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at

Re: Checkpoint error - "The job has failed"

2021-04-28 Thread Dan Hill
Oh interesting.  Yea, could be.  We'll soon update to v1.12.  Thanks Robert
and Yun!

On Wed, Apr 28, 2021 at 1:30 AM Yun Tang  wrote:

> Hi Dan,
>
> You can refer to the "Fix Versions" field of FLINK-16753 [1] to see that
> this bug is resolved after 1.11.3, not in 1.11.1.
>
> [1] https://issues.apache.org/jira/browse/FLINK-16753
>
> Best
> Yun Tang
> --
> *From:* Dan Hill 
> *Sent:* Tuesday, April 27, 2021 7:50
> *To:* Yun Tang 
> *Cc:* Robert Metzger ; user 
> *Subject:* Re: Checkpoint error - "The job has failed"
>
> Hey Yun and Robert,
>
> I'm using Flink v1.11.1.
>
> Robert, I'll send you a separate email with the logs.
>
> On Mon, Apr 26, 2021 at 12:46 AM Yun Tang  wrote:
>
> Hi Dan,
>
> I think you might be using an older version of Flink; this problem has been
> resolved by FLINK-16753 [1] after Flink-1.10.3.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-16753
>
> Best
> Yun Tang
> --
> *From:* Robert Metzger 
> *Sent:* Monday, April 26, 2021 14:46
> *To:* Dan Hill 
> *Cc:* user 
> *Subject:* Re: Checkpoint error - "The job has failed"
>
> Hi Dan,
>
> can you provide me with the JobManager logs to take a look as well? (This
> will also tell me which Flink version you are using)
>
>
>
> On Mon, Apr 26, 2021 at 7:20 AM Dan Hill  wrote:
>
> My Flink job failed to checkpoint with a "The job has failed" error.  The
> logs contained no other recent errors.  I keep hitting the error even if I
> cancel the jobs and restart them.  When I restarted my jobmanager and
> taskmanager, the error went away.
>
> What error am I hitting?  It looks like there is bad state that lives
> outside the scope of a job.
>
> How often do people restart their jobmanagers and taskmanager to deal with
> errors like this?
>
>


Re: Checkpoint error - "The job has failed"

2021-04-28 Thread Yun Tang
Hi Dan,

You can refer to the "Fix Versions" field of FLINK-16753 [1] to see that this bug 
is resolved after 1.11.3, not in 1.11.1.

[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Dan Hill 
Sent: Tuesday, April 27, 2021 7:50
To: Yun Tang 
Cc: Robert Metzger ; user 
Subject: Re: Checkpoint error - "The job has failed"

Hey Yun and Robert,

I'm using Flink v1.11.1.

Robert, I'll send you a separate email with the logs.

On Mon, Apr 26, 2021 at 12:46 AM Yun Tang wrote:
Hi Dan,

I think you might be using an older version of Flink; this problem has been resolved 
by FLINK-16753 [1] after Flink-1.10.3.


[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Robert Metzger
Sent: Monday, April 26, 2021 14:46
To: Dan Hill
Cc: user
Subject: Re: Checkpoint error - "The job has failed"

Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This will 
also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill wrote:
My Flink job failed to checkpoint with a "The job has failed" error.  The logs 
contained no other recent errors.  I keep hitting the error even if I cancel 
the jobs and restart them.  When I restarted my jobmanager and taskmanager, the 
error went away.

What error am I hitting?  It looks like there is bad state that lives outside 
the scope of a job.

How often do people restart their jobmanagers and taskmanager to deal with 
errors like this?
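
A minimal Python sketch of the version check discussed in this thread. It assumes, based only on the replies quoted above, that the FLINK-16753 fix first shipped in patch release 3 of the 1.10 and 1.11 lines and is present in every later minor line. The table FIRST_FIXED_PATCH and the helper names are hypothetical; the "Fix Versions" field in JIRA remains the authority.

```python
from typing import Dict, Tuple

# Assumption taken from the thread, not verified against JIRA:
# first patch release of each minor line that contains the fix.
FIRST_FIXED_PATCH: Dict[Tuple[int, int], int] = {(1, 10): 3, (1, 11): 3}


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse '1.11.1' or 'v1.12' into (major, minor, patch)."""
    parts = version.lstrip("v").split(".")
    # Pad missing components with zeros so 'v1.12' becomes (1, 12, 0).
    major, minor, patch = (int(p) for p in (parts + ["0", "0"])[:3])
    return major, minor, patch


def has_fix(version: str) -> bool:
    """Return True if the given Flink version should contain the fix."""
    major, minor, patch = parse_version(version)
    line = (major, minor)
    if line in FIRST_FIXED_PATCH:
        return patch >= FIRST_FIXED_PATCH[line]
    # Assumption: minor lines newer than the listed ones include the fix,
    # older ones do not.
    return line > max(FIRST_FIXED_PATCH)
```

With these assumptions, has_fix("1.11.1") is False and has_fix("1.12") is True, which matches the advice to upgrade. Suffixes such as "-SNAPSHOT" are not handled.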


Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Dan Hill
Hey Yun and Robert,

I'm using Flink v1.11.1.

Robert, I'll send you a separate email with the logs.

On Mon, Apr 26, 2021 at 12:46 AM Yun Tang  wrote:

> Hi Dan,
>
> I think you might be using an older version of Flink; this problem has been
> resolved by FLINK-16753 [1] after Flink-1.10.3.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-16753
>
> Best
> Yun Tang
> --
> *From:* Robert Metzger 
> *Sent:* Monday, April 26, 2021 14:46
> *To:* Dan Hill 
> *Cc:* user 
> *Subject:* Re: Checkpoint error - "The job has failed"
>
> Hi Dan,
>
> can you provide me with the JobManager logs to take a look as well? (This
> will also tell me which Flink version you are using)
>
>
>
> On Mon, Apr 26, 2021 at 7:20 AM Dan Hill  wrote:
>
> My Flink job failed to checkpoint with a "The job has failed" error.  The
> logs contained no other recent errors.  I keep hitting the error even if I
> cancel the jobs and restart them.  When I restarted my jobmanager and
> taskmanager, the error went away.
>
> What error am I hitting?  It looks like there is bad state that lives
> outside the scope of a job.
>
> How often do people restart their jobmanagers and taskmanager to deal with
> errors like this?
>
>


Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Yun Tang
Hi Dan,

I think you might be using an older version of Flink; this problem has been resolved 
by FLINK-16753 [1] after Flink-1.10.3.


[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Robert Metzger 
Sent: Monday, April 26, 2021 14:46
To: Dan Hill 
Cc: user 
Subject: Re: Checkpoint error - "The job has failed"

Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This will 
also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill wrote:
My Flink job failed to checkpoint with a "The job has failed" error.  The logs 
contained no other recent errors.  I keep hitting the error even if I cancel 
the jobs and restart them.  When I restarted my jobmanager and taskmanager, the 
error went away.

What error am I hitting?  It looks like there is bad state that lives outside 
the scope of a job.

How often do people restart their jobmanagers and taskmanager to deal with 
errors like this?


Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Robert Metzger
Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This
will also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill  wrote:

> My Flink job failed to checkpoint with a "The job has failed" error.  The
> logs contained no other recent errors.  I keep hitting the error even if I
> cancel the jobs and restart them.  When I restarted my jobmanager and
> taskmanager, the error went away.
>
> What error am I hitting?  It looks like there is bad state that lives
> outside the scope of a job.
>
> How often do people restart their jobmanagers and taskmanager to deal with
> errors like this?
>


Checkpoint error - "The job has failed"

2021-04-25 Thread Dan Hill
My Flink job failed to checkpoint with a "The job has failed" error.  The
logs contained no other recent errors.  I keep hitting the error even if I
cancel the jobs and restart them.  When I restarted my jobmanager and
taskmanager, the error went away.

What error am I hitting?  It looks like there is bad state that lives
outside the scope of a job.

How often do people restart their jobmanagers and taskmanager to deal with
errors like this?


Re: Re: Re: Checkpoint Error

2021-03-10 Thread Till Rohrmann
Could it be that another process deleted the in-progress
checkpoint file?

Cheers,
Till

On Mon, Mar 8, 2021 at 4:31 PM Yun Gao  wrote:

> Hi Navneeth,
>
> Is the attached exception the root cause of the checkpoint failure?
> Namely, is it also reported in the job manager log?
>
> Also, have you enabled concurrent checkpoints?
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*Navneeth Krishnan 
> *Send Date:*Mon Mar 8 13:10:46 2021
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: Re: Checkpoint Error
>
>> Hi Yun,
>>
>> Thanks for the response. I checked the mounts and only the JM's and TM's
>> are mounted with this EFS. Not sure how to debug this.
>>
>> Thanks
>>
>> On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:
>>
>>> Hi Navneeth,
>>>
>>> From the stack trace, the exception seems to be caused by a problem in the
>>> underlying EFS. Have you checked whether any errors were reported for EFS,
>>> or whether the same EFS might be mounted elsewhere and someone else deleted
>>> the directory?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*Navneeth Krishnan 
>>> *Send Date:*Sun Mar 7 15:44:59 2021
>>> *Recipients:*user 
>>> *Subject:*Re: Checkpoint Error
>>>
>>>> Hi All,
>>>>
>>>> Any suggestions?
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan <
>>>> reachnavnee...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> We are running our streaming job on flink 1.7.2 and we are noticing
>>>>> the below error. Not sure what's causing it, any pointers would help. We
>>>>> have 10 TM's checkpointing to AWS EFS.
>>>>>
>>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>>> ... 6 more
>>>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>>>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>>>> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>>> ... 5 more
>>>>> Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>>>>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.c

Re: Re: Re: Checkpoint Error

2021-03-08 Thread Yun Gao
Hi Navneeth,

Is the attached exception the root cause of the checkpoint failure?
Namely, is it also reported in the job manager log?

Also, have you enabled concurrent checkpoints?

Best,
 Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Mon Mar 8 13:10:46 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Checkpoint Error

Hi Yun,

Thanks for the response. I checked the mounts and only the JM's and TM's are 
mounted with this EFS. Not sure how to debug this.

Thanks
On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:

Hi Navneeth,

From the stack trace, the exception seems to be caused by a problem in the 
underlying EFS. Have you checked whether any errors were reported for EFS, or 
whether the same EFS might be mounted elsewhere and someone else deleted the 
directory?

Best,
Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Sun Mar 7 15:44:59 2021
Recipients:user 
Subject:Re: Checkpoint Error

Hi All,

Any suggestions?

Thanks
On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan  
wrote:

Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the below 
error. Not sure what's causing it, any pointers would help. We have 10 TM's 
checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.io.IOException: Stale file handle
at java.io.FileOutputStream.close0(Native Method)
at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
at java.io.FileOutputStream.close(FileOutputStream.java:354)
at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
... 12 more

Thanks

Re: Re: Checkpoint Error

2021-03-07 Thread Navneeth Krishnan
Hi Yun,

Thanks for the response. I checked the mounts and only the JM's and TM's
are mounted with this EFS. Not sure how to debug this.

Thanks

On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:

> Hi Navneeth,
>
> From the stack trace, the exception seems to be caused by a problem in the
> underlying EFS. Have you checked whether any errors were reported for EFS,
> or whether the same EFS might be mounted elsewhere and someone else deleted
> the directory?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Navneeth Krishnan 
> *Send Date:*Sun Mar 7 15:44:59 2021
> *Recipients:*user 
> *Subject:*Re: Checkpoint Error
>
>> Hi All,
>>
>> Any suggestions?
>>
>> Thanks
>>
>> On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We are running our streaming job on flink 1.7.2 and we are noticing the
>>> below error. Not sure what's causing it, any pointers would help. We have
>>> 10 TM's checkpointing to AWS EFS.
>>>
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>> ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>> ... 5 more
>>> Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>>> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>> ... 7 more
>>> Caused by: java.io.IOException: Stale file handle
>>> at java.io.FileOutputStream.close0(Native Method)
>>> at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>>> at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>>> at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>>> at java.io.FileOutputStream.close(FileOutputStream.java:354)
>>> at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>>> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>>> ... 12 more
>>>
>>>
>>> Thanks
>>>
>>>
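
One way to start debugging the mount question raised in this message is to list the NFS mounts that cover the checkpoint directory on each host. The Python sketch below parses text in the /proc/mounts format (Linux only). The helper names are hypothetical, and it relies on the assumption that EFS appears there with an "nfs" or "nfs4" filesystem type.

```python
import os
from typing import List, NamedTuple


class Mount(NamedTuple):
    device: str
    mount_point: str
    fs_type: str


def parse_mounts(text: str) -> List[Mount]:
    """Parse /proc/mounts-style text: device, mount point, fs type, ..."""
    mounts = []
    for line in text.splitlines():
        fields = line.split()
        if len(fields) >= 3:
            mounts.append(Mount(fields[0], fields[1], fields[2]))
    return mounts


def nfs_mounts_covering(path: str, mounts: List[Mount]) -> List[Mount]:
    """Return the NFS mounts whose mount point is a parent of `path`.

    `path` must be absolute, like the mount points themselves.
    """
    path = os.path.normpath(path)
    hits = []
    for mount in mounts:
        if not mount.fs_type.startswith("nfs"):
            continue
        mount_point = os.path.normpath(mount.mount_point)
        if os.path.commonpath([path, mount_point]) == mount_point:
            hits.append(mount)
    return hits
```

On a JM or TM host this could be called as nfs_mounts_covering("/mnt/checkpoints", parse_mounts(open("/proc/mounts").read())). It only reports mounts on the host where it runs, so it cannot by itself rule out another machine mounting the same file system.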


Re: Re: Checkpoint Error

2021-03-07 Thread Yun Gao
Hi Navneeth,

From the stack trace, the exception seems to be caused by a problem in the 
underlying EFS. Have you checked whether any errors were reported for EFS, or 
whether the same EFS might be mounted elsewhere and someone else deleted the 
directory?

Best,
Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Sun Mar 7 15:44:59 2021
Recipients:user 
Subject:Re: Checkpoint Error

Hi All,

Any suggestions?

Thanks
On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan  
wrote:

Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the below 
error. Not sure what's causing it, any pointers would help. We have 10 TM's 
checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.io.IOException: Stale file handle
at java.io.FileOutputStream.close0(Native Method)
at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
at java.io.FileOutputStream.close(FileOutputStream.java:354)
at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
... 12 more

Thanks
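
The innermost cause in the trace above, "Stale file handle", is the operating system's ESTALE error as surfaced by the JVM. A small probe like the following Python sketch can be run on a TaskManager host to see whether the checkpoint directory is currently stale, missing, or reachable. The function name and the returned labels are hypothetical, and a clean result at one moment does not rule out an intermittent EFS problem.

```python
import errno
import os


def probe_path(path: str) -> str:
    """Classify the result of stat()-ing a checkpoint path."""
    try:
        os.stat(path)
    except OSError as exc:
        if exc.errno == errno.ESTALE:
            # The NFS handle no longer refers to a live file or directory.
            return "stale"
        if exc.errno == errno.ENOENT:
            # The path was deleted or never existed.
            return "missing"
        return "error:" + errno.errorcode.get(exc.errno, str(exc.errno))
    return "ok"
```

For example, probe_path("/mnt/checkpoints") checks the mount root seen in the trace; running it periodically and logging the result next to checkpoint failures may help correlate the two.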

Re: Checkpoint Error

2021-03-06 Thread Navneeth Krishnan
Hi All,

Any suggestions?

Thanks

On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> We are running our streaming job on flink 1.7.2 and we are noticing the
> below error. Not sure what's causing it, any pointers would help. We have
> 10 TM's checkpointing to AWS EFS.
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 11 for operator Processor -> Sink: KafkaSink (34/42).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for 
> operator Processor -> Sink: KafkaSink (34/42).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>  in order to obtain the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>   ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>   at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>   ... 7 more
> Caused by: java.io.IOException: Stale file handle
>   at java.io.FileOutputStream.close0(Native Method)
>   at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>   at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>   at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>   at java.io.FileOutputStream.close(FileOutputStream.java:354)
>   at 
> org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>   at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>   ... 12 more
>
>
> Thanks
>
>


Checkpoint Error

2021-01-18 Thread Navneeth Krishnan
Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the
below error. Not sure what's causing it, any pointers would help. We have
10 TM's checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize
checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11
for operator Processor -> Sink: KafkaSink (34/42).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.io.IOException: Could not flush and close the file system output
stream to 
file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file
system output stream to
file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
in order to obtain the stream state handle
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.io.IOException: Stale file handle
at java.io.FileOutputStream.close0(Native Method)
at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
at java.io.FileOutputStream.close(FileOutputStream.java:354)
at 
org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
at 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
... 12 more


Thanks
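
The innermost cause in the trace above is an IOException ("Stale file handle") raised while closing a file under file:/mnt/checkpoints. A minimal sketch for exercising the same write, flush, and close sequence on a TaskManager host follows. The class name CheckpointDirProbe and the method probe are hypothetical, this is not Flink code, and an intermittent stale handle on a network mount may well not show up on a single run.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

public class CheckpointDirProbe {

    /**
     * Writes a small file into dir, flushes and closes it, then deletes it.
     * Returns null on success, or a description of the I/O error.
     */
    public static String probe(Path dir) {
        Path file = dir.resolve("probe-" + UUID.randomUUID());
        try {
            try (OutputStream out = Files.newOutputStream(file)) {
                out.write("probe".getBytes(StandardCharsets.UTF_8));
                out.flush();
            } // close() runs here; the trace above fails during close()
            Files.delete(file);
            return null;
        } catch (IOException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the checkpoint mount (for example /mnt/checkpoints) as the first argument.
        Path dir = Paths.get(args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir"));
        String error = probe(dir);
        System.out.println(error == null ? "OK: " + dir : "FAILED: " + dir + " -> " + error);
    }
}
```

Running it periodically on each of the 10 hosts could show whether the failure is tied to particular machines or to the mount as a whole.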


Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-24 Thread Robert Metzger
Thanks for opening the ticket. I've asked a committer who knows the
streaming sink well to take a look at the ticket.

On Fri, Apr 24, 2020 at 6:47 AM Lu Niu  wrote:

> Hi, Robert
>
> By the way, I did some investigation, and I think it's possible to support
> the streaming sink with the presto S3 file system. That would let users rely
> on the presto S3 FS for all access to S3. I created this Jira ticket:
> https://issues.apache.org/jira/browse/FLINK-17364 . What do you think?
>
> Best
> Lu
>
> On Tue, Apr 21, 2020 at 1:46 PM Lu Niu  wrote:
>
>> Cool, thanks!
>>
>> On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger 
>> wrote:
>>
>>> I'm not aware of anything. I think the presto s3 file system is
>>> generally the recommended S3 FS implementation.
>>>
>>> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:
>>>
 Thank you both. Given the debugging overhead, I might just try out the
 presto S3 file system then. Besides the fact that the presto S3 file system
 doesn't support the streaming sink, is there anything else I need to keep
 in mind? Thanks!

 Best
 Lu

 On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
 wrote:

> Hey,
> Others have experienced this as well, yes:
> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
> I have also notified the Hadoop project about this issue:
> https://issues.apache.org/jira/browse/HADOOP-15915
>
> I agree with Congxian: You could try reaching out to the Hadoop user@
> list for additional help. Maybe logging on DEBUG level helps already?
> If you are up for an adventure, you could also consider adding some
> debugging code into Hadoop's DiskChecker and compile a custom Hadoop
> version.
>
> Best,
> Robert
>
>
> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
> wrote:
>
>> Hi LU
>>
>> I'm not familiar with S3 file system, maybe others in Flink community
>> can help you in this case, or maybe you can also reach out to s3
>> teams/community for help.
>>
>> Best,
>> Congxian
>>
>>
>> Lu Niu  于2020年4月8日周三 上午11:05写道:
>>
>>> Hi, Congxiao
>>>
>>> Thanks for replying. Yes, I also found those references. However, as I
>>> mentioned in the original post, there is enough capacity on all disks.
>>> Also, when I switch to the presto file system, the problem goes away.
>>> I'm wondering whether others have encountered a similar issue.
>>>
>>> Best
>>> Lu
>>>
>>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>>> wrote:
>>>
 Hi
 From the stack, it seems the problem is
 "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
 Could not find any valid local directory for s3ablock-0001-". I googled the
 exception and found a related page [1]. Could you please make sure there is
 enough space on the local disk?

 [1]
 https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
 Best,
 Congxian


 Lu Niu  于2020年4月8日周三 上午8:41写道:

> Hi, flink users
>
> Did anyone encounter such error? The error comes from
> S3AFileSystem. But there is no capacity issue on any disk. we are 
> using
> hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for 
> state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-23 Thread Lu Niu
Hi, Robert

By the way, I did some investigation, and I think it's possible to support
the streaming sink with the presto S3 file system. That would let users rely
on the presto S3 FS for all access to S3. I created this Jira ticket:
https://issues.apache.org/jira/browse/FLINK-17364 . What do you think?

Best
Lu

On Tue, Apr 21, 2020 at 1:46 PM Lu Niu  wrote:

> Cool, thanks!
>
> On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger 
> wrote:
>
>> I'm not aware of anything. I think the presto s3 file system is generally
>> the recommended S3 FS implementation.
>>
>> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:
>>
>>> Thank you both. Given the debugging overhead, I might just try out the
>>> presto S3 file system then. Besides the fact that the presto S3 file system
>>> doesn't support the streaming sink, is there anything else I need to keep
>>> in mind? Thanks!
>>>
>>> Best
>>> Lu
>>>
>>> On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
>>> wrote:
>>>
 Hey,
 Others have experienced this as well, yes:
 https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
 I have also notified the Hadoop project about this issue:
 https://issues.apache.org/jira/browse/HADOOP-15915

 I agree with Congxian: You could try reaching out to the Hadoop user@
 list for additional help. Maybe logging on DEBUG level helps already?
 If you are up for an adventure, you could also consider adding some
 debugging code into Hadoop's DiskChecker and compile a custom Hadoop
 version.

 Best,
 Robert


 On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
 wrote:

> Hi LU
>
> I'm not familiar with S3 file system, maybe others in Flink community
> can help you in this case, or maybe you can also reach out to s3
> teams/community for help.
>
> Best,
> Congxian
>
>
> Lu Niu  于2020年4月8日周三 上午11:05写道:
>
>> Hi, Congxiao
>>
>> Thanks for replying. Yes, I also found those references. However, as I
>> mentioned in the original post, there is enough capacity on all disks.
>> Also, when I switch to the presto file system, the problem goes away.
>> I'm wondering whether others have encountered a similar issue.
>>
>> Best
>> Lu
>>
>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>> From the stack, it seems the problem is
>>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>> Could not find any valid local directory for s3ablock-0001-". I googled the
>>> exception and found a related page [1]. Could you please make sure there is
>>> enough space on the local disk?
>>>
>>> [1]
>>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>>> Best,
>>> Congxian
>>>
>>>
>>> Lu Niu  于2020年4月8日周三 上午8:41写道:
>>>
 Hi, flink users

 Did anyone encounter such error? The error comes from
 S3AFileSystem. But there is no capacity issue on any disk. we are using
 hadoop 2.7.1.
 ```

 Caused by: java.util.concurrent.ExecutionException: 
 java.io.IOException: Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
 org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
 org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
 Caused by: java.io.IOException: Could not open output stream for state 
 backend
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at 
 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-21 Thread Lu Niu
Cool, thanks!

On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger  wrote:

> I'm not aware of anything. I think the presto s3 file system is generally
> the recommended S3 FS implementation.
>
> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:
>
>> Thank you both. Given the debugging overhead, I might just try out the
>> presto S3 file system then. Besides the fact that the presto S3 file system
>> doesn't support the streaming sink, is there anything else I need to keep
>> in mind? Thanks!
>>
>> Best
>> Lu
>>
>> On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
>> wrote:
>>
>>> Hey,
>>> Others have experienced this as well, yes:
>>> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
>>> I have also notified the Hadoop project about this issue:
>>> https://issues.apache.org/jira/browse/HADOOP-15915
>>>
>>> I agree with Congxian: You could try reaching out to the Hadoop user@
>>> list for additional help. Maybe logging on DEBUG level helps already?
>>> If you are up for an adventure, you could also consider adding some
>>> debugging code into Hadoop's DiskChecker and compile a custom Hadoop
>>> version.
>>>
>>> Best,
>>> Robert
>>>
>>>
>>> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
>>> wrote:
>>>
 Hi LU

 I'm not familiar with S3 file system, maybe others in Flink community
 can help you in this case, or maybe you can also reach out to s3
 teams/community for help.

 Best,
 Congxian


 Lu Niu  于2020年4月8日周三 上午11:05写道:

> Hi, Congxiao
>
> Thanks for replying. Yes, I also found those references. However, as I
> mentioned in the original post, there is enough capacity on all disks.
> Also, when I switch to the presto file system, the problem goes away.
> I'm wondering whether others have encountered a similar issue.
>
> Best
> Lu
>
> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
> wrote:
>
>> Hi
>> From the stack, it seems the problem is
>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>> Could not find any valid local directory for s3ablock-0001-". I googled the
>> exception and found a related page [1]. Could you please make sure there is
>> enough space on the local disk?
>>
>> [1]
>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>> Best,
>> Congxian
>>
>>
>> Lu Niu  于2020年4月8日周三 上午8:41写道:
>>
>>> Hi, flink users
>>>
>>> Did anyone encounter such error? The error comes from S3AFileSystem.
>>> But there is no capacity issue on any disk. we are using hadoop 2.7.1.
>>> ```
>>>
>>> Caused by: java.util.concurrent.ExecutionException: 
>>> java.io.IOException: Could not open output stream for state backend
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at 
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>> at 
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>> ... 3 more
>>> Caused by: java.io.IOException: Could not open output stream for state 
>>> backend
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>> at 
>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>> at 
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>> at 
>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>> at 
>>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>> at 
>>> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-21 Thread Robert Metzger
I'm not aware of anything. I think the presto s3 file system is generally
the recommended S3 FS implementation.

On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:

> Thank you both. Given the debugging overhead, I might just try out the
> presto S3 file system then. Besides the fact that the presto S3 file system
> doesn't support the streaming sink, is there anything else I need to keep
> in mind? Thanks!
>
> Best
> Lu
>
> On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
> wrote:
>
>> Hey,
>> Others have experienced this as well, yes:
>> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
>> I have also notified the Hadoop project about this issue:
>> https://issues.apache.org/jira/browse/HADOOP-15915
>>
>> I agree with Congxian: You could try reaching out to the Hadoop user@
>> list for additional help. Maybe logging on DEBUG level helps already?
>> If you are up for an adventure, you could also consider adding some
>> debugging code into Hadoop's DiskChecker and compile a custom Hadoop
>> version.
>>
>> Best,
>> Robert
>>
>>
>> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
>> wrote:
>>
>>> Hi LU
>>>
>>> I'm not familiar with S3 file system, maybe others in Flink community
>>> can help you in this case, or maybe you can also reach out to s3
>>> teams/community for help.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Lu Niu  于2020年4月8日周三 上午11:05写道:
>>>
 Hi, Congxiao

 Thanks for replying. Yes, I also found those references. However, as I
 mentioned in the original post, there is enough capacity on all disks.
 Also, when I switch to the presto file system, the problem goes away.
 I'm wondering whether others have encountered a similar issue.

 Best
 Lu

 On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
 wrote:

> Hi
> From the stack, it seems the problem is
> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
> Could not find any valid local directory for s3ablock-0001-". I googled the
> exception and found a related page [1]. Could you please make sure there is
> enough space on the local disk?
>
> [1]
> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
> Best,
> Congxian
>
>
> Lu Niu  于2020年4月8日周三 上午8:41写道:
>
>> Hi, flink users
>>
>> Did anyone encounter such error? The error comes from S3AFileSystem.
>> But there is no capacity issue on any disk. we are using hadoop 2.7.1.
>> ```
>>
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>> Could not open output stream for state backend
>>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>  at 
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>  at 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>  ... 3 more
>> Caused by: java.io.IOException: Could not open output stream for state 
>> backend
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>  at 
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>  at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>  at 
>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>  at 
>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>  at 
>> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>>  at 
>> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-13 Thread Lu Niu
Thank you both. Given the debugging overhead, I might just try out the
presto S3 file system then. Besides the fact that the presto S3 file system
doesn't support the streaming sink, is there anything else I need to keep
in mind? Thanks!

Best
Lu
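
One detail that may matter when trying this out: per the Flink documentation, the presto-based S3 file system is registered under the s3:// and s3p:// schemes, while the Hadoop-based one uses s3:// and s3a://. If both plugins are present, an explicit s3p:// checkpoint URI should select the presto implementation. The sketch below only rewrites the URI scheme; S3SchemeRewrite and toPrestoScheme are hypothetical helper names, and the bucket and path are made-up examples.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class S3SchemeRewrite {

    /** Returns the same location with an s3p scheme if the input uses s3 or s3a; other URIs are returned unchanged. */
    public static URI toPrestoScheme(URI uri) {
        String scheme = uri.getScheme();
        if (!"s3".equals(scheme) && !"s3a".equals(scheme)) {
            return uri;
        }
        try {
            return new URI("s3p", uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + uri, e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical checkpoint location, for illustration only.
        URI original = URI.create("s3a://my-bucket/flink/checkpoints");
        System.out.println(toPrestoScheme(original)); // prints s3p://my-bucket/flink/checkpoints
    }
}
```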

On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger  wrote:

> Hey,
> Others have experienced this as well, yes:
> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
> I have also notified the Hadoop project about this issue:
> https://issues.apache.org/jira/browse/HADOOP-15915
>
> I agree with Congxian: You could try reaching out to the Hadoop user@
> list for additional help. Maybe logging on DEBUG level helps already?
> If you are up for an adventure, you could also consider adding some
> debugging code into Hadoop's DiskChecker and compile a custom Hadoop
> version.
>
> Best,
> Robert
>
>
> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
> wrote:
>
>> Hi LU
>>
>> I'm not familiar with S3 file system, maybe others in Flink community can
>> help you in this case, or maybe you can also reach out to s3
>> teams/community for help.
>>
>> Best,
>> Congxian
>>
>>
>> Lu Niu  于2020年4月8日周三 上午11:05写道:
>>
>>> Hi, Congxiao
>>>
>>> Thanks for replying. Yes, I also found those references. However, as I
>>> mentioned in the original post, there is enough capacity on all disks.
>>> Also, when I switch to the presto file system, the problem goes away.
>>> I'm wondering whether others have encountered a similar issue.
>>>
>>> Best
>>> Lu
>>>
>>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>>> wrote:
>>>
 Hi
 From the stack, it seems the problem is
 "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
 Could not find any valid local directory for s3ablock-0001-". I googled the
 exception and found a related page [1]. Could you please make sure there is
 enough space on the local disk?

 [1]
 https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
 Best,
 Congxian


 Lu Niu  于2020年4月8日周三 上午8:41写道:

> Hi, flink users
>
> Did anyone encounter such error? The error comes from S3AFileSystem.
> But there is no capacity issue on any disk. we are using hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for state 
> backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>   at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>   at 
> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>   at 
> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>   at 
> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-09 Thread Robert Metzger
Hey,
Others have experienced this as well, yes:
https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
I have also notified the Hadoop project about this issue:
https://issues.apache.org/jira/browse/HADOOP-15915

I agree with Congxian: You could try reaching out to the Hadoop user@ list
for additional help. Maybe logging on DEBUG level helps already?
If you are up for an adventure, you could also consider adding some
debugging code into Hadoop's DiskChecker and compile a custom Hadoop
version.

Best,
Robert
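
Before patching Hadoop's DiskChecker, a standalone check of the local directories might already narrow things down. The sketch below is not Hadoop's DiskChecker and does not reproduce its logic; LocalDirCheck, isUsable, and describe are hypothetical names. It assumes you pass in the directories that S3A buffers blocks into (in Hadoop these are configured through fs.s3a.buffer.dir; how that key is set in a given Flink deployment is something to confirm locally).

```java
import java.io.File;

public class LocalDirCheck {

    /** True if dir is an existing, writable directory with at least minFreeBytes of usable space. */
    public static boolean isUsable(File dir, long minFreeBytes) {
        return dir.isDirectory() && dir.canWrite() && dir.getUsableSpace() >= minFreeBytes;
    }

    /** Human-readable status for one directory, naming the first check that fails. */
    public static String describe(File dir) {
        if (!dir.exists()) {
            return dir + ": does not exist";
        }
        if (!dir.isDirectory()) {
            return dir + ": not a directory";
        }
        if (!dir.canWrite()) {
            return dir + ": not writable by this user";
        }
        return dir + ": ok, usable bytes = " + dir.getUsableSpace();
    }

    public static void main(String[] args) {
        // Assumption: each argument is one candidate buffer directory.
        String[] dirs = args.length > 0 ? args : new String[] {System.getProperty("java.io.tmpdir")};
        for (String d : dirs) {
            System.out.println(describe(new File(d)));
        }
    }
}
```

Running it as the same OS user as the TaskManager matters, since a directory with plenty of free space can still be rejected when it is missing or not writable for that user.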


On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu  wrote:

> Hi LU
>
> I'm not familiar with S3 file system, maybe others in Flink community can
> help you in this case, or maybe you can also reach out to s3
> teams/community for help.
>
> Best,
> Congxian
>
>
> Lu Niu  于2020年4月8日周三 上午11:05写道:
>
>> Hi, Congxiao
>>
>> Thanks for replying. Yes, I also found those references. However, as I
>> mentioned in the original post, there is enough capacity on all disks.
>> Also, when I switch to the presto file system, the problem goes away.
>> I'm wondering whether others have encountered a similar issue.
>>
>> Best
>> Lu
>>
>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>> From the stack, it seems the problem is
>>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>> Could not find any valid local directory for s3ablock-0001-". I googled the
>>> exception and found a related page [1]. Could you please make sure there is
>>> enough space on the local disk?
>>>
>>> [1]
>>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>>> Best,
>>> Congxian
>>>
>>>
>>> Lu Niu  于2020年4月8日周三 上午8:41写道:
>>>
 Hi, flink users

 Did anyone encounter such error? The error comes from S3AFileSystem.
 But there is no capacity issue on any disk. we are using hadoop 2.7.1.
 ```

 Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
 Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
 org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
 org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
 Caused by: java.io.IOException: Could not open output stream for state 
 backend
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at 
 org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at 
 java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
 org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at 
 java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at 
 java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
at 
 org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
  

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-08 Thread Congxian Qiu
Hi LU

I'm not familiar with S3 file system, maybe others in Flink community can
help you in this case, or maybe you can also reach out to s3
teams/community for help.

Best,
Congxian


Lu Niu  于2020年4月8日周三 上午11:05写道:

> Hi, Congxiao
>
> Thanks for replying. Yes, I also found those references. However, as I
> mentioned in the original post, there is enough capacity on all disks.
> Also, when I switch to the presto file system, the problem goes away.
> I'm wondering whether others have encountered a similar issue.
>
> Best
> Lu
>
> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
> wrote:
>
>> Hi
>> From the stack, it seems the problem is
>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>> Could not find any valid local directory for s3ablock-0001-". I googled the
>> exception and found a related page [1]. Could you please make sure there is
>> enough space on the local disk?
>>
>> [1]
>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>> Best,
>> Congxian
>>
>>
>> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu wrote:
>>
>>> Hi, flink users
>>>
>>> Has anyone encountered this error? It comes from S3AFileSystem, but
>>> there is no capacity issue on any disk. We are using Hadoop 2.7.1.
>>> ```
>>>
>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>>> Could not open output stream for state backend
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at 
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>> at 
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>> ... 3 more
>>> Caused by: java.io.IOException: Could not open output stream for state 
>>> backend
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>> at 
>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>> at 
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>> at 
>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>> at 
>>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>> at 
>>> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>>> at 
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>>> at 
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>>> at 
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>>> at 
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>> ... 5 more
>>> Caused by: 
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>>  Could not find any valid local directory for s3ablock-0001-
>>> at 
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>>> at 
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>>> at 
>>> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Lu Niu
Hi Congxian,

Thanks for replying. Yes, I also found those references. However, as I
mentioned in the original post, there is enough capacity on all disks. Also,
when I switch to the Presto file system, the problem goes away. I'm wondering
whether others have encountered a similar issue.

Best
Lu

On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu  wrote:

> Hi,
> From the stack trace, the problem seems to be
> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
> Could not find any valid local directory for s3ablock-0001-". I googled the
> exception and found a related page [1]. Could you please make sure there is
> enough space on the local disk?
>
> [1]
> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
> Best,
> Congxian
>
>
> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu wrote:
>
>> Hi, flink users
>>
>> Has anyone encountered this error? It comes from S3AFileSystem, but
>> there is no capacity issue on any disk. We are using Hadoop 2.7.1.
>> ```
>>
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>> Could not open output stream for state backend
>>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>  at 
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>  at 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>  ... 3 more
>> Caused by: java.io.IOException: Could not open output stream for state 
>> backend
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>  at 
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>  at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>  at 
>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>  at 
>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>  at 
>> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>>  at 
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>  at 
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>  ... 5 more
>> Caused by: 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>  Could not find any valid local directory for s3ablock-0001-
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
>>  at 
>> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Congxian Qiu
Hi,
From the stack trace, the problem seems to be
"org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
Could not find any valid local directory for s3ablock-0001-". I googled the
exception and found a related page [1]. Could you please make sure there is
enough space on the local disk?

[1]
https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
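
The disk check suggested above can be sketched in plain Java. This is only a
minimal sketch, not part of Flink or Hadoop: the class name LocalDirCheck, the
helper problemWith, and the 64 MiB threshold are hypothetical. The thread does
not say which local directory the S3A client buffers into, so the directories
are passed as arguments, with java.io.tmpdir as an assumed default. Because the
stack trace fails in createTmpFileForWrite, the sketch also tries to create a
temporary file rather than looking at free space alone.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LocalDirCheck {

    /**
     * Returns null if the directory looks usable for temporary files,
     * otherwise a short human-readable reason. Hypothetical helper.
     */
    public static String problemWith(Path dir, long minFreeBytes) {
        if (!Files.isDirectory(dir)) {
            return "not an existing directory";
        }
        if (!Files.isWritable(dir)) {
            return "not writable by the current user";
        }
        try {
            long usable = Files.getFileStore(dir).getUsableSpace();
            if (usable < minFreeBytes) {
                return "only " + usable + " bytes usable";
            }
            // Free space alone is not enough: actually create and delete a file.
            Path probe = Files.createTempFile(dir, "probe-", ".tmp");
            Files.delete(probe);
        } catch (IOException e) {
            return "I/O error: " + e.getMessage();
        }
        return null;
    }

    public static void main(String[] args) {
        // Assumption: fall back to the JVM temp dir when no directory is given.
        String[] dirs = args.length > 0
                ? args
                : new String[] {System.getProperty("java.io.tmpdir")};
        long minFreeBytes = 64L * 1024 * 1024; // arbitrary threshold for illustration

        for (String d : dirs) {
            String problem = problemWith(Paths.get(d), minFreeBytes);
            System.out.println(d + ": " + (problem == null ? "OK" : problem));
        }
    }
}
```

Running it as the same user as the TaskManager process, on the TaskManager
host, is what makes the result meaningful, since permissions and mounts can
differ between users and machines.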
Best,
Congxian


On Wed, Apr 8, 2020 at 8:41 AM Lu Niu wrote:

> Hi, flink users
>
> Has anyone encountered this error? It comes from S3AFileSystem, but
> there is no capacity issue on any disk. We are using Hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>   at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>   at 
> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>   at 
> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>   at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>   ... 5 more
> Caused by: 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>  Could not find any valid local directory for s3ablock-0001-
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
>   at 
> 

Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Lu Niu
Hi, flink users

Has anyone encountered this error? It comes from S3AFileSystem, but
there is no capacity issue on any disk. We are using Hadoop 2.7.1.
```

Caused by: java.util.concurrent.ExecutionException:
java.io.IOException: Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
Caused by: java.io.IOException: Could not open output stream for state backend
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at 
java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at 
java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
... 5 more
Caused by: 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
Could not find any valid local directory for s3ablock-0001-
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
at 
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
at 
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
at 

Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-15 Thread Yun Tang
Hi

The root cause is a checkpoint error caused by a failure to send data to Kafka
during 'preCommit'. The right solution is to fix whatever prevents the data from
being sent to Kafka, which is probably on the Kafka side.

If you cannot guarantee the health of Kafka and its client, and you do not need
exactly-once semantics, you can pass FlinkKafkaProducer.Semantic.NONE when
creating the Kafka producer to disable sending data during 'preCommit'.

If you don't want the job to fail because of a checkpoint error, you can
increase the tolerable number of declined checkpoints:

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
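
To illustrate what a tolerable failure number changes, here is a simplified
model in plain Java. It is a sketch under stated assumptions, not Flink's
implementation: the class TolerableFailureSketch and its methods are
hypothetical, and it assumes that failures are counted consecutively, that a
successful checkpoint resets the count, and that the job fails once the count
exceeds the configured number.

```java
import java.util.Arrays;
import java.util.List;

public class TolerableFailureSketch {

    /** Hypothetical stand-in for a failure counter; not a Flink class. */
    static final class FailureCounter {
        private final int tolerableFailures;
        private int consecutiveFailures = 0;

        FailureCounter(int tolerableFailures) {
            if (tolerableFailures < 0) {
                throw new IllegalArgumentException("tolerableFailures must be >= 0");
            }
            this.tolerableFailures = tolerableFailures;
        }

        /** Records one checkpoint outcome; returns true if the job should fail. */
        boolean onCheckpoint(boolean succeeded) {
            if (succeeded) {
                consecutiveFailures = 0; // assumed: a success resets the count
                return false;
            }
            consecutiveFailures++;
            return consecutiveFailures > tolerableFailures;
        }
    }

    /** Returns the 0-based index of the checkpoint that fails the job, or -1. */
    public static int firstFatalCheckpoint(int tolerableFailures, List<Boolean> outcomes) {
        FailureCounter counter = new FailureCounter(tolerableFailures);
        for (int i = 0; i < outcomes.size(); i++) {
            if (counter.onCheckpoint(outcomes.get(i))) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // false = declined checkpoint, true = completed checkpoint
        List<Boolean> outcomes = Arrays.asList(false, true, false, false, false);

        // With no tolerance, the first declined checkpoint fails the job.
        System.out.println(firstFatalCheckpoint(0, outcomes)); // prints 0

        // With a tolerance of 2, only the third consecutive failure is fatal.
        System.out.println(firstFatalCheckpoint(2, outcomes)); // prints 4
    }
}
```

Under these assumptions a higher number only delays the failure while Kafka
stays unreachable; it does not make the declined checkpoints succeed.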

Best
Yun Tang

From: jose farfan 
Sent: Wednesday, January 15, 2020 23:21
To: ouywl 
Cc: user ; user...@flink.apache.org 

Subject: Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The 
streamTask checkpoint error .

Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl <ou...@139.com> wrote:
Hi all,
When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in
Log-1 occurs. The code is:

input.addSink(
    new FlinkKafkaProducer(
        parameterTool.getRequired("bootstrap.servers"),
        parameterTool.getRequired("output-topic"),
        new KafkaEventDeSchema()));

Log-1:
2020-01-09 09:13:44,476 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 
1 by task f643244ff791dbd3fbfb88bfafdf1872 of job 
d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ 
producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was 
declined.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 
ms has passed since batch creation
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommit

Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-15 Thread jose farfan
Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl  wrote:

> Hi all,
> When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in
> Log-1 occurs. The code is:
>
> input.addSink(
>     new FlinkKafkaProducer(
>         parameterTool.getRequired("bootstrap.servers"),
>         parameterTool.getRequired("output-topic"),
>         new KafkaEventDeSchema()));
>
>
> Log-1:
> 2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job
> d8827b3f4165b6ba27c8b59c7aa1a400.
> 2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Decline checkpoint 1 by task
> f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400
> at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j
> (dataPort=33361).
> 2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Discarding checkpoint 1 of job
> d8827b3f4165b6ba27c8b59c7aa1a400.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason:
> Checkpoint was declined.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .snapshotState(AbstractStreamOperator.java:431)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask
> .java:1282)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:
> 1216)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .checkpointState(StreamTask.java:872)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .performCheckpoint(StreamTask.java:777)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpointOnBarrier(StreamTask.java:708)
> at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
> .notifyCheckpoint(CheckpointBarrierHandler.java:88)
> at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner
> .processBarrier(CheckpointBarrierAligner.java:113)
> at org.apache.flink.streaming.runtime.io.CheckpointedInputGate
> .pollNext(CheckpointedInputGate.java:155)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .pollNextNullable(StreamTaskNetworkInput.java:102)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .pollNextNullable(StreamTaskNetworkInput.java:47)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:135)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:
> 120018 ms has passed since batch creation
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .checkErroneous(FlinkKafkaProducer.java:1196)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .flush(FlinkKafkaProducer.java:968)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .preCommit(FlinkKafkaProducer.java:892)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .preCommit(FlinkKafkaProducer.java:98)
> at org.apache.flink.streaming.api.functions.sink.
> TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:
> 311)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .snapshotState(FlinkKafkaProducer.java:973)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> .trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> .snapshotFunctionState(StreamingFunctionUtils.java:99)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> .snapshotState(AbstractUdfStreamOperator.java:90)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .snapshotState(AbstractStreamOperator.java:399)
> ... 17 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58
> record(s) for k8s-test-data-0:120018 ms has passed since batch creation
> 2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.
> ExecutionGraph - Job producer data frequece
> (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
> org.apache.flink.util.FlinkRuntimeException: Exceeded 

When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-09 Thread ouywl
Hi all:

When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in Log-1 occurs. The code is:

    input.addSink(
        new FlinkKafkaProducer(
            parameterTool.getRequired("bootstrap.servers"),
            parameterTool.getRequired("output-topic"),
            new KafkaEventDeSchema()));

Log-1:

2020-01-09 09:13:44,476 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
... 17 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
2020-01-09 09:15:33,074 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job producer data frequece (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at
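
A note on the TimeoutException in the log above: the Kafka producer reports "Expiring N record(s) ... ms has passed since batch creation" when a batch could not be delivered before the producer's delivery deadline, and 120018 ms is just past the Kafka client default of delivery.timeout.ms=120000. One thing that may be worth trying is to pass explicit producer timeouts. The following is only a minimal sketch under assumptions, not a confirmed fix: the class name ProducerTimeouts, the broker address, and the timeout values are hypothetical, and delivery.timeout.ms is assumed to be available (it exists only in Kafka clients 2.1 and later).

```java
import java.util.Properties;

/** Hypothetical helper: builds Kafka producer settings with longer timeouts. */
public class ProducerTimeouts {

    /**
     * Returns producer properties for the given broker list.
     * The timeout values are illustrative assumptions, not recommendations.
     */
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Upper bound on the wait for a broker response to a single request.
        props.setProperty("request.timeout.ms", "60000");
        // Upper bound on the total time from send() until success or failure;
        // must be >= linger.ms + request.timeout.ms (Kafka clients 2.1+ only).
        props.setProperty("delivery.timeout.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        // "broker:9092" is a placeholder address.
        Properties props = build("broker:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting object could then be handed to a FlinkKafkaProducer constructor that accepts a Properties argument instead of the broker-list string, assuming the connector version in use offers one. If the brokers are simply unreachable from the TaskManager pods, longer timeouts will only delay the same failure.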

Flink DataStream ElasticsearchSink Checkpoint Error

2019-10-22 Thread 王佩
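When Flink writes to Elasticsearch, the checkpoint stays in the IN_PROGRESS state indefinitely, so no data can be written to ES.

As shown in the screenshot:

[image: image.png]

Could someone please take a look?
Thanks!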


Re: Flink 1.7 job cluster (restore from checkpoint error)

2018-12-06 Thread Hao Sun
Thanks for the tip! I did change the jobGraph this time.

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Thu, Dec 6, 2018 at 2:47 AM Till Rohrmann  wrote:

> Hi Hao,
>
> if Flink tries to recover from a checkpoint, then the JobGraph should not
> be modified and the system should be able to restore the state.
>
> Have you changed the JobGraph and are you now trying to recover from the
> latest checkpoint which is stored in ZooKeeper? If so, then you can also
> start the job cluster with a different cluster id and manually pass the
> path to the latest checkpoint as the savepoint path to resume from. By
> specifying a new cluster id, the system will create a new ZNode in
> ZooKeeper and won't use the checkpoints from the previous run.
>
> If you did not change the JobGraph, then this sounds like a bug. For
> further investigation the debug log files would be helpful.
>
> Cheers,
> Till
>
> On Wed, Dec 5, 2018 at 7:18 PM Hao Sun  wrote:
>
>> Till, Flink is automatically trying to recover from a checkpoint, not a
>> savepoint. How can I get allowNonRestoredState applied in this case?
>>
>> Hao Sun
>> Team Lead
>> 1019 Market St. 7F
>> San Francisco, CA 94103
>>
>>
>> On Wed, Dec 5, 2018 at 10:09 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Hao,
>>>
>>> I think you need to provide a savepoint file via --fromSavepoint to
>>> resume from in order to specify --allowNonRestoredState. Otherwise this
>>> option will be ignored because it only works if you resume from a savepoint.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Dec 5, 2018 at 12:29 AM Hao Sun  wrote:
>>>
>>>> I am using 1.7 and job cluster on k8s.
>>>>
>>>> Here is how I start my job
>>>>
>>>> docker-entrypoint.sh job-cluster -j
>>>> com.zendesk.fraud_prevention.examples.ConnectedStreams
>>>> --allowNonRestoredState
>>>>
>>>>
>>>> *Seems like --allowNonRestoredState is not honored*
>>>>
>>>> === Logs ===
>>>> java","line":"1041","message":"Restoring job
>>>>  from latest valid checkpoint: Checkpoint
>>>> 8103 @ 0 for ."}
>>>> {"timestamp":"2018-12-04
>>>> 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
>>>> error occurred in the cluster entrypoint."}
>>>> java.lang.RuntimeException:
>>>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>>>> JobManager
>>>> at
>>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>>>> at
>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>>> not set up JobManager
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>>>> at
>>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>>> ... 7 more
>>>> Caused by: java.lang.IllegalStateException: There is no operator for
>>>> the state 2f4bc854a18755730e14a90e1d4d7c7d
>>>> at
>>>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
>>>> at
>>>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
>>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>>>> ==
>>>>
>>>> Can somebody help out? Thanks
>>>>
>>>> Hao Sun
>>>>
>>>


Re: Flink 1.7 job cluster (restore from checkpoint error)

2018-12-06 Thread Till Rohrmann
Hi Hao,

if Flink tries to recover from a checkpoint, then the JobGraph should not
be modified and the system should be able to restore the state.

Have you changed the JobGraph and are you now trying to recover from the
latest checkpoint which is stored in ZooKeeper? If so, then you can also
start the job cluster with a different cluster id and manually pass the
path to the latest checkpoint as the savepoint path to resume from. By
specifying a new cluster id, the system will create a new ZNode in
ZooKeeper and won't use the checkpoints from the previous run.

If you did not change the JobGraph, then this sounds like a bug. For
further investigation the debug log files would be helpful.

Cheers,
Till

On Wed, Dec 5, 2018 at 7:18 PM Hao Sun  wrote:

> Till, Flink is automatically trying to recover from a checkpoint, not a
> savepoint. How can I get allowNonRestoredState applied in this case?
>
> Hao Sun
> Team Lead
> 1019 Market St. 7F
> San Francisco, CA 94103
>
>
> On Wed, Dec 5, 2018 at 10:09 AM Till Rohrmann 
> wrote:
>
>> Hi Hao,
>>
>> I think you need to provide a savepoint file via --fromSavepoint to
>> resume from in order to specify --allowNonRestoredState. Otherwise this
>> option will be ignored because it only works if you resume from a savepoint.
>>
>> Cheers,
>> Till
>>
>> On Wed, Dec 5, 2018 at 12:29 AM Hao Sun  wrote:
>>
>>> I am using 1.7 and job cluster on k8s.
>>>
>>> Here is how I start my job
>>> 
>>> docker-entrypoint.sh job-cluster -j
>>> com.zendesk.fraud_prevention.examples.ConnectedStreams
>>> --allowNonRestoredState
>>> 
>>>
>>> *Seems like --allowNonRestoredState is not honored*
>>>
>>> === Logs ===
>>> java","line":"1041","message":"Restoring job
>>>  from latest valid checkpoint: Checkpoint
>>> 8103 @ 0 for ."}
>>> {"timestamp":"2018-12-04
>>> 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
>>> error occurred in the cluster entrypoint."}
>>> java.lang.RuntimeException:
>>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>>> JobManager
>>> at
>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>>> at
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>> not set up JobManager
>>> at
>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>>> at
>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>> ... 7 more
>>> Caused by: java.lang.IllegalStateException: There is no operator for the
>>> state 2f4bc854a18755730e14a90e1d4d7c7d
>>> at
>>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
>>> at
>>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>>> ==
>>>
>>> Can somebody help out? Thanks
>>>
>>> Hao Sun
>>>
>>


Re: Flink 1.7 job cluster (restore from checkpoint error)

2018-12-05 Thread Hao Sun
Till, Flink is automatically trying to recover from a checkpoint, not a
savepoint. How can I get allowNonRestoredState applied in this case?

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Wed, Dec 5, 2018 at 10:09 AM Till Rohrmann  wrote:

> Hi Hao,
>
> I think you need to provide a savepoint file via --fromSavepoint to resume
> from in order to specify --allowNonRestoredState. Otherwise this option
> will be ignored because it only works if you resume from a savepoint.
>
> Cheers,
> Till
>
> On Wed, Dec 5, 2018 at 12:29 AM Hao Sun  wrote:
>
>> I am using 1.7 and job cluster on k8s.
>>
>> Here is how I start my job
>> 
>> docker-entrypoint.sh job-cluster -j
>> com.zendesk.fraud_prevention.examples.ConnectedStreams
>> --allowNonRestoredState
>> 
>>
>> *Seems like --allowNonRestoredState is not honored*
>>
>> === Logs ===
>> java","line":"1041","message":"Restoring job
>>  from latest valid checkpoint: Checkpoint
>> 8103 @ 0 for ."}
>> {"timestamp":"2018-12-04
>> 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
>> error occurred in the cluster entrypoint."}
>> java.lang.RuntimeException:
>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>> JobManager
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>> ... 7 more
>> Caused by: java.lang.IllegalStateException: There is no operator for the
>> state 2f4bc854a18755730e14a90e1d4d7c7d
>> at
>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
>> at
>> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>> ==
>>
>> Can somebody help out? Thanks
>>
>> Hao Sun
>>
>


Re: Flink 1.7 job cluster (restore from checkpoint error)

2018-12-05 Thread Till Rohrmann
Hi Hao,

I think you need to provide a savepoint file via --fromSavepoint to resume
from in order to specify --allowNonRestoredState. Otherwise this option
will be ignored because it only works if you resume from a savepoint.

Cheers,
Till

On Wed, Dec 5, 2018 at 12:29 AM Hao Sun  wrote:

> I am using 1.7 and job cluster on k8s.
>
> Here is how I start my job
> 
> docker-entrypoint.sh job-cluster -j
> com.zendesk.fraud_prevention.examples.ConnectedStreams
> --allowNonRestoredState
> 
>
> *Seems like --allowNonRestoredState is not honored*
>
> === Logs ===
> java","line":"1041","message":"Restoring job
>  from latest valid checkpoint: Checkpoint
> 8103 @ 0 for ."}
> {"timestamp":"2018-12-04
> 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
> error occurred in the cluster entrypoint."}
> java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more
> Caused by: java.lang.IllegalStateException: There is no operator for the
> state 2f4bc854a18755730e14a90e1d4d7c7d
> at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
> at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
> ==
>
> Can somebody help out? Thanks
>
> Hao Sun
>


Flink 1.7 job cluster (restore from checkpoint error)

2018-12-04 Thread Hao Sun
I am using 1.7 and job cluster on k8s.

Here is how I start my job

docker-entrypoint.sh job-cluster -j
com.zendesk.fraud_prevention.examples.ConnectedStreams
--allowNonRestoredState


*Seems like --allowNonRestoredState is not honored*

=== Logs ===
java","line":"1041","message":"Restoring job
 from latest valid checkpoint: Checkpoint
8103 @ 0 for ."}
{"timestamp":"2018-12-04
23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
error occurred in the cluster entrypoint."}
java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set up
JobManager
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.IllegalStateException: There is no operator for the
state 2f4bc854a18755730e14a90e1d4d7c7d
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
==

Can somebody help out? Thanks

Hao Sun


Re: Checkpoint Error in flink with Rockdb state backend

2016-05-29 Thread Aljoscha Krettek
Ah yes, if you used a local filesystem for backups this certainly was the
source of the problem.
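
The reason a local filesystem breaks in a cluster: with a file:// checkpoint directory each TaskManager writes its state to its own local disk, so when a task is restarted on another machine the files it needs to restore from are not there. Below is a minimal sketch of a pre-flight check for this, using only the JDK. The class name CheckpointDirCheck and the set of schemes treated as shared are assumptions for illustration and are not part of Flink.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical pre-flight check for a checkpoint directory URI. */
public class CheckpointDirCheck {

    // Assumed list of schemes that every node in the cluster can reach.
    private static final Set<String> SHARED_SCHEMES =
            new HashSet<>(Arrays.asList("hdfs", "s3", "s3a"));

    /**
     * Returns true if the URI names a scheme from the assumed shared list.
     * A path without a scheme, or a file:// URI, is treated as node-local.
     * Note: a network mount exposed under file:// would be misjudged as local.
     */
    public static boolean isShared(String checkpointDir) {
        String scheme = URI.create(checkpointDir).getScheme();
        return scheme != null && SHARED_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // Example paths are placeholders.
        String[] dirs = {"hdfs:///flink/checkpoints", "file:///tmp/checkpoints", "/tmp/checkpoints"};
        for (String dir : dirs) {
            System.out.println(dir + " -> shared=" + isShared(dir));
        }
    }
}
```

A check like this could run before the job is submitted, so a node-local checkpoint path is caught at startup instead of after a multi-day run.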

On Sun, 29 May 2016 at 17:57 arpit srivastava  wrote:

> I think the problem was that I was using a local filesystem in a cluster.
> Now I have switched to HDFS.
>
> Thanks,
> Arpit
>
> On Sun, May 29, 2016 at 12:57 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> could you please provide the code of your user function that has the
>> Checkpointed interface and is keeping state? This might give people a
>> chance of understanding what is going on.
>>
>> Cheers,
>> Aljoscha
>>
>> On Sat, 28 May 2016 at 20:55 arpit srivastava 
>> wrote:
>>
>>> Hi,
>>>
>>> I am using Flink on a YARN cluster. My job was running for 2-3 days. After
>>> that it failed with two errors:
>>>
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Error at remote task manager 'ip-xx.xx.xx.xxx'.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>> at
>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>> at
>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by:
>>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>> at
>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>> at 

Re: Checkpoint Error in flink with Rockdb state backend

2016-05-29 Thread arpit srivastava
I think the problem was that I was using a local filesystem in a cluster. Now
I have switched to HDFS.

Thanks,
Arpit

On Sun, May 29, 2016 at 12:57 PM, Aljoscha Krettek 
wrote:

> Hi,
> could you please provide the code of your user function that has the
> Checkpointed interface and is keeping state? This might give people a
> chance of understanding what is going on.
>
> Cheers,
> Aljoscha
>
> On Sat, 28 May 2016 at 20:55 arpit srivastava  wrote:
>
>> Hi,
>>
>> I am using Flink on a YARN cluster. My job was running for 2-3 days. After
>> that it failed with two errors:
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Error at remote task manager 'ip-xx.xx.xx.xxx'.
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>> at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>> at
>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>> at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>> at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by:
>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>> at
>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>
>> and multiple instances of a "checkpoint not found" exception:
>>
>> java.lang.Exception: Could not restore checkpointed state to operators
>> and functions
>> at
>> 

Re: Checkpoint Error in Flink with RocksDB state backend

2016-05-29 Thread Aljoscha Krettek
Hi,
could you please provide the code of your user function that implements
the Checkpointed interface and keeps state? That would give people a
better chance of understanding what is going on.

Cheers,
Aljoscha

On Sat, 28 May 2016 at 20:55 arpit srivastava wrote:

> Hi,
>
> I am running Flink on a YARN cluster. My job ran for 2-3 days and then
> failed with two errors:
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Error at remote task manager 'ip-xx.xx.xx.xxx'.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
> Caused by:
> org.apache.flink.runtime.io.network.partition.ProducerFailedException
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
> at
> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>
> and multiple instances of a "checkpoint not found" exception:
>
> java.lang.Exception: Could not restore checkpointed state to operators and
> functions
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException:
> 

Checkpoint Error in Flink with RocksDB state backend

2016-05-28 Thread arpit srivastava
Hi,

I am running Flink on a YARN cluster. My job ran for 2-3 days and then
failed with two errors:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'ip-xx.xx.xx.xxx'.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

and multiple instances of a "checkpoint not found" exception:

java.lang.Exception: Could not restore checkpointed state to operators and functions
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /mnt/tmp/ce/flink/jit/checkpoint/1363f00ad9c261070babe30b822e8e61/chk-2862/04bb3654-645c-4a0f-93c8-9582140613d2 (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:52)
at