This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 49c0e24e3d Add support for IAM role based credentials in Kinesis
Plugin (#9071)
49c0e24e3d is described below
commit 49c0e24e3d85a97911a80352f93e8ebf34214812
Author: Kartik Khare <[email protected]>
AuthorDate: Wed Jul 27 21:36:08 2022 +0530
Add support for IAM role based credentials in Kinesis Plugin (#9071)
* Add support for IAM roles
* Add support for externalId
* Provide proper credentials to STS client
* Add default session id
* Add javadoc
Co-authored-by: Kartik Khare <[email protected]>
---
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 87 ++++++++++++++++++----
.../stream/kinesis/KinesisConnectionHandler.java | 53 +++++++++++--
2 files changed, 119 insertions(+), 21 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 95221ff3bf..d8a3795a2a 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.util.Map;
+import java.util.UUID;
import org.apache.pinot.spi.stream.StreamConfig;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
@@ -36,9 +38,32 @@ public class KinesisConfig {
public static final String MAX_RECORDS_TO_FETCH = "maxRecordsToFetch";
public static final String ENDPOINT = "endpoint";
+ // IAM role configs
+ /**
+ * Enable Role based access to AWS.
+ * iamRoleBasedAccessEnabled - Set it to `true` to enable role based access,
default: false
+ * roleArn - Required. specify the ARN of the role the client should assume.
+ * roleSessionName - session name to be used when creating a role based
session. default: pinot-kineis-uuid
+ * externalId - string external id value required by role's policy. default:
null
+ * sessionDurationSeconds - The duration, in seconds, of the role session.
Default: 900
+ * asyncSessionUpdateEnabled -
+ * Configure whether the provider should fetch credentials
asynchronously in the background.
+ * If this is true, threads are less likely to block when credentials
are loaded,
+ * but additional resources are used to maintain the provider. Default
- `true`
+ */
+ public static final String IAM_ROLE_BASED_ACCESS_ENABLED =
"iamRoleBasedAccessEnabled";
+ public static final String ROLE_ARN = "roleArn";
+ public static final String ROLE_SESSION_NAME = "roleSessionName";
+ public static final String EXTERNAL_ID = "externalId";
+ public static final String SESSION_DURATION_SECONDS =
"sessionDurationSeconds";
+ public static final String ASYNC_SESSION_UPDATED_ENABLED =
"asyncSessionUpdateEnabled";
+
// TODO: this is a starting point, until a better default is figured out
public static final String DEFAULT_MAX_RECORDS = "20";
public static final String DEFAULT_SHARD_ITERATOR_TYPE =
ShardIteratorType.LATEST.toString();
+ public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
+ public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
+ public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
private final String _streamTopicName;
private final String _awsRegion;
@@ -48,6 +73,14 @@ public class KinesisConfig {
private final String _secretKey;
private final String _endpoint;
+ // IAM Role values
+ private boolean _iamRoleBasedAccess;
+ private String _roleArn;
+ private String _roleSessionName;
+ private String _externalId;
+ private int _sessionDurationSeconds;
+ private boolean _asyncSessionUpdateEnabled;
+
public KinesisConfig(StreamConfig streamConfig) {
Map<String, String> props = streamConfig.getStreamConfigsMap();
_streamTopicName = streamConfig.getTopicName();
@@ -60,23 +93,23 @@ public class KinesisConfig {
_accessKey = props.get(ACCESS_KEY);
_secretKey = props.get(SECRET_KEY);
_endpoint = props.get(ENDPOINT);
- }
- public KinesisConfig(String streamTopicName, String awsRegion,
ShardIteratorType shardIteratorType, String accessKey,
- String secretKey, String endpoint) {
- this(streamTopicName, awsRegion, shardIteratorType, accessKey, secretKey,
Integer.parseInt(DEFAULT_MAX_RECORDS),
- endpoint);
- }
+ _iamRoleBasedAccess =
+ Boolean.parseBoolean(props.getOrDefault(IAM_ROLE_BASED_ACCESS_ENABLED,
DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED));
+ _roleArn = props.get(ROLE_ARN);
+ _roleSessionName =
+ props.getOrDefault(ROLE_SESSION_NAME, Joiner.on("-").join("pinot",
"kinesis", UUID.randomUUID()));
+ _externalId = props.get(EXTERNAL_ID);
+ _sessionDurationSeconds =
+ Integer.parseInt(props.getOrDefault(SESSION_DURATION_SECONDS,
DEFAULT_SESSION_DURATION_SECONDS));
+ _asyncSessionUpdateEnabled =
+ Boolean.parseBoolean(props.getOrDefault(ASYNC_SESSION_UPDATED_ENABLED,
DEFAULT_ASYNC_SESSION_UPDATED_ENABLED));
- public KinesisConfig(String streamTopicName, String awsRegion,
ShardIteratorType shardIteratorType, String accessKey,
- String secretKey, int maxRecords, String endpoint) {
- _streamTopicName = streamTopicName;
- _awsRegion = awsRegion;
- _shardIteratorType = shardIteratorType;
- _accessKey = accessKey;
- _secretKey = secretKey;
- _numMaxRecordsToFetch = maxRecords;
- _endpoint = endpoint;
+ if (_iamRoleBasedAccess) {
+ Preconditions.checkNotNull(_roleArn,
+ "Must provide 'roleArn' in stream config for table %s if
iamRoleBasedAccess is enabled",
+ streamConfig.getTableNameWithType());
+ }
}
public String getStreamTopicName() {
@@ -106,4 +139,28 @@ public class KinesisConfig {
public String getEndpoint() {
return _endpoint;
}
+
+ public boolean isIamRoleBasedAccess() {
+ return _iamRoleBasedAccess;
+ }
+
+ public String getRoleArn() {
+ return _roleArn;
+ }
+
+ public String getRoleSessionName() {
+ return _roleSessionName;
+ }
+
+ public String getExternalId() {
+ return _externalId;
+ }
+
+ public int getSessionDurationSeconds() {
+ return _sessionDurationSeconds;
+ }
+
+ public boolean isAsyncSessionUpdateEnabled() {
+ return _asyncSessionUpdateEnabled;
+ }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index dd7bf36329..7fbd0e28c2 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
@@ -33,6 +34,9 @@ import
software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.sts.StsClient;
+import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
/**
@@ -45,6 +49,7 @@ public class KinesisConnectionHandler {
private final String _accessKey;
private final String _secretKey;
private final String _endpoint;
+ private final KinesisConfig _kinesisConfig;
public KinesisConnectionHandler(KinesisConfig kinesisConfig) {
_stream = kinesisConfig.getStreamTopicName();
@@ -52,6 +57,7 @@ public class KinesisConnectionHandler {
_accessKey = kinesisConfig.getAccessKey();
_secretKey = kinesisConfig.getSecretKey();
_endpoint = kinesisConfig.getEndpoint();
+ _kinesisConfig = kinesisConfig;
createConnection();
}
@@ -62,6 +68,7 @@ public class KinesisConnectionHandler {
_accessKey = kinesisConfig.getAccessKey();
_secretKey = kinesisConfig.getSecretKey();
_endpoint = kinesisConfig.getEndpoint();
+ _kinesisConfig = kinesisConfig;
_kinesisClient = kinesisClient;
}
@@ -80,17 +87,51 @@ public class KinesisConnectionHandler {
public void createConnection() {
if (_kinesisClient == null) {
KinesisClientBuilder kinesisClientBuilder;
+
+ AwsCredentialsProvider awsCredentialsProvider;
if (StringUtils.isNotBlank(_accessKey) &&
StringUtils.isNotBlank(_secretKey)) {
AwsBasicCredentials awsBasicCredentials =
AwsBasicCredentials.create(_accessKey, _secretKey);
- kinesisClientBuilder =
KinesisClient.builder().region(Region.of(_region))
-
.credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
- .httpClientBuilder(new
ApacheSdkHttpService().createHttpClientBuilder());
+ awsCredentialsProvider =
StaticCredentialsProvider.create(awsBasicCredentials);
} else {
- kinesisClientBuilder =
-
KinesisClient.builder().region(Region.of(_region)).credentialsProvider(DefaultCredentialsProvider.create())
- .httpClientBuilder(new
ApacheSdkHttpService().createHttpClientBuilder());
+ awsCredentialsProvider = DefaultCredentialsProvider.create();
+ }
+
+ if (_kinesisConfig.isIamRoleBasedAccess()) {
+ AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+ AssumeRoleRequest.builder()
+ .roleArn(_kinesisConfig.getRoleArn())
+ .roleSessionName(_kinesisConfig.getRoleSessionName())
+ .durationSeconds(_kinesisConfig.getSessionDurationSeconds());
+
+ AssumeRoleRequest assumeRoleRequest;
+ if (StringUtils.isNotEmpty(_kinesisConfig.getExternalId())) {
+ assumeRoleRequest = assumeRoleRequestBuilder
+ .externalId(_kinesisConfig.getExternalId())
+ .build();
+ } else {
+ assumeRoleRequest = assumeRoleRequestBuilder.build();
+ }
+
+ StsClient stsClient =
+ StsClient.builder()
+ .region(Region.of(_region))
+ .credentialsProvider(awsCredentialsProvider)
+ .build();
+
+ awsCredentialsProvider =
+ StsAssumeRoleCredentialsProvider.builder()
+ .stsClient(stsClient)
+ .refreshRequest(assumeRoleRequest)
+
.asyncCredentialUpdateEnabled(_kinesisConfig.isAsyncSessionUpdateEnabled())
+ .build();
}
+ kinesisClientBuilder =
+ KinesisClient.builder()
+ .region(Region.of(_region))
+ .credentialsProvider(awsCredentialsProvider)
+ .httpClientBuilder(new
ApacheSdkHttpService().createHttpClientBuilder());
+
if (StringUtils.isNotBlank(_endpoint)) {
try {
kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new
URI(_endpoint));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]