[
https://issues.apache.org/jira/browse/FLINK-39136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18060297#comment-18060297
]
Martijn Visser commented on FLINK-39136:
----------------------------------------
[~jonchase] Feel free to open a PR to update this on master first; we can then
cherry-pick it back to 1.20 too.
> GS file system fails to retry on certain 503's due to old GCS lib dependency
> ----------------------------------------------------------------------------
>
> Key: FLINK-39136
> URL: https://issues.apache.org/jira/browse/FLINK-39136
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.18.0, 1.19.0, 1.20.0
> Reporter: Jon Chase
> Priority: Major
> Fix For: 1.20.4
>
>
> We're running Flink 1.20.3 with the GCS file sink and we frequently get the
> following fatal error when GCS returns certain 503's:
>
> {code:java}
> java.io.IOException: Could not perform checkpoint 3568 for operator XXX -> XXX -> ... -> FileSink: Writer -> FileSink: Committer (1142/1536)#4.
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1394)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: com.google.cloud.storage.StorageException: Unknown Error
> |> PUT https://storage.googleapis.com/upload/storage/v1/b/xxx-xxx-xxxx/o?name=.inprogress/xx-xx-xx/x-x-x/x/xxxxxxx-13.gz/xxxx&uploadType=resumable&upload_id=XXXXXXXXX
> |> content-range: bytes */*
> |> x-goog-gcs-idempotency-token: 421d97a0-dfef-4c8b-bd62-ccf6cd9e3c12
> |
> |< HTTP/1.1 503 Service Unavailable
> |< content-length: 0
> |< content-type: text/plain; charset=utf-8
> |< x-guploader-uploadid: AJRbA5WFzXVVcI8389tnO3SPL6WT67GZxMPLyD8hP-TwoHROmMMIZjW1vC85ULIebPXWwTFWEzE6bBc7GeBruYeIj-4LPfgqvW5ohCw5JtixAMw
> |
> at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:185)
> at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:117)
> at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:106)
> at com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:128)
> at com.google.cloud.storage.JsonResumableSession.query(JsonResumableSession.java:57)
> at com.google.cloud.storage.JsonResumableSession.lambda$put$0(JsonResumableSession.java:73)
> at com.google.cloud.storage.Retrying.lambda$run$0(Retrying.java:102)
> at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
> at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
> at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
> at com.google.cloud.storage.Retrying.run(Retrying.java:99)
> at com.google.cloud.storage.JsonResumableSession.put(JsonResumableSession.java:68)
> at com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.internalWrite(ApiaryUnbufferedWritableByteChannel.java:114)
> at com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.writeAndClose(ApiaryUnbufferedWritableByteChannel.java:65)
> at com.google.cloud.storage.UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel.writeAndClose(UnbufferedWritableByteChannelSession.java:40)
> at com.google.cloud.storage.DefaultBufferedWritableByteChannel.close(DefaultBufferedWritableByteChannel.java:167)
> at com.google.cloud.storage.StorageByteChannels$SynchronizedBufferedWritableByteChannel.close(StorageByteChannels.java:119)
> at com.google.cloud.storage.StorageException.wrapIOException(StorageException.java:184)
> at com.google.cloud.storage.BaseStorageWriteChannel.close(BaseStorageWriteChannel.java:84)
> at org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223)
> at org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99)
> at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240)
> at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.flush(GSRecoverableFsDataOutputStream.java:177)
> at java.base/java.util.zip.DeflaterOutputStream.flush(Unknown Source)
> at org.apache.hadoop.io.compress.GzipCodec$GzipOutputStream.flush(GzipCodec.java:107)
> at org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter.flush(HadoopCompressionBulkWriter.java:53)
> at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
> at org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:280)
> at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:201)
> at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:221)
> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
> at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:332)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1437)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1425)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1382)
> ... 22 more
> Caused by: java.lang.NullPointerException
> at com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:117)
> ... 56 more{code}
>
>
> This NPE is caused by a bug in the GCS Java client library (java-storage),
> which this PR fixes:
> [https://github.com/googleapis/java-storage/pull/2987]
>
> Upgrading to a newer release of the GCS library that contains the above fix
> resolves the issue in the Flink sink and allows the sink to correctly retry
> these 503s instead of hitting an NPE.
> I'd like to apply this in the 1.20.4 release, since we are running a 1.20.x
> version of Flink and don't plan to upgrade to 2.x anytime soon. That said,
> I'm also happy to apply the fix to the 2.x branch, if someone can tell me the
> appropriate branch to do it on.
>
> I've manually built the GCS file system connector locally with an updated
> version of the GCS library and validated that it addresses this issue.
>
> {code:xml}
> <!-- Old -->
> <fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
> <fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
>
> <!-- New -->
> <fs.gs.sdk.version>2.62.0</fs.gs.sdk.version>
> <fs.gs.grpc.version>1.76.2</fs.gs.grpc.version>{code}
>
>
> I'm happy to open PR(s) for this once there's alignment on the ticket itself!
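
For readers unfamiliar with the failure mode described above, here is a rough, hypothetical sketch of how a 503 with an empty body can turn into a non-retried NPE, and how classifying by status code first avoids it. None of these types are the real google-cloud-storage classes: `Response`, `Query`, `parseUnsafe`, `parseSafe`, and `queryWithRetries` are invented for illustration, and the header format is simplified. A real retry loop would also back off between attempts.

```java
// Hypothetical sketch only: these are NOT the google-cloud-storage classes.
final class ResumableQueryRetrySketch {

    /** Minimal stand-in for the response to a resumable-upload status query. */
    static final class Response {
        final int status;
        final String range; // may be null, e.g. on a 503 with content-length: 0

        Response(int status, String range) {
            this.status = status;
            this.range = range;
        }
    }

    /** Marks a failure that the retry loop is allowed to retry. */
    static final class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    /** One status query against the (simulated) storage service. */
    interface Query {
        Response send();
    }

    /** Failure mode: reads the header before checking the status code. */
    static long parseUnsafe(Response r) {
        // Throws NullPointerException when the header is absent.
        return Long.parseLong(r.range.substring(r.range.indexOf('-') + 1)) + 1;
    }

    /** Null-safe variant: a 503 is reported as retryable before any parsing. */
    static long parseSafe(Response r) {
        if (r.status == 503) {
            throw new RetryableException("503 Service Unavailable");
        }
        if (r.range == null) {
            return 0L; // assumption: no header means nothing persisted yet
        }
        return Long.parseLong(r.range.substring(r.range.indexOf('-') + 1)) + 1;
    }

    /** Retries only RetryableException; any other exception propagates at once. */
    static long queryWithRetries(Query q, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return parseSafe(q.send());
            } catch (RetryableException e) {
                last = e; // remember and try again
            }
        }
        throw last;
    }
}
```

The point of the sketch is that an NPE raised while interpreting the response is not a retryable error from the retry loop's perspective, so it surfaces as a fatal failure, whereas a 503 classified up front can simply be retried.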