[jira] [Created] (BEAM-10182) Support custom gcs location for bigquery read

2020-06-02 Thread Ning Kang (Jira)
Ning Kang created BEAM-10182:


 Summary: Support custom gcs location for bigquery read
 Key: BEAM-10182
 URL: https://issues.apache.org/jira/browse/BEAM-10182
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Reporter: Ning Kang
Assignee: Pablo Estrada


Java and Python handle the usage of temp location for BigQuery IO differently.
Java's Read only uses and validates tempLocation, while Write takes 
customGcsTempLocation and falls back to tempLocation.
Python's Read takes gcs_location and falls back to temp_location, while Write 
takes custom_gcs_temp_location and falls back to temp_location.


We need an equivalent of 
https://github.com/apache/beam/blob/0a0399f71cf14ecabe7e73b6cd596325bb7ff2ea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2314
 for the Read class.
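
A minimal sketch of the Python-side fallback described above (illustrative 
only; resolve_gcs_location is a hypothetical helper, not part of the Beam SDK):

{code:java}
# Hypothetical helper illustrating the gcs_location -> temp_location fallback
# that the Python BigQuery IO performs; not Beam's actual implementation.
from apache_beam.options.pipeline_options import GoogleCloudOptions


def resolve_gcs_location(gcs_location, pipeline_options):
    if gcs_location is not None:
        return gcs_location
    temp_location = pipeline_options.view_as(GoogleCloudOptions).temp_location
    if temp_location is None:
        raise ValueError(
            'Either gcs_location or the --temp_location pipeline option '
            'must be set.')
    return temp_location
{code}

The ask is for Java's Read to expose the same kind of custom GCS location 
option, with the same fallback to tempLocation, that Write already has at the 
linked line.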



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-10178) Error messages for unspecified options should display the command line flag that needs to be specified

2020-06-02 Thread Ning Kang (Jira)
Ning Kang created BEAM-10178:


 Summary: Error messages for unspecified options should display the 
command line flag that needs to be specified
 Key: BEAM-10178
 URL: https://issues.apache.org/jira/browse/BEAM-10178
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core, sdk-py-core
Reporter: Ning Kang
Assignee: Ning Kang


An example error trace:

{code:java}
java.lang.IllegalArgumentException: BigQueryIO.Read needs a GCS temp location 
to store temp files.
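        // (Illustrative note, not part of the actual trace: per this issue, the
        // message above should also name the pipeline option to set, e.g.
        // "... Please set one via the --tempLocation pipeline option.", so the
        // required command line flag is clear from the error alone.)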
        at 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
        at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate(BigQueryIO.java:762)
        at 
org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:641)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
        at 
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:577)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:312)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
        at 
org.apache.beam.examples.cookbook.BigQueryTornadoes.runBigQueryTornadoes(BigQueryTornadoes.java:199)
        at 
org.apache.beam.examples.cookbook.BigQueryTornadoesIT.runE2EBigQueryTornadoesTest(BigQueryTornadoesIT.java:70)
        at 
org.apache.beam.examples.cookbook.BigQueryTornadoesIT.testE2EBigQueryTornadoesWithExport(BigQueryTornadoesIT.java:82)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:349)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:314)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:312)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:396)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
        at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
        at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
        at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
        at 

[jira] [Updated] (BEAM-9927) [Beam Internship] Example Notebook w/ diary study

2020-05-07 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-9927:

Description: 
* Pick/add a topic from [https://kevingg.github.io/diary/]
 * Set up a local JupyterLab environment based on 
[instructions|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
 * Write an example Interactive Beam notebook and take notes on improvement 
action items (AIs) throughout the process.
 * The example notebook should be created under 
[https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive/examples]
 and sent out for review.

  was:
* Pick a topic from [https://kevingg.github.io/diary/]
 * Set up a local JupyterLab environment based on 
[instructions|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
 * Write an example Interactive Beam notebook and take notes on improvement 
action items (AIs) throughout the process.


> [Beam Internship] Example Notebook w/ diary study
> -
>
> Key: BEAM-9927
> URL: https://issues.apache.org/jira/browse/BEAM-9927
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Ning Kang
>Priority: Minor
>
> * Pick/add a topic from [https://kevingg.github.io/diary/]
>  * Set up a local JupyterLab environment based on 
> [instructions|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
>  * Write an example Interactive Beam notebook and take notes on improvement 
> action items (AIs) throughout the process.
>  * The example notebook should be created under 
> [https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive/examples]
>  and sent out for review.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-9927) [Beam Internship] Example Notebook w/ diary study

2020-05-07 Thread Ning Kang (Jira)
Ning Kang created BEAM-9927:
---

 Summary: [Beam Internship] Example Notebook w/ diary study
 Key: BEAM-9927
 URL: https://issues.apache.org/jira/browse/BEAM-9927
 Project: Beam
  Issue Type: Improvement
  Components: sdk-py-core
Reporter: Ning Kang


* Pick a topic from [https://kevingg.github.io/diary/]
 * Set up a local JupyterLab environment based on 
[instructions|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
 * Write an example Interactive Beam notebook and take notes on improvement 
action items (AIs) throughout the process.
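
A minimal sketch of what such an example notebook cell could contain, assuming 
the Interactive Beam APIs referenced in the instructions above (the word-count 
transforms here are purely illustrative):

{code:java}
# Minimal Interactive Beam notebook cell (illustrative sketch only).
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

p = beam.Pipeline(InteractiveRunner())
words = p | 'Create' >> beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
counts = words | 'Count' >> beam.combiners.Count.PerElement()

ib.watch(locals())  # let Interactive Beam track the pipeline and PCollections
ib.show(counts)     # materializes and renders the PCollection in the notebook
{code}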



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-9907) apache_beam.transforms.external_test.ExternalTransformTest.test_nested flaky

2020-05-06 Thread Ning Kang (Jira)
Ning Kang created BEAM-9907:
---

 Summary: 
apache_beam.transforms.external_test.ExternalTransformTest.test_nested flaky
 Key: BEAM-9907
 URL: https://issues.apache.org/jira/browse/BEAM-9907
 Project: Beam
  Issue Type: Test
  Components: sdk-py-core
Reporter: Ning Kang


Example test failures:

https://builds.apache.org/job/beam_PreCommit_Python_Commit/12682/
https://builds.apache.org/job/beam_PreCommit_Python_Commit/12684/

A stack trace:

{code:bash}
apache_beam.transforms.external_test.ExternalTransformTest.test_nested (from 
py37-cloud)

Failing for the past 1 build (Since Failed#12682 )
Took 54 ms.
Error Message
google.protobuf.json_format.ParseError: Unexpected type for Value message.
Stacktrace
self = 

def test_nested(self):
  with beam.Pipeline() as p:
>   assert_that(p | FibTransform(6), equal_to([8]))

apache_beam/transforms/external_test.py:250: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/transforms/ptransform.py:562: in __ror__
result = p.apply(self, pvalueish, label)
apache_beam/pipeline.py:651: in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:198: in apply
return m(transform, input, options)
apache_beam/runners/runner.py:228: in apply_PTransform
return transform.expand(input)
apache_beam/runners/portability/expansion_service_test.py:257: in expand
expansion_service.ExpansionServiceServicer())
apache_beam/pvalue.py:140: in __or__
return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:598: in apply
transform.transform, pvalueish, label or transform.label)
apache_beam/pipeline.py:608: in apply
return self.apply(transform, pvalueish)
apache_beam/pipeline.py:651: in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:198: in apply
return m(transform, input, options)
apache_beam/runners/runner.py:228: in apply_PTransform
return transform.expand(input)
apache_beam/transforms/external.py:322: in expand
pipeline_options=job_utils.pipeline_options_dict_to_struct(options))
apache_beam/runners/job/utils.py:38: in pipeline_options_dict_to_struct
v in options.items() if v is not None
apache_beam/runners/job/utils.py:44: in dict_to_struct
return json_format.ParseDict(dict_obj, struct_pb2.Struct())
target/.tox-py37-cloud/py37-cloud/lib/python3.7/site-packages/google/protobuf/json_format.py:450:
 in ParseDict
parser.ConvertMessage(js_dict, message)
target/.tox-py37-cloud/py37-cloud/lib/python3.7/site-packages/google/protobuf/json_format.py:479:
 in ConvertMessage
methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self)
target/.tox-py37-cloud/py37-cloud/lib/python3.7/site-packages/google/protobuf/json_format.py:667:
 in _ConvertStructMessage
self._ConvertValueMessage(value[key], message.fields[key])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
value = 
message = 

def _ConvertValueMessage(self, value, message):
  """Convert a JSON representation into Value message."""
  if isinstance(value, dict):
self._ConvertStructMessage(value, message.struct_value)
  elif isinstance(value, list):
self._ConvertListValueMessage(value, message.list_value)
  elif value is None:
message.null_value = 0
  elif isinstance(value, bool):
message.bool_value = value
  elif isinstance(value, six.string_types):
message.string_value = value
  elif isinstance(value, _INT_OR_FLOAT):
message.number_value = value
  else:
>   raise ParseError('Unexpected type for Value message.')
E   google.protobuf.json_format.ParseError: Unexpected type for Value 
message.

target/.tox-py37-cloud/py37-cloud/lib/python3.7/site-packages/google/protobuf/json_format.py:647:
 ParseError
{code}
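
The ParseError above can be reproduced outside the test by converting a dict 
with a value of an unsupported type into a Struct (a minimal repro sketch; the 
option name and value here are made up):

{code:java}
# json_format only accepts dict/list/None/bool/str/int/float values when
# converting into a protobuf Struct; anything else raises the ParseError above.
from google.protobuf import json_format, struct_pb2

options = {'some_option': object()}  # hypothetical non-JSON-serializable value
try:
    json_format.ParseDict(options, struct_pb2.Struct())
except json_format.ParseError as e:
    print(e)  # Unexpected type for Value message.
{code}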





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-9832) KeyError: 'No such coder: ' in fn_runner_test

2020-04-28 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094678#comment-17094678
 ] 

Ning Kang commented on BEAM-9832:
-

That test expects some work to be completed within a timeout. If Jenkins is 
busy during that timeout, the work will not be completed, or may not even be 
started at all, causing all kinds of error scenarios: mismatched results 
because the work did not finish, missing PCollections because no work was done, 
and so on.

> KeyError: 'No such coder: ' in fn_runner_test
> -
>
> Key: BEAM-9832
> URL: https://issues.apache.org/jira/browse/BEAM-9832
> Project: Beam
>  Issue Type: Test
>  Components: sdk-py-core, test-failures
>Reporter: Ning Kang
>Assignee: Pablo Estrada
>Priority: Critical
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Failed test results can be found 
> [here|https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/]
>  
> A stack trace:
> {code:java}
> self = 
>   testMethod=test_read>
> def test_read(self):
>   # Can't use NamedTemporaryFile as a context
>   # due to https://bugs.python.org/issue14243
>   temp_file = tempfile.NamedTemporaryFile(delete=False)
>   try:
> temp_file.write(b'a\nb\nc')
> temp_file.close()
> with self.create_pipeline() as p:
>   assert_that(
> > p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b', 
> > 'c']))
> apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> apache_beam/pipeline.py:529: in __exit__
> self.run().wait_until_finish()
> apache_beam/pipeline.py:502: in run
> self._options).run(False)
> apache_beam/pipeline.py:515: in run
> return self.runner.run_pipeline(self, self._options)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in 
> run_pipeline
> pipeline.to_runner_api(default_environment=self._default_environment))
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in 
> run_via_runner_api
> return self.run_stages(stage_context, stages)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages
> bundle_context_manager,
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage
> bundle_manager)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle
> data_input, data_output, input_timers, expected_timer_output)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in 
> process_bundle
> timer_inputs)):
> /usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator
> yield future.result()
> /usr/lib/python3.5/concurrent/futures/_base.py:405: in result
> return self.__get_result()
> /usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result
> raise self._exception
> apache_beam/utils/thread_pool_executor.py:44: in run
> self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute
> dry_run)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in 
> process_bundle
> result_future = self._worker_handler.control_conn.push(process_bundle_req)
> apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push
> response = self.worker.do_instruction(request)
> apache_beam/runners/worker/sdk_worker.py:471: in do_instruction
> getattr(request, request_type), request.instruction_id)
> apache_beam/runners/worker/sdk_worker.py:500: in process_bundle
> instruction_id, request.process_bundle_descriptor_id)
> apache_beam/runners/worker/sdk_worker.py:374: in get
> self.data_channel_factory)
> apache_beam/runners/worker/bundle_processor.py:782: in __init__
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
> apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree
> descriptor.transforms, key=topological_height, reverse=True)
> apache_beam/runners/worker/bundle_processor.py:836: in 
> (transform_id, get_operation(transform_id)) for transform_id in sorted(
> apache_beam/runners/worker/bundle_processor.py:726: in wrapper
> result = cache[args] = func(*args)
> apache_beam/runners/worker/bundle_processor.py:820: in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> apache_beam/runners/worker/bundle_processor.py:820: in 
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> apache_beam/runners/worker/bundle_processor.py:818: in 
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> apache_beam/runners/worker/bundle_processor.py:726: in wrapper
> result = cache[args] = func(*args)
> 

[jira] [Closed] (BEAM-9842) interactive_runner_test test_streaming_wordcount failing due to 'ValueError: PCollection not available'

2020-04-28 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang closed BEAM-9842.
---
Fix Version/s: Not applicable
   Resolution: Duplicate

> interactive_runner_test test_streaming_wordcount failing due to 'ValueError: 
> PCollection not available'
> ---
>
> Key: BEAM-9842
> URL: https://issues.apache.org/jira/browse/BEAM-9842
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Sam Rohde
>Priority: Major
> Fix For: Not applicable
>
>
> beam_PreCommit_Python seems to be failing due to this.
> For example,
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12542/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12543/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12543/testReport/junit/apache_beam.runners.interactive.interactive_runner_test/InteractiveRunnerTest/test_streaming_wordcount_2/]
>  
> > raise ValueError('PCollection not available, please run the pipeline.') E 
> > ValueError: PCollection not available, please run the pipeline.
>  
> Ning, can you please take a look ?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-9842) interactive_runner_test test_streaming_wordcount failing due to 'ValueError: PCollection not available'

2020-04-28 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang reassigned BEAM-9842:
---

Assignee: Sam Rohde  (was: Ning Kang)

> interactive_runner_test test_streaming_wordcount failing due to 'ValueError: 
> PCollection not available'
> ---
>
> Key: BEAM-9842
> URL: https://issues.apache.org/jira/browse/BEAM-9842
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Sam Rohde
>Priority: Major
>
> beam_PreCommit_Python seems to be failing due to this.
> For example,
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12542/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12543/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12543/testReport/junit/apache_beam.runners.interactive.interactive_runner_test/InteractiveRunnerTest/test_streaming_wordcount_2/]
>  
> > raise ValueError('PCollection not available, please run the pipeline.') E 
> > ValueError: PCollection not available, please run the pipeline.
>  
> Ning, can you please take a look ?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-9842) interactive_runner_test test_streaming_wordcount failing due to 'ValueError: PCollection not available'

2020-04-28 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094672#comment-17094672
 ] 

Ning Kang commented on BEAM-9842:
-

There is already a Jira tracking test_streaming_wordcount flakiness: BEAM-9767; 
it's also a duplicate of BEAM-9803. Sam is working on making the test 
deterministic, but we might just need to stop executing such a multi-threaded 
integration test on Jenkins.

> interactive_runner_test test_streaming_wordcount failing due to 'ValueError: 
> PCollection not available'
> ---
>
> Key: BEAM-9842
> URL: https://issues.apache.org/jira/browse/BEAM-9842
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Ning Kang
>Priority: Major
>
> beam_PreCommit_Python seems to be failing due to this.
> For example,
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12542/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12543/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12543/testReport/junit/apache_beam.runners.interactive.interactive_runner_test/InteractiveRunnerTest/test_streaming_wordcount_2/]
>  
> > raise ValueError('PCollection not available, please run the pipeline.') E 
> > ValueError: PCollection not available, please run the pipeline.
>  
> Ning, can you please take a look ?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-9832) KeyError: 'No such coder: ' in fn_runner_test

2020-04-28 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094671#comment-17094671
 ] 

Ning Kang commented on BEAM-9832:
-

There is already a Jira tracking test_streaming_wordcount flakiness: BEAM-9767; 
it's also a duplicate of BEAM-9803. Sam is working on making the test 
deterministic, but we might just need to stop executing such a multi-threaded 
integration test on Jenkins.

> KeyError: 'No such coder: ' in fn_runner_test
> -
>
> Key: BEAM-9832
> URL: https://issues.apache.org/jira/browse/BEAM-9832
> Project: Beam
>  Issue Type: Test
>  Components: sdk-py-core, test-failures
>Reporter: Ning Kang
>Assignee: Pablo Estrada
>Priority: Critical
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Failed test results can be found 
> [here|https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/]
>  
> A stack trace:
> {code:java}
> self = 
>   testMethod=test_read>
> def test_read(self):
>   # Can't use NamedTemporaryFile as a context
>   # due to https://bugs.python.org/issue14243
>   temp_file = tempfile.NamedTemporaryFile(delete=False)
>   try:
> temp_file.write(b'a\nb\nc')
> temp_file.close()
> with self.create_pipeline() as p:
>   assert_that(
> > p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b', 
> > 'c']))
> apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> apache_beam/pipeline.py:529: in __exit__
> self.run().wait_until_finish()
> apache_beam/pipeline.py:502: in run
> self._options).run(False)
> apache_beam/pipeline.py:515: in run
> return self.runner.run_pipeline(self, self._options)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in 
> run_pipeline
> pipeline.to_runner_api(default_environment=self._default_environment))
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in 
> run_via_runner_api
> return self.run_stages(stage_context, stages)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages
> bundle_context_manager,
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage
> bundle_manager)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle
> data_input, data_output, input_timers, expected_timer_output)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in 
> process_bundle
> timer_inputs)):
> /usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator
> yield future.result()
> /usr/lib/python3.5/concurrent/futures/_base.py:405: in result
> return self.__get_result()
> /usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result
> raise self._exception
> apache_beam/utils/thread_pool_executor.py:44: in run
> self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute
> dry_run)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in 
> process_bundle
> result_future = self._worker_handler.control_conn.push(process_bundle_req)
> apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push
> response = self.worker.do_instruction(request)
> apache_beam/runners/worker/sdk_worker.py:471: in do_instruction
> getattr(request, request_type), request.instruction_id)
> apache_beam/runners/worker/sdk_worker.py:500: in process_bundle
> instruction_id, request.process_bundle_descriptor_id)
> apache_beam/runners/worker/sdk_worker.py:374: in get
> self.data_channel_factory)
> apache_beam/runners/worker/bundle_processor.py:782: in __init__
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
> apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree
> descriptor.transforms, key=topological_height, reverse=True)
> apache_beam/runners/worker/bundle_processor.py:836: in 
> (transform_id, get_operation(transform_id)) for transform_id in sorted(
> apache_beam/runners/worker/bundle_processor.py:726: in wrapper
> result = cache[args] = func(*args)
> apache_beam/runners/worker/bundle_processor.py:820: in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> apache_beam/runners/worker/bundle_processor.py:820: in 
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> apache_beam/runners/worker/bundle_processor.py:818: in 
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> apache_beam/runners/worker/bundle_processor.py:726: in wrapper
> result = cache[args] = func(*args)
> 

[jira] [Created] (BEAM-9832) KeyError: 'No such coder: ' in fn_runner_test

2020-04-27 Thread Ning Kang (Jira)
Ning Kang created BEAM-9832:
---

 Summary: KeyError: 'No such coder: ' in fn_runner_test
 Key: BEAM-9832
 URL: https://issues.apache.org/jira/browse/BEAM-9832
 Project: Beam
  Issue Type: Test
  Components: sdk-py-core, test-failures
Reporter: Ning Kang


Failed test results can be found 
[here|https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/]

 

A stack trace:
{code:java}
self = 


def test_read(self):
  # Can't use NamedTemporaryFile as a context
  # due to https://bugs.python.org/issue14243
  temp_file = tempfile.NamedTemporaryFile(delete=False)
  try:
temp_file.write(b'a\nb\nc')
temp_file.close()
with self.create_pipeline() as p:
  assert_that(
> p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b', 
> 'c']))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:529: in __exit__
self.run().wait_until_finish()
apache_beam/pipeline.py:502: in run
self._options).run(False)
apache_beam/pipeline.py:515: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in run_pipeline
pipeline.to_runner_api(default_environment=self._default_environment))
apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in 
run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages
bundle_context_manager,
apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage
bundle_manager)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in 
process_bundle
timer_inputs)):
/usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator
yield future.result()
/usr/lib/python3.5/concurrent/futures/_base.py:405: in result
return self.__get_result()
/usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result
raise self._exception
apache_beam/utils/thread_pool_executor.py:44: in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute
dry_run)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in 
process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:471: in do_instruction
getattr(request, request_type), request.instruction_id)
apache_beam/runners/worker/sdk_worker.py:500: in process_bundle
instruction_id, request.process_bundle_descriptor_id)
apache_beam/runners/worker/sdk_worker.py:374: in get
self.data_channel_factory)
apache_beam/runners/worker/bundle_processor.py:782: in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)
apache_beam/runners/worker/bundle_processor.py:836: in 
(transform_id, get_operation(transform_id)) for transform_id in sorted(
apache_beam/runners/worker/bundle_processor.py:726: in wrapper
result = cache[args] = func(*args)
apache_beam/runners/worker/bundle_processor.py:820: in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
apache_beam/runners/worker/bundle_processor.py:820: in 
pcoll_id in descriptor.transforms[transform_id].outputs.items()
apache_beam/runners/worker/bundle_processor.py:818: in 
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
apache_beam/runners/worker/bundle_processor.py:726: in wrapper
result = cache[args] = func(*args)
apache_beam/runners/worker/bundle_processor.py:823: in get_operation
transform_id, transform_consumers)
apache_beam/runners/worker/bundle_processor.py:1108: in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
apache_beam/runners/worker/bundle_processor.py:1341: in 
create_pair_with_restriction
return _create_sdf_operation(PairWithRestriction, *args)
apache_beam/runners/worker/bundle_processor.py:1404: in _create_sdf_operation
parameter)
apache_beam/runners/worker/bundle_processor.py:1501: in _create_pardo_operation
output_coders = factory.get_output_coders(transform_proto)
apache_beam/runners/worker/bundle_processor.py:1154: in get_output_coders
pcoll_id in transform_proto.outputs.items()
apache_beam/runners/worker/bundle_processor.py:1154: in 

[jira] [Commented] (BEAM-9803) test_streaming_wordcount flaky

2020-04-22 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089871#comment-17089871
 ] 

Ning Kang commented on BEAM-9803:
-

We'll probably need to stop executing such an integration test on Jenkins.

The Jenkins machines could be executing many different things at the same time, 
so it's possible that a machine does no work for the test at all within the 
given 5-second timeout.

> test_streaming_wordcount flaky
> --
>
> Key: BEAM-9803
> URL: https://issues.apache.org/jira/browse/BEAM-9803
> Project: Beam
>  Issue Type: Test
>  Components: sdk-py-core, test-failures
>Reporter: Ning Kang
>Assignee: Sam Rohde
>Priority: Major
>
> {code:java}
> Regressionapache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount
>  (from py37-cython)Failing for the past 1 build (Since #12462 )Took 7.7 
> sec.Error MessageAssertionError: DataFrame are different  DataFrame shape 
> mismatch [left]:  (10, 4) [right]: (6, 4)Stacktraceself = 
>   testMethod=test_streaming_wordcount>
> @unittest.skipIf(
> sys.version_info < (3, 5, 3),
> 'The tests require at least Python 3.6 to work.')
> def test_streaming_wordcount(self):
>   class WordExtractingDoFn(beam.DoFn):
> def process(self, element):
>   text_line = element.strip()
>   words = text_line.split()
>   return words
> 
>   # Add the TestStream so that it can be cached.
>   ib.options.capturable_sources.add(TestStream)
>   ib.options.capture_duration = timedelta(seconds=5)
> 
>   p = beam.Pipeline(
>   runner=interactive_runner.InteractiveRunner(),
>   options=StandardOptions(streaming=True))
> 
>   data = (
>   p
>   | TestStream()
>   .advance_watermark_to(0)
>   .advance_processing_time(1)
>   .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
>   .advance_watermark_to(20)
>   .advance_processing_time(1)
>   .add_elements(['that', 'is', 'the', 'question'])
>   | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
> 
>   counts = (
>   data
>   | 'split' >> beam.ParDo(WordExtractingDoFn())
>   | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
>   | 'group' >> beam.GroupByKey()
>   | 'count' >> beam.Map(lambda wordones: (wordones[0], 
> sum(wordones[1]
> 
>   # Watch the local scope for Interactive Beam so that referenced 
> PCollections
>   # will be cached.
>   ib.watch(locals())
> 
>   # This is normally done in the interactive_utils when a transform is
>   # applied but needs an IPython environment. So we manually run this 
> here.
>   ie.current_env().track_user_pipelines()
> 
>   # Create a fake limiter that cancels the BCJ once the main job receives 
> the
>   # expected amount of results.
>   class FakeLimiter:
> def __init__(self, p, pcoll):
>   self.p = p
>   self.pcoll = pcoll
> 
> def is_triggered(self):
>   result = ie.current_env().pipeline_result(self.p)
>   if result:
> try:
>   results = result.get(self.pcoll)
> except ValueError:
>   return False
> return len(results) >= 10
>   return False
> 
>   # This sets the limiters to stop reading when the test receives 10 
> elements
>   # or after 5 seconds have elapsed (to eliminate the possibility of 
> hanging).
>   ie.current_env().options.capture_control.set_limiters_for_test(
>   [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
> 
>   # This tests that the data was correctly cached.
>   pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
>   expected_data_df = pd.DataFrame([
>   ('to', 0, [IntervalWindow(0, 10)], pane_info),
>   ('be', 0, [IntervalWindow(0, 10)], pane_info),
>   ('or', 0, [IntervalWindow(0, 10)], pane_info),
>   ('not', 0, [IntervalWindow(0, 10)], pane_info),
>   ('to', 0, [IntervalWindow(0, 10)], pane_info),
>   ('be', 0, [IntervalWindow(0, 10)], pane_info),
>   ('that', 2000, [IntervalWindow(20, 30)], pane_info),
>   ('is', 2000, [IntervalWindow(20, 30)], pane_info),
>   ('the', 2000, [IntervalWindow(20, 30)], pane_info),
>   ('question', 2000, [IntervalWindow(20, 30)], pane_info)
>   ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
> 
>   data_df = ib.collect(data, include_window_info=True)
> > pd.testing.assert_frame_equal(expected_data_df, data_df)
> E AssertionError: DataFrame are different
> 

[jira] [Assigned] (BEAM-9803) test_streaming_wordcount flaky

2020-04-22 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang reassigned BEAM-9803:
---

Assignee: Sam Rohde

> test_streaming_wordcount flaky
> --
>
> Key: BEAM-9803
> URL: https://issues.apache.org/jira/browse/BEAM-9803
> Project: Beam
>  Issue Type: Test
>  Components: sdk-py-core, test-failures
>Reporter: Ning Kang
>Assignee: Sam Rohde
>Priority: Major
>
> {code:java}
> Regressionapache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount
>  (from py37-cython)Failing for the past 1 build (Since #12462 )Took 7.7 
> sec.Error MessageAssertionError: DataFrame are different  DataFrame shape 
> mismatch [left]:  (10, 4) [right]: (6, 4)Stacktraceself = 
>   testMethod=test_streaming_wordcount>
> @unittest.skipIf(
> sys.version_info < (3, 5, 3),
> 'The tests require at least Python 3.6 to work.')
> def test_streaming_wordcount(self):
>   class WordExtractingDoFn(beam.DoFn):
> def process(self, element):
>   text_line = element.strip()
>   words = text_line.split()
>   return words
> 
>   # Add the TestStream so that it can be cached.
>   ib.options.capturable_sources.add(TestStream)
>   ib.options.capture_duration = timedelta(seconds=5)
> 
>   p = beam.Pipeline(
>   runner=interactive_runner.InteractiveRunner(),
>   options=StandardOptions(streaming=True))
> 
>   data = (
>   p
>   | TestStream()
>   .advance_watermark_to(0)
>   .advance_processing_time(1)
>   .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
>   .advance_watermark_to(20)
>   .advance_processing_time(1)
>   .add_elements(['that', 'is', 'the', 'question'])
>   | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
> 
>   counts = (
>   data
>   | 'split' >> beam.ParDo(WordExtractingDoFn())
>   | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
>   | 'group' >> beam.GroupByKey()
>   | 'count' >> beam.Map(lambda wordones: (wordones[0], 
> sum(wordones[1]
> 
>   # Watch the local scope for Interactive Beam so that referenced 
> PCollections
>   # will be cached.
>   ib.watch(locals())
> 
>   # This is normally done in the interactive_utils when a transform is
>   # applied but needs an IPython environment. So we manually run this 
> here.
>   ie.current_env().track_user_pipelines()
> 
>   # Create a fake limiter that cancels the BCJ once the main job receives 
> the
>   # expected amount of results.
>   class FakeLimiter:
> def __init__(self, p, pcoll):
>   self.p = p
>   self.pcoll = pcoll
> 
> def is_triggered(self):
>   result = ie.current_env().pipeline_result(self.p)
>   if result:
> try:
>   results = result.get(self.pcoll)
> except ValueError:
>   return False
> return len(results) >= 10
>   return False
> 
>   # This sets the limiters to stop reading when the test receives 10 
> elements
>   # or after 5 seconds have elapsed (to eliminate the possibility of 
> hanging).
>   ie.current_env().options.capture_control.set_limiters_for_test(
>   [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
> 
>   # This tests that the data was correctly cached.
>   pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
>   expected_data_df = pd.DataFrame([
>   ('to', 0, [IntervalWindow(0, 10)], pane_info),
>   ('be', 0, [IntervalWindow(0, 10)], pane_info),
>   ('or', 0, [IntervalWindow(0, 10)], pane_info),
>   ('not', 0, [IntervalWindow(0, 10)], pane_info),
>   ('to', 0, [IntervalWindow(0, 10)], pane_info),
>   ('be', 0, [IntervalWindow(0, 10)], pane_info),
>   ('that', 2000, [IntervalWindow(20, 30)], pane_info),
>   ('is', 2000, [IntervalWindow(20, 30)], pane_info),
>   ('the', 2000, [IntervalWindow(20, 30)], pane_info),
>   ('question', 2000, [IntervalWindow(20, 30)], pane_info)
>   ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
> 
>   data_df = ib.collect(data, include_window_info=True)
> > pd.testing.assert_frame_equal(expected_data_df, data_df)
> E AssertionError: DataFrame are different
> E 
> E DataFrame shape mismatch
> E [left]:  (10, 4)
> E [right]: (6, 4)
> apache_beam/runners/interactive/interactive_runner_test.py:238: AssertionError
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-9803) test_streaming_wordcount flaky

2020-04-22 Thread Ning Kang (Jira)
Ning Kang created BEAM-9803:
---

 Summary: test_streaming_wordcount flaky
 Key: BEAM-9803
 URL: https://issues.apache.org/jira/browse/BEAM-9803
 Project: Beam
  Issue Type: Test
  Components: sdk-py-core, test-failures
Reporter: Ning Kang


{code:java}
Regressionapache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount
 (from py37-cython)Failing for the past 1 build (Since #12462 )Took 7.7 
sec.Error MessageAssertionError: DataFrame are different  DataFrame shape 
mismatch [left]:  (10, 4) [right]: (6, 4)Stacktraceself = 


@unittest.skipIf(
sys.version_info < (3, 5, 3),
'The tests require at least Python 3.6 to work.')
def test_streaming_wordcount(self):
  class WordExtractingDoFn(beam.DoFn):
def process(self, element):
  text_line = element.strip()
  words = text_line.split()
  return words

  # Add the TestStream so that it can be cached.
  ib.options.capturable_sources.add(TestStream)
  ib.options.capture_duration = timedelta(seconds=5)

  p = beam.Pipeline(
  runner=interactive_runner.InteractiveRunner(),
  options=StandardOptions(streaming=True))

  data = (
  p
  | TestStream()
  .advance_watermark_to(0)
  .advance_processing_time(1)
  .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
  .advance_watermark_to(20)
  .advance_processing_time(1)
  .add_elements(['that', 'is', 'the', 'question'])
  | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable

  counts = (
  data
  | 'split' >> beam.ParDo(WordExtractingDoFn())
  | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
  | 'group' >> beam.GroupByKey()
  | 'count' >> beam.Map(lambda wordones: (wordones[0], 
sum(wordones[1]

  # Watch the local scope for Interactive Beam so that referenced 
PCollections
  # will be cached.
  ib.watch(locals())

  # This is normally done in the interactive_utils when a transform is
  # applied but needs an IPython environment. So we manually run this here.
  ie.current_env().track_user_pipelines()

  # Create a fake limiter that cancels the BCJ once the main job receives 
the
  # expected amount of results.
  class FakeLimiter:
def __init__(self, p, pcoll):
  self.p = p
  self.pcoll = pcoll

def is_triggered(self):
  result = ie.current_env().pipeline_result(self.p)
  if result:
try:
  results = result.get(self.pcoll)
except ValueError:
  return False
return len(results) >= 10
  return False

  # This sets the limiters to stop reading when the test receives 10 
elements
  # or after 5 seconds have elapsed (to eliminate the possibility of 
hanging).
  ie.current_env().options.capture_control.set_limiters_for_test(
  [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])

  # This tests that the data was correctly cached.
  pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
  expected_data_df = pd.DataFrame([
  ('to', 0, [IntervalWindow(0, 10)], pane_info),
  ('be', 0, [IntervalWindow(0, 10)], pane_info),
  ('or', 0, [IntervalWindow(0, 10)], pane_info),
  ('not', 0, [IntervalWindow(0, 10)], pane_info),
  ('to', 0, [IntervalWindow(0, 10)], pane_info),
  ('be', 0, [IntervalWindow(0, 10)], pane_info),
  ('that', 2000, [IntervalWindow(20, 30)], pane_info),
  ('is', 2000, [IntervalWindow(20, 30)], pane_info),
  ('the', 2000, [IntervalWindow(20, 30)], pane_info),
  ('question', 2000, [IntervalWindow(20, 30)], pane_info)
  ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable

  data_df = ib.collect(data, include_window_info=True)
> pd.testing.assert_frame_equal(expected_data_df, data_df)
E AssertionError: DataFrame are different
E 
E DataFrame shape mismatch
E [left]:  (10, 4)
E [right]: (6, 4)

apache_beam/runners/interactive/interactive_runner_test.py:238: AssertionError
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-9549) Flaky portableWordCountBatch and portableWordCountStreaming tests

2020-03-18 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-9549:

Description: 
The tests :sdks:python:test-suites:portable:py2:portableWordCountBatch and 
:sdks:python:test-suites:portable:py2:portableWordCountStreaming are flaky and 
sometimes throw gRPC errors.

Stacktrace

!Sr5cNnx8sAW.png|width=2049,height=1001!
In text:
{code:java}
INFO:root:Using Python SDK docker image: apache/beam_python2.7_sdk:2.21.0.dev. 
If the image is not available at local, we will try to pull from 
hub.docker.comINFO:apache_beam.runners.portability.fn_api_runner_transforms:
  
INFO:apache_beam.utils.subprocess_server:Starting service 
with ['docker' 'run' '-v' '/usr/bin/docker:/bin/docker' '-v' 
'/var/run/docker.sock:/var/run/docker.sock' '--network=host' 
'apache/beam_flink1.9_job_server:latest' '--job-host' 'localhost' '--job-port' 
'58753' '--artifact-port' '60175' '--expansion-port' 
'33067']INFO:apache_beam.utils.subprocess_server:[main] INFO 
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
ArtifactStagingService started on 
localhost:60175INFO:apache_beam.utils.subprocess_server:[main] INFO 
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java 
ExpansionService started on 
localhost:33067INFO:apache_beam.utils.subprocess_server:[main] INFO 
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService 
started on localhost:58753ERROR:grpc._common:Exception deserializing 
message!Traceback (most recent call last):  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_common.py",
 line 84, in _transformreturn transformer(message)DecodeError: Error 
parsing messageTraceback (most recent call last):  File 
"/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main"__main__", 
fname, loader, pkg_name)  File "/usr/lib/python2.7/runpy.py", line 72, in 
_run_codeexec code in run_globals  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/lib/python2.7/site-packages/apache_beam/examples/wordcount.py",
 line 142, in run()  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/lib/python2.7/site-packages/apache_beam/examples/wordcount.py",
 line 121, in runresult = p.run()  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 495, in runself._options).run(False)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 508, in runreturn self.runner.run_pipeline(self, self._options)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 401, in run_pipelinejob_service_handle.submit(proto_pipeline)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 102, in submitprepare_response = self.prepare(proto_pipeline)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 179, in preparetimeout=self.timeout)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py",
 line 826, in __call__return _end_unary_response_blocking(state, call, 
False, None)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py",
 line 729, in _end_unary_response_blockingraise 
_InactiveRpcError(state)grpc._channel._InactiveRpcError: <_InactiveRpcError of 
RPC that terminated with: status = StatusCode.INTERNALdetails = "Exception 
deserializing response!"   debug_error_string = "None">
{code}
A Gradle build scan [example|https://scans.gradle.com/s/yv63cefske4cy].

  was:
The tests :sdks:python:test-suites:portable:py2:portableWordCountBatch and 
:sdks:python:test-suites:portable:py2:portableWordCountStreaming are flaky and 
sometimes throw gRPC errors.

Stacktrace
{code:java}
INFO:root:Using Python SDK docker image: apache/beam_python2.7_sdk:2.21.0.dev. 
If the image is not available at local, we will try to pull from 

[jira] [Updated] (BEAM-9549) Flaky portableWordCountBatch and portableWordCountStreaming tests

2020-03-18 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-9549:

Attachment: Sr5cNnx8sAW.png

> Flaky portableWordCountBatch and portableWordCountStreaming tests
> -
>
> Key: BEAM-9549
> URL: https://issues.apache.org/jira/browse/BEAM-9549
> Project: Beam
>  Issue Type: Test
>  Components: test-failures
>Reporter: Ning Kang
>Assignee: Ankur Goenka
>Priority: Major
> Attachments: Sr5cNnx8sAW.png
>
>
> The tests :sdks:python:test-suites:portable:py2:portableWordCountBatch and 
> :sdks:python:test-suites:portable:py2:portableWordCountStreaming are flaky and 
> sometimes throw gRPC errors.
> Stacktrace
> {code:java}
> INFO:root:Using Python SDK docker image: 
> apache/beam_python2.7_sdk:2.21.0.dev. If the image is not available at local, 
> we will try to pull from 
> hub.docker.comINFO:apache_beam.runners.portability.fn_api_runner_transforms:
>   
> INFO:apache_beam.utils.subprocess_server:Starting service 
> with ['docker' 'run' '-v' '/usr/bin/docker:/bin/docker' '-v' 
> '/var/run/docker.sock:/var/run/docker.sock' '--network=host' 
> 'apache/beam_flink1.9_job_server:latest' '--job-host' 'localhost' 
> '--job-port' '58753' '--artifact-port' '60175' '--expansion-port' 
> '33067']INFO:apache_beam.utils.subprocess_server:[main] INFO 
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
> ArtifactStagingService started on 
> localhost:60175INFO:apache_beam.utils.subprocess_server:[main] INFO 
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java 
> ExpansionService started on 
> localhost:33067INFO:apache_beam.utils.subprocess_server:[main] INFO 
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
> JobService started on localhost:58753ERROR:grpc._common:Exception 
> deserializing message!Traceback (most recent call last):  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_common.py",
>  line 84, in _transformreturn transformer(message)DecodeError: Error 
> parsing messageTraceback (most recent call last):  File 
> "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
> "__main__", fname, loader, pkg_name)  File "/usr/lib/python2.7/runpy.py", 
> line 72, in _run_codeexec code in run_globals  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/lib/python2.7/site-packages/apache_beam/examples/wordcount.py",
>  line 142, in run()  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/lib/python2.7/site-packages/apache_beam/examples/wordcount.py",
>  line 121, in runresult = p.run()  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 495, in runself._options).run(False)  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 508, in runreturn self.runner.run_pipeline(self, self._options)  
> File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>  line 401, in run_pipelinejob_service_handle.submit(proto_pipeline)  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>  line 102, in submitprepare_response = self.prepare(proto_pipeline)  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>  line 179, in preparetimeout=self.timeout)  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py",
>  line 826, in __call__return _end_unary_response_blocking(state, call, 
> False, None)  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py",
>  line 729, in _end_unary_response_blockingraise 
> _InactiveRpcError(state)grpc._channel._InactiveRpcError: <_InactiveRpcError 
> of RPC that terminated with:   status = StatusCode.INTERNAL

[jira] [Created] (BEAM-9549) Flaky portableWordCountBatch and portableWordCountStreaming tests

2020-03-18 Thread Ning Kang (Jira)
Ning Kang created BEAM-9549:
---

 Summary: Flaky portableWordCountBatch and 
portableWordCountStreaming tests
 Key: BEAM-9549
 URL: https://issues.apache.org/jira/browse/BEAM-9549
 Project: Beam
  Issue Type: Test
  Components: test-failures
Reporter: Ning Kang


The tests :sdks:python:test-suites:portable:py2:portableWordCountBatch and 
:sdks:python:test-suites:portable:py2:portableWordCountStreaming are flaky and 
sometimes throw gRPC errors.

Stacktrace
{code:java}
INFO:root:Using Python SDK docker image: apache/beam_python2.7_sdk:2.21.0.dev. 
If the image is not available at local, we will try to pull from 
hub.docker.comINFO:apache_beam.runners.portability.fn_api_runner_transforms:
  
INFO:apache_beam.utils.subprocess_server:Starting service 
with ['docker' 'run' '-v' '/usr/bin/docker:/bin/docker' '-v' 
'/var/run/docker.sock:/var/run/docker.sock' '--network=host' 
'apache/beam_flink1.9_job_server:latest' '--job-host' 'localhost' '--job-port' 
'58753' '--artifact-port' '60175' '--expansion-port' 
'33067']INFO:apache_beam.utils.subprocess_server:[main] INFO 
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
ArtifactStagingService started on 
localhost:60175INFO:apache_beam.utils.subprocess_server:[main] INFO 
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java 
ExpansionService started on 
localhost:33067INFO:apache_beam.utils.subprocess_server:[main] INFO 
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService 
started on localhost:58753ERROR:grpc._common:Exception deserializing 
message!Traceback (most recent call last):  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_common.py",
 line 84, in _transformreturn transformer(message)DecodeError: Error 
parsing messageTraceback (most recent call last):  File 
"/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main"__main__", 
fname, loader, pkg_name)  File "/usr/lib/python2.7/runpy.py", line 72, in 
_run_codeexec code in run_globals  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/lib/python2.7/site-packages/apache_beam/examples/wordcount.py",
 line 142, in run()  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/lib/python2.7/site-packages/apache_beam/examples/wordcount.py",
 line 121, in runresult = p.run()  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 495, in runself._options).run(False)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 508, in runreturn self.runner.run_pipeline(self, self._options)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 401, in run_pipelinejob_service_handle.submit(proto_pipeline)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 102, in submitprepare_response = self.prepare(proto_pipeline)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 179, in preparetimeout=self.timeout)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py",
 line 826, in __call__return _end_unary_response_blocking(state, call, 
False, None)  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py",
 line 729, in _end_unary_response_blockingraise 
_InactiveRpcError(state)grpc._channel._InactiveRpcError: <_InactiveRpcError of 
RPC that terminated with: status = StatusCode.INTERNALdetails = "Exception 
deserializing response!"   debug_error_string = "None">
{code}
A gradle scan [example|https://scans.gradle.com/s/yv63cefske4cy].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow

2020-03-10 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang resolved BEAM-7926.
-
Fix Version/s: Not applicable
   Resolution: Fixed

> Show PCollection with Interactive Beam in a data-centric user flow
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 59h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
>  
> {code:java}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:java}
> show(pcoll, pcoll2)
> {code}
> Throughout the process, a pipeline fragment is built to include only 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2) and 
> execute that fragment.
> This makes the Interactive Beam user flow data-centric.
>  
> Detailed 
> [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-8991) RuntimeError in log_handler_test

2019-12-19 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000296#comment-17000296
 ] 

Ning Kang commented on BEAM-8991:
-

Got it, thanks!

 

I've also experienced something interesting when running tests.

For example, if you intentionally log something at warning level in the 
__main__ scope, that warning will fail tests when running on Jenkins: some 
gradle tasks just exit when they see warning-level logs.

However, if you move the warning log to some local scope, say the constructor 
of some class, it does not fail those gradle tasks.
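
A minimal, self-contained sketch of the two placements described above (the 
module and class names are made up for illustration; this is not Beam code):
{code:java}
import logging

_LOGGER = logging.getLogger(__name__)


class Worker(object):
  def __init__(self):
    # Warning emitted in a local scope (a constructor); this did not fail
    # the gradle tasks.
    _LOGGER.warning('warning logged in a constructor')


if __name__ == '__main__':
  logging.basicConfig()
  # Warning emitted in the __main__ scope; on Jenkins some gradle tasks
  # treated this warning as a failure.
  _LOGGER.warning('warning logged in the __main__ scope')
  Worker()
{code}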

> RuntimeError in log_handler_test
> 
>
> Key: BEAM-8991
> URL: https://issues.apache.org/jira/browse/BEAM-8991
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ning Kang
>Priority: Major
> Fix For: Not applicable
>
>
> Now is:
> {code:java}
> apache_beam.runners.worker.log_handler_test.FnApiLogRecordHandlerTest.test_exc_info
>  (from py27-gcp-pytest)Failing for the past 1 build (Since #1290 )Took 78 
> ms.Error MessageIndexError: list index out of rangeStacktraceself = 
>  testMethod=test_exc_info>
> def test_exc_info(self):
>   try:
> raise ValueError('some message')
>   except ValueError:
> _LOGGER.error('some error', exc_info=True)
> 
>   self.fn_log_handler.close()
> 
> > log_entry = 
> > self.test_logging_service.log_records_received[0].log_entries[0]
> E IndexError: list index out of range
> apache_beam/runners/worker/log_handler_test.py:110: IndexErrorStandard 
> ErrorERROR:apache_beam.runners.worker.log_handler_test:some error
> Traceback (most recent call last):
>   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/log_handler_test.py",
>  line 104, in test_exc_info
> raise ValueError('some message')
> ValueError: some message
> {code}
>  Marking it as a duplicate of BEAM-8974.
> Was:
> {code:java}
> 19:28:06 > Task :sdks:python:test-suites:tox:py35:testPy35Cython
> .Exception in thread Thread-1715:
> 19:28:06 Traceback (most recent call last):
> 19:28:06   File "apache_beam/runners/common.py", line 879, in 
> apache_beam.runners.common.DoFnRunner.process
> 19:28:06 return self.do_fn_invoker.invoke_process(windowed_value)
> 19:28:06   File "apache_beam/runners/common.py", line 495, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
> 19:28:06 windowed_value, self.process_method(windowed_value.value))
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/transforms/core.py",
>  line 1434, in 
> 19:28:06 wrapper = lambda x: [fn(x)]
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
>  line 620, in raise_error
> 19:28:06 raise RuntimeError('x')
> 19:28:06 RuntimeError: x
> 19:28:06 
> 19:28:06 During handling of the above exception, another exception occurred:
> 19:28:06 
> 19:28:06 Traceback (most recent call last):
> 19:28:06   File "/usr/lib/python3.5/threading.py", line 914, in 
> _bootstrap_inner
> 19:28:06 self.run()
> 19:28:06   File "/usr/lib/python3.5/threading.py", line 862, in run
> 19:28:06 self._target(*self._args, **self._kwargs)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
>  line 270, in _run_job
> 19:28:06 self._pipeline_proto)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 461, in run_via_runner_api
> 19:28:06 return self.run_stages(stage_context, stages)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 553, in run_stages
> 19:28:06 stage_results.process_bundle.monitoring_infos)
> 19:28:06   File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__
> 19:28:06 self.gen.throw(type, value, traceback)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 500, in maybe_profile
> 19:28:06 yield
> 19:28:06   File 
> 

[jira] [Updated] (BEAM-8991) RuntimeError in log_handler_test

2019-12-18 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-8991:

Summary: RuntimeError in log_handler_test  (was: RuntimeError in 
fn_api_runner_test.py)

> RuntimeError in log_handler_test
> 
>
> Key: BEAM-8991
> URL: https://issues.apache.org/jira/browse/BEAM-8991
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ning Kang
>Priority: Major
>
> Now is:
> {code:java}
> apache_beam.runners.worker.log_handler_test.FnApiLogRecordHandlerTest.test_exc_info
>  (from py27-gcp-pytest)Failing for the past 1 build (Since #1290 )Took 78 
> ms.Error MessageIndexError: list index out of rangeStacktraceself = 
>  testMethod=test_exc_info>
> def test_exc_info(self):
>   try:
> raise ValueError('some message')
>   except ValueError:
> _LOGGER.error('some error', exc_info=True)
> 
>   self.fn_log_handler.close()
> 
> > log_entry = 
> > self.test_logging_service.log_records_received[0].log_entries[0]
> E IndexError: list index out of range
> apache_beam/runners/worker/log_handler_test.py:110: IndexErrorStandard 
> ErrorERROR:apache_beam.runners.worker.log_handler_test:some error
> Traceback (most recent call last):
>   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/log_handler_test.py",
>  line 104, in test_exc_info
> raise ValueError('some message')
> ValueError: some message
> {code}
>  Marking it as a duplicate of BEAM-8974.
> Was:
> {code:java}
> 19:28:06 > Task :sdks:python:test-suites:tox:py35:testPy35Cython
> .Exception in thread Thread-1715:
> 19:28:06 Traceback (most recent call last):
> 19:28:06   File "apache_beam/runners/common.py", line 879, in 
> apache_beam.runners.common.DoFnRunner.process
> 19:28:06 return self.do_fn_invoker.invoke_process(windowed_value)
> 19:28:06   File "apache_beam/runners/common.py", line 495, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
> 19:28:06 windowed_value, self.process_method(windowed_value.value))
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/transforms/core.py",
>  line 1434, in 
> 19:28:06 wrapper = lambda x: [fn(x)]
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
>  line 620, in raise_error
> 19:28:06 raise RuntimeError('x')
> 19:28:06 RuntimeError: x
> 19:28:06 
> 19:28:06 During handling of the above exception, another exception occurred:
> 19:28:06 
> 19:28:06 Traceback (most recent call last):
> 19:28:06   File "/usr/lib/python3.5/threading.py", line 914, in 
> _bootstrap_inner
> 19:28:06 self.run()
> 19:28:06   File "/usr/lib/python3.5/threading.py", line 862, in run
> 19:28:06 self._target(*self._args, **self._kwargs)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
>  line 270, in _run_job
> 19:28:06 self._pipeline_proto)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 461, in run_via_runner_api
> 19:28:06 return self.run_stages(stage_context, stages)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 553, in run_stages
> 19:28:06 stage_results.process_bundle.monitoring_infos)
> 19:28:06   File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__
> 19:28:06 self.gen.throw(type, value, traceback)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 500, in maybe_profile
> 19:28:06 yield
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 550, in run_stages
> 19:28:06 stage_context.safe_coders)
> 19:28:06   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 870, in _run_stage
> 19:28:06 

[jira] [Updated] (BEAM-8991) RuntimeError in fn_api_runner_test.py

2019-12-18 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-8991:

Description: 
Now is:
{code:java}
apache_beam.runners.worker.log_handler_test.FnApiLogRecordHandlerTest.test_exc_info
 (from py27-gcp-pytest)Failing for the past 1 build (Since #1290 )Took 78 
ms.Error MessageIndexError: list index out of rangeStacktraceself = 


def test_exc_info(self):
  try:
raise ValueError('some message')
  except ValueError:
_LOGGER.error('some error', exc_info=True)

  self.fn_log_handler.close()

> log_entry = 
> self.test_logging_service.log_records_received[0].log_entries[0]
E IndexError: list index out of range

apache_beam/runners/worker/log_handler_test.py:110: IndexErrorStandard 
ErrorERROR:apache_beam.runners.worker.log_handler_test:some error
Traceback (most recent call last):
  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/log_handler_test.py",
 line 104, in test_exc_info
raise ValueError('some message')
ValueError: some message
{code}
 Marking it as a duplicate of BEAM-8974.

Was:
{code:java}
19:28:06 > Task :sdks:python:test-suites:tox:py35:testPy35Cython
.Exception in thread Thread-1715:
19:28:06 Traceback (most recent call last):
19:28:06   File "apache_beam/runners/common.py", line 879, in 
apache_beam.runners.common.DoFnRunner.process
19:28:06 return self.do_fn_invoker.invoke_process(windowed_value)
19:28:06   File "apache_beam/runners/common.py", line 495, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
19:28:06 windowed_value, self.process_method(windowed_value.value))
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/transforms/core.py",
 line 1434, in 
19:28:06 wrapper = lambda x: [fn(x)]
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
 line 620, in raise_error
19:28:06 raise RuntimeError('x')
19:28:06 RuntimeError: x
19:28:06 
19:28:06 During handling of the above exception, another exception occurred:
19:28:06 
19:28:06 Traceback (most recent call last):
19:28:06   File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
19:28:06 self.run()
19:28:06   File "/usr/lib/python3.5/threading.py", line 862, in run
19:28:06 self._target(*self._args, **self._kwargs)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
 line 270, in _run_job
19:28:06 self._pipeline_proto)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 461, in run_via_runner_api
19:28:06 return self.run_stages(stage_context, stages)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 553, in run_stages
19:28:06 stage_results.process_bundle.monitoring_infos)
19:28:06   File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__
19:28:06 self.gen.throw(type, value, traceback)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 500, in maybe_profile
19:28:06 yield
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 550, in run_stages
19:28:06 stage_context.safe_coders)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 870, in _run_stage
19:28:06 result, splits = bundle_manager.process_bundle(data_input, 
data_output)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 2052, in process_bundle
19:28:06 part, expected_outputs), part_inputs):
19:28:06   File "/usr/lib/python3.5/concurrent/futures/_base.py", line 556, in 
result_iterator
19:28:06 yield future.result()
19:28:06   File "/usr/lib/python3.5/concurrent/futures/_base.py", line 405, in 
result
19:28:06 

[jira] [Created] (BEAM-8991) RuntimeError in fn_api_runner_test.py

2019-12-18 Thread Ning Kang (Jira)
Ning Kang created BEAM-8991:
---

 Summary: RuntimeError in fn_api_runner_test.py
 Key: BEAM-8991
 URL: https://issues.apache.org/jira/browse/BEAM-8991
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Reporter: Ning Kang


{code:java}
19:28:06 > Task :sdks:python:test-suites:tox:py35:testPy35Cython
.Exception in thread Thread-1715:
19:28:06 Traceback (most recent call last):
19:28:06   File "apache_beam/runners/common.py", line 879, in 
apache_beam.runners.common.DoFnRunner.process
19:28:06 return self.do_fn_invoker.invoke_process(windowed_value)
19:28:06   File "apache_beam/runners/common.py", line 495, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
19:28:06 windowed_value, self.process_method(windowed_value.value))
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/transforms/core.py",
 line 1434, in 
19:28:06 wrapper = lambda x: [fn(x)]
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
 line 620, in raise_error
19:28:06 raise RuntimeError('x')
19:28:06 RuntimeError: x
19:28:06 
19:28:06 During handling of the above exception, another exception occurred:
19:28:06 
19:28:06 Traceback (most recent call last):
19:28:06   File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
19:28:06 self.run()
19:28:06   File "/usr/lib/python3.5/threading.py", line 862, in run
19:28:06 self._target(*self._args, **self._kwargs)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
 line 270, in _run_job
19:28:06 self._pipeline_proto)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 461, in run_via_runner_api
19:28:06 return self.run_stages(stage_context, stages)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 553, in run_stages
19:28:06 stage_results.process_bundle.monitoring_infos)
19:28:06   File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__
19:28:06 self.gen.throw(type, value, traceback)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 500, in maybe_profile
19:28:06 yield
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 550, in run_stages
19:28:06 stage_context.safe_coders)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 870, in _run_stage
19:28:06 result, splits = bundle_manager.process_bundle(data_input, 
data_output)
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 2052, in process_bundle
19:28:06 part, expected_outputs), part_inputs):
19:28:06   File "/usr/lib/python3.5/concurrent/futures/_base.py", line 556, in 
result_iterator
19:28:06 yield future.result()
19:28:06   File "/usr/lib/python3.5/concurrent/futures/_base.py", line 405, in 
result
19:28:06 return self.__get_result()
19:28:06   File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in 
__get_result
19:28:06 raise self._exception
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/utils/thread_pool_executor.py",
 line 42, in run
19:28:06 self._future.set_result(self._fn(*self._fn_args, 
**self._fn_kwargs))
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 2052, in 
19:28:06 part, expected_outputs), part_inputs):
19:28:06   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 1977, in process_bundle

[jira] [Commented] (BEAM-8977) apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest.test_dynamic_plotting_update_same_display is flaky

2019-12-17 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998474#comment-16998474
 ] 

Ning Kang commented on BEAM-8977:
-

The flakiness is caused by nondeterministic execution of the asynchronous 
tasks under test.

Depending on the performance of the system executing the tests, it's possible 
that the asynchronous task is not executed at all during the 1-second sleep.

However, adding a list-length check, increasing the sleep time or setting an 
await with a timeout will not resolve the flakiness.

If the scheduled asynchronous task does not execute for at least 2 iterations, 
there is nothing to test, and neither a failure nor a success tells us 
anything.

I'll remove the flaky unit test since it's flaky by nature. Mocking such an 
asynchronous mechanism doesn't give us anything, so we will not test the 
asynchronous mechanism itself.

I'll add a different test for the underlying asynchronous task and test the 
logic synchronously, as sketched below.
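
A minimal, self-contained sketch of that idea (FakeVisualization and 
run_one_update are hypothetical stand-ins, not the actual Beam classes): call 
the update logic directly and assert on the mock, so no sleep is needed and 
the assertions are deterministic regardless of machine speed.
{code:java}
import unittest
from unittest.mock import MagicMock


class FakeVisualization(object):
  """Stands in for the visualization; only the update step matters here."""

  def __init__(self, display_facets):
    self.display_facets = display_facets

  def run_one_update(self, updating_pv=None):
    # The logic that the background timer would normally invoke periodically.
    if updating_pv is None:
      self.display_facets()
    else:
      self.display_facets(updating_pv=updating_pv)


class SynchronousUpdateTest(unittest.TestCase):
  def test_updates_are_deterministic(self):
    mocked_display_facets = MagicMock()
    pv = FakeVisualization(mocked_display_facets)
    pv.run_one_update()                # first iteration: no updating_pv
    pv.run_one_update(updating_pv=pv)  # later iterations reuse the same pv
    _, first_kwargs = mocked_display_facets.call_args_list[0]
    _, second_kwargs = mocked_display_facets.call_args_list[1]
    self.assertEqual(first_kwargs, {})
    self.assertEqual(second_kwargs, {'updating_pv': pv})


if __name__ == '__main__':
  unittest.main()
{code}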

 

 

 

> apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest.test_dynamic_plotting_update_same_display
>  is flaky
> 
>
> Key: BEAM-8977
> URL: https://issues.apache.org/jira/browse/BEAM-8977
> Project: Beam
>  Issue Type: Bug
>  Components: test-failures
>Reporter: Valentyn Tymofieiev
>Assignee: Ning Kang
>Priority: Major
>
> Sample failure: 
>  
> [https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1273/testReport/apache_beam.runners.interactive.display.pcoll_visualization_test/PCollectionVisualizationTest/test_dynamic_plotting_update_same_display/]
> Error Message
>  IndexError: list index out of range
> Stacktrace
>  self = 
>   testMethod=test_dynamic_plotting_update_same_display>
>  mocked_display_facets =  id='139889868386376'>
> @patch('apache_beam.runners.interactive.display.pcoll_visualization'
>  '.PCollectionVisualization.display_facets')
>  def test_dynamic_plotting_update_same_display(self,
>  mocked_display_facets):
>  fake_pipeline_result = runner.PipelineResult(runner.PipelineState.RUNNING)
>  ie.current_env().set_pipeline_result(self._p, fake_pipeline_result)
>  # Starts async dynamic plotting that never ends in this test.
>  h = pv.visualize(self._pcoll, dynamic_plotting_interval=0.001)
>  # Blocking so the above async task can execute some iterations.
>  time.sleep(1)
>  # The first iteration doesn't provide updating_pv to display_facets.
>  _, first_kwargs = mocked_display_facets.call_args_list[0]
>  self.assertEqual(first_kwargs, {})
>  # The following iterations use the same updating_pv to display_facets and so
>  # on.
>  > _, second_kwargs = mocked_display_facets.call_args_list[1]
>  E IndexError: list index out of range
> apache_beam/runners/interactive/display/pcoll_visualization_test.py:105: 
> IndexError
> Standard Output
> 
>  Standard Error
>  WARNING:apache_beam.runners.interactive.interactive_environment:You cannot 
> use Interactive Beam features when you are not in an interactive environment 
> such as a Jupyter notebook or ipython terminal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work started] (BEAM-8977) apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest.test_dynamic_plotting_update_same_display is flaky

2019-12-17 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-8977 started by Ning Kang.
---
> apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest.test_dynamic_plotting_update_same_display
>  is flaky
> 
>
> Key: BEAM-8977
> URL: https://issues.apache.org/jira/browse/BEAM-8977
> Project: Beam
>  Issue Type: Bug
>  Components: test-failures
>Reporter: Valentyn Tymofieiev
>Assignee: Ning Kang
>Priority: Major
>
> Sample failure: 
>  
> [https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1273/testReport/apache_beam.runners.interactive.display.pcoll_visualization_test/PCollectionVisualizationTest/test_dynamic_plotting_update_same_display/]
> Error Message
>  IndexError: list index out of range
> Stacktrace
>  self = 
>   testMethod=test_dynamic_plotting_update_same_display>
>  mocked_display_facets =  id='139889868386376'>
> @patch('apache_beam.runners.interactive.display.pcoll_visualization'
>  '.PCollectionVisualization.display_facets')
>  def test_dynamic_plotting_update_same_display(self,
>  mocked_display_facets):
>  fake_pipeline_result = runner.PipelineResult(runner.PipelineState.RUNNING)
>  ie.current_env().set_pipeline_result(self._p, fake_pipeline_result)
>  # Starts async dynamic plotting that never ends in this test.
>  h = pv.visualize(self._pcoll, dynamic_plotting_interval=0.001)
>  # Blocking so the above async task can execute some iterations.
>  time.sleep(1)
>  # The first iteration doesn't provide updating_pv to display_facets.
>  _, first_kwargs = mocked_display_facets.call_args_list[0]
>  self.assertEqual(first_kwargs, {})
>  # The following iterations use the same updating_pv to display_facets and so
>  # on.
>  > _, second_kwargs = mocked_display_facets.call_args_list[1]
>  E IndexError: list index out of range
> apache_beam/runners/interactive/display/pcoll_visualization_test.py:105: 
> IndexError
> Standard Output
> 
>  Standard Error
>  WARNING:apache_beam.runners.interactive.interactive_environment:You cannot 
> use Interactive Beam features when you are not in an interactive environment 
> such as a Jupyter notebook or ipython terminal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (BEAM-8837) PCollectionVisualizationTest: possible bug

2019-12-16 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang resolved BEAM-8837.
-
Fix Version/s: 2.19.0
   Resolution: Fixed

PR: [https://github.com/apache/beam/pull/10321] merged.

> PCollectionVisualizationTest: possible bug
> --
>
> Key: BEAM-8837
> URL: https://issues.apache.org/jira/browse/BEAM-8837
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Ning Kang
>Priority: Major
> Fix For: 2.19.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> This seems like a bug, even though the test passes:
> {code}
> test_display_plain_text_when_kernel_has_no_frontend 
> (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
>  ... Exception in thread Thread-4405:
> Traceback (most recent call last):
>   File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
> self.run()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py",
>  line 19, in run
> self.execute(*self.args, **self.kwargs)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 132, in continuous_update_display
> updated_pv.display_facets(updating_pv=pv)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 209, in display_facets
> data = self._to_dataframe()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 278, in _to_dataframe
> for el in self._to_element_list():
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 266, in _to_element_list
> if ie.current_env().cache_manager().exists('full', self._cache_key):
> AttributeError: 'NoneType' object has no attribute 'exists'
> ok
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-8837) PCollectionVisualizationTest: possible bug

2019-12-13 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995948#comment-16995948
 ] 

Ning Kang commented on BEAM-8837:
-

We'll just run the real logic that prepares the data used as input for 
visualization, instead of patching in test data for the PCollections being 
visualized.

 

 

> PCollectionVisualizationTest: possible bug
> --
>
> Key: BEAM-8837
> URL: https://issues.apache.org/jira/browse/BEAM-8837
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> This seems like a bug, even though the test passes:
> {code}
> test_display_plain_text_when_kernel_has_no_frontend 
> (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
>  ... Exception in thread Thread-4405:
> Traceback (most recent call last):
>   File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
> self.run()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py",
>  line 19, in run
> self.execute(*self.args, **self.kwargs)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 132, in continuous_update_display
> updated_pv.display_facets(updating_pv=pv)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 209, in display_facets
> data = self._to_dataframe()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 278, in _to_dataframe
> for el in self._to_element_list():
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 266, in _to_element_list
> if ie.current_env().cache_manager().exists('full', self._cache_key):
> AttributeError: 'NoneType' object has no attribute 'exists'
> ok
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-8837) PCollectionVisualizationTest: possible bug

2019-12-06 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986465#comment-16986465
 ] 

Ning Kang edited comment on BEAM-8837 at 12/7/19 12:10 AM:
---

The *@patch* in the unit tests didn't work as expected, and the code executed 
the real logic to read the cached PCollection while the cache manager was 
*None* in the test.
 I can reproduce the error trace by running the gradle task:
{code:java}
14:20:16 test_display_plain_text_when_kernel_has_no_frontend 
(apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
 ... Exception in thread Thread-3982:
14:20:16 Traceback (most recent call last):
14:20:16   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
14:20:16 self.run()
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/target/.tox-py37-gcp/py37-gcp/lib/python3.7/site-packages/timeloop/job.py",
 line 19, in run
14:20:16 self.execute(*self.args, **self.kwargs)
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 132, in continuous_update_display
14:20:16 updated_pv.display_facets(updating_pv=pv)
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 209, in display_facets
14:20:16 data = self._to_dataframe()
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 278, in _to_dataframe
14:20:16 for el in self._to_element_list():
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 266, in _to_element_list
14:20:16 if ie.current_env().cache_manager().exists('full', 
self._cache_key):
14:20:16 AttributeError: 'NoneType' object has no attribute 'exists'
14:20:16 
14:20:16 ok{code}
I've tried running locally with *unittest* and *nosetests*; both passed 
without running into the issue. I'm pretty sure it's a gotcha in [where to 
patch|https://docs.python.org/3/library/unittest.mock.html#where-to-patch].

When the test is run by tox + nosetests, the patch doesn't work.
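
The gotcha can be reproduced outside Beam entirely. A minimal, self-contained 
sketch (it uses os.path.exists purely as a stand-in; nothing here is Beam 
code) of why patching the module that defines a function is not enough once 
another module has imported it:
{code:java}
from os.path import exists  # this module now holds its own reference
from unittest.mock import patch


def cached(key):
  # Looks up the name 'exists' in this module's namespace at call time.
  return exists(key)


with patch('os.path.exists', return_value=True):
  # Patching where the function is *defined* does not affect the reference
  # already imported into this module.
  print(cached('/no/such/path'))  # prints False: the patch is not seen

with patch('__main__.exists', return_value=True):
  # Patching where the function is *looked up* does take effect.
  print(cached('/no/such/path'))  # prints True
{code}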

 

 


was (Author: ningk):
The *@patch* in the unit tests didn't work as expected, and the code executed 
the real logic to read the cached PCollection while the cache manager was 
*None* in the test.
 [~udim], do you have a link to the failed Jenkins test? I'll need to figure 
out what kind of test invocation path failed this *@patch*.
 I've tried running locally with *unittest* and *nosetests*; both passed 
without running into the issue. I'm pretty sure it's a gotcha in [where to 
patch|https://docs.python.org/3/library/unittest.mock.html#where-to-patch].

> PCollectionVisualizationTest: possible bug
> --
>
> Key: BEAM-8837
> URL: https://issues.apache.org/jira/browse/BEAM-8837
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Ning Kang
>Priority: Major
>
> This seems like a bug, even though the test passes:
> {code}
> test_display_plain_text_when_kernel_has_no_frontend 
> (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
>  ... Exception in thread Thread-4405:
> Traceback (most recent call last):
>   File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
> self.run()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py",
>  line 19, in run
> self.execute(*self.args, **self.kwargs)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 132, in continuous_update_display
> updated_pv.display_facets(updating_pv=pv)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 209, in display_facets
> data = self._to_dataframe()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 278, in _to_dataframe
> for el in 

[jira] [Comment Edited] (BEAM-8837) PCollectionVisualizationTest: possible bug

2019-12-06 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986465#comment-16986465
 ] 

Ning Kang edited comment on BEAM-8837 at 12/7/19 12:10 AM:
---

The *@patch* in the unit tests didn't work as expected, and the code executed 
the real logic to read the cached PCollection while the cache manager was 
*None* in the test.
 I can reproduce the error trace by running the gradle task:
{code:java}
14:20:16 test_display_plain_text_when_kernel_has_no_frontend 
(apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
 ... Exception in thread Thread-3982:
14:20:16 Traceback (most recent call last):
14:20:16   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
14:20:16 self.run()
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/target/.tox-py37-gcp/py37-gcp/lib/python3.7/site-packages/timeloop/job.py",
 line 19, in run
14:20:16 self.execute(*self.args, **self.kwargs)
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 132, in continuous_update_display
14:20:16 updated_pv.display_facets(updating_pv=pv)
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 209, in display_facets
14:20:16 data = self._to_dataframe()
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 278, in _to_dataframe
14:20:16 for el in self._to_element_list():
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 266, in _to_element_list
14:20:16 if ie.current_env().cache_manager().exists('full', 
self._cache_key):
14:20:16 AttributeError: 'NoneType' object has no attribute 'exists'
14:20:16 
14:20:16 ok{code}
I've tried running locally with *unittest* and *nosetests*; both passed 
without running into the issue. I'm pretty sure it's a gotcha in [where to 
patch|https://docs.python.org/3/library/unittest.mock.html#where-to-patch].

When the test is run by tox + nosetests, the patch doesn't work.

 

 


was (Author: ningk):
The *@patch* in the unit tests didn't work as expected, and the code executed 
the real logic to read the cached PCollection while the cache manager was 
*None* in the test.
 I can reproduce the error trace by running the gradle task:
{code:java}
14:20:16 test_display_plain_text_when_kernel_has_no_frontend 
(apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
 ... Exception in thread Thread-3982:
14:20:16 Traceback (most recent call last):
14:20:16   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
14:20:16 self.run()
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/target/.tox-py37-gcp/py37-gcp/lib/python3.7/site-packages/timeloop/job.py",
 line 19, in run
14:20:16 self.execute(*self.args, **self.kwargs)
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 132, in continuous_update_display
14:20:16 updated_pv.display_facets(updating_pv=pv)
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 209, in display_facets
14:20:16 data = self._to_dataframe()
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 278, in _to_dataframe
14:20:16 for el in self._to_element_list():
14:20:16   File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
 line 266, in _to_element_list
14:20:16 if ie.current_env().cache_manager().exists('full', 
self._cache_key):
14:20:16 AttributeError: 'NoneType' object has no attribute 'exists'
14:20:16 
14:20:16 ok{code}
I've tried running locally with *unittest* and *nosetests*; both passed 

[jira] [Commented] (BEAM-8837) PCollectionVisualizationTest: possible bug

2019-12-02 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986465#comment-16986465
 ] 

Ning Kang commented on BEAM-8837:
-

The *@patch* in the unit tests didn't work as expected, and the code executed 
the real logic to read the cached PCollection while the cache manager was 
*None* in the test.
 [~udim], do you have a link to the failed Jenkins test? I'll need to figure 
out what kind of test invocation path failed this *@patch*.
 I've tried running locally with *unittest* and *nosetests*; both passed 
without running into the issue. I'm pretty sure it's a gotcha in [where to 
patch|https://docs.python.org/3/library/unittest.mock.html#where-to-patch].

> PCollectionVisualizationTest: possible bug
> --
>
> Key: BEAM-8837
> URL: https://issues.apache.org/jira/browse/BEAM-8837
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Ning Kang
>Priority: Major
>
> This seems like a bug, even though the test passes:
> {code}
> test_display_plain_text_when_kernel_has_no_frontend 
> (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
>  ... Exception in thread Thread-4405:
> Traceback (most recent call last):
>   File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
> self.run()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py",
>  line 19, in run
> self.execute(*self.args, **self.kwargs)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 132, in continuous_update_display
> updated_pv.display_facets(updating_pv=pv)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 209, in display_facets
> data = self._to_dataframe()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 278, in _to_dataframe
> for el in self._to_element_list():
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 266, in _to_element_list
> if ie.current_env().cache_manager().exists('full', self._cache_key):
> AttributeError: 'NoneType' object has no attribute 'exists'
> ok
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-8837) PCollectionVisualizationTest: possible bug

2019-12-02 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986426#comment-16986426
 ] 

Ning Kang commented on BEAM-8837:
-

Thanks, [~udim], I'll take a look at it.

> PCollectionVisualizationTest: possible bug
> --
>
> Key: BEAM-8837
> URL: https://issues.apache.org/jira/browse/BEAM-8837
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Ning Kang
>Priority: Major
>
> This seems like a bug, even though the test passes:
> {code}
> test_display_plain_text_when_kernel_has_no_frontend 
> (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
>  ... Exception in thread Thread-4405:
> Traceback (most recent call last):
>   File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
> self.run()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py",
>  line 19, in run
> self.execute(*self.args, **self.kwargs)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 132, in continuous_update_display
> updated_pv.display_facets(updating_pv=pv)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 209, in display_facets
> data = self._to_dataframe()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 278, in _to_dataframe
> for el in self._to_element_list():
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 266, in _to_element_list
> if ie.current_env().cache_manager().exists('full', self._cache_key):
> AttributeError: 'NoneType' object has no attribute 'exists'
> ok
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work started] (BEAM-8837) PCollectionVisualizationTest: possible bug

2019-12-02 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-8837 started by Ning Kang.
---
> PCollectionVisualizationTest: possible bug
> --
>
> Key: BEAM-8837
> URL: https://issues.apache.org/jira/browse/BEAM-8837
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Ning Kang
>Priority: Major
>
> This seems like a bug, even though the test passes:
> {code}
> test_display_plain_text_when_kernel_has_no_frontend 
> (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest)
>  ... Exception in thread Thread-4405:
> Traceback (most recent call last):
>   File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
> self.run()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py",
>  line 19, in run
> self.execute(*self.args, **self.kwargs)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 132, in continuous_update_display
> updated_pv.display_facets(updating_pv=pv)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 209, in display_facets
> data = self._to_dataframe()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 278, in _to_dataframe
> for el in self._to_element_list():
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py",
>  line 266, in _to_element_list
> if ie.current_env().cache_manager().exists('full', self._cache_key):
> AttributeError: 'NoneType' object has no attribute 'exists'
> ok
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (BEAM-8016) Render Beam Pipeline as DOT with Interactive Beam

2019-12-02 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang resolved BEAM-8016.
-
Fix Version/s: 2.18.0
   Resolution: Fixed

PR has been merged

> Render Beam Pipeline as DOT with Interactive Beam  
> ---
>
> Key: BEAM-8016
> URL: https://issues.apache.org/jira/browse/BEAM-8016
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
> Fix For: 2.18.0
>
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> With work in https://issues.apache.org/jira/browse/BEAM-7760, a Beam pipeline 
> converted to DOT and then rendered should mark user-defined variables on edges.
> With work in https://issues.apache.org/jira/browse/BEAM-7926, it might be 
> redundant or confusing to render arbitrary random-sample PCollection data on 
> edges.
> We'll also make sure edges in the graph correspond to the output -> input 
> relationship in the user-defined pipeline. Each edge is one output. If 
> multiple downstream inputs take the same output, it should be rendered as 
> one edge diverging into two instead of two edges.
> For advanced interactivity highlighting, where each execution highlights the 
> part of the original pipeline that was actually executed, we'll also provide 
> the support in beta.
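
As a rough, hypothetical sketch (not the actual implementation) of the 
edge-divergence rule above, one way to render it in DOT is to give each output 
PCollection a small point node, draw a single labeled edge from its producer, 
and fan out unlabeled edges to every consumer:
{code:java}
def to_dot(producers, consumers, labels):
  """producers: {pcoll: producing transform},
  consumers: {pcoll: [consuming transforms]},
  labels: {pcoll: user-defined variable name}."""
  lines = ['digraph pipeline {']
  for pcoll, producer in producers.items():
    point = 'pcoll_%s' % labels[pcoll]
    lines.append('  %s [shape=point];' % point)
    # One labeled edge per output, regardless of how many consumers exist.
    lines.append('  "%s" -> %s [label="%s"];' % (producer, point, labels[pcoll]))
    for consumer in consumers.get(pcoll, []):
      lines.append('  %s -> "%s";' % (point, consumer))
  lines.append('}')
  return '\n'.join(lines)


print(to_dot(
    producers={'pc': 'Transform'},
    consumers={'pc': ['DownstreamA', 'DownstreamB']},
    labels={'pc': 'pcoll'}))
{code}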



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow

2019-11-15 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Description: 
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

 
{code:java}
p = beam.Pipeline(InteractiveRunner())
pcoll = p | 'Transform' >> transform()
pcoll2 = ...
pcoll3 = ...{code}
The user can call a single function and get auto-magical charting of the data.

e.g.,
{code:java}
show(pcoll, pcoll2)
{code}
Throughout the process, a pipeline fragment is built to include only transforms 
necessary to produce the desired pcolls (pcoll and pcoll2) and execute that 
fragment.

This makes the Interactive Beam user flow data-centric.

 

Detailed 
[design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].

  was:
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

 
{code:java}
p = beam.Pipeline(InteractiveRunner())
pcoll = p | 'Transform' >> transform()
pcoll2 = ...
pcoll3 = ...{code}
The user can call a single function and get auto-magical charting of the data.

e.g.,
{code:java}
show(pcoll, pcoll2)
{code}
Throughout the process, a pipeline fragment is built to include only transforms 
necessary to produce the desired pcolls (pcoll and pcoll2) and execute that 
fragment.

This makes the Interactive Beam user flow data-centric.

 

Detailed 
[design|[https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz]].


> Show PCollection with Interactive Beam in a data-centric user flow
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
>  
> {code:java}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:java}
> show(pcoll, pcoll2)
> {code}
> Throughout the process, a pipeline fragment is built to include only 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2) and 
> execute that fragment.
> This makes the Interactive Beam user flow data-centric.
>  
> Detailed 
> [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow

2019-11-15 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Description: 
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

 
{code:java}
p = beam.Pipeline(InteractiveRunner())
pcoll = p | 'Transform' >> transform()
pcoll2 = ...
pcoll3 = ...{code}
The user can call a single function and get auto-magical charting of the data.

e.g.,
{code:java}
show(pcoll, pcoll2)
{code}
Throughout the process, a pipeline fragment is built to include only transforms 
necessary to produce the desired pcolls (pcoll and pcoll2) and execute that 
fragment.

This makes the Interactive Beam user flow data-centric.

 

Detailed 
[design|[https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz]].

  was:
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

 
{code:java}
p = beam.Pipeline(InteractiveRunner())
pcoll = p | 'Transform' >> transform()
pcoll2 = ...
pcoll3 = ...{code}
The user can call a single function and get auto-magical charting of the data.

e.g.,
{code:java}
show(pcoll, pcoll2)
{code}
Throughout the process, a pipeline fragment is built to include only transforms 
necessary to produce the desired pcolls (pcoll and pcoll2) and execute that 
fragment.

This makes the Interactive Beam user flow data-centric.


> Show PCollection with Interactive Beam in a data-centric user flow
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
>  
> {code:java}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:java}
> show(pcoll, pcoll2)
> {code}
> Throughout the process, a pipeline fragment is built to include only 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2) and 
> execute that fragment.
> This makes the Interactive Beam user flow data-centric.
>  
> Detailed 
> [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow

2019-11-15 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Description: 
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

 
{code:java}
p = beam.Pipeline(InteractiveRunner())
pcoll = p | 'Transform' >> transform()
pcoll2 = ...
pcoll3 = ...{code}
The user can call a single function and get auto-magical charting of the data.

e.g.,
{code:java}
show(pcoll, pcoll2)
{code}
Throughout the process, a pipeline fragment is built to include only transforms 
necessary to produce the desired pcolls (pcoll and pcoll2) and execute that 
fragment.

This makes the Interactive Beam user flow data-centric.

  was:
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

 
{code:java}
p = beam.Pipeline(InteractiveRunner())
pcoll = p | 'Transform' >> transform()
pcoll2 = ...
pcoll3 = ...{code}
The user can call a single function and get auto-magical charting of the data.

e.g., show(pcoll, pcoll2)

Throughout the process, a pipeline fragment is built to include only transforms 
necessary to produce the desired pcolls (pcoll and pcoll2) and execute that 
fragment.


> Show PCollection with Interactive Beam in a data-centric user flow
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
>  
> {code:java}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:java}
> show(pcoll, pcoll2)
> {code}
> Throughout the process, a pipeline fragment is built to include only 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2) and 
> execute that fragment.
> This makes the Interactive Beam user flow data-centric.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow

2019-11-15 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Summary: Show PCollection with Interactive Beam in a data-centric user flow 
 (was: Show PCollection with Interactive Beam)

> Show PCollection with Interactive Beam in a data-centric user flow
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
>  
> {code:java}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g., show(pcoll, pcoll2)
> Throughout the process, a pipeline fragment is built to include only 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2) and 
> execute that fragment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work started] (BEAM-8379) Cache Eviction for Interactive Beam

2019-11-15 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-8379 started by Ning Kang.
---
> Cache Eviction for Interactive Beam
> ---
>
> Key: BEAM-8379
> URL: https://issues.apache.org/jira/browse/BEAM-8379
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Evicts cache created by Interactive Beam when an IPython kernel is restarted 
> or terminated to release the resource usage that is no longer needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (BEAM-8379) Cache Eviction for Interactive Beam

2019-11-15 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang resolved BEAM-8379.
-
Fix Version/s: 2.18.0
   Resolution: Fixed

> Cache Eviction for Interactive Beam
> ---
>
> Key: BEAM-8379
> URL: https://issues.apache.org/jira/browse/BEAM-8379
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
> Fix For: 2.18.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Evicts cache created by Interactive Beam when an IPython kernel is restarted 
> or terminated to release the resource usage that is no longer needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution

2019-11-11 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang closed BEAM-8425.
---
Fix Version/s: Not applicable
   Resolution: Abandoned

We decided not to notify users but to proactively steer them away from the bad 
situation.

By implementing show(), which executes an implicitly built pipeline fragment, we 
mitigate the re-execution and deleted-cell problems.

> Notifying Interactive Beam user about Beam related cell deletion or 
> re-execution
> 
>
> Key: BEAM-8425
> URL: https://issues.apache.org/jira/browse/BEAM-8425
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
> Fix For: Not applicable
>
>
> There is a general problem about Interactive Notebooks that when an end user 
> deletes a cell that has been executed or re-executes a cell, those previous 
> executions are hidden from the end user.
> However, hidden states will still have side effects in the notebook.
> This kind of problem bothers Beam even more because Beam's pipeline 
> construction statements are not idempotent and pipeline execution is 
> decoupled and deferred from pipeline construction.
> Re-executing a cell with Beam statements that build a pipeline would cause 
> unexpected pipeline state and the user wouldn't notice it due to the problem 
> of notebooks.
> We'll intercept each transform application invocation from the 
> InteractiveRunner and record the ipython/notebook prompt number. Then each 
> time a user executes a cell that applies PTransform, we'll compare the 
> recorded list of prompt numbers with current notebook file's content and 
> figure out if there is any missing number. If so, we know for sure that a 
> re-execution happens and use display manager to notify the end user of 
> potential side effects caused by hidden states of the notebook/ipython.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook

2019-11-11 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang resolved BEAM-7760.
-
Fix Version/s: 2.18.0
   Resolution: Fixed

> Interactive Beam Caching PCollections bound to user defined vars in notebook
> 
>
> Key: BEAM-7760
> URL: https://issues.apache.org/jira/browse/BEAM-7760
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
> Fix For: 2.18.0
>
>  Time Spent: 18h
>  Remaining Estimate: 0h
>
> Cache only PCollections bound to user defined variables in a pipeline when 
> running pipeline with interactive runner in jupyter notebooks.
> [Interactive 
> Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
>  has been caching and using caches of "leaf" PCollections for interactive 
> execution in jupyter notebooks.
> The interactive execution is currently supported so that when appending new 
> transforms to existing pipeline for a new run, executed part of the pipeline 
> doesn't need to be re-executed. 
> A PCollection is "leaf" when it is never used as input in any PTransform in 
> the pipeline.
> The problem with building caches and pipeline to execute around "leaf" is 
> that when a PCollection is consumed by a sink with no output, the pipeline to 
> execute built will miss the subgraph generating and consuming that 
> PCollection.
> An example, "ReadFromPubSub --> WirteToPubSub" will result in an empty 
> pipeline.
> Caching around PCollections bound to user defined variables and replacing 
> transforms with source and sink of caches could resolve the pipeline to 
> execute properly under the interactive execution scenario. Also, cached 
> PCollection now can trace back to user code and can be used for user data 
> visualization if user wants to do it.
> E.g.,
> {code:java}
> // ...
> p = beam.Pipeline(interactive_runner.InteractiveRunner(),
>   options=pipeline_options)
> messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
> messages | "Write" >> beam.io.WriteToPubSub(topic_path)
> result = p.run()
> // ...
> visualize(messages){code}
>  The interactive runner automatically figures out that PCollection
> {code:java}
> messages{code}
> created by
> {code:java}
> p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}
> should be cached and reused if the notebook user appends more transforms.
>  And once the pipeline gets executed, the user could use any 
> visualize(PCollection) module to visualize the data statically (batch) or 
> dynamically (stream)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam

2019-11-11 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Description: 
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

 
{code:java}
p = beam.Pipeline(InteractiveRunner())
pcoll = p | 'Transform' >> transform()
pcoll2 = ...
pcoll3 = ...{code}
The user can call a single function and get auto-magical charting of the data.

e.g., show(pcoll, pcoll2)

Throughout the process, a pipeline fragment is built to include only transforms 
necessary to produce the desired pcolls (pcoll and pcoll2) and execute that 
fragment.

  was:
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

p = create_pipeline()

pcoll = p | 'Transform' >> transform()

The user can call a single function and get auto-magical charting of the data as 
materialized pcoll.

e.g., show(pcoll)


> Show PCollection with Interactive Beam
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
>  
> {code:java}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g., show(pcoll, pcoll2)
> Throughout the process, a pipeline fragment is built to include only 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2) and 
> execute that fragment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-7926) Show PCollection with Interactive Beam

2019-11-11 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971824#comment-16971824
 ] 

Ning Kang commented on BEAM-7926:
-

Implementation has been added. Mark it as resolved.

> Show PCollection with Interactive Beam
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
> p = create_pipeline()
> pcoll = p | 'Transform' >> transform()
> The user can call a single function and get auto-magical charting of the data 
> as materialized pcoll.
> e.g., show(pcoll)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam

2019-11-11 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Summary: Show PCollection with Interactive Beam  (was: Visualize 
PCollection with Interactive Beam)

> Show PCollection with Interactive Beam
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
> p = create_pipeline()
> pcoll = p | 'Transform' >> transform()
> The user can call a single function and get auto-magical charting of the data 
> as materialized pcoll.
> e.g., visualize(pcoll)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam

2019-11-11 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Description: 
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

p = create_pipeline()

pcoll = p | 'Transform' >> transform()

The user can call a single function and get auto-magical charting of the data as 
materialized pcoll.

e.g., show(pcoll)

  was:
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

p = create_pipeline()

pcoll = p | 'Transform' >> transform()

The user can call a single function and get auto-magical charting of the data as 
materialized pcoll.

e.g., visualize(pcoll)


> Show PCollection with Interactive Beam
> --
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
> p = create_pipeline()
> pcoll = p | 'Transform' >> transform()
> The user can call a single function and get auto-magical charting of the data 
> as materialized pcoll.
> e.g., show(pcoll)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-8457) Instrument Dataflow jobs that are launched from Notebooks

2019-11-11 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-8457:

Fix Version/s: (was: 2.17.0)
   2.18.0
  Description: 
Dataflow needs the capability to tell how many Dataflow jobs are launched from 
the Notebook environment.

We do this by checking whether the current execution path runs under IPython and 
whether the IPython kernel is connected to a notebook frontend.
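
A minimal sketch of such a check (hypothetical helper name; the actual detection code may differ):
{code:java}
# Minimal sketch (hypothetical helper; the actual detection code may differ):
# detect whether the current process is an IPython kernel attached to a
# notebook frontend such as Jupyter.
def running_in_notebook():
    try:
        from IPython import get_ipython
    except ImportError:
        return False  # IPython is not even installed
    ipython = get_ipython()
    if ipython is None:
        return False  # plain Python, not running under IPython
    # Kernel-based (notebook) frontends use ZMQInteractiveShell; a terminal
    # IPython session uses TerminalInteractiveShell instead.
    return type(ipython).__name__ == 'ZMQInteractiveShell'
{code}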

  was:
Dataflow needs the capability to tell how many Dataflow jobs are launched from 
the Notebook environment, i.e., the Interactive Runner.
 # Change the pipeline.run() API to allow supplying a runner and an options 
parameter so that a pipeline initially bundled w/ an interactive runner can be 
directly run by other runners from a notebook.
 # Implicitly add the necessary source information through user labels when the 
user does p.run(runner=DataflowRunner()).


> Instrument Dataflow jobs that are launched from Notebooks
> -
>
> Key: BEAM-8457
> URL: https://issues.apache.org/jira/browse/BEAM-8457
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
> Fix For: 2.18.0
>
>  Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> Dataflow needs the capability to tell how many Dataflow jobs are launched 
> from the Notebook environment.
> We do this by checking whether the current execution path runs under IPython 
> and whether the IPython kernel is connected to a notebook frontend.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (BEAM-8457) Instrument Dataflow jobs that are launched from Notebooks

2019-10-24 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang resolved BEAM-8457.
-
Resolution: Fixed

PR is merged. Resolve this in case it blocks the release.

> Instrument Dataflow jobs that are launched from Notebooks
> -
>
> Key: BEAM-8457
> URL: https://issues.apache.org/jira/browse/BEAM-8457
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
> Fix For: 2.17.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Dataflow needs the capability to tell how many Dataflow jobs are launched 
> from the Notebook environment, i.e., the Interactive Runner.
>  # Change the pipeline.run() API to allow supplying a runner and an options 
> parameter so that a pipeline initially bundled w/ an interactive runner can 
> be directly run by other runners from a notebook.
>  # Implicitly add the necessary source information through user labels when 
> the user does p.run(runner=DataflowRunner()).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work started] (BEAM-8457) Instrument Dataflow jobs that are launched from Notebooks

2019-10-24 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-8457 started by Ning Kang.
---
> Instrument Dataflow jobs that are launched from Notebooks
> -
>
> Key: BEAM-8457
> URL: https://issues.apache.org/jira/browse/BEAM-8457
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
> Fix For: 2.17.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Dataflow needs the capability to tell how many Dataflow jobs are launched 
> from the Notebook environment, i.e., the Interactive Runner.
>  # Change the pipeline.run() API to allow supplying a runner and an options 
> parameter so that a pipeline initially bundled w/ an interactive runner can 
> be directly run by other runners from a notebook.
>  # Implicitly add the necessary source information through user labels when 
> the user does p.run(runner=DataflowRunner()).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-8457) Instrument Dataflow jobs that are launched from Notebooks

2019-10-22 Thread Ning Kang (Jira)
Ning Kang created BEAM-8457:
---

 Summary: Instrument Dataflow jobs that are launched from Notebooks
 Key: BEAM-8457
 URL: https://issues.apache.org/jira/browse/BEAM-8457
 Project: Beam
  Issue Type: Improvement
  Components: runner-py-interactive
Reporter: Ning Kang
Assignee: Ning Kang


Dataflow needs the capability to tell how many Dataflow jobs are launched from 
the Notebook environment, i.e., the Interactive Runner.
 # Change the pipeline.run() API to allow supplying a runner and an options 
parameter so that a pipeline initially bundled w/ an interactive runner can be 
directly run by other runners from a notebook.
 # Implicitly add the necessary source information through user labels when the 
user does p.run(runner=DataflowRunner()).
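
Sketched usage of the proposed flow (illustrative only; the run-time runner argument is the API proposed in this ticket, and the exact instrumentation labels are not spelled out here):
{code:java}
# Illustrative sketch of the proposed flow (not the final API surface): a
# pipeline built with the interactive runner is later submitted to Dataflow,
# and the SDK would implicitly attach user labels marking the notebook origin.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

options = PipelineOptions()  # project/region/temp_location would be set here
p = beam.Pipeline(InteractiveRunner(), options=options)
_ = p | 'Create' >> beam.Create([1, 2, 3])

# Proposed in this ticket: hand the same pipeline to another runner at run time;
# the label added for instrumentation is left unspecified here.
result = p.run(runner=DataflowRunner())
{code}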



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution

2019-10-17 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954083#comment-16954083
 ] 

Ning Kang commented on BEAM-8425:
-

1. We record all the cells that change the pipeline graph in a list.
2. Every time we change the pipeline graph, we go through the cells in that 
list and see if any of them is missing in the notebook file. If so, the user 
has either deleted or re-executed the cell, so we print a warning and remove 
the cell from the list.
3. Continue the operation.

The way to do (1) is to have the interactive runner record the current cell 
number upon each .apply call. The current cell number is accessible through 
len(In)-1.

Note that we are not changing the behavior of the Beam programming model, 
except for printing a warning when the user has deleted or re-executed a cell 
that had a side effect on the job graph, and that's when we think the confusion 
likely arises.
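
A minimal sketch of that bookkeeping (hypothetical names; the real InteractiveRunner hook differs):
{code:java}
# Minimal sketch of the bookkeeping described above (hypothetical names; the
# real InteractiveRunner hook differs).
from IPython import get_ipython

applied_prompts = []  # prompt numbers of cells that applied a PTransform

def record_apply_prompt():
    ipython = get_ipython()
    if ipython is None:
        return  # not running interactively, nothing to record
    # 'In' holds the source of every executed cell, so the current prompt
    # number is len(In) - 1.
    applied_prompts.append(len(ipython.user_ns['In']) - 1)

# Later, the recorded prompt numbers are compared against the cells present in
# the saved notebook file; a recorded prompt with no matching cell implies a
# deletion or re-execution and triggers a warning.
{code}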

> Notifying Interactive Beam user about Beam related cell deletion or 
> re-execution
> 
>
> Key: BEAM-8425
> URL: https://issues.apache.org/jira/browse/BEAM-8425
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> There is a general problem about Interactive Notebooks that when an end user 
> deletes a cell that has been executed or re-executes a cell, those previous 
> executions are hidden from the end user.
> However, hidden states will still have side effects in the notebook.
> This kind of problem bothers Beam even more because Beam's pipeline 
> construction statements are not idempotent and pipeline execution is 
> decoupled and deferred from pipeline construction.
> Re-executing a cell with Beam statements that build a pipeline would cause 
> unexpected pipeline state and the user wouldn't notice it due to the problem 
> of notebooks.
> We'll intercept each transform application invocation from the 
> InteractiveRunner and record the ipython/notebook prompt number. Then each 
> time a user executes a cell that applies PTransform, we'll compare the 
> recorded list of prompt numbers with current notebook file's content and 
> figure out if there is any missing number. If so, we know for sure that a 
> re-execution happens and use display manager to notify the end user of 
> potential side effects caused by hidden states of the notebook/ipython.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution

2019-10-17 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954007#comment-16954007
 ] 

Ning Kang edited comment on BEAM-8425 at 10/17/19 6:35 PM:
---

To get ipython kernel (without try-except for non-interactive scenario):
{code:java}
from IPython import get_ipython
 _ipython = get_ipython()
{code}
 

 

To get prompt number of current PTransform applied:

 
{code:java}
len(vars(_ipython)['user_ns']['In'])-1{code}
 

Bonus, to get all code executed by the user in this kernel:

 
{code:java}
_ipython.magic('%history')
{code}
 

 


was (Author: ningk):
To get ipython kernel (without try-except for non-interactive scenario):

```

from IPython import get_ipython
_ipython = get_ipython()

```

To get prompt number of current PTransform applied:

```

len(vars(_ipython)['user_ns']['In'])-1

```

Bonus, to get all code executed by the user in this kernel:

_ipython.magic('%history')

> Notifying Interactive Beam user about Beam related cell deletion or 
> re-execution
> 
>
> Key: BEAM-8425
> URL: https://issues.apache.org/jira/browse/BEAM-8425
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> There is a general problem about Interactive Notebooks that when an end user 
> deletes a cell that has been executed or re-executes a cell, those previous 
> executions are hidden from the end user.
> However, hidden states will still have side effects in the notebook.
> This kind of problem bothers Beam even more because Beam's pipeline 
> construction statements are not idempotent and pipeline execution is 
> decoupled and deferred from pipeline construction.
> Re-executing a cell with Beam statements that build a pipeline would cause 
> unexpected pipeline state and the user wouldn't notice it due to the problem 
> of notebooks.
> We'll intercept each transform application invocation from the 
> InteractiveRunner and record the ipython/notebook prompt number. Then each 
> time a user executes a cell that applies PTransform, we'll compare the 
> recorded list of prompt numbers with current notebook file's content and 
> figure out if there is any missing number. If so, we know for sure that a 
> re-execution happens and use display manager to notify the end user of 
> potential side effects caused by hidden states of the notebook/ipython.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution

2019-10-17 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954007#comment-16954007
 ] 

Ning Kang commented on BEAM-8425:
-

To get ipython kernel (without try-except for non-interactive scenario):

```

from IPython import get_ipython
_ipython = get_ipython()

```

To get prompt number of current PTransform applied:

```

len(vars(_ipython)['user_ns']['In'])-1

```

Bonus, to get all code executed by the user in this kernel:

_ipython.magic('%history')

> Notifying Interactive Beam user about Beam related cell deletion or 
> re-execution
> 
>
> Key: BEAM-8425
> URL: https://issues.apache.org/jira/browse/BEAM-8425
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> There is a general problem about Interactive Notebooks that when an end user 
> deletes a cell that has been executed or re-executes a cell, those previous 
> executions are hidden from the end user.
> However, hidden states will still have side effects in the notebook.
> This kind of problem bothers Beam even more because Beam's pipeline 
> construction statements are not idempotent and pipeline execution is 
> decoupled and deferred from pipeline construction.
> Re-executing a cell with Beam statements that build a pipeline would cause 
> unexpected pipeline state and the user wouldn't notice it due to the problem 
> of notebooks.
> We'll intercept each transform application invocation from the 
> InteractiveRunner and record the ipython/notebook prompt number. Then each 
> time a user executes a cell that applies PTransform, we'll compare the 
> recorded list of prompt numbers with current notebook file's content and 
> figure out if there is any missing number. If so, we know for sure that a 
> re-execution happens and use display manager to notify the end user of 
> potential side effects caused by hidden states of the notebook/ipython.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution

2019-10-17 Thread Ning Kang (Jira)
Ning Kang created BEAM-8425:
---

 Summary: Notifying Interactive Beam user about Beam related cell 
deletion or re-execution
 Key: BEAM-8425
 URL: https://issues.apache.org/jira/browse/BEAM-8425
 Project: Beam
  Issue Type: New Feature
  Components: runner-py-interactive
Reporter: Ning Kang
Assignee: Ning Kang


There is a general problem about Interactive Notebooks that when an end user 
deletes a cell that has been executed or re-executes a cell, those previous 
executions are hidden from the end user.

However, hidden states will still have side effects in the notebook.

This kind of problem bothers Beam even more because Beam's pipeline 
construction statements are not idempotent and pipeline execution is decoupled 
and deferred from pipeline construction.

Re-executing a cell with Beam statements that build a pipeline would cause 
unexpected pipeline state and the user wouldn't notice it due to the problem of 
notebooks.

We'll intercept each transform application invocation from the 
InteractiveRunner and record the ipython/notebook prompt number. Then each time 
a user executes a cell that applies PTransform, we'll compare the recorded list 
of prompt numbers with current notebook file's content and figure out if there 
is any missing number. If so, we know for sure that a re-execution happens and 
use display manager to notify the end user of potential side effects caused by 
hidden states of the notebook/ipython.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-646) Get runners out of the apply()

2019-10-16 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953250#comment-16953250
 ] 

Ning Kang edited comment on BEAM-646 at 10/16/19 10:51 PM:
---

Hi,

This is Ning. I'm working on a project that supports InteractiveRunner 
(Python3+) when users use Beam in notebook environment.

Would apply() interception in runner still be useful as a hook for non-Beam 
related but interactivity related features such as collecting IPython/notebook 
cell information when a PTransform is applied?

Of course, we can also have all pipelines support interactivity by making the 
interception inside pipelines themselves. But it's unlikely that all runners 
would/could take a pipeline with interactivity logic at this moment. And those 
ipython/notebook dependencies probably shouldn't be introduced into pipeline 
itself.

Would there be any APIs that support invoking arbitrary external logic at 
different stages of building a pipeline when pipeline is completely decoupled 
from runner?

Thanks!


was (Author: ningk):
Hi,

This is Ning. I'm working a project that supports InteractiveRunner (Python3+) 
when users use Beam in notebook environment.

Would apply() interception in runner still be useful as a hook for non-Beam 
related but interactivity related features such as collecting IPython/notebook 
cell information when a PTransform is applied?

Of course, we can also have all pipelines support interactivity by making the 
interception inside pipelines themselves. But it's unlikely that all runners 
would/could take a pipeline with interactivity logic at this moment. And those 
ipython/notebook dependencies probably shouldn't be introduced into pipeline 
itself.

Would there be any APIs that support invoking arbitrary external logic at 
different stages of building a pipeline when pipeline is completely decoupled 
from runner?

Thanks!

> Get runners out of the apply()
> --
>
> Key: BEAM-646
> URL: https://issues.apache.org/jira/browse/BEAM-646
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Thomas Groh
>Priority: Major
>  Labels: backwards-incompatible
> Fix For: 0.6.0
>
>
> Right now, the runner intercepts calls to apply() and replaces transforms as 
> we go. This means that there is no "original" user graph. For portability and 
> misc architectural benefits, we would like to build the original graph first, 
> and have the runner override later.
> Some runners already work in this manner, but we could integrate it more 
> smoothly, with more validation, via some handy APIs on e.g. the Pipeline 
> object.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-646) Get runners out of the apply()

2019-10-16 Thread Ning Kang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953250#comment-16953250
 ] 

Ning Kang commented on BEAM-646:


Hi,

This is Ning. I'm working a project that supports InteractiveRunner (Python3+) 
when users use Beam in notebook environment.

Would apply() interception in runner still be useful as a hook for non-Beam 
related but interactivity related features such as collecting IPython/notebook 
cell information when a PTransform is applied?

Of course, we can also have all pipelines support interactivity by making the 
interception inside pipelines themselves. But it's unlikely that all runners 
would/could take a pipeline with interactivity logic at this moment. And those 
ipython/notebook dependencies probably shouldn't be introduced into pipeline 
itself.

Would there be any APIs that support invoking arbitrary external logic at 
different stages of building a pipeline when pipeline is completely decoupled 
from runner?

Thanks!

> Get runners out of the apply()
> --
>
> Key: BEAM-646
> URL: https://issues.apache.org/jira/browse/BEAM-646
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Thomas Groh
>Priority: Major
>  Labels: backwards-incompatible
> Fix For: 0.6.0
>
>
> Right now, the runner intercepts calls to apply() and replaces transforms as 
> we go. This means that there is no "original" user graph. For portability and 
> misc architectural benefits, we would like to build the original graph first, 
> and have the runner override later.
> Some runners already work in this manner, but we could integrate it more 
> smoothly, with more validation, via some handy APIs on e.g. the Pipeline 
> object.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-8379) Cache Eviction for Interactive Beam

2019-10-10 Thread Ning Kang (Jira)
Ning Kang created BEAM-8379:
---

 Summary: Cache Eviction for Interactive Beam
 Key: BEAM-8379
 URL: https://issues.apache.org/jira/browse/BEAM-8379
 Project: Beam
  Issue Type: New Feature
  Components: runner-py-interactive
Reporter: Ning Kang
Assignee: Ning Kang


Evicts cache created by Interactive Beam when an IPython kernel is restarted or 
terminated to release the resource usage that is no longer needed.
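
One possible realization (a sketch under assumed names, not the actual implementation) is to register a cleanup hook that removes the cache directory when the kernel process exits:
{code:java}
# Sketch only (assumed cache layout, not the actual Interactive Beam code):
# delete the on-disk cache when the IPython kernel process exits.
import atexit
import shutil
import tempfile

cache_dir = tempfile.mkdtemp(prefix='ib-cache-')  # where cached PCollections would live

def evict_cache():
    # Safe to call on kernel shutdown or restart; ignores already-deleted dirs.
    shutil.rmtree(cache_dir, ignore_errors=True)

atexit.register(evict_cache)
{code}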



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-8288) Cleanup Interactive Beam Python 2 support

2019-10-02 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-8288:

Component/s: (was: examples-python)
 runner-py-interactive

> Cleanup Interactive Beam Python 2 support
> -
>
> Key: BEAM-8288
> URL: https://issues.apache.org/jira/browse/BEAM-8288
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Minor
>
> As Beam is retiring Python 2, some special handling in Interactive Beam code 
> and tests will need to be cleaned up.
> This Jira ticket tracks those changes to be cleaned up.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-8016) Render Beam Pipeline as DOT with Interactive Beam

2019-10-02 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-8016:

Component/s: (was: examples-python)
 runner-py-interactive

> Render Beam Pipeline as DOT with Interactive Beam  
> ---
>
> Key: BEAM-8016
> URL: https://issues.apache.org/jira/browse/BEAM-8016
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> With work in https://issues.apache.org/jira/browse/BEAM-7760, Beam pipeline 
> converted to DOT then rendered should mark user defined variables on edges.
> With work in https://issues.apache.org/jira/browse/BEAM-7926, it might be 
> redundant or confusing to render arbitrary random sample PCollection data on 
> edges.
> We'll also make sure edges in the graph correspond to the output -> input 
> relationship in the user-defined pipeline. Each edge is one output. If 
> multiple downstream inputs take the same output, it should be rendered as 
> one edge diverging into two instead of two edges.
> For advanced interactivity highlighting, where each execution highlights the 
> part of the original pipeline that was actually executed, we'll also 
> provide support in beta.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Visualize PCollection with Interactive Beam

2019-10-02 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Component/s: (was: examples-python)
 runner-py-interactive

> Visualize PCollection with Interactive Beam
> ---
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
> p = create_pipeline()
> pcoll = p | 'Transform' >> transform()
> The use can call a single function and get auto-magical charting of the data 
> as materialized pcoll.
> e.g., visualize(pcoll)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7923) Interactive Beam

2019-10-02 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7923:

Component/s: (was: examples-python)
 runner-py-interactive

> Interactive Beam
> 
>
> Key: BEAM-7923
> URL: https://issues.apache.org/jira/browse/BEAM-7923
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> This is the top level ticket for all efforts leveraging [interactive 
> Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive].
> As development progresses, blocking tickets will be added to this one.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook

2019-10-02 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7760:

Component/s: (was: examples-python)
 runner-py-interactive

> Interactive Beam Caching PCollections bound to user defined vars in notebook
> 
>
> Key: BEAM-7760
> URL: https://issues.apache.org/jira/browse/BEAM-7760
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-py-interactive
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 17h 10m
>  Remaining Estimate: 0h
>
> Cache only PCollections bound to user defined variables in a pipeline when 
> running pipeline with interactive runner in jupyter notebooks.
> [Interactive 
> Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
>  has been caching and using caches of "leaf" PCollections for interactive 
> execution in jupyter notebooks.
> The interactive execution is currently supported so that when appending new 
> transforms to existing pipeline for a new run, executed part of the pipeline 
> doesn't need to be re-executed. 
> A PCollection is "leaf" when it is never used as input in any PTransform in 
> the pipeline.
> The problem with building caches and pipeline to execute around "leaf" is 
> that when a PCollection is consumed by a sink with no output, the pipeline to 
> execute built will miss the subgraph generating and consuming that 
> PCollection.
> An example, "ReadFromPubSub --> WirteToPubSub" will result in an empty 
> pipeline.
> Caching around PCollections bound to user defined variables and replacing 
> transforms with source and sink of caches could resolve the pipeline to 
> execute properly under the interactive execution scenario. Also, cached 
> PCollection now can trace back to user code and can be used for user data 
> visualization if user wants to do it.
> E.g.,
> {code:java}
> // ...
> p = beam.Pipeline(interactive_runner.InteractiveRunner(),
>   options=pipeline_options)
> messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
> messages | "Write" >> beam.io.WriteToPubSub(topic_path)
> result = p.run()
> // ...
> visualize(messages){code}
>  The interactive runner automatically figures out that PCollection
> {code:java}
> messages{code}
> created by
> {code:java}
> p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}
> should be cached and reused if the notebook user appends more transforms.
>  And once the pipeline gets executed, the user could use any 
> visualize(PCollection) module to visualize the data statically (batch) or 
> dynamically (stream)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-8288) Cleanup Interactive Beam Python 2 support

2019-09-19 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang reassigned BEAM-8288:
---

Assignee: Ning Kang

> Cleanup Interactive Beam Python 2 support
> -
>
> Key: BEAM-8288
> URL: https://issues.apache.org/jira/browse/BEAM-8288
> Project: Beam
>  Issue Type: Improvement
>  Components: examples-python
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Minor
>
> As Beam is retiring Python 2, some special handling in Interactive Beam code 
> and tests will need to be cleaned up.
> This Jira ticket tracks those changes to be cleaned up.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-8288) Cleanup Interactive Beam Python 2 support

2019-09-19 Thread Ning Kang (Jira)
Ning Kang created BEAM-8288:
---

 Summary: Cleanup Interactive Beam Python 2 support
 Key: BEAM-8288
 URL: https://issues.apache.org/jira/browse/BEAM-8288
 Project: Beam
  Issue Type: Improvement
  Components: examples-python
Reporter: Ning Kang


As Beam is retiring Python 2, some special handling in Interactive Beam code and 
tests will need to be cleaned up.

This Jira ticket tracks those changes to be cleaned up.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7926) Visualize PCollection with Interactive Beam

2019-08-23 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Description: 
Support auto plotting / charting of materialized data of a given PCollection 
with Interactive Beam.

Say an Interactive Beam pipeline defined as

p = create_pipeline()

pcoll = p | 'Transform' >> transform()

The user can call a single function and get auto-magical charting of the data as 
materialized pcoll.

e.g., visualize(pcoll)

  was:
Support auto plotting / charting of materialized data of a given PCollection 
with interactive Beam.

Say an iBeam pipeline defined as

p = ibeam.create_pipeline()

pcoll = p | 'Transform' >> transform()

The user can call a single function and get auto-magical charting of the data as 
materialized pcoll.

e.g., ibeam.visualize(pcoll)


> Visualize PCollection with Interactive Beam
> ---
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
> p = create_pipeline()
> pcoll = p | 'Transform' >> transform()
> The user can call a single function and get auto-magical charting of the data 
> as materialized pcoll.
> e.g., visualize(pcoll)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (BEAM-8016) Render Beam Pipeline as DOT with Interactive Beam

2019-08-20 Thread Ning Kang (Jira)
Ning Kang created BEAM-8016:
---

 Summary: Render Beam Pipeline as DOT with Interactive Beam  
 Key: BEAM-8016
 URL: https://issues.apache.org/jira/browse/BEAM-8016
 Project: Beam
  Issue Type: Improvement
  Components: examples-python
Reporter: Ning Kang
Assignee: Ning Kang


With work in https://issues.apache.org/jira/browse/BEAM-7760, Beam pipeline 
converted to DOT then rendered should mark user defined variables on edges.

With work in https://issues.apache.org/jira/browse/BEAM-7926, it might be 
redundant or confusing to render arbitrary random sample PCollection data on 
edges.

We'll also make sure edges in the graph correspond to the output -> input 
relationship in the user-defined pipeline. Each edge is one output. If multiple 
downstream inputs take the same output, it should be rendered as one edge 
diverging into two instead of two edges.

For advanced interactivity highlighting, where each execution highlights the part 
of the original pipeline that was actually executed, we'll also provide 
support in beta.
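
As a rough illustration of the intended edge semantics (a sketch with a hypothetical data structure, not the actual renderer), every consumer of the same output gets an edge carrying the same variable label, so the graph reads as one output fanning out rather than unrelated edges:
{code:java}
# Sketch only (hypothetical input structure, not the actual renderer): emit DOT
# where each producer output fans out to all of its consumers under one label.
def pipeline_to_dot(outputs):
    # outputs: list of (producer, variable_name, consumers) tuples
    lines = ['digraph pipeline {']
    for producer, var, consumers in outputs:
        for consumer in consumers:
            # Every fan-out edge from this one output carries the same label.
            lines.append('  "%s" -> "%s" [label="%s"];' % (producer, consumer, var))
    lines.append('}')
    return '\n'.join(lines)

# e.g. one output 'messages' consumed by both 'Write' and 'Count':
print(pipeline_to_dot([('Read', 'messages', ['Write', 'Count'])]))
{code}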



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (BEAM-7926) Visualize PCollection with Interactive Beam

2019-08-20 Thread Ning Kang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Summary: Visualize PCollection with Interactive Beam  (was: Visualize 
PCollection with iBeam)

> Visualize PCollection with Interactive Beam
> ---
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with interactive Beam.
> Say an iBeam pipeline defined as
> p = ibeam.create_pipeline()
> pcoll = p | 'Transform' >> transform()
> The user can call a single function and get auto-magical charting of the data 
> as materialized pcoll.
> e.g., ibeam.visualize(pcoll)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (BEAM-7926) Visualize PCollection with iBeam

2019-08-07 Thread Ning Kang (JIRA)
Ning Kang created BEAM-7926:
---

 Summary: Visualize PCollection with iBeam
 Key: BEAM-7926
 URL: https://issues.apache.org/jira/browse/BEAM-7926
 Project: Beam
  Issue Type: New Feature
  Components: examples-python
Reporter: Ning Kang
Assignee: Ning Kang


Support auto plotting / charting of materialized data of a given PCollection 
with interactive Beam.

Say an iBeam pipeline defined as

p = ibeam.create_pipeline()

pcoll = p | 'Transform' >> transform()

The user can call a single function and get auto-magical charting of the data as 
materialized pcoll.

e.g., visualize(pcoll)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7926) Visualize PCollection with iBeam

2019-08-07 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7926:

Description: 
Support auto plotting / charting of materialized data of a given PCollection 
with interactive Beam.

Say an iBeam pipeline defined as

p = ibeam.create_pipeline()

pcoll = p | 'Transform' >> transform()

The user can call a single function and get auto-magical charting of the data as 
materialized pcoll.

e.g., ibeam.visualize(pcoll)

  was:
Support auto plotting / charting of materialized data of a given PCollection 
with interactive Beam.

Say an iBeam pipeline defined as

p = ibeam.create_pipeline()

pcoll = p | 'Transform' >> transform()

The user can call a single function and get auto-magical charting of the data as 
materialized pcoll.

e.g., visualize(pcoll)


> Visualize PCollection with iBeam
> 
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with interactive Beam.
> Say an iBeam pipeline defined as
> p = ibeam.create_pipeline()
> pcoll = p | 'Transform' >> transform()
> The user can call a single function and get auto-magical charting of the data 
> as materialized pcoll.
> e.g., ibeam.visualize(pcoll)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook

2019-08-07 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang reassigned BEAM-7760:
---

Assignee: Ning Kang

> Interactive Beam Caching PCollections bound to user defined vars in notebook
> 
>
> Key: BEAM-7760
> URL: https://issues.apache.org/jira/browse/BEAM-7760
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Cache only PCollections bound to user defined variables in a pipeline when 
> running pipeline with interactive runner in jupyter notebooks.
> [Interactive 
> Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
>  has been caching and using caches of "leaf" PCollections for interactive 
> execution in jupyter notebooks.
> The interactive execution is currently supported so that when appending new 
> transforms to existing pipeline for a new run, executed part of the pipeline 
> doesn't need to be re-executed. 
> A PCollection is "leaf" when it is never used as input in any PTransform in 
> the pipeline.
> The problem with building caches and pipeline to execute around "leaf" is 
> that when a PCollection is consumed by a sink with no output, the pipeline to 
> execute built will miss the subgraph generating and consuming that 
> PCollection.
> An example, "ReadFromPubSub --> WirteToPubSub" will result in an empty 
> pipeline.
> Caching around PCollections bound to user defined variables and replacing 
> transforms with source and sink of caches could resolve the pipeline to 
> execute properly under the interactive execution scenario. Also, cached 
> PCollection now can trace back to user code and can be used for user data 
> visualization if user wants to do it.
> E.g.,
> {code:java}
> // ...
> p = beam.Pipeline(interactive_runner.InteractiveRunner(),
>   options=pipeline_options)
> messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
> messages | "Write" >> beam.io.WriteToPubSub(topic_path)
> result = p.run()
> // ...
> visualize(messages){code}
>  The interactive runner automatically figures out that PCollection
> {code:java}
> messages{code}
> created by
> {code:java}
> p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}
> should be cached and reused if the notebook user appends more transforms.
>  And once the pipeline gets executed, the user could use any 
> visualize(PCollection) module to visualize the data statically (batch) or 
> dynamically (stream)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (BEAM-7923) Interactive Beam

2019-08-07 Thread Ning Kang (JIRA)
Ning Kang created BEAM-7923:
---

 Summary: Interactive Beam
 Key: BEAM-7923
 URL: https://issues.apache.org/jira/browse/BEAM-7923
 Project: Beam
  Issue Type: New Feature
  Components: examples-python
Reporter: Ning Kang
Assignee: Ning Kang


This is the top level ticket for all efforts leveraging [interactive 
Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive].

As development progresses, blocking tickets will be added to this one.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors

2019-08-05 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang closed BEAM-7812.
---

> Work around Stackdriver error reporting double counting worker errors
> -
>
> Key: BEAM-7812
> URL: https://issues.apache.org/jira/browse/BEAM-7812
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> h1. *Objective*
> Work around Stackdriver Error Reporting to count worker errors only once when 
> double logging.
> {color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
> h1. *Background*
> Stackdriver error reporting will double count worker errors logged to 
> Stackdriver, because:
>  # workers log errors to Stackdriver;
>  # workers report the same errors to dfe and dfe will log them again to 
> Stackdriver.
> The double counting is blocking us sending job message logs from dfe to 
> Stackdriver because we don't want to change the behavior of any existing log 
> and feature.
> There happens to be an inconsistency between Java batch 
> ([DataflowWorkerLoggingHandler|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82])
>  and streaming 
> ([StreamingDataflowWorker|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747])
>  error reporting to dfe, with the result that errors reported from the streaming 
> Java worker are eventually ignored by Stackdriver Error Reporting.
> h1. *Details*
> Inspired by the inconsistency, we decide to apply the streaming Java worker 
> error reporting logic to batch to both fix the inconsistency and work around 
> double counting issue on Stackdriver Error Reporting.
> The change is that, when workers report errors to dfe:
>  * For Java, construct the stack trace from the StackTrace object instead of 
> using printStackTrace;
>  * For Python, report the complete error message details, exactly the same as 
> what goes to worker logging, instead of reporting only the traceback through 
> the traceback module.
> Users will not experience change since job message logging to Stackdriver 
> hasn’t been launched yet.
> h1. *Test Plan*
> We'll add unit tests for the public methods changed in the process.
> Google has internal integration tests where we can push worker harness images 
> and set worker harness container image to test in sandbox.
> When releasing, we also have integration tests in different releasing stages.
> The workaround needs to be released completely before we can enable job 
> message logging.
> We can verify the format of stacktraces in sandbox and release stages by 
> executing example pipelines in our projects and directly browsing the prod 
> Stackdriver logging and error reporting consoles. This should be done before 
> and after enabling job message logging.
> Run any other existing and required tests before sending PR.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors

2019-08-05 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang resolved BEAM-7812.
-
   Resolution: Fixed
Fix Version/s: Not applicable

The change is made internally to the Dataflow service in Google instead of to 
the Dataflow runner, since we want the change to be compatible with older 
versions of the SDKs.

> Work around Stackdriver error reporting double counting worker errors
> -
>
> Key: BEAM-7812
> URL: https://issues.apache.org/jira/browse/BEAM-7812
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> h1. *Objective*
> Work around Stackdriver Error Reporting to count worker errors only once when 
> double logging.
> {color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
> h1. *Background*
> Stackdriver error reporting will double count worker errors logged to 
> Stackdriver, because:
>  # workers log errors to Stackdriver;
>  # workers report the same errors to dfe and dfe will log them again to 
> Stackdriver.
> The double counting is blocking us from sending job message logs from dfe to 
> Stackdriver, because we don't want to change the behavior of any existing logs 
> or features.
> There happens to be an inconsistency between how the Java batch worker 
> ([DataflowWorkerLoggingHandler|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82])
>  and the streaming worker 
> ([StreamingDataflowWorker|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747])
>  report errors to dfe, which results in errors reported by the streaming Java 
> worker eventually being ignored by Stackdriver Error Reporting.
> h1. *Details*
> Inspired by the inconsistency, we decide to apply the streaming Java worker 
> error reporting logic to batch to both fix the inconsistency and work around 
> double counting issue on Stackdriver Error Reporting.
> The change is that, when workers report errors to dfe:
>  * For Java, construct the stack trace from the StackTrace object instead of 
> using printStackTrace;
>  * For Python, report the complete error message details, exactly the same as 
> what goes to worker logging, instead of reporting only the traceback through 
> the traceback module.
> Users will not experience change since job message logging to Stackdriver 
> hasn’t been launched yet.
> h1. *Test Plan*
> We'll add unit tests for the public methods changed in the process.
> Google has internal integration tests where we can push worker harness images 
> and set worker harness container image to test in sandbox.
> When releasing, we also have integration tests in different releasing stages.
> The workaround needs to be released completely before we can enable job 
> message logging.
> We can verify the format of stacktraces in sandbox and release stages by 
> executing example pipelines in our projects and directly browsing the prod 
> Stackdriver logging and error reporting consoles. This should be done before 
> and after enabling job message logging.
> Run any other existing and required tests before sending PR.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors

2019-08-05 Thread Ning Kang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16900263#comment-16900263
 ] 

Ning Kang commented on BEAM-7812:
-

We decided to make the change internally in the Dataflow service to support 
all versions of SDKs, instead of making changes in the runner and then patching 
all SDKs.

We may mark the bug closed now.

> Work around Stackdriver error reporting double counting worker errors
> -
>
> Key: BEAM-7812
> URL: https://issues.apache.org/jira/browse/BEAM-7812
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> h1. *Objective*
> Work around Stackdriver Error Reporting to count worker errors only once when 
> double logging.
> {color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
> h1. *Background*
> Stackdriver error reporting will double count worker errors logged to 
> Stackdriver, because:
>  # workers log errors to Stackdriver;
>  # workers report the same errors to dfe and dfe will log them again to 
> Stackdriver.
> The double counting is blocking us from sending job message logs from dfe to 
> Stackdriver, because we don't want to change the behavior of any existing logs 
> or features.
> There happens to be an inconsistency between how the Java batch worker 
> ([DataflowWorkerLoggingHandler|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82])
>  and the streaming worker 
> ([StreamingDataflowWorker|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747])
>  report errors to dfe, which results in errors reported by the streaming Java 
> worker eventually being ignored by Stackdriver Error Reporting.
> h1. *Details*
> Inspired by the inconsistency, we decide to apply the streaming Java worker 
> error reporting logic to batch to both fix the inconsistency and work around 
> double counting issue on Stackdriver Error Reporting.
> The change is that, when workers report errors to dfe:
>  * For Java, construct the stack trace from the StackTrace object instead of 
> using printStackTrace;
>  * For Python, report the complete error message details, exactly the same as 
> what goes to worker logging, instead of reporting only the traceback through 
> the traceback module.
> Users will not experience change since job message logging to Stackdriver 
> hasn’t been launched yet.
> h1. *Test Plan*
> We'll add unit tests for the public methods changed in the process.
> Google has internal integration tests where we can push worker harness images 
> and set worker harness container image to test in sandbox.
> When releasing, we also have integration tests in different releasing stages.
> The workaround needs to be released completely before we can enable job 
> message logging.
> We can verify the format of stacktraces in sandbox and release stages by 
> executing example pipelines in our projects and directly browsing the prod 
> Stackdriver logging and error reporting consoles. This should be done before 
> and after enabling job message logging.
> Run any other existing and required tests before sending PR.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work started] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors

2019-07-29 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-7812 started by Ning Kang.
---
> Work around Stackdriver error reporting double counting worker errors
> -
>
> Key: BEAM-7812
> URL: https://issues.apache.org/jira/browse/BEAM-7812
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Minor
>
> h1. *Objective*
> Work around Stackdriver Error Reporting to count worker errors only once when 
> double logging.
> {color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
> h1. *Background*
> Stackdriver error reporting will double count worker errors logged to 
> Stackdriver, because:
>  # workers log errors to Stackdriver;
>  # workers report the same errors to dfe and dfe will log them again to 
> Stackdriver.
> The double counting is blocking us from sending job message logs from dfe to 
> Stackdriver, because we don't want to change the behavior of any existing logs 
> or features.
> There happens to be an inconsistency between how the Java batch worker 
> ([DataflowWorkerLoggingHandler|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82])
>  and the streaming worker 
> ([StreamingDataflowWorker|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747])
>  report errors to dfe, which results in errors reported by the streaming Java 
> worker eventually being ignored by Stackdriver Error Reporting.
> h1. *Details*
> Inspired by the inconsistency, we decide to apply the streaming Java worker 
> error reporting logic to batch to both fix the inconsistency and work around 
> double counting issue on Stackdriver Error Reporting.
> The change is that, when workers report errors to dfe:
>  * For Java, construct the stack trace from the StackTrace object instead of 
> using printStackTrace;
>  * For Python, report the complete error message details, exactly the same as 
> what goes to worker logging, instead of reporting only the traceback through 
> the traceback module.
> Users will not experience change since job message logging to Stackdriver 
> hasn’t been launched yet.
> h1. *Test Plan*
> We'll add unit tests for the public methods changed in the process.
> Google has internal integration tests where we can push worker harness images 
> and set worker harness container image to test in sandbox.
> When releasing, we also have integration tests in different releasing stages.
> The workaround needs to be released completely before we can enable job 
> message logging.
> We can verify the format of stacktraces in sandbox and release stages by 
> executing example pipelines in our projects and directly browsing the prod 
> Stackdriver logging and error reporting consoles. This should be done before 
> and after enabling job message logging.
> Run any other existing and required tests before sending PR.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors

2019-07-29 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7812:

Description: 
h1. *Objective*

Work around Stackdriver Error Reporting to count worker errors only once when 
double logging.

{color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
h1. *Background*

Stackdriver error reporting will double count worker errors logged to 
Stackdriver, because:
 # workers log errors to Stackdriver;
 # workers report the same errors to dfe and dfe will log them again to 
Stackdriver.

The double counting is blocking us from sending job message logs from dfe to 
Stackdriver, because we don't want to change the behavior of any existing logs 
or features.

There happens to be an inconsistency in Java batch 
[DataflowWorkerLoggingHandler|[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82]]
 and streaming 
([StreamingDataflowWorker|[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747]])
 error reporting to dfe, which results in errors reported by the streaming Java 
worker eventually being ignored by Stackdriver Error Reporting.
h1. *Details*

Inspired by the inconsistency, we decide to apply the streaming Java worker 
error reporting logic to batch to both fix the inconsistency and work around 
double counting issue on Stackdriver Error Reporting.

The change is that, when workers report errors to dfe:
 * For Java, construct the stack trace from the StackTrace object instead of 
using printStackTrace;
 * For Python, report the complete error message details, exactly the same as 
what goes to worker logging, instead of reporting only the traceback through 
the traceback module.

Users will not experience change since job message logging to Stackdriver 
hasn’t been launched yet.
h1. *Test Plan*

We'll add unit tests for the public methods changed in the process.

Google has internal integration tests where we can push worker harness images 
and set worker harness container image to test in sandbox.

When releasing, we also have integration tests in different releasing stages.

The workaround needs to be released completely before we can enable job message 
logging.

We can verify the format of stacktraces in sandbox and release stages by 
executing example pipelines in our projects and directly browsing the prod 
Stackdriver logging and error reporting consoles. This should be done before 
and after enabling job message logging.

Run any other existing and required tests before sending PR.

  was:
h1. *Objective*

Work around Stackdriver Error Reporting to count worker errors only once when 
double logging.

{color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
h1. *Background*

Stackdriver error reporting will double count worker errors logged to 
Stackdriver, because:
 # workers log errors to Stackdriver;
 # workers report the same errors to dfe and dfe will log them again to 
Stackdriver.

The double counting is blocking us from sending job message logs from dfe to 
Stackdriver, because we don't want to change the behavior of any existing logs 
or features.

There happens to be an inconsistency in Java batch 
[DataflowWorkerLoggingHandler]([https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82])
 and streaming 
([StreamingDataflowWorker|[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747]])
 error reporting to dfe, which results in errors reported by the streaming Java 
worker eventually being ignored by Stackdriver Error Reporting.
h1. *Details*

Inspired by the inconsistency, we decide to apply the streaming Java worker 
error reporting logic to batch to both fix the inconsistency and work around 
double counting issue on Stackdriver Error Reporting.

The change is that, when workers report errors to dfe:
 * For Java, construct the stack trace from the StackTrace object instead of 
using printStackTrace;
 * For Python, report the complete error message details, exactly the same as 
what goes to worker logging, instead of reporting only the traceback through 
the traceback module.

Users will not experience change since job message logging to Stackdriver 
hasn’t been launched yet.
h1. *Test Plan*

We'll add unit tests for the public methods changed in the process.

Google has internal integration tests where we can push worker harness images 
and set worker harness container image to test in sandbox.

When releasing, we also have integration tests in different releasing stages.

The workaround needs to be released completely before we can 

[jira] [Updated] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors

2019-07-29 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7812:

Description: 
h1. *Objective*

Work around Stackdriver Error Reporting to count worker errors only once when 
double logging.

{color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
h1. *Background*

Stackdriver error reporting will double count worker errors logged to 
Stackdriver, because:
 # workers log errors to Stackdriver;
 # workers report the same errors to dfe and dfe will log them again to 
Stackdriver.

The double counting is blocking us from sending job message logs from dfe to 
Stackdriver, because we don't want to change the behavior of any existing logs 
or features.

There happens to be an inconsistency in Java batch 
[DataflowWorkerLoggingHandler]([https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82])
 and streaming 
([StreamingDataflowWorker|[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747]])
 error reporting to dfe, which results in errors reported by the streaming Java 
worker eventually being ignored by Stackdriver Error Reporting.
h1. *Details*

Inspired by the inconsistency, we decide to apply the streaming Java worker 
error reporting logic to batch to both fix the inconsistency and work around 
double counting issue on Stackdriver Error Reporting.

The change is that, when workers report errors to dfe:
 * For Java, construct the stack trace from the StackTrace object instead of 
using printStackTrace;
 * For Python, report the complete error message details, exactly the same as 
what goes to worker logging, instead of reporting only the traceback through 
the traceback module.

Users will not experience change since job message logging to Stackdriver 
hasn’t been launched yet.
h1. *Test Plan*

We'll add unit tests for the public methods changed in the process.

Google has internal integration tests where we can push worker harness images 
and set worker harness container image to test in sandbox.

When releasing, we also have integration tests in different releasing stages.

The workaround needs to be released completely before we can enable job message 
logging.

We can verify the format of stacktraces in sandbox and release stages by 
executing example pipelines in our projects and directly browsing the prod 
Stackdriver logging and error reporting consoles. This should be done before 
and after enabling job message logging.

Run any other existing and required tests before sending PR.

  was:
h1. *Objective*

Work around Stackdriver Error Reporting to count worker errors only once when 
double logging.

{color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
h1. *Background*

Stackdriver error reporting will double count worker errors logged to 
Stackdriver, because:
 # workers log errors to Stackdriver;
 # workers report the same errors to dfe and dfe will log them again to 
Stackdriver.

The double counting is blocking us from sending job message logs from dfe to 
Stackdriver, because we don't want to change the behavior of any existing logs 
or features.

There happens to be an inconsistency in Java batch 
([DataflowWorkerLoggingHandler|[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82]])
 and streaming 
([StreamingDataflowWorker|[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747]])
 error reporting to dfe, which results in errors reported by the streaming Java 
worker eventually being ignored by Stackdriver Error Reporting.
h1. *Details*

Inspired by the inconsistency, we decide to apply the streaming Java worker 
error reporting logic to batch to both fix the inconsistency and work around 
double counting issue on Stackdriver Error Reporting.

The change is that, when workers report errors to dfe:
 * For Java, construct the stack trace from the StackTrace object instead of 
using printStackTrace;
 * For Python, report the complete error message details, exactly the same as 
what goes to worker logging, instead of reporting only the traceback through 
the traceback module.

Users will not experience change since job message logging to Stackdriver 
hasn’t been launched yet.
h1. *Test Plan*

We'll add unit tests for the public methods changed in the process.

Google has internal integration tests where we can push worker harness images 
and set worker harness container image to test in sandbox.

When releasing, we also have integration tests in different releasing stages.

The workaround needs to be released completely before we can 

[jira] [Assigned] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors

2019-07-25 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang reassigned BEAM-7812:
---

Assignee: Ning Kang

> Work around Stackdriver error reporting double counting worker errors
> -
>
> Key: BEAM-7812
> URL: https://issues.apache.org/jira/browse/BEAM-7812
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ning Kang
>Assignee: Ning Kang
>Priority: Minor
>
> h1. *Objective*
> Work around Stackdriver Error Reporting to count worker errors only once when 
> double logging.
> {color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
> h1. *Background*
> Stackdriver error reporting will double count worker errors logged to 
> Stackdriver, because:
>  # workers log errors to Stackdriver;
>  # workers report the same errors to dfe and dfe will log them again to 
> Stackdriver.
> The double counting is blocking us from sending job message logs from dfe to 
> Stackdriver, because we don't want to change the behavior of any existing logs 
> or features.
> There happens to be an inconsistency between how the Java batch worker 
> ([DataflowWorkerLoggingHandler|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82])
>  and the streaming worker 
> ([StreamingDataflowWorker|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747])
>  report errors to dfe, which results in errors reported by the streaming Java 
> worker eventually being ignored by Stackdriver Error Reporting.
> h1. *Details*
> Inspired by the inconsistency, we decide to apply the streaming Java worker 
> error reporting logic to batch to both fix the inconsistency and work around 
> double counting issue on Stackdriver Error Reporting.
> The change is that, when workers report errors to dfe:
>  * For Java, construct the stack trace from the StackTrace object instead of 
> using printStackTrace;
>  * For Python, report the complete error message details, exactly the same as 
> what goes to worker logging, instead of reporting only the traceback through 
> the traceback module.
> Users will not experience change since job message logging to Stackdriver 
> hasn’t been launched yet.
> h1. *Test Plan*
> We'll add unit tests for the public methods changed in the process.
> Google has internal integration tests where we can push worker harness images 
> and set worker harness container image to test in sandbox.
> When releasing, we also have integration tests in different releasing stages.
> The workaround needs to be released completely before we can enable job 
> message logging.
> We can verify the format of stacktraces in sandbox and release stages by 
> executing example pipelines in our projects and directly browsing the prod 
> Stackdriver logging and error reporting consoles. This should be done before 
> and after enabling job message logging.
> Run any other existing and required tests before sending PR.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors

2019-07-24 Thread Ning Kang (JIRA)
Ning Kang created BEAM-7812:
---

 Summary: Work around Stackdriver error reporting double counting 
worker errors
 Key: BEAM-7812
 URL: https://issues.apache.org/jira/browse/BEAM-7812
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Ning Kang


h1. *Objective*

Work around Stackdriver Error Reporting to count worker errors only once when 
double logging.

{color:#d04437}*Only applicable to dataflow runner workers in SDK*{color}.
h1. *Background*

Stackdriver error reporting will double count worker errors logged to 
Stackdriver, because:
 # workers log errors to Stackdriver;
 # workers report the same errors to dfe and dfe will log them again to 
Stackdriver.

The double counting is blocking us from sending job message logs from dfe to 
Stackdriver, because we don't want to change the behavior of any existing logs 
or features.

There happens to be an inconsistency between how the Java batch worker 
([DataflowWorkerLoggingHandler|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82])
 and the streaming worker 
([StreamingDataflowWorker|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747])
 report errors to dfe, which results in errors reported by the streaming Java 
worker eventually being ignored by Stackdriver Error Reporting.
h1. *Details*

Inspired by the inconsistency, we decide to apply the streaming Java worker 
error reporting logic to batch to both fix the inconsistency and work around 
double counting issue on Stackdriver Error Reporting.

The change is that, when workers report errors to dfe:
 * For Java, construct the stack trace from the StackTrace object instead of 
using printStackTrace;
 * For Python, report the complete error message details, exactly the same as 
what goes to worker logging, instead of reporting only the traceback through 
the traceback module.

Users will not experience change since job message logging to Stackdriver 
hasn’t been launched yet.
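For illustration only, here is a minimal sketch of the Python side of the idea, 
assuming a hypothetical report_error_to_dfe callback (this is not the actual 
worker code): the worker would send dfe the same fully formatted message that 
it writes to its own log, instead of only the traceback.
{code:python}
import logging
import traceback


def format_worker_error(exc):
    # Build the complete error details exactly as they appear in worker
    # logging: exception type, message and traceback in one string.
    return ''.join(
        traceback.format_exception(type(exc), exc, exc.__traceback__))


def handle_work_item_failure(exc, report_error_to_dfe):
    message = format_worker_error(exc)
    # The same string goes to the worker log and to dfe, so Stackdriver
    # Error Reporting sees one consistent error instead of two variants.
    logging.error(message)
    report_error_to_dfe(message)
{code}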
h1. *Test Plan*

We'll add unit tests for the public methods changed in the process.

Google has internal integration tests where we can push worker harness images 
and set worker harness container image to test in sandbox.

When releasing, we also have integration tests in different releasing stages.

The workaround needs to be released completely before we can enable job message 
logging.

We can verify the format of stacktraces in sandbox and release stages by 
executing example pipelines in our projects and directly browsing the prod 
Stackdriver logging and error reporting consoles. This should be done before 
and after enabling job message logging.

Run any other existing and required tests before sending PR.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook

2019-07-16 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7760:

Description: 
Cache only PCollections bound to user-defined variables in a pipeline when 
running the pipeline with the interactive runner in Jupyter notebooks.

[Interactive 
Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
 has been caching and using caches of "leaf" PCollections for interactive 
execution in Jupyter notebooks.

The interactive execution is currently supported so that when new transforms 
are appended to an existing pipeline for a new run, the already-executed part 
of the pipeline doesn't need to be re-executed.

A PCollection is "leaf" when it is never used as input in any PTransform in the 
pipeline.

The problem with building the caches and the pipeline to execute around "leaf" 
PCollections is that when a PCollection is consumed by a sink with no output, 
the pipeline built for execution will miss the subgraph that generates and 
consumes that PCollection.

For example, "ReadFromPubSub --> WriteToPubSub" will result in an empty pipeline.

Caching PCollections bound to user-defined variables and replacing transforms 
with cache sources and sinks could resolve the pipeline to execute properly 
under the interactive execution scenario. Also, a cached PCollection can now be 
traced back to user code and used for user data visualization if the user 
wants to.

E.g.,
{code:python}
# ...
p = beam.Pipeline(interactive_runner.InteractiveRunner(),
  options=pipeline_options)
messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
messages | "Write" >> beam.io.WriteToPubSub(topic_path)
result = p.run()
# ...
visualize(messages){code}
The interactive runner automatically figures out that the PCollection
{code:python}
messages{code}
created by
{code:python}
p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}
should be cached and reused if the notebook user appends more transforms.
And once the pipeline gets executed, the user could use any 
visualize(PCollection) module to visualize the data statically (batch) or 
dynamically (stream).
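As a rough sketch of the variable-binding detection (not the actual 
implementation), the PCollections that need caching could be discovered by 
scanning the notebook's namespace; the helper below is purely illustrative.
{code:python}
import apache_beam as beam


def pcollections_bound_to_variables(namespace):
    # Return {variable_name: PCollection} for every user-defined variable in
    # the given namespace (e.g. globals() of the notebook session) that is
    # bound to a PCollection; only these need cache sources and sinks.
    return {name: value for name, value in namespace.items()
            if isinstance(value, beam.pvalue.PCollection)}


# In the example above this would find {'messages': <PCollection>}, while
# intermediate PCollections never assigned to a variable are skipped:
# bound = pcollections_bound_to_variables(globals())
{code}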

  was:
Cache only PCollections bound to user-defined variables in a pipeline when 
running the pipeline with the interactive runner in Jupyter notebooks.

[Interactive 
Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
 has been caching and using caches of "leaf" PCollections for interactive 
execution in Jupyter notebooks.

The interactive execution is currently supported so that when new transforms 
are appended to an existing pipeline for a new run, the already-executed part 
of the pipeline doesn't need to be re-executed.

A PCollection is "leaf" when it is never used as input in any PTransform in the 
pipeline.

The problem with building the caches and the pipeline to execute around "leaf" 
PCollections is that when a PCollection is consumed by a sink with no output, 
the pipeline built for execution will miss the subgraph that generates and 
consumes that PCollection.

For example, "ReadFromPubSub --> WriteToPubSub" will result in an empty pipeline.

Caching PCollections bound to user-defined variables and replacing transforms 
with cache sources and sinks could resolve the pipeline to execute properly 
under the interactive execution scenario. Also, a cached PCollection can now be 
traced back to user code and used for user data visualization if the user 
wants to.

E.g.,
{code:python}
# ...
p = beam.Pipeline(interactive_runner.InteractiveRunner(),
  options=pipeline_options)
messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
messages | "Write" >> beam.io.WriteToPubSub(topic_path)
result = p.run()
# ...
visualize(messages){code}
The interactive runner automatically figures out that the PCollection created by
{code:python}
p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}
should be cached and reused if the notebook user appends more transforms.
And once the pipeline gets executed, the user could use any 
visualize(PCollection) module to visualize the data statically (batch) or 
dynamically (stream).


> Interactive Beam Caching PCollections bound to user defined vars in notebook
> 
>
> Key: BEAM-7760
> URL: https://issues.apache.org/jira/browse/BEAM-7760
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python
>Reporter: Ning Kang
>Priority: Major
>
> Cache only PCollections bound to user-defined variables in a pipeline when 
> running the pipeline with the interactive runner in Jupyter notebooks.
> [Interactive 
> Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
>  has been caching and using caches of "leaf" PCollections for interactive 
> execution in Jupyter notebooks.
> The 

[jira] [Updated] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook

2019-07-16 Thread Ning Kang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Kang updated BEAM-7760:

Description: 
Cache only PCollections bound to user-defined variables in a pipeline when 
running the pipeline with the interactive runner in Jupyter notebooks.

[Interactive 
Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
 has been caching and using caches of "leaf" PCollections for interactive 
execution in Jupyter notebooks.

The interactive execution is currently supported so that when new transforms 
are appended to an existing pipeline for a new run, the already-executed part 
of the pipeline doesn't need to be re-executed.

A PCollection is "leaf" when it is never used as input in any PTransform in the 
pipeline.

The problem with building the caches and the pipeline to execute around "leaf" 
PCollections is that when a PCollection is consumed by a sink with no output, 
the pipeline built for execution will miss the subgraph that generates and 
consumes that PCollection.

For example, "ReadFromPubSub --> WriteToPubSub" will result in an empty pipeline.

Caching PCollections bound to user-defined variables and replacing transforms 
with cache sources and sinks could resolve the pipeline to execute properly 
under the interactive execution scenario. Also, a cached PCollection can now be 
traced back to user code and used for user data visualization if the user 
wants to.

E.g.,
{code:python}
# ...
p = beam.Pipeline(interactive_runner.InteractiveRunner(),
  options=pipeline_options)
messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
messages | "Write" >> beam.io.WriteToPubSub(topic_path)
result = p.run()
# ...
visualize(messages){code}
The interactive runner automatically figures out that the PCollection created by
{code:python}
p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}
should be cached and reused if the notebook user appends more transforms.
And once the pipeline gets executed, the user could use any 
visualize(PCollection) module to visualize the data statically (batch) or 
dynamically (stream).
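The source/sink replacement could conceptually look like the sketch below; 
ReadFromCache, WriteToCache and the cache object are hypothetical placeholders 
for whatever cache IO the interactive runner actually uses, not real Beam APIs.
{code:python}
# Conceptual sketch only: when re-running a pipeline that the user has appended
# transforms to, producers of already-cached user PCollections are replaced by
# reads from the cache, and newly computed user PCollections get a cache sink.
def instrument_for_interactivity(pipeline, user_pcolls, cache,
                                 ReadFromCache, WriteToCache):
    for name, pcoll in user_pcolls.items():
        key = cache.key_for(pcoll)  # hypothetical stable cache key
        if cache.exists(key):
            # Reuse cached data instead of re-executing the producing subgraph.
            replacement = pipeline | ('ReadCache_%s' % name) >> ReadFromCache(key)
            cache.replace_consumers(pcoll, replacement)  # hypothetical rewiring
        else:
            # Compute once and persist, so later appended runs can reuse it.
            _ = pcoll | ('WriteCache_%s' % name) >> WriteToCache(key)
{code}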

  was:
Cache only PCollections bound to user-defined variables in a pipeline when 
running the pipeline with the interactive runner in Jupyter notebooks.

[Interactive 
Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
 has been caching and using caches of "leaf" PCollections for interactive 
execution in Jupyter notebooks.

The interactive execution is currently supported so that when new transforms 
are appended to an existing pipeline for a new run, the already-executed part 
of the pipeline doesn't need to be re-executed.

A PCollection is "leaf" when it is never used as input in any PTransform in the 
pipeline.

The problem with building the caches and the pipeline to execute around "leaf" 
PCollections is that when a PCollection is consumed by a sink with no output, 
the pipeline built for execution will miss the subgraph that generates and 
consumes that PCollection.

For example, "ReadFromPubSub --> WriteToPubSub" will result in an empty pipeline.

Caching PCollections bound to user-defined variables and replacing transforms 
with cache sources and sinks could resolve the pipeline to execute properly 
under the interactive execution scenario. Also, a cached PCollection can now be 
traced back to user code and used for user data visualization if the user 
wants to.

E.g.,
{code:python}
# ...
p = beam.Pipeline(interactive_runner.InteractiveRunner(),
  options=pipeline_options)
messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
messages | "Write" >> beam.io.WriteToPubSub(topic_path)
result = p.run()
# ...
visualize(messages){code}

The interactive runner automatically figures out that the PCollection created by
{code:python}
p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}


> Interactive Beam Caching PCollections bound to user defined vars in notebook
> 
>
> Key: BEAM-7760
> URL: https://issues.apache.org/jira/browse/BEAM-7760
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python
>Reporter: Ning Kang
>Priority: Major
>
> Cache only PCollections bound to user-defined variables in a pipeline when 
> running the pipeline with the interactive runner in Jupyter notebooks.
> [Interactive 
> Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
>  has been caching and using caches of "leaf" PCollections for interactive 
> execution in Jupyter notebooks.
> The interactive execution is currently supported so that when new transforms 
> are appended to an existing pipeline for a new run, the already-executed part 
> of the pipeline doesn't need to be re-executed.
> A PCollection is "leaf" when it is never used as input in any 

[jira] [Created] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook

2019-07-16 Thread Ning Kang (JIRA)
Ning Kang created BEAM-7760:
---

 Summary: Interactive Beam Caching PCollections bound to user 
defined vars in notebook
 Key: BEAM-7760
 URL: https://issues.apache.org/jira/browse/BEAM-7760
 Project: Beam
  Issue Type: New Feature
  Components: examples-python
Reporter: Ning Kang


Cache only PCollections bound to user-defined variables in a pipeline when 
running the pipeline with the interactive runner in Jupyter notebooks.

[Interactive 
Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]
 has been caching and using caches of "leaf" PCollections for interactive 
execution in Jupyter notebooks.

The interactive execution is currently supported so that when new transforms 
are appended to an existing pipeline for a new run, the already-executed part 
of the pipeline doesn't need to be re-executed.

A PCollection is "leaf" when it is never used as input in any PTransform in the 
pipeline.

The problem with building the caches and the pipeline to execute around "leaf" 
PCollections is that when a PCollection is consumed by a sink with no output, 
the pipeline built for execution will miss the subgraph that generates and 
consumes that PCollection.

For example, "ReadFromPubSub --> WriteToPubSub" will result in an empty pipeline.

Caching PCollections bound to user-defined variables and replacing transforms 
with cache sources and sinks could resolve the pipeline to execute properly 
under the interactive execution scenario. Also, a cached PCollection can now be 
traced back to user code and used for user data visualization if the user 
wants to.

E.g.,
{code:python}
# ...
p = beam.Pipeline(interactive_runner.InteractiveRunner(),
  options=pipeline_options)
messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
messages | "Write" >> beam.io.WriteToPubSub(topic_path)
result = p.run()
# ...
visualize(messages){code}

The interactive runner automatically figures out that the PCollection created by
{code:python}
p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)