[BEAM-2236] Cherry pick #3017 - Move test utilities out of python core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ecebd22 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ecebd22 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ecebd22 Branch: refs/heads/release-2.0.0 Commit: 2ecebd22e3071d0ba6d3647273367d16582ca852 Parents: aa8c9d1 Author: Mark Liu <mark...@google.com> Authored: Tue May 9 16:41:46 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Tue May 9 20:08:55 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/coders/standard_coders_test.py | 2 +- .../examples/complete/autocomplete_test.py | 2 +- .../examples/complete/estimate_pi_test.py | 2 +- .../complete/game/hourly_team_score_test.py | 2 +- .../examples/complete/game/user_score_test.py | 2 +- .../apache_beam/examples/complete/tfidf_test.py | 2 +- .../complete/top_wikipedia_sessions_test.py | 2 +- .../cookbook/bigquery_side_input_test.py | 2 +- .../cookbook/bigquery_tornadoes_it_test.py | 4 +- .../cookbook/bigquery_tornadoes_test.py | 2 +- .../examples/cookbook/coders_test.py | 2 +- .../examples/cookbook/combiners_test.py | 2 +- .../examples/cookbook/custom_ptransform_test.py | 2 +- .../examples/cookbook/filters_test.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 2 +- .../examples/snippets/snippets_test.py | 2 +- .../apache_beam/examples/wordcount_it_test.py | 6 +- sdks/python/apache_beam/io/avroio_test.py | 2 +- .../python/apache_beam/io/concat_source_test.py | 2 +- .../apache_beam/io/filebasedsource_test.py | 6 +- sdks/python/apache_beam/io/fileio_test.py | 2 +- .../io/gcp/datastore/v1/helper_test.py | 2 +- .../io/gcp/tests/bigquery_matcher.py | 2 +- .../io/gcp/tests/bigquery_matcher_test.py | 2 +- sdks/python/apache_beam/io/sources_test.py | 2 +- sdks/python/apache_beam/io/textio_test.py | 2 +- sdks/python/apache_beam/io/tfrecordio_test.py | 2 +- sdks/python/apache_beam/pipeline_test.py | 2 +- 
sdks/python/apache_beam/pvalue_test.py | 2 +- .../runners/dataflow/dataflow_runner_test.py | 4 +- sdks/python/apache_beam/test_pipeline.py | 163 --------------- sdks/python/apache_beam/test_pipeline_test.py | 112 ----------- sdks/python/apache_beam/testing/__init__.py | 16 ++ .../apache_beam/testing/data/privatekey.p12 | Bin 0 -> 2452 bytes .../testing/data/standard_coders.yaml | 196 +++++++++++++++++++ .../apache_beam/testing/pipeline_verifiers.py | 146 ++++++++++++++ .../testing/pipeline_verifiers_test.py | 148 ++++++++++++++ .../python/apache_beam/testing/test_pipeline.py | 163 +++++++++++++++ .../apache_beam/testing/test_pipeline_test.py | 112 +++++++++++ sdks/python/apache_beam/testing/test_stream.py | 163 +++++++++++++++ .../apache_beam/testing/test_stream_test.py | 83 ++++++++ sdks/python/apache_beam/testing/test_utils.py | 69 +++++++ sdks/python/apache_beam/tests/__init__.py | 16 -- .../apache_beam/tests/data/privatekey.p12 | Bin 2452 -> 0 bytes .../apache_beam/tests/data/standard_coders.yaml | 196 ------------------- .../apache_beam/tests/pipeline_verifiers.py | 146 -------------- .../tests/pipeline_verifiers_test.py | 148 -------------- sdks/python/apache_beam/tests/test_utils.py | 69 ------- .../apache_beam/transforms/combiners_test.py | 2 +- .../apache_beam/transforms/create_test.py | 2 +- .../apache_beam/transforms/ptransform_test.py | 2 +- .../apache_beam/transforms/sideinputs_test.py | 2 +- .../apache_beam/transforms/trigger_test.py | 2 +- sdks/python/apache_beam/transforms/util_test.py | 2 +- .../apache_beam/transforms/window_test.py | 2 +- .../transforms/write_ptransform_test.py | 2 +- .../typehints/typed_pipeline_test.py | 4 +- sdks/python/apache_beam/utils/test_stream.py | 163 --------------- .../apache_beam/utils/test_stream_test.py | 83 -------- sdks/python/setup.py | 2 +- 60 files changed, 1143 insertions(+), 1143 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/coders/standard_coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index 4a48ed9..885e88f 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -34,7 +34,7 @@ from apache_beam.transforms.window import IntervalWindow from apache_beam.transforms import window STANDARD_CODERS_YAML = os.path.join( - os.path.dirname(__file__), '..', 'tests', 'data', 'standard_coders.yaml') + os.path.dirname(__file__), '..', 'testing', 'data', 'standard_coders.yaml') def _load_test_cases(test_yaml): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/autocomplete_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index d59d0f5..438633a 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -21,7 +21,7 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import autocomplete -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/estimate_pi_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py index dc5b901..12d8379 100644 --- 
a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py @@ -21,7 +21,7 @@ import logging import unittest from apache_beam.examples.complete import estimate_pi -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import BeamAssertException http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py index 0eaa8c6..bd0abca 100644 --- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py +++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py @@ -21,8 +21,8 @@ import logging import unittest import apache_beam as beam -from apache_beam.test_pipeline import TestPipeline from apache_beam.examples.complete.game import hourly_team_score +from apache_beam.testing.test_pipeline import TestPipeline class HourlyTeamScoreTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/game/user_score_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py index 750729d..2db53bd 100644 --- a/sdks/python/apache_beam/examples/complete/game/user_score_test.py +++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py @@ -21,8 +21,8 @@ import logging import unittest import apache_beam as beam -from apache_beam.test_pipeline import TestPipeline from apache_beam.examples.complete.game import 
user_score +from apache_beam.testing.test_pipeline import TestPipeline class UserScoreTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index 05e53a4..0e30254 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -25,7 +25,7 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import tfidf -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline EXPECTED_RESULTS = set([ http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py index 9b9d9b1..4850c04 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py @@ -23,7 +23,7 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import top_wikipedia_sessions -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline class ComputeTopSessionsTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py 
b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 5869976..1ca25c9 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -22,7 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import bigquery_side_input -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline class BigQuerySideInputTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py index 3e302d1..5d2ee7c 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py @@ -26,8 +26,8 @@ from nose.plugins.attrib import attr from apache_beam.examples.cookbook import bigquery_tornadoes from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher -from apache_beam.test_pipeline import TestPipeline -from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline class BigqueryTornadoesIT(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py index 0c66d7e..ca7ca9e 100644 --- 
a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py @@ -22,7 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import bigquery_tornadoes -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline class BigQueryTornadoesTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py index 4a92abb..35cf252 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders_test.py +++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py @@ -22,7 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import coders -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py index a8ed555..45c779f 100644 --- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py +++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py @@ -27,7 +27,7 @@ import logging import unittest import apache_beam as beam -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline class CombinersTest(unittest.TestCase): 
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py index cd1c04a..2d35d8d 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py @@ -22,7 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import custom_ptransform -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/filters_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py index 28bb1e1..44a352f 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters_test.py +++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py @@ -22,7 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import filters -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline class FiltersTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 26af71d..1bdb9a3 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ 
b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -31,8 +31,8 @@ string. The tags can contain only letters, digits and _. """ import apache_beam as beam -from apache_beam.test_pipeline import TestPipeline from apache_beam.metrics import Metrics +from apache_beam.testing.test_pipeline import TestPipeline # Quiet some pylint warnings that happen because of the somewhat special # format for the code snippets. http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index da0a962..85d8bde 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -36,7 +36,7 @@ from apache_beam.examples.snippets import snippets from apache_beam.utils.windowed_value import WindowedValue # pylint: disable=expression-not-assigned -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline # Protect against environments where apitools library is not available. 
# pylint: disable=wrong-import-order, wrong-import-position http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/wordcount_it_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py index 54e54e8..4bee127 100644 --- a/sdks/python/apache_beam/examples/wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/wordcount_it_test.py @@ -25,9 +25,9 @@ from hamcrest.core.core.allof import all_of from nose.plugins.attrib import attr from apache_beam.examples import wordcount -from apache_beam.test_pipeline import TestPipeline -from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher -from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher +from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline class WordCountIT(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/avroio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 5f2db62..4a21839 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -26,7 +26,7 @@ from apache_beam.io import iobase from apache_beam.io import avroio from apache_beam.io import filebasedsource from apache_beam.io import source_test_utils -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.transforms.util import assert_that 
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/concat_source_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py index 807c3fd..a02f9ad 100644 --- a/sdks/python/apache_beam/io/concat_source_test.py +++ b/sdks/python/apache_beam/io/concat_source_test.py @@ -26,7 +26,7 @@ from apache_beam.io import iobase from apache_beam.io import range_trackers from apache_beam.io import source_test_utils from apache_beam.io.concat_source import ConcatSource -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 4ff23fc..e17a004 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -38,13 +38,13 @@ from apache_beam.io.concat_source import ConcatSource from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource from apache_beam.io.filebasedsource import FileBasedSource -from apache_beam.test_pipeline import TestPipeline +from apache_beam.options.value_provider import StaticValueProvider +from apache_beam.options.value_provider import RuntimeValueProvider +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to -from 
apache_beam.options.value_provider import StaticValueProvider -from apache_beam.options.value_provider import RuntimeValueProvider class LineSource(FileBasedSource): http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index e0e9774..4c25505 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -31,7 +31,7 @@ import mock import apache_beam as beam from apache_beam import coders from apache_beam.io import fileio -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py index 5d4bb6f..a804c09 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py @@ -23,7 +23,7 @@ from mock import MagicMock from apache_beam.io.gcp.datastore.v1 import fake_datastore from apache_beam.io.gcp.datastore.v1 import helper -from apache_beam.tests.test_utils import patch_retry +from apache_beam.testing.test_utils import patch_retry # Protect against environments where apitools library is not available. 
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py index 66d99b3..f42b70f 100644 --- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py +++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py @@ -21,7 +21,7 @@ import logging from hamcrest.core.base_matcher import BaseMatcher -from apache_beam.tests.test_utils import compute_hash +from apache_beam.testing.test_utils import compute_hash from apache_beam.utils import retry # Protect against environments where bigquery library is not available. http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py index d8aa148..f12293e 100644 --- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py @@ -24,7 +24,7 @@ from hamcrest import assert_that as hc_assert_that from mock import Mock, patch from apache_beam.io.gcp.tests import bigquery_matcher as bq_verifier -from apache_beam.tests.test_utils import patch_retry +from apache_beam.testing.test_utils import patch_retry # Protect against environments where bigquery library is not available. 
# pylint: disable=wrong-import-order, wrong-import-position http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/sources_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py index 3f92756..c0b8ad6 100644 --- a/sdks/python/apache_beam/io/sources_test.py +++ b/sdks/python/apache_beam/io/sources_test.py @@ -27,7 +27,7 @@ import apache_beam as beam from apache_beam import coders from apache_beam.io import iobase from apache_beam.io import range_trackers -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 90dc665..d00afef 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -43,7 +43,7 @@ from apache_beam.io.filebasedsource_test import write_data from apache_beam.io.filebasedsource_test import write_pattern from apache_beam.io.filesystem import CompressionTypes -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/tfrecordio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py index 29a9fb8..b7e370d 100644 --- a/sdks/python/apache_beam/io/tfrecordio_test.py +++ 
b/sdks/python/apache_beam/io/tfrecordio_test.py @@ -35,7 +35,7 @@ from apache_beam.io.tfrecordio import _TFRecordSource from apache_beam.io.tfrecordio import _TFRecordUtil from apache_beam.io.tfrecordio import ReadFromTFRecord from apache_beam.io.tfrecordio import WriteToTFRecord -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline import crcmod http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index ebcc43b..c6b1e48 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -32,7 +32,7 @@ from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor from apache_beam.pvalue import AsSingleton from apache_beam.runners.dataflow.native_io.iobase import NativeSource -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms import CombineGlobally from apache_beam.transforms import Create from apache_beam.transforms import FlatMap http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/pvalue_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py index 529ddf7..4acbc52 100644 --- a/sdks/python/apache_beam/pvalue_test.py +++ b/sdks/python/apache_beam/pvalue_test.py @@ -20,7 +20,7 @@ import unittest from apache_beam.pvalue import PValue -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline class PValueTest(unittest.TestCase): 
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index a9f61a7..b61a683 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -26,6 +26,7 @@ import mock import apache_beam as beam import apache_beam.transforms as ptransform +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import Pipeline, AppliedPTransform from apache_beam.pvalue import PCollection from apache_beam.runners import create_runner @@ -34,10 +35,9 @@ from apache_beam.runners import TestDataflowRunner from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api -from apache_beam.test_pipeline import TestPipeline +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayDataItem from apache_beam.typehints import typehints -from apache_beam.options.pipeline_options import PipelineOptions # Protect against environments where apitools library is not available. 
# pylint: disable=wrong-import-order, wrong-import-position http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/test_pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py deleted file mode 100644 index 20f4839..0000000 --- a/sdks/python/apache_beam/test_pipeline.py +++ /dev/null @@ -1,163 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Test Pipeline, a wrapper of Pipeline for test purpose""" - -import argparse -import shlex - -from apache_beam.internal import pickler -from apache_beam.pipeline import Pipeline -from apache_beam.runners.runner import PipelineState -from apache_beam.options.pipeline_options import PipelineOptions -from nose.plugins.skip import SkipTest - - -class TestPipeline(Pipeline): - """TestPipeline class is used inside of Beam tests that can be configured to - run against pipeline runner. - - It has a functionality to parse arguments from command line and build pipeline - options for tests who runs against a pipeline runner and utilizes resources - of the pipeline runner. 
Those test functions are recommended to be tagged by - @attr("ValidatesRunner") annotation. - - In order to configure the test with customized pipeline options from command - line, system argument 'test-pipeline-options' can be used to obtains a list - of pipeline options. If no options specified, default value will be used. - - For example, use following command line to execute all ValidatesRunner tests:: - - python setup.py nosetests -a ValidatesRunner \ - --test-pipeline-options="--runner=DirectRunner \ - --job_name=myJobName \ - --num_workers=1" - - For example, use assert_that for test validation:: - - pipeline = TestPipeline() - pcoll = ... - assert_that(pcoll, equal_to(...)) - pipeline.run() - """ - - def __init__(self, - runner=None, - options=None, - argv=None, - is_integration_test=False, - blocking=True): - """Initialize a pipeline object for test. - - Args: - runner: An object of type 'PipelineRunner' that will be used to execute - the pipeline. For registered runners, the runner name can be specified, - otherwise a runner object must be supplied. - options: A configured 'PipelineOptions' object containing arguments - that should be used for running the pipeline job. - argv: A list of arguments (such as sys.argv) to be used for building a - 'PipelineOptions' object. This will only be used if argument 'options' - is None. - is_integration_test: True if the test is an integration test, False - otherwise. - blocking: Run method will wait until pipeline execution is completed. - - Raises: - ValueError: if either the runner or options argument is not of the - expected type. 
- """ - self.is_integration_test = is_integration_test - self.options_list = self._parse_test_option_args(argv) - self.blocking = blocking - if options is None: - options = PipelineOptions(self.options_list) - super(TestPipeline, self).__init__(runner, options) - - def run(self): - result = super(TestPipeline, self).run() - if self.blocking: - state = result.wait_until_finish() - assert state == PipelineState.DONE, "Pipeline execution failed." - - return result - - def _parse_test_option_args(self, argv): - """Parse value of command line argument: --test-pipeline-options to get - pipeline options. - - Args: - argv: An iterable of command line arguments to be used. If not specified - then sys.argv will be used as input for parsing arguments. - - Returns: - An argument list of options that can be parsed by argparser or directly - build a pipeline option. - """ - parser = argparse.ArgumentParser() - parser.add_argument('--test-pipeline-options', - type=str, - action='store', - help='only run tests providing service options') - known, unused_argv = parser.parse_known_args(argv) - - if self.is_integration_test and not known.test_pipeline_options: - # Skip integration test when argument '--test-pipeline-options' is not - # specified since nose calls integration tests when runs unit test by - # 'setup.py test'. - raise SkipTest('IT is skipped because --test-pipeline-options ' - 'is not specified') - - return shlex.split(known.test_pipeline_options) \ - if known.test_pipeline_options else [] - - def get_full_options_as_args(self, **extra_opts): - """Get full pipeline options as an argument list. - - Append extra pipeline options to existing option list if provided. - Test verifier (if contains in extra options) should be pickled before - appending, and will be unpickled later in the TestRunner. 
- """ - options = list(self.options_list) - for k, v in extra_opts.items(): - if not v: - continue - elif isinstance(v, bool) and v: - options.append('--%s' % k) - elif 'matcher' in k: - options.append('--%s=%s' % (k, pickler.dumps(v))) - else: - options.append('--%s=%s' % (k, v)) - return options - - def get_option(self, opt_name): - """Get a pipeline option value by name - - Args: - opt_name: The name of the pipeline option. - - Returns: - None if option is not found in existing option list which is generated - by parsing value of argument `test-pipeline-options`. - """ - parser = argparse.ArgumentParser() - opt_name = opt_name[:2] if opt_name[:2] == '--' else opt_name - # Option name should start with '--' when it's used for parsing. - parser.add_argument('--' + opt_name, - type=str, - action='store') - known, _ = parser.parse_known_args(self.options_list) - return getattr(known, opt_name) if hasattr(known, opt_name) else None http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/test_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py deleted file mode 100644 index 325cab7..0000000 --- a/sdks/python/apache_beam/test_pipeline_test.py +++ /dev/null @@ -1,112 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit test for the TestPipeline class""" - -import logging -import unittest - -from hamcrest.core.base_matcher import BaseMatcher -from hamcrest.core.assert_that import assert_that as hc_assert_that - -from apache_beam.internal import pickler -from apache_beam.test_pipeline import TestPipeline -from apache_beam.options.pipeline_options import PipelineOptions - - -# A simple matcher that is ued for testing extra options appending. -class SimpleMatcher(BaseMatcher): - def _matches(self, item): - return True - - -class TestPipelineTest(unittest.TestCase): - - TEST_CASE = {'options': - ['--test-pipeline-options', '--job=mockJob --male --age=1'], - 'expected_list': ['--job=mockJob', '--male', '--age=1'], - 'expected_dict': {'job': 'mockJob', - 'male': True, - 'age': 1}} - - # Used for testing pipeline option creation. 
- class TestParsingOptions(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument('--job', action='store', help='mock job') - parser.add_argument('--male', action='store_true', help='mock gender') - parser.add_argument('--age', action='store', type=int, help='mock age') - - def test_option_args_parsing(self): - test_pipeline = TestPipeline(argv=self.TEST_CASE['options']) - self.assertListEqual( - sorted(test_pipeline.get_full_options_as_args()), - sorted(self.TEST_CASE['expected_list'])) - - def test_empty_option_args_parsing(self): - test_pipeline = TestPipeline() - self.assertListEqual([], - test_pipeline.get_full_options_as_args()) - - def test_create_test_pipeline_options(self): - test_pipeline = TestPipeline(argv=self.TEST_CASE['options']) - test_options = PipelineOptions(test_pipeline.get_full_options_as_args()) - self.assertDictContainsSubset(self.TEST_CASE['expected_dict'], - test_options.get_all_options()) - - EXTRA_OPT_CASES = [ - {'options': {'name': 'Mark'}, - 'expected': ['--name=Mark']}, - {'options': {'student': True}, - 'expected': ['--student']}, - {'options': {'student': False}, - 'expected': []}, - {'options': {'name': 'Mark', 'student': True}, - 'expected': ['--name=Mark', '--student']} - ] - - def test_append_extra_options(self): - test_pipeline = TestPipeline() - for case in self.EXTRA_OPT_CASES: - opt_list = test_pipeline.get_full_options_as_args(**case['options']) - self.assertListEqual(sorted(opt_list), sorted(case['expected'])) - - def test_append_verifier_in_extra_opt(self): - extra_opt = {'matcher': SimpleMatcher()} - opt_list = TestPipeline().get_full_options_as_args(**extra_opt) - _, value = opt_list[0].split('=', 1) - matcher = pickler.loads(value) - self.assertTrue(isinstance(matcher, BaseMatcher)) - hc_assert_that(None, matcher) - - def test_get_option(self): - name, value = ('job', 'mockJob') - test_pipeline = TestPipeline() - test_pipeline.options_list = ['--%s=%s' % (name, value)] - 
self.assertEqual(test_pipeline.get_option(name), value) - - def test_skip_IT(self): - test_pipeline = TestPipeline(is_integration_test=True) - test_pipeline.run() - # Note that this will never be reached since it should be skipped above. - self.fail() - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/__init__.py b/sdks/python/apache_beam/testing/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/testing/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/data/privatekey.p12 ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/data/privatekey.p12 b/sdks/python/apache_beam/testing/data/privatekey.p12 new file mode 100644 index 0000000..c369ecb Binary files /dev/null and b/sdks/python/apache_beam/testing/data/privatekey.p12 differ http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/data/standard_coders.yaml ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/data/standard_coders.yaml b/sdks/python/apache_beam/testing/data/standard_coders.yaml new file mode 100644 index 0000000..790cacb --- /dev/null +++ b/sdks/python/apache_beam/testing/data/standard_coders.yaml @@ -0,0 +1,196 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file is broken into multiple sections delimited by ---. Each section specifies a set of +# reference encodings for a single standardized coder used in a specific context. +# +# Each section contains up to 3 properties: +# +# coder: a common coder spec. 
Currently, a URN and URNs for component coders as necessary. +# nested: a boolean meaning whether the coder was used in the nested context. Missing means to +# test both contexts, a shorthand for when the coder is invariant across context. +# examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context. +# The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is +# one of a few standard JSON types such as numbers, strings, dicts that map naturally +# to the type encoded by the coder. +# +# These choices were made to strike a balance between portability, ease of use, and simple +# legibility of this file itself. +# +# It is expected that future work will move the `coder` field into a format that it would be +# represented by the Runner API, so that it can be understood by all SDKs and harnesses. +# +# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated. + + + +coder: + urn: "urn:beam:coders:bytes:0.1" +nested: false +examples: + "abc": abc + "ab\0c": "ab\0c" + +--- + +coder: + urn: "urn:beam:coders:bytes:0.1" +nested: true +examples: + "\u0003abc": abc + "\u0004ab\0c": "ab\0c" + "\u00c8\u0001 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|": + " 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|" + +--- + +coder: + urn: "urn:beam:coders:varint:0.1" +examples: + "\0": 0 + "\u0001": 1 + "\u000A": 10 + "\u00c8\u0001": 200 + "\u00e8\u0007": 1000 + "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1 + +--- + +coder: + urn: "urn:beam:coders:kv:0.1" + components: [{urn: "urn:beam:coders:bytes:0.1"}, + {urn: "urn:beam:coders:varint:0.1"}] +examples: + "\u0003abc\0": {key: abc, value: 0} + "\u0004ab\0c\u000A": {key: "ab\0c", value: 10} + +--- + +coder: + urn: "urn:beam:coders:kv:0.1" + components: [{urn: "urn:beam:coders:bytes:0.1"}, + {urn: "urn:beam:coders:bytes:0.1"}] 
+nested: false +examples: + "\u0003abcdef": {key: abc, value: def} + "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"} + +--- + +coder: + urn: "urn:beam:coders:kv:0.1" + components: [{urn: "urn:beam:coders:bytes:0.1"}, + {urn: "urn:beam:coders:bytes:0.1"}] +nested: true +examples: + "\u0003abc\u0003def": {key: abc, value: def} + "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"} + +--- + +coder: + urn: "urn:beam:coders:interval_window:0.1" +examples: + "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000} + "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000} + "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365} + "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0} + +--- + +coder: + urn: "urn:beam:coders:stream:0.1" + components: [{urn: "urn:beam:coders:varint:0.1"}] +examples: + "\0\0\0\u0001\0": [0] + "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000] + "\0\0\0\0": [] + +--- + +coder: + urn: "urn:beam:coders:stream:0.1" + components: [{urn: "urn:beam:coders:bytes:0.1"}] +examples: + "\0\0\0\u0001\u0003abc": ["abc"] + "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"] + "\0\0\0\0": [] + +--- + +coder: + urn: "urn:beam:coders:stream:0.1" + components: [{urn: "urn:beam:coders:bytes:0.1"}] + # This is for iterables of unknown length, where the encoding is not + # deterministic. 
+ non_deterministic: True +examples: + "\u00ff\u00ff\u00ff\u00ff\u0000": [] + "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"] + "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"] + +--- + +coder: + urn: "urn:beam:coders:stream:0.1" + components: [{urn: "urn:beam:coders:global_window:0.1"}] +examples: + "\0\0\0\u0001": [""] + +--- + +coder: + urn: "urn:beam:coders:global_window:0.1" +examples: + "": "" + +--- + +# All windowed values consist of pane infos that represent NO_FIRING until full support is added +# in the Python SDK (BEAM-1522). +coder: + urn: "urn:beam:coders:windowed_value:0.1" + components: [{urn: "urn:beam:coders:varint:0.1"}, + {urn: "urn:beam:coders:global_window:0.1"}] +examples: + "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": { + value: 2, + timestamp: 1454293425000, + pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0}, + windows: ["global"] + } + +--- + +coder: + urn: "urn:beam:coders:windowed_value:0.1" + components: [{urn: "urn:beam:coders:varint:0.1"}, + {urn: "urn:beam:coders:interval_window:0.1"}] +examples: + "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": { + value: 4, + timestamp: -400000, + pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0}, + windows: [{end: 1454293425000, span: 280000}] + } + + "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": { + value: 2, + timestamp: -100, + pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0}, + windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}] + } http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/pipeline_verifiers.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py new file mode 100644 index 0000000..5a6082a --- /dev/null +++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py @@ -0,0 +1,146 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test result verifiers + +A set of verifiers that are used in end-to-end tests to verify state/output +of test pipeline job. Customized verifier should extend +`hamcrest.core.base_matcher.BaseMatcher` and override _matches. +""" + +import logging +import time + +from hamcrest.core.base_matcher import BaseMatcher + +from apache_beam.io.filesystems import FileSystems +from apache_beam.runners.runner import PipelineState +from apache_beam.testing import test_utils as utils +from apache_beam.utils import retry + +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None + +MAX_RETRIES = 4 + + +class PipelineStateMatcher(BaseMatcher): + """Matcher that verify pipeline job terminated in expected state + + Matcher compares the actual pipeline terminate state with expected. 
+ By default, `PipelineState.DONE` is used as expected state. + """ + + def __init__(self, expected_state=PipelineState.DONE): + self.expected_state = expected_state + + def _matches(self, pipeline_result): + return pipeline_result.state == self.expected_state + + def describe_to(self, description): + description \ + .append_text("Test pipeline expected terminated in state: ") \ + .append_text(self.expected_state) + + def describe_mismatch(self, pipeline_result, mismatch_description): + mismatch_description \ + .append_text("Test pipeline job terminated in state: ") \ + .append_text(pipeline_result.state) + + +def retry_on_io_error_and_server_error(exception): + """Filter allowing retries on file I/O errors and service error.""" + return isinstance(exception, IOError) or \ + (HttpError is not None and isinstance(exception, HttpError)) + + +class FileChecksumMatcher(BaseMatcher): + """Matcher that verifies file(s) content by comparing file checksum. + + Use apache_beam.io.fileio to fetch file(s) from given path. File checksum + is a hash string computed from content of file(s). + """ + + def __init__(self, file_path, expected_checksum, sleep_secs=None): + """Initialize a FileChecksumMatcher object + + Args: + file_path : A string that is the full path of output file. This path + can contain globs. + expected_checksum : A hash string that is computed from expected + result. + sleep_secs : Number of seconds to wait before verification start. + Extra time are given to make sure output files are ready on FS. + """ + if sleep_secs is not None: + if isinstance(sleep_secs, int): + self.sleep_secs = sleep_secs + else: + raise ValueError('Sleep seconds, if received, must be int. 
' + 'But received: %r, %s' % (sleep_secs, + type(sleep_secs))) + else: + self.sleep_secs = None + + self.file_path = file_path + self.expected_checksum = expected_checksum + + @retry.with_exponential_backoff( + num_retries=MAX_RETRIES, + retry_filter=retry_on_io_error_and_server_error) + def _read_with_retry(self): + """Read path with retry if I/O failed""" + read_lines = [] + match_result = FileSystems.match([self.file_path])[0] + matched_path = [f.path for f in match_result.metadata_list] + if not matched_path: + raise IOError('No such file or directory: %s' % self.file_path) + + logging.info('Find %d files in %s: \n%s', + len(matched_path), self.file_path, '\n'.join(matched_path)) + for path in matched_path: + with FileSystems.open(path, 'r') as f: + for line in f: + read_lines.append(line) + return read_lines + + def _matches(self, _): + if self.sleep_secs: + # Wait to have output file ready on FS + logging.info('Wait %d seconds...', self.sleep_secs) + time.sleep(self.sleep_secs) + + # Read from given file(s) path + read_lines = self._read_with_retry() + + # Compute checksum + self.checksum = utils.compute_hash(read_lines) + logging.info('Read from given path %s, %d lines, checksum: %s.', + self.file_path, len(read_lines), self.checksum) + return self.checksum == self.expected_checksum + + def describe_to(self, description): + description \ + .append_text("Expected checksum is ") \ + .append_text(self.expected_checksum) + + def describe_mismatch(self, pipeline_result, mismatch_description): + mismatch_description \ + .append_text("Actual checksum is ") \ + .append_text(self.checksum) http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/pipeline_verifiers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers_test.py b/sdks/python/apache_beam/testing/pipeline_verifiers_test.py new file mode 100644 index 0000000..15e0a04 --- /dev/null +++ 
b/sdks/python/apache_beam/testing/pipeline_verifiers_test.py @@ -0,0 +1,148 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the test pipeline verifiers""" + +import logging +import tempfile +import unittest + +from hamcrest import assert_that as hc_assert_that +from mock import Mock, patch + +from apache_beam.io.localfilesystem import LocalFileSystem +from apache_beam.runners.runner import PipelineResult +from apache_beam.runners.runner import PipelineState +from apache_beam.testing.test_utils import patch_retry +from apache_beam.testing import pipeline_verifiers as verifiers + +try: + # pylint: disable=wrong-import-order, wrong-import-position + # pylint: disable=ungrouped-imports + from apitools.base.py.exceptions import HttpError + from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem +except ImportError: + HttpError = None + GCSFileSystem = None + + +class PipelineVerifiersTest(unittest.TestCase): + + def setUp(self): + self._mock_result = Mock() + patch_retry(self, verifiers) + + def test_pipeline_state_matcher_success(self): + """Test PipelineStateMatcher successes when using default expected state + and job actually finished in DONE + """ + pipeline_result = PipelineResult(PipelineState.DONE) + 
hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher()) + + def test_pipeline_state_matcher_given_state(self): + """Test PipelineStateMatcher successes when matches given state""" + pipeline_result = PipelineResult(PipelineState.FAILED) + hc_assert_that(pipeline_result, + verifiers.PipelineStateMatcher(PipelineState.FAILED)) + + def test_pipeline_state_matcher_fails(self): + """Test PipelineStateMatcher fails when using default expected state + and job actually finished in CANCELLED/DRAINED/FAILED/STOPPED/UNKNOWN + """ + failed_state = [PipelineState.CANCELLED, + PipelineState.DRAINED, + PipelineState.FAILED, + PipelineState.STOPPED, + PipelineState.UNKNOWN] + + for state in failed_state: + pipeline_result = PipelineResult(state) + with self.assertRaises(AssertionError): + hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher()) + + test_cases = [ + {'content': 'Test FileChecksumMatcher with single file', + 'num_files': 1, + 'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'}, + {'content': 'Test FileChecksumMatcher with multiple files', + 'num_files': 3, + 'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'}, + {'content': '', + 'num_files': 1, + 'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'}, + ] + + def create_temp_file(self, content, directory=None): + with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f: + f.write(content) + return f.name + + def test_file_checksum_matcher_success(self): + for case in self.test_cases: + temp_dir = tempfile.mkdtemp() + for _ in range(case['num_files']): + self.create_temp_file(case['content'], temp_dir) + matcher = verifiers.FileChecksumMatcher(temp_dir + '/*', + case['expected_checksum']) + hc_assert_that(self._mock_result, matcher) + + @patch.object(LocalFileSystem, 'match') + def test_file_checksum_matcher_read_failed(self, mock_match): + mock_match.side_effect = IOError('No file found.') + matcher = verifiers.FileChecksumMatcher('dummy/path', 
Mock()) + with self.assertRaises(IOError): + hc_assert_that(self._mock_result, matcher) + self.assertTrue(mock_match.called) + self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count) + + @patch.object(GCSFileSystem, 'match') + @unittest.skipIf(HttpError is None, 'google-apitools is not installed') + def test_file_checksum_matcher_service_error(self, mock_match): + mock_match.side_effect = HttpError( + response={'status': '404'}, url='', content='Not Found', + ) + matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock()) + with self.assertRaises(HttpError): + hc_assert_that(self._mock_result, matcher) + self.assertTrue(mock_match.called) + self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count) + + def test_file_checksum_matchcer_invalid_sleep_time(self): + with self.assertRaises(ValueError) as cm: + verifiers.FileChecksumMatcher('file_path', + 'expected_checksum', + 'invalid_sleep_time') + self.assertEqual(cm.exception.message, + 'Sleep seconds, if received, must be int. 
' + 'But received: \'invalid_sleep_time\', ' + '<type \'str\'>') + + @patch('time.sleep', return_value=None) + def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep): + temp_dir = tempfile.mkdtemp() + case = self.test_cases[0] + self.create_temp_file(case['content'], temp_dir) + matcher = verifiers.FileChecksumMatcher(temp_dir + '/*', + case['expected_checksum'], + 10) + hc_assert_that(self._mock_result, matcher) + self.assertTrue(mocked_sleep.called) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py new file mode 100644 index 0000000..20f4839 --- /dev/null +++ b/sdks/python/apache_beam/testing/test_pipeline.py @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Test Pipeline, a wrapper of Pipeline for test purpose""" + +import argparse +import shlex + +from apache_beam.internal import pickler +from apache_beam.pipeline import Pipeline +from apache_beam.runners.runner import PipelineState +from apache_beam.options.pipeline_options import PipelineOptions +from nose.plugins.skip import SkipTest + + +class TestPipeline(Pipeline): + """TestPipeline class is used inside of Beam tests that can be configured to + run against pipeline runner. + + It has a functionality to parse arguments from command line and build pipeline + options for tests who runs against a pipeline runner and utilizes resources + of the pipeline runner. Those test functions are recommended to be tagged by + @attr("ValidatesRunner") annotation. + + In order to configure the test with customized pipeline options from command + line, system argument 'test-pipeline-options' can be used to obtains a list + of pipeline options. If no options specified, default value will be used. + + For example, use following command line to execute all ValidatesRunner tests:: + + python setup.py nosetests -a ValidatesRunner \ + --test-pipeline-options="--runner=DirectRunner \ + --job_name=myJobName \ + --num_workers=1" + + For example, use assert_that for test validation:: + + pipeline = TestPipeline() + pcoll = ... + assert_that(pcoll, equal_to(...)) + pipeline.run() + """ + + def __init__(self, + runner=None, + options=None, + argv=None, + is_integration_test=False, + blocking=True): + """Initialize a pipeline object for test. + + Args: + runner: An object of type 'PipelineRunner' that will be used to execute + the pipeline. For registered runners, the runner name can be specified, + otherwise a runner object must be supplied. + options: A configured 'PipelineOptions' object containing arguments + that should be used for running the pipeline job. + argv: A list of arguments (such as sys.argv) to be used for building a + 'PipelineOptions' object. 
This will only be used if argument 'options' + is None. + is_integration_test: True if the test is an integration test, False + otherwise. + blocking: Run method will wait until pipeline execution is completed. + + Raises: + ValueError: if either the runner or options argument is not of the + expected type. + """ + self.is_integration_test = is_integration_test + self.options_list = self._parse_test_option_args(argv) + self.blocking = blocking + if options is None: + options = PipelineOptions(self.options_list) + super(TestPipeline, self).__init__(runner, options) + + def run(self): + result = super(TestPipeline, self).run() + if self.blocking: + state = result.wait_until_finish() + assert state == PipelineState.DONE, "Pipeline execution failed." + + return result + + def _parse_test_option_args(self, argv): + """Parse value of command line argument: --test-pipeline-options to get + pipeline options. + + Args: + argv: An iterable of command line arguments to be used. If not specified + then sys.argv will be used as input for parsing arguments. + + Returns: + An argument list of options that can be parsed by argparser or directly + build a pipeline option. + """ + parser = argparse.ArgumentParser() + parser.add_argument('--test-pipeline-options', + type=str, + action='store', + help='only run tests providing service options') + known, unused_argv = parser.parse_known_args(argv) + + if self.is_integration_test and not known.test_pipeline_options: + # Skip integration test when argument '--test-pipeline-options' is not + # specified since nose calls integration tests when runs unit test by + # 'setup.py test'. + raise SkipTest('IT is skipped because --test-pipeline-options ' + 'is not specified') + + return shlex.split(known.test_pipeline_options) \ + if known.test_pipeline_options else [] + + def get_full_options_as_args(self, **extra_opts): + """Get full pipeline options as an argument list. + + Append extra pipeline options to existing option list if provided. 
+ + Test verifier (if contained in extra options) should be pickled before + appending, and will be unpickled later in the TestRunner. + """ + options = list(self.options_list) + for k, v in extra_opts.items(): + if not v: + continue + elif isinstance(v, bool) and v: + options.append('--%s' % k) + elif 'matcher' in k: + options.append('--%s=%s' % (k, pickler.dumps(v))) + else: + options.append('--%s=%s' % (k, v)) + return options + + def get_option(self, opt_name): + """Get a pipeline option value by name + + Args: + opt_name: The name of the pipeline option. + + Returns: + None if option is not found in existing option list which is generated + by parsing value of argument `test-pipeline-options`. + """ + parser = argparse.ArgumentParser() + opt_name = opt_name[2:] if opt_name[:2] == '--' else opt_name + # Option name should start with '--' when it's used for parsing. + parser.add_argument('--' + opt_name, + type=str, + action='store') + known, _ = parser.parse_known_args(self.options_list) + return getattr(known, opt_name) if hasattr(known, opt_name) else None http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_pipeline_test.py b/sdks/python/apache_beam/testing/test_pipeline_test.py new file mode 100644 index 0000000..747d64c7 --- /dev/null +++ b/sdks/python/apache_beam/testing/test_pipeline_test.py @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit test for the TestPipeline class""" + +import logging +import unittest + +from hamcrest.core.base_matcher import BaseMatcher +from hamcrest.core.assert_that import assert_that as hc_assert_that + +from apache_beam.internal import pickler +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.options.pipeline_options import PipelineOptions + + +# A simple matcher that is used for testing extra options appending. +class SimpleMatcher(BaseMatcher): + def _matches(self, item): + return True + + +class TestPipelineTest(unittest.TestCase): + + TEST_CASE = {'options': + ['--test-pipeline-options', '--job=mockJob --male --age=1'], + 'expected_list': ['--job=mockJob', '--male', '--age=1'], + 'expected_dict': {'job': 'mockJob', + 'male': True, + 'age': 1}} + + # Used for testing pipeline option creation. 
+ class TestParsingOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--job', action='store', help='mock job') + parser.add_argument('--male', action='store_true', help='mock gender') + parser.add_argument('--age', action='store', type=int, help='mock age') + + def test_option_args_parsing(self): + test_pipeline = TestPipeline(argv=self.TEST_CASE['options']) + self.assertListEqual( + sorted(test_pipeline.get_full_options_as_args()), + sorted(self.TEST_CASE['expected_list'])) + + def test_empty_option_args_parsing(self): + test_pipeline = TestPipeline() + self.assertListEqual([], + test_pipeline.get_full_options_as_args()) + + def test_create_test_pipeline_options(self): + test_pipeline = TestPipeline(argv=self.TEST_CASE['options']) + test_options = PipelineOptions(test_pipeline.get_full_options_as_args()) + self.assertDictContainsSubset(self.TEST_CASE['expected_dict'], + test_options.get_all_options()) + + EXTRA_OPT_CASES = [ + {'options': {'name': 'Mark'}, + 'expected': ['--name=Mark']}, + {'options': {'student': True}, + 'expected': ['--student']}, + {'options': {'student': False}, + 'expected': []}, + {'options': {'name': 'Mark', 'student': True}, + 'expected': ['--name=Mark', '--student']} + ] + + def test_append_extra_options(self): + test_pipeline = TestPipeline() + for case in self.EXTRA_OPT_CASES: + opt_list = test_pipeline.get_full_options_as_args(**case['options']) + self.assertListEqual(sorted(opt_list), sorted(case['expected'])) + + def test_append_verifier_in_extra_opt(self): + extra_opt = {'matcher': SimpleMatcher()} + opt_list = TestPipeline().get_full_options_as_args(**extra_opt) + _, value = opt_list[0].split('=', 1) + matcher = pickler.loads(value) + self.assertTrue(isinstance(matcher, BaseMatcher)) + hc_assert_that(None, matcher) + + def test_get_option(self): + name, value = ('job', 'mockJob') + test_pipeline = TestPipeline() + test_pipeline.options_list = ['--%s=%s' % (name, value)] + 
self.assertEqual(test_pipeline.get_option(name), value) + + def test_skip_IT(self): + test_pipeline = TestPipeline(is_integration_test=True) + test_pipeline.run() + # Note that this will never be reached since it should be skipped above. + self.fail() + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_stream.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py new file mode 100644 index 0000000..7ae27b7 --- /dev/null +++ b/sdks/python/apache_beam/testing/test_stream.py @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Provides TestStream for verifying streaming runner semantics.""" + +from abc import ABCMeta +from abc import abstractmethod + +from apache_beam import coders +from apache_beam import pvalue +from apache_beam.transforms import PTransform +from apache_beam.transforms.window import TimestampedValue +from apache_beam.utils import timestamp +from apache_beam.utils.windowed_value import WindowedValue + + +class Event(object): + """Test stream event to be emitted during execution of a TestStream.""" + + __metaclass__ = ABCMeta + + def __cmp__(self, other): + if type(self) is not type(other): + return cmp(type(self), type(other)) + return self._typed_cmp(other) + + @abstractmethod + def _typed_cmp(self, other): + raise NotImplementedError + + +class ElementEvent(Event): + """Element-producing test stream event.""" + + def __init__(self, timestamped_values): + self.timestamped_values = timestamped_values + + def _typed_cmp(self, other): + return cmp(self.timestamped_values, other.timestamped_values) + + +class WatermarkEvent(Event): + """Watermark-advancing test stream event.""" + + def __init__(self, new_watermark): + self.new_watermark = timestamp.Timestamp.of(new_watermark) + + def _typed_cmp(self, other): + return cmp(self.new_watermark, other.new_watermark) + + +class ProcessingTimeEvent(Event): + """Processing time-advancing test stream event.""" + + def __init__(self, advance_by): + self.advance_by = timestamp.Duration.of(advance_by) + + def _typed_cmp(self, other): + return cmp(self.advance_by, other.advance_by) + + +class TestStream(PTransform): + """Test stream that generates events on an unbounded PCollection of elements. + + Each event emits elements, advances the watermark or advances the processing + time. After all of the specified elements are emitted, ceases to produce + output. 
+ """ + + def __init__(self, coder=coders.FastPrimitivesCoder): + assert coder is not None + self.coder = coder + self.current_watermark = timestamp.MIN_TIMESTAMP + self.events = [] + + def expand(self, pbegin): + assert isinstance(pbegin, pvalue.PBegin) + self.pipeline = pbegin.pipeline + return pvalue.PCollection(self.pipeline) + + def _infer_output_coder(self, input_type=None, input_coder=None): + return self.coder + + def _add(self, event): + if isinstance(event, ElementEvent): + for tv in event.timestamped_values: + assert tv.timestamp < timestamp.MAX_TIMESTAMP, ( + 'Element timestamp must be before timestamp.MAX_TIMESTAMP.') + elif isinstance(event, WatermarkEvent): + assert event.new_watermark > self.current_watermark, ( + 'Watermark must strictly-monotonically advance.') + self.current_watermark = event.new_watermark + elif isinstance(event, ProcessingTimeEvent): + assert event.advance_by > 0, ( + 'Must advance processing time by positive amount.') + else: + raise ValueError('Unknown event: %s' % event) + self.events.append(event) + + def add_elements(self, elements): + """Add elements to the TestStream. + + Elements added to the TestStream will be produced during pipeline execution. + These elements can be TimestampedValue, WindowedValue or raw unwrapped + elements that are serializable using the TestStream's specified Coder. When + a TimestampedValue or a WindowedValue element is used, the timestamp of the + TimestampedValue or WindowedValue will be the timestamp of the produced + element; otherwise, the current watermark timestamp will be used for that + element. The windows of a given WindowedValue are ignored by the + TestStream. + """ + timestamped_values = [] + for element in elements: + if isinstance(element, TimestampedValue): + timestamped_values.append(element) + elif isinstance(element, WindowedValue): + # Drop windows for elements in test stream. 
+ timestamped_values.append( + TimestampedValue(element.value, element.timestamp)) + else: + # Add elements with timestamp equal to current watermark. + timestamped_values.append( + TimestampedValue(element, self.current_watermark)) + self._add(ElementEvent(timestamped_values)) + return self + + def advance_watermark_to(self, new_watermark): + """Advance the watermark to a given Unix timestamp. + + The Unix timestamp value used must be later than the previous watermark + value and should be given as an int, float or utils.timestamp.Timestamp + object. + """ + self._add(WatermarkEvent(new_watermark)) + return self + + def advance_watermark_to_infinity(self): + """Advance the watermark to the end of time.""" + self.advance_watermark_to(timestamp.MAX_TIMESTAMP) + return self + + def advance_processing_time(self, advance_by): + """Advance the current processing time by a given duration in seconds. + + The duration must be a positive second duration and should be given as an + int, float or utils.timestamp.Duration object. + """ + self._add(ProcessingTimeEvent(advance_by)) + return self http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_stream_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py new file mode 100644 index 0000000..e32dda2 --- /dev/null +++ b/sdks/python/apache_beam/testing/test_stream_test.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the test_stream module.""" + +import unittest + +from apache_beam.testing.test_stream import ElementEvent +from apache_beam.testing.test_stream import ProcessingTimeEvent +from apache_beam.testing.test_stream import TestStream +from apache_beam.testing.test_stream import WatermarkEvent +from apache_beam.transforms.window import TimestampedValue +from apache_beam.utils import timestamp +from apache_beam.utils.windowed_value import WindowedValue + + +class TestStreamTest(unittest.TestCase): + + def test_basic_test_stream(self): + test_stream = (TestStream() + .advance_watermark_to(0) + .add_elements([ + 'a', + WindowedValue('b', 3, []), + TimestampedValue('c', 6)]) + .advance_processing_time(10) + .advance_watermark_to(8) + .add_elements(['d']) + .advance_watermark_to_infinity()) + self.assertEqual( + test_stream.events, + [ + WatermarkEvent(0), + ElementEvent([ + TimestampedValue('a', 0), + TimestampedValue('b', 3), + TimestampedValue('c', 6), + ]), + ProcessingTimeEvent(10), + WatermarkEvent(8), + ElementEvent([ + TimestampedValue('d', 8), + ]), + WatermarkEvent(timestamp.MAX_TIMESTAMP), + ] + ) + + def test_test_stream_errors(self): + with self.assertRaises(AssertionError, msg=( + 'Watermark must strictly-monotonically advance.')): + _ = (TestStream() + .advance_watermark_to(5) + .advance_watermark_to(4)) + + with self.assertRaises(AssertionError, msg=( + 'Must advance processing time by positive amount.')): + _ = (TestStream() + .advance_processing_time(-1)) + + with self.assertRaises(AssertionError, msg=( + 'Element timestamp must be 
before timestamp.MAX_TIMESTAMP.')): + _ = (TestStream() + .add_elements([ + TimestampedValue('a', timestamp.MAX_TIMESTAMP) + ])) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py new file mode 100644 index 0000000..666207e --- /dev/null +++ b/sdks/python/apache_beam/testing/test_utils.py @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utility methods for testing""" + +import hashlib +import imp +from mock import Mock, patch + +from apache_beam.utils import retry + +DEFAULT_HASHING_ALG = 'sha1' + + +def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG): + """Compute a hash value from a list of string.""" + content.sort() + m = hashlib.new(hashing_alg) + for elem in content: + m.update(str(elem)) + return m.hexdigest() + + +def patch_retry(testcase, module): + """A function to patch retry module to use mock clock and logger. 
+ + Clock and logger that defined in retry decorator will be replaced in test + in order to skip sleep phase when retry happens. + + Args: + testcase: An instance of unittest.TestCase that calls this function to + patch retry module. + module: The module that uses retry and need to be replaced with mock + clock and logger in test. + """ + real_retry_with_exponential_backoff = retry.with_exponential_backoff + + def patched_retry_with_exponential_backoff(num_retries, retry_filter): + """A patch for retry decorator to use a mock dummy clock and logger.""" + return real_retry_with_exponential_backoff( + num_retries=num_retries, retry_filter=retry_filter, logger=Mock(), + clock=Mock()) + + patch.object(retry, 'with_exponential_backoff', + side_effect=patched_retry_with_exponential_backoff).start() + + # Reload module after patching. + imp.reload(module) + + def remove_patches(): + patch.stopall() + # Reload module again after removing patch. + imp.reload(module) + + testcase.addCleanup(remove_patches) http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/__init__.py b/sdks/python/apache_beam/tests/__init__.py deleted file mode 100644 index cce3aca..0000000 --- a/sdks/python/apache_beam/tests/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/data/privatekey.p12 ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/data/privatekey.p12 b/sdks/python/apache_beam/tests/data/privatekey.p12 deleted file mode 100644 index c369ecb..0000000 Binary files a/sdks/python/apache_beam/tests/data/privatekey.p12 and /dev/null differ