This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 883a362c930 Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK (#24212)

883a362c930 is described below

commit 883a362c930aca4298551697d7aaacbe7b6602f1
Author: creste <cre...@users.noreply.github.com>
AuthorDate: Tue Nov 22 12:23:04 2022 -0500

    Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK (#24212)

    * Added support for using DefaultAzureCredential() with Blob Storage. Tests not working yet due to TLS issue with Azurite.

    * SubjectAltNames are now included in the SSL cert.

    * DefaultAzureCredential is no longer used with AZURE_STORAGE_CONNECTION_STRING.

    * Fixed integration tests.

    * Can now run azure integration tests in parallel across all python versions.

    * Change pipeline option names to be consistent with Java SDK.

    * Fix missing Apache 2.0 licenses. Deleted cert and key files because they can't have a license header. Moved cert generation to azure_integration_test.sh.

    * Fix path to azure integration tests in tox comment.

    * Fix path to cert.pem.

    * Fix linting issues.

    * Fix path to cert.pem

    * Fix linter error.

    * Added azure integration tests to python 3.7 post-commit test suite.

    * Fix unit tests.

    * Updated README.md.

    * Move changes entry to 2.44.0. 
--- CHANGES.md | 1 + build.gradle.kts | 1 + sdks/python/apache_beam/internal/azure/__init__.py | 18 +++++ sdks/python/apache_beam/internal/azure/auth.py | 83 ++++++++++++++++++++++ .../apache_beam/io/azure/blobstoragefilesystem.py | 28 +++++--- .../io/azure/blobstoragefilesystem_test.py | 24 +++---- sdks/python/apache_beam/io/azure/blobstorageio.py | 17 ++++- .../io/azure/integration_test/Dockerfile | 46 ++++++++++++ .../integration_test/azure_integration_test.sh | 79 ++++++++++++++++++++ .../io/azure/integration_test/docker-compose.yml | 50 +++++++++++++ .../apache_beam/io/azure/integration_test/ssl.conf | 37 ++++++++++ .../python/apache_beam/options/pipeline_options.py | 29 ++++++++ sdks/python/setup.py | 1 + sdks/python/test-suites/direct/common.gradle | 9 +++ sdks/python/tox.ini | 37 ++++++++++ 15 files changed, 434 insertions(+), 26 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 414241d1c5b..2446bee91d1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -61,6 +61,7 @@ * Support for Bigtable sink (Write and WriteBatch) added (Go) ([#23324](https://github.com/apache/beam/issues/23324)). * S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)). * Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)). +* Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([#24210](https://github.com/apache/beam/issues/24210)). 
## New Features / Improvements diff --git a/build.gradle.kts b/build.gradle.kts index 2556db7bc9a..fe0956a6ea9 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -340,6 +340,7 @@ tasks.register("python37PostCommit") { dependsOn(":sdks:python:test-suites:direct:py37:postCommitIT") dependsOn(":sdks:python:test-suites:direct:py37:directRunnerIT") dependsOn(":sdks:python:test-suites:direct:py37:hdfsIntegrationTest") + dependsOn(":sdks:python:test-suites:direct:py37:azureIntegrationTest") dependsOn(":sdks:python:test-suites:direct:py37:mongodbioIT") dependsOn(":sdks:python:test-suites:portable:py37:postCommitPy37") dependsOn(":sdks:python:test-suites:dataflow:py37:spannerioIT") diff --git a/sdks/python/apache_beam/internal/azure/__init__.py b/sdks/python/apache_beam/internal/azure/__init__.py new file mode 100644 index 00000000000..0bce5d68f72 --- /dev/null +++ b/sdks/python/apache_beam/internal/azure/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""For internal use only; no backwards-compatibility guarantees.""" diff --git a/sdks/python/apache_beam/internal/azure/auth.py b/sdks/python/apache_beam/internal/azure/auth.py new file mode 100644 index 00000000000..d505cc200f9 --- /dev/null +++ b/sdks/python/apache_beam/internal/azure/auth.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Azure credentials and authentication.""" + +# pytype: skip-file + +import logging +import threading + +from apache_beam.options.pipeline_options import AzureOptions + +try: + from azure.identity import DefaultAzureCredential + _AZURE_AUTH_AVAILABLE = True +except ImportError: + _AZURE_AUTH_AVAILABLE = False + +_LOGGER = logging.getLogger(__name__) + + +def get_service_credentials(pipeline_options): + """For internal use only; no backwards-compatibility guarantees. + + Get credentials to access Azure services. + Args: + pipeline_options: Pipeline options, used in creating credentials + like managed identity credentials. + + Returns: + A ``azure.identity.*Credential`` object or None if credentials + not found. Returned object is thread-safe. 
+ """ + return _Credentials.get_service_credentials(pipeline_options) + + +class _Credentials(object): + _credentials_lock = threading.Lock() + _credentials_init = False + _credentials = None + + @classmethod + def get_service_credentials(cls, pipeline_options): + with cls._credentials_lock: + if cls._credentials_init: + return cls._credentials + cls._credentials = cls._get_service_credentials(pipeline_options) + cls._credentials_init = True + + return cls._credentials + + @staticmethod + def _get_service_credentials(pipeline_options): + if not _AZURE_AUTH_AVAILABLE: + _LOGGER.warning( + 'Unable to find default credentials because the azure.identity ' + 'library is not available. Install the azure.identity library to use ' + 'Azure default credentials.') + return None + + try: + credentials = DefaultAzureCredential( + managed_identity_client_id=pipeline_options.view_as(AzureOptions)\ + .azure_managed_identity_client_id) + _LOGGER.debug('Connecting using Azure Default Credentials.') + return credentials + except Exception as e: + _LOGGER.warning('Unable to find Azure credentials to use: %s\n', e) + return None diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py index 7b86d592b28..cc678f9ae0f 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py @@ -37,6 +37,10 @@ class BlobStorageFileSystem(FileSystem): CHUNK_SIZE = blobstorageio.MAX_BATCH_OPERATION_SIZE AZURE_FILE_SYSTEM_PREFIX = 'azfs://' + def __init__(self, pipeline_options): + super().__init__(pipeline_options) + self._pipeline_options = pipeline_options + @classmethod def scheme(cls): """URI scheme for the FileSystem @@ -118,12 +122,15 @@ class BlobStorageFileSystem(FileSystem): ``BeamIOError``: if listing fails, but not if no files were found. 
""" try: - for path, (size, updated) in blobstorageio.BlobStorageIO() \ + for path, (size, updated) in self._blobstorageIO() \ .list_prefix(dir_or_prefix, with_metadata=True).items(): yield FileMetadata(path, size, updated) except Exception as e: # pylint: disable=broad-except raise BeamIOError("List operation failed", {dir_or_prefix: e}) + def _blobstorageIO(self): + return blobstorageio.BlobStorageIO(pipeline_options=self._pipeline_options) + def _path_open( self, path, @@ -134,8 +141,7 @@ class BlobStorageFileSystem(FileSystem): """ compression_type = FileSystem._get_compression_type(path, compression_type) mime_type = CompressionTypes.mime_type(compression_type, mime_type) - raw_file = blobstorageio.BlobStorageIO().open( - path, mode, mime_type=mime_type) + raw_file = self._blobstorageIO().open(path, mode, mime_type=mime_type) if compression_type == CompressionTypes.UNCOMPRESSED: return raw_file return CompressedFile(raw_file, compression_type=compression_type) @@ -190,7 +196,7 @@ class BlobStorageFileSystem(FileSystem): message = 'Unable to copy unequal number of sources and destinations.' raise BeamIOError(message) src_dest_pairs = list(zip(source_file_names, destination_file_names)) - return blobstorageio.BlobStorageIO().copy_paths(src_dest_pairs) + return self._blobstorageIO().copy_paths(src_dest_pairs) def rename(self, source_file_names, destination_file_names): """Rename the files at the source list to the destination list. @@ -207,7 +213,7 @@ class BlobStorageFileSystem(FileSystem): message = 'Unable to rename unequal number of sources and destinations.' raise BeamIOError(message) src_dest_pairs = list(zip(source_file_names, destination_file_names)) - results = blobstorageio.BlobStorageIO().rename_files(src_dest_pairs) + results = self._blobstorageIO().rename_files(src_dest_pairs) # Retrieve exceptions. 
exceptions = {(src, dest): error for (src, dest, error) in results if error is not None} @@ -223,7 +229,7 @@ class BlobStorageFileSystem(FileSystem): Returns: boolean flag indicating if path exists """ try: - return blobstorageio.BlobStorageIO().exists(path) + return self._blobstorageIO().exists(path) except Exception as e: # pylint: disable=broad-except raise BeamIOError("Exists operation failed", {path: e}) @@ -239,7 +245,7 @@ class BlobStorageFileSystem(FileSystem): ``BeamIOError``: if path doesn't exist. """ try: - return blobstorageio.BlobStorageIO().size(path) + return self._blobstorageIO().size(path) except Exception as e: # pylint: disable=broad-except raise BeamIOError("Size operation failed", {path: e}) @@ -255,7 +261,7 @@ class BlobStorageFileSystem(FileSystem): ``BeamIOError``: if path doesn't exist. """ try: - return blobstorageio.BlobStorageIO().last_updated(path) + return self._blobstorageIO().last_updated(path) except Exception as e: # pylint: disable=broad-except raise BeamIOError("Last updated operation failed", {path: e}) @@ -272,7 +278,7 @@ class BlobStorageFileSystem(FileSystem): ``BeamIOError``: if path isn't a file or doesn't exist. """ try: - return blobstorageio.BlobStorageIO().checksum(path) + return self._blobstorageIO().checksum(path) except Exception as e: # pylint: disable=broad-except raise BeamIOError("Checksum operation failed", {path, e}) @@ -289,7 +295,7 @@ class BlobStorageFileSystem(FileSystem): ``BeamIOError``: if path isn't a file or doesn't exist. 
""" try: - file_metadata = blobstorageio.BlobStorageIO()._status(path) + file_metadata = self._blobstorageIO()._status(path) return FileMetadata( path, file_metadata['size'], file_metadata['last_updated']) except Exception as e: # pylint: disable=broad-except @@ -305,7 +311,7 @@ class BlobStorageFileSystem(FileSystem): Raises: ``BeamIOError``: if any of the delete operations fail """ - results = blobstorageio.BlobStorageIO().delete_paths(paths) + results = self._blobstorageIO().delete_paths(paths) # Retrieve exceptions. exceptions = { path: error diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py index 241cc72a15d..6c844329867 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py @@ -88,7 +88,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock blobstorageio_mock.exists.return_value = True blobstorageio_mock._status.return_value = { 'size': 1, 'last_updated': 99999.0 @@ -107,7 +107,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. 
blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock blobstorageio_mock.list_prefix.return_value = { 'azfs://storageaccount/container/file1': (1, 99999.0), 'azfs://storageaccount/container/file2': (2, 88888.0) @@ -128,7 +128,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): blobstorageio_mock = mock.MagicMock() limit = 1 blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock blobstorageio_mock.list_prefix.return_value = { 'azfs://storageaccount/container/file1': (1, 99999.0) } @@ -146,7 +146,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock exception = IOError('Failed') blobstorageio_mock.list_prefix.side_effect = exception @@ -165,7 +165,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock blobstorageio_mock.list_prefix.side_effect = [ { 'azfs://storageaccount/container/file1': (1, 99999.0) @@ -189,7 +189,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock # Issue file copy. _ = self.fs.create( 'azfs://storageaccount/container/file1', 'application/octet-stream') @@ -204,7 +204,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. 
blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock # Issue file copy. _ = self.fs.open( 'azfs://storageaccount/container/file1', 'application/octet-stream') @@ -219,7 +219,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock sources = [ 'azfs://storageaccount/container/from1', 'azfs://storageaccount/container/from2', @@ -240,7 +240,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock sources = [ 'azfs://storageaccount/container/from1', 'azfs://storageaccount/container/from2', @@ -260,7 +260,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock blobstorageio_mock.size.return_value = 0 files = [ 'azfs://storageaccount/container/from1', @@ -276,7 +276,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock nonexistent_directory = 'azfs://storageaccount/nonexistent-container/tree/' exception = blobstorageio.BlobStorageError('Not found', 404) @@ -307,7 +307,7 @@ class BlobStorageFileSystemTest(unittest.TestCase): # Prepare mocks. 
blobstorageio_mock = mock.MagicMock() blobstoragefilesystem.blobstorageio.BlobStorageIO = \ - lambda: blobstorageio_mock + lambda pipeline_options: blobstorageio_mock sources = [ 'azfs://storageaccount/container/original_blob1', diff --git a/sdks/python/apache_beam/io/azure/blobstorageio.py b/sdks/python/apache_beam/io/azure/blobstorageio.py index ae0a4944d52..c614ad64ab3 100644 --- a/sdks/python/apache_beam/io/azure/blobstorageio.py +++ b/sdks/python/apache_beam/io/azure/blobstorageio.py @@ -28,10 +28,12 @@ import re import tempfile import time +from apache_beam.internal.azure import auth from apache_beam.io.filesystemio import Downloader from apache_beam.io.filesystemio import DownloaderStream from apache_beam.io.filesystemio import Uploader from apache_beam.io.filesystemio import UploaderStream +from apache_beam.options.pipeline_options import AzureOptions from apache_beam.utils import retry _LOGGER = logging.getLogger(__name__) @@ -105,10 +107,19 @@ class BlobStorageError(Exception): class BlobStorageIO(object): """Azure Blob Storage I/O client.""" - def __init__(self, client=None): - connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING') + def __init__(self, client=None, pipeline_options=None): if client is None: - self.client = BlobServiceClient.from_connection_string(connect_str) + azure_options = pipeline_options.view_as(AzureOptions) + connect_str = azure_options.azure_connection_string or \ + os.getenv('AZURE_STORAGE_CONNECTION_STRING') + if connect_str: + self.client = BlobServiceClient.from_connection_string( + conn_str=connect_str) + else: + credential = auth.get_service_credentials(pipeline_options) + self.client = BlobServiceClient( + account_url=azure_options.blob_service_endpoint, + credential=credential) else: self.client = client if not AZURE_DEPS_INSTALLED: diff --git a/sdks/python/apache_beam/io/azure/integration_test/Dockerfile b/sdks/python/apache_beam/io/azure/integration_test/Dockerfile new file mode 100644 index 
00000000000..7ae67fb8500 --- /dev/null +++ b/sdks/python/apache_beam/io/azure/integration_test/Dockerfile @@ -0,0 +1,46 @@ +############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### + +# This image contains a Python SDK build and dependencies. +# By default it runs wordcount against a locally accessible Azurite service. +# See azure_integration_test.sh for example usage. +ARG BASE_IMAGE +FROM $BASE_IMAGE + +# Install Azure CLI. +RUN curl -sL https://aka.ms/InstallAzureCLIDeb | bash + +WORKDIR /app + +# Add Beam SDK sources. +COPY sdks/python /app/sdks/python +COPY model /app/model + +# This step should look like setupVirtualenv minus virtualenv creation. +RUN pip install --no-cache-dir tox==3.11.1 -r sdks/python/build-requirements.txt + +# Add Azurite's self-signed cert to the global CA cert store. +COPY cert.pem /usr/local/share/ca-certificates/azurite.crt +RUN update-ca-certificates + +# Tell python to use the global CA cert store. +# See https://stackoverflow.com/a/66494735 +ENV REQUESTS_CA_BUNDLE /etc/ssl/certs/ca-certificates.crt + +# Run wordcount, and write results to Azurite. 
+CMD cd sdks/python && tox -e azure_integration_test diff --git a/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh b/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh new file mode 100755 index 00000000000..cfac5421093 --- /dev/null +++ b/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh @@ -0,0 +1,79 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Runs Python Azure integration tests. +# +# Requires docker, docker-compose to be installed. + +# Usage check. +if [[ $# != 1 ]]; then + printf "Usage: \n$> ./apache_beam/io/azure/integration_test/azure_integration_test.sh <python_version>" + printf "\n\tpython_version: [required] Python version used for container build and run tests." + printf " Use 'python:3.8' for Python3.8." + exit 1 +fi + +set -e -u -x + +PY_PREFIX=$(echo "$1" | tr ':.' '_') + +# Setup context directory. +TEST_DIR=$(dirname $0) +ROOT_DIR=$(cd "${TEST_DIR}/../../../../../.." 
&& pwd) +CONTEXT_DIR=${ROOT_DIR}/build/azure_integration_${PY_PREFIX} +rm -rf ${CONTEXT_DIR} || true + +mkdir -p ${CONTEXT_DIR}/sdks +cp -f ${TEST_DIR}/* ${CONTEXT_DIR}/ +cp -r ${ROOT_DIR}/sdks/python ${CONTEXT_DIR}/sdks/ +cp -r ${ROOT_DIR}/model ${CONTEXT_DIR}/ + +# Use a unique name to allow concurrent runs on the same machine. +PROJECT_NAME=$(echo azure_IT-$PY_PREFIX-${BUILD_TAG:-non-jenkins}) + +if [ -z "${BUILD_TAG:-}" ]; then + COLOR_OPT="" +else + COLOR_OPT="--no-ansi" +fi +COMPOSE_OPT="-p ${PROJECT_NAME} ${COLOR_OPT}" + +cd ${CONTEXT_DIR} + +# Generate a self-signed certificate for Azurite. +openssl req -x509 -nodes -days 3650 -newkey rsa:2048 -keyout key.pem \ + -out cert.pem -config ssl.conf -extensions 'v3_req' + +# Clean up leftover unused networks from previous runs. BEAM-4051 +# This might mess with leftover containers that still reference pruned networks, +# so --force-recreate is passed to 'docker up' below. +# https://github.com/docker/compose/issues/5745#issuecomment-370031631 +docker network prune --force + +# BEAM-7405: Create and point to an empty config file to work around "gcloud" +# appearing in ~/.docker/config.json but not being installed. +export DOCKER_CONFIG=. +echo '{}' > config.json + +function finally { + time docker-compose ${COMPOSE_OPT} down +} +trap finally EXIT + +time docker-compose ${COMPOSE_OPT} build --build-arg BASE_IMAGE=$1 +time docker-compose ${COMPOSE_OPT} up --exit-code-from test \ + --abort-on-container-exit --force-recreate diff --git a/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml b/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml new file mode 100644 index 00000000000..963b4a94c56 --- /dev/null +++ b/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '3' + +services: + azurite: + image: mcr.microsoft.com/azure-storage/azurite:3.20.1 + command: + - "azurite-blob" + - "--blobHost" + - "0.0.0.0" + - "--blobPort" + - "10000" + - "--oauth" + - "basic" + - "--cert" + - "/opt/azurite/certs/cert.pem" + - "--key" + - "/opt/azurite/certs/key.pem" + hostname: azurite + networks: + - azure_test_net + volumes: + - ./:/opt/azurite/certs + + # Integration test. + test: + build: . + networks: + - azure_test_net + depends_on: + - "azurite" + +networks: + azure_test_net: \ No newline at end of file diff --git a/sdks/python/apache_beam/io/azure/integration_test/ssl.conf b/sdks/python/apache_beam/io/azure/integration_test/ssl.conf new file mode 100644 index 00000000000..75128200ae8 --- /dev/null +++ b/sdks/python/apache_beam/io/azure/integration_test/ssl.conf @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[ req ] +default_bits = 2048 +distinguished_name = req_distinguished_name +x509_extensions = v3_req +prompt = no + +[ req_distinguished_name ] +countryName = CO +stateOrProvinceName = ST +localityName = LO +organizationName = OU +commonName = CN + +[ v3_req ] +keyUsage = keyEncipherment, dataEncipherment +extendedKeyUsage = serverAuth +subjectAltName = @alt_names + +[ alt_names ] +DNS.1 = 127.0.0.1 +DNS.2 = azurite diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index b51a3c93bb8..02d8b02c18e 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -42,6 +42,7 @@ __all__ = [ 'TypeOptions', 'DirectOptions', 'GoogleCloudOptions', + 'AzureOptions', 'HadoopFileSystemOptions', 'WorkerOptions', 'DebugOptions', @@ -861,6 +862,34 @@ class GoogleCloudOptions(PipelineOptions): return errors +class AzureOptions(PipelineOptions): + """Azure Blob Storage options.""" + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--azure_connection_string', + default=None, + help='Connection string of the Azure Blob Storage Account.') + parser.add_argument( + '--blob_service_endpoint', + default=None, + help='URL of the Azure Blob Storage Account.') + parser.add_argument( + '--azure_managed_identity_client_id', + default=None, + help='Client ID of a user-assigned managed identity.') + + def validate(self, validator): + errors = [] + if self.azure_connection_string: + if self.blob_service_endpoint: + errors.append( + 
'--azure_connection_string and ' + '--blob_service_endpoint are mutually exclusive.') + + return errors + + class HadoopFileSystemOptions(PipelineOptions): """``HadoopFileSystem`` connection options.""" @classmethod diff --git a/sdks/python/setup.py b/sdks/python/setup.py index ba0c2f38eec..2c30414d931 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -343,6 +343,7 @@ if __name__ == '__main__': 'azure': [ 'azure-storage-blob >=12.3.2', 'azure-core >=1.7.0', + 'azure-identity >=1.12.0', ], #(TODO): Some tests using Pandas implicitly calls inspect.stack() # with python 3.10 leading to incorrect stacktrace. diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle index cd23645db15..c8db6388305 100644 --- a/sdks/python/test-suites/direct/common.gradle +++ b/sdks/python/test-suites/direct/common.gradle @@ -188,6 +188,15 @@ tasks.register("hdfsIntegrationTest") { } } +tasks.register("azureIntegrationTest") { + doLast { + exec { + executable 'sh' + args '-c', "${rootDir}/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh python:${pythonContainerVersion}" + } + } +} + // Pytorch RunInference IT tests task torchInferenceTest { dependsOn 'installGcpTest' diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 33ec39c4189..76d26c271e2 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -183,6 +183,43 @@ commands = commands_pre = pip check +[testenv:azure_integration_test] +# Used by azure/integration_test/azure_integration_test.sh. +# Do not run this directly, as it depends on nodes defined in +# azure/integration_test/docker-compose.yml. 
+deps = + -r build-requirements.txt +extras = + azure +whitelist_externals = + echo + sleep +passenv = REQUESTS_CA_BUNDLE +setenv = + CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=https://azurite:10000/devstoreaccount1; +commands_pre = + pip check + wget storage.googleapis.com/dataflow-samples/shakespeare/kinglear.txt + # Create container for storing files. + az storage container create -n container --connection-string {env:CONNECTION_STRING} + # Upload test file. + az storage blob upload -f kinglear.txt -c container -n kinglear.txt --connection-string {env:CONNECTION_STRING} +commands = + # Test --azure_connection_string + python -m apache_beam.examples.wordcount \ + --input azfs://devstoreaccount1/container/* \ + --output azfs://devstoreaccount1/container/py-wordcount-integration \ + --azure_connection_string {env:CONNECTION_STRING} + # This doesn't work because there's no way to send a fake bearer token to + # Azurite when using DefaultAzureCredential. + # See https://github.com/Azure/Azurite/issues/389#issuecomment-615298432 + # and https://github.com/Azure/Azurite/issues/1750#issue-1449778593 + #python -m apache_beam.examples.wordcount \ + # --input azfs://devstoreaccount1/container/* \ + # --output azfs://devstoreaccount1/container/py-wordcount-integration \ + # --blob_service_endpoint https://azurite:10000/devstoreaccount1/container-name \ + # --azure_managed_identity_client_id "abc123" + [testenv:py3-yapf] # keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml deps =