Samrat002 commented on code in PR #28070:
URL: https://github.com/apache/flink/pull/28070#discussion_r3167086980


##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java:
##########
@@ -163,7 +162,9 @@ public static S3EncryptionConfig fromConfig(
                 return sseS3();
             case "SSE_KMS":
             case "AWS_KMS":
-                return kmsKeyId != null && !kmsKeyId.isEmpty() ? 
sseKms(kmsKeyId) : sseKms();

Review Comment:
   "SSE_KMS" was kept as a placeholder for supporting KMS encryption. It can be 
removed. 



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3ExceptionUtils}. */
+class S3ExceptionUtilsTest {

Review Comment:
   Unrelated changes 



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java:
##########
@@ -148,7 +145,9 @@ public static S3EncryptionConfig sseKms(
      * @throws IllegalArgumentException if the encryption type is invalid
      */
     public static S3EncryptionConfig fromConfig(
-            @Nullable String encryptionTypeStr, @Nullable String kmsKeyId) {
+            @Nullable String encryptionTypeStr,
+            @Nullable String kmsKeyId,

Review Comment:
   The encryption context is silently dropped when kmsKeyId is null or empty. 
Using the default AWS-managed key with an encryption context is a valid use 
case. It lets IAM policies restrict decrypt access via context conditions 
without requiring a custom key ARN. 
   
   Could we add a factory overload such as 
`sseKms(Map<String, String> encryptionContext)` and pass the context here 
instead of calling sseKms()?



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.NONE;
+import static 
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_KMS;
+import static 
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_S3;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link S3EncryptionConfig}. */
+class S3EncryptionConfigTest {
+
+    // ---- factory methods ----
+
+    @ParameterizedTest
+    @MethodSource
+    void testFactoryMethodProperties(
+            S3EncryptionConfig config,
+            S3EncryptionConfig.EncryptionType expectedType,
+            boolean expectedEnabled,
+            ServerSideEncryption expectedSse) {
+        assertThat(config.getEncryptionType()).isEqualTo(expectedType);
+        assertThat(config.isEnabled()).isEqualTo(expectedEnabled);
+        assertThat(config.getKmsKeyId()).isNull();
+        assertThat(config.getServerSideEncryption()).isEqualTo(expectedSse);
+    }
+
+    static Stream<Arguments> testFactoryMethodProperties() {
+        return Stream.of(
+                Arguments.of(S3EncryptionConfig.none(), NONE, false, null),
+                Arguments.of(S3EncryptionConfig.sseS3(), SSE_S3, true, 
ServerSideEncryption.AES256),
+                Arguments.of(
+                        S3EncryptionConfig.sseKms(), SSE_KMS, true, 
ServerSideEncryption.AWS_KMS));
+    }
+
+    @Test
+    void testSseKmsExplicitKey() {
+        S3EncryptionConfig c = 
S3EncryptionConfig.sseKms("arn:aws:kms:us-east-1:123:key/abc");
+        
assertThat(c.getKmsKeyId()).isEqualTo("arn:aws:kms:us-east-1:123:key/abc");
+        assertThat(c.isEnabled()).isTrue();
+    }
+
+    @Test
+    void testSseKmsWithEncryptionContext() {
+        Map<String, String> ctx = Map.of("dept", "finance");
+        S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", ctx);
+        assertThat(c.getEncryptionContext()).isEqualTo(ctx);
+        assertThat(c.hasEncryptionContext()).isTrue();
+    }
+
+    @Test
+    void testEncryptionContextIsDefensiveCopy() {
+        Map<String, String> ctx = new java.util.HashMap<>(Map.of("k", "v"));
+        S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", ctx);
+        ctx.put("extra", "value");
+        assertThat(c.getEncryptionContext()).containsOnlyKeys("k");
+    }
+
+    @Test
+    void testNoEncryptionContextByDefault() {
+        
assertThat(S3EncryptionConfig.sseKms().hasEncryptionContext()).isFalse();
+        
assertThat(S3EncryptionConfig.sseKms().getEncryptionContext()).isEmpty();
+    }
+
+    // ---- fromConfig ----
+
+    @ParameterizedTest
+    @MethodSource
+    void testFromConfigParsesType(String input, 
S3EncryptionConfig.EncryptionType expected) {
+        assertThat(
+                        S3EncryptionConfig.fromConfig(input, null, 
Collections.emptyMap())
+                                .getEncryptionType())
+                .isEqualTo(expected);
+    }
+
+    static Stream<Arguments> testFromConfigParsesType() {
+        return Stream.of(
+                Arguments.of(null, NONE),
+                Arguments.of("", NONE),
+                Arguments.of("none", NONE),
+                Arguments.of("NONE", NONE),
+                Arguments.of("sse-s3", SSE_S3),
+                Arguments.of("AES256", SSE_S3),
+                Arguments.of("sse-kms", SSE_KMS),
+                Arguments.of("aws:kms", SSE_KMS));
+    }
+
+    @Test
+    void testFromConfigSseKmsKeyId() {
+        assertThat(
+                        S3EncryptionConfig.fromConfig("sse-kms", "my-key", 
Collections.emptyMap())
+                                .getKmsKeyId())
+                .isEqualTo("my-key");
+        assertThat(
+                        S3EncryptionConfig.fromConfig("sse-kms", "", 
Collections.emptyMap())
+                                .getKmsKeyId())
+                .isNull();
+        assertThat(
+                        S3EncryptionConfig.fromConfig("sse-kms", null, 
Collections.emptyMap())
+                                .getKmsKeyId())
+                .isNull();
+        assertThat(
+                        S3EncryptionConfig.fromConfig("sse-kms", "my-key", 
Map.of("env", "prod"))
+                                .getEncryptionContext())
+                .isEqualTo(Map.of("env", "prod"));
+        assertThat(
+                        S3EncryptionConfig.fromConfig("sse-kms", null, 
Map.of("env", "prod"))

Review Comment:
   This assertion documents the data loss from the sseKms() fallback. The 
context is configured but silently discarded. Once the fromConfig branch is 
fixed to pass the context for the default-key case, this assertion should 
become isTrue().



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3BlockLocation}. */
+class S3BlockLocationTest {

Review Comment:
   These classes aren't modified in this PR. The tests are useful and showcase 
gaps in the test suite 😅. But bundling them here makes the diff harder to 
review and attributes unrelated test coverage to this bug fix. Would you mind 
moving them to a separate commit?



##########
docs/content.zh/docs/deployment/filesystems/s3.md:
##########
@@ -173,7 +173,8 @@ In addition to the [common 
configuration](#common-configuration) options (`s3.ac
 ```yaml
 # Server-side encryption
 s3.sse.type: sse-s3         # or sse-kms, aws:kms, AES256, none (default)
-s3.sse.kms.key-id: arn:aws:kms:region:account:key/id   # Required for SSE-KMS
+s3.sse.kms.key-id: arn:aws:kms:region:account:key/id   # Optional: custom KMS 
key for SSE-KMS
+s3.sse.kms.encryption-context: {"aws:s3:arn": 
"arn:aws:s3:::my-bucket/my-file"}  # Optional: KMS encryption context

Review Comment:
   The Flink mapType() config parser expects `key:value,key:value` format, not 
JSON. The curly-brace syntax here will fail to parse and produce an empty map. 



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3FileStatus}. */
+class S3FileStatusTest {

Review Comment:
   Unrelated changes 



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java:
##########
@@ -80,9 +79,7 @@ private S3EncryptionConfig(
         this.encryptionType = encryptionType;
         this.kmsKeyId = kmsKeyId;
         this.encryptionContext =
-                encryptionContext != null
-                        ? Collections.unmodifiableMap(new 
HashMap<>(encryptionContext))
-                        : Collections.emptyMap();
+                encryptionContext != null ? Map.copyOf(encryptionContext) : 
Collections.emptyMap();

Review Comment:
   `Map.copyOf()` throws NullPointerException for null keys or values. The 
previous `new HashMap<>()` copy was null-tolerant. This shouldn't be an issue in 
practice, but it is worth a note in case callers pass maps with optional/null 
values. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to