[jira] [Updated] (BEAM-5431) StarMap transform for Python SDK

2018-09-19 Thread Stephan Hoyer (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Hoyer updated BEAM-5431:

Description: 
I'd like to propose a new high-level transform "StarMap" for the Python SDK. 
The transform would be syntactic sugar for ParDo, like Map, but would 
automatically unpack arguments like 
[itertools.starmap|https://docs.python.org/3/library/itertools.html#itertools.starmap]
 from Python's standard library.

The use-case is to handle applying functions to tuples of arguments, which is a 
common pattern when using Beam's combine and group-by transforms. Right now, 
it's common to write functions with manual unpacking, e.g., 
{code:python}
def my_func(inputs):
  key, value = inputs
  ...

beam.Map(my_func) {code}
StarMap offers a much more readable alternative: 
{code:python}
def my_func(key, value):
  ...

beam.StarMap(my_func){code}
 

The need for StarMap is especially pressing with the advent of Python 3 support 
and the eventual wind-down of Python 2. Currently, it's common to achieve this 
pattern using unpacking in a function definition, e.g., beam.Map(lambda (k, v): 
my_func(k, v)), but this is invalid syntax in Python 3. My internal search of 
Google's codebase turns up quite a few matches for "beam\.Map\(lambda\ \(", none 
of which would work on Python 3.
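For illustration, the unpacking semantics the proposal describes can be sketched in plain Python; the {{star}} helper below is hypothetical, not part of Beam's API:

```python
from itertools import starmap

def star(fn):
    """Hypothetical helper: wrap fn so it takes one tuple and unpacks it.

    beam.StarMap(fn) would behave like beam.Map(star(fn)).
    """
    return lambda args: fn(*args)

def my_func(key, value):
    return '%s=%s' % (key, value)

pairs = [('a', 1), ('b', 2)]

# The wrapper matches itertools.starmap's unpacking semantics:
assert list(map(star(my_func), pairs)) == list(starmap(my_func, pairs))
assert list(starmap(my_func, pairs)) == ['a=1', 'b=2']
```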

 

  was:
I'd like to propose a new high-level transform "StarMap" for the Python SDK. 
The transform would be syntactic sugar for ParDo, like Map, but would 
automatically unpack arguments like 
[itertools.starmap|https://docs.python.org/3/library/itertools.html#itertools.starmap]
 from Python's standard library.

The use-case is to handle applying functions to tuples of arguments, which is a 
common pattern when using Beam's combine and group-by transforms. Right now, 
it's common to write functions with manual unpacking, e.g., 
{code:python}
def my_func(inputs):
  key, value = inputs
  ...

beam.Map(my_func) {code}
StarMap offers a much more readable alternative: 
{code:python}
def my_func(key, value):
  ...

beam.StarMap(my_func){code}
 

 

The need for StarMap is especially pressing with the advent of Python 3 support 
and the eventual wind-down of Python 2. Currently, it's common to achieve this 
pattern using unpacking in a function definition, e.g., beam.Map(lambda (k, v): 
my_func(k, v)), but this is invalid syntax in Python 3. My internal search of 
Google's codebase turns up quite a few matches for "beam\.Map\(lambda\ \(", 
none of which would work on Python 3.

 


> StarMap transform for Python SDK
> 
>
> Key: BEAM-5431
> URL: https://issues.apache.org/jira/browse/BEAM-5431
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Stephan Hoyer
>Assignee: Ahmet Altay
>Priority: Major
>
> I'd like to propose a new high-level transform "StarMap" for the Python SDK. 
> The transform would be syntactic sugar for ParDo, like Map, but would 
> automatically unpack arguments like 
> [itertools.starmap|https://docs.python.org/3/library/itertools.html#itertools.starmap]
>  from Python's standard library.
> The use-case is to handle applying functions to tuples of arguments, which is 
> a common pattern when using Beam's combine and group-by transforms. Right 
> now, it's common to write functions with manual unpacking, e.g., 
> {code:python}
> def my_func(inputs):
>   key, value = inputs
>   ...
> beam.Map(my_func) {code}
> StarMap offers a much more readable alternative: 
> {code:python}
> def my_func(key, value):
>   ...
> beam.StarMap(my_func){code}
>  
> The need for StarMap is especially pressing with the advent of Python 3 
> support and the eventual wind-down of Python 2. Currently, it's common to 
> achieve this pattern using unpacking in a function definition, e.g., 
> beam.Map(lambda (k, v): my_func(k, v)), but this is invalid syntax in Python 
> 3. My internal search of Google's codebase turns up quite a few matches for 
> "beam\.Map\(lambda\ \(", none of which would work on Python 3.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-5431) StarMap transform for Python SDK

2018-09-19 Thread Stephan Hoyer (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Hoyer updated BEAM-5431:

Issue Type: New Feature  (was: Bug)

> StarMap transform for Python SDK
> 
>
> Key: BEAM-5431
> URL: https://issues.apache.org/jira/browse/BEAM-5431
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Stephan Hoyer
>Assignee: Ahmet Altay
>Priority: Major
>
> I'd like to propose a new high-level transform "StarMap" for the Python SDK. 
> The transform would be syntactic sugar for ParDo, like Map, but would 
> automatically unpack arguments like 
> [itertools.starmap|https://docs.python.org/3/library/itertools.html#itertools.starmap]
>  from Python's standard library.
> The use-case is to handle applying functions to tuples of arguments, which is 
> a common pattern when using Beam's combine and group-by transforms. Right 
> now, it's common to write functions with manual unpacking, e.g.,
>  
>  
> {code:python}
> def my_func(inputs):
>   key, value = inputs
>   ...
> beam.Map(my_func) {code}
> StarMap offers a much more readable alternative: 
> {code:python}
> def my_func(key, value):
>   ...
> beam.StarMap(my_func){code}
>  
>  
> The need for StarMap is especially pressing with the advent of Python 3 
> support and the eventual wind-down of Python 2. Currently, it's common to 
> achieve this pattern using unpacking in a function definition, e.g., 
> beam.Map(lambda (k, v): my_func(k, v)), but this is invalid syntax in Python 
> 3. My internal search of Google's codebase turns up quite a few matches for 
> "beam\.Map\(lambda\ \(", none of which would work on Python 3.
>  





[jira] [Created] (BEAM-5431) StarMap transform for Python SDK

2018-09-19 Thread Stephan Hoyer (JIRA)
Stephan Hoyer created BEAM-5431:
---

 Summary: StarMap transform for Python SDK
 Key: BEAM-5431
 URL: https://issues.apache.org/jira/browse/BEAM-5431
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Reporter: Stephan Hoyer
Assignee: Ahmet Altay


I'd like to propose a new high-level transform "StarMap" for the Python SDK. 
The transform would be syntactic sugar for ParDo, like Map, but would 
automatically unpack arguments like 
[itertools.starmap|https://docs.python.org/3/library/itertools.html#itertools.starmap]
 from Python's standard library.

The use-case is to handle applying functions to tuples of arguments, which is a 
common pattern when using Beam's combine and group-by transforms. Right now, 
it's common to write functions with manual unpacking, e.g., 
{code:python}
def my_func(inputs):
  key, value = inputs
  ...

beam.Map(my_func) {code}
StarMap offers a much more readable alternative: 
{code:python}
def my_func(key, value):
  ...

beam.StarMap(my_func){code}
 

 

The need for StarMap is especially pressing with the advent of Python 3 support 
and the eventual wind-down of Python 2. Currently, it's common to achieve this 
pattern using unpacking in a function definition, e.g., beam.Map(lambda (k, v): 
my_func(k, v)), but this is invalid syntax in Python 3. My internal search of 
Google's codebase turns up quite a few matches for "beam\.Map\(lambda\ \(", 
none of which would work on Python 3.

 





[jira] [Commented] (BEAM-4805) beam.Map doesn't work on functions defined with *args

2018-07-17 Thread Stephan Hoyer (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547267#comment-16547267
 ] 

Stephan Hoyer commented on BEAM-4805:
-

> Map expects a function with the following signature:
> f(element, *args, **kwargs)

Right. The problem is that from a Python perspective, f(*args, **kwargs) 
indicates that a function accepts _any_ arguments. This can easily arise when 
using decorators, for example. Obviously Beam should still error if it turns 
out the function cannot actually accept a single positional argument, but 
there's no way to know that before the function is actually called.

There is particular value in accepting a function like the function f(*args, 
**kwargs) I wrote above, because it's an example experienced Python programmers 
will use to see what's going on under the hood.

Testing this out a little more, it looks like Beam is actually OK with _some_ 
uses of *args:

>>> range(3) | beam.Map(lambda *x: x[0])
[0, 1, 2]
>>> range(3) | beam.Map(lambda x: (x,))
[(0,), (1,), (2,)]
>>> range(3) | beam.Map(lambda *x: x)
# same error as above
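For illustration, the kind of runtime check described above (erroring only when the function truly cannot accept one positional argument) can be sketched with the standard library; accepts_single_positional is a hypothetical helper, not Beam's actual validation code:

```python
import inspect

def accepts_single_positional(fn):
    """Hypothetical check (not Beam's actual validation): can fn be
    called with exactly one positional argument? A signature like
    f(*args, **kwargs) passes, since it accepts any arguments."""
    try:
        inspect.signature(fn).bind('element')
        return True
    except TypeError:
        return False

assert accepts_single_positional(lambda *args, **kwargs: None)
assert accepts_single_positional(lambda x: x)
assert not accepts_single_positional(lambda x, y: (x, y))
```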

> beam.Map doesn't work on functions defined with *args
> -
>
> Key: BEAM-4805
> URL: https://issues.apache.org/jira/browse/BEAM-4805
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Stephan Hoyer
>Assignee: Ahmet Altay
>Priority: Major
>
> Consider the following example:
> {code:python}
> import apache_beam as beam
> def f(*args, **kwargs):
>   return args, kwargs
> [1, 2, 3] | beam.Map(f)
> {code}
> When I run this code using the latest released version of Beam (2.5.0), I see 
> the following error:
> {noformat}
> TypeErrorTraceback (most recent call last)
>  in ()
> > 1 range(3) | beam.Map(f)
> /usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc 
> in __ror__(self, left, label)
> 491 _allocate_materialized_pipeline(p)
> 492 materialized_result = 
> _AddMaterializationTransforms().visit(result)
> --> 493 p.run().wait_until_finish()
> 494 _release_materialized_pipeline(p)
> 495 return _FinalizeMaterialization().visit(materialized_result)
> /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, 
> test_runner_api)
> 388 if test_runner_api and self._verify_runner_api_compatible():
> 389   return Pipeline.from_runner_api(
> --> 390   self.to_runner_api(), self.runner, self._options).run(False)
> 391 
> 392 if self._options.view_as(TypeOptions).runtime_type_check:
> /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, 
> test_runner_api)
> 401   finally:
> 402 shutil.rmtree(tmpdir)
> --> 403 return self.runner.run_pipeline(self)
> 404 
> 405   def __enter__(self):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc
>  in run_pipeline(self, pipeline)
> 132   runner = BundleBasedDirectRunner()
> 133 
> --> 134 return runner.run_pipeline(pipeline)
> 135 
> 136 
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
>  in run_pipeline(self, pipeline)
> 216 from apache_beam.runners.dataflow.dataflow_runner import 
> DataflowRunner
> 217 pipeline.visit(DataflowRunner.group_by_key_input_visitor())
> --> 218 return self.run_via_runner_api(pipeline.to_runner_api())
> 219 
> 220   def run_via_runner_api(self, pipeline_proto):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
>  in run_via_runner_api(self, pipeline_proto)
> 219 
> 220   def run_via_runner_api(self, pipeline_proto):
> --> 221 return self.run_stages(*self.create_stages(pipeline_proto))
> 222 
> 223   def create_stages(self, pipeline_proto):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
>  in run_stages(self, pipeline_components, stages, safe_coders)
> 857 metrics_by_stage[stage.name] = self.run_stage(
> 858 controller, pipeline_components, stage,
> --> 859 pcoll_buffers, safe_coders).process_bundle.metrics
> 860 finally:
> 861   controller.close()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
>  in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, 
> safe_coders)
> 968 return BundleManager(
> 969 controller, get_buffer, process_bundle_descriptor,
> --> 970 self._progress_frequency).process_bundle(data_input, 
> data_output)
> 971 
> 972   # These classes are used to interact with the worker.
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
>  in pr

[jira] [Updated] (BEAM-4805) beam.Map doesn't work on functions defined with *args

2018-07-17 Thread Stephan Hoyer (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Hoyer updated BEAM-4805:

Description: 
Consider the following example:
{code:python}
import apache_beam as beam

def f(*args, **kwargs):
  return args, kwargs

[1, 2, 3] | beam.Map(f)
{code}

When I run this code using the latest released version of Beam (2.5.0), I see 
the following error:
{noformat}
TypeErrorTraceback (most recent call last)
 in ()
> 1 range(3) | beam.Map(f)

/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc in 
__ror__(self, left, label)
491 _allocate_materialized_pipeline(p)
492 materialized_result = _AddMaterializationTransforms().visit(result)
--> 493 p.run().wait_until_finish()
494 _release_materialized_pipeline(p)
495 return _FinalizeMaterialization().visit(materialized_result)

/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, 
test_runner_api)
388 if test_runner_api and self._verify_runner_api_compatible():
389   return Pipeline.from_runner_api(
--> 390   self.to_runner_api(), self.runner, self._options).run(False)
391 
392 if self._options.view_as(TypeOptions).runtime_type_check:

/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, 
test_runner_api)
401   finally:
402 shutil.rmtree(tmpdir)
--> 403 return self.runner.run_pipeline(self)
404 
405   def __enter__(self):

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc
 in run_pipeline(self, pipeline)
132   runner = BundleBasedDirectRunner()
133 
--> 134 return runner.run_pipeline(pipeline)
135 
136 

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in run_pipeline(self, pipeline)
216 from apache_beam.runners.dataflow.dataflow_runner import 
DataflowRunner
217 pipeline.visit(DataflowRunner.group_by_key_input_visitor())
--> 218 return self.run_via_runner_api(pipeline.to_runner_api())
219 
220   def run_via_runner_api(self, pipeline_proto):

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in run_via_runner_api(self, pipeline_proto)
219 
220   def run_via_runner_api(self, pipeline_proto):
--> 221 return self.run_stages(*self.create_stages(pipeline_proto))
222 
223   def create_stages(self, pipeline_proto):

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in run_stages(self, pipeline_components, stages, safe_coders)
857 metrics_by_stage[stage.name] = self.run_stage(
858 controller, pipeline_components, stage,
--> 859 pcoll_buffers, safe_coders).process_bundle.metrics
860 finally:
861   controller.close()

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, 
safe_coders)
968 return BundleManager(
969 controller, get_buffer, process_bundle_descriptor,
--> 970 self._progress_frequency).process_bundle(data_input, 
data_output)
971 
972   # These classes are used to interact with the worker.

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in process_bundle(self, inputs, expected_outputs)
   1172 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
   1173 
process_bundle_descriptor_reference=self._bundle_descriptor.id))
-> 1174 result_future = 
self._controller.control_handler.push(process_bundle)
   1175 
   1176 with ProgressRequester(

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in push(self, request)
   1052 request.instruction_id = 'control_%s' % self._uid_counter
   1053   logging.debug('CONTROL REQUEST %s', request)
-> 1054   response = self.worker.do_instruction(request)
   1055   logging.debug('CONTROL RESPONSE %s', response)
   1056   return ControlFuture(request.instruction_id, response)

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc
 in do_instruction(self, request)
206   # E.g. if register is set, this will call 
self.register(request.register))
207   return getattr(self, request_type)(getattr(request, request_type),
--> 208  request.instruction_id)
209 else:
210   raise NotImplementedError

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc
 in process_bundle(self, request, instruction_id)
228 try:
229   with state_handler.process_instruction_id(instruction_id):
--> 230 processor.process_bundle(instruction_id)
231 finally:
232   del 

[jira] [Created] (BEAM-4805) beam.Map doesn't work on functions defined with *args

2018-07-17 Thread Stephan Hoyer (JIRA)
Stephan Hoyer created BEAM-4805:
---

 Summary: beam.Map doesn't work on functions defined with *args
 Key: BEAM-4805
 URL: https://issues.apache.org/jira/browse/BEAM-4805
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Reporter: Stephan Hoyer
Assignee: Ahmet Altay


Consider the following example:
{code:python}
import apache_beam as beam

def f(*args, **kwargs):
  return args, kwargs

[1, 2, 3] | beam.Map(f)
{code}

When I run this code using the latest released version of Beam (2.5.0), I see 
the following error:
{noformat}
TypeErrorTraceback (most recent call last)
 in ()
> 1 range(3) | beam.Map(f)

/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc in 
__ror__(self, left, label)
491 _allocate_materialized_pipeline(p)
492 materialized_result = _AddMaterializationTransforms().visit(result)
--> 493 p.run().wait_until_finish()
494 _release_materialized_pipeline(p)
495 return _FinalizeMaterialization().visit(materialized_result)

/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, 
test_runner_api)
388 if test_runner_api and self._verify_runner_api_compatible():
389   return Pipeline.from_runner_api(
--> 390   self.to_runner_api(), self.runner, self._options).run(False)
391 
392 if self._options.view_as(TypeOptions).runtime_type_check:

/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, 
test_runner_api)
401   finally:
402 shutil.rmtree(tmpdir)
--> 403 return self.runner.run_pipeline(self)
404 
405   def __enter__(self):

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc
 in run_pipeline(self, pipeline)
132   runner = BundleBasedDirectRunner()
133 
--> 134 return runner.run_pipeline(pipeline)
135 
136 

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in run_pipeline(self, pipeline)
216 from apache_beam.runners.dataflow.dataflow_runner import 
DataflowRunner
217 pipeline.visit(DataflowRunner.group_by_key_input_visitor())
--> 218 return self.run_via_runner_api(pipeline.to_runner_api())
219 
220   def run_via_runner_api(self, pipeline_proto):

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in run_via_runner_api(self, pipeline_proto)
219 
220   def run_via_runner_api(self, pipeline_proto):
--> 221 return self.run_stages(*self.create_stages(pipeline_proto))
222 
223   def create_stages(self, pipeline_proto):

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in run_stages(self, pipeline_components, stages, safe_coders)
857 metrics_by_stage[stage.name] = self.run_stage(
858 controller, pipeline_components, stage,
--> 859 pcoll_buffers, safe_coders).process_bundle.metrics
860 finally:
861   controller.close()

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, 
safe_coders)
968 return BundleManager(
969 controller, get_buffer, process_bundle_descriptor,
--> 970 self._progress_frequency).process_bundle(data_input, 
data_output)
971 
972   # These classes are used to interact with the worker.

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in process_bundle(self, inputs, expected_outputs)
   1172 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
   1173 
process_bundle_descriptor_reference=self._bundle_descriptor.id))
-> 1174 result_future = 
self._controller.control_handler.push(process_bundle)
   1175 
   1176 with ProgressRequester(

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc
 in push(self, request)
   1052 request.instruction_id = 'control_%s' % self._uid_counter
   1053   logging.debug('CONTROL REQUEST %s', request)
-> 1054   response = self.worker.do_instruction(request)
   1055   logging.debug('CONTROL RESPONSE %s', response)
   1056   return ControlFuture(request.instruction_id, response)

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc
 in do_instruction(self, request)
206   # E.g. if register is set, this will call 
self.register(request.register))
207   return getattr(self, request_type)(getattr(request, request_type),
--> 208  request.instruction_id)
209 else:
210   raise NotImplementedError

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc
 in process_bundle(self, request, instruction

[jira] [Closed] (BEAM-4591) beam.Create should be splittable

2018-06-19 Thread Stephan Hoyer (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Hoyer closed BEAM-4591.
---
   Resolution: Fixed
Fix Version/s: Not applicable

> beam.Create should be splittable
> 
>
> Key: BEAM-4591
> URL: https://issues.apache.org/jira/browse/BEAM-4591
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core, sdk-py-core
>Reporter: Stephan Hoyer
>Priority: Major
> Fix For: Not applicable
>
>
> beam.Create() should be splittable. This would allow the unintuitive 
> "Reshuffle" step below to be safely omitted:
>  
> {{pipeline = (}}
> {{    beam.Create(range(large_number))}}
> {{    | beam.Reshuffle()  # prevent task fusion}}
> {{    | beam.Map(very_expensive_function)}}
> {{    ...}}
> {{)}}
>  
> These sort of pipelines with small inputs to expensive CPU bound tasks arise 
> frequently in scientific computing use-cases.





[jira] [Commented] (BEAM-4591) beam.Create should be splittable

2018-06-19 Thread Stephan Hoyer (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517624#comment-16517624
 ] 

Stephan Hoyer commented on BEAM-4591:
-

Apologies for the noise – it looks like this actually already works!

> beam.Create should be splittable
> 
>
> Key: BEAM-4591
> URL: https://issues.apache.org/jira/browse/BEAM-4591
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core, sdk-py-core
>Reporter: Stephan Hoyer
>Priority: Major
> Fix For: Not applicable
>
>
> beam.Create() should be splittable. This would allow the unintuitive 
> "Reshuffle" step below to be safely omitted:
>  
> {{pipeline = (}}
> {{    beam.Create(range(large_number))}}
> {{    | beam.Reshuffle()  # prevent task fusion}}
> {{    | beam.Map(very_expensive_function)}}
> {{    ...}}
> {{)}}
>  
> These sort of pipelines with small inputs to expensive CPU bound tasks arise 
> frequently in scientific computing use-cases.





[jira] [Created] (BEAM-4591) beam.Create should be splittable

2018-06-19 Thread Stephan Hoyer (JIRA)
Stephan Hoyer created BEAM-4591:
---

 Summary: beam.Create should be splittable
 Key: BEAM-4591
 URL: https://issues.apache.org/jira/browse/BEAM-4591
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Reporter: Stephan Hoyer
Assignee: Ahmet Altay


beam.Create() should be splittable. This would allow the unintuitive 
"Reshuffle" step below to be safely omitted:
 
{{pipeline = (}}
{{    beam.Create(range(large_number))}}
{{    | beam.Reshuffle()  # prevent task fusion}}
{{    | beam.Map(very_expensive_function)}}
{{    ...}}
{{)}}
 
These sort of pipelines with small inputs to expensive CPU bound tasks arise 
frequently in scientific computing use-cases.





[jira] [Commented] (BEAM-3956) Stacktraces from exceptions in user code should be preserved in the Python SDK

2018-04-06 Thread Stephan Hoyer (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428813#comment-16428813
 ] 

Stephan Hoyer commented on BEAM-3956:
-

[~altay] my PR #4959 fixes this for DirectRunner as well as for runners that 
use RPCs.

> Stacktraces from exceptions in user code should be preserved in the Python SDK
> --
>
> Key: BEAM-3956
> URL: https://issues.apache.org/jira/browse/BEAM-3956
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Stephan Hoyer
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Currently, Beam's Python SDK loses stacktraces for exceptions. It does 
> helpfully add a tag like "[while running StageA]" to exception error 
> messages, but that doesn't include the stacktrace of Python functions being 
> called.
> Including the full stacktraces would make a big difference for the ease of 
> debugging Beam pipelines when things go wrong.





[jira] [Commented] (BEAM-3956) Stacktraces from exceptions in user code should be preserved in the Python SDK

2018-04-06 Thread Stephan Hoyer (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428749#comment-16428749
 ] 

Stephan Hoyer commented on BEAM-3956:
-

Indeed, it looks like losing stacktraces for the direct runner was introduced 
inadvertently by [https://github.com/apache/beam/pull/4376]

> Stacktraces from exceptions in user code should be preserved in the Python SDK
> --
>
> Key: BEAM-3956
> URL: https://issues.apache.org/jira/browse/BEAM-3956
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Stephan Hoyer
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Currently, Beam's Python SDK loses stacktraces for exceptions. It does 
> helpfully add a tag like "[while running StageA]" to exception error 
> messages, but that doesn't include the stacktrace of Python functions being 
> called.
> Including the full stacktraces would make a big difference for the ease of 
> debugging Beam pipelines when things go wrong.





[jira] [Created] (BEAM-3956) Stacktraces from exceptions in user code should be preserved in the Python SDK

2018-03-27 Thread Stephan Hoyer (JIRA)
Stephan Hoyer created BEAM-3956:
---

 Summary: Stacktraces from exceptions in user code should be 
preserved in the Python SDK
 Key: BEAM-3956
 URL: https://issues.apache.org/jira/browse/BEAM-3956
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Reporter: Stephan Hoyer
Assignee: Ahmet Altay


Currently, Beam's Python SDK loses stacktraces for exceptions. It does 
helpfully add a tag like "[while running StageA]" to exception error messages, 
but that doesn't include the stacktrace of Python functions being called.

Including the full stacktraces would make a big difference for the ease of 
debugging Beam pipelines when things go wrong.
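The re-raising pattern needed to keep the original frames while still adding the "[while running StageA]" tag can be sketched as follows (a generic Python 3 pattern, not the actual fix from the linked PR):

```python
import sys
import traceback

def user_code():
    raise ValueError('boom')  # stand-in for an exception raised in user code

def run_stage():
    # Catch, annotate with the stage name, and re-raise with the original
    # traceback attached, so the user_code frame survives in the final trace.
    try:
        user_code()
    except Exception as exc:
        tb = sys.exc_info()[2]
        raise ValueError('%s [while running StageA]' % exc).with_traceback(tb)

try:
    run_stage()
except ValueError:
    trace = traceback.format_exc()

assert 'user_code' in trace              # original frame preserved
assert '[while running StageA]' in trace  # stage tag still added
```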


