seropian commented on code in PR #2409: URL: https://github.com/apache/jackrabbit-oak/pull/2409#discussion_r2426105382
########## oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/v8/AzureBlobContainerProviderV8.java: ########## @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.v8; + +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenRequestContext; +import com.azure.identity.ClientSecretCredential; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentialsToken; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.UserDelegationKey; +import com.microsoft.azure.storage.blob.BlobRequestOptions; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.blob.SharedAccessBlobHeaders; +import com.microsoft.azure.storage.blob.SharedAccessBlobPermissions; +import com.microsoft.azure.storage.blob.SharedAccessBlobPolicy; +import org.apache.commons.lang3.StringUtils; +import 
org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzureConstants; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Date; +import java.util.EnumSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class AzureBlobContainerProviderV8 implements Closeable { + private static final Logger log = LoggerFactory.getLogger(AzureBlobContainerProviderV8.class); + private static final String DEFAULT_ENDPOINT_SUFFIX = "core.windows.net"; + private static final String AZURE_DEFAULT_SCOPE = "https://storage.azure.com/.default"; + private final String azureConnectionString; + private final String accountName; + private final String containerName; + private final String blobEndpoint; + private final String sasToken; + private final String accountKey; + private final String tenantId; + private final String clientId; + private final String clientSecret; + private ClientSecretCredential clientSecretCredential; + private AccessToken accessToken; + private StorageCredentialsToken storageCredentialsToken; + private static final long TOKEN_REFRESHER_INITIAL_DELAY = 45L; + private static final long TOKEN_REFRESHER_DELAY = 1L; + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + private AzureBlobContainerProviderV8(Builder builder) { + this.azureConnectionString = 
builder.azureConnectionString; + this.accountName = builder.accountName; + this.containerName = builder.containerName; + this.blobEndpoint = builder.blobEndpoint; + this.sasToken = builder.sasToken; + this.accountKey = builder.accountKey; + this.tenantId = builder.tenantId; + this.clientId = builder.clientId; + this.clientSecret = builder.clientSecret; + } + + public static class Builder { + private final String containerName; + + private Builder(String containerName) { + this.containerName = containerName; + } + + public static Builder builder(String containerName) { + return new Builder(containerName); + } + + private String azureConnectionString; + private String accountName; + private String blobEndpoint; + private String sasToken; + private String accountKey; + private String tenantId; + private String clientId; + private String clientSecret; + + public Builder withAzureConnectionString(String azureConnectionString) { + this.azureConnectionString = azureConnectionString; + return this; + } + + public Builder withAccountName(String accountName) { + this.accountName = accountName; + return this; + } + + public Builder withBlobEndpoint(String blobEndpoint) { + this.blobEndpoint = blobEndpoint; + return this; + } + + public Builder withSasToken(String sasToken) { + this.sasToken = sasToken; + return this; + } + + public Builder withAccountKey(String accountKey) { + this.accountKey = accountKey; + return this; + } + + public Builder withTenantId(String tenantId) { + this.tenantId = tenantId; + return this; + } + + public Builder withClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public Builder withClientSecret(String clientSecret) { + this.clientSecret = clientSecret; + return this; + } + + public Builder initializeWithProperties(Properties properties) { + withAzureConnectionString(properties.getProperty(AzureConstants.AZURE_CONNECTION_STRING, "")); + withAccountName(properties.getProperty(AzureConstants.AZURE_STORAGE_ACCOUNT_NAME, 
"")); + withBlobEndpoint(properties.getProperty(AzureConstants.AZURE_BLOB_ENDPOINT, "")); + withSasToken(properties.getProperty(AzureConstants.AZURE_SAS, "")); + withAccountKey(properties.getProperty(AzureConstants.AZURE_STORAGE_ACCOUNT_KEY, "")); + withTenantId(properties.getProperty(AzureConstants.AZURE_TENANT_ID, "")); + withClientId(properties.getProperty(AzureConstants.AZURE_CLIENT_ID, "")); + withClientSecret(properties.getProperty(AzureConstants.AZURE_CLIENT_SECRET, "")); + return this; + } + + public AzureBlobContainerProviderV8 build() { + return new AzureBlobContainerProviderV8(this); + } + } + + public String getContainerName() { + return containerName; + } + + @NotNull + public CloudBlobContainer getBlobContainer() throws DataStoreException { + return this.getBlobContainer(null); + } + + @NotNull + public CloudBlobContainer getBlobContainer(@Nullable BlobRequestOptions blobRequestOptions) throws DataStoreException { + // connection string will be given preference over service principals / sas / account key + if (StringUtils.isNotBlank(azureConnectionString)) { + log.debug("connecting to azure blob storage via azureConnectionString"); + return UtilsV8.getBlobContainer(azureConnectionString, containerName, blobRequestOptions); + } else if (authenticateViaServicePrincipal()) { + log.debug("connecting to azure blob storage via service principal credentials"); + return getBlobContainerFromServicePrincipals(blobRequestOptions); + } else if (StringUtils.isNotBlank(sasToken)) { + log.debug("connecting to azure blob storage via sas token"); + final String connectionStringWithSasToken = UtilsV8.getConnectionStringForSas(sasToken, blobEndpoint, accountName); + return UtilsV8.getBlobContainer(connectionStringWithSasToken, containerName, blobRequestOptions); + } + log.debug("connecting to azure blob storage via access key"); + final String connectionStringWithAccountKey = UtilsV8.getConnectionString(accountName, accountKey, blobEndpoint); + return 
UtilsV8.getBlobContainer(connectionStringWithAccountKey, containerName, blobRequestOptions); + } + + @NotNull + private CloudBlobContainer getBlobContainerFromServicePrincipals(@Nullable BlobRequestOptions blobRequestOptions) throws DataStoreException { + StorageCredentialsToken storageCredentialsToken = getStorageCredentials(); + try { + CloudStorageAccount cloud = new CloudStorageAccount(storageCredentialsToken, true, DEFAULT_ENDPOINT_SUFFIX, accountName); + CloudBlobClient cloudBlobClient = cloud.createCloudBlobClient(); + if (blobRequestOptions != null) { + cloudBlobClient.setDefaultRequestOptions(blobRequestOptions); + } + return cloudBlobClient.getContainerReference(containerName); + } catch (URISyntaxException | StorageException e) { + throw new DataStoreException(e); + } + } + + @NotNull + private StorageCredentialsToken getStorageCredentials() { + boolean isAccessTokenGenerated = false; + /* generate access token, the same token will be used for subsequent access + * generated token is valid for 1 hour only and will be refreshed in background + * */ + if (accessToken == null) { + clientSecretCredential = new ClientSecretCredentialBuilder() + .clientId(clientId) + .clientSecret(clientSecret) + .tenantId(tenantId) + .build(); + accessToken = clientSecretCredential.getTokenSync(new TokenRequestContext().addScopes(AZURE_DEFAULT_SCOPE)); + if (accessToken == null || StringUtils.isBlank(accessToken.getToken())) { + log.error("Access token is null or empty"); + throw new IllegalArgumentException("Could not connect to azure storage, access token is null or empty"); + } + storageCredentialsToken = new StorageCredentialsToken(accountName, accessToken.getToken()); + isAccessTokenGenerated = true; + } + + Objects.requireNonNull(storageCredentialsToken, "storage credentials token cannot be null"); + + // start refresh token executor only when the access token is first generated + if (isAccessTokenGenerated) { + log.info("starting refresh token task at: {}", 
OffsetDateTime.now()); + TokenRefresher tokenRefresher = new TokenRefresher(); + executorService.scheduleWithFixedDelay(tokenRefresher, TOKEN_REFRESHER_INITIAL_DELAY, TOKEN_REFRESHER_DELAY, TimeUnit.MINUTES); + } + return storageCredentialsToken; + } + + @NotNull + public String generateSharedAccessSignature(BlobRequestOptions requestOptions, + String key, + EnumSet<SharedAccessBlobPermissions> permissions, + int expirySeconds, + SharedAccessBlobHeaders optionalHeaders) throws DataStoreException, URISyntaxException, StorageException, InvalidKeyException { + SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy(); + Date expiry = Date.from(Instant.now().plusSeconds(expirySeconds)); + policy.setSharedAccessExpiryTime(expiry); + policy.setPermissions(permissions); + + CloudBlockBlob blob = getBlobContainer(requestOptions).getBlockBlobReference(key); + + if (authenticateViaServicePrincipal()) { + return generateUserDelegationKeySignedSas(blob, policy, optionalHeaders, expiry); + } + return generateSas(blob, policy, optionalHeaders); + } + + @NotNull + private String generateUserDelegationKeySignedSas(CloudBlockBlob blob, + SharedAccessBlobPolicy policy, + SharedAccessBlobHeaders optionalHeaders, + Date expiry) throws StorageException { + fillEmptyHeaders(optionalHeaders); + UserDelegationKey userDelegationKey = blob.getServiceClient().getUserDelegationKey(Date.from(Instant.now().minusSeconds(900)), + expiry); + return optionalHeaders == null ? 
blob.generateUserDelegationSharedAccessSignature(userDelegationKey, policy) : + blob.generateUserDelegationSharedAccessSignature(userDelegationKey, policy, optionalHeaders, null, null); + } + + /* set empty headers as blank string due to a bug in Azure SDK + * Azure SDK considers null headers as 'null' string which corrupts the string to sign and generates an invalid + * sas token + * */ + private void fillEmptyHeaders(SharedAccessBlobHeaders sharedAccessBlobHeaders) { + final String EMPTY_STRING = ""; + Optional.ofNullable(sharedAccessBlobHeaders) + .ifPresent(headers -> { + if (StringUtils.isBlank(headers.getCacheControl())) { + headers.setCacheControl(EMPTY_STRING); + } + if (StringUtils.isBlank(headers.getContentDisposition())) { + headers.setContentDisposition(EMPTY_STRING); + } + if (StringUtils.isBlank(headers.getContentEncoding())) { + headers.setContentEncoding(EMPTY_STRING); + } + if (StringUtils.isBlank(headers.getContentLanguage())) { + headers.setContentLanguage(EMPTY_STRING); + } + if (StringUtils.isBlank(headers.getContentType())) { + headers.setContentType(EMPTY_STRING); + } + }); + } + + @NotNull + private String generateSas(CloudBlockBlob blob, + SharedAccessBlobPolicy policy, + SharedAccessBlobHeaders optionalHeaders) throws InvalidKeyException, StorageException { + return optionalHeaders == null ? 
blob.generateSharedAccessSignature(policy, null) : + blob.generateSharedAccessSignature(policy, + optionalHeaders, null, null, null, true); + } + + private boolean authenticateViaServicePrincipal() { + return StringUtils.isBlank(azureConnectionString) && + StringUtils.isNoneBlank(accountName, tenantId, clientId, clientSecret); + } + + class TokenRefresher implements Runnable { + @Override + public void run() { + try { + log.debug("Checking for azure access token expiry at: {}", LocalDateTime.now()); + OffsetDateTime tokenExpiryThreshold = OffsetDateTime.now().plusMinutes(5); + if (accessToken.getExpiresAt() != null && accessToken.getExpiresAt().isBefore(tokenExpiryThreshold)) { + log.info("Access token is about to expire (5 minutes or less) at: {}. New access token will be generated", + accessToken.getExpiresAt().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); + AccessToken newToken = clientSecretCredential.getTokenSync(new TokenRequestContext().addScopes(AZURE_DEFAULT_SCOPE)); + log.info("New azure access token generated at: {}", LocalDateTime.now()); + if (newToken == null || StringUtils.isBlank(newToken.getToken())) { + log.error("New access token is null or empty"); + return; + } + // update access token with newly generated token + accessToken = newToken; + storageCredentialsToken.updateToken(accessToken.getToken()); + } + } catch (Exception e) { + log.error("Error while acquiring new access token: ", e); + } + } + } + + @Override + public void close() { + new ExecutorCloser(executorService).close(); Review Comment: How? It is already `org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
