Hi,

The problem reported in FLINK-7590 only happened once on our end. As you can see from the comments on the issue, we suspected it was caused by the AWS SDK or Hadoop's s3a implementation, neither of which we have any control over.
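The innermost exception in the trace quoted below is an AmazonS3Exception (HTTP 400, AWS error code RequestTimeout) raised inside the AWS SDK's AmazonS3Client.putObject call. When traces like this are long, a small helper that follows getCause() down to the innermost throwable can surface that root cause directly. This is only a minimal sketch in plain Java; the RootCause class and its find method are hypothetical and are not part of Flink, Hadoop, or the AWS SDK:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical diagnostic helper (not a Flink or AWS SDK API). */
public final class RootCause {

    private RootCause() {}

    /**
     * Follows getCause() until it reaches a throwable without a cause.
     * Stops early if a throwable repeats, because initCause() allows
     * multi-element cause cycles (only self-causation is rejected).
     */
    public static Throwable find(Throwable t) {
        // Identity-based set, so equals() overrides cannot hide a cycle.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Simplified stand-in for the quoted chain; in the real trace the
        // innermost exception is com.amazonaws.services.s3.model.AmazonS3Exception.
        Exception s3 = new RuntimeException("AWS Error Code: RequestTimeout");
        Exception io = new IOException("Could not flush and close the file system output stream", s3);
        Exception outer = new Exception("Could not materialize checkpoint 803", io);
        System.out.println(find(outer).getMessage());
    }
}
```

Running main prints "AWS Error Code: RequestTimeout". In a real job the helper would be applied to the exception Flink logs, and the class of the returned throwable shows which layer (Flink, s3a, or the AWS SDK) actually failed.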
Flink 1.4.0 ships its own S3 file system implementations. I haven't tried them yet.

On Thu, Dec 14, 2017 at 2:05 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Bowen Li (in CC) closed the issue but there is no fix (or at least it is
> not linked in the JIRA).
> Maybe it was resolved in another issue or can be differently resolved.
>
> @Bowen, can you comment on how to fix this problem? Will it work in Flink
> 1.4.0?
>
> Thank you,
> Fabian
>
> 2017-12-13 5:28 GMT+01:00 Hao Sun <ha...@zendesk.com>:
>
>> https://issues.apache.org/jira/browse/FLINK-7590
>>
>> I have a similar situation with Flink 1.3.2 on K8S
>>
>> =========
>> 2017-12-13 00:57:12,403 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3) (6ad009755a6009975d197e75afa05e14) switched from RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).}
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).
>>     ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
>>     ... 5 more
>> Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>>     ... 5 more
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>     at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
>>     ... 7 more
>> Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>>     at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
>>     at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
>>     ... 5 more
>> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code: RequestTimeout, AWS Error Message: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2cBkumOK5EX4TYgt+LVErSOShxPkZmGrCvmT39FHDbIryc=
>>     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>>     at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
>>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>>     ... 4 more
>> [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle]
>> 2017-12-13 00:57:12,404 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job KafkaDemo maxwell.tickets (env:production) (d5a8b2ab61625cf0aa1e66360b7ad0af) switched from state RUNNING to FAILING.