This is an automated email from the ASF dual-hosted git repository.

MartijnVisser pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 401f343aa9f [FLINK-39136][filesystems] Bump google-cloud-storage to 2.68.0 in flink-gs-fs-hadoop (#28286)
401f343aa9f is described below

commit 401f343aa9ff88ec842d971588aa05388473610c
Author: Martijn Visser <[email protected]>
AuthorDate: Wed Jun 3 08:59:01 2026 +0200

    [FLINK-39136][filesystems] Bump google-cloud-storage to 2.68.0 in flink-gs-fs-hadoop (#28286)
    
    * [FLINK-39136][filesystems] Bump google-cloud-storage to 2.68.0 in flink-gs-fs-hadoop
    
    The GCS file system bundled google-cloud-storage 2.29.1, which throws a
    NullPointerException instead of retrying certain GCS 503 Service Unavailable
    errors during resumable uploads, breaking checkpointing for jobs writing to
    gs:// via a RecoverableWriter. The upstream fix is in googleapis/java-storage#2987.
    
    Bump google-cloud-storage 2.29.1 -> 2.68.0 and the matching grpc artifacts
    1.59.1 -> 1.81.0, regenerate the bundled-dependency NOTICE accordingly, add the
    bundled license file for the newly bundled stax2-api, and update the version
    links in the GCS filesystem documentation.
    
    Add integration tests that run against a real GCS bucket, mirroring the existing
    S3 filesystem integration tests. They are skipped unless a bucket is configured
    via the IT_CASE_GCS_BUCKET environment variable; authentication uses Application
    Default Credentials (GOOGLE_APPLICATION_CREDENTIALS).
    
    Generated-by: Claude Code (Opus 4.8)
---
 docs/content.zh/docs/deployment/filesystems/gcs.md |   2 +-
 docs/content/docs/deployment/filesystems/gcs.md    |   2 +-
 .../cce59259-d3cb-4048-a1bd-31197f847189           |  12 ++
 flink-filesystems/flink-gs-fs-hadoop/pom.xml       |  18 ++-
 .../src/main/resources/META-INF/NOTICE             | 129 ++++++++++++--------
 .../resources/META-INF/licenses/LICENSE.stax2-api  |  22 ++++
 .../flink/fs/gs/GSFileSystemBehaviorITCase.java    |  69 +++++++++++
 .../flink/fs/gs/GSRecoverableWriterITCase.java     | 132 +++++++++++++++++++++
 .../org/apache/flink/fs/gs/GSTestCredentials.java  |  53 +++++++++
 9 files changed, 383 insertions(+), 56 deletions(-)

diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md
index 9a3e964d5f7..f97d2cae218 100644
--- a/docs/content.zh/docs/deployment/filesystems/gcs.md
+++ b/docs/content.zh/docs/deployment/filesystems/gcs.md
@@ -58,7 +58,7 @@ Note that these examples are *not* exhaustive and you can use GCS in other place
 Flink provides the `flink-gs-fs-hadoop` file system to write to GCS.
 This implementation is self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use it.
 
-`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.29.1) library to provide `RecoverableWriter` support.
+`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.68.0) library to provide `RecoverableWriter` support.
 
 This file system can be used with the [FileSystem connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}).
 
diff --git a/docs/content/docs/deployment/filesystems/gcs.md b/docs/content/docs/deployment/filesystems/gcs.md
index 879b19febee..a06204a4508 100644
--- a/docs/content/docs/deployment/filesystems/gcs.md
+++ b/docs/content/docs/deployment/filesystems/gcs.md
@@ -58,7 +58,7 @@ Note that these examples are *not* exhaustive and you can use GCS in other place
 Flink provides the `flink-gs-fs-hadoop` file system to write to GCS.
 This implementation is self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use it.
 
-`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.29.1) library to provide `RecoverableWriter` support.
+`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.68.0) library to provide `RecoverableWriter` support.
 
 This file system can be used with the [FileSystem connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}).
 
diff --git a/flink-filesystems/flink-gs-fs-hadoop/archunit-violations/cce59259-d3cb-4048-a1bd-31197f847189 b/flink-filesystems/flink-gs-fs-hadoop/archunit-violations/cce59259-d3cb-4048-a1bd-31197f847189
index e69de29bb2d..f3c98676934 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/archunit-violations/cce59259-d3cb-4048-a1bd-31197f847189
+++ b/flink-filesystems/flink-gs-fs-hadoop/archunit-violations/cce59259-d3cb-4048-a1bd-31197f847189
@@ -0,0 +1,12 @@
+org.apache.flink.fs.gs.GSFileSystemBehaviorITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.fs.gs.GSRecoverableWriterITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
diff --git a/flink-filesystems/flink-gs-fs-hadoop/pom.xml b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
index 00abf2e322a..75bab6a8543 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
@@ -34,11 +34,11 @@ under the License.
        <properties>
                <!-- If updating these dependency versions, please also update the corresponding links -->
                <!-- in the GCS file system documentation. -->
-               <fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
+               <fs.gs.sdk.version>2.68.0</fs.gs.sdk.version>
                <fs.gs.connector.version>hadoop3-2.2.18</fs.gs.connector.version>
                <fs.gs.cloud.nio.version>0.128.7</fs.gs.cloud.nio.version>
                <!-- Set this to the highest version of grpc artifacts from gcs-connector and google-cloud-storage -->
-               <fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
+               <fs.gs.grpc.version>1.81.0</fs.gs.grpc.version>
        </properties>
 
        <dependencies>
@@ -50,6 +50,15 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <!-- for the FileSystemBehaviorTestSuite used by GSFileSystemBehaviorITCase -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
                <!-- ArchUnit test dependencies -->
 
                <dependency>
@@ -103,6 +112,11 @@ under the License.
                                        <groupId>com.google.errorprone</groupId>
                                        <artifactId>error_prone_annotations</artifactId>
                                </exclusion>
+                               <!-- static-analysis nullness annotations, not needed at runtime -->
+                               <exclusion>
+                                       <groupId>org.jspecify</groupId>
+                                       <artifactId>jspecify</artifactId>
+                               </exclusion>
                                <!-- exclude dependency because of its GPLv2 license, see https://github.com/apache/flink/pull/15599#issuecomment-850241316 -->
                                <exclusion>
                                        <groupId>javax.annotation</groupId>
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
index 2edfb07f127..64e8b118548 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
@@ -6,58 +6,67 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
+- com.fasterxml.jackson.core:jackson-annotations:2.15.3
 - com.fasterxml.jackson.core:jackson-core:2.15.3
+- com.fasterxml.jackson.core:jackson-databind:2.15.3
+- com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.15.3
+- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.3
+- com.fasterxml.woodstox:woodstox-core:7.0.0
 - com.google.android:annotations:4.1.1.4
 - com.google.api-client:google-api-client-jackson2:2.0.1
-- com.google.api-client:google-api-client:2.2.0
-- com.google.api.grpc:gapic-google-cloud-storage-v2:2.29.1-alpha
-- com.google.api.grpc:grpc-google-cloud-storage-v2:2.29.1-alpha
-- com.google.api.grpc:proto-google-cloud-monitoring-v3:1.64.0
-- com.google.api.grpc:proto-google-cloud-storage-v2:2.29.1-alpha
-- com.google.api.grpc:proto-google-common-protos:2.28.0
-- com.google.api.grpc:proto-google-iam-v1:1.23.0
+- com.google.api-client:google-api-client:2.7.2
+- com.google.api.grpc:gapic-google-cloud-storage-v2:2.68.0
+- com.google.api.grpc:grpc-google-cloud-storage-v2:2.68.0
+- com.google.api.grpc:proto-google-cloud-monitoring-v3:3.52.0
+- com.google.api.grpc:proto-google-cloud-storage-v2:2.68.0
+- com.google.api.grpc:proto-google-common-protos:2.71.0
+- com.google.api.grpc:proto-google-iam-v1:1.66.0
 - com.google.apis:google-api-services-iamcredentials:v1-rev20211203-2.0.0
-- com.google.apis:google-api-services-storage:v1-rev20231028-2.0.0
-- com.google.auto.value:auto-value-annotations:1.10.4
+- com.google.apis:google-api-services-storage:v1-rev20260204-2.0.0
+- com.google.auto.value:auto-value-annotations:1.11.0
 - com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.18
 - com.google.cloud.bigdataoss:gcsio:2.2.18
 - com.google.cloud.bigdataoss:util-hadoop:hadoop3-2.2.18
 - com.google.cloud.bigdataoss:util:2.2.18
-- com.google.cloud:google-cloud-core-grpc:2.27.0
-- com.google.cloud:google-cloud-core-http:2.27.0
-- com.google.cloud:google-cloud-core:2.27.0
-- com.google.cloud:google-cloud-monitoring:1.82.0
-- com.google.cloud:google-cloud-storage:2.29.1
-- com.google.code.gson:gson:2.10.1
+- com.google.cloud.opentelemetry:detector-resources-support:0.33.0
+- com.google.cloud.opentelemetry:exporter-metrics:0.33.0
+- com.google.cloud.opentelemetry:shared-resourcemapping:0.33.0
+- com.google.cloud:google-cloud-core-grpc:2.70.0
+- com.google.cloud:google-cloud-core-http:2.70.0
+- com.google.cloud:google-cloud-core:2.70.0
+- com.google.cloud:google-cloud-monitoring:3.52.0
+- com.google.cloud:google-cloud-storage:2.68.0
+- com.google.code.gson:gson:2.13.2
 - com.google.flogger:flogger-system-backend:0.7.1
 - com.google.flogger:flogger:0.7.1
 - com.google.flogger:google-extensions:0.7.1
 - com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
-- com.google.http-client:google-http-client-apache-v2:1.43.3
-- com.google.http-client:google-http-client-appengine:1.43.3
-- com.google.http-client:google-http-client-gson:1.43.3
-- com.google.http-client:google-http-client-jackson2:1.43.3
-- com.google.http-client:google-http-client:1.43.3
-- com.google.oauth-client:google-oauth-client:1.34.1
+- com.google.http-client:google-http-client-apache-v2:2.1.0
+- com.google.http-client:google-http-client-appengine:2.1.0
+- com.google.http-client:google-http-client-gson:2.1.0
+- com.google.http-client:google-http-client-jackson2:2.1.0
+- com.google.http-client:google-http-client:2.1.0
+- com.google.oauth-client:google-oauth-client:1.39.0
 - com.lmax:disruptor:3.4.2
 - commons-codec:commons-codec:1.15
-- io.grpc:grpc-alts:1.59.1
-- io.grpc:grpc-api:1.59.1
-- io.grpc:grpc-auth:1.59.1
-- io.grpc:grpc-census:1.59.1
-- io.grpc:grpc-context:1.59.1
-- io.grpc:grpc-core:1.59.1
-- io.grpc:grpc-googleapis:1.59.1
-- io.grpc:grpc-grpclb:1.59.1
-- io.grpc:grpc-inprocess:1.59.1
-- io.grpc:grpc-netty-shaded:1.59.1
-- io.grpc:grpc-protobuf-lite:1.59.1
-- io.grpc:grpc-protobuf:1.59.1
-- io.grpc:grpc-rls:1.59.1
-- io.grpc:grpc-services:1.59.1
-- io.grpc:grpc-stub:1.59.1
-- io.grpc:grpc-util:1.59.1
-- io.grpc:grpc-xds:1.59.1
+- io.grpc:grpc-alts:1.81.0
+- io.grpc:grpc-api:1.81.0
+- io.grpc:grpc-auth:1.81.0
+- io.grpc:grpc-census:1.81.0
+- io.grpc:grpc-context:1.81.0
+- io.grpc:grpc-core:1.81.0
+- io.grpc:grpc-googleapis:1.81.0
+- io.grpc:grpc-grpclb:1.81.0
+- io.grpc:grpc-inprocess:1.81.0
+- io.grpc:grpc-netty-shaded:1.81.0
+- io.grpc:grpc-opentelemetry:1.81.0
+- io.grpc:grpc-protobuf-lite:1.81.0
+- io.grpc:grpc-protobuf:1.81.0
+- io.grpc:grpc-rls:1.81.0
+- io.grpc:grpc-services:1.81.0
+- io.grpc:grpc-stub:1.81.0
+- io.grpc:grpc-util:1.81.0
+- io.grpc:grpc-xds:1.81.0
 - io.opencensus:opencensus-api:0.31.1
 - io.opencensus:opencensus-contrib-exemplar-util:0.31.0
 - io.opencensus:opencensus-contrib-grpc-metrics:0.31.0
@@ -65,10 +74,20 @@ This project bundles the following dependencies under the Apache Software Licens
 - io.opencensus:opencensus-contrib-resource-util:0.31.0
 - io.opencensus:opencensus-exporter-metrics-util:0.31.0
 - io.opencensus:opencensus-exporter-stats-stackdriver:0.31.0
-- io.opencensus:opencensus-impl:0.31.0
 - io.opencensus:opencensus-impl-core:0.31.0
-- io.opencensus:opencensus-proto:0.2.0
-- io.perfmark:perfmark-api:0.26.0
+- io.opencensus:opencensus-impl:0.31.0
+- io.opentelemetry.contrib:opentelemetry-gcp-resources:1.37.0-alpha
+- io.opentelemetry.semconv:opentelemetry-semconv:1.29.0-alpha
+- io.opentelemetry:opentelemetry-api:1.57.0
+- io.opentelemetry:opentelemetry-common:1.57.0
+- io.opentelemetry:opentelemetry-context:1.57.0
+- io.opentelemetry:opentelemetry-sdk-common:1.57.0
+- io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:1.57.0
+- io.opentelemetry:opentelemetry-sdk-logs:1.57.0
+- io.opentelemetry:opentelemetry-sdk-metrics:1.57.0
+- io.opentelemetry:opentelemetry-sdk-trace:1.57.0
+- io.opentelemetry:opentelemetry-sdk:1.57.0
+- io.perfmark:perfmark-api:0.27.0
 - org.apache.httpcomponents:httpclient:4.5.13
 - org.apache.httpcomponents:httpcore:4.4.14
 - org.conscrypt:conscrypt-openjdk-uber:2.5.2
@@ -76,21 +95,27 @@ This project bundles the following dependencies under the Apache Software Licens
 This project bundles the following dependencies under BSD-3 License (https://opensource.org/licenses/BSD-3-Clause).
 See bundled license files for details.
 
-- com.google.api:api-common:2.20.0
-- com.google.api:gax-grpc:2.37.0
-- com.google.api:gax-httpjson:2.37.0
-- com.google.api:gax:2.37.0
-- com.google.auth:google-auth-library-credentials:1.20.0
-- com.google.auth:google-auth-library-oauth2-http:1.20.0
-- com.google.protobuf:protobuf-java-util:3.24.4
-- com.google.protobuf:protobuf-java:3.24.4
-- org.threeten:threetenbp:1.6.8
+- com.google.api:api-common:2.63.0
+- com.google.api:gax-grpc:2.80.0
+- com.google.api:gax-httpjson:2.80.0
+- com.google.api:gax:2.80.0
+- com.google.auth:google-auth-library-credentials:1.47.0
+- com.google.auth:google-auth-library-oauth2-http:1.47.0
+- com.google.protobuf:protobuf-java-util:4.33.2
+- com.google.protobuf:protobuf-java:4.33.2
+- org.threeten:threetenbp:1.7.0
+
+This project bundles the following dependencies under BSD License (https://opensource.org/licenses/bsd-license.php).
+See bundled license files for details.
+
+- org.codehaus.woodstox:stax2-api:4.2.2 (https://github.com/FasterXML/stax2-api/tree/stax2-api-4.2.2)
 
 This project bundles the following dependencies under the Go License (https://golang.org/LICENSE).
 See bundled license files for details.
-- com.google.re2j:re2j:1.7
+
+- com.google.re2j:re2j:1.8
 
 This project bundles the following dependencies under the MIT License.
 See bundled license files for details.
 
-- org.codehaus.mojo:animal-sniffer-annotations:1.23
+- org.codehaus.mojo:animal-sniffer-annotations:1.27
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/licenses/LICENSE.stax2-api b/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/licenses/LICENSE.stax2-api
new file mode 100644
index 00000000000..0ed63616996
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/licenses/LICENSE.stax2-api
@@ -0,0 +1,22 @@
+Copyright woodstox stax2api contributors.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice,
+   this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemBehaviorITCase.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemBehaviorITCase.java
new file mode 100644
index 00000000000..68cb21c759e
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemBehaviorITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link FileSystemBehaviorTestSuite} for the Google Cloud Storage file
+ * system.
+ *
+ * <p>Runs against a real GCS bucket and is skipped unless one is configured; see {@link
+ * GSTestCredentials}.
+ */
+class GSFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+    private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+    @BeforeAll
+    static void checkCredentialsAndSetup() {
+        GSTestCredentials.assumeCredentialsAvailable();
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @AfterAll
+    static void clearFsConfig() throws IOException {
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @Override
+    protected FileSystem getFileSystem() throws Exception {
+        return getBasePath().getFileSystem();
+    }
+
+    @Override
+    protected Path getBasePath() throws Exception {
+        return new Path(GSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
+    }
+
+    @Override
+    protected FileSystemKind getFileSystemKind() {
+        return FileSystemKind.OBJECT_STORE;
+    }
+}
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSRecoverableWriterITCase.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSRecoverableWriterITCase.java
new file mode 100644
index 00000000000..fd25ee48b56
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSRecoverableWriterITCase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for the Google Cloud Storage {@link RecoverableWriter} against a real GCS
+ * bucket. Exercises the write / checkpoint ({@code persist}) / recover / commit flow that backs
+ * exactly-once {@code FileSink} checkpointing on GCS.
+ *
+ * <p>Skipped unless a GCS bucket is configured; see {@link GSTestCredentials}.
+ */
+class GSRecoverableWriterITCase {
+
+    private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+    private static FileSystem fileSystem;
+    private static Path basePath;
+
+    private Path targetFile;
+
+    @BeforeAll
+    static void checkCredentialsAndSetup() throws Exception {
+        GSTestCredentials.assumeCredentialsAvailable();
+        FileSystem.initialize(new Configuration(), null);
+        basePath = new Path(GSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
+        fileSystem = basePath.getFileSystem();
+    }
+
+    @AfterAll
+    static void teardown() throws Exception {
+        if (fileSystem != null) {
+            fileSystem.delete(basePath, true);
+        }
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        targetFile = new Path(basePath, UUID.randomUUID().toString());
+    }
+
+    @AfterEach
+    void afterEach() throws Exception {
+        fileSystem.delete(targetFile, false);
+    }
+
+    @Test
+    void testWriteAndCommit() throws Exception {
+        final byte[] data = randomData(4 * 1024 * 1024);
+
+        final RecoverableWriter writer = fileSystem.createRecoverableWriter();
+        final RecoverableFsDataOutputStream stream = writer.open(targetFile);
+        stream.write(data);
+        stream.closeForCommit().commit();
+
+        assertThat(readFully(targetFile)).isEqualTo(data);
+    }
+
+    @Test
+    void testPersistRecoverAndCommit() throws Exception {
+        final byte[] data = randomData(5 * 1024 * 1024);
+        final int checkpointOffset = 2 * 1024 * 1024;
+
+        final RecoverableWriter writer = fileSystem.createRecoverableWriter();
+
+        // write up to the checkpoint offset, then take a checkpoint
+        final RecoverableFsDataOutputStream stream = writer.open(targetFile);
+        stream.write(data, 0, checkpointOffset);
+        final RecoverableWriter.ResumeRecoverable checkpoint = stream.persist();
+
+        // simulate a failure: abandon the original stream and recover from the checkpoint
+        final RecoverableFsDataOutputStream recovered = writer.recover(checkpoint);
+        recovered.write(data, checkpointOffset, data.length - checkpointOffset);
+        recovered.closeForCommit().commit();
+
+        // the committed object must contain exactly the bytes written across the recovery
+        assertThat(readFully(targetFile)).isEqualTo(data);
+    }
+
+    private static byte[] randomData(int size) {
+        final byte[] data = new byte[size];
+        new Random(42).nextBytes(data);
+        return data;
+    }
+
+    private static byte[] readFully(Path path) throws Exception {
+        try (FSDataInputStream in = fileSystem.open(path);
+                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            final byte[] buffer = new byte[64 * 1024];
+            int read;
+            while ((read = in.read(buffer)) > 0) {
+                out.write(buffer, 0, read);
+            }
+            return out.toByteArray();
+        }
+    }
+}
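
The persist / recover / commit flow exercised by testPersistRecoverAndCommit above can be illustrated without a GCS bucket. The following is a minimal in-memory sketch of the expected semantics only: InMemoryRecoverableStream and RecoverFlowSketch are hypothetical names introduced here, not Flink classes, and the model says nothing about how the GCS implementation stores its state. It shows that bytes written after a checkpoint on an abandoned stream should not appear in the committed result.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical in-memory stand-in for a recoverable stream; not Flink's API. */
final class InMemoryRecoverableStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    /** Appends len bytes of data, starting at offset off. */
    void write(byte[] data, int off, int len) {
        buffer.write(data, off, len);
    }

    /** Takes a checkpoint: a snapshot copy of everything written so far. */
    byte[] persist() {
        return buffer.toByteArray();
    }

    /** Resumes from a checkpoint; anything written after persist() is not carried over. */
    static InMemoryRecoverableStream recover(byte[] checkpoint) {
        InMemoryRecoverableStream stream = new InMemoryRecoverableStream();
        stream.write(checkpoint, 0, checkpoint.length);
        return stream;
    }

    /** Commits the stream and returns the final contents. */
    byte[] commit() {
        return buffer.toByteArray();
    }
}

/** Hypothetical driver that mirrors the steps of the integration test. */
final class RecoverFlowSketch {
    static byte[] writeWithRecovery(byte[] data, int checkpointOffset) {
        InMemoryRecoverableStream stream = new InMemoryRecoverableStream();
        stream.write(data, 0, checkpointOffset);
        byte[] checkpoint = stream.persist();

        // Simulated failure: this stray byte lands after the checkpoint on the
        // abandoned stream, so it must not show up in the committed output.
        stream.write(new byte[] {42}, 0, 1);

        InMemoryRecoverableStream recovered = InMemoryRecoverableStream.recover(checkpoint);
        recovered.write(data, checkpointOffset, data.length - checkpointOffset);
        return recovered.commit();
    }
}
```

Calling RecoverFlowSketch.writeWithRecovery(data, offset) with any offset between 0 and data.length should return a byte array equal to data, which is the same property the integration test asserts against the real bucket.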
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSTestCredentials.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSTestCredentials.java
new file mode 100644
index 00000000000..0b26f2ccef9
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSTestCredentials.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import javax.annotation.Nullable;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Access to the GCS bucket used by the integration tests in this module.
+ *
+ * <p>The integration tests are skipped unless the {@code IT_CASE_GCS_BUCKET} environment variable
+ * names a writable bucket. Authentication uses Application Default Credentials, so the {@code
+ * GOOGLE_APPLICATION_CREDENTIALS} environment variable should point to a service-account key file
+ * (the method recommended by the GCS filesystem documentation), which feeds both the gcs-connector
+ * and the google-cloud-storage library used by {@code RecoverableWriter}.
+ */
+class GSTestCredentials {
+
+    @Nullable private static final String GCS_TEST_BUCKET = System.getenv("IT_CASE_GCS_BUCKET");
+
+    /** Skips the calling test unless a GCS test bucket is configured. */
+    static void assumeCredentialsAvailable() {
+        assumeThat(GCS_TEST_BUCKET)
+                .as("No GCS test bucket configured via the IT_CASE_GCS_BUCKET environment variable")
+                .isNotBlank();
+    }
+
+    /** Returns the URI of the test bucket, e.g. {@code gs://my-bucket/temp/}. */
+    static String getTestBucketUri() {
+        if (GCS_TEST_BUCKET == null) {
+            throw new IllegalStateException(
+                    "GCS test bucket not available (IT_CASE_GCS_BUCKET not set)");
+        }
+        }
+        return "gs://" + GCS_TEST_BUCKET + "/temp/";
+    }
+}
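
In GSTestCredentials above, the assumption rejects blank bucket names while getTestBucketUri() only guards against null. As a rough sketch of how the two checks could be folded into one pure function, the example below uses a hypothetical TestBucketSketch class (not part of the commit) that takes the bucket name as a parameter so it can be exercised without touching the environment. The "gs://" prefix and "/temp/" suffix are taken from the diff; everything else is an assumption for illustration.

```java
import java.util.Optional;

/** Hypothetical helper, not part of the commit: maps a bucket name to a test base URI. */
final class TestBucketSketch {

    /** Returns the base URI for tests, or empty when the name is null or blank. */
    static Optional<String> testBucketUri(String bucketName) {
        if (bucketName == null || bucketName.trim().isEmpty()) {
            return Optional.empty();
        }
        // Same layout as in the diff: gs://<bucket>/temp/
        return Optional.of("gs://" + bucketName + "/temp/");
    }
}
```

A caller would typically pass in System.getenv("IT_CASE_GCS_BUCKET") and skip the test when the result is empty, which keeps the skip condition and the URI construction in agreement.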
