Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 70c1de9b9 -> aa9071d56
Few datastoreio fixes
 - pin googledatastore version to 6.4.1
 - add num_shards options to datastore wordcount example
 - move datastore wordcount example to cookbook directory

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/495a2d8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/495a2d8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/495a2d8c

Branch: refs/heads/python-sdk
Commit: 495a2d8c01949e4248d7e6dc9ad7a04168a292d1
Parents: 70c1de9
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Tue Nov 29 10:02:14 2016 -0800
Committer: Vikas Kedigehalli <vika...@google.com>
Committed: Wed Nov 30 12:18:53 2016 -0800

----------------------------------------------------------------------
 .../examples/cookbook/datastore_wordcount.py    | 210 +++++++++++++++++++
 .../apache_beam/examples/datastore_wordcount.py | 203 ------------------
 sdks/python/setup.py                            |   2 +-
 3 files changed, 211 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/495a2d8c/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
new file mode 100644
index 0000000..eb62614
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow that uses Google Cloud Datastore."""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+import uuid
+
+from google.datastore.v1 import entity_pb2
+from google.datastore.v1 import query_pb2
+from googledatastore import helper as datastore_helper, PropertyFilter
+
+import apache_beam as beam
+from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.utils.options import GoogleCloudOptions
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
+
+empty_line_aggregator = beam.Aggregator('emptyLines')
+average_word_size_aggregator = beam.Aggregator('averageWordLength',
+                                               beam.combiners.MeanCombineFn(),
+                                               float)
+
+
+class WordExtractingDoFn(beam.DoFn):
+  """Parse each line of input text into words."""
+
+  def process(self, context):
+    """Returns an iterator over words in contents of Cloud Datastore entity.
+    The element is a line of text. If the line is blank, note that, too.
+    Args:
+      context: the call-specific context: data and aggregator.
+    Returns:
+      The processed element.
+    """
+    content_value = context.element.properties.get('content', None)
+    text_line = ''
+    if content_value:
+      text_line = content_value.string_value
+
+    if not text_line:
+      context.aggregate_to(empty_line_aggregator, 1)
+    words = re.findall(r'[A-Za-z\']+', text_line)
+    for w in words:
+      context.aggregate_to(average_word_size_aggregator, len(w))
+    return words
+
+
+class EntityWrapper(object):
+  """Create a Cloud Datastore entity from the given string."""
+  def __init__(self, namespace, kind, ancestor):
+    self._namespace = namespace
+    self._kind = kind
+    self._ancestor = ancestor
+
+  def make_entity(self, content):
+    entity = entity_pb2.Entity()
+    if self._namespace is not None:
+      entity.key.partition_id.namespace_id = self._namespace
+
+    # All entities created will have the same ancestor
+    datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
+                                  self._kind, str(uuid.uuid4()))
+
+    datastore_helper.add_properties(entity, {"content": unicode(content)})
+    return entity
+
+
+def write_to_datastore(project, user_options, pipeline_options):
+  """Creates a pipeline that writes entities to Cloud Datastore."""
+  p = beam.Pipeline(options=pipeline_options)
+
+  # pylint: disable=expression-not-assigned
+  (p
+   | 'read' >> beam.io.Read(beam.io.TextFileSource(user_options.input))
+   | 'create entity' >> beam.Map(
+       EntityWrapper(user_options.namespace, user_options.kind,
+                     user_options.ancestor).make_entity)
+   | 'write to datastore' >> WriteToDatastore(project))
+
+  # Actually run the pipeline (all operations above are deferred).
+  p.run()
+
+
+def make_ancestor_query(kind, namespace, ancestor):
+  """Creates a Cloud Datastore ancestor query.
+
+  The returned query will fetch all the entities that have the parent key name
+  set to the given `ancestor`.
+  """
+  ancestor_key = entity_pb2.Key()
+  datastore_helper.add_key_path(ancestor_key, kind, ancestor)
+  if namespace is not None:
+    ancestor_key.partition_id.namespace_id = namespace
+
+  query = query_pb2.Query()
+  query.kind.add().name = kind
+
+  datastore_helper.set_property_filter(
+      query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor_key)
+
+  return query
+
+
+def read_from_datastore(project, user_options, pipeline_options):
+  """Creates a pipeline that reads entities from Cloud Datastore."""
+  p = beam.Pipeline(options=pipeline_options)
+  # Create a query to read entities from datastore.
+  query = make_ancestor_query(user_options.kind, user_options.namespace,
+                              user_options.ancestor)
+
+  # Read entities from Cloud Datastore into a PCollection.
+  lines = p | 'read from datastore' >> ReadFromDatastore(
+      project, query, user_options.namespace)
+
+  # Count the occurrences of each word.
+  counts = (lines
+            | 'split' >> (beam.ParDo(WordExtractingDoFn())
+                          .with_output_types(unicode))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+
+  # Format the counts into a PCollection of strings.
+  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+  output | 'write' >> beam.io.WriteToText(file_path_prefix=user_options.output,
+                                          num_shards=user_options.num_shards)
+
+  # Actually run the pipeline (all operations above are deferred).
+  return p.run()
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the wordcount pipeline."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--kind',
+                      dest='kind',
+                      required=True,
+                      help='Datastore Kind')
+  parser.add_argument('--namespace',
+                      dest='namespace',
+                      help='Datastore Namespace')
+  parser.add_argument('--ancestor',
+                      dest='ancestor',
+                      default='root',
+                      help='The ancestor key name for all entities.')
+  parser.add_argument('--output',
+                      dest='output',
+                      required=True,
+                      help='Output file to write results to.')
+  parser.add_argument('--read_only',
+                      action='store_true',
+                      help='Read an existing dataset, do not write first')
+  parser.add_argument('--num_shards',
+                      dest='num_shards',
+                      type=int,
+                      # If the system should choose automatically.
+                      default=0,
+                      help='Number of output shards')
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
+
+  # Write to Datastore if `read_only` options is not specified.
+  if not known_args.read_only:
+    write_to_datastore(gcloud_options.project, known_args, pipeline_options)
+
+  # Read from Datastore.
+  result = read_from_datastore(gcloud_options.project, known_args,
+                               pipeline_options)
+
+  empty_line_values = result.aggregated_values(empty_line_aggregator)
+  logging.info('number of empty lines: %d', sum(empty_line_values.values()))
+  word_length_values = result.aggregated_values(average_word_size_aggregator)
+  logging.info('average word lengths: %s', word_length_values.values())
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/495a2d8c/sdks/python/apache_beam/examples/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/datastore_wordcount.py b/sdks/python/apache_beam/examples/datastore_wordcount.py
deleted file mode 100644
index 6b9779b..0000000
--- a/sdks/python/apache_beam/examples/datastore_wordcount.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A word-counting workflow that uses Google Cloud Datastore."""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-import re
-import uuid
-
-from google.datastore.v1 import entity_pb2
-from google.datastore.v1 import query_pb2
-from googledatastore import helper as datastore_helper, PropertyFilter
-
-import apache_beam as beam
-from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
-from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
-
-empty_line_aggregator = beam.Aggregator('emptyLines')
-average_word_size_aggregator = beam.Aggregator('averageWordLength',
-                                               beam.combiners.MeanCombineFn(),
-                                               float)
-
-
-class WordExtractingDoFn(beam.DoFn):
-  """Parse each line of input text into words."""
-
-  def process(self, context):
-    """Returns an iterator over words in contents of Cloud Datastore entity.
-    The element is a line of text. If the line is blank, note that, too.
-    Args:
-      context: the call-specific context: data and aggregator.
-    Returns:
-      The processed element.
-    """
-    content_value = context.element.properties.get('content', None)
-    text_line = ''
-    if content_value:
-      text_line = content_value.string_value
-
-    if not text_line:
-      context.aggregate_to(empty_line_aggregator, 1)
-    words = re.findall(r'[A-Za-z\']+', text_line)
-    for w in words:
-      context.aggregate_to(average_word_size_aggregator, len(w))
-    return words
-
-
-class EntityWrapper(object):
-  """Create a Cloud Datastore entity from the given string."""
-  def __init__(self, namespace, kind, ancestor):
-    self._namespace = namespace
-    self._kind = kind
-    self._ancestor = ancestor
-
-  def make_entity(self, content):
-    entity = entity_pb2.Entity()
-    if self._namespace is not None:
-      entity.key.partition_id.namespace_id = self._namespace
-
-    # All entities created will have the same ancestor
-    datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
-                                  self._kind, str(uuid.uuid4()))
-
-    datastore_helper.add_properties(entity, {"content": unicode(content)})
-    return entity
-
-
-def write_to_datastore(project, user_options, pipeline_options):
-  """Creates a pipeline that writes entities to Cloud Datastore."""
-  p = beam.Pipeline(options=pipeline_options)
-
-  # pylint: disable=expression-not-assigned
-  (p
-   | 'read' >> beam.io.Read(beam.io.TextFileSource(user_options.input))
-   | 'create entity' >> beam.Map(
-       EntityWrapper(user_options.namespace, user_options.kind,
-                     user_options.ancestor).make_entity)
-   | 'write to datastore' >> WriteToDatastore(project))
-
-  # Actually run the pipeline (all operations above are deferred).
-  p.run()
-
-
-def make_ancestor_query(kind, namespace, ancestor):
-  """Creates a Cloud Datastore ancestor query.
-
-  The returned query will fetch all the entities that have the parent key name
-  set to the given `ancestor`.
- """ - ancestor_key = entity_pb2.Key() - datastore_helper.add_key_path(ancestor_key, kind, ancestor) - if namespace is not None: - ancestor_key.partition_id.namespace_id = namespace - - query = query_pb2.Query() - query.kind.add().name = kind - - datastore_helper.set_property_filter( - query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor_key) - - return query - - -def read_from_datastore(project, user_options, pipeline_options): - """Creates a pipeline that reads entities from Cloud Datastore.""" - p = beam.Pipeline(options=pipeline_options) - # Create a query to read entities from datastore. - query = make_ancestor_query(user_options.kind, user_options.namespace, - user_options.ancestor) - - # Read entities from Cloud Datastore into a PCollection. - lines = p | 'read from datastore' >> ReadFromDatastore( - project, query, user_options.namespace) - - # Count the occurrences of each word. - counts = (lines - | 'split' >> (beam.ParDo(WordExtractingDoFn()) - .with_output_types(unicode)) - | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) - | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) - - # Format the counts into a PCollection of strings. - output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) - - # Write the output using a "Write" transform that has side effects. - # pylint: disable=expression-not-assigned - output | 'write' >> beam.io.Write(beam.io.TextFileSink(user_options.output)) - - # Actually run the pipeline (all operations above are deferred). - return p.run() - - -def run(argv=None): - """Main entry point; defines and runs the wordcount pipeline.""" - - parser = argparse.ArgumentParser() - parser.add_argument('--input', - dest='input', - default='gs://dataflow-samples/shakespeare/kinglear.txt', - help='Input file to process.') - parser.add_argument('--kind', - dest='kind', - required=True, - help='Datastore Kind') - parser.add_argument('--namespace', - dest='namespace', - help='Datastore Namespace') - parser.add_argument('--ancestor', - dest='ancestor', - default='root', - help='The ancestor key name for all entities.') - parser.add_argument('--output', - dest='output', - required=True, - help='Output file to write results to.') - parser.add_argument('--read_only', - action='store_true', - help='Read an existing dataset, do not write first') - - known_args, pipeline_args = parser.parse_known_args(argv) - # We use the save_main_session option because one or more DoFn's in this - # workflow rely on global context (e.g., a module imported at module level). - pipeline_options = PipelineOptions(pipeline_args) - pipeline_options.view_as(SetupOptions).save_main_session = True - gcloud_options = pipeline_options.view_as(GoogleCloudOptions) - - # Write to Datastore if `read_only` options is not specified. - if not known_args.read_only: - write_to_datastore(gcloud_options.project, known_args, pipeline_options) - - # Read from Datastore. 
-  result = read_from_datastore(gcloud_options.project, known_args,
-                               pipeline_options)
-
-  empty_line_values = result.aggregated_values(empty_line_aggregator)
-  logging.info('number of empty lines: %d', sum(empty_line_values.values()))
-  word_length_values = result.aggregated_values(average_word_size_aggregator)
-  logging.info('average word lengths: %s', word_length_values.values())
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  run()


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/495a2d8c/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index add6dc0..73927bb 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -87,7 +87,7 @@ REQUIRED_PACKAGES = [
     'avro>=1.7.7,<2.0.0',
     'dill>=0.2.5,<0.3',
     'google-apitools>=0.5.2,<1.0.0',
-    'googledatastore>=6.4.1,<7.0.0',
+    'googledatastore==6.4.1',
     'httplib2>=0.8,<0.10',
     'mock>=1.0.1,<3.0.0',
     'oauth2client>=2.0.1,<4.0.0',
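
For anyone trying the relocated example, a minimal invocation sketch (untested; the module path and the --kind/--output/--num_shards flags come from this commit, --project is the standard Beam GoogleCloudOptions flag, and the project, kind, and bucket values are placeholders):

  # Placeholder project/kind/bucket values; --num_shards defaults to 0,
  # which leaves the shard count up to the runner.
  python -m apache_beam.examples.cookbook.datastore_wordcount \
      --project YOUR_PROJECT \
      --kind YOUR_KIND \
      --output gs://YOUR_BUCKET/datastore_wordcount/output \
      --num_shards 1

Note also that the setup.py hunk above replaces the 'googledatastore>=6.4.1,<7.0.0' range with an exact '==6.4.1' pin, so installs resolve to exactly 6.4.1 rather than any newer 6.x release, presumably the version this datastoreio code was validated against.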