Repository: beam Updated Branches: refs/heads/master 78a4b8e4f -> e2be6961d
http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/tests/data/standard_coders.yaml ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/data/standard_coders.yaml b/sdks/python/apache_beam/tests/data/standard_coders.yaml deleted file mode 100644 index 790cacb..0000000 --- a/sdks/python/apache_beam/tests/data/standard_coders.yaml +++ /dev/null @@ -1,196 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# This file is broken into multiple sections delimited by ---. Each section specifies a set of -# reference encodings for a single standardized coder used in a specific context. -# -# Each section contains up to 3 properties: -# -# coder: a common coder spec. Currently, a URN and URNs for component coders as necessary. -# nested: a boolean meaning whether the coder was used in the nested context. Missing means to -# test both contexts, a shorthand for when the coder is invariant across context. -# examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context. -# The LHS (key) is a byte array encoded as a JSON-escaped string. 
The RHS (value) is -# one of a few standard JSON types such as numbers, strings, dicts that map naturally -# to the type encoded by the coder. -# -# These choices were made to strike a balance between portability, ease of use, and simple -# legibility of this file itself. -# -# It is expected that future work will move the `coder` field into a format that it would be -# represented by the Runner API, so that it can be understood by all SDKs and harnesses. -# -# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated. - - - -coder: - urn: "urn:beam:coders:bytes:0.1" -nested: false -examples: - "abc": abc - "ab\0c": "ab\0c" - ---- - -coder: - urn: "urn:beam:coders:bytes:0.1" -nested: true -examples: - "\u0003abc": abc - "\u0004ab\0c": "ab\0c" - "\u00c8\u0001 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|": - " 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|" - ---- - -coder: - urn: "urn:beam:coders:varint:0.1" -examples: - "\0": 0 - "\u0001": 1 - "\u000A": 10 - "\u00c8\u0001": 200 - "\u00e8\u0007": 1000 - "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1 - ---- - -coder: - urn: "urn:beam:coders:kv:0.1" - components: [{urn: "urn:beam:coders:bytes:0.1"}, - {urn: "urn:beam:coders:varint:0.1"}] -examples: - "\u0003abc\0": {key: abc, value: 0} - "\u0004ab\0c\u000A": {key: "ab\0c", value: 10} - ---- - -coder: - urn: "urn:beam:coders:kv:0.1" - components: [{urn: "urn:beam:coders:bytes:0.1"}, - {urn: "urn:beam:coders:bytes:0.1"}] -nested: false -examples: - "\u0003abcdef": {key: abc, value: def} - "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"} - ---- - -coder: - urn: "urn:beam:coders:kv:0.1" - components: [{urn: "urn:beam:coders:bytes:0.1"}, - {urn: "urn:beam:coders:bytes:0.1"}] -nested: true -examples: - "\u0003abc\u0003def": {key: abc, value: def} - "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"} - ---- - -coder: 
- urn: "urn:beam:coders:interval_window:0.1" -examples: - "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000} - "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000} - "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365} - "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0} - ---- - -coder: - urn: "urn:beam:coders:stream:0.1" - components: [{urn: "urn:beam:coders:varint:0.1"}] -examples: - "\0\0\0\u0001\0": [0] - "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000] - "\0\0\0\0": [] - ---- - -coder: - urn: "urn:beam:coders:stream:0.1" - components: [{urn: "urn:beam:coders:bytes:0.1"}] -examples: - "\0\0\0\u0001\u0003abc": ["abc"] - "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"] - "\0\0\0\0": [] - ---- - -coder: - urn: "urn:beam:coders:stream:0.1" - components: [{urn: "urn:beam:coders:bytes:0.1"}] - # This is for iterables of unknown length, where the encoding is not - # deterministic. - non_deterministic: True -examples: - "\u00ff\u00ff\u00ff\u00ff\u0000": [] - "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"] - "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"] - ---- - -coder: - urn: "urn:beam:coders:stream:0.1" - components: [{urn: "urn:beam:coders:global_window:0.1"}] -examples: - "\0\0\0\u0001": [""] - ---- - -coder: - urn: "urn:beam:coders:global_window:0.1" -examples: - "": "" - ---- - -# All windowed values consist of pane infos that represent NO_FIRING until full support is added -# in the Python SDK (BEAM-1522). 
-coder: - urn: "urn:beam:coders:windowed_value:0.1" - components: [{urn: "urn:beam:coders:varint:0.1"}, - {urn: "urn:beam:coders:global_window:0.1"}] -examples: - "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": { - value: 2, - timestamp: 1454293425000, - pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0}, - windows: ["global"] - } - ---- - -coder: - urn: "urn:beam:coders:windowed_value:0.1" - components: [{urn: "urn:beam:coders:varint:0.1"}, - {urn: "urn:beam:coders:interval_window:0.1"}] -examples: - "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": { - value: 4, - timestamp: -400000, - pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0}, - windows: [{end: 1454293425000, span: 280000}] - } - - "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": { - value: 2, - timestamp: -100, - pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0}, - windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}] - } http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/tests/pipeline_verifiers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py deleted file mode 100644 index df05054..0000000 --- a/sdks/python/apache_beam/tests/pipeline_verifiers.py +++ /dev/null @@ -1,146 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""End-to-end test result verifiers - -A set of verifiers that are used in end-to-end tests to verify state/output -of test pipeline job. Customized verifier should extend -`hamcrest.core.base_matcher.BaseMatcher` and override _matches. -""" - -import logging -import time - -from hamcrest.core.base_matcher import BaseMatcher - -from apache_beam.io.filesystems import FileSystems -from apache_beam.runners.runner import PipelineState -from apache_beam.tests import test_utils as utils -from apache_beam.utils import retry - -try: - from apitools.base.py.exceptions import HttpError -except ImportError: - HttpError = None - -MAX_RETRIES = 4 - - -class PipelineStateMatcher(BaseMatcher): - """Matcher that verify pipeline job terminated in expected state - - Matcher compares the actual pipeline terminate state with expected. - By default, `PipelineState.DONE` is used as expected state. 
- """ - - def __init__(self, expected_state=PipelineState.DONE): - self.expected_state = expected_state - - def _matches(self, pipeline_result): - return pipeline_result.state == self.expected_state - - def describe_to(self, description): - description \ - .append_text("Test pipeline expected terminated in state: ") \ - .append_text(self.expected_state) - - def describe_mismatch(self, pipeline_result, mismatch_description): - mismatch_description \ - .append_text("Test pipeline job terminated in state: ") \ - .append_text(pipeline_result.state) - - -def retry_on_io_error_and_server_error(exception): - """Filter allowing retries on file I/O errors and service error.""" - return isinstance(exception, IOError) or \ - (HttpError is not None and isinstance(exception, HttpError)) - - -class FileChecksumMatcher(BaseMatcher): - """Matcher that verifies file(s) content by comparing file checksum. - - Use apache_beam.io.fileio to fetch file(s) from given path. File checksum - is a hash string computed from content of file(s). - """ - - def __init__(self, file_path, expected_checksum, sleep_secs=None): - """Initialize a FileChecksumMatcher object - - Args: - file_path : A string that is the full path of output file. This path - can contain globs. - expected_checksum : A hash string that is computed from expected - result. - sleep_secs : Number of seconds to wait before verification start. - Extra time are given to make sure output files are ready on FS. - """ - if sleep_secs is not None: - if isinstance(sleep_secs, int): - self.sleep_secs = sleep_secs - else: - raise ValueError('Sleep seconds, if received, must be int. 
' - 'But received: %r, %s' % (sleep_secs, - type(sleep_secs))) - else: - self.sleep_secs = None - - self.file_path = file_path - self.expected_checksum = expected_checksum - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry_on_io_error_and_server_error) - def _read_with_retry(self): - """Read path with retry if I/O failed""" - read_lines = [] - match_result = FileSystems.match([self.file_path])[0] - matched_path = [f.path for f in match_result.metadata_list] - if not matched_path: - raise IOError('No such file or directory: %s' % self.file_path) - - logging.info('Find %d files in %s: \n%s', - len(matched_path), self.file_path, '\n'.join(matched_path)) - for path in matched_path: - with FileSystems.open(path, 'r') as f: - for line in f: - read_lines.append(line) - return read_lines - - def _matches(self, _): - if self.sleep_secs: - # Wait to have output file ready on FS - logging.info('Wait %d seconds...', self.sleep_secs) - time.sleep(self.sleep_secs) - - # Read from given file(s) path - read_lines = self._read_with_retry() - - # Compute checksum - self.checksum = utils.compute_hash(read_lines) - logging.info('Read from given path %s, %d lines, checksum: %s.', - self.file_path, len(read_lines), self.checksum) - return self.checksum == self.expected_checksum - - def describe_to(self, description): - description \ - .append_text("Expected checksum is ") \ - .append_text(self.expected_checksum) - - def describe_mismatch(self, pipeline_result, mismatch_description): - mismatch_description \ - .append_text("Actual checksum is ") \ - .append_text(self.checksum) http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/tests/pipeline_verifiers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py deleted file mode 100644 index 909917d..0000000 --- 
a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py +++ /dev/null @@ -1,148 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the test pipeline verifiers""" - -import logging -import tempfile -import unittest - -from hamcrest import assert_that as hc_assert_that -from mock import Mock, patch - -from apache_beam.io.localfilesystem import LocalFileSystem -from apache_beam.runners.runner import PipelineState -from apache_beam.runners.runner import PipelineResult -from apache_beam.tests import pipeline_verifiers as verifiers -from apache_beam.tests.test_utils import patch_retry - -try: - # pylint: disable=wrong-import-order, wrong-import-position - # pylint: disable=ungrouped-imports - from apitools.base.py.exceptions import HttpError - from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem -except ImportError: - HttpError = None - GCSFileSystem = None - - -class PipelineVerifiersTest(unittest.TestCase): - - def setUp(self): - self._mock_result = Mock() - patch_retry(self, verifiers) - - def test_pipeline_state_matcher_success(self): - """Test PipelineStateMatcher successes when using default expected state - and job actually finished in DONE - """ - pipeline_result = 
PipelineResult(PipelineState.DONE) - hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher()) - - def test_pipeline_state_matcher_given_state(self): - """Test PipelineStateMatcher successes when matches given state""" - pipeline_result = PipelineResult(PipelineState.FAILED) - hc_assert_that(pipeline_result, - verifiers.PipelineStateMatcher(PipelineState.FAILED)) - - def test_pipeline_state_matcher_fails(self): - """Test PipelineStateMatcher fails when using default expected state - and job actually finished in CANCELLED/DRAINED/FAILED/STOPPED/UNKNOWN - """ - failed_state = [PipelineState.CANCELLED, - PipelineState.DRAINED, - PipelineState.FAILED, - PipelineState.STOPPED, - PipelineState.UNKNOWN] - - for state in failed_state: - pipeline_result = PipelineResult(state) - with self.assertRaises(AssertionError): - hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher()) - - test_cases = [ - {'content': 'Test FileChecksumMatcher with single file', - 'num_files': 1, - 'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'}, - {'content': 'Test FileChecksumMatcher with multiple files', - 'num_files': 3, - 'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'}, - {'content': '', - 'num_files': 1, - 'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'}, - ] - - def create_temp_file(self, content, directory=None): - with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f: - f.write(content) - return f.name - - def test_file_checksum_matcher_success(self): - for case in self.test_cases: - temp_dir = tempfile.mkdtemp() - for _ in range(case['num_files']): - self.create_temp_file(case['content'], temp_dir) - matcher = verifiers.FileChecksumMatcher(temp_dir + '/*', - case['expected_checksum']) - hc_assert_that(self._mock_result, matcher) - - @patch.object(LocalFileSystem, 'match') - def test_file_checksum_matcher_read_failed(self, mock_match): - mock_match.side_effect = IOError('No file found.') - matcher = 
verifiers.FileChecksumMatcher('dummy/path', Mock()) - with self.assertRaises(IOError): - hc_assert_that(self._mock_result, matcher) - self.assertTrue(mock_match.called) - self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count) - - @patch.object(GCSFileSystem, 'match') - @unittest.skipIf(HttpError is None, 'google-apitools is not installed') - def test_file_checksum_matcher_service_error(self, mock_match): - mock_match.side_effect = HttpError( - response={'status': '404'}, url='', content='Not Found', - ) - matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock()) - with self.assertRaises(HttpError): - hc_assert_that(self._mock_result, matcher) - self.assertTrue(mock_match.called) - self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count) - - def test_file_checksum_matchcer_invalid_sleep_time(self): - with self.assertRaises(ValueError) as cm: - verifiers.FileChecksumMatcher('file_path', - 'expected_checksum', - 'invalid_sleep_time') - self.assertEqual(cm.exception.message, - 'Sleep seconds, if received, must be int. 
' - 'But received: \'invalid_sleep_time\', ' - '<type \'str\'>') - - @patch('time.sleep', return_value=None) - def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep): - temp_dir = tempfile.mkdtemp() - case = self.test_cases[0] - self.create_temp_file(case['content'], temp_dir) - matcher = verifiers.FileChecksumMatcher(temp_dir + '/*', - case['expected_checksum'], - 10) - hc_assert_that(self._mock_result, matcher) - self.assertTrue(mocked_sleep.called) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/tests/test_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py deleted file mode 100644 index 666207e..0000000 --- a/sdks/python/apache_beam/tests/test_utils.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""Utility methods for testing""" - -import hashlib -import imp -from mock import Mock, patch - -from apache_beam.utils import retry - -DEFAULT_HASHING_ALG = 'sha1' - - -def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG): - """Compute a hash value from a list of string.""" - content.sort() - m = hashlib.new(hashing_alg) - for elem in content: - m.update(str(elem)) - return m.hexdigest() - - -def patch_retry(testcase, module): - """A function to patch retry module to use mock clock and logger. - - Clock and logger that defined in retry decorator will be replaced in test - in order to skip sleep phase when retry happens. - - Args: - testcase: An instance of unittest.TestCase that calls this function to - patch retry module. - module: The module that uses retry and need to be replaced with mock - clock and logger in test. - """ - real_retry_with_exponential_backoff = retry.with_exponential_backoff - - def patched_retry_with_exponential_backoff(num_retries, retry_filter): - """A patch for retry decorator to use a mock dummy clock and logger.""" - return real_retry_with_exponential_backoff( - num_retries=num_retries, retry_filter=retry_filter, logger=Mock(), - clock=Mock()) - - patch.object(retry, 'with_exponential_backoff', - side_effect=patched_retry_with_exponential_backoff).start() - - # Reload module after patching. - imp.reload(module) - - def remove_patches(): - patch.stopall() - # Reload module again after removing patch. 
- imp.reload(module) - - testcase.addCleanup(remove_patches) http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index af76889..1822c19 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -22,7 +22,7 @@ import unittest import hamcrest as hc import apache_beam as beam -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline import apache_beam.transforms.combiners as combine from apache_beam.transforms.core import CombineGlobally from apache_beam.transforms.core import Create http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/create_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py index 2352acd..9ede4c7 100644 --- a/sdks/python/apache_beam/transforms/create_test.py +++ b/sdks/python/apache_beam/transforms/create_test.py @@ -22,7 +22,7 @@ from apache_beam.io import source_test_utils from apache_beam import Create, assert_that, equal_to from apache_beam.coders import FastPrimitivesCoder -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline class CreateTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 5948460..137992d 100644 --- 
a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -32,7 +32,7 @@ from apache_beam.metrics.metric import MetricsFilter from apache_beam.io.iobase import Read from apache_beam.options.pipeline_options import TypeOptions import apache_beam.pvalue as pvalue -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms import window import apache_beam.transforms.combiners as combine from apache_beam.transforms.display import DisplayData, DisplayDataItem http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index bf9aeff..0bc9107 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -23,7 +23,7 @@ import unittest from nose.plugins.attrib import attr import apache_beam as beam -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms import window from apache_beam.transforms.util import assert_that, equal_to http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index c08a791..da7f43b 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -26,7 +26,7 @@ import yaml import apache_beam as beam from apache_beam.runners import pipeline_context -from apache_beam.test_pipeline import TestPipeline +from 
apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms import trigger from apache_beam.transforms.core import Windowing from apache_beam.transforms.trigger import AccumulationMode http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/util_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 9656827..7fdef70 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -20,7 +20,7 @@ import unittest from apache_beam import Create -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that, equal_to, is_empty http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 2d2b03d..a7797dd 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -20,7 +20,7 @@ import unittest from apache_beam.runners import pipeline_context -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms import CombinePerKey from apache_beam.transforms import combiners from apache_beam.transforms import core http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/write_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index 3d7fbd9..27e7caa 
100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -22,7 +22,7 @@ import unittest import apache_beam as beam from apache_beam.io import iobase -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.util import assert_that, is_empty http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 3812fb1..3494cfe 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -23,10 +23,10 @@ import unittest import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints -from apache_beam.test_pipeline import TestPipeline +from apache_beam.options.pipeline_options import OptionsContext +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that, equal_to from apache_beam.typehints import WithTypeHints -from apache_beam.options.pipeline_options import OptionsContext # These test often construct a pipeline as value | PTransform to test side # effects (e.g. errors). 
http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/utils/test_stream.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/test_stream.py b/sdks/python/apache_beam/utils/test_stream.py deleted file mode 100644 index 7ae27b7..0000000 --- a/sdks/python/apache_beam/utils/test_stream.py +++ /dev/null @@ -1,163 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
"""Provides TestStream for verifying streaming runner semantics."""

from abc import ABCMeta
from abc import abstractmethod

from apache_beam import coders
from apache_beam import pvalue
from apache_beam.transforms import PTransform
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.windowed_value import WindowedValue


class Event(object):
  """Test stream event to be emitted during execution of a TestStream.

  Events of different concrete types compare by their types; events of the
  same concrete type delegate to the subclass's _typed_cmp().

  Note: __cmp__ and the cmp() builtin only exist in Python 2.
  """

  __metaclass__ = ABCMeta

  def __cmp__(self, other):
    """Return a cmp()-style result for self versus other."""
    if type(self) is not type(other):
      return cmp(type(self), type(other))
    return self._typed_cmp(other)

  @abstractmethod
  def _typed_cmp(self, other):
    """Compare against another event of the same concrete type."""
    raise NotImplementedError


class ElementEvent(Event):
  """Element-producing test stream event."""

  def __init__(self, timestamped_values):
    """Store the values (each carrying a timestamp) this event emits."""
    self.timestamped_values = timestamped_values

  def _typed_cmp(self, other):
    return cmp(self.timestamped_values, other.timestamped_values)


class WatermarkEvent(Event):
  """Watermark-advancing test stream event."""

  def __init__(self, new_watermark):
    """Store the new watermark, converted via timestamp.Timestamp.of()."""
    self.new_watermark = timestamp.Timestamp.of(new_watermark)

  def _typed_cmp(self, other):
    return cmp(self.new_watermark, other.new_watermark)


class ProcessingTimeEvent(Event):
  """Processing time-advancing test stream event."""

  def __init__(self, advance_by):
    """Store the advance amount, converted via timestamp.Duration.of()."""
    self.advance_by = timestamp.Duration.of(advance_by)

  def _typed_cmp(self, other):
    return cmp(self.advance_by, other.advance_by)


class TestStream(PTransform):
  """Test stream that generates events on an unbounded PCollection of elements.

  Each event emits elements, advances the watermark or advances the processing
  time.  After all of the specified elements are emitted, ceases to produce
  output.

  Validation failures raise AssertionError explicitly (rather than through
  `assert` statements) so that the checks are still enforced when Python runs
  with optimizations enabled (-O).
  """

  # NOTE(review): the default below is the FastPrimitivesCoder class itself,
  # not an instance; confirm that consumers of _infer_output_coder() accept
  # a class before relying on the default.
  def __init__(self, coder=coders.FastPrimitivesCoder):
    """Create an empty TestStream.

    Args:
      coder: the coder reported as this transform's output coder; must not be
        None.

    Raises:
      AssertionError: if coder is None.
    """
    if coder is None:
      raise AssertionError('coder must not be None.')
    self.coder = coder
    # Watermark as of the most recently added WatermarkEvent.
    self.current_watermark = timestamp.MIN_TIMESTAMP
    # Events in the order they were added.
    self.events = []

  def expand(self, pbegin):
    """Return a new PCollection attached to pbegin's pipeline.

    Raises:
      AssertionError: if pbegin is not a pvalue.PBegin.
    """
    if not isinstance(pbegin, pvalue.PBegin):
      raise AssertionError(
          'TestStream must be applied to a PBegin, got: %r' % (pbegin,))
    self.pipeline = pbegin.pipeline
    return pvalue.PCollection(self.pipeline)

  def _infer_output_coder(self, input_type=None, input_coder=None):
    """Return the coder given at construction; both arguments are ignored."""
    return self.coder

  def _add(self, event):
    """Validate an event and append it to self.events.

    Raises:
      AssertionError: if an element timestamp is not before
        timestamp.MAX_TIMESTAMP, if a watermark does not strictly advance, or
        if processing time is not advanced by a positive amount.
      ValueError: if the event is of an unknown type.
    """
    if isinstance(event, ElementEvent):
      for tv in event.timestamped_values:
        if not tv.timestamp < timestamp.MAX_TIMESTAMP:
          raise AssertionError(
              'Element timestamp must be before timestamp.MAX_TIMESTAMP.')
    elif isinstance(event, WatermarkEvent):
      if not event.new_watermark > self.current_watermark:
        raise AssertionError(
            'Watermark must strictly-monotonically advance.')
      self.current_watermark = event.new_watermark
    elif isinstance(event, ProcessingTimeEvent):
      if not event.advance_by > 0:
        raise AssertionError(
            'Must advance processing time by positive amount.')
    else:
      raise ValueError('Unknown event: %s' % event)
    self.events.append(event)

  def add_elements(self, elements):
    """Add elements to the TestStream.

    Elements added to the TestStream will be produced during pipeline execution.
    These elements can be TimestampedValue, WindowedValue or raw unwrapped
    elements that are serializable using the TestStream's specified Coder.  When
    a TimestampedValue or a WindowedValue element is used, the timestamp of the
    TimestampedValue or WindowedValue will be the timestamp of the produced
    element; otherwise, the current watermark timestamp will be used for that
    element.  The windows of a given WindowedValue are ignored by the
    TestStream.

    Returns:
      This TestStream, so that calls can be chained.
    """
    timestamped_values = []
    for element in elements:
      if isinstance(element, TimestampedValue):
        timestamped_values.append(element)
      elif isinstance(element, WindowedValue):
        # Drop windows for elements in test stream.
        timestamped_values.append(
            TimestampedValue(element.value, element.timestamp))
      else:
        # Add elements with timestamp equal to current watermark.
        timestamped_values.append(
            TimestampedValue(element, self.current_watermark))
    self._add(ElementEvent(timestamped_values))
    return self

  def advance_watermark_to(self, new_watermark):
    """Advance the watermark to a given Unix timestamp.

    The Unix timestamp value used must be later than the previous watermark
    value and should be given as an int, float or utils.timestamp.Timestamp
    object.

    Returns:
      This TestStream, so that calls can be chained.
    """
    self._add(WatermarkEvent(new_watermark))
    return self

  def advance_watermark_to_infinity(self):
    """Advance the watermark to the end of time."""
    self.advance_watermark_to(timestamp.MAX_TIMESTAMP)
    return self

  def advance_processing_time(self, advance_by):
    """Advance the current processing time by a given duration in seconds.

    The duration must be a positive second duration and should be given as an
    int, float or utils.timestamp.Duration object.

    Returns:
      This TestStream, so that calls can be chained.
    """
    self._add(ProcessingTimeEvent(advance_by))
    return self
"""Unit tests for the test_stream module."""

import unittest

from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.test_stream import ElementEvent
from apache_beam.utils.test_stream import ProcessingTimeEvent
from apache_beam.utils.test_stream import TestStream
from apache_beam.utils.test_stream import WatermarkEvent
from apache_beam.utils.windowed_value import WindowedValue


class TestStreamTest(unittest.TestCase):
  """Checks the events a TestStream records and the errors it raises."""

  def test_basic_test_stream(self):
    """Chained builder calls record the expected events, in order."""
    test_stream = (TestStream()
                   .advance_watermark_to(0)
                   .add_elements([
                       'a',
                       WindowedValue('b', 3, []),
                       TimestampedValue('c', 6)])
                   .advance_processing_time(10)
                   .advance_watermark_to(8)
                   .add_elements(['d'])
                   .advance_watermark_to_infinity())
    self.assertEqual(
        test_stream.events,
        [
            WatermarkEvent(0),
            ElementEvent([
                TimestampedValue('a', 0),
                TimestampedValue('b', 3),
                TimestampedValue('c', 6),
            ]),
            ProcessingTimeEvent(10),
            WatermarkEvent(8),
            ElementEvent([
                TimestampedValue('d', 8),
            ]),
            WatermarkEvent(timestamp.MAX_TIMESTAMP),
        ]
    )

  def test_test_stream_errors(self):
    """Invalid events are rejected with the documented error messages."""
    # The `msg` keyword of assertRaises() is never compared against the raised
    # exception, so the expected text is checked explicitly on the captured
    # exception instead.
    with self.assertRaises(AssertionError) as raised:
      _ = (TestStream()
           .advance_watermark_to(5)
           .advance_watermark_to(4))
    self.assertIn('Watermark must strictly-monotonically advance.',
                  str(raised.exception))

    with self.assertRaises(AssertionError) as raised:
      _ = (TestStream()
           .advance_processing_time(-1))
    self.assertIn('Must advance processing time by positive amount.',
                  str(raised.exception))

    with self.assertRaises(AssertionError) as raised:
      _ = (TestStream()
           .add_elements([
               TimestampedValue('a', timestamp.MAX_TIMESTAMP)
           ]))
    self.assertIn('Element timestamp must be before timestamp.MAX_TIMESTAMP.',
                  str(raised.exception))


if __name__ == '__main__':
  unittest.main()