yifan-c commented on code in PR #212:
URL:
https://github.com/apache/cassandra-analytics/pull/212#discussion_r3314310774
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/StorageCredentialPair.java:
##########
@@ -24,51 +24,105 @@
import o.a.c.sidecar.client.shaded.common.data.RestoreJobSecrets;
/**
- * A class representing the pair of credentials needed to complete an analytics operation using the Storage transport.
- * It is possible that both credentials (read and write) are the same, but also that they could represent
- * the credentials needed for two different buckets when using cross-region synchronization to transfer data
- * between regions.
+ * A class representing the pair of auth configurations needed to complete an analytics operation using the
+ * Storage transport. It is possible that both read and write auth are the same, but they could also represent
+ * different buckets when using cross-region synchronization to transfer data between regions.
+ *
+ * <p>The auth field is a {@link StorageAuth}, either:
+ * <ul>
+ * <li>{@link StorageCredentials} — explicit STS credentials for static auth</li>
+ * <li>{@link IamStorageAuth} — no static credentials; the sidecar and Spark executors use the AWS SDK
+ * default provider chain (instance profile / IRSA / ECS task role)</li>
+ * </ul>
+ *
+ * <p>For IAM mode use {@link #iamPair(String, String)}. The library wires the correct sidecar payload
+ * automatically when {@code STORAGE_CREDENTIAL_TYPE=IAM} is set.
*/
public class StorageCredentialPair
{
private final String writeRegion;
- public final StorageCredentials writeCredentials;
+ private final StorageAuth writeAuth;
private final String readRegion;
- public final StorageCredentials readCredentials;
+ private final StorageAuth readAuth;
+
+ /**
+ * Creates a {@link StorageCredentialPair} for IAM instance profile mode.
+ * The credentials fields are null; only the regions are required so the sidecar can route requests.
+ *
+ * @param writeRegion the AWS region for the write (upload) bucket
+ * @param readRegion the AWS region for the read (download) bucket
+ * @return a region-only pair suitable for use with {@code STORAGE_CREDENTIAL_TYPE=IAM}
+ */
+ public static StorageCredentialPair iamPair(String writeRegion, String readRegion)
Review Comment:
`RestoreJobSecrets.iamMode` takes `readRegion` before `writeRegion`. Please
use the same parameter order here.
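A sketch of the factory with the suggested order, for reference. `RegionPair` is a hypothetical, simplified stand-in for `StorageCredentialPair` that keeps only the two region fields and omits the `StorageAuth` fields, so this is not the PR's actual class.

```java
/**
 * Hypothetical, simplified stand-in for StorageCredentialPair (auth fields omitted).
 * The factory takes readRegion before writeRegion to match RestoreJobSecrets.iamMode.
 */
final class RegionPair
{
    private final String readRegion;
    private final String writeRegion;

    private RegionPair(String readRegion, String writeRegion)
    {
        this.readRegion = readRegion;
        this.writeRegion = writeRegion;
    }

    // Same parameter order as RestoreJobSecrets.iamMode: read region first, then write region
    static RegionPair iamPair(String readRegion, String writeRegion)
    {
        return new RegionPair(readRegion, writeRegion);
    }

    String readRegion()
    {
        return readRegion;
    }

    String writeRegion()
    {
        return writeRegion;
    }
}
```

Keeping both factories in the same order makes it harder to silently swap the two `String` arguments at a call site, since the compiler cannot catch that.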
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##########
@@ -231,7 +231,21 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options, @Nullable Logg
long maxSizePerSSTableBundleInBytesS3Transport =
MapUtils.getLong(options,
WriterOptions.MAX_SIZE_PER_SSTABLE_BUNDLE_IN_BYTES_S3_TRANSPORT.name(),
DEFAULT_MAX_SIZE_PER_SSTABLE_BUNDLE_IN_BYTES_S3_TRANSPORT);
String transportExtensionClass = MapUtils.getOrDefault(options,
WriterOptions.DATA_TRANSPORT_EXTENSION_CLASS.name(), null);
- this.dataTransportInfo = new DataTransportInfo(dataTransport, transportExtensionClass, maxSizePerSSTableBundleInBytesS3Transport);
+ String rawCredentialType = MapUtils.getOrDefault(options, WriterOptions.STORAGE_CREDENTIAL_TYPE.name(), null);
+ String storageCredentialType = null;
+ if (rawCredentialType != null)
+ {
+ storageCredentialType = rawCredentialType.toUpperCase();
+ if (!storageCredentialType.equals("STATIC") && !storageCredentialType.equals("IAM"))
+ {
+ throw new IllegalArgumentException(
+ String.format("Invalid value '%s' for option '%s'. Valid values are: STATIC, IAM",
+ rawCredentialType, WriterOptions.STORAGE_CREDENTIAL_TYPE.name()));
+ }
+ }
+ this.dataTransportInfo = new DataTransportInfo(dataTransport, transportExtensionClass,
Review Comment:
Add a test for this?
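One way to make this easy to test is to pull the check into a small static helper and cover the absent, case-normalized, and rejected values. The sketch below assumes a hypothetical `CredentialTypeParser.parse` helper that reproduces the diff's logic with the option name passed in; it is not part of the PR, and the real test would presumably exercise `BulkSparkConf` itself. It also passes `Locale.ROOT` to `toUpperCase`, since the no-argument overload uses the JVM default locale (under a Turkish locale, `"static"` upper-cases to `"STATİC"` with a dotted `İ` and would be rejected).

```java
import java.util.Locale;

/** Hypothetical helper mirroring the STORAGE_CREDENTIAL_TYPE validation in BulkSparkConf. */
final class CredentialTypeParser
{
    private CredentialTypeParser()
    {
    }

    /**
     * Returns the upper-cased credential type, or null when the option is absent.
     * Throws IllegalArgumentException for anything other than STATIC or IAM.
     */
    static String parse(String rawCredentialType, String optionName)
    {
        if (rawCredentialType == null)
        {
            return null;
        }
        // Locale.ROOT keeps the normalization independent of the JVM default locale
        String type = rawCredentialType.toUpperCase(Locale.ROOT);
        if (!type.equals("STATIC") && !type.equals("IAM"))
        {
            throw new IllegalArgumentException(
                String.format("Invalid value '%s' for option '%s'. Valid values are: STATIC, IAM",
                              rawCredentialType, optionName));
        }
        return type;
    }
}
```

The cases worth covering are: option not set (null), mixed-case valid values (`"iam"`, `"Static"`), and an unknown value whose error message names the offending input.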
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/IamStorageAuth.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.transports.storage;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+
+/**
+ * {@link StorageAuth} implementation for IAM instance profile / IRSA / ECS task role authentication.
+ * Carries no static credentials; the AWS SDK resolves them automatically via its default provider chain.
+ * Use {@link #INSTANCE} — there is no per-instance state.
+ */
+public final class IamStorageAuth implements StorageAuth
+{
+ public static final IamStorageAuth INSTANCE = new IamStorageAuth();
+
+ private IamStorageAuth()
+ {
+ }
+
+ @Override
+ public AwsCredentialsProvider toAwsCredentialsProvider()
+ {
+ return DefaultCredentialsProvider.create();
+ }
Review Comment:
Should this implement `toString()`? Returning just the class name is probably enough.
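A minimal sketch of that, on a simplified copy of the class from the diff. `StorageAuth` here is a hypothetical empty stand-in so the snippet compiles without the AWS SDK; the real interface declares `toAwsCredentialsProvider()`, which is omitted. Printing only the class name is safe for logs because the IAM variant carries no secrets.

```java
/** Hypothetical empty stand-in for StorageAuth (the AWS SDK method is omitted). */
interface StorageAuth
{
}

/** Stateless IAM auth: a singleton with no credentials, so toString() can expose only the class name. */
final class IamStorageAuth implements StorageAuth
{
    static final IamStorageAuth INSTANCE = new IamStorageAuth();

    private IamStorageAuth()
    {
    }

    @Override
    public String toString()
    {
        // No per-instance state and nothing sensitive to print
        return getClass().getSimpleName();
    }
}
```

With this, logging a configuration that holds `IamStorageAuth.INSTANCE` shows `IamStorageAuth` instead of the default `IamStorageAuth@1b6d3586`-style identity string.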
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##########
@@ -478,12 +480,26 @@ private void createRestoreJobsOnAllClusters(TransportContext.CloudStorageTranspo
});
}
- private CreateRestoreJobRequestPayload.Builder createJobPayloadBuilder(JobInfo job, RestoreJobSecrets secrets)
+ /**
+ * Builds {@link RestoreJobSecrets} appropriate for the given credential type.
+ * When {@code credentialTypeName} is {@code "IAM"} (case-insensitive), region-only secrets are produced
+ * and the sidecar will use the AWS default credential chain.
+ * Otherwise, full static credentials from the pair are used.
+ */
+ static RestoreJobSecrets buildRestoreJobSecrets(StorageCredentialPair pair,
+ @org.jetbrains.annotations.Nullable String credentialTypeName)
{
- return createJobPayloadBuilder(job, secrets, null);
+ if ("IAM".equalsIgnoreCase(credentialTypeName))
Review Comment:
nit: use `equals` and be consistent with `CloudStorageStreamSession` and
other references.
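Since `BulkSparkConf` already upper-cases the option and rejects anything other than `STATIC` or `IAM`, an exact match is enough downstream. A sketch with a shared constant, assuming a hypothetical `CredentialTypes` holder; the PR may prefer to reuse whatever constant `CloudStorageStreamSession` already compares against.

```java
/** Hypothetical holder for the normalized credential-type names produced by BulkSparkConf. */
final class CredentialTypes
{
    static final String STATIC = "STATIC";
    static final String IAM = "IAM";

    private CredentialTypes()
    {
    }

    /**
     * Exact comparison: the value was already upper-cased when the option was parsed.
     * Calling equals on the constant also handles null (option not set) without an NPE.
     */
    static boolean isIam(String normalizedCredentialType)
    {
        return IAM.equals(normalizedCredentialType);
    }
}
```

Normalizing once at parse time and comparing exactly everywhere else keeps a single place responsible for case handling.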
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/StorageAccessConfiguration.java:
##########
@@ -109,16 +111,22 @@ public void write(Kryo kryo, Output out, StorageAccessConfiguration object)
{
out.writeString(object.region);
out.writeString(object.bucket);
- kryo.writeObject(out, object.storageCredentials);
+ boolean isIam = object.storageAuth instanceof IamStorageAuth;
+ out.writeBoolean(isIam);
+ if (!isIam)
Review Comment:
I think serializing the type name as a string is cheap and gives much more
flexibility than a boolean, in case we introduce other auth options later.
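A rough sketch of the tag-based layout: write a type name first, then only the fields that type needs, and dispatch on the name when reading. To stay runnable without Kryo, it uses `java.io.DataOutputStream.writeUTF` / `DataInputStream.readUTF` where the real serializer would presumably call Kryo's `Output.writeString` / `Input.readString`. The `StorageAuth`, `StaticAuth`, `IamAuth`, and `AuthCodec` types and the `"STATIC"` / `"IAM"` tags are hypothetical stand-ins.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-ins for the StorageAuth hierarchy. */
interface StorageAuth
{
}

final class StaticAuth implements StorageAuth
{
    final String accessKeyId;
    final String secretAccessKey;

    StaticAuth(String accessKeyId, String secretAccessKey)
    {
        this.accessKeyId = accessKeyId;
        this.secretAccessKey = secretAccessKey;
    }
}

final class IamAuth implements StorageAuth
{
    static final IamAuth INSTANCE = new IamAuth();

    private IamAuth()
    {
    }
}

/** Writes a type tag before the auth payload so new auth types can be added without a format break. */
final class AuthCodec
{
    private AuthCodec()
    {
    }

    static void write(DataOutputStream out, StorageAuth auth) throws IOException
    {
        if (auth instanceof IamAuth)
        {
            out.writeUTF("IAM"); // no payload: credentials come from the SDK default chain
        }
        else if (auth instanceof StaticAuth)
        {
            StaticAuth s = (StaticAuth) auth;
            out.writeUTF("STATIC");
            out.writeUTF(s.accessKeyId);
            out.writeUTF(s.secretAccessKey);
        }
        else
        {
            throw new IllegalArgumentException("Unsupported auth type: " + auth.getClass().getName());
        }
    }

    static StorageAuth read(DataInputStream in) throws IOException
    {
        String type = in.readUTF();
        switch (type)
        {
            case "IAM":
                return IamAuth.INSTANCE; // keep the singleton after deserialization
            case "STATIC":
                return new StaticAuth(in.readUTF(), in.readUTF()); // arguments evaluate left to right
            default:
                throw new IllegalStateException("Unknown auth type tag: " + type);
        }
    }

    /** Serializes and deserializes in memory; used here only to demonstrate the round trip. */
    static StorageAuth roundTrip(StorageAuth auth)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                write(out, auth);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return read(in);
            }
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

An unknown tag fails loudly on read, so adding another auth option later means adding one branch on each side rather than changing the wire layout.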
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/transports/storage/StorageAccessConfigurationTest.java:
##########
@@ -26,29 +26,15 @@
class StorageAccessConfigurationTest
{
@Test
- void testCopyWithNewCredentials()
+ void testCopyWithNewAuth()
{
StorageAccessConfiguration config1 = new StorageAccessConfiguration("writeRegion", "writeBucket",
new StorageCredentials("access", "secret"));
- StorageAccessConfiguration config2 = config1.copyWithNewCredentials(new StorageCredentials("newAccess", "newSecret"));
+ StorageAccessConfiguration config2 = config1.copyWithNewAuth(new StorageCredentials("newAccess", "newSecret"));
assertThat(config1).isNotSameAs(config2);
assertThat(config1.bucket()).isEqualTo(config2.bucket());
assertThat(config1.region()).isEqualTo(config2.region());
- assertThat(config1.storageCredentials()).isNotEqualTo(config2.storageCredentials());
+ assertThat(config1.storageAuth()).isNotEqualTo(config2.storageAuth());
}
- @Test
- void testHashcodeAndEquals()
- {
- StorageAccessConfiguration config1 = new StorageAccessConfiguration("writeRegion", "writeBucket",
- new StorageCredentials("access", "secret"));
- StorageAccessConfiguration config2 = new StorageAccessConfiguration("writeRegion", "writeBucket",
- new StorageCredentials("access", "secret"));
- assertThat(config1.hashCode()).isEqualTo(config2.hashCode());
- assertThat(config1).isEqualTo(config2);
-
- config2 = config1.copyWithNewCredentials(new StorageCredentials("newAccess", "newSecret"));
- assertThat(config1.hashCode()).isNotEqualTo(config2.hashCode());
- assertThat(config1).isNotEqualTo(config2);
- }
Review Comment:
Why was this test case removed?
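For comparison, the removed assertions still hold as long as `equals` / `hashCode` include the auth field. A sketch with hypothetical stand-ins: `AccessConfig` is a trimmed-down `StorageAccessConfiguration`, and `StaticAuth` is a value-typed credentials class. Auth types that do not override `equals` (such as a singleton IAM auth) fall back to identity, which is correct for a singleton.

```java
import java.util.Objects;

/** Hypothetical marker interface standing in for StorageAuth. */
interface StorageAuth
{
}

/** Hypothetical static credentials with value-based equality. */
final class StaticAuth implements StorageAuth
{
    private final String accessKeyId;
    private final String secretAccessKey;

    StaticAuth(String accessKeyId, String secretAccessKey)
    {
        this.accessKeyId = accessKeyId;
        this.secretAccessKey = secretAccessKey;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
        {
            return true;
        }
        if (!(o instanceof StaticAuth))
        {
            return false;
        }
        StaticAuth that = (StaticAuth) o;
        return accessKeyId.equals(that.accessKeyId) && secretAccessKey.equals(that.secretAccessKey);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(accessKeyId, secretAccessKey);
    }
}

/** Trimmed-down stand-in for StorageAccessConfiguration whose equality includes the auth field. */
final class AccessConfig
{
    private final String region;
    private final String bucket;
    private final StorageAuth auth;

    AccessConfig(String region, String bucket, StorageAuth auth)
    {
        this.region = region;
        this.bucket = bucket;
        this.auth = auth;
    }

    AccessConfig copyWithNewAuth(StorageAuth newAuth)
    {
        return new AccessConfig(region, bucket, newAuth);
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
        {
            return true;
        }
        if (!(o instanceof AccessConfig))
        {
            return false;
        }
        AccessConfig that = (AccessConfig) o;
        // Objects.equals delegates to the auth type's equals (identity if not overridden)
        return Objects.equals(region, that.region)
               && Objects.equals(bucket, that.bucket)
               && Objects.equals(auth, that.auth);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(region, bucket, auth);
    }
}
```

The same approach would cover `StorageCredentialPair`, whose `testHashcodeAndEquals` was also dropped.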
##########
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java:
##########
@@ -134,8 +135,7 @@ public static void install(ClassLoader cl, Integer nodeNumber)
public static void unbootstrap(@SuperCall Callable<?> orig) throws Exception
{
transitionalStateStart.countDown();
- TestUninterruptibles.awaitUninterruptiblyOrThrow(transitionalStateStart, 4, TimeUnit.MINUTES);
- TestUninterruptibles.awaitUninterruptiblyOrThrow(transitionalStateEnd, 2, TimeUnit.MINUTES);
+ Uninterruptibles.awaitUninterruptibly(transitionalStateEnd, 2, TimeUnit.MINUTES);
Review Comment:
Not sure why these were changed; this looks like an unrelated change.
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/transports/storage/StorageCredentialPairTest.java:
##########
@@ -43,16 +44,12 @@ void testToRestoreJobSecrets()
}
@Test
- void testHashcodeAndEquals()
Review Comment:
This test was also removed.
##########
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java:
##########
@@ -96,7 +96,7 @@ protected void afterClusterProvisioned()
protected void completeTransitionsAndValidateWrites(CountDownLatch transitionalStateEnd, Stream<Arguments> testInputs)
{
- for (int i = 0; i < leavingNodesPerDc(); i++)
+ for (int i = 0; i < leavingNodes.size(); i++)
Review Comment:
Unrelated change; can we revert it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.