I am working on accessing Azure Blob Storage from a Flink pipeline. As per the Flink documentation (https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/), there are two approaches to implement this.

1) `fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>`

I implemented this approach, but hardcoding access keys is not acceptable under our organization's security policy, so this approach does not help.

2) `fs.azure.account.keyprovider.<account_name>.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider`

**a)** We already use this approach to save the checkpoints and savepoints of the running job to one Azure Blob Storage account, say storage1. So this key/value configuration is already in use.

**b)** Now we have a requirement to save our CSV/text/XML files to a different Blob Storage account, say storage2. To access this account I need to provide its access key, and the key must be supplied through configuration in the same way as in point a. For that I created an application-specific class whose internal logic is the same as `EnvironmentVariableKeyProvider`, except for the name of the environment variable.

    import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration;
    import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProvider;
    import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException;

    public class MyAppEnvironmentVariableKeyProvider implements KeyProvider {

        // Name of the environment variable that holds the storage2 access key.
        public static final String AZURE_STORAGE_KEY_ENV_VARIABLE = "AZURE_STORAGE_KEY_MYAPP";

        @Override
        public String getStorageAccountKey(final String s, final Configuration configuration)
                throws KeyProviderException {
            // Currently hardcoding
            // String azureStorageKey = "abcdefghijk";
            String azureStorageKey = System.getenv(AZURE_STORAGE_KEY_ENV_VARIABLE);

            if (azureStorageKey != null) {
                return azureStorageKey;
            } else {
                throw new KeyProviderException(
                        "Unable to retrieve Azure storage key from environment. \""
                                + AZURE_STORAGE_KEY_ENV_VARIABLE
                                + "\" not set.");
            }
        }
    }

I declared the configuration in *deployment.yml* as below:

    flinkConfiguration:
      fs.azure.account.keyprovider.storage1.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider
      fs.azure.account.keyprovider.storage2.blob.core.windows.net: >-
        com.myapp.MyAppEnvironmentVariableKeyProvider
      # many other configuration entries exist here but are not needed for this problem statement
    kubernetes:
      pods:
        affinity: null
        annotations:
          prometheus.io/port: '9249'
          prometheus.io/scrape: 'true'
        envVars:
          - name: AZURE_STORAGE_KEY
            valueFrom:
              secretKeyRef:
                key: azure.accesskey
                name: my-storage-secret
          - name: AZURE_STORAGE_KEY_MYAPP
            value: >-
              abcdefgh
            valueFrom: null

Now, when my application tries to access storage2 through the `fs.azure.account.keyprovider.storage2.blob.core.windows.net` property, it gives me the error below:

    org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1086) ~[?:?]
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:538) ~[?:?]
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1358) ~[?:?]
        at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.createInitializedAzureFS(AbstractAzureFSFactory.java:88) ~[?:?]
        at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79) ~[?:?]
        at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:505) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:406) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
    Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.

It seems that Flink is unable to load the user-defined class. Is there any way to make Flink load this user-defined `MyAppEnvironmentVariableKeyProvider` class?
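
A check along the following lines might help narrow this down. It is only a sketch, not a fix: `KeyProviderVisibilityCheck` and its methods are hypothetical names, not part of Flink or Hadoop. It also rests on an assumption that is not confirmed here: that the key provider is resolved by class name through a class loader which may differ from the one that loaded the job JAR. The sketch simply reports whether a class name can be resolved through a given loader.

```java
/** Hypothetical diagnostic helper; not part of Flink or Hadoop. */
final class KeyProviderVisibilityCheck {

    private KeyProviderVisibilityCheck() {}

    /**
     * Returns true if the class name can be resolved through the given loader.
     * The class is not initialized, so static initializers do not run.
     */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Builds a one-line report for logging. */
    static String describe(String className, ClassLoader loader) {
        return className + " via " + loader + ": visible=" + isVisible(className, loader);
    }

    public static void main(String[] args) {
        // Class name taken from the configuration in the question, unless overridden.
        String name = args.length > 0
                ? args[0]
                : "com.myapp.MyAppEnvironmentVariableKeyProvider";

        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        ClassLoader systemLoader = ClassLoader.getSystemClassLoader();

        System.out.println(describe(name, contextLoader));
        System.out.println(describe(name, systemLoader));
    }
}
```

Calling `isVisible` from inside the job and comparing the result for different loaders would at least show whether the class is missing from the classpath altogether, or present for the job code but not resolvable from where the file system looks it up.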
Thanks & Regards, Samir Vasani