[ 
https://issues.apache.org/jira/browse/FLINK-39136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18060297#comment-18060297
 ] 

Martijn Visser commented on FLINK-39136:
----------------------------------------

[~jonchase] Feel free to open a PR against master first; we can then 
cherry-pick it back to 1.20 as well.

> GS file system fails to retry on certain 503's due to old GCS lib dependency
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-39136
>                 URL: https://issues.apache.org/jira/browse/FLINK-39136
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.18.0, 1.19.0, 1.20.0
>            Reporter: Jon Chase
>            Priority: Major
>             Fix For: 1.20.4
>
>
> We're running Flink 1.20.3 with the GCS file sink, and we frequently get the 
> following fatal error when GCS returns certain 503 responses:
>  
> {code:java}
> java.io.IOException: Could not perform checkpoint 3568 for operator XXX -> XXX -> ... -> FileSink: Writer -> FileSink: Committer (1142/1536)#4.
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1394)
>    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
>    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
>    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>    at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: com.google.cloud.storage.StorageException: Unknown Error
>    |> PUT https://storage.googleapis.com/upload/storage/v1/b/xxx-xxx-xxxx/o?name=.inprogress/xx-xx-xx/x-x-x/x/xxxxxxx-13.gz/xxxx&uploadType=resumable&upload_id=XXXXXXXXX
>    |> content-range: bytes */*
>    |> x-goog-gcs-idempotency-token: 421d97a0-dfef-4c8b-bd62-ccf6cd9e3c12
>    | 
>    |< HTTP/1.1 503 Service Unavailable
>    |< content-length: 0
>    |< content-type: text/plain; charset=utf-8
>    |< x-guploader-uploadid: AJRbA5WFzXVVcI8389tnO3SPL6WT67GZxMPLyD8hP-TwoHROmMMIZjW1vC85ULIebPXWwTFWEzE6bBc7GeBruYeIj-4LPfgqvW5ohCw5JtixAMw
>    | 
>    at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:185)
>    at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:117)
>    at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:106)
>    at com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:128)
>    at com.google.cloud.storage.JsonResumableSession.query(JsonResumableSession.java:57)
>    at com.google.cloud.storage.JsonResumableSession.lambda$put$0(JsonResumableSession.java:73)
>    at com.google.cloud.storage.Retrying.lambda$run$0(Retrying.java:102)
>    at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
>    at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
>    at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
>    at com.google.cloud.storage.Retrying.run(Retrying.java:99)
>    at com.google.cloud.storage.JsonResumableSession.put(JsonResumableSession.java:68)
>    at com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.internalWrite(ApiaryUnbufferedWritableByteChannel.java:114)
>    at com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.writeAndClose(ApiaryUnbufferedWritableByteChannel.java:65)
>    at com.google.cloud.storage.UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel.writeAndClose(UnbufferedWritableByteChannelSession.java:40)
>    at com.google.cloud.storage.DefaultBufferedWritableByteChannel.close(DefaultBufferedWritableByteChannel.java:167)
>    at com.google.cloud.storage.StorageByteChannels$SynchronizedBufferedWritableByteChannel.close(StorageByteChannels.java:119)
>    at com.google.cloud.storage.StorageException.wrapIOException(StorageException.java:184)
>    at com.google.cloud.storage.BaseStorageWriteChannel.close(BaseStorageWriteChannel.java:84)
>    at org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223)
>    at org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99)
>    at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240)
>    at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.flush(GSRecoverableFsDataOutputStream.java:177)
>    at java.base/java.util.zip.DeflaterOutputStream.flush(Unknown Source)
>    at org.apache.hadoop.io.compress.GzipCodec$GzipOutputStream.flush(GzipCodec.java:107)
>    at org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter.flush(HadoopCompressionBulkWriter.java:53)
>    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
>    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:280)
>    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:201)
>    at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:221)
>    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
>    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
>    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
>    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:332)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1437)
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1425)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1382)
>    ... 22 more
> Caused by: java.lang.NullPointerException
>    at com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:117)
>    ... 56 more{code}
>  
>  
> This NPE is thrown because of a bug in the GCS Java client library 
> (java-storage), which is fixed by: 
> [https://github.com/googleapis/java-storage/pull/2987]
>  
> Upgrading to a newer release of the GCS library that contains the above fix 
> resolves the issue in the Flink sink: the sink then correctly retries these 
> 503 responses instead of hitting an NPE.
>  
> I'd like to apply this in the 1.20.4 release, since we are running Flink 
> 1.20.x and don't plan to upgrade to 2.x anytime soon. That said, I'm also 
> happy to apply the fix to the 2.x branch if someone can tell me which branch 
> to target.
>  
> I've manually built the GCS file system connector locally with an updated 
> version of the GCS library and validated that it addresses this issue.
>  
> {code:xml}
> <!-- Old -->
> <fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
> <fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
>  
> <!-- New -->
> <fs.gs.sdk.version>2.62.0</fs.gs.sdk.version>
> <fs.gs.grpc.version>1.76.2</fs.gs.grpc.version>{code}
>  
>  
> I'm happy to open PR(s) for this once there's alignment on the ticket itself!
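
For readers following the ticket, the behaviour being asked for (a transient 503 is retried rather than surfacing as a fatal checkpoint error) can be illustrated with a minimal, standalone Java sketch. This is not the GCS library's or Flink's actual retry code: the names `RetrySketch`, `HttpStatusException`, `isRetryable` and `callWithRetry` are hypothetical, and the set of status codes treated as transient is an assumption made only for this example.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    /** Hypothetical exception that carries an HTTP status code. */
    static class HttpStatusException extends Exception {
        final int status;

        HttpStatusException(int status) {
            super("HTTP " + status);
            this.status = status;
        }
    }

    /** Assumption for this sketch: 408, 429 and all 5xx codes are transient. */
    static boolean isRetryable(int status) {
        return status == 408 || status == 429 || (status >= 500 && status < 600);
    }

    /**
     * Runs op, retrying with exponential backoff while it fails with a
     * retryable status, up to maxAttempts attempts in total.
     */
    static <T> T callWithRetry(Callable<T> op, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (HttpStatusException e) {
                // Give up on non-transient errors or once attempts are exhausted.
                if (!isRetryable(e.status) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(backoff);
                backoff *= 2; // double the wait before the next attempt
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated upload: fails twice with 503, then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new HttpStatusException(503);
            }
            return "committed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
        // prints "committed after 3 attempts"
    }
}
```

In the failure reported above, the NPE is thrown before a retry decision of this kind can take effect, which is why the 503 ends up failing the checkpoint.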



