[jira] [Created] (BEAM-10182) Support custom gcs location for bigquery read
Ning Kang created BEAM-10182: Summary: Support custom gcs location for bigquery read Key: BEAM-10182 URL: https://issues.apache.org/jira/browse/BEAM-10182 Project: Beam Issue Type: Improvement Components: sdk-java-core Reporter: Ning Kang Assignee: Pablo Estrada Java and Python handle temp locations for BigQuery IO differently. Java's Read only uses and validates tempLocation, but Write takes customGcsLocation and falls back to tempLocation. Python's Read takes gcs_location and falls back to temp_location, while Write takes custom_gcs_temp_location and falls back to temp_location. We need an equivalent of https://github.com/apache/beam/blob/0a0399f71cf14ecabe7e73b6cd596325bb7ff2ea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2314 for the Read class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
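The fallback behavior described above could be mirrored for Read with a small resolver. A minimal sketch, where the helper name `resolve_gcs_location` and the error wording are hypothetical (not Beam's actual API):

```python
def resolve_gcs_location(gcs_location, temp_location):
    """Resolve the GCS staging location for a BigQuery read.

    Prefer an explicitly supplied gcs_location, fall back to the
    pipeline temp_location, and fail with a clear error if neither
    is set -- the same fallback Python's Write already performs.
    """
    location = gcs_location or temp_location
    if not location:
        raise ValueError(
            'BigQuery read needs a GCS location to store temp files; '
            'set gcs_location or the pipeline temp_location.')
    return location
```

This keeps Read and Write symmetric: both accept a custom location but degrade gracefully to the pipeline-wide temp location.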
[jira] [Created] (BEAM-10178) Error messages for unspecified options should display the command line flag that needs to be specified
Ning Kang created BEAM-10178: Summary: Error messages for unspecified options should display the command line flag that needs to be specified Key: BEAM-10178 URL: https://issues.apache.org/jira/browse/BEAM-10178 Project: Beam Issue Type: Improvement Components: sdk-java-core, sdk-py-core Reporter: Ning Kang Assignee: Ning Kang An example error trace: {code:java} java.lang.IllegalArgumentException: BigQueryIO.Read needs a GCS temp location to store temp files. at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122) at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate(BigQueryIO.java:762) at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:641) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317) at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251) at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458) at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:577) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:312) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) at org.apache.beam.examples.cookbook.BigQueryTornadoes.runBigQueryTornadoes(BigQueryTornadoes.java:199) at org.apache.beam.examples.cookbook.BigQueryTornadoesIT.runE2EBigQueryTornadoesTest(BigQueryTornadoesIT.java:70) at org.apache.beam.examples.cookbook.BigQueryTornadoesIT.testE2EBigQueryTornadoesWithExport(BigQueryTornadoesIT.java:82) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:349) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:314) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:312) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner.run(ParentRunner.java:396) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at 
java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at
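The requested improvement could be sketched as a validation helper that embeds the concrete command-line flag in the error message. `require_option` and its wording are hypothetical illustrations, not Beam's actual API:

```python
def require_option(value, option_name, flag):
    """Validate that a pipeline option is set.

    Unlike the Preconditions.checkArgument call in the trace above,
    the error names the exact flag the user must pass, so the fix is
    actionable without reading the source.
    """
    if value is None:
        raise ValueError(
            '%s is not set. Specify it with the %s pipeline option, '
            'e.g. %s=gs://your-bucket/temp.' % (option_name, flag, flag))
    return value
```

For the trace above, the message would then read something like: "GCS temp location is not set. Specify it with the --tempLocation pipeline option, e.g. --tempLocation=gs://your-bucket/temp."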
[jira] [Updated] (BEAM-9927) [Beam Internship] Example Notebook w/ diary study
[ https://issues.apache.org/jira/browse/BEAM-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-9927: Description: * Pick/add a topic from [https://kevingg.github.io/diary/] * Set up a local JupyterLab environment based on [instructions|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive] * Write an example interactive Beam notebook and take notes on improvement AIs throughout the process. * The example notebook should be created under [https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive/examples] and sent out for review. was: * Pick a topic from [https://kevingg.github.io/diary/] * Set up a local JupyterLab environment based on [instructions|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive] * Write an example interactive Beam notebook and take notes on improvement AIs throughout the process. > [Beam Internship] Example Notebook w/ diary study > -
[jira] [Created] (BEAM-9927) [Beam Internship] Example Notebook w/ diary study
Ning Kang created BEAM-9927: --- Summary: [Beam Internship] Example Notebook w/ diary study Key: BEAM-9927 URL: https://issues.apache.org/jira/browse/BEAM-9927 Project: Beam Issue Type: Improvement Components: sdk-py-core Reporter: Ning Kang * Pick a topic from [https://kevingg.github.io/diary/] * Set up a local JupyterLab environment based on [instructions|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive] * Write an example interactive Beam notebook and take notes on improvement AIs throughout the process.
[jira] [Created] (BEAM-9907) apache_beam.transforms.external_test.ExternalTransformTest.test_nested flaky
Ning Kang created BEAM-9907: --- Summary: apache_beam.transforms.external_test.ExternalTransformTest.test_nested flaky Key: BEAM-9907 URL: https://issues.apache.org/jira/browse/BEAM-9907 Project: Beam Issue Type: Test Components: sdk-py-core Reporter: Ning Kang Example test failures: https://builds.apache.org/job/beam_PreCommit_Python_Commit/12682/ https://builds.apache.org/job/beam_PreCommit_Python_Commit/12684/ A stacktrace {code:bash} apache_beam.transforms.external_test.ExternalTransformTest.test_nested (from py37-cloud) Failing for the past 1 build (Since Failed#12682 ) Took 54 ms. Error Message google.protobuf.json_format.ParseError: Unexpected type for Value message. Stacktrace self = def test_nested(self): with beam.Pipeline() as p: > assert_that(p | FibTransform(6), equal_to([8])) apache_beam/transforms/external_test.py:250: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/transforms/ptransform.py:562: in __ror__ result = p.apply(self, pvalueish, label) apache_beam/pipeline.py:651: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) apache_beam/runners/runner.py:198: in apply return m(transform, input, options) apache_beam/runners/runner.py:228: in apply_PTransform return transform.expand(input) apache_beam/runners/portability/expansion_service_test.py:257: in expand expansion_service.ExpansionServiceServicer()) apache_beam/pvalue.py:140: in __or__ return self.pipeline.apply(ptransform, self) apache_beam/pipeline.py:598: in apply transform.transform, pvalueish, label or transform.label) apache_beam/pipeline.py:608: in apply return self.apply(transform, pvalueish) apache_beam/pipeline.py:651: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) apache_beam/runners/runner.py:198: in apply return m(transform, input, options) apache_beam/runners/runner.py:228: in apply_PTransform return transform.expand(input) apache_beam/transforms/external.py:322: in 
expand pipeline_options=job_utils.pipeline_options_dict_to_struct(options)) apache_beam/runners/job/utils.py:38: in pipeline_options_dict_to_struct v in options.items() if v is not None apache_beam/runners/job/utils.py:44: in dict_to_struct return json_format.ParseDict(dict_obj, struct_pb2.Struct()) target/.tox-py37-cloud/py37-cloud/lib/python3.7/site-packages/google/protobuf/json_format.py:450: in ParseDict parser.ConvertMessage(js_dict, message) target/.tox-py37-cloud/py37-cloud/lib/python3.7/site-packages/google/protobuf/json_format.py:479: in ConvertMessage methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self) target/.tox-py37-cloud/py37-cloud/lib/python3.7/site-packages/google/protobuf/json_format.py:667: in _ConvertStructMessage self._ConvertValueMessage(value[key], message.fields[key]) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = value = message = def _ConvertValueMessage(self, value, message): """Convert a JSON representation into Value message.""" if isinstance(value, dict): self._ConvertStructMessage(value, message.struct_value) elif isinstance(value, list): self. _ConvertListValueMessage(value, message.list_value) elif value is None: message.null_value = 0 elif isinstance(value, bool): message.bool_value = value elif isinstance(value, six.string_types): message.string_value = value elif isinstance(value, _INT_OR_FLOAT): message.number_value = value else: > raise ParseError('Unexpected type for Value message.') E google.protobuf.json_format.ParseError: Unexpected type for Value message. target/.tox-py37-cloud/py37-cloud/lib/python3.7/site-packages/google/protobuf/json_format.py:647: ParseError {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
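The ParseError above occurs because protobuf's Struct/Value messages only accept JSON-compatible types (None, bool, numbers, strings, lists, and dicts); any other object in the pipeline-options dict triggers "Unexpected type for Value message". A hypothetical pre-sanitizer, not the fix actually adopted in Beam, could coerce offending values first:

```python
def to_json_compatible(value):
    """Coerce a value into something a protobuf Struct/Value accepts.

    Dicts and lists are sanitized recursively; JSON-native scalars
    pass through unchanged; anything else (e.g. an arbitrary options
    object) is stringified rather than rejected.
    """
    if isinstance(value, dict):
        return {k: to_json_compatible(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_json_compatible(v) for v in value]
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    return str(value)
```

Running the options dict through such a sanitizer before `json_format.ParseDict` would make the failure deterministic instead of depending on which option values happen to be present.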
[jira] [Commented] (BEAM-9832) KeyError: 'No such coder: ' in fn_runner_test
[ https://issues.apache.org/jira/browse/BEAM-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094678#comment-17094678 ] Ning Kang commented on BEAM-9832: - That test expects some work to complete within a timeout. If Jenkins is busy during that window, the work may not complete or may never start, causing all kinds of failure modes: mismatched results because the work was incomplete, a missing PCollection because no work ran, and so on. > KeyError: 'No such coder: ' in fn_runner_test > -
[jira] [Closed] (BEAM-9842) interactive_runner_test test_streaming_wordcount failing due to 'ValueError: PCollection not available'
[ https://issues.apache.org/jira/browse/BEAM-9842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang closed BEAM-9842. --- Fix Version/s: Not applicable Resolution: Duplicate > interactive_runner_test test_streaming_wordcount failing due to 'ValueError: > PCollection not available' > --- > > Key: BEAM-9842 > URL: https://issues.apache.org/jira/browse/BEAM-9842 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Sam Rohde >Priority: Major > Fix For: Not applicable > > > beam_PreCommit_Python seems to be failing due to this. > For example, > [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12542/] > [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12543/] > [https://builds.apache.org/job/beam_PreCommit_Python_Commit/12543/testReport/junit/apache_beam.runners.interactive.interactive_runner_test/InteractiveRunnerTest/test_streaming_wordcount_2/] > > > raise ValueError('PCollection not available, please run the pipeline.') E > > ValueError: PCollection not available, please run the pipeline. > > Ning, can you please take a look ? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-9842) interactive_runner_test test_streaming_wordcount failing due to 'ValueError: PCollection not available'
[ https://issues.apache.org/jira/browse/BEAM-9842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang reassigned BEAM-9842: --- Assignee: Sam Rohde (was: Ning Kang) > interactive_runner_test test_streaming_wordcount failing due to 'ValueError: > PCollection not available' > ---
[jira] [Commented] (BEAM-9842) interactive_runner_test test_streaming_wordcount failing due to 'ValueError: PCollection not available'
[ https://issues.apache.org/jira/browse/BEAM-9842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094672#comment-17094672 ] Ning Kang commented on BEAM-9842: - There is already a Jira tracking test_streaming_wordcount flakiness: BEAM-9767; this issue is also a duplicate of BEAM-9803. Sam is working on making the test deterministic, but we might just need to stop running such a multi-threaded integration test on Jenkins. > interactive_runner_test test_streaming_wordcount failing due to 'ValueError: > PCollection not available' > ---
[jira] [Commented] (BEAM-9832) KeyError: 'No such coder: ' in fn_runner_test
[ https://issues.apache.org/jira/browse/BEAM-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094671#comment-17094671 ] Ning Kang commented on BEAM-9832: - There is already a Jira tracking test_streaming_wordcount flakiness: BEAM-9767; this issue is also a duplicate of BEAM-9803. Sam is working on making the test deterministic, but we might just need to stop running such a multi-threaded integration test on Jenkins. > KeyError: 'No such coder: ' in fn_runner_test > -
[jira] [Created] (BEAM-9832) KeyError: 'No such coder: ' in fn_runner_test
Ning Kang created BEAM-9832: --- Summary: KeyError: 'No such coder: ' in fn_runner_test Key: BEAM-9832 URL: https://issues.apache.org/jira/browse/BEAM-9832 Project: Beam Issue Type: Test Components: sdk-py-core, test-failures Reporter: Ning Kang Failed test results can be found [here|https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/] A stack trace: {code:java} self = def test_read(self): # Can't use NamedTemporaryFile as a context # due to https://bugs.python.org/issue14243 temp_file = tempfile.NamedTemporaryFile(delete=False) try: temp_file.write(b'a\nb\nc') temp_file.close() with self.create_pipeline() as p: assert_that( > p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b', > 'c'])) apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/pipeline.py:529: in __exit__ self.run().wait_until_finish() apache_beam/pipeline.py:502: in run self._options).run(False) apache_beam/pipeline.py:515: in run return self.runner.run_pipeline(self, self._options) apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in run_pipeline pipeline.to_runner_api(default_environment=self._default_environment)) apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in run_via_runner_api return self.run_stages(stage_context, stages) apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages bundle_context_manager, apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage bundle_manager) apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle data_input, data_output, input_timers, expected_timer_output) apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in process_bundle timer_inputs)): /usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator yield future.result() /usr/lib/python3.5/concurrent/futures/_base.py:405: in result return self.__get_result() 
/usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result raise self._exception apache_beam/utils/thread_pool_executor.py:44: in run self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute dry_run) apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in process_bundle result_future = self._worker_handler.control_conn.push(process_bundle_req) apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push response = self.worker.do_instruction(request) apache_beam/runners/worker/sdk_worker.py:471: in do_instruction getattr(request, request_type), request.instruction_id) apache_beam/runners/worker/sdk_worker.py:500: in process_bundle instruction_id, request.process_bundle_descriptor_id) apache_beam/runners/worker/sdk_worker.py:374: in get self.data_channel_factory) apache_beam/runners/worker/bundle_processor.py:782: in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) apache_beam/runners/worker/bundle_processor.py:836: in (transform_id, get_operation(transform_id)) for transform_id in sorted( apache_beam/runners/worker/bundle_processor.py:726: in wrapper result = cache[args] = func(*args) apache_beam/runners/worker/bundle_processor.py:820: in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() apache_beam/runners/worker/bundle_processor.py:820: in pcoll_id in descriptor.transforms[transform_id].outputs.items() apache_beam/runners/worker/bundle_processor.py:818: in tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] apache_beam/runners/worker/bundle_processor.py:726: in wrapper result = cache[args] = func(*args) apache_beam/runners/worker/bundle_processor.py:823: in get_operation transform_id, transform_consumers) 
apache_beam/runners/worker/bundle_processor.py:1108: in create_operation return creator(self, transform_id, transform_proto, payload, consumers) apache_beam/runners/worker/bundle_processor.py:1341: in create_pair_with_restriction return _create_sdf_operation(PairWithRestriction, *args) apache_beam/runners/worker/bundle_processor.py:1404: in _create_sdf_operation parameter) apache_beam/runners/worker/bundle_processor.py:1501: in _create_pardo_operation output_coders = factory.get_output_coders(transform_proto) apache_beam/runners/worker/bundle_processor.py:1154: in get_output_coders pcoll_id in transform_proto.outputs.items() apache_beam/runners/worker/bundle_processor.py:1154: in
[jira] [Commented] (BEAM-9803) test_streaming_wordcount flaky
[ https://issues.apache.org/jira/browse/BEAM-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089871#comment-17089871 ] Ning Kang commented on BEAM-9803: - We'll probably need to stop running this integration test on Jenkins. A Jenkins machine may be executing many other tasks at the same time, so it's possible it does no work for the test at all within the given 5-second timeout. > test_streaming_wordcount flaky > -- > > Key: BEAM-9803 > URL: https://issues.apache.org/jira/browse/BEAM-9803 > Project: Beam > Issue Type: Test > Components: sdk-py-core, test-failures >Reporter: Ning Kang >Assignee: Sam Rohde >Priority: Major > > {code:java} > Regressionapache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount > (from py37-cython)Failing for the past 1 build (Since #12462 )Took 7.7 > sec.Error MessageAssertionError: DataFrame are different DataFrame shape > mismatch [left]: (10, 4) [right]: (6, 4)Stacktraceself = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. 
> ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1] > > # Watch the local scope for Interactive Beam so that referenced > PCollections > # will be cached. > ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this > here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives > the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 > elements > # or after 5 seconds have elapsed (to eliminate the possibility of > hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached. 
> pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > data_df = ib.collect(data, include_window_info=True) > > pd.testing.assert_frame_equal(expected_data_df, data_df) > E AssertionError: DataFrame are different >
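The fixed 5-second `DurationLimiter` above is exactly the part that breaks on a loaded CI machine. A common mitigation (a minimal sketch in plain Python, not Beam's API; `wait_for` and the deadline values are hypothetical) is to poll for the expected result count with a generous deadline, so the success path returns as soon as the data arrives and only the failure path pays the full wait:

```python
import time


def wait_for(condition, deadline_secs=60.0, poll_interval=0.1):
    """Poll `condition` until it returns True or the deadline passes.

    A generous deadline only slows down the failure case; the success
    case returns as soon as the condition holds, so a busy CI machine
    does not turn a slow test into a flaky one.
    """
    start = time.monotonic()
    while time.monotonic() - start < deadline_secs:
        if condition():
            return True
        time.sleep(poll_interval)
    return False


# Hypothetical usage mirroring the count-based FakeLimiter in the test above:
results = ['to', 'be', 'or']
assert wait_for(lambda: len(results) >= 3, deadline_secs=5.0)
```

This inverts the failure mode: instead of a short timeout that fails under load, the test only spends the full deadline when something is genuinely wrong.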
[jira] [Assigned] (BEAM-9803) test_streaming_wordcount flaky
[ https://issues.apache.org/jira/browse/BEAM-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang reassigned BEAM-9803: --- Assignee: Sam Rohde
message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9803) test_streaming_wordcount flaky
Ning Kang created BEAM-9803: --- Summary: test_streaming_wordcount flaky Key: BEAM-9803 URL: https://issues.apache.org/jira/browse/BEAM-9803 Project: Beam Issue Type: Test Components: sdk-py-core, test-failures Reporter: Ning Kang
[jira] [Updated] (BEAM-9549) Flaky portableWordCountBatch and portableWordCountStreaming tests
[ https://issues.apache.org/jira/browse/BEAM-9549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-9549: Description: The tests :sdks:python:test-suites:portable:py2:portableWordCountBatch and :sdks:python:test-suites:portable:py2:portableWordCountStreaming are flaky, sometimes throwing gRPC errors. Stacktrace !Sr5cNnx8sAW.png|width=2049,height=1001! In text: {code:java} INFO:root:Using Python SDK docker image: apache/beam_python2.7_sdk:2.21.0.dev. If the image is not available at local, we will try to pull from hub.docker.comINFO:apache_beam.runners.portability.fn_api_runner_transforms: INFO:apache_beam.utils.subprocess_server:Starting service with ['docker' 'run' '-v' '/usr/bin/docker:/bin/docker' '-v' '/var/run/docker.sock:/var/run/docker.sock' '--network=host' 'apache/beam_flink1.9_job_server:latest' '--job-host' 'localhost' '--job-port' '58753' '--artifact-port' '60175' '--expansion-port' '33067']INFO:apache_beam.utils.subprocess_server:[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:60175INFO:apache_beam.utils.subprocess_server:[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:33067INFO:apache_beam.utils.subprocess_server:[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:58753ERROR:grpc._common:Exception deserializing message!Traceback (most recent call last): File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_common.py", line 84, in _transformreturn transformer(message)DecodeError: Error parsing messageTraceback (most recent call last): File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main"__main__", fname, loader, pkg_name) File "/usr/lib/python2.7/runpy.py", line 72, in _run_codeexec code in 
run_globals File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/lib/python2.7/site-packages/apache_beam/examples/wordcount.py", line 142, in run() File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/lib/python2.7/site-packages/apache_beam/examples/wordcount.py", line 121, in runresult = p.run() File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 495, in runself._options).run(False) File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 508, in runreturn self.runner.run_pipeline(self, self._options) File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 401, in run_pipelinejob_service_handle.submit(proto_pipeline) File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 102, in submitprepare_response = self.prepare(proto_pipeline) File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 179, in preparetimeout=self.timeout) File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py", line 826, in __call__return _end_unary_response_blocking(state, call, False, None) File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Portable_Python_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blockingraise _InactiveRpcError(state)grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.INTERNALdetails = "Exception deserializing response!" debug_error_string = "None"> {code} A gradle scan [example|https://scans.gradle.com/s/yv63cefske4cy].
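The `StatusCode.INTERNAL` / `DecodeError` failures above appear transient: the same submission can succeed on a rerun. One generic mitigation for such flakiness (a plain-Python sketch under the assumption that the error really is transient; `TransientRpcError` stands in for `grpc._channel._InactiveRpcError`, and the retry counts are arbitrary) is to wrap the job-submission calls in exponential-backoff retries:

```python
import time


class TransientRpcError(Exception):
    """Stand-in for grpc._channel._InactiveRpcError in this sketch."""


def call_with_retries(fn, retries=3, base_delay=0.1):
    """Call `fn`, retrying transient failures with exponential backoff."""
    for attempt in range(retries):
        try:
            return fn()
        except TransientRpcError:
            if attempt == retries - 1:
                raise  # out of retries: surface the original error
            time.sleep(base_delay * (2 ** attempt))


# Hypothetical flaky call that fails twice, then succeeds:
attempts = {'n': 0}

def flaky_prepare():
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise TransientRpcError('Exception deserializing response!')
    return 'prepare_response'

assert call_with_retries(flaky_prepare) == 'prepare_response'
```

Retrying only masks the flake in CI; the underlying deserialization failure still needs a root cause.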
[jira] [Updated] (BEAM-9549) Flaky portableWordCountBatch and portableWordCountStreaming tests
[ https://issues.apache.org/jira/browse/BEAM-9549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-9549: Attachment: Sr5cNnx8sAW.png
[jira] [Created] (BEAM-9549) Flaky portableWordCountBatch and portableWordCountStreaming tests
Ning Kang created BEAM-9549: --- Summary: Flaky portableWordCountBatch and portableWordCountStreaming tests Key: BEAM-9549 URL: https://issues.apache.org/jira/browse/BEAM-9549 Project: Beam Issue Type: Test Components: test-failures Reporter: Ning Kang
[jira] [Resolved] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang resolved BEAM-7926. - Fix Version/s: Not applicable Resolution: Fixed > Show PCollection with Interactive Beam in a data-centric user flow > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Fix For: Not applicable > > Time Spent: 59h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline is defined as > > {code:java} > p = beam.Pipeline(InteractiveRunner()) > pcoll = p | 'Transform' >> transform() > pcoll2 = ... > pcoll3 = ...{code} > The user can call a single function and get auto-magical charting of the data. > e.g., > {code:java} > show(pcoll, pcoll2) > {code} > Throughout the process, a pipeline fragment is built to include only > transforms necessary to produce the desired pcolls (pcoll and pcoll2) and > execute that fragment. > This makes the Interactive Beam user flow data-centric. > > Detailed > [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].
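The "pipeline fragment" described above prunes the pipeline down to the transforms that are ancestors of the requested PCollections. A toy model of that pruning (plain Python; the graph encoding and all names are hypothetical, not Interactive Beam's internals):

```python
def required_transforms(producers, targets):
    """Return the set of transforms needed to materialize `targets`.

    `producers` maps each pcollection name to (transform_name, input
    pcollection names). We walk the dependency graph backwards from
    the desired pcollections, collecting every producing transform.
    """
    needed = set()
    stack = list(targets)
    seen = set()
    while stack:
        pcoll = stack.pop()
        if pcoll in seen or pcoll not in producers:
            continue  # already visited, or a pipeline source
        seen.add(pcoll)
        transform, inputs = producers[pcoll]
        needed.add(transform)
        stack.extend(inputs)  # recurse into upstream pcollections
    return needed


# Hypothetical graph mirroring the example: pcoll feeds pcoll2 and pcoll3.
producers = {
    'pcoll': ('Transform', []),
    'pcoll2': ('Transform2', ['pcoll']),
    'pcoll3': ('Transform3', ['pcoll']),
}

# show(pcoll, pcoll2) only needs Transform and Transform2; Transform3 is pruned.
assert required_transforms(producers, ['pcoll', 'pcoll2']) == {'Transform', 'Transform2'}
```

The real implementation works on Beam's transform hierarchy, but the reachability idea is the same: transforms not upstream of a requested PCollection are never executed.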
[jira] [Commented] (BEAM-8991) RuntimeError in log_handler_test
[ https://issues.apache.org/jira/browse/BEAM-8991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000296#comment-17000296 ] Ning Kang commented on BEAM-8991: - Got it, thanks! I've also noticed something interesting when running tests: if you intentionally log something at warning level in the __main__ scope, that warning will fail tests when running on Jenkins; some Gradle tasks simply exit when they see warning-level logs. However, if you move the warning into a local scope, say a class constructor, it does not fail those Gradle tasks. > RuntimeError in log_handler_test > > > Key: BEAM-8991 > URL: https://issues.apache.org/jira/browse/BEAM-8991 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Ning Kang >Priority: Major > Fix For: Not applicable > > > Now is: > {code:java} > apache_beam.runners.worker.log_handler_test.FnApiLogRecordHandlerTest.test_exc_info > (from py27-gcp-pytest)Failing for the past 1 build (Since #1290 )Took 78 > ms.Error MessageIndexError: list index out of rangeStacktraceself = > testMethod=test_exc_info> > def test_exc_info(self): > try: > raise ValueError('some message') > except ValueError: > _LOGGER.error('some error', exc_info=True) > > self.fn_log_handler.close() > > > log_entry = > > self.test_logging_service.log_records_received[0].log_entries[0] > E IndexError: list index out of range > apache_beam/runners/worker/log_handler_test.py:110: IndexErrorStandard > ErrorERROR:apache_beam.runners.worker.log_handler_test:some error > Traceback (most recent call last): > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/log_handler_test.py", > line 104, in test_exc_info > raise ValueError('some message') > ValueError: some message > {code} > Marking it as a duplicate of BEAM-8974. 
> Was: > {code:java} > 19:28:06 > Task :sdks:python:test-suites:tox:py35:testPy35Cython > .Exception in thread Thread-1715: > 19:28:06 Traceback (most recent call last): > 19:28:06 File "apache_beam/runners/common.py", line 879, in > apache_beam.runners.common.DoFnRunner.process > 19:28:06 return self.do_fn_invoker.invoke_process(windowed_value) > 19:28:06 File "apache_beam/runners/common.py", line 495, in > apache_beam.runners.common.SimpleInvoker.invoke_process > 19:28:06 windowed_value, self.process_method(windowed_value.value)) > 19:28:06 File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/transforms/core.py", > line 1434, in > 19:28:06 wrapper = lambda x: [fn(x)] > 19:28:06 File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py", > line 620, in raise_error > 19:28:06 raise RuntimeError('x') > 19:28:06 RuntimeError: x > 19:28:06 > 19:28:06 During handling of the above exception, another exception occurred: > 19:28:06 > 19:28:06 Traceback (most recent call last): > 19:28:06 File "/usr/lib/python3.5/threading.py", line 914, in > _bootstrap_inner > 19:28:06 self.run() > 19:28:06 File "/usr/lib/python3.5/threading.py", line 862, in run > 19:28:06 self._target(*self._args, **self._kwargs) > 19:28:06 File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py", > line 270, in _run_job > 19:28:06 self._pipeline_proto) > 19:28:06 File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 461, in run_via_runner_api > 19:28:06 return self.run_stages(stage_context, stages) > 
19:28:06 File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 553, in run_stages > 19:28:06 stage_results.process_bundle.monitoring_infos) > 19:28:06 File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__ > 19:28:06 self.gen.throw(type, value, traceback) > 19:28:06 File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 500, in maybe_profile > 19:28:06 yield > 19:28:06 File >
[jira] [Updated] (BEAM-8991) RuntimeError in log_handler_test
[ https://issues.apache.org/jira/browse/BEAM-8991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-8991: Summary: RuntimeError in log_handler_test (was: RuntimeError in fn_api_runner_test.py)
[jira] [Updated] (BEAM-8991) RuntimeError in fn_api_runner_test.py
[ https://issues.apache.org/jira/browse/BEAM-8991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-8991: Description: Now is: {code:java} apache_beam.runners.worker.log_handler_test.FnApiLogRecordHandlerTest.test_exc_info (from py27-gcp-pytest)Failing for the past 1 build (Since #1290 )Took 78 ms.Error MessageIndexError: list index out of rangeStacktraceself = def test_exc_info(self): try: raise ValueError('some message') except ValueError: _LOGGER.error('some error', exc_info=True) self.fn_log_handler.close() > log_entry = > self.test_logging_service.log_records_received[0].log_entries[0] E IndexError: list index out of range apache_beam/runners/worker/log_handler_test.py:110: IndexErrorStandard ErrorERROR:apache_beam.runners.worker.log_handler_test:some error Traceback (most recent call last): File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/log_handler_test.py", line 104, in test_exc_info raise ValueError('some message') ValueError: some message {code} Marking it as a duplicate of BEAM-8974. 
Was: {code:java} 19:28:06 > Task :sdks:python:test-suites:tox:py35:testPy35Cython .Exception in thread Thread-1715: 19:28:06 Traceback (most recent call last): 19:28:06 File "apache_beam/runners/common.py", line 879, in apache_beam.runners.common.DoFnRunner.process 19:28:06 return self.do_fn_invoker.invoke_process(windowed_value) 19:28:06 File "apache_beam/runners/common.py", line 495, in apache_beam.runners.common.SimpleInvoker.invoke_process 19:28:06 windowed_value, self.process_method(windowed_value.value)) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/transforms/core.py", line 1434, in 19:28:06 wrapper = lambda x: [fn(x)] 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py", line 620, in raise_error 19:28:06 raise RuntimeError('x') 19:28:06 RuntimeError: x 19:28:06 19:28:06 During handling of the above exception, another exception occurred: 19:28:06 19:28:06 Traceback (most recent call last): 19:28:06 File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner 19:28:06 self.run() 19:28:06 File "/usr/lib/python3.5/threading.py", line 862, in run 19:28:06 self._target(*self._args, **self._kwargs) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py", line 270, in _run_job 19:28:06 self._pipeline_proto) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 461, in run_via_runner_api 19:28:06 return self.run_stages(stage_context, stages) 19:28:06 File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 553, in run_stages 19:28:06 stage_results.process_bundle.monitoring_infos) 19:28:06 File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__ 19:28:06 self.gen.throw(type, value, traceback) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 500, in maybe_profile 19:28:06 yield 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 550, in run_stages 19:28:06 stage_context.safe_coders) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 870, in _run_stage 19:28:06 result, splits = bundle_manager.process_bundle(data_input, data_output) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 2052, in process_bundle 19:28:06 part, expected_outputs), part_inputs): 19:28:06 File "/usr/lib/python3.5/concurrent/futures/_base.py", line 556, in result_iterator 19:28:06 yield future.result() 19:28:06 File "/usr/lib/python3.5/concurrent/futures/_base.py", line 405, in result 19:28:06
[jira] [Created] (BEAM-8991) RuntimeError in fn_api_runner_test.py
Ning Kang created BEAM-8991: --- Summary: RuntimeError in fn_api_runner_test.py Key: BEAM-8991 URL: https://issues.apache.org/jira/browse/BEAM-8991 Project: Beam Issue Type: Bug Components: sdk-py-core Reporter: Ning Kang {code:java} 19:28:06 > Task :sdks:python:test-suites:tox:py35:testPy35Cython .Exception in thread Thread-1715: 19:28:06 Traceback (most recent call last): 19:28:06 File "apache_beam/runners/common.py", line 879, in apache_beam.runners.common.DoFnRunner.process 19:28:06 return self.do_fn_invoker.invoke_process(windowed_value) 19:28:06 File "apache_beam/runners/common.py", line 495, in apache_beam.runners.common.SimpleInvoker.invoke_process 19:28:06 windowed_value, self.process_method(windowed_value.value)) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/transforms/core.py", line 1434, in 19:28:06 wrapper = lambda x: [fn(x)] 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py", line 620, in raise_error 19:28:06 raise RuntimeError('x') 19:28:06 RuntimeError: x 19:28:06 19:28:06 During handling of the above exception, another exception occurred: 19:28:06 19:28:06 Traceback (most recent call last): 19:28:06 File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner 19:28:06 self.run() 19:28:06 File "/usr/lib/python3.5/threading.py", line 862, in run 19:28:06 self._target(*self._args, **self._kwargs) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py", line 270, in _run_job 19:28:06 self._pipeline_proto) 19:28:06 File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 461, in run_via_runner_api 19:28:06 return self.run_stages(stage_context, stages) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 553, in run_stages 19:28:06 stage_results.process_bundle.monitoring_infos) 19:28:06 File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__ 19:28:06 self.gen.throw(type, value, traceback) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 500, in maybe_profile 19:28:06 yield 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 550, in run_stages 19:28:06 stage_context.safe_coders) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 870, in _run_stage 19:28:06 result, splits = bundle_manager.process_bundle(data_input, data_output) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 2052, in process_bundle 19:28:06 part, expected_outputs), part_inputs): 19:28:06 File "/usr/lib/python3.5/concurrent/futures/_base.py", line 556, in result_iterator 19:28:06 yield future.result() 19:28:06 File "/usr/lib/python3.5/concurrent/futures/_base.py", line 405, in result 19:28:06 return self.__get_result() 19:28:06 File 
"/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result 19:28:06 raise self._exception 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/utils/thread_pool_executor.py", line 42, in run 19:28:06 self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 2052, in 19:28:06 part, expected_outputs), part_inputs): 19:28:06 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit@2/src/sdks/python/test-suites/tox/py35/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1977, in process_bundle
[jira] [Commented] (BEAM-8977) apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest.test_dynamic_plotting_update_same_display is flaky
[ https://issues.apache.org/jira/browse/BEAM-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998474#comment-16998474 ] Ning Kang commented on BEAM-8977: - The flakiness is caused by nondeterministic execution of the asynchronous tasks under test. Depending on the performance of the system running the tests, it's possible that the asynchronous task is not executed at all during the 1-second sleep. However, adding a list length check, increasing the sleep time, or awaiting with a timeout will not resolve the flakiness: if the scheduled asynchronous task does not execute for at least 2 iterations, there is nothing to test, and neither a failure nor a success tells us anything. I'll remove the unit test since it is flaky by nature. Mocking such an asynchronous mechanism doesn't give us anything, so we will not test the asynchronous mechanism itself. Instead, I'll add a different test for the underlying asynchronous task and test its logic synchronously. > apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest.test_dynamic_plotting_update_same_display > is flaky > > > Key: BEAM-8977 > URL: https://issues.apache.org/jira/browse/BEAM-8977 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Valentyn Tymofieiev >Assignee: Ning Kang >Priority: Major > > Sample failure: > > [https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1273/testReport/apache_beam.runners.interactive.display.pcoll_visualization_test/PCollectionVisualizationTest/test_dynamic_plotting_update_same_display/] > Error Message > IndexError: list index out of range > Stacktrace > self = > testMethod=test_dynamic_plotting_update_same_display> > mocked_display_facets = id='139889868386376'> > @patch('apache_beam.runners.interactive.display.pcoll_visualization' > '.PCollectionVisualization.display_facets') > def test_dynamic_plotting_update_same_display(self, > mocked_display_facets): > fake_pipeline_result = 
runner.PipelineResult(runner.PipelineState.RUNNING) > ie.current_env().set_pipeline_result(self._p, fake_pipeline_result) > # Starts async dynamic plotting that never ends in this test. > h = pv.visualize(self._pcoll, dynamic_plotting_interval=0.001) > # Blocking so the above async task can execute some iterations. > time.sleep(1) > # The first iteration doesn't provide updating_pv to display_facets. > _, first_kwargs = mocked_display_facets.call_args_list[0] > self.assertEqual(first_kwargs, {}) > # The following iterations use the same updating_pv to display_facets and so > # on. > > _, second_kwargs = mocked_display_facets.call_args_list[1] > E IndexError: list index out of range > apache_beam/runners/interactive/display/pcoll_visualization_test.py:105: > IndexError > Standard Output > > Standard Error > WARNING:apache_beam.runners.interactive.interactive_environment:You cannot > use Interactive Beam features when you are not in an interactive environment > such as a Jupyter notebook or ipython terminal. -- This message was sent by Atlassian Jira (v8.3.4#803005)
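The fix described in the comment above — testing the task a background timer would invoke directly, rather than sleeping and hoping the timer has iterated — can be sketched as follows. This is a minimal illustration, not Beam code: `make_update_task` and its handle-passing behavior are hypothetical stand-ins for the dynamic-plotting loop, with `MagicMock` playing the role of `display_facets`.

```python
from unittest.mock import MagicMock

def make_update_task(display_facets):
    """Hypothetical factory: returns the callable a background timer would
    run repeatedly. The first invocation calls display_facets with no
    arguments; later invocations pass the handle from the first call."""
    state = {'handle': None}
    def task():
        if state['handle'] is None:
            state['handle'] = display_facets()
        else:
            display_facets(updating_pv=state['handle'])
    return task

# Synchronous test: invoke the task directly instead of sleeping while a
# background thread runs it, so the number of iterations is deterministic
# and call_args_list can never be shorter than expected.
display = MagicMock(return_value='handle-1')
task = make_update_task(display)
task()  # first iteration: no updating_pv
task()  # second iteration: reuses the handle
_, first_kwargs = display.call_args_list[0]
_, second_kwargs = display.call_args_list[1]
assert first_kwargs == {}
assert second_kwargs == {'updating_pv': 'handle-1'}
```

Because the test drives the iterations itself, the `IndexError` on `call_args_list[1]` cannot occur regardless of machine load.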
[jira] [Work started] (BEAM-8977) apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest.test_dynamic_plotting_update_same_display is flaky
[ https://issues.apache.org/jira/browse/BEAM-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-8977 started by Ning Kang. --- > apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest.test_dynamic_plotting_update_same_display > is flaky > > > Key: BEAM-8977 > URL: https://issues.apache.org/jira/browse/BEAM-8977 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Valentyn Tymofieiev >Assignee: Ning Kang >Priority: Major > > Sample failure: > > [https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1273/testReport/apache_beam.runners.interactive.display.pcoll_visualization_test/PCollectionVisualizationTest/test_dynamic_plotting_update_same_display/] > Error Message > IndexError: list index out of range > Stacktrace > self = > testMethod=test_dynamic_plotting_update_same_display> > mocked_display_facets = id='139889868386376'> > @patch('apache_beam.runners.interactive.display.pcoll_visualization' > '.PCollectionVisualization.display_facets') > def test_dynamic_plotting_update_same_display(self, > mocked_display_facets): > fake_pipeline_result = runner.PipelineResult(runner.PipelineState.RUNNING) > ie.current_env().set_pipeline_result(self._p, fake_pipeline_result) > # Starts async dynamic plotting that never ends in this test. > h = pv.visualize(self._pcoll, dynamic_plotting_interval=0.001) > # Blocking so the above async task can execute some iterations. > time.sleep(1) > # The first iteration doesn't provide updating_pv to display_facets. > _, first_kwargs = mocked_display_facets.call_args_list[0] > self.assertEqual(first_kwargs, {}) > # The following iterations use the same updating_pv to display_facets and so > # on. 
> > _, second_kwargs = mocked_display_facets.call_args_list[1] > E IndexError: list index out of range > apache_beam/runners/interactive/display/pcoll_visualization_test.py:105: > IndexError > Standard Output > > Standard Error > WARNING:apache_beam.runners.interactive.interactive_environment:You cannot > use Interactive Beam features when you are not in an interactive environment > such as a Jupyter notebook or ipython terminal. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8837) PCollectionVisualizationTest: possible bug
[ https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang resolved BEAM-8837. - Fix Version/s: 2.19.0 Resolution: Fixed PR: [https://github.com/apache/beam/pull/10321] merged. > PCollectionVisualizationTest: possible bug > -- > > Key: BEAM-8837 > URL: https://issues.apache.org/jira/browse/BEAM-8837 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Ning Kang >Priority: Major > Fix For: 2.19.0 > > Time Spent: 2.5h > Remaining Estimate: 0h > > This seems like a bug, even though the test passes: > {code} > test_display_plain_text_when_kernel_has_no_frontend > (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) > ... Exception in thread Thread-4405: > Traceback (most recent call last): > File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner > self.run() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py", > line 19, in run > self.execute(*self.args, **self.kwargs) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 132, in continuous_update_display > updated_pv.display_facets(updating_pv=pv) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 209, in display_facets > data = self._to_dataframe() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 278, in _to_dataframe > for el in self._to_element_list(): > File > 
"/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 266, in _to_element_list > if ie.current_env().cache_manager().exists('full', self._cache_key): > AttributeError: 'NoneType' object has no attribute 'exists' > ok > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8837) PCollectionVisualizationTest: possible bug
[ https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995948#comment-16995948 ] Ning Kang commented on BEAM-8837: - We'll just run the real logic to prepare data as inputs for visualization instead of patching test data for PCollections to be visualized. > PCollectionVisualizationTest: possible bug > -- > > Key: BEAM-8837 > URL: https://issues.apache.org/jira/browse/BEAM-8837 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Ning Kang >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > This seems like a bug, even though the test passes: > {code} > test_display_plain_text_when_kernel_has_no_frontend > (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) > ... Exception in thread Thread-4405: > Traceback (most recent call last): > File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner > self.run() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py", > line 19, in run > self.execute(*self.args, **self.kwargs) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 132, in continuous_update_display > updated_pv.display_facets(updating_pv=pv) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 209, in display_facets > data = self._to_dataframe() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 278, in _to_dataframe > for el in self._to_element_list(): > File > 
"/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 266, in _to_element_list > if ie.current_env().cache_manager().exists('full', self._cache_key): > AttributeError: 'NoneType' object has no attribute 'exists' > ok > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (BEAM-8837) PCollectionVisualizationTest: possible bug
[ https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986465#comment-16986465 ] Ning Kang edited comment on BEAM-8837 at 12/7/19 12:10 AM: --- The *@patch* in unit tests didn't work as expected and the code executes the real logic to read cached PCollection when there is *None* cache manager in the test. I can reproduce the error trace by running the gradle task {code:java} 14:20:16 test_display_plain_text_when_kernel_has_no_frontend (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) ... Exception in thread Thread-3982: 14:20:16 Traceback (most recent call last): 14:20:16 File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner 14:20:16 self.run() 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/target/.tox-py37-gcp/py37-gcp/lib/python3.7/site-packages/timeloop/job.py", line 19, in run 14:20:16 self.execute(*self.args, **self.kwargs) 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 132, in continuous_update_display 14:20:16 updated_pv.display_facets(updating_pv=pv) 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 209, in display_facets 14:20:16 data = self._to_dataframe() 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 278, in _to_dataframe 14:20:16 for el in self._to_element_list(): 14:20:16 File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 266, in _to_element_list 14:20:16 if ie.current_env().cache_manager().exists('full', self._cache_key): 14:20:16 AttributeError: 'NoneType' object has no attribute 'exists' 14:20:16 14:20:16 ok{code} I've tried running the test locally with *unittest* and *nosetests*; both passed without running into the issue. I'm pretty sure it's a gotcha in [where to patch|https://docs.python.org/3/library/unittest.mock.html#where-to-patch]. When the test is run by tox + nosetests, the patch doesn't work. was (Author: ningk): The *@patch* in unit tests didn't work as expected, and the code executed the real logic to read the cached PCollection when the cache manager was *None* in the test. [~udim], do you have a link to the failed Jenkins test? I'll need to figure out what kind of test invocation path failed this *@patch*. I've tried running the test locally with *unittest* and *nosetests*; both passed without running into the issue. I'm pretty sure it's a gotcha in [where to patch|https://docs.python.org/3/library/unittest.mock.html#where-to-patch]. > PCollectionVisualizationTest: possible bug > -- > > Key: BEAM-8837 > URL: https://issues.apache.org/jira/browse/BEAM-8837 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Ning Kang >Priority: Major > > This seems like a bug, even though the test passes: > {code} > test_display_plain_text_when_kernel_has_no_frontend > (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) > ...
Exception in thread Thread-4405: > Traceback (most recent call last): > File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner > self.run() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py", > line 19, in run > self.execute(*self.args, **self.kwargs) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 132, in continuous_update_display > updated_pv.display_facets(updating_pv=pv) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 209, in display_facets > data = self._to_dataframe() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 278, in _to_dataframe > for el in
[jira] [Comment Edited] (BEAM-8837) PCollectionVisualizationTest: possible bug
[ https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986465#comment-16986465 ] Ning Kang edited comment on BEAM-8837 at 12/7/19 12:10 AM: --- The *@patch* in unit tests didn't work as expected and the code executes the real logic to read cached PCollection when there is *None* cache manager in the test. I can reproduce the error trace by running the gradle task {code:java} 14:20:16 test_display_plain_text_when_kernel_has_no_frontend (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) ... Exception in thread Thread-3982: 14:20:16 Traceback (most recent call last): 14:20:16 File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner 14:20:16 self.run() 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/target/.tox-py37-gcp/py37-gcp/lib/python3.7/site-packages/timeloop/job.py", line 19, in run 14:20:16 self.execute(*self.args, **self.kwargs) 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 132, in continuous_update_display 14:20:16 updated_pv.display_facets(updating_pv=pv) 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 209, in display_facets 14:20:16 data = self._to_dataframe() 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 278, in _to_dataframe 14:20:16 for el in self._to_element_list(): 14:20:16 File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 266, in _to_element_list 14:20:16 if ie.current_env().cache_manager().exists('full', self._cache_key): 14:20:16 AttributeError: 'NoneType' object has no attribute 'exists' 14:20:16 14:20:16 ok{code} I've tried locally run with *unittest* and *nosetests*, both passed without running into the issue. I'm pretty sure it's a gotcha in [where to patch|https://docs.python.org/3/library/unittest.mock.html#where-to-patch]. When the test is run by tox + nosetests, the patch doesn't work. was (Author: ningk): The *@patch* in unit tests didn't work as expected and the code executes the real logic to read cached PCollection when there is *None* cache manager in the test. I can reproduce the error trace by running the gradle task {code:java} 14:20:16 test_display_plain_text_when_kernel_has_no_frontend (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) ... 
Exception in thread Thread-3982: 14:20:16 Traceback (most recent call last): 14:20:16 File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner 14:20:16 self.run() 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/target/.tox-py37-gcp/py37-gcp/lib/python3.7/site-packages/timeloop/job.py", line 19, in run 14:20:16 self.execute(*self.args, **self.kwargs) 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 132, in continuous_update_display 14:20:16 updated_pv.display_facets(updating_pv=pv) 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 209, in display_facets 14:20:16 data = self._to_dataframe() 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 278, in _to_dataframe 14:20:16 for el in self._to_element_list(): 14:20:16 File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", line 266, in _to_element_list 14:20:16 if ie.current_env().cache_manager().exists('full', self._cache_key): 14:20:16 AttributeError: 'NoneType' object has no attribute 'exists' 14:20:16 14:20:16 ok{code} I've tried locally run with *unittest* and *nosetests*, both passed
[jira] [Commented] (BEAM-8837) PCollectionVisualizationTest: possible bug
[ https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986465#comment-16986465 ] Ning Kang commented on BEAM-8837: - The *@patch* in unit tests didn't work as expected, and the code executed the real logic to read the cached PCollection when the cache manager was *None* in the test. [~udim], do you have a link to the failed Jenkins test? I'll need to figure out what kind of test invocation path failed this *@patch*. I've tried running the test locally with *unittest* and *nosetests*; both passed without running into the issue. I'm pretty sure it's a gotcha in [where to patch|https://docs.python.org/3/library/unittest.mock.html#where-to-patch]. > PCollectionVisualizationTest: possible bug > -- > > Key: BEAM-8837 > URL: https://issues.apache.org/jira/browse/BEAM-8837 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Ning Kang >Priority: Major > > This seems like a bug, even though the test passes: > {code} > test_display_plain_text_when_kernel_has_no_frontend > (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) > ... 
Exception in thread Thread-4405: > Traceback (most recent call last): > File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner > self.run() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py", > line 19, in run > self.execute(*self.args, **self.kwargs) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 132, in continuous_update_display > updated_pv.display_facets(updating_pv=pv) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 209, in display_facets > data = self._to_dataframe() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 278, in _to_dataframe > for el in self._to_element_list(): > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 266, in _to_element_list > if ie.current_env().cache_manager().exists('full', self._cache_key): > AttributeError: 'NoneType' object has no attribute 'exists' > ok > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
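The "where to patch" gotcha referenced in the comment above can be shown in isolation. This sketch uses made-up module names (`producer`, `consumer`) built with `types.ModuleType`; it is not Beam code, only an illustration of why patching the definition site may not intercept a call:

```python
# Minimal illustration of the "where to patch" gotcha from the Python
# mock docs: patching a name where it is DEFINED does not affect a
# module that imported that name into its own namespace.
# The module and function names here are hypothetical, not Beam's.
import sys
import types
from unittest import mock

# Simulate "producer.py": defines cache_exists().
producer = types.ModuleType('producer')
producer.cache_exists = lambda: True
sys.modules['producer'] = producer

# Simulate "consumer.py": does `from producer import cache_exists`,
# binding the function object into its own namespace at import time.
consumer = types.ModuleType('consumer')
consumer.cache_exists = producer.cache_exists
sys.modules['consumer'] = consumer

# Patching the definition site leaves the consumer's copy untouched...
with mock.patch('producer.cache_exists', return_value=False):
    unaffected = consumer.cache_exists()   # still calls the real function

# ...so the patch must target the namespace that *uses* the name.
with mock.patch('consumer.cache_exists', return_value=False):
    affected = consumer.cache_exists()

print(unaffected, affected)  # True False
```

If a test invocation path imports the module under test differently (as nose and unittest can), a patch aimed at the wrong namespace silently does nothing, which matches the symptom described above.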
[jira] [Commented] (BEAM-8837) PCollectionVisualizationTest: possible bug
[ https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986426#comment-16986426 ] Ning Kang commented on BEAM-8837: - Thanks, [~udim], I'll take a look at it. > PCollectionVisualizationTest: possible bug > -- > > Key: BEAM-8837 > URL: https://issues.apache.org/jira/browse/BEAM-8837 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Ning Kang >Priority: Major > > This seems like a bug, even though the test passes: > {code} > test_display_plain_text_when_kernel_has_no_frontend > (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) > ... Exception in thread Thread-4405: > Traceback (most recent call last): > File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner > self.run() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py", > line 19, in run > self.execute(*self.args, **self.kwargs) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 132, in continuous_update_display > updated_pv.display_facets(updating_pv=pv) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 209, in display_facets > data = self._to_dataframe() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 278, in _to_dataframe > for el in self._to_element_list(): > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 266, in _to_element_list > if 
ie.current_env().cache_manager().exists('full', self._cache_key): > AttributeError: 'NoneType' object has no attribute 'exists' > ok > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work started] (BEAM-8837) PCollectionVisualizationTest: possible bug
[ https://issues.apache.org/jira/browse/BEAM-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-8837 started by Ning Kang. --- > PCollectionVisualizationTest: possible bug > -- > > Key: BEAM-8837 > URL: https://issues.apache.org/jira/browse/BEAM-8837 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Ning Kang >Priority: Major > > This seems like a bug, even though the test passes: > {code} > test_display_plain_text_when_kernel_has_no_frontend > (apache_beam.runners.interactive.display.pcoll_visualization_test.PCollectionVisualizationTest) > ... Exception in thread Thread-4405: > Traceback (most recent call last): > File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner > self.run() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/.eggs/timeloop-1.0.2-py3.7.egg/timeloop/job.py", > line 19, in run > self.execute(*self.args, **self.kwargs) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 132, in continuous_update_display > updated_pv.display_facets(updating_pv=pv) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 209, in display_facets > data = self._to_dataframe() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 278, in _to_dataframe > for el in self._to_element_list(): > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py", > line 266, in _to_element_list > if ie.current_env().cache_manager().exists('full', self._cache_key): > 
AttributeError: 'NoneType' object has no attribute 'exists' > ok > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8016) Render Beam Pipeline as DOT with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang resolved BEAM-8016. - Fix Version/s: 2.18.0 Resolution: Fixed PR has been merged > Render Beam Pipeline as DOT with Interactive Beam > --- > > Key: BEAM-8016 > URL: https://issues.apache.org/jira/browse/BEAM-8016 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Fix For: 2.18.0 > > Time Spent: 7h 10m > Remaining Estimate: 0h > > With work in https://issues.apache.org/jira/browse/BEAM-7760, a Beam pipeline > converted to DOT and then rendered should mark user-defined variables on edges. > With work in https://issues.apache.org/jira/browse/BEAM-7926, it might be > redundant or confusing to render arbitrary random sample PCollection data on > edges. > We'll also make sure edges in the graph correspond to the output -> input > relationship in the user-defined pipeline. Each edge is one output. If > multiple downstream inputs take the same output, it should be rendered as > one edge diverging into two instead of two edges. > For advanced interactivity highlighting, where each execution highlights the part > of the pipeline actually executed from the original pipeline, we'll also > provide support in beta. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Description: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as {code:java} p = beam.Pipeline(InteractiveRunner()) pcoll = p | 'Transform' >> transform() pcoll2 = ... pcoll3 = ...{code} The use can call a single function and get auto-magical charting of the data. e.g., {code:java} show(pcoll, pcoll2) {code} Throughout the process, a pipeline fragment is built to include only transforms necessary to produce the desired pcolls (pcoll and pcoll2) and execute that fragment. This makes the Interactive Beam user flow data-centric. Detailed [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz]. was: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as {code:java} p = beam.Pipeline(InteractiveRunner()) pcoll = p | 'Transform' >> transform() pcoll2 = ... pcoll3 = ...{code} The use can call a single function and get auto-magical charting of the data. e.g., {code:java} show(pcoll, pcoll2) {code} Throughout the process, a pipeline fragment is built to include only transforms necessary to produce the desired pcolls (pcoll and pcoll2) and execute that fragment. This makes the Interactive Beam user flow data-centric. Detailed [design|[https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz]]. 
> Show PCollection with Interactive Beam in a data-centric user flow > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > > {code:java} > p = beam.Pipeline(InteractiveRunner()) > pcoll = p | 'Transform' >> transform() > pcoll2 = ... > pcoll3 = ...{code} > The user can call a single function and get auto-magical charting of the data. > e.g., > {code:java} > show(pcoll, pcoll2) > {code} > Throughout the process, a pipeline fragment is built to include only > transforms necessary to produce the desired pcolls (pcoll and pcoll2) and > execute that fragment. > This makes the Interactive Beam user flow data-centric. > > Detailed > [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Description: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as {code:java} p = beam.Pipeline(InteractiveRunner()) pcoll = p | 'Transform' >> transform() pcoll2 = ... pcoll3 = ...{code} The use can call a single function and get auto-magical charting of the data. e.g., {code:java} show(pcoll, pcoll2) {code} Throughout the process, a pipeline fragment is built to include only transforms necessary to produce the desired pcolls (pcoll and pcoll2) and execute that fragment. This makes the Interactive Beam user flow data-centric. Detailed [design|[https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz]]. was: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as {code:java} p = beam.Pipeline(InteractiveRunner()) pcoll = p | 'Transform' >> transform() pcoll2 = ... pcoll3 = ...{code} The use can call a single function and get auto-magical charting of the data. e.g., {code:java} show(pcoll, pcoll2) {code} Throughout the process, a pipeline fragment is built to include only transforms necessary to produce the desired pcolls (pcoll and pcoll2) and execute that fragment. This makes the Interactive Beam user flow data-centric. > Show PCollection with Interactive Beam in a data-centric user flow > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. 
> Say an Interactive Beam pipeline defined as > > {code:java} > p = beam.Pipeline(InteractiveRunner()) > pcoll = p | 'Transform' >> transform() > pcoll2 = ... > pcoll3 = ...{code} > The user can call a single function and get auto-magical charting of the data. > e.g., > {code:java} > show(pcoll, pcoll2) > {code} > Throughout the process, a pipeline fragment is built to include only > transforms necessary to produce the desired pcolls (pcoll and pcoll2) and > execute that fragment. > This makes the Interactive Beam user flow data-centric. > > Detailed > [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Description: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as {code:java} p = beam.Pipeline(InteractiveRunner()) pcoll = p | 'Transform' >> transform() pcoll2 = ... pcoll3 = ...{code} The use can call a single function and get auto-magical charting of the data. e.g., {code:java} show(pcoll, pcoll2) {code} Throughout the process, a pipeline fragment is built to include only transforms necessary to produce the desired pcolls (pcoll and pcoll2) and execute that fragment. This makes the Interactive Beam user flow data-centric. was: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as {code:java} p = beam.Pipeline(InteractiveRunner()) pcoll = p | 'Transform' >> transform() pcoll2 = ... pcoll3 = ...{code} The use can call a single function and get auto-magical charting of the data. e.g., show(pcoll, pcoll2) Throughout the process, a pipeline fragment is built to include only transforms necessary to produce the desired pcolls (pcoll and pcoll2) and execute that fragment. > Show PCollection with Interactive Beam in a data-centric user flow > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > > {code:java} > p = beam.Pipeline(InteractiveRunner()) > pcoll = p | 'Transform' >> transform() > pcoll2 = ... 
> pcoll3 = ...{code} > The user can call a single function and get auto-magical charting of the data. > e.g., > {code:java} > show(pcoll, pcoll2) > {code} > Throughout the process, a pipeline fragment is built to include only > transforms necessary to produce the desired pcolls (pcoll and pcoll2) and > execute that fragment. > This makes the Interactive Beam user flow data-centric. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam in a data-centric user flow
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Summary: Show PCollection with Interactive Beam in a data-centric user flow (was: Show PCollection with Interactive Beam) > Show PCollection with Interactive Beam in a data-centric user flow > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > > {code:java} > p = beam.Pipeline(InteractiveRunner()) > pcoll = p | 'Transform' >> transform() > pcoll2 = ... > pcoll3 = ...{code} > The use can call a single function and get auto-magical charting of the data. > e.g., show(pcoll, pcoll2) > Throughout the process, a pipeline fragment is built to include only > transforms necessary to produce the desired pcolls (pcoll and pcoll2) and > execute that fragment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
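The pipeline-fragment idea described in BEAM-7926 — keep only the transforms necessary to produce the requested pcolls — amounts to a backwards traversal over producer edges. A minimal sketch, with a hypothetical `producers` mapping standing in for the real pipeline graph (this is not Interactive Beam's implementation):

```python
# Hedged sketch of fragment construction: starting from the requested
# outputs, walk producer edges backwards and keep only the transforms
# on those paths. Everything downstream-only (pcoll3's branch) drops out.
def fragment(producers, targets):
    """producers maps each pcoll to (transform_name, [input_pcolls])."""
    needed = set()
    stack = list(targets)
    while stack:
        pcoll = stack.pop()
        if pcoll not in producers:
            continue  # a root with no producer, e.g. the pipeline itself
        transform, inputs = producers[pcoll]
        if transform in needed:
            continue  # already visited this producer
        needed.add(transform)
        stack.extend(inputs)
    return needed

# pcoll and pcoll2 share an upstream Read; pcoll3 hangs off its own branch.
producers = {
    'raw':    ('Read',       []),
    'pcoll':  ('Transform',  ['raw']),
    'pcoll2': ('Transform2', ['raw']),
    'pcoll3': ('Transform3', ['raw']),
}
print(sorted(fragment(producers, ['pcoll', 'pcoll2'])))
# ['Read', 'Transform', 'Transform2'] -- Transform3 is excluded
```

Executing only this minimal set is what lets show(pcoll, pcoll2) avoid running transforms the user didn't ask for.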
[jira] [Work started] (BEAM-8379) Cache Eviction for Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-8379 started by Ning Kang. --- > Cache Eviction for Interactive Beam > --- > > Key: BEAM-8379 > URL: https://issues.apache.org/jira/browse/BEAM-8379 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > Evicts cache created by Interactive Beam when an IPython kernel is restarted > or terminated to release the resource usage that is no longer needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8379) Cache Eviction for Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang resolved BEAM-8379. - Fix Version/s: 2.18.0 Resolution: Fixed > Cache Eviction for Interactive Beam > --- > > Key: BEAM-8379 > URL: https://issues.apache.org/jira/browse/BEAM-8379 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Fix For: 2.18.0 > > Time Spent: 2h > Remaining Estimate: 0h > > Evicts cache created by Interactive Beam when an IPython kernel is restarted > or terminated to release the resource usage that is no longer needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution
[ https://issues.apache.org/jira/browse/BEAM-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang closed BEAM-8425. --- Fix Version/s: Not applicable Resolution: Abandoned We decided not to notify users but to proactively steer the user away from the bad situation. By implementing show(), which executes an implicitly built fragment, we mitigate the re-execution/deleted-cell problems. > Notifying Interactive Beam user about Beam related cell deletion or > re-execution > > > Key: BEAM-8425 > URL: https://issues.apache.org/jira/browse/BEAM-8425 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Fix For: Not applicable > > > There is a general problem with Interactive Notebooks: when an end user > deletes a cell that has been executed or re-executes a cell, those previous > executions are hidden from the end user. > However, hidden states will still have side effects in the notebook. > This kind of problem bothers Beam even more because Beam's pipeline > construction statements are not idempotent and pipeline execution is > decoupled and deferred from pipeline construction. > Re-executing a cell with Beam statements that build a pipeline would cause > an unexpected pipeline state, and the user wouldn't notice it due to this problem > with notebooks. > We'll intercept each transform application invocation from the > InteractiveRunner and record the ipython/notebook prompt number. Then each > time a user executes a cell that applies a PTransform, we'll compare the > recorded list of prompt numbers with the current notebook file's content and > figure out if there is any missing number. If so, we know for sure that a > re-execution happened, and we use the display manager to notify the end user of > potential side effects caused by hidden states of the notebook/ipython. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang resolved BEAM-7760. - Fix Version/s: 2.18.0 Resolution: Fixed > Interactive Beam Caching PCollections bound to user defined vars in notebook > > > Key: BEAM-7760 > URL: https://issues.apache.org/jira/browse/BEAM-7760 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Fix For: 2.18.0 > > Time Spent: 18h > Remaining Estimate: 0h > > Cache only PCollections bound to user-defined variables in a pipeline when > running the pipeline with the interactive runner in jupyter notebooks. > [Interactive > Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive] > has been caching and using caches of "leaf" PCollections for interactive > execution in jupyter notebooks. > The interactive execution is currently supported so that when appending new > transforms to an existing pipeline for a new run, the executed part of the pipeline > doesn't need to be re-executed. > A PCollection is a "leaf" when it is never used as input in any PTransform in > the pipeline. > The problem with building caches and the pipeline to execute around "leaf" is > that when a PCollection is consumed by a sink with no output, the built pipeline to > execute will miss the subgraph generating and consuming that > PCollection. > For example, "ReadFromPubSub --> WriteToPubSub" will result in an empty > pipeline. > Caching around PCollections bound to user-defined variables and replacing > transforms with the source and sink of caches could resolve the pipeline to > execute properly under the interactive execution scenario. Also, a cached > PCollection can now trace back to user code and can be used for user data > visualization if the user wants to do it. > E.g., > {code:java} > # ... 
> p = beam.Pipeline(interactive_runner.InteractiveRunner(), > options=pipeline_options) > messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...') > messages | "Write" >> beam.io.WriteToPubSub(topic_path) > result = p.run() > # ... > visualize(messages){code} > The interactive runner automatically figures out that the PCollection > {code:java} > messages{code} > created by > {code:java} > p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code} > should be cached and reused if the notebook user appends more transforms. > And once the pipeline gets executed, the user could use any > visualize(PCollection) module to visualize the data statically (batch) or > dynamically (stream). -- This message was sent by Atlassian Jira (v8.3.4#803005)
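The "PCollections bound to user-defined variables" idea in BEAM-7760 can be sketched as a scan over a user namespace. `FakePCollection` and `watched_pcollections` below are illustrative stand-ins, not Interactive Beam's actual API:

```python
# Hedged sketch: map variable names in a user namespace (here a plain
# dict standing in for the notebook's globals) to the PCollection-like
# objects they are bound to. Those are the values worth caching.
class FakePCollection:  # stand-in for apache_beam.pvalue.PCollection
    pass

def watched_pcollections(user_ns):
    # Underscore-prefixed names are treated as internal, mirroring the
    # usual notebook convention; everything else bound to a
    # PCollection-like object is a caching candidate.
    return {name: val for name, val in user_ns.items()
            if isinstance(val, FakePCollection) and not name.startswith('_')}

messages = FakePCollection()
_tmp = FakePCollection()      # internal, skipped
count = 42                    # not a PCollection, skipped
ns = {'messages': messages, '_tmp': _tmp, 'count': count}
print(sorted(watched_pcollections(ns)))  # ['messages']
```

This is why "ReadFromPubSub --> WriteToPubSub" stops producing an empty pipeline: `messages` is bound to a variable, so its producing subgraph is kept and cached even though the PCollection is not a leaf.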
[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Description: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as {code:java} p = beam.Pipeline(InteractiveRunner()) pcoll = p | 'Transform' >> transform() pcoll2 = ... pcoll3 = ...{code} The use can call a single function and get auto-magical charting of the data. e.g., show(pcoll, pcoll2) Throughout the process, a pipeline fragment is built to include only transforms necessary to produce the desired pcolls (pcoll and pcoll2) and execute that fragment. was: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as p = create_pipeline() pcoll = p | 'Transform' >> transform() The use can call a single function and get auto-magical charting of the data as materialized pcoll. e.g., show(pcoll) > Show PCollection with Interactive Beam > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > > {code:java} > p = beam.Pipeline(InteractiveRunner()) > pcoll = p | 'Transform' >> transform() > pcoll2 = ... > pcoll3 = ...{code} > The use can call a single function and get auto-magical charting of the data. > e.g., show(pcoll, pcoll2) > Throughout the process, a pipeline fragment is built to include only > transforms necessary to produce the desired pcolls (pcoll and pcoll2) and > execute that fragment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-7926) Show PCollection with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971824#comment-16971824 ] Ning Kang commented on BEAM-7926: - Implementation has been added. Mark it as resolved. > Show PCollection with Interactive Beam > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > p = create_pipeline() > pcoll = p | 'Transform' >> transform() > The use can call a single function and get auto-magical charting of the data > as materialized pcoll. > e.g., show(pcoll) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Summary: Show PCollection with Interactive Beam (was: Visualize PCollection with Interactive Beam) > Show PCollection with Interactive Beam > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > p = create_pipeline() > pcoll = p | 'Transform' >> transform() > The use can call a single function and get auto-magical charting of the data > as materialized pcoll. > e.g., visualize(pcoll) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7926) Show PCollection with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Description: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as p = create_pipeline() pcoll = p | 'Transform' >> transform() The use can call a single function and get auto-magical charting of the data as materialized pcoll. e.g., show(pcoll) was: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as p = create_pipeline() pcoll = p | 'Transform' >> transform() The use can call a single function and get auto-magical charting of the data as materialized pcoll. e.g., visualize(pcoll) > Show PCollection with Interactive Beam > -- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > p = create_pipeline() > pcoll = p | 'Transform' >> transform() > The use can call a single function and get auto-magical charting of the data > as materialized pcoll. > e.g., show(pcoll) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-8457) Instrument Dataflow jobs that are launched from Notebooks
[ https://issues.apache.org/jira/browse/BEAM-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-8457: Fix Version/s: (was: 2.17.0) 2.18.0 Description: Dataflow needs the capability to tell how many Dataflow jobs are launched from the Notebook environment. We are doing it by checking if the current execution path is with ipython and if the ipython kernel is connected to a notebook frontend. was: Dataflow needs the capability to tell how many Dataflow jobs are launched from the Notebook environment, i.e., the Interactive Runner. # Change the pipeline.run() API to allow supply a runner and an option parameter so that a pipeline initially bundled w/ an interactive runner can be directly run by other runners from notebook. # Implicitly add the necessary source information through user labels when the user does p.run(runner=DataflowRunner()). > Instrument Dataflow jobs that are launched from Notebooks > - > > Key: BEAM-8457 > URL: https://issues.apache.org/jira/browse/BEAM-8457 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Fix For: 2.18.0 > > Time Spent: 9h 40m > Remaining Estimate: 0h > > Dataflow needs the capability to tell how many Dataflow jobs are launched > from the Notebook environment. > We are doing it by checking if the current execution path is with ipython and > if the ipython kernel is connected to a notebook frontend. -- This message was sent by Atlassian Jira (v8.3.4#803005)
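The detection described in BEAM-8457 — is the code running under ipython, and is the kernel connected to a notebook frontend — is commonly approximated by inspecting the active IPython shell class. A hedged sketch of that heuristic (this mirrors a common approach, not necessarily Beam's exact check):

```python
# Hedged sketch: detect a notebook frontend by the IPython shell type.
# ZMQInteractiveShell backs Jupyter kernels; TerminalInteractiveShell
# backs the plain `ipython` console, which has no notebook frontend.
def running_in_notebook():
    try:
        from IPython import get_ipython
    except ImportError:
        return False  # IPython is not installed at all
    shell = get_ipython()
    if shell is None:
        return False  # plain `python` interpreter, no IPython session
    return type(shell).__name__ == 'ZMQInteractiveShell'

# Run as a plain script, this prints False; in a Jupyter kernel it
# would print True, which is when a launched job gets instrumented.
print(running_in_notebook())
```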
[jira] [Resolved] (BEAM-8457) Instrument Dataflow jobs that are launched from Notebooks
[ https://issues.apache.org/jira/browse/BEAM-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang resolved BEAM-8457. - Resolution: Fixed PR is merged. Resolve this in case it blocks the release. > Instrument Dataflow jobs that are launched from Notebooks > - > > Key: BEAM-8457 > URL: https://issues.apache.org/jira/browse/BEAM-8457 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Fix For: 2.17.0 > > Time Spent: 1h 40m > Remaining Estimate: 0h > > Dataflow needs the capability to tell how many Dataflow jobs are launched > from the Notebook environment, i.e., the Interactive Runner. > # Change the pipeline.run() API to allow supply a runner and an option > parameter so that a pipeline initially bundled w/ an interactive runner can > be directly run by other runners from notebook. > # Implicitly add the necessary source information through user labels when > the user does p.run(runner=DataflowRunner()). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work started] (BEAM-8457) Instrument Dataflow jobs that are launched from Notebooks
[ https://issues.apache.org/jira/browse/BEAM-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-8457 started by Ning Kang. --- > Instrument Dataflow jobs that are launched from Notebooks > - > > Key: BEAM-8457 > URL: https://issues.apache.org/jira/browse/BEAM-8457 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Fix For: 2.17.0 > > Time Spent: 1h 40m > Remaining Estimate: 0h > > Dataflow needs the capability to tell how many Dataflow jobs are launched > from the Notebook environment, i.e., the Interactive Runner. > # Change the pipeline.run() API to allow supply a runner and an option > parameter so that a pipeline initially bundled w/ an interactive runner can > be directly run by other runners from notebook. > # Implicitly add the necessary source information through user labels when > the user does p.run(runner=DataflowRunner()). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8457) Instrument Dataflow jobs that are launched from Notebooks
Ning Kang created BEAM-8457: --- Summary: Instrument Dataflow jobs that are launched from Notebooks Key: BEAM-8457 URL: https://issues.apache.org/jira/browse/BEAM-8457 Project: Beam Issue Type: Improvement Components: runner-py-interactive Reporter: Ning Kang Assignee: Ning Kang Dataflow needs the capability to tell how many Dataflow jobs are launched from the Notebook environment, i.e., the Interactive Runner. # Change the pipeline.run() API to allow supplying a runner and an options parameter so that a pipeline initially bundled w/ an interactive runner can be directly run by other runners from a notebook. # Implicitly add the necessary source information through user labels when the user does p.run(runner=DataflowRunner()). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution
[ https://issues.apache.org/jira/browse/BEAM-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954083#comment-16954083 ] Ning Kang commented on BEAM-8425: - 1. We record all the cells that change the pipeline graph in a list. 2. Every time we change the pipeline graph, we go through the cells in that list and see if any of them is missing in the notebook file. If so, it means the user has either deleted the cell or has re-executed the cell, so we print a warning and remove the cell from the list. 3. Continue the operation. The way to do (1) is to have the interactive runner record the current cell number upon each .apply call. The current cell number is accessible through len(In)-1. Note that we are not changing the behavior of the Beam programming model, except for printing a warning when the user has deleted or re-executed a cell that had a side effect on the job graph, and that's when we think the confusion likely arises. > Notifying Interactive Beam user about Beam related cell deletion or > re-execution > > > Key: BEAM-8425 > URL: https://issues.apache.org/jira/browse/BEAM-8425 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > There is a general problem about Interactive Notebooks that when an end user > deletes a cell that has been executed or re-executes a cell, those previous > executions are hidden from the end user. > However, hidden states will still have side effects in the notebook. > This kind of problem bothers Beam even more because Beam's pipeline > construction statements are not idempotent and pipeline execution is > decoupled and deferred from pipeline construction. > Re-executing a cell with Beam statements that build a pipeline would cause > unexpected pipeline state and the user wouldn't notice it due to the problem > of notebooks. 
> We'll intercept each transform application invocation from the > InteractiveRunner and record the ipython/notebook prompt number. Then each > time a user executes a cell that applies PTransform, we'll compare the > recorded list of prompt numbers with current notebook file's content and > figure out if there is any missing number. If so, we know for sure that a > re-execution happens and use display manager to notify the end user of > potential side effects caused by hidden states of the notebook/ipython. -- This message was sent by Atlassian Jira (v8.3.4#803005)
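The three-step bookkeeping described in the comment above can be sketched in plain Python. This is an illustrative model only: the `CellTracker` class and its method names are hypothetical, and a real implementation would take the live prompt numbers from the notebook file or IPython's `In` list rather than a hand-built set.

```python
class CellTracker:
    """Hypothetical sketch of the cell bookkeeping described above."""

    def __init__(self):
        # Step 1: prompt numbers of cells that changed the pipeline graph.
        self.recorded_prompts = []

    def record(self, prompt_number):
        self.recorded_prompts.append(prompt_number)

    def prune_and_warn(self, live_prompts):
        """Step 2: warn about recorded cells no longer present, drop them."""
        warnings = []
        still_live = []
        for prompt in self.recorded_prompts:
            if prompt in live_prompts:
                still_live.append(prompt)
            else:
                warnings.append(
                    'Cell [%d] was deleted or re-executed; its effects on the '
                    'pipeline graph may be hidden.' % prompt)
        self.recorded_prompts = still_live
        return warnings


tracker = CellTracker()
tracker.record(3)  # e.g. a cell that applied a PTransform
tracker.record(5)
# Suppose the notebook file now only contains prompts {1, 2, 3, 4}:
msgs = tracker.prune_and_warn({1, 2, 3, 4})
```

Step 3 ("continue the operation") is simply that `prune_and_warn` returns warnings instead of raising, so pipeline construction proceeds after the notification.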
[jira] [Comment Edited] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution
[ https://issues.apache.org/jira/browse/BEAM-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954007#comment-16954007 ] Ning Kang edited comment on BEAM-8425 at 10/17/19 6:35 PM: --- To get ipython kernel (without try-except for non-interactive scenario): {code:python} from IPython import get_ipython _ipython = get_ipython() {code} To get prompt number of current PTransform applied: {code:python} len(vars(_ipython)['user_ns']['In'])-1{code} Bonus, to get all code executed by the user in this kernel: {code:python} _ipython.magic('%history') {code} was (Author: ningk): To get ipython kernel (without try-except for non-interactive scenario): ``` from IPython import get_ipython _ipython = get_ipython() ``` To get prompt number of current PTransform applied: ``` len(vars(_ipython)['user_ns']['In'])-1 ``` Bonus, to get all code executed by the user in this kernel: _ipython.magic('%history') > Notifying Interactive Beam user about Beam related cell deletion or > re-execution > > > Key: BEAM-8425 > URL: https://issues.apache.org/jira/browse/BEAM-8425 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > There is a general problem about Interactive Notebooks that when an end user > deletes a cell that has been executed or re-executes a cell, those previous > executions are hidden from the end user. > However, hidden states will still have side effects in the notebook. > This kind of problem bothers Beam even more because Beam's pipeline > construction statements are not idempotent and pipeline execution is > decoupled and deferred from pipeline construction. > Re-executing a cell with Beam statements that build a pipeline would cause > unexpected pipeline state and the user wouldn't notice it due to the problem > of notebooks. > We'll intercept each transform application invocation from the > InteractiveRunner and record the ipython/notebook prompt number. 
Then each > time a user executes a cell that applies PTransform, we'll compare the > recorded list of prompt numbers with current notebook file's content and > figure out if there is any missing number. If so, we know for sure that a > re-execution happens and use display manager to notify the end user of > potential side effects caused by hidden states of the notebook/ipython. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution
[ https://issues.apache.org/jira/browse/BEAM-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954007#comment-16954007 ] Ning Kang commented on BEAM-8425: - To get ipython kernel (without try-except for non-interactive scenario): ``` from IPython import get_ipython _ipython = get_ipython() ``` To get prompt number of current PTransform applied: ``` len(vars(_ipython)['user_ns']['In'])-1 ``` Bonus, to get all code executed by the user in this kernel: _ipython.magic('%history') > Notifying Interactive Beam user about Beam related cell deletion or > re-execution > > > Key: BEAM-8425 > URL: https://issues.apache.org/jira/browse/BEAM-8425 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > There is a general problem about Interactive Notebooks that when an end user > deletes a cell that has been executed or re-executes a cell, those previous > executions are hidden from the end user. > However, hidden states will still have side effects in the notebook. > This kind of problem bothers Beam even more because Beam's pipeline > construction statements are note idempotent and pipeline execution is > decoupled and deferred from pipeline construction. > Re-executing a cell with Beam statements that build a pipeline would cause > unexpected pipeline state and the user wouldn't notice it due to the problem > of notebooks. > We'll intercept each transform application invocation from the > InteractiveRunner and record the ipython/notebook prompt number. Then each > time a user executes a cell that applies PTransform, we'll compare the > recorded list of prompt numbers with current notebook file's content and > figure out if there is any missing number. If so, we know for sure that a > re-execution happens and use display manager to notify the end user of > potential side effects caused by hidden states of the notebook/ipython. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
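The snippets in the comment above assume an IPython kernel is present. A self-contained version that adds the missing try-except guard for non-interactive use might look like this; the helper name `current_prompt_number` is illustrative, not Beam's API.

```python
def current_prompt_number():
    """Return the current IPython prompt number, or -1 outside a kernel.

    Mirrors the len(In) - 1 trick quoted above; 'In' is IPython's list of
    all input cells, where In[0] is an empty placeholder.
    """
    try:
        from IPython import get_ipython
    except ImportError:
        return -1  # IPython is not installed at all.
    ipython = get_ipython()
    if ipython is None:
        return -1  # Installed, but running as a plain Python script.
    return len(ipython.user_ns['In']) - 1
```

Accessing `ipython.user_ns` directly is equivalent to the `vars(_ipython)['user_ns']` form in the comment.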
[jira] [Created] (BEAM-8425) Notifying Interactive Beam user about Beam related cell deletion or re-execution
Ning Kang created BEAM-8425: --- Summary: Notifying Interactive Beam user about Beam related cell deletion or re-execution Key: BEAM-8425 URL: https://issues.apache.org/jira/browse/BEAM-8425 Project: Beam Issue Type: New Feature Components: runner-py-interactive Reporter: Ning Kang Assignee: Ning Kang There is a general problem about Interactive Notebooks that when an end user deletes a cell that has been executed or re-executes a cell, those previous executions are hidden from the end user. However, hidden states will still have side effects in the notebook. This kind of problem bothers Beam even more because Beam's pipeline construction statements are not idempotent and pipeline execution is decoupled and deferred from pipeline construction. Re-executing a cell with Beam statements that build a pipeline would cause unexpected pipeline state and the user wouldn't notice it due to the problem of notebooks. We'll intercept each transform application invocation from the InteractiveRunner and record the ipython/notebook prompt number. Then each time a user executes a cell that applies a PTransform, we'll compare the recorded list of prompt numbers with the current notebook file's content and figure out if there is any missing number. If so, we know for sure that a re-execution happened and use the display manager to notify the end user of potential side effects caused by hidden states of the notebook/ipython. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (BEAM-646) Get runners out of the apply()
[ https://issues.apache.org/jira/browse/BEAM-646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953250#comment-16953250 ] Ning Kang edited comment on BEAM-646 at 10/16/19 10:51 PM: --- Hi, This is Ning. I'm working on a project that supports InteractiveRunner (Python3+) when users use Beam in notebook environment. Would apply() interception in runner still be useful as a hook for non-Beam related but interactivity related features such as collecting IPython/notebook cell information when a PTransform is applied? Of course, we can also have all pipelines support interactivity by making the interception inside pipelines themselves. But it's unlikely that all runners would/could take a pipeline with interactivity logic at this moment. And those ipython/notebook dependencies probably shouldn't be introduced into pipeline itself. Would there be any APIs that support invoking arbitrary external logic at different stages of building a pipeline when pipeline is completely decoupled from runner? Thanks! was (Author: ningk): Hi, This is Ning. I'm working a project that supports InteractiveRunner (Python3+) when users use Beam in notebook environment. Would apply() interception in runner still be useful as a hook for non-Beam related but interactivity related features such as collecting IPython/notebook cell information when a PTransform is applied? Of course, we can also have all pipelines support interactivity by making the interception inside pipelines themselves. But it's unlikely that all runners would/could take a pipeline with interactivity logic at this moment. And those ipython/notebook dependencies probably shouldn't be introduced into pipeline itself. Would there be any APIs that support invoking arbitrary external logic at different stages of building a pipeline when pipeline is completely decoupled from runner? Thanks! 
> Get runners out of the apply() > -- > > Key: BEAM-646 > URL: https://issues.apache.org/jira/browse/BEAM-646 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Thomas Groh >Priority: Major > Labels: backwards-incompatible > Fix For: 0.6.0 > > > Right now, the runner intercepts calls to apply() and replaces transforms as > we go. This means that there is no "original" user graph. For portability and > misc architectural benefits, we would like to build the original graph first, > and have the runner override later. > Some runners already work in this manner, but we could integrate it more > smoothly, with more validation, via some handy APIs on e.g. the Pipeline > object. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-646) Get runners out of the apply()
[ https://issues.apache.org/jira/browse/BEAM-646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953250#comment-16953250 ] Ning Kang commented on BEAM-646: Hi, This is Ning. I'm working on a project that supports InteractiveRunner (Python3+) when users use Beam in notebook environment. Would apply() interception in runner still be useful as a hook for non-Beam related but interactivity related features such as collecting IPython/notebook cell information when a PTransform is applied? Of course, we can also have all pipelines support interactivity by making the interception inside pipelines themselves. But it's unlikely that all runners would/could take a pipeline with interactivity logic at this moment. And those ipython/notebook dependencies probably shouldn't be introduced into pipeline itself. Would there be any APIs that support invoking arbitrary external logic at different stages of building a pipeline when pipeline is completely decoupled from runner? Thanks! > Get runners out of the apply() > -- > > Key: BEAM-646 > URL: https://issues.apache.org/jira/browse/BEAM-646 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Thomas Groh >Priority: Major > Labels: backwards-incompatible > Fix For: 0.6.0 > > > Right now, the runner intercepts calls to apply() and replaces transforms as > we go. This means that there is no "original" user graph. For portability and > misc architectural benefits, we would like to build the original graph first, > and have the runner override later. > Some runners already work in this manner, but we could integrate it more > smoothly, with more validation, via some handy APIs on e.g. the Pipeline > object. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8379) Cache Eviction for Interactive Beam
Ning Kang created BEAM-8379: --- Summary: Cache Eviction for Interactive Beam Key: BEAM-8379 URL: https://issues.apache.org/jira/browse/BEAM-8379 Project: Beam Issue Type: New Feature Components: runner-py-interactive Reporter: Ning Kang Assignee: Ning Kang Evict caches created by Interactive Beam when an IPython kernel is restarted or terminated, releasing resources that are no longer needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
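The eviction idea above can be sketched with a temporary cache directory that is cleaned up when the Python process (and hence the kernel) exits. The `EvictingCache` class and its names are illustrative, not Beam's actual cache-manager API; a real implementation would also handle kernel restarts that do not end the process.

```python
import atexit
import shutil
import tempfile


class EvictingCache:
    """Illustrative cache that frees its disk space on interpreter shutdown."""

    def __init__(self):
        self._dir = tempfile.mkdtemp(prefix='ib-cache-')
        # When the kernel is terminated, the process exits and atexit
        # handlers run, releasing the cached files.
        atexit.register(self.evict)

    @property
    def location(self):
        return self._dir

    def evict(self):
        shutil.rmtree(self._dir, ignore_errors=True)


cache = EvictingCache()
```

Calling `evict()` is idempotent (`ignore_errors=True`), so an explicit eviction followed by the atexit hook is harmless.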
[jira] [Updated] (BEAM-8288) Cleanup Interactive Beam Python 2 support
[ https://issues.apache.org/jira/browse/BEAM-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-8288: Component/s: (was: examples-python) runner-py-interactive > Cleanup Interactive Beam Python 2 support > - > > Key: BEAM-8288 > URL: https://issues.apache.org/jira/browse/BEAM-8288 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Minor > > As Beam is retiring Python 2, some special handle in Interactive Beam code > and tests will need to be cleaned up. > This Jira ticket tracks those changes to be cleaned up. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-8016) Render Beam Pipeline as DOT with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-8016: Component/s: (was: examples-python) runner-py-interactive > Render Beam Pipeline as DOT with Interactive Beam > --- > > Key: BEAM-8016 > URL: https://issues.apache.org/jira/browse/BEAM-8016 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > With work in https://issues.apache.org/jira/browse/BEAM-7760, Beam pipeline > converted to DOT then rendered should mark user defined variables on edges. > With work in https://issues.apache.org/jira/browse/BEAM-7926, it might be > redundant or confusing to render arbitrary random sample PCollection data on > edges. > We'll also make sure edges in the graph corresponds to output -> input > relationship in the user defined pipeline. Each edge is one output. If > multiple down stream inputs take the same output, it should be rendered as > one edge diverging into two instead of two edges. > For advanced interactivity highlight where each execution highlights the part > of the pipeline really executed from the original pipeline, we'll also > provide the support in beta. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7926) Visualize PCollection with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Component/s: (was: examples-python) runner-py-interactive > Visualize PCollection with Interactive Beam > --- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > p = create_pipeline() > pcoll = p | 'Transform' >> transform() > The use can call a single function and get auto-magical charting of the data > as materialized pcoll. > e.g., visualize(pcoll) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7923) Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7923: Component/s: (was: examples-python) runner-py-interactive > Interactive Beam > > > Key: BEAM-7923 > URL: https://issues.apache.org/jira/browse/BEAM-7923 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > This is the top level ticket for all efforts leveraging [interactive > Beam|[https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]] > As the development goes, blocking tickets will be added to this one. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7760: Component/s: (was: examples-python) runner-py-interactive > Interactive Beam Caching PCollections bound to user defined vars in notebook > > > Key: BEAM-7760 > URL: https://issues.apache.org/jira/browse/BEAM-7760 > Project: Beam > Issue Type: New Feature > Components: runner-py-interactive >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 17h 10m > Remaining Estimate: 0h > > Cache only PCollections bound to user defined variables in a pipeline when > running pipeline with interactive runner in jupyter notebooks. > [Interactive > Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive] > has been caching and using caches of "leaf" PCollections for interactive > execution in jupyter notebooks. > The interactive execution is currently supported so that when appending new > transforms to existing pipeline for a new run, executed part of the pipeline > doesn't need to be re-executed. > A PCollection is "leaf" when it is never used as input in any PTransform in > the pipeline. > The problem with building caches and pipeline to execute around "leaf" is > that when a PCollection is consumed by a sink with no output, the pipeline to > execute built will miss the subgraph generating and consuming that > PCollection. > For example, "ReadFromPubSub --> WriteToPubSub" will result in an empty > pipeline. > Caching around PCollections bound to user defined variables and replacing > transforms with source and sink of caches could resolve the pipeline to > execute properly under the interactive execution scenario. Also, cached > PCollection now can trace back to user code and can be used for user data > visualization if user wants to do it. > E.g., > {code:python} > # ... 
> p = beam.Pipeline(interactive_runner.InteractiveRunner(), > options=pipeline_options) > messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...') > messages | "Write" >> beam.io.WriteToPubSub(topic_path) > result = p.run() > # ... > visualize(messages){code} > The interactive runner automatically figures out that PCollection > {code:python} > messages{code} > created by > {code:python} > p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code} > should be cached and reused if the notebook user appends more transforms. > And once the pipeline gets executed, the user could use any > visualize(PCollection) module to visualize the data statically (batch) or > dynamically (stream) -- This message was sent by Atlassian Jira (v8.3.4#803005)
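The "PCollections bound to user defined variables" idea above can be illustrated without Beam: scan a namespace and keep only the values that are PCollections. Everything here, including `FakePCollection` and the plain dict standing in for the notebook's global namespace, is a stand-in for illustration, not Beam's implementation.

```python
class FakePCollection:
    """Stand-in for beam.pvalue.PCollection, for illustration only."""

    def __init__(self, label):
        self.label = label


def pcollections_bound_to_vars(namespace):
    """Map user variable names to the PCollections they reference."""
    return {name: value for name, value in namespace.items()
            if isinstance(value, FakePCollection)}


# A dict standing in for the notebook's global namespace:
user_ns = {
    'messages': FakePCollection('Read'),
    'count': 42,  # not a PCollection; ignored
    'sink_result': FakePCollection('Write'),
}
watched = pcollections_bound_to_vars(user_ns)
```

The runner would then replace the producers of each watched PCollection with cache sources/sinks, so even a PCollection consumed only by a sink (the "ReadFromPubSub --> WriteToPubSub" case) stays reachable.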
[jira] [Assigned] (BEAM-8288) Cleanup Interactive Beam Python 2 support
[ https://issues.apache.org/jira/browse/BEAM-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang reassigned BEAM-8288: --- Assignee: Ning Kang > Cleanup Interactive Beam Python 2 support > - > > Key: BEAM-8288 > URL: https://issues.apache.org/jira/browse/BEAM-8288 > Project: Beam > Issue Type: Improvement > Components: examples-python >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Minor > > As Beam is retiring Python 2, some special handle in Interactive Beam code > and tests will need to be cleaned up. > This Jira ticket tracks those changes to be cleaned up. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8288) Cleanup Interactive Beam Python 2 support
Ning Kang created BEAM-8288: --- Summary: Cleanup Interactive Beam Python 2 support Key: BEAM-8288 URL: https://issues.apache.org/jira/browse/BEAM-8288 Project: Beam Issue Type: Improvement Components: examples-python Reporter: Ning Kang As Beam is retiring Python 2, some special handling in Interactive Beam code and tests will need to be cleaned up. This Jira ticket tracks those cleanups. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-7926) Visualize PCollection with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Description: Support auto plotting / charting of materialized data of a given PCollection with Interactive Beam. Say an Interactive Beam pipeline defined as p = create_pipeline() pcoll = p | 'Transform' >> transform() The use can call a single function and get auto-magical charting of the data as materialized pcoll. e.g., visualize(pcoll) was: Support auto plotting / charting of materialized data of a given PCollection with interactive Beam. Say an iBeam pipeline defined as p = ibeam.create_pipeline() pcoll = p | 'Transform' >> transform() The use can call a single function and get auto-magical charting of the data as materialized pcoll. e.g., ibeam.visualize(pcoll) > Visualize PCollection with Interactive Beam > --- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: examples-python >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > Support auto plotting / charting of materialized data of a given PCollection > with Interactive Beam. > Say an Interactive Beam pipeline defined as > p = create_pipeline() > pcoll = p | 'Transform' >> transform() > The use can call a single function and get auto-magical charting of the data > as materialized pcoll. > e.g., visualize(pcoll) -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (BEAM-8016) Render Beam Pipeline as DOT with Interactive Beam
Ning Kang created BEAM-8016: --- Summary: Render Beam Pipeline as DOT with Interactive Beam Key: BEAM-8016 URL: https://issues.apache.org/jira/browse/BEAM-8016 Project: Beam Issue Type: Improvement Components: examples-python Reporter: Ning Kang Assignee: Ning Kang With work in https://issues.apache.org/jira/browse/BEAM-7760, a Beam pipeline converted to DOT and then rendered should mark user defined variables on edges. With work in https://issues.apache.org/jira/browse/BEAM-7926, it might be redundant or confusing to render arbitrary random sample PCollection data on edges. We'll also make sure edges in the graph correspond to the output -> input relationship in the user defined pipeline. Each edge is one output. If multiple downstream inputs take the same output, it should be rendered as one edge diverging into two instead of two edges. We'll also provide beta support for advanced interactivity highlighting, where each execution highlights the part of the original pipeline that was actually executed. -- This message was sent by Atlassian Jira (v8.3.2#803003)
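The "one edge per output" rendering rule above can be sketched by emitting DOT text from a tiny adjacency map. This is a toy generator, not Beam's renderer; the graph shape and names are made up for illustration.

```python
def to_dot(edges):
    """Render {producer: [consumer, ...]} as a DOT digraph string.

    Each producer appears as a single node, so a producer with two
    consumers yields one node whose output diverges into two edges
    sharing the same source, rather than a duplicated output node.
    """
    lines = ['digraph pipeline {']
    for producer, consumers in edges.items():
        for consumer in consumers:
            lines.append('  "%s" -> "%s";' % (producer, consumer))
    lines.append('}')
    return '\n'.join(lines)


# 'Read' has one output consumed by two downstream transforms:
dot = to_dot({'Read': ['CountA', 'CountB'], 'CountA': [], 'CountB': []})
```

Feeding the resulting string to Graphviz (`dot -Tpng`) would draw the single Read node with two diverging edges.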
[jira] [Updated] (BEAM-7926) Visualize PCollection with Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Summary: Visualize PCollection with Interactive Beam (was: Visualize PCollection with iBeam) > Visualize PCollection with Interactive Beam > --- > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: examples-python >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > Support auto plotting / charting of materialized data of a given PCollection > with interactive Beam. > Say an iBeam pipeline defined as > p = ibeam.create_pipeline() > pcoll = p | 'Transform' >> transform() > The use can call a single function and get auto-magical charting of the data > as materialized pcoll. > e.g., ibeam.visualize(pcoll) -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (BEAM-7926) Visualize PCollection with iBeam
Ning Kang created BEAM-7926: --- Summary: Visualize PCollection with iBeam Key: BEAM-7926 URL: https://issues.apache.org/jira/browse/BEAM-7926 Project: Beam Issue Type: New Feature Components: examples-python Reporter: Ning Kang Assignee: Ning Kang Support auto plotting / charting of materialized data of a given PCollection with interactive Beam. Say an iBeam pipeline is defined as p = ibeam.create_pipeline() pcoll = p | 'Transform' >> transform() The user can call a single function and get auto-magical charting of the materialized data of pcoll, e.g., visualize(pcoll) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (BEAM-7926) Visualize PCollection with iBeam
[ https://issues.apache.org/jira/browse/BEAM-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7926: Description: Support auto plotting / charting of materialized data of a given PCollection with interactive Beam. Say an iBeam pipeline defined as p = ibeam.create_pipeline() pcoll = p | 'Transform' >> transform() The use can call a single function and get auto-magical charting of the data as materialized pcoll. e.g., ibeam.visualize(pcoll) was: Support auto plotting / charting of materialized data of a given PCollection with interactive Beam. Say an iBeam pipeline defined as p = ibeam.create_pipeline() pcoll = p | 'Transform' >> transform() The use can call a single function and get auto-magical charting of the data as materialized pcoll. e.g., visualize(pcoll) > Visualize PCollection with iBeam > > > Key: BEAM-7926 > URL: https://issues.apache.org/jira/browse/BEAM-7926 > Project: Beam > Issue Type: New Feature > Components: examples-python >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > > Support auto plotting / charting of materialized data of a given PCollection > with interactive Beam. > Say an iBeam pipeline defined as > p = ibeam.create_pipeline() > pcoll = p | 'Transform' >> transform() > The use can call a single function and get auto-magical charting of the data > as materialized pcoll. > e.g., ibeam.visualize(pcoll) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Assigned] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang reassigned BEAM-7760: --- Assignee: Ning Kang > Interactive Beam Caching PCollections bound to user defined vars in notebook > > > Key: BEAM-7760 > URL: https://issues.apache.org/jira/browse/BEAM-7760 > Project: Beam > Issue Type: New Feature > Components: examples-python >Reporter: Ning Kang >Assignee: Ning Kang >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > Cache only PCollections bound to user defined variables in a pipeline when > running pipeline with interactive runner in jupyter notebooks. > [Interactive > Beam|[https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]] > has been caching and using caches of "leaf" PCollections for interactive > execution in jupyter notebooks. > The interactive execution is currently supported so that when appending new > transforms to existing pipeline for a new run, executed part of the pipeline > doesn't need to be re-executed. > A PCollection is "leaf" when it is never used as input in any PTransform in > the pipeline. > The problem with building caches and pipeline to execute around "leaf" is > that when a PCollection is consumed by a sink with no output, the pipeline to > execute built will miss the subgraph generating and consuming that > PCollection. > An example, "ReadFromPubSub --> WirteToPubSub" will result in an empty > pipeline. > Caching around PCollections bound to user defined variables and replacing > transforms with source and sink of caches could resolve the pipeline to > execute properly under the interactive execution scenario. Also, cached > PCollection now can trace back to user code and can be used for user data > visualization if user wants to do it. > E.g., > {code:java} > // ... 
> p = beam.Pipeline(interactive_runner.InteractiveRunner(), > options=pipeline_options) > messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...') > messages | "Write" >> beam.io.WriteToPubSub(topic_path) > result = p.run() > // ... > visualize(messages){code} > The interactive runner automatically figures out that PCollection > {code:java} > messages{code} > created by > {code:java} > p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code} > should be cached and reused if the notebook user appends more transforms. > And once the pipeline gets executed, the user could use any > visualize(PCollection) module to visualize the data statically (batch) or > dynamically (stream) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (BEAM-7923) Interactive Beam
Ning Kang created BEAM-7923: --- Summary: Interactive Beam Key: BEAM-7923 URL: https://issues.apache.org/jira/browse/BEAM-7923 Project: Beam Issue Type: New Feature Components: examples-python Reporter: Ning Kang Assignee: Ning Kang This is the top-level ticket for all efforts leveraging [interactive Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive] As development proceeds, blocking tickets will be added to this one. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Closed] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors
[ https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang closed BEAM-7812. --- > Work around Stackdriver error reporting double counting worker errors > - > > Key: BEAM-7812 > URL: https://issues.apache.org/jira/browse/BEAM-7812 > Project: Beam > Issue Type: Bug > Components: runner-dataflow > Reporter: Ning Kang > Assignee: Ning Kang > Priority: Minor > Fix For: Not applicable > > Time Spent: 0.5h > Remaining Estimate: 0h > > h1. *Objective* > Work around Stackdriver Error Reporting so that worker errors are counted only once when they are double-logged. > {color:#d04437}*Only applicable to Dataflow runner workers in the SDK*{color}. > h1. *Background* > Stackdriver Error Reporting double counts worker errors logged to Stackdriver, because: > # workers log errors to Stackdriver; > # workers report the same errors to dfe, and dfe logs them again to Stackdriver. > The double counting blocks us from sending job message logs from dfe to Stackdriver, because we don't want to change the behavior of any existing log or feature. > There happens to be an inconsistency between Java batch ([DataflowWorkerLoggingHandler|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java#L82]) and streaming ([StreamingDataflowWorker|https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1747]) error reporting to dfe, which results in errors reported by the streaming Java worker eventually being ignored by Stackdriver Error Reporting. > h1. *Details* > Inspired by the inconsistency, we decided to apply the streaming Java worker's error reporting logic to batch, both to fix the inconsistency and to work around the double counting issue in Stackdriver Error Reporting. > The change will be that, when workers report errors to dfe, > * for Java, the stack trace is constructed from the StackTrace object instead of using printStackTrace; > * for Python, the complete error message details are reported exactly as they appear in worker logging, instead of reporting only the traceback through the traceback module. > Users will not experience any change, since job message logging to Stackdriver hasn't been launched yet. > h1. *Test Plan* > We'll add unit tests for the public methods changed in the process. > Google has internal integration tests where we can push worker harness images and set the worker harness container image to test in sandbox. > When releasing, we also have integration tests in different release stages. > The workaround needs to be released completely before we can enable job message logging. > We can verify the format of stack traces in the sandbox and release stages by executing example pipelines in our projects and directly browsing the prod Stackdriver logging and error reporting consoles. This should be done before and after enabling job message logging. > Run any other existing and required tests before sending the PR. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
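The Python bullet above (report the complete error details rather than only the traceback) can be illustrated with a small sketch. This is a hedged illustration of the idea, not the actual Dataflow worker code; `error_report` is an invented helper name.

```python
import traceback

def error_report(exc):
    """Build a complete report: traceback plus exception type and message,
    the same text that would go to worker logging."""
    # format_exception returns the traceback lines followed by the final
    # "ExceptionType: message" line, so nothing is lost in the report.
    return ''.join(
        traceback.format_exception(type(exc), exc, exc.__traceback__))

try:
    raise ValueError('bad tornado record')
except ValueError as e:
    report = error_report(e)

# The report ends with the exception type and message, so an error
# aggregator sees one complete, consistent entry instead of a bare traceback.
print('ValueError: bad tornado record' in report)  # True
```

Reporting the identical text to both destinations is what lets a deduplicating consumer (such as Stackdriver Error Reporting) recognize the two log entries as the same error.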
[jira] [Resolved] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors
[ https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang resolved BEAM-7812. - Resolution: Fixed Fix Version/s: Not applicable The change is made to the Dataflow service internally in Google instead of the Dataflow runner, since we want the change to be compatible with older versions of the SDKs. (The full issue description is quoted above.) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors
[ https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16900263#comment-16900263 ] Ning Kang commented on BEAM-7812: - We decided to make the change internally in the Dataflow service to support all versions of the SDKs, instead of making changes in the runner and then patching all SDKs. We may mark the bug closed now. (The full issue description is quoted above.) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work started] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors
[ https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-7812 started by Ning Kang. --- (The full issue description is quoted above.) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors
[ https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7812: Description: (This update adjusted only link formatting; the full description is quoted above.) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors
[ https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7812: Description: (This update adjusted only link formatting; the full description is quoted above.) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Assigned] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors
[ https://issues.apache.org/jira/browse/BEAM-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang reassigned BEAM-7812: --- Assignee: Ning Kang (The full issue description is quoted above.) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (BEAM-7812) Work around Stackdriver error reporting double counting worker errors
Ning Kang created BEAM-7812: --- Summary: Work around Stackdriver error reporting double counting worker errors Key: BEAM-7812 URL: https://issues.apache.org/jira/browse/BEAM-7812 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Ning Kang (The full issue description is quoted above.) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
[ https://issues.apache.org/jira/browse/BEAM-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Kang updated BEAM-7760: Description: Cache only PCollections bound to user defined variables in a pipeline when running pipeline with interactive runner in jupyter notebooks. [Interactive Beam|[https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]] has been caching and using caches of "leaf" PCollections for interactive execution in jupyter notebooks. The interactive execution is currently supported so that when appending new transforms to existing pipeline for a new run, executed part of the pipeline doesn't need to be re-executed. A PCollection is "leaf" when it is never used as input in any PTransform in the pipeline. The problem with building caches and pipeline to execute around "leaf" is that when a PCollection is consumed by a sink with no output, the pipeline to execute built will miss the subgraph generating and consuming that PCollection. An example, "ReadFromPubSub --> WirteToPubSub" will result in an empty pipeline. Caching around PCollections bound to user defined variables and replacing transforms with source and sink of caches could resolve the pipeline to execute properly under the interactive execution scenario. Also, cached PCollection now can trace back to user code and can be used for user data visualization if user wants to do it. E.g., {code:java} // ... p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=pipeline_options) messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...') messages | "Write" >> beam.io.WriteToPubSub(topic_path) result = p.run() // ... visualize(messages){code} The interactive runner automatically figures out that PCollection {code:java} messages{code} created by {code:java} p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code} should be cached and reused if the notebook user appends more transforms. 
And once the pipeline gets executed, the user could use any visualize(PCollection) module to visualize the data statically (batch) or dynamically (stream) was: Cache only PCollections bound to user defined variables in a pipeline when running pipeline with interactive runner in jupyter notebooks. [Interactive Beam|[https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]] has been caching and using caches of "leaf" PCollections for interactive execution in jupyter notebooks. The interactive execution is currently supported so that when appending new transforms to existing pipeline for a new run, executed part of the pipeline doesn't need to be re-executed. A PCollection is "leaf" when it is never used as input in any PTransform in the pipeline. The problem with building caches and pipeline to execute around "leaf" is that when a PCollection is consumed by a sink with no output, the pipeline to execute built will miss the subgraph generating and consuming that PCollection. An example, "ReadFromPubSub --> WirteToPubSub" will result in an empty pipeline. Caching around PCollections bound to user defined variables and replacing transforms with source and sink of caches could resolve the pipeline to execute properly under the interactive execution scenario. Also, cached PCollection now can trace back to user code and can be used for user data visualization if user wants to do it. E.g., {code:java} // ... p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=pipeline_options) messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...') messages | "Write" >> beam.io.WriteToPubSub(topic_path) result = p.run() // ... visualize(messages){code} The interactive runner automatically figures out that PCollection created by {code:java} p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code} should be cached and reused if the notebook user appends more transforms. 
> Interactive Beam Caching PCollections bound to user defined vars in notebook
>
> Key: BEAM-7760
> URL: https://issues.apache.org/jira/browse/BEAM-7760
> Project: Beam
> Issue Type: New Feature
> Components: examples-python
> Reporter: Ning Kang
> Priority: Major
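The description above argues that "leaf"-based caching misses the ReadFromPubSub --> WriteToPubSub case, while caching around user-defined variables does not. A minimal, self-contained sketch of that argument, using toy stand-ins (the `PCollection` class, `find_leaves`, and `bound_to_user_vars` below are illustrative names, not Interactive Beam's actual API):

```python
class PCollection:
    """Toy stand-in for apache_beam.pvalue.PCollection."""
    def __init__(self, producer):
        self.producer = producer  # label of the transform that created it


def find_leaves(pcolls, consumed):
    """'Leaf' PCollections: never used as input to any PTransform."""
    return [p for p in pcolls if p not in consumed]


def bound_to_user_vars(namespace):
    """PCollections reachable from user-defined (non-underscore) names,
    as an interactive runner might see them in a notebook namespace."""
    return {name: value for name, value in namespace.items()
            if isinstance(value, PCollection) and not name.startswith('_')}


# Model "ReadFromPubSub --> WriteToPubSub": Read produces `messages`,
# and Write consumes it while producing no further PCollection.
messages = PCollection('Read')
all_pcolls = [messages]
consumed = {messages}  # `messages` is input to the Write transform

# Leaf-based caching finds nothing to cache, so the pipeline to
# execute comes out empty:
print(find_leaves(all_pcolls, consumed))  # → []

# Variable-based caching still sees `messages`, because the user
# bound it to a name:
print(sorted(bound_to_user_vars({'messages': messages, 'x': 42})))  # → ['messages']
```

The toy `consumed` set plays the role of the pipeline graph's input edges; in real Interactive Beam this information would come from traversing the pipeline, not from a hand-built set.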
[jira] [Created] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook
Ning Kang created BEAM-7760:
---
Summary: Interactive Beam Caching PCollections bound to user defined vars in notebook
Key: BEAM-7760
URL: https://issues.apache.org/jira/browse/BEAM-7760
Project: Beam
Issue Type: New Feature
Components: examples-python
Reporter: Ning Kang

Cache only the PCollections bound to user-defined variables in a pipeline when running that pipeline with the interactive runner in Jupyter notebooks.

[Interactive Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive] has been caching, and reusing the caches of, "leaf" PCollections for interactive execution in Jupyter notebooks. Interactive execution is currently supported so that when new transforms are appended to an existing pipeline for a new run, the already-executed part of the pipeline does not need to be re-executed. A PCollection is a "leaf" when it is never used as input to any PTransform in the pipeline.

The problem with building the caches, and the pipeline to execute, around leaves is that when a PCollection is consumed by a sink that produces no output, the constructed pipeline misses the subgraph that generates and consumes that PCollection. For example, "ReadFromPubSub --> WriteToPubSub" results in an empty pipeline.

Caching the PCollections bound to user-defined variables instead, and replacing transforms with the sources and sinks of those caches, resolves the pipeline to execute correctly in the interactive scenario. A cached PCollection can also be traced back to user code, so it can be used for data visualization if the user wants that. E.g.,

{code:python}
# ...
p = beam.Pipeline(interactive_runner.InteractiveRunner(),
                  options=pipeline_options)
messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
messages | "Write" >> beam.io.WriteToPubSub(topic_path)
result = p.run()
# ...
visualize(messages)
{code}

The interactive runner automatically figures out that the PCollection created by {code:python}p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code} should be cached and reused if the notebook user appends more transforms.

--
This message was sent by Atlassian JIRA (v7.6.14#76016)