
I am working on accessing Azure Blob Storage through a Flink pipeline.

As per the Flink documentation
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/
there are two approaches to implement this.

1) fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>

I implemented this approach, but hardcoding access keys is not an acceptable practice under our organization's security policy, so this approach is not helpful.

2) fs.azure.account.keyprovider.<account_name>.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider

**a)** We are using this approach for saving checkpoints and savepoints of the running job on an Azure blob storage account, say storage1. In other words, this approach (or key/value pair combination) is already in use.

**b)** Now we have a requirement to save our csv/text/xml files on a different blob storage account, say storage2.

To access this storage account I need to provide an access key, and it needs to be accessible via configuration in the same way as mentioned in point a.

For that I created an application-specific class whose internal logic (except the environment variable name) is the same as EnvironmentVariableKeyProvider's.

import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProvider;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException;

public class MyAppEnvironmentVariableKeyProvider implements KeyProvider {

    public static final String AZURE_STORAGE_KEY_ENV_VARIABLE =
            "AZURE_STORAGE_KEY_MYAPP";

    @Override
    public String getStorageAccountKey(final String accountName, final Configuration configuration)
            throws KeyProviderException {
        String azureStorageKey = System.getenv(AZURE_STORAGE_KEY_ENV_VARIABLE);

        if (azureStorageKey != null) {
            return azureStorageKey;
        } else {
            throw new KeyProviderException(
                    "Unable to retrieve Azure storage key from environment. \""
                            + AZURE_STORAGE_KEY_ENV_VARIABLE
                            + "\" not set.");
        }
    }
}

I declared the configuration in *deployment.yml* as below:

      flinkConfiguration:
        fs.azure.account.keyprovider.storage1.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider
        fs.azure.account.keyprovider.storage2.blob.core.windows.net: com.myapp.MyAppEnvironmentVariableKeyProvider
        # many other configurations exist here but are not needed for this problem statement
      kubernetes:
        pods:
          affinity: null
          annotations:
            prometheus.io/port: '9249'
            prometheus.io/scrape: 'true'
          envVars:
            - name: AZURE_STORAGE_KEY
              valueFrom:
                secretKeyRef:
                  key: azure.accesskey
                  name: my-storage-secret
            - name: AZURE_STORAGE_KEY_MYAPP
              value: abcdefgh
              valueFrom: null
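Incidentally, the AZURE_STORAGE_KEY_MYAPP value could also be sourced from a Kubernetes Secret instead of an inline literal, mirroring the first entry; a sketch, where the secret name and key are assumptions:

```yaml
            - name: AZURE_STORAGE_KEY_MYAPP
              valueFrom:
                secretKeyRef:
                  name: my-storage-secret-myapp   # assumed secret name
                  key: azure.accesskey            # assumed key inside the secret
```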

Now when my application tries to access the
fs.azure.account.keyprovider.storage2.blob.core.windows.net property, it
gives me the below error.

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1086) ~[?:?]
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:538) ~[?:?]
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1358) ~[?:?]
        at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.createInitializedAzureFS(AbstractAzureFSFactory.java:88) ~[?:?]
        at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79) ~[?:?]
        at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:505) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:406) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.

It seems that Flink is unable to load the user-defined class.
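For context, the Hadoop Azure filesystem resolves the configured key provider by class name via reflection, so the class must be visible to the classloader that performs the lookup (for Flink, the azure-fs plugin's classloader, not only the user-code classloader). The sketch below uses simplified stand-in types, not the real shaded Hadoop classes, to show how a reflective lookup against a classloader that cannot see the class produces exactly this kind of failure:

```java
// Simplified stand-in for the shaded Hadoop KeyProvider type -- NOT the real class.
interface KeyProvider {
    String getStorageAccountKey(String accountName) throws Exception;
}

// Analogue of MyAppEnvironmentVariableKeyProvider: reads the key from an env var.
class EnvKeyProvider implements KeyProvider {
    @Override
    public String getStorageAccountKey(String accountName) throws Exception {
        String key = System.getenv("AZURE_STORAGE_KEY_MYAPP");
        if (key == null) {
            throw new Exception("AZURE_STORAGE_KEY_MYAPP not set");
        }
        return key;
    }
}

public class KeyProviderLoading {

    // Rough analogue of the reflective lookup: resolve the configured class
    // name against a given classloader and instantiate it.
    static KeyProvider load(String className, ClassLoader cl) throws Exception {
        return (KeyProvider) Class.forName(className, true, cl)
                .getDeclaredConstructor()
                .newInstance();
    }

    public static void main(String[] args) throws Exception {
        ClassLoader cl = KeyProviderLoading.class.getClassLoader();

        // A class visible to this classloader loads fine.
        KeyProvider ok = load("EnvKeyProvider", cl);
        System.out.println(ok.getClass().getSimpleName());

        // A class the classloader cannot see throws ClassNotFoundException,
        // which the Hadoop code wraps in a KeyProviderException with the
        // message "Unable to load key provider class."
        try {
            load("com.myapp.MissingProvider", cl);
        } catch (ClassNotFoundException e) {
            System.out.println("Unable to load key provider class.");
        }
    }
}
```

If this diagnosis applies here, the class being packaged only in the job jar, and not somewhere the flink-azure-fs plugin's classloader can see it, may be the reason it fails to load.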

Is there any approach to load this user-defined MyAppEnvironmentVariableKeyProvider class?

Thanks & Regards,
Samir Vasani
