Samrat002 commented on code in PR #28070:
URL: https://github.com/apache/flink/pull/28070#discussion_r3167086980
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java:
##########
@@ -163,7 +162,9 @@ public static S3EncryptionConfig fromConfig(
return sseS3();
case "SSE_KMS":
case "AWS_KMS":
- return kmsKeyId != null && !kmsKeyId.isEmpty() ?
sseKms(kmsKeyId) : sseKms();
Review Comment:
"SSE_KMS" was kept as a placeholder for supporting KMS encryption. It can be
removed.
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3ExceptionUtils}. */
+class S3ExceptionUtilsTest {
Review Comment:
Unrelated changes
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java:
##########
@@ -148,7 +145,9 @@ public static S3EncryptionConfig sseKms(
* @throws IllegalArgumentException if the encryption type is invalid
*/
public static S3EncryptionConfig fromConfig(
- @Nullable String encryptionTypeStr, @Nullable String kmsKeyId) {
+ @Nullable String encryptionTypeStr,
+ @Nullable String kmsKeyId,
Review Comment:
The encryption context is silently dropped when kmsKeyId is null or empty.
Using the default AWS-managed key with an encryption context is a valid use
case. It lets IAM policies restrict decrypt access via context conditions
without requiring a custom key ARN.
Could we add a factory overload such as `sseKms(Map<String, String> encryptionContext)`
and pass the context here instead of calling sseKms()?
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.NONE;
+import static
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_KMS;
+import static
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_S3;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link S3EncryptionConfig}. */
+class S3EncryptionConfigTest {
+
+ // ---- factory methods ----
+
+ @ParameterizedTest
+ @MethodSource
+ void testFactoryMethodProperties(
+ S3EncryptionConfig config,
+ S3EncryptionConfig.EncryptionType expectedType,
+ boolean expectedEnabled,
+ ServerSideEncryption expectedSse) {
+ assertThat(config.getEncryptionType()).isEqualTo(expectedType);
+ assertThat(config.isEnabled()).isEqualTo(expectedEnabled);
+ assertThat(config.getKmsKeyId()).isNull();
+ assertThat(config.getServerSideEncryption()).isEqualTo(expectedSse);
+ }
+
+ static Stream<Arguments> testFactoryMethodProperties() {
+ return Stream.of(
+ Arguments.of(S3EncryptionConfig.none(), NONE, false, null),
+ Arguments.of(S3EncryptionConfig.sseS3(), SSE_S3, true,
ServerSideEncryption.AES256),
+ Arguments.of(
+ S3EncryptionConfig.sseKms(), SSE_KMS, true,
ServerSideEncryption.AWS_KMS));
+ }
+
+ @Test
+ void testSseKmsExplicitKey() {
+ S3EncryptionConfig c =
S3EncryptionConfig.sseKms("arn:aws:kms:us-east-1:123:key/abc");
+
assertThat(c.getKmsKeyId()).isEqualTo("arn:aws:kms:us-east-1:123:key/abc");
+ assertThat(c.isEnabled()).isTrue();
+ }
+
+ @Test
+ void testSseKmsWithEncryptionContext() {
+ Map<String, String> ctx = Map.of("dept", "finance");
+ S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", ctx);
+ assertThat(c.getEncryptionContext()).isEqualTo(ctx);
+ assertThat(c.hasEncryptionContext()).isTrue();
+ }
+
+ @Test
+ void testEncryptionContextIsDefensiveCopy() {
+ Map<String, String> ctx = new java.util.HashMap<>(Map.of("k", "v"));
+ S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", ctx);
+ ctx.put("extra", "value");
+ assertThat(c.getEncryptionContext()).containsOnlyKeys("k");
+ }
+
+ @Test
+ void testNoEncryptionContextByDefault() {
+
assertThat(S3EncryptionConfig.sseKms().hasEncryptionContext()).isFalse();
+
assertThat(S3EncryptionConfig.sseKms().getEncryptionContext()).isEmpty();
+ }
+
+ // ---- fromConfig ----
+
+ @ParameterizedTest
+ @MethodSource
+ void testFromConfigParsesType(String input,
S3EncryptionConfig.EncryptionType expected) {
+ assertThat(
+ S3EncryptionConfig.fromConfig(input, null,
Collections.emptyMap())
+ .getEncryptionType())
+ .isEqualTo(expected);
+ }
+
+ static Stream<Arguments> testFromConfigParsesType() {
+ return Stream.of(
+ Arguments.of(null, NONE),
+ Arguments.of("", NONE),
+ Arguments.of("none", NONE),
+ Arguments.of("NONE", NONE),
+ Arguments.of("sse-s3", SSE_S3),
+ Arguments.of("AES256", SSE_S3),
+ Arguments.of("sse-kms", SSE_KMS),
+ Arguments.of("aws:kms", SSE_KMS));
+ }
+
+ @Test
+ void testFromConfigSseKmsKeyId() {
+ assertThat(
+ S3EncryptionConfig.fromConfig("sse-kms", "my-key",
Collections.emptyMap())
+ .getKmsKeyId())
+ .isEqualTo("my-key");
+ assertThat(
+ S3EncryptionConfig.fromConfig("sse-kms", "",
Collections.emptyMap())
+ .getKmsKeyId())
+ .isNull();
+ assertThat(
+ S3EncryptionConfig.fromConfig("sse-kms", null,
Collections.emptyMap())
+ .getKmsKeyId())
+ .isNull();
+ assertThat(
+ S3EncryptionConfig.fromConfig("sse-kms", "my-key",
Map.of("env", "prod"))
+ .getEncryptionContext())
+ .isEqualTo(Map.of("env", "prod"));
+ assertThat(
+ S3EncryptionConfig.fromConfig("sse-kms", null,
Map.of("env", "prod"))
Review Comment:
This assertion documents the data loss from the sseKms() fallback. The
context is configured but silently discarded. Once the fromConfig branch is
fixed to pass the context for the default-key case, this assertion should
become isTrue().
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3BlockLocation}. */
+class S3BlockLocationTest {
Review Comment:
These classes aren't modified in this PR. The tests are useful and showcase
gaps in the test suite 😅. But bundling them here makes the diff harder to
review and attributes unrelated test coverage to this bug fix. Would you mind
moving them to a separate commit?
##########
docs/content.zh/docs/deployment/filesystems/s3.md:
##########
@@ -173,7 +173,8 @@ In addition to the [common
configuration](#common-configuration) options (`s3.ac
```yaml
# Server-side encryption
s3.sse.type: sse-s3 # or sse-kms, aws:kms, AES256, none (default)
-s3.sse.kms.key-id: arn:aws:kms:region:account:key/id # Required for SSE-KMS
+s3.sse.kms.key-id: arn:aws:kms:region:account:key/id # Optional: custom KMS
key for SSE-KMS
+s3.sse.kms.encryption-context: {"aws:s3:arn":
"arn:aws:s3:::my-bucket/my-file"} # Optional: KMS encryption context
Review Comment:
The Flink mapType() config parser expects `key:value,key:value` format, not
JSON. The curly-brace syntax here will fail to parse and produce an empty map.
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3FileStatus}. */
+class S3FileStatusTest {
Review Comment:
Unrelated changes
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java:
##########
@@ -80,9 +79,7 @@ private S3EncryptionConfig(
this.encryptionType = encryptionType;
this.kmsKeyId = kmsKeyId;
this.encryptionContext =
- encryptionContext != null
- ? Collections.unmodifiableMap(new
HashMap<>(encryptionContext))
- : Collections.emptyMap();
+ encryptionContext != null ? Map.copyOf(encryptionContext) :
Collections.emptyMap();
Review Comment:
Map.copyOf() throws NullPointerException for null keys or values. The
previous new HashMap<>() copy was null-tolerant. This shouldn't be an issue in
practice, but it is worth a note in case callers pass maps with optional/null values.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]