[jira] [Assigned] (BEAM-13624) pipeline_fragment incorrectly prunes producer transform
    [ https://issues.apache.org/jira/browse/BEAM-13624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Rohde reassigned BEAM-13624:

    Assignee: Sam Rohde

> pipeline_fragment incorrectly prunes producer transform
> ---
>
> Key: BEAM-13624
> URL: https://issues.apache.org/jira/browse/BEAM-13624
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Sam Rohde
> Priority: P2
>
> Unfortunately I haven't been able to diagnose the exact issue here or come up
> with a minimal repro. I just have some code to reproduce in
> https://github.com/apache/beam/pull/16445.
> That PR adds support for value_counts(bins) in the DataFrame API, which for
> some reason is interacting poorly with pipeline pruning in interactive Beam
> (rehydrating the pipeline raises an error about a PCollection's producer
> missing). The PR also adds a test to transform_test.py that replicates the
> issue, as well as a temporary mitigation in pipeline_fragment.py. I think the
> mitigation is effectively disabling pipeline pruning, so it likely shouldn't
> be merged.

--
This message was sent by Atlassian Jira (v8.20.1#820001)
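For readers unfamiliar with the failure mode described in this ticket, the following is a toy model of pipeline pruning. It is not the pipeline_fragment.py code: the `Transform` record, `prune`, and `check_producers` are hypothetical names, and PCollections are plain string ids. It only sketches the idea that pruning walks upstream from the target PCollections, and that dropping a transform which a kept PCollection depends on produces a "no producer" style error.

```python
# Toy model only: transforms are plain records, not Beam objects.
from collections import namedtuple

Transform = namedtuple("Transform", ["name", "inputs", "outputs"])


def prune(transforms, targets):
    """Keep every transform needed to compute the target PCollection ids."""
    producer = {out: t for t in transforms for out in t.outputs}
    needed, stack = set(), list(targets)
    while stack:
        pcoll = stack.pop()
        t = producer[pcoll]
        if t.name not in needed:
            needed.add(t.name)
            stack.extend(t.inputs)  # walk upstream to the producers' inputs
    return [t for t in transforms if t.name in needed]


def check_producers(transforms):
    """Raise if any consumed PCollection has no producer left in the graph."""
    produced = {out for t in transforms for out in t.outputs}
    for t in transforms:
        for pcoll in t.inputs:
            if pcoll not in produced:
                raise ValueError(f"no producer for PCollection {pcoll!r}")


pipeline = [
    Transform("Create", [], ["pc0"]),
    Transform("Bins", ["pc0"], ["pc1"]),
    Transform("Count", ["pc1"], ["pc2"]),
    Transform("Unrelated", ["pc0"], ["pc3"]),
]

fragment = prune(pipeline, ["pc2"])
check_producers(fragment)  # passes: Create, Bins and Count are all kept

# A faulty pruner that also drops "Bins" leaves pc1 without a producer.
broken = [t for t in fragment if t.name != "Bins"]
```

Calling `check_producers(broken)` raises, which is the toy analogue of the rehydration error; why the real pruner drops a producer in the value_counts case is not diagnosed in the ticket.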
[jira] [Updated] (BEAM-12984) InteractiveRunner cannot collect PCollections from composites
    [ https://issues.apache.org/jira/browse/BEAM-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Rohde updated BEAM-12984:
-
    Resolution: Fixed
    Status: Resolved (was: Open)

> InteractiveRunner cannot collect PCollections from composites
> --
>
> Key: BEAM-12984
> URL: https://issues.apache.org/jira/browse/BEAM-12984
> Project: Beam
> Issue Type: Bug
> Components: runner-py-interactive
> Reporter: Sam Rohde
> Assignee: Sam Rohde
> Priority: P2
> Fix For: 2.34.0
>
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> The following code will complain and throw an exception that there is no
> producer for some PCollection.
> ```
> import apache_beam as beam
> import apache_beam.runners.interactive.interactive_beam as ib
> import apache_beam.runners.interactive.interactive_runner as ir
>
> @beam.ptransform_fn
> def Foo(pcoll):
>   p1 = pcoll | 'ident' >> beam.Map(lambda n: n)
>   p2 = pcoll | 'to str' >> beam.Map(str)
>   return {'pc1': p1, 'pc2': p2}
>
> p = beam.Pipeline(ir.InteractiveRunner())
> res = p | 'my create' >> beam.Create([1]) | 'my foo' >> Foo()
> ib.collect(res['pc1'])
> ```

--
This message was sent by Atlassian Jira (v8.3.4#803005)
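The repro returns a dict of PCollections from a composite. As a toy illustration of one way a "no producer" lookup could go wrong in that situation (an assumption for illustration, not the diagnosis or the fix recorded in this ticket), the sketch below builds a producer map from transform results. Strings stand in for PCollections, and `naive_outputs`, `flattened_outputs`, and `producer_map` are hypothetical helpers.

```python
# Toy model: strings stand in for PCollections; this is not Beam's implementation.

def naive_outputs(result):
    """Recognise only a single output or a list/tuple of outputs."""
    if isinstance(result, str):
        return [result]
    if isinstance(result, (list, tuple)):
        return list(result)
    return []  # dict-valued results are silently ignored


def flattened_outputs(result):
    """Also descend into dicts and nested containers of outputs."""
    if isinstance(result, dict):
        return [pc for v in result.values() for pc in flattened_outputs(v)]
    if isinstance(result, (list, tuple)):
        return [pc for v in result for pc in flattened_outputs(v)]
    return [result]


def producer_map(transforms, outputs_of):
    """Map each output PCollection to the name of the transform that made it."""
    return {pc: name for name, result in transforms for pc in outputs_of(result)}


transforms = [
    ("my create", "create_out"),
    ("my foo", {"pc1": "ident_out", "pc2": "str_out"}),
]

naive = producer_map(transforms, naive_outputs)
full = producer_map(transforms, flattened_outputs)
```

With the naive helper, `ident_out` has no entry in the map, so a later lookup for its producer would fail; the flattening helper records `my foo` as the producer of both outputs.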
[jira] [Created] (BEAM-12984) InteractiveRunner cannot collect PCollections from composites
Sam Rohde created BEAM-12984:

    Summary: InteractiveRunner cannot collect PCollections from composites
    Key: BEAM-12984
    URL: https://issues.apache.org/jira/browse/BEAM-12984
    Project: Beam
    Issue Type: Bug
    Components: runner-py-interactive
    Reporter: Sam Rohde
    Assignee: Sam Rohde
    Fix For: 2.34.0

The following code will complain and throw an exception that there is no producer for some PCollection.
```
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
import apache_beam.runners.interactive.interactive_runner as ir

@beam.ptransform_fn
def Foo(pcoll):
  p1 = pcoll | 'ident' >> beam.Map(lambda n: n)
  p2 = pcoll | 'to str' >> beam.Map(str)
  return {'pc1': p1, 'pc2': p2}

p = beam.Pipeline(ir.InteractiveRunner())
res = p | 'my create' >> beam.Create([1]) | 'my foo' >> Foo()
ib.collect(res['pc1'])
```

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12843) (Broken Pipe induced) Bricked Dataflow Pipeline
[ https://issues.apache.org/jira/browse/BEAM-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17419460#comment-17419460 ] Sam Rohde commented on BEAM-12843: -- Nothing concrete. It could be that GCS updated their API to return an arbitrary byte range to retry or it could be that more users are handling larger files. It's also possible it could be something else entirely. > (Broken Pipe induced) Bricked Dataflow Pipeline > > > Key: BEAM-12843 > URL: https://issues.apache.org/jira/browse/BEAM-12843 > Project: Beam > Issue Type: Bug > Components: io-py-gcp, runner-dataflow >Affects Versions: 2.31.0 >Reporter: Ryan Tam >Assignee: Chamikara Madhusanka Jayalath >Priority: P1 > > We are seeing Dataflow pipelines being stuck indefinitely, the common theme > of this behaviour is a bundle failing with the Broken Pipe error and > subsequently the next bundle being stuck at the `StartBundle` stage (reported > by Dataflow). > Specifically, we see an exception like the following for a bundle (truncated > re-raise exception log as it’s long):- > > {code:java} > "Error processing instruction process_bundle-7079259598045896145-12555. 
> Original traceback is > Traceback (most recent call last): > File "apache_beam/runners/common.py", line 1223, in > apache_beam.runners.common.DoFnRunner.process > File "apache_beam/runners/common.py", line 752, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > File "apache_beam/runners/common.py", line 875, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > File "apache_beam/runners/common.py", line 1359, in > apache_beam.runners.common._OutputProcessor.process_outputs > File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", > line 192, in process > writer.write(element) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", > line 1454, in write > return self._file_handle.write(self._coder.encode(row) + b'\n') > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line > 200, in write > self._uploader.put(b) > File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", > line 661, in put > self._conn.send_bytes(data.tobytes()) > File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in > send_bytes > self._send_bytes(m[offset:offset + size]) > File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in > _send_bytes > self._send(buf) > File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in > _send > n = write(self._handle, buf) > BrokenPipeError: [Errno 32] Broken pipe > {code} > And as previously mentioned, the next bundle is stuck at the `StartBundle` > stage (reported by Dataflow), the progress report thread logs message like > these:- > > {code:java} > "Operation ongoing for over 10087.60 seconds in state start-msecs in step > Assign to Location for POI joins-ptransform-49654 without returning. 
Current > Traceback: > File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap > self._bootstrap_inner() > File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner > self.run() > File > "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", > line 53, in run > self._work_item.run() > File > "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", > line 37, in run > self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 357, in task > lambda: self.create_worker().do_instruction(request), request) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 284, in _execute > response = task() > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 357, in > lambda: self.create_worker().do_instruction(request), request) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 602, in do_instruction > getattr(request, request_type), request.instruction_id) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 639, in process_bundle > bundle_processor.process_bundle(instruction_id)) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 983, in process_bundle > expected_inputs): > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", > line 459, in input_elements >
[jira] [Commented] (BEAM-12843) (Broken Pipe induced) Bricked Dataflow Pipeline
[ https://issues.apache.org/jira/browse/BEAM-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17419433#comment-17419433 ] Sam Rohde commented on BEAM-12843: -- Looking at this a little bit more, the IO uses a [GCS resumable_upload|https://cloud.google.com/storage/docs/performing-resumable-uploads#resume-upload] when writing files. By the looks of their API, an interruption can occur at any byte, and querying the status of the insert gets you the last byte that succeeded. As Udi says, our API can only rewind to the end of the last successful block; otherwise it raises an exception. The retry mismatch between the GCS API (arbitrary byte) and our logic (previous block) causes the raised exception. > (Broken Pipe induced) Bricked Dataflow Pipeline > > > Key: BEAM-12843 > URL: https://issues.apache.org/jira/browse/BEAM-12843 > Project: Beam > Issue Type: Bug > Components: io-py-gcp, runner-dataflow >Affects Versions: 2.31.0 >Reporter: Ryan Tam >Assignee: Chamikara Madhusanka Jayalath >Priority: P1 > > We are seeing Dataflow pipelines being stuck indefinitely, the common theme > of this behaviour is a bundle failing with the Broken Pipe error and > subsequently the next bundle being stuck at the `StartBundle` stage (reported > by Dataflow). > Specifically, we see an exception like the following for a bundle (truncated > re-raise exception log as it’s long):- > > {code:java} > "Error processing instruction process_bundle-7079259598045896145-12555. 
> Original traceback is > Traceback (most recent call last): > File "apache_beam/runners/common.py", line 1223, in > apache_beam.runners.common.DoFnRunner.process > File "apache_beam/runners/common.py", line 752, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > File "apache_beam/runners/common.py", line 875, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > File "apache_beam/runners/common.py", line 1359, in > apache_beam.runners.common._OutputProcessor.process_outputs > File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", > line 192, in process > writer.write(element) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", > line 1454, in write > return self._file_handle.write(self._coder.encode(row) + b'\n') > File > "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line > 200, in write > self._uploader.put(b) > File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", > line 661, in put > self._conn.send_bytes(data.tobytes()) > File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in > send_bytes > self._send_bytes(m[offset:offset + size]) > File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in > _send_bytes > self._send(buf) > File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in > _send > n = write(self._handle, buf) > BrokenPipeError: [Errno 32] Broken pipe > {code} > And as previously mentioned, the next bundle is stuck at the `StartBundle` > stage (reported by Dataflow), the progress report thread logs message like > these:- > > {code:java} > "Operation ongoing for over 10087.60 seconds in state start-msecs in step > Assign to Location for POI joins-ptransform-49654 without returning. 
Current > Traceback: > File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap > self._bootstrap_inner() > File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner > self.run() > File > "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", > line 53, in run > self._work_item.run() > File > "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", > line 37, in run > self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 357, in task > lambda: self.create_worker().do_instruction(request), request) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 284, in _execute > response = task() > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 357, in > lambda: self.create_worker().do_instruction(request), request) > File > "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 602, in do_instruction > getattr(request, request_type), request.instruction_id) > File > "/usr/local/lib/
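The retry mismatch described in the comment above (the service may report any byte offset as the last one persisted, while the client can only rewind to a block boundary) can be sketched with a toy uploader. Everything here is hypothetical: `BlockUploader`, its attributes, and the block size are illustrative names and values, not the gcsio.py or apitools API.

```python
# Toy model of the mismatch; not the actual Beam or GCS client code.

class BlockUploader:
    """Tracks bytes sent and can only rewind to the end of the last full block."""

    def __init__(self, block_size):
        self.block_size = block_size
        self.position = 0        # total bytes handed to the uploader so far
        self.last_block_end = 0  # end offset of the last completely sent block

    def put(self, data):
        self.position += len(data)
        # Round down to the nearest block boundary.
        self.last_block_end = (self.position // self.block_size) * self.block_size

    def rewind(self, offset):
        """Resume from `offset`, which must be the last block boundary."""
        if offset != self.last_block_end:
            raise ValueError(
                f"cannot rewind to byte {offset}; "
                f"only byte {self.last_block_end} is supported")
        self.position = offset


uploader = BlockUploader(block_size=4)
uploader.put(b"0123456789")  # 10 bytes sent, so the last full block ends at byte 8

# The service reports byte 6 as the last one persisted: not a block boundary.
try:
    uploader.rewind(6)
    outcome = "rewound"
except ValueError as exc:
    outcome = str(exc)
```

In this sketch a retry succeeds only when the reported offset happens to coincide with a block boundary (`uploader.rewind(8)`); any other offset raises, which mirrors the exception path the comment describes.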
[jira] [Commented] (BEAM-12842) StreamingDataflowWorkerTest.testHotKeyLogging test flake
[ https://issues.apache.org/jira/browse/BEAM-12842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17413267#comment-17413267 ] Sam Rohde commented on BEAM-12842: -- Made https://github.com/apache/beam/pull/15492 > StreamingDataflowWorkerTest.testHotKeyLogging test flake > > > Key: BEAM-12842 > URL: https://issues.apache.org/jira/browse/BEAM-12842 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow, test-failures >Reporter: Valentyn Tymofieiev >Assignee: Sam Rohde >Priority: P1 > Labels: flake, test-failures > Time Spent: 10m > Remaining Estimate: 0h > > Hi Sam, would you be able to please help take a look at these test failures: > https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4455/testReport/junit/org.apache.beam.runners.dataflow.worker/StreamingDataflowWorkerTest/testHotKeyLogging_1___streamingEngine_true___2/ > {noformat} > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testHotKeyLogging[1: > [streamingEngine=true]] > Failing for the past 1 build (Since #4455 ) > Took 15 sec. 
> Error Message > java.lang.AssertionError: Should have received 2000 more commits beyond 0 > commits already seen, but after 10s have only seen {0=key: "key" > work_token: 0 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data0" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > sharding_key: 12345 > cache_token: 3 > , 1=key: "key" > work_token: 1 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data1" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > sharding_key: 12345 > cache_token: 3 > , 2=key: "key" > work_token: 2 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data2" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > ... > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12842) StreamingDataflowWorkerTest.testHotKeyLogging test flake
[ https://issues.apache.org/jira/browse/BEAM-12842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17413246#comment-17413246 ] Sam Rohde commented on BEAM-12842: -- Looking at the code, the test puts a 0 as the timestamp for each work. Maybe that makes it susceptible to flaking? I'll add in the timestamp and check it in. > StreamingDataflowWorkerTest.testHotKeyLogging test flake > > > Key: BEAM-12842 > URL: https://issues.apache.org/jira/browse/BEAM-12842 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow, test-failures >Reporter: Valentyn Tymofieiev >Assignee: Sam Rohde >Priority: P1 > Labels: flake, test-failures > > Hi Sam, would you be able to please help take a look at these test failures: > https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4455/testReport/junit/org.apache.beam.runners.dataflow.worker/StreamingDataflowWorkerTest/testHotKeyLogging_1___streamingEngine_true___2/ > {noformat} > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testHotKeyLogging[1: > [streamingEngine=true]] > Failing for the past 1 build (Since #4455 ) > Took 15 sec. 
> Error Message > java.lang.AssertionError: Should have received 2000 more commits beyond 0 > commits already seen, but after 10s have only seen {0=key: "key" > work_token: 0 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data0" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > sharding_key: 12345 > cache_token: 3 > , 1=key: "key" > work_token: 1 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data1" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > sharding_key: 12345 > cache_token: 3 > , 2=key: "key" > work_token: 2 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data2" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > ... > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12842) StreamingDataflowWorkerTest.testHotKeyLogging test flake
[ https://issues.apache.org/jira/browse/BEAM-12842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-12842: - Summary: StreamingDataflowWorkerTest.testHotKeyLogging test flake (was: StreamingDataflowWorkerTest.testHotKeyLogging tests are flaky) > StreamingDataflowWorkerTest.testHotKeyLogging test flake > > > Key: BEAM-12842 > URL: https://issues.apache.org/jira/browse/BEAM-12842 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow, test-failures >Reporter: Valentyn Tymofieiev >Assignee: Sam Rohde >Priority: P1 > Labels: flake, test-failures > > Hi Sam, would you be able to please help take a look at these test failures: > https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4455/testReport/junit/org.apache.beam.runners.dataflow.worker/StreamingDataflowWorkerTest/testHotKeyLogging_1___streamingEngine_true___2/ > {noformat} > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testHotKeyLogging[1: > [streamingEngine=true]] > Failing for the past 1 build (Since #4455 ) > Took 15 sec. 
> Error Message > java.lang.AssertionError: Should have received 2000 more commits beyond 0 > commits already seen, but after 10s have only seen {0=key: "key" > work_token: 0 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data0" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > sharding_key: 12345 > cache_token: 3 > , 1=key: "key" > work_token: 1 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data1" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > sharding_key: 12345 > cache_token: 3 > , 2=key: "key" > work_token: 2 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data2" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > ... > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12842) StreamingDataflowWorkerTest.testHotKeyLogging tests are flaky
[ https://issues.apache.org/jira/browse/BEAM-12842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411379#comment-17411379 ] Sam Rohde commented on BEAM-12842: -- The test is essentially a copy of testBasic except with an extra log existence assertion at the end. This would indicate a flake with the test infrastructure. Looks like this is the first flake for the test, so maybe it's related to a recent change? The three most recent changes in the Dataflow worker are the following, with the first two being related to streaming: * [https://github.com/apache/beam/commit/2135e5e6d1ffc778a4efc83b78dc8af86c6db3be#diff-d47e0a84a3abe960382f7eda4159284c6af01b17e6214268ba54e978f1681414] * [https://github.com/apache/beam/commit/de7e7d3e82a461f7250917c1011ad0e88d00e0bf#diff-d47e0a84a3abe960382f7eda4159284c6af01b17e6214268ba54e978f1681414] * [https://github.com/apache/beam/commit/8bd1e208685518148fa574797bd7d7c08d430bf5#diff-d47e0a84a3abe960382f7eda4159284c6af01b17e6214268ba54e978f1681414] > StreamingDataflowWorkerTest.testHotKeyLogging tests are flaky > - > > Key: BEAM-12842 > URL: https://issues.apache.org/jira/browse/BEAM-12842 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow >Reporter: Valentyn Tymofieiev >Assignee: Sam Rohde >Priority: P1 > Labels: test-failures > > Hi Sam, would you be able to please help take a look at these test failures: > https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4455/testReport/junit/org.apache.beam.runners.dataflow.worker/StreamingDataflowWorkerTest/testHotKeyLogging_1___streamingEngine_true___2/ > {noformat} > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testHotKeyLogging[1: > [streamingEngine=true]] > Failing for the past 1 build (Since #4455 ) > Took 15 sec. 
> Error Message > java.lang.AssertionError: Should have received 2000 more commits beyond 0 > commits already seen, but after 10s have only seen {0=key: "key" > work_token: 0 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data0" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > sharding_key: 12345 > cache_token: 3 > , 1=key: "key" > work_token: 1 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data1" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > sharding_key: 12345 > cache_token: 3 > , 2=key: "key" > work_token: 2 > output_messages { > bundles { > key: "key" > messages { > timestamp: 0 > data: "data2" > metadata: "\017\000\000\000\001\200\000\000\000\000\000\b\272\350\a" > } > messages_ids: "" > } > destination_stream_id: "out" > } > ... > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12764) Can't get attribute 'new_block' on
[ https://issues.apache.org/jira/browse/BEAM-12764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404084#comment-17404084 ] Sam Rohde commented on BEAM-12764: -- Yes, that's fine to go ahead and rollback. > Can't get attribute 'new_block' on - > > Key: BEAM-12764 > URL: https://issues.apache.org/jira/browse/BEAM-12764 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Affects Versions: 2.33.0 >Reporter: Luke Cwik >Assignee: Sam Rohde >Priority: P0 > Labels: currently-failing > Fix For: 2.33.0 > > > I believe https://github.com/apache/beam/pull/15165 broke the PostCommit > https://ci-beam.apache.org/job/beam_PostCommit_Python38/1564/testReport/junit/apache_beam.examples.dataframe.flight_delays_it_test/FlightDelaysTest/test_flight_delays/ > https://ci-beam.apache.org/job/beam_PostCommit_Python38/1564/testReport/junit/apache_beam.examples.dataframe.taxiride_it_test/TaxirideIT/test_enrich/ > https://ci-beam.apache.org/job/beam_PostCommit_Python38/1564/testReport/junit/apache_beam.examples.dataframe.taxiride_it_test/TaxirideIT/test_aggregation/ > Failure from tests looks like: > {noformat} > apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: > Dataflow pipeline failed. 
State: FAILED, Error: Traceback (most recent call > last): File > "/usr/local/lib/python3.8/site-packages/apache_beam/internal/pickler.py", > line 285, in loads return dill.loads(s) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads > return load(file, ignore, **kwds) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load > return Unpickler(file, ignore=ignore, **kwds).load() File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load > obj = StockUnpickler.load(self) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 462, in > find_class return StockUnpickler.find_class(self, module, name) > AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks' from > '/usr/local/lib/python3.8/site-packages/pandas/core/internals/blocks.py'> > During handling of the above exception, another exception occurred: > Traceback (most recent call last): File > "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line > 651, in do_work work_executor.execute() File > "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line > 179, in execute op.start() File > "apache_beam/runners/worker/operations.py", line 704, in > apache_beam.runners.worker.operations.DoOperation.start File > "apache_beam/runners/worker/operations.py", line 706, in > apache_beam.runners.worker.operations.DoOperation.start File > "apache_beam/runners/worker/operations.py", line 707, in > apache_beam.runners.worker.operations.DoOperation.start File > "apache_beam/runners/worker/operations.py", line 305, in > apache_beam.runners.worker.operations.Operation.start File > "apache_beam/runners/worker/operations.py", line 311, in > apache_beam.runners.worker.operations.Operation.start File > "apache_beam/runners/worker/operations.py", line 653, in > apache_beam.runners.worker.operations.DoOperation.setup File > "apache_beam/runners/worker/operations.py", line 654, in > 
apache_beam.runners.worker.operations.DoOperation.setup File > "apache_beam/runners/worker/operations.py", line 286, in > apache_beam.runners.worker.operations.Operation.setup File > "apache_beam/runners/worker/operations.py", line 300, in > apache_beam.runners.worker.operations.Operation.setup File > "apache_beam/runners/worker/operations.py", line 793, in > apache_beam.runners.worker.operations.DoOperation._get_runtime_performance_hints >File > "/usr/local/lib/python3.8/site-packages/apache_beam/internal/pickler.py", > line 289, in loads return dill.loads(s) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads > return load(file, ignore, **kwds) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load > return Unpickler(file, ignore=ignore, **kwds).load() File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load > obj = StockUnpickler.load(self) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 462, in > find_class return StockUnpickler.find_class(self, module, name) > AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks' from > '/usr/local/lib/python3.8/site-packages/pandas/core/internals/blocks.py'> > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
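The traceback above fails inside the unpickler's `find_class`. As background, here is a stdlib-only sketch of how an object pickled by reference stops loading when the named attribute is missing on the loading side. The module `fake_blocks` is made up and merely stands in for the pandas module named in the error; whether a pandas version difference between the submitting environment and the worker is the actual cause is not established in this thread.

```python
import pickle
import sys
import types

# Hypothetical stand-in module; "fake_blocks" is not a real package.
mod = types.ModuleType("fake_blocks")


def new_block(values):
    return list(values)


# Make the function look as if it were defined at the top level of fake_blocks.
new_block.__module__ = "fake_blocks"
new_block.__qualname__ = "new_block"
mod.new_block = new_block
sys.modules["fake_blocks"] = mod

# Functions are pickled by reference: only the module and attribute names are stored.
payload = pickle.dumps(new_block)

# Simulate loading in an environment whose copy of the module lacks the attribute.
del mod.new_block

try:
    pickle.loads(payload)
    error = None
except AttributeError as exc:
    error = exc

print(type(error).__name__, error)
```

The same lookup happens for any function or class referenced from a pickled payload, so a payload created against one library version can fail to load against another that renamed or removed the attribute.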
[jira] [Commented] (BEAM-12764) Can't get attribute 'new_block' on
[ https://issues.apache.org/jira/browse/BEAM-12764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403397#comment-17403397 ] Sam Rohde commented on BEAM-12764: -- Yeah, that's my guess too. I'll take a look today > Can't get attribute 'new_block' on - > > Key: BEAM-12764 > URL: https://issues.apache.org/jira/browse/BEAM-12764 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Affects Versions: 2.33.0 >Reporter: Luke Cwik >Assignee: Sam Rohde >Priority: P0 > Labels: currently-failing > Fix For: 2.33.0 > > > I believe https://github.com/apache/beam/pull/15165 broke the PostCommit > https://ci-beam.apache.org/job/beam_PostCommit_Python38/1564/testReport/junit/apache_beam.examples.dataframe.flight_delays_it_test/FlightDelaysTest/test_flight_delays/ > https://ci-beam.apache.org/job/beam_PostCommit_Python38/1564/testReport/junit/apache_beam.examples.dataframe.taxiride_it_test/TaxirideIT/test_enrich/ > https://ci-beam.apache.org/job/beam_PostCommit_Python38/1564/testReport/junit/apache_beam.examples.dataframe.taxiride_it_test/TaxirideIT/test_aggregation/ > Failure from tests looks like: > {noformat} > apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: > Dataflow pipeline failed. 
State: FAILED, Error: Traceback (most recent call > last): File > "/usr/local/lib/python3.8/site-packages/apache_beam/internal/pickler.py", > line 285, in loads return dill.loads(s) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads > return load(file, ignore, **kwds) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load > return Unpickler(file, ignore=ignore, **kwds).load() File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load > obj = StockUnpickler.load(self) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 462, in > find_class return StockUnpickler.find_class(self, module, name) > AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks' from > '/usr/local/lib/python3.8/site-packages/pandas/core/internals/blocks.py'> > During handling of the above exception, another exception occurred: > Traceback (most recent call last): File > "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line > 651, in do_work work_executor.execute() File > "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line > 179, in execute op.start() File > "apache_beam/runners/worker/operations.py", line 704, in > apache_beam.runners.worker.operations.DoOperation.start File > "apache_beam/runners/worker/operations.py", line 706, in > apache_beam.runners.worker.operations.DoOperation.start File > "apache_beam/runners/worker/operations.py", line 707, in > apache_beam.runners.worker.operations.DoOperation.start File > "apache_beam/runners/worker/operations.py", line 305, in > apache_beam.runners.worker.operations.Operation.start File > "apache_beam/runners/worker/operations.py", line 311, in > apache_beam.runners.worker.operations.Operation.start File > "apache_beam/runners/worker/operations.py", line 653, in > apache_beam.runners.worker.operations.DoOperation.setup File > "apache_beam/runners/worker/operations.py", line 654, in > 
apache_beam.runners.worker.operations.DoOperation.setup File > "apache_beam/runners/worker/operations.py", line 286, in > apache_beam.runners.worker.operations.Operation.setup File > "apache_beam/runners/worker/operations.py", line 300, in > apache_beam.runners.worker.operations.Operation.setup File > "apache_beam/runners/worker/operations.py", line 793, in > apache_beam.runners.worker.operations.DoOperation._get_runtime_performance_hints >File > "/usr/local/lib/python3.8/site-packages/apache_beam/internal/pickler.py", > line 289, in loads return dill.loads(s) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads > return load(file, ignore, **kwds) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load > return Unpickler(file, ignore=ignore, **kwds).load() File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load > obj = StockUnpickler.load(self) File > "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 462, in > find_class return StockUnpickler.find_class(self, module, name) > AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks' from > '/usr/local/lib/python3.8/site-packages/pandas/core/internals/blocks.py'> > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12698) Some binary operations on DeferredSeries don't work
[ https://issues.apache.org/jira/browse/BEAM-12698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401845#comment-17401845 ] Sam Rohde commented on BEAM-12698: -- I am unable to reproduce on either 2.31.0 or 2.32.0. Can you please post the code that is generating the error? > Some binary operations on DeferredSeries don't work > --- > > Key: BEAM-12698 > URL: https://issues.apache.org/jira/browse/BEAM-12698 > Project: Beam > Issue Type: Bug > Components: dsl-dataframe >Affects Versions: 2.31.0 >Reporter: Yunfeng Zhang >Priority: P2 > > I get an error when trying to multiply two columns of a deferred dataframe: > ib.collect(df.A.mul(df.B)) > TypeError: can't multiply sequence by non-int of type 'str' > [while running '[35]: > ComputedExpression[mul_Series_140480471782160]/[ComputedExpression[mul_Series_140480471782160]]:140480472023312/FlatMap(evaluate)/FlatMap(evaluate)'] > > Similarly, adding a scalar also doesn't work: > ib.collect(df.A.add(1)) > TypeError: can only concatenate str (not "int") to str [while > running '[39]: > ComputedExpression[add_Series_140480422551952]/[ComputedExpression[get_column_Series_140480422552144], > > ComputedExpression[add_Series_140480422551952]]:140480472568336/FlatMap(evaluate)/FlatMap(evaluate)'] > But adding two columns works. -- This message was sent by Atlassian Jira (v8.3.4#803005)
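One possible reading of the two error messages quoted in BEAM-12698 (an assumption, since the reporter's code was never posted): both are the messages that plain Python `str` objects raise, which would fit columns A and B holding strings rather than numbers. The sketch below reproduces the same three outcomes without Beam or pandas; `a` and `b` are hypothetical cell values.

```python
# Hypothetical cell values standing in for one row of columns A and B.
# Assumption: the columns hold strings, not numbers.
a, b = "2", "3"


def error_of(fn):
    """Run fn and return the TypeError message it raises, or None."""
    try:
        fn()
    except TypeError as e:
        return str(e)
    return None


mul_error = error_of(lambda: a * b)  # str * str is not defined
add_error = error_of(lambda: a + 1)  # str + int is not defined
concat = a + b                       # str + str concatenates, so no error

print(mul_error)
print(add_error)
print(concat)
```

If that reading is right, it would also explain why adding two columns "works": string concatenation succeeds, although it is probably not the arithmetic the reporter wanted. Checking the column dtypes would be the first thing to try.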
[jira] [Updated] (BEAM-12531) ib.show does not handle deferred dataframe instances
[ https://issues.apache.org/jira/browse/BEAM-12531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-12531: - Resolution: Fixed Status: Resolved (was: Open) > ib.show does not handle deferred dataframe instances > > > Key: BEAM-12531 > URL: https://issues.apache.org/jira/browse/BEAM-12531 > Project: Beam > Issue Type: Bug > Components: dsl-dataframe >Affects Versions: 2.31.0 >Reporter: Brian Hulette >Assignee: Sam Rohde >Priority: P2 > Fix For: 2.32.0 > > Time Spent: 6h > Remaining Estimate: 0h > > When passed a deferred dataframe instance (e.g. {{ib.show(counts.nlargest(20, keep='all'))}}), ib.show calls len() and ends up raising a WontImplementError: > {code} > --- > WontImplementError Traceback (most recent call last) > in > > 1 ib.show(counts.nlargest(20, keep='all')) > 2 frames > /usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/utils.py > in run_within_progress_indicator(*args, **kwargs) > 245 def run_within_progress_indicator(*args, **kwargs): > 246 with ProgressIndicator('Processing...', 'Done.'): > --> 247 return func(*args, **kwargs) > 248 > 249 return run_within_progress_indicator > /usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/interactive_beam.py > in show(include_window_info, visualize_data, n, duration, *pcolls) > 441 else: > 442 try: > --> 443 flatten_pcolls.extend(iter(pcoll_container)) > 444 except TypeError: > 445 raise ValueError( > /usr/local/lib/python3.7/dist-packages/apache_beam/dataframe/frames.py in > __len__(self) > 695 "len(df) is not currently supported because it produces a > non-deferred " > 696 "result. Consider using df.length() instead.", > --> 697 reason="non-deferred-result") > 698 > 699 @property # type: ignore > WontImplementError: len(df) is not currently supported because it produces a > non-deferred result. Consider using df.length() instead. > For more information see https://s.apache.org/dataframe-non-deferred-result.
> {code} > We should support this case, or at least fail gracefully. -- This message was sent by Atlassian Jira (v8.3.4#803005)
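A minimal sketch of the "support this case, or at least fail gracefully" idea from BEAM-12531, assuming the simplest approach: check the container's type before iterating it, so len() is never triggered. `FakeDeferredFrame`, `flatten_for_show`, and the local `WontImplementError` are hypothetical stand-ins, not Beam's classes, and this is not necessarily how the issue was resolved.

```python
class WontImplementError(NotImplementedError):
    """Local stand-in for the Beam error named in the traceback."""


class FakeDeferredFrame:
    """Hypothetical stand-in for a deferred dataframe: len() is refused."""

    def __len__(self):
        raise WontImplementError("len(df) is not currently supported")


def flatten_for_show(*containers):
    """Flatten containers, passing deferred frames through as single items."""
    flat = []
    for container in containers:
        if isinstance(container, FakeDeferredFrame):
            # Check the type first so the frame is never iterated or measured.
            flat.append(container)
            continue
        try:
            flat.extend(iter(container))
        except TypeError:
            # Non-iterable input: report it with a readable message.
            raise ValueError(
                "Expected a deferred frame or an iterable, got %s"
                % type(container).__name__)
    return flat


frame = FakeDeferredFrame()
result = flatten_for_show(frame, ["pc1", "pc2"])
```

Here `result` keeps the frame as one item next to the two placeholder strings, and a non-iterable argument such as `42` produces a `ValueError` instead of an unrelated error from deep inside the frame.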
[jira] [Assigned] (BEAM-12388) Improve caching experience on InteractiveRunner with dataframes
[ https://issues.apache.org/jira/browse/BEAM-12388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-12388: Assignee: Sam Rohde > Improve caching experience on InteractiveRunner with dataframes > --- > > Key: BEAM-12388 > URL: https://issues.apache.org/jira/browse/BEAM-12388 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Time Spent: 10m > Remaining Estimate: 0h > > Reusing the default label for to_pcollection when using the interactive > runner results in caching errors when used with multiple pipelines: > > > {{Traceback (most recent call last): > File > "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py", > line 389, in test_dataframes_with_multi_index_get_result > pd.testing.assert_series_equal(df_expected, ib.collect(deferred_df, n=10)) > File > "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/utils.py", > line 247, in run_within_progress_indicator > return func(*args, **kwargs) > File > "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py", > line 579, in collect > recording = recording_manager.record([pcoll], max_n=n, > max_duration=duration) > File > "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py", > line 433, in record > self._watch(pcolls) > File > "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py", > line 306, in _watch > for pcoll in to_pcollection(*watched_dataframes, > always_return_tuple=True): > File > "/home/srohde/Workdir/beam/sdks/python/apache_beam/dataframe/convert.py", > line 196, in to_pcollection > new_results = {p: extract_input(p) > File > "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py", > line 1086, in __ror__ > return self.transform.__ror__(pvalueish, self.label) > File > 
"/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py", > line 587, in __ror__ > raise ValueError( > ValueError: Mixing value from different pipelines not allowed.}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12388) Improve caching experience on InteractiveRunner with dataframes
Sam Rohde created BEAM-12388: Summary: Improve caching experience on InteractiveRunner with dataframes Key: BEAM-12388 URL: https://issues.apache.org/jira/browse/BEAM-12388 Project: Beam Issue Type: Improvement Components: runner-py-interactive Reporter: Sam Rohde Assignee: Sam Rohde Reusing the default label for to_pcollection when using the interactive runner results in caching errors when used with multiple pipelines: {{Traceback (most recent call last): File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py", line 389, in test_dataframes_with_multi_index_get_result pd.testing.assert_series_equal(df_expected, ib.collect(deferred_df, n=10)) File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/utils.py", line 247, in run_within_progress_indicator return func(*args, **kwargs) File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py", line 579, in collect recording = recording_manager.record([pcoll], max_n=n, max_duration=duration) File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py", line 433, in record self._watch(pcolls) File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py", line 306, in _watch for pcoll in to_pcollection(*watched_dataframes, always_return_tuple=True): File "/home/srohde/Workdir/beam/sdks/python/apache_beam/dataframe/convert.py", line 196, in to_pcollection new_results = {p: extract_input(p) File "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py", line 1086, in __ror__ return self.transform.__ror__(pvalueish, self.label) File "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py", line 587, in __ror__ raise ValueError( ValueError: Mixing value from different pipelines not allowed.}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
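A toy model of how a cache keyed only by a reused default label could hand one pipeline a value that belongs to another, which is the kind of mix-up the "Mixing value from different pipelines" error in BEAM-12388 guards against. This is not Beam's caching code: `ToyPipeline`, `cached_apply`, and `DEFAULT_LABEL` are hypothetical names, and keying by pipeline identity is only one possible remedy.

```python
class ToyPipeline:
    """Hypothetical stand-in for a pipeline object."""

    def __init__(self, name):
        self.name = name


DEFAULT_LABEL = "to_pcollection"  # hypothetical default label


def cached_apply(cache, key, pipeline):
    """Return the cached entry for key, creating it for `pipeline` if absent."""
    if key not in cache:
        cache[key] = {"label": DEFAULT_LABEL, "pipeline": pipeline}
    return cache[key]


p1, p2 = ToyPipeline("p1"), ToyPipeline("p2")

# Keyed by label only: the second pipeline receives the first pipeline's entry.
by_label = {}
cached_apply(by_label, DEFAULT_LABEL, p1)
stale = cached_apply(by_label, DEFAULT_LABEL, p2)

# Keyed by (pipeline identity, label): each pipeline gets its own entry.
by_pipeline = {}
cached_apply(by_pipeline, (id(p1), DEFAULT_LABEL), p1)
fresh = cached_apply(by_pipeline, (id(p2), DEFAULT_LABEL), p2)
```

In the first cache, `stale` still points at `p1` even though `p2` asked for it. In the second, `fresh` points at `p2`.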
[jira] [Work started] (BEAM-12246) ib.collect doesn't preserve the index from DeferredDataFrame instances
[ https://issues.apache.org/jira/browse/BEAM-12246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-12246 started by Sam Rohde. > ib.collect doesn't preserve the index from DeferredDataFrame instances > -- > > Key: BEAM-12246 > URL: https://issues.apache.org/jira/browse/BEAM-12246 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Affects Versions: 2.29.0 >Reporter: Brian Hulette >Assignee: Sam Rohde >Priority: P2 > Labels: dataframe-api > > This happens because ib.collect uses {{to_pcollection(yield='schemas', > include_indexes=False)}} (the default values for those arguments). To fix > this we should avoid converting to Beam schemas and collect the raw > dataframes with {{to_pcollection(yield='pandas')}}. > See https://github.com/apache/beam/pull/14356#discussion_r620647659 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10994) Add Hot Key Logging in Dataflow Runner
[ https://issues.apache.org/jira/browse/BEAM-10994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10994: - Status: Resolved (was: Resolved) > Add Hot Key Logging in Dataflow Runner > -- > > Key: BEAM-10994 > URL: https://issues.apache.org/jira/browse/BEAM-10994 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Fix For: 2.29.0 > > Time Spent: 14h 50m > Remaining Estimate: 0h > > This adds the ability for users to enable the logging of hot key content. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10970) apache_beam.runners.interactive.recording_manager_test.ElementStreamTest is flaky on Windows
[ https://issues.apache.org/jira/browse/BEAM-10970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10970: - Resolution: Fixed Status: Resolved (was: Resolved) > apache_beam.runners.interactive.recording_manager_test.ElementStreamTest is > flaky on Windows > > > Key: BEAM-10970 > URL: https://issues.apache.org/jira/browse/BEAM-10970 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Robin Qiu >Assignee: Sam Rohde >Priority: P1 > Labels: currently-failing, flake > > There are two test suites that are failing now: > > Python Unit Tests (macos-latest, 3.7, py37): > [https://github.com/apache/beam/pull/12935/checks?check_run_id=1163443154] > > Python Unit Tests (windows-latest, 3.7, py37): > [https://github.com/apache/beam/pull/12935/checks?check_run_id=1163443198] > > I am not very familiar with the Python tests suites in GitHub check, but I > think this failure should be a release blocker? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10921) Interactive Runner Python unit tests are flaky on Windows
[ https://issues.apache.org/jira/browse/BEAM-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10921: - Resolution: Fixed Status: Resolved (was: Resolved) > Interactive Runner Python unit tests are flaky on Windows > - > > Key: BEAM-10921 > URL: https://issues.apache.org/jira/browse/BEAM-10921 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Brian Hulette >Assignee: Sam Rohde >Priority: P1 > Labels: currently-failing, flake > Fix For: Not applicable > > Time Spent: 8h > Remaining Estimate: 0h > > Over the past few days python unit tests have been failing frequently. The > errors always seem to occur when cleaning up the interactive environment: > {code} > ... > [100%] > == FAILURES > === > _ > PipelineInstrumentTest.test_able_to_cache_intermediate_unbounded_source_pcollection > _ > [gw2] win32 -- Python 3.5.4 > d:\a\beam\beam\sdks\python\target\.tox\py35-win\scripts\python.exe > self = > testMethod=test_able_to_cache_intermediate_unbounded_source_pcollection> > def setUp(self): > > ie.new_env() > apache_beam\runners\interactive\pipeline_instrument_test.py:46: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam\runners\interactive\interactive_environment.py:117: in new_env > _interactive_beam_env.cleanup() > apache_beam\runners\interactive\interactive_environment.py:273: in cleanup > cache_manager.cleanup() > apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup > shutil.rmtree(self._cache_dir) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:494: in rmtree > return _rmtree_unsafe(path, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:384: in > _rmtree_unsafe > _rmtree_unsafe(fullname, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:389: in > _rmtree_unsafe > onerror(os.unlink, fullname, sys.exc_info()) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ > path = > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full' > onerror = .onerror at 0x01D6C3E5C7B8> > def _rmtree_unsafe(path, onerror): > try: > if os.path.islink(path): > # symlinks to directories are forbidden, see bug #1669 > raise OSError("Cannot call rmtree on a symbolic link") > except OSError: > onerror(os.path.islink, path, sys.exc_info()) > # can't continue even if onerror hook returns > return > names = [] > try: > names = os.listdir(path) > except OSError: > onerror(os.listdir, path, sys.exc_info()) > for name in names: > fullname = os.path.join(path, name) > try: > mode = os.lstat(fullname).st_mode > except OSError: > mode = 0 > if stat.S_ISDIR(mode): > _rmtree_unsafe(fullname, onerror) > else: > try: > > os.unlink(fullname) > E PermissionError: [WinError 32] The process cannot access > the file because it is being used by another process: > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full\\ac8879590f-2021876280456-2021876278608-2021914046928' > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:387: PermissionError > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
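WinError 32 in the BEAM-10921 traceback means some process still held a handle to a file inside the cache directory when `shutil.rmtree` ran. A minimal sketch of two common mitigations, closing handles before cleanup and retrying a few times, follows. `rmtree_with_retry` and its parameters are hypothetical, and this is not necessarily the fix that went into Beam.

```python
import os
import shutil
import tempfile
import time


def rmtree_with_retry(path, attempts=5, delay=0.1):
    """Remove a directory tree, retrying if a file is briefly locked."""
    for attempt in range(attempts):
        try:
            shutil.rmtree(path)
            return True
        except FileNotFoundError:
            return True  # already gone, nothing left to do
        except PermissionError:
            if attempt == attempts - 1:
                raise  # still locked after the last attempt
            time.sleep(delay)
    return False


cache_dir = tempfile.mkdtemp()
with open(os.path.join(cache_dir, "full"), "wb") as f:
    f.write(b"cached bytes")
# The handle is closed by the `with` block before cleanup runs.
removed = rmtree_with_retry(cache_dir)
```

Retrying only hides a short-lived lock. If a reader thread keeps the file open for the lifetime of the test, the handle has to be closed explicitly before cleanup.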
[jira] [Updated] (BEAM-9687) Names of temporary files created by interactive runner include characters invalid on some platforms.
[ https://issues.apache.org/jira/browse/BEAM-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-9687: Resolution: Fixed Status: Resolved (was: Resolved) > Names of temporary files created by interactive runner include characters > invalid on some platforms. > > > Key: BEAM-9687 > URL: https://issues.apache.org/jira/browse/BEAM-9687 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Assignee: Sam Rohde >Priority: P3 > Fix For: 2.21.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Temporary files created by interactive runner in streaming scenarios include > pipe '|' characters, which are not allowed in filenames on Windows platform. > This causes test failures on a Windows platform: > > python setup.py nosetests --tests > apache_beam/runners/interactive/pipeline_instrument_test.py:PipelineInstrumentTest.test_instrument_example_unbounded_pipeline_to_multiple_read_cache > == > ERROR: Tests that the instrumenter works for multiple unbounded sources. 
> -- > Traceback (most recent call last): > File > "C:\projects\apache_beam\runners\interactive\pipeline_instrument_test.py", > line 698, in test_instrument_example_unbounded_pipeline_to_multiple_read_cache > self._mock_write_cache([b''], cache_key) > File > "C:\projects\apache_beam\runners\interactive\pipeline_instrument_test.py", > line 227, in _mock_write_cache > ie.current_env().cache_manager().write(values, *labels) > File > "C:\projects\apache_beam\runners\interactive\caching\streaming_cache.py", > line 323, in write > with open(filepath, 'ab') as f: > IOError: [Errno 22] invalid mode ('ab') or filename: > 'c:\\users\\deft-t~1\\appdata\\local\\temp\\2\\interactive-temp-xwg5qi\\full\\pcoll_1|149781752|149781920|149231600' > > [1] > https://github.com/apache/beam/blob/e6b37c44d542969b6104fc97ee6f25b6f7d2ddba/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py#L323 -- This message was sent by Atlassian Jira (v8.3.4#803005)
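The failure in BEAM-9687 comes from '|' being one of the characters Windows rejects in file names. A minimal sketch of one way to avoid it, assuming the cache key can simply be rewritten before it is used as a file name. `safe_cache_filename` is a hypothetical helper, and the sample key is modeled on the one in the traceback.

```python
import re

# Characters that Windows does not allow in file names.
_INVALID = re.compile(r'[<>:"/\\|?*]')


def safe_cache_filename(name, replacement="-"):
    """Replace characters that are invalid in Windows file names."""
    return _INVALID.sub(replacement, name)


unsafe = "pcoll_1|149781752|149781920|149231600"
safe = safe_cache_filename(unsafe)
print(safe)  # pcoll_1-149781752-149781920-149231600
```

Replacing rather than dropping the separator keeps the parts of the key distinguishable. Choosing a separator that is valid everywhere when the key is first built would avoid the rewrite altogether.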
[jira] [Updated] (BEAM-10994) Add Hot Key Logging in Dataflow Runner
[ https://issues.apache.org/jira/browse/BEAM-10994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10994: - Fix Version/s: 2.29.0 Resolution: Fixed Status: Resolved (was: Open) > Add Hot Key Logging in Dataflow Runner > -- > > Key: BEAM-10994 > URL: https://issues.apache.org/jira/browse/BEAM-10994 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Fix For: 2.29.0 > > Time Spent: 14h 50m > Remaining Estimate: 0h > > This adds the ability for users to enable the logging of hot key content. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10956) PipelineInstrumentTest is flaky
[ https://issues.apache.org/jira/browse/BEAM-10956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10956: - Resolution: Fixed Status: Resolved (was: Resolved) > PipelineInstrumentTest is flaky > --- > > Key: BEAM-10956 > URL: https://issues.apache.org/jira/browse/BEAM-10956 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Time Spent: 2h 20m > Remaining Estimate: 0h > > h3. Error Message > AttributeError: 'Pipeline' object has no attribute 'pipeline' > h3. Stacktrace > self = > testMethod=test_pipeline_pruned_when_input_pcoll_is_cached> def > test_pipeline_pruned_when_input_pcoll_is_cached(self): > user_pipeline, > init_pcoll, _ = self._example_pipeline() > apache_beam/runners/interactive/pipeline_instrument_test.py:765: _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > apache_beam/runners/interactive/pipeline_instrument_test.py:223: in > _example_pipeline ie.current_env().set_cache_manager(InMemoryCache(), p) > apache_beam/runners/interactive/interactive_environment.py:324: in > set_cache_manager self.cleanup(pipeline) > apache_beam/runners/interactive/interactive_environment.py:277: in cleanup > self.evict_computed_pcollections(pipeline) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = > object at 0x7f8698c89cf8> pipeline = at 0x7f86cc1539e8> def evict_computed_pcollections(self, pipeline=None): > """Evicts all computed PCollections for the given pipeline. If no pipeline is > specified, evicts for all pipelines. """ if pipeline: discarded = set() for > pcoll in self._computed_pcolls: > if pcoll.pipeline is pipeline: E > AttributeError: 'Pipeline' object has no attribute 'pipeline' > apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
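The stack trace in BEAM-10956 shows that a Pipeline object ended up in the set that the eviction loop treats as PCollections. How it got there is not stated in the report. The sketch below is only a defensive variant of the loop, using toy classes, that tolerates such stray entries. It is not the actual fix, which presumably addressed how the set gets populated.

```python
class ToyPipeline:
    """Hypothetical stand-in for a pipeline; it has no `pipeline` attribute."""


class ToyPCollection:
    """Hypothetical stand-in for a PCollection tied to a pipeline."""

    def __init__(self, pipeline):
        self.pipeline = pipeline


def evict_computed(computed, pipeline=None):
    """Return what is left after evicting entries that belong to `pipeline`.

    With no pipeline given, everything is evicted.
    """
    if pipeline is None:
        return set()
    discarded = {
        item for item in computed
        # getattr tolerates entries that are not PCollections.
        if getattr(item, "pipeline", None) is pipeline
    }
    return computed - discarded


p, q = ToyPipeline(), ToyPipeline()
pc_p, pc_q = ToyPCollection(p), ToyPCollection(q)
computed = {pc_p, pc_q, p}  # `p` is the stray non-PCollection entry
remaining = evict_computed(computed, p)
```

After the call, `remaining` holds `pc_q` and the stray `p`, and no AttributeError is raised.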
[jira] [Updated] (BEAM-10954) PipelineInstrumentTest.test_not_has_unbounded_source flaky on ubuntu
[ https://issues.apache.org/jira/browse/BEAM-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10954: - Resolution: Fixed Status: Resolved (was: Resolved) > PipelineInstrumentTest.test_not_has_unbounded_source flaky on ubuntu > > > Key: BEAM-10954 > URL: https://issues.apache.org/jira/browse/BEAM-10954 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Kyle Weaver >Assignee: Sam Rohde >Priority: P1 > Labels: flake, flaky-test > > https://github.com/apache/beam/pull/12910/checks?check_run_id=1152559818 > 2020-09-23T01:21:12.2148103Z _ > PipelineInstrumentTest.test_not_has_unbounded_source _ > 2020-09-23T01:21:12.2149918Z [gw4] linux -- Python 3.6.12 > /home/runner/work/beam/beam/sdks/python/target/.tox/py36/bin/python > 2020-09-23T01:21:12.2150650Z > 2020-09-23T01:21:12.2151861Z self = > testMethod=test_not_has_unbounded_source> > 2020-09-23T01:21:12.2152973Z > 2020-09-23T01:21:12.2153509Z def test_not_has_unbounded_source(self): > 2020-09-23T01:21:12.2154356Z p = > beam.Pipeline(interactive_runner.InteractiveRunner()) > 2020-09-23T01:21:12.2155524Z > > ie.current_env().set_cache_manager(InMemoryCache(), p) > 2020-09-23T01:21:12.2156066Z > 2020-09-23T01:21:12.2156694Z > apache_beam/runners/interactive/pipeline_instrument_test.py:173: > 2020-09-23T01:21:12.2157514Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-09-23T01:21:12.2158325Z > apache_beam/runners/interactive/interactive_environment.py:324: in > set_cache_manager > 2020-09-23T01:21:12.2159184Z self.cleanup(pipeline) > 2020-09-23T01:21:12.2159949Z > apache_beam/runners/interactive/interactive_environment.py:277: in cleanup > 2020-09-23T01:21:12.2160810Z self.evict_computed_pcollections(pipeline) > 2020-09-23T01:21:12.2161404Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-09-23T01:21:12.2161789Z > 2020-09-23T01:21:12.2163024Z self = > object at 
0x7fb3459b02e8> > 2020-09-23T01:21:12.2164547Z pipeline = at 0x7fb344f774a8> > 2020-09-23T01:21:12.2165174Z > 2020-09-23T01:21:12.2165764Z def evict_computed_pcollections(self, > pipeline=None): > 2020-09-23T01:21:12.2166603Z """Evicts all computed PCollections for > the given pipeline. If no pipeline > 2020-09-23T01:21:12.2167382Z is specified, evicts for all pipelines. > 2020-09-23T01:21:12.2167916Z """ > 2020-09-23T01:21:12.2168492Z if pipeline: > 2020-09-23T01:21:12.2169634Z discarded = set() > 2020-09-23T01:21:12.2170161Z for pcoll in self._computed_pcolls: > 2020-09-23T01:21:12.2170725Z > if pcoll.pipeline is pipeline: > 2020-09-23T01:21:12.2171757Z E AttributeError: 'Pipeline' object has > no attribute 'pipeline' > 2020-09-23T01:21:12.2172289Z > 2020-09-23T01:21:12.2172939Z > apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-11151) Add ToString support
[ https://issues.apache.org/jira/browse/BEAM-11151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-11151: - Resolution: Fixed Status: Resolved (was: Open) > Add ToString support > > > Key: BEAM-11151 > URL: https://issues.apache.org/jira/browse/BEAM-11151 > Project: Beam > Issue Type: New Feature > Components: beam-model >Reporter: Sam Rohde >Priority: P2 > Time Spent: 4.5h > Remaining Estimate: 0h > > This tracks work on adding the ToString to Beam. This new transform takes an > element and returns a human-readable string back. > design-doc: > https://docs.google.com/document/d/1v7iWj0LIum04mYwRM_Cvze915tATwmEzLrqj_uVBkCE/edit# -- This message was sent by Atlassian Jira (v8.3.4#803005)
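To make the BEAM-11151 description concrete, here is a small illustration of what "takes an element and returns a human-readable string" could look like for single elements, key-value pairs, and iterables. These are plain Python functions with hypothetical names, not the transform API defined in the design doc.

```python
def element_to_string(element):
    """Render any element with its default string form."""
    return str(element)


def kv_to_string(kv, delimiter=","):
    """Render a (key, value) pair as 'key<delimiter>value'."""
    key, value = kv
    return "%s%s%s" % (key, delimiter, value)


def iterable_to_string(values, delimiter=","):
    """Render an iterable as its items joined by the delimiter."""
    return delimiter.join(str(v) for v in values)


lines = [
    element_to_string(42),
    kv_to_string(("a", 1)),
    iterable_to_string([1, 2, 3]),
]
print(lines)  # ['42', 'a,1', '1,2,3']
```

In a pipeline, functions like these would typically be applied once per element, for example through a Map step.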
[jira] [Commented] (BEAM-11151) Add ToString support
[ https://issues.apache.org/jira/browse/BEAM-11151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17313996#comment-17313996 ] Sam Rohde commented on BEAM-11151: -- Still requires Go support > Add ToString support > > > Key: BEAM-11151 > URL: https://issues.apache.org/jira/browse/BEAM-11151 > Project: Beam > Issue Type: New Feature > Components: beam-model >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Labels: stale-assigned > Time Spent: 4.5h > Remaining Estimate: 0h > > This tracks work on adding the ToString to Beam. This new transform takes an > element and returns a human-readable string back. > design-doc: > https://docs.google.com/document/d/1v7iWj0LIum04mYwRM_Cvze915tATwmEzLrqj_uVBkCE/edit# -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-11855) ib.collect should accept DeferredDataFrame instances
[ https://issues.apache.org/jira/browse/BEAM-11855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-11855: Assignee: Sam Rohde > ib.collect should accept DeferredDataFrame instances > > > Key: BEAM-11855 > URL: https://issues.apache.org/jira/browse/BEAM-11855 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Brian Hulette >Assignee: Sam Rohde >Priority: P2 > > When given a DeferredFrame, ib.collect should convert it to a PCollection and > collect component DataFrames into a single DataFrame in-memory in the > notebook environment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10603) Large Source Recording for Interactive Runner
[ https://issues.apache.org/jira/browse/BEAM-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10603: - Resolution: Fixed Status: Resolved (was: Open) > Large Source Recording for Interactive Runner > -- > > Key: BEAM-10603 > URL: https://issues.apache.org/jira/browse/BEAM-10603 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Priority: P2 > Time Spent: 44h 20m > Remaining Estimate: 0h > > This changes the Interactive Runner to create a long-running background > caching job that is decoupled from the user pipeline. When a user invokes > collect() or show(), it reads from the cache to compute the requested > PCollections. Previously, the user had to wait for the cache to be > fully written. This lets the user start experimenting immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
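The behavior described in BEAM-10603 can be pictured with a toy model: a background thread keeps writing to a cache while a reader returns as soon as enough elements are available, rather than waiting for the whole source. `BackgroundRecording` is a hypothetical class built on the standard library, not the Interactive Runner's implementation.

```python
import threading


class BackgroundRecording:
    """Toy cache that a background thread fills while readers consume it."""

    def __init__(self, source):
        self._items = []
        self._done = False
        self._cond = threading.Condition()
        self._thread = threading.Thread(
            target=self._record, args=(source,), daemon=True)
        self._thread.start()

    def _record(self, source):
        # Writer side: append each element and wake any waiting reader.
        for item in source:
            with self._cond:
                self._items.append(item)
                self._cond.notify_all()
        with self._cond:
            self._done = True
            self._cond.notify_all()

    def collect(self, n):
        """Block until n items are cached or the source ends, then return them."""
        with self._cond:
            self._cond.wait_for(lambda: len(self._items) >= n or self._done)
            return list(self._items[:n])


recording = BackgroundRecording(range(1000))
first_three = recording.collect(3)
print(first_three)  # [0, 1, 2]
```

The reader returns after three elements even if the writer is still working through the rest of the source, which is the "start experimenting immediately" property the issue describes.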
[jira] [Commented] (BEAM-10603) Large Source Recording for Interactive Runner
[ https://issues.apache.org/jira/browse/BEAM-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306589#comment-17306589 ] Sam Rohde commented on BEAM-10603: -- Yep, thanks for the ping > Large Source Recording for Interactive Runner > -- > > Key: BEAM-10603 > URL: https://issues.apache.org/jira/browse/BEAM-10603 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Priority: P2 > Time Spent: 44h 20m > Remaining Estimate: 0h > > This changes the Interactive Runner to create a long-running background > caching job that is decoupled from the user pipeline. When a user invokes > collect() or show(), it reads from the cache to compute the requested > PCollections. Previously, the user had to wait for the cache to be > fully written. This lets the user start experimenting immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-11151) Add ToString support
[ https://issues.apache.org/jira/browse/BEAM-11151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-11151: Assignee: Sam Rohde > Add ToString support > > > Key: BEAM-11151 > URL: https://issues.apache.org/jira/browse/BEAM-11151 > Project: Beam > Issue Type: New Feature > Components: beam-model >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Time Spent: 2h 50m > Remaining Estimate: 0h > > This tracks work on adding the ToString to Beam. This new transform takes an > element and returns a human-readable string back. > design-doc: > https://docs.google.com/document/d/1v7iWj0LIum04mYwRM_Cvze915tATwmEzLrqj_uVBkCE/edit# -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-11666) apache_beam.runners.interactive.recording_manager_test.RecordingManagerTest.test_basic_execution is flaky
[ https://issues.apache.org/jira/browse/BEAM-11666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-11666: Assignee: Sam Rohde > apache_beam.runners.interactive.recording_manager_test.RecordingManagerTest.test_basic_execution > is flaky > - > > Key: BEAM-11666 > URL: https://issues.apache.org/jira/browse/BEAM-11666 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Valentyn Tymofieiev >Assignee: Sam Rohde >Priority: P1 > > Happened in: https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/16819 > {noformat} > self = > testMethod=test_basic_execution> > @unittest.skipIf( > sys.version_info < (3, 6, 0), > 'This test requires at least Python 3.6 to work.') > def test_basic_execution(self): > """A basic pipeline to be used as a smoke test.""" > > # Create the pipeline that will emit 0, 1, 2. > p = beam.Pipeline(InteractiveRunner()) > numbers = p | 'numbers' >> beam.Create([0, 1, 2]) > letters = p | 'letters' >> beam.Create(['a', 'b', 'c']) > > # Watch the pipeline and PCollections. This is normally done in a > notebook > # environment automatically, but we have to do it manually here. > ib.watch(locals()) > ie.current_env().track_user_pipelines() > > # Create the recording objects. By calling `record` a new > PipelineFragment > # is started to compute the given PCollections and cache to disk. 
> rm = RecordingManager(p) > > numbers_recording = rm.record([numbers], max_n=3, max_duration=500) > apache_beam/runners/interactive/recording_manager_test.py:331: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam/runners/interactive/recording_manager.py:435: in record > self._clear(pipeline_instrument) > apache_beam/runners/interactive/recording_manager.py:319: in _clear > self._clear_pcolls(cache_manager, set(to_clear)) > apache_beam/runners/interactive/recording_manager.py:323: in _clear_pcolls > cache_manager.clear('full', pc) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > self = > object at 0x7fa3903ac208> > labels = ('full', > 'ee5c35ce3d-140340882711664-140340882712560-140340476166608') > def clear(self, *labels): > # type (*str) -> Boolean > > """Clears the cache entry of the given labels and returns True on > success. > > Args: > value: An encodable (with corresponding PCoder) value > *labels: List of labels for PCollection instance > """ > > raise NotImplementedError > E NotImplementedError > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
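The BEAM-11666 traceback ends in a base-class `clear()` that raises NotImplementedError, which suggests (an assumption, the report does not say so) that the cache manager in use during that test did not override it. A minimal sketch of how declaring the method abstract surfaces such a gap when the object is created instead of in the middle of a recording. `CacheManagerBase`, `IncompleteCache`, and `DictCache` are hypothetical names.

```python
import abc


class CacheManagerBase(abc.ABC):
    """Hypothetical base class with `clear` declared abstract."""

    @abc.abstractmethod
    def clear(self, *labels):
        """Clear the cache entry for the given labels; return True on success."""


class IncompleteCache(CacheManagerBase):
    """Forgets to implement clear()."""


class DictCache(CacheManagerBase):
    """Small in-memory cache that implements clear()."""

    def __init__(self):
        self._entries = {}

    def write(self, value, *labels):
        self._entries[labels] = value

    def clear(self, *labels):
        return self._entries.pop(labels, None) is not None


try:
    IncompleteCache()
    failed_early = False
except TypeError:
    # abc refuses to instantiate a class with unimplemented abstract methods.
    failed_early = True

cache = DictCache()
cache.write(b"data", "full", "pc-key")
cleared = cache.clear("full", "pc-key")
```

With this layout, the incomplete subclass fails at construction with a TypeError, while the complete one clears its entry and reports success.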
[jira] [Assigned] (BEAM-10994) Add Hot Key Logging in Dataflow Runner
[ https://issues.apache.org/jira/browse/BEAM-10994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-10994: Assignee: Sam Rohde > Add Hot Key Logging in Dataflow Runner > -- > > Key: BEAM-10994 > URL: https://issues.apache.org/jira/browse/BEAM-10994 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Labels: stale-P2 > Time Spent: 6h 10m > Remaining Estimate: 0h > > This adds the ability for users to enable the logging of hot key content. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10994) Add Hot Key Logging in Dataflow Runner
[ https://issues.apache.org/jira/browse/BEAM-10994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10994: - Labels: (was: stale-P2) > Add Hot Key Logging in Dataflow Runner > -- > > Key: BEAM-10994 > URL: https://issues.apache.org/jira/browse/BEAM-10994 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Time Spent: 6h 10m > Remaining Estimate: 0h > > This adds the ability for users to enable the logging of hot key content. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-10921) Python unit tests on windows flaky
[ https://issues.apache.org/jira/browse/BEAM-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-10921: Assignee: Ning Kang (was: Sam Rohde) > Python unit tests on windows flaky > -- > > Key: BEAM-10921 > URL: https://issues.apache.org/jira/browse/BEAM-10921 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Brian Hulette >Assignee: Ning Kang >Priority: P1 > Labels: currently-failing, flake > Fix For: Not applicable > > Time Spent: 1h 50m > Remaining Estimate: 0h > > Over the past few days python unit tests have been failing frequently. The > errors always seem to occur when cleaning up the interactive environment: > {code} > ... > [100%] > == FAILURES > === > _ > PipelineInstrumentTest.test_able_to_cache_intermediate_unbounded_source_pcollection > _ > [gw2] win32 -- Python 3.5.4 > d:\a\beam\beam\sdks\python\target\.tox\py35-win\scripts\python.exe > self = > testMethod=test_able_to_cache_intermediate_unbounded_source_pcollection> > def setUp(self): > > ie.new_env() > apache_beam\runners\interactive\pipeline_instrument_test.py:46: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam\runners\interactive\interactive_environment.py:117: in new_env > _interactive_beam_env.cleanup() > apache_beam\runners\interactive\interactive_environment.py:273: in cleanup > cache_manager.cleanup() > apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup > shutil.rmtree(self._cache_dir) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:494: in rmtree > return _rmtree_unsafe(path, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:384: in > _rmtree_unsafe > _rmtree_unsafe(fullname, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:389: in > _rmtree_unsafe > onerror(os.unlink, fullname, sys.exc_info()) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > path = > 
'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full' > onerror = .onerror at 0x01D6C3E5C7B8> > def _rmtree_unsafe(path, onerror): > try: > if os.path.islink(path): > # symlinks to directories are forbidden, see bug #1669 > raise OSError("Cannot call rmtree on a symbolic link") > except OSError: > onerror(os.path.islink, path, sys.exc_info()) > # can't continue even if onerror hook returns > return > names = [] > try: > names = os.listdir(path) > except OSError: > onerror(os.listdir, path, sys.exc_info()) > for name in names: > fullname = os.path.join(path, name) > try: > mode = os.lstat(fullname).st_mode > except OSError: > mode = 0 > if stat.S_ISDIR(mode): > _rmtree_unsafe(fullname, onerror) > else: > try: > > os.unlink(fullname) > E PermissionError: [WinError 32] The process cannot access > the file because it is being used by another process: > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full\\ac8879590f-2021876280456-2021876278608-2021914046928' > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:387: PermissionError > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
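The PermissionError in the trace above comes from shutil.rmtree trying to unlink a file that another handle still holds open, which Windows refuses (WinError 32). One possible mitigation, sketched below, is to make sure handles are closed before cleanup and to retry the removal a few times. The helper name remove_tree_with_retry and its parameters are hypothetical and are not part of Beam; this is not necessarily how the issue was fixed.

```python
import os
import shutil
import tempfile
import time


def remove_tree_with_retry(path, attempts=5, delay_secs=0.1):
    """Delete a directory tree, retrying if a file is still in use.

    Hypothetical helper, not part of Beam. Returns True once the directory
    is gone and False if every attempt raised PermissionError.
    """
    for attempt in range(attempts):
        try:
            shutil.rmtree(path)
            return True
        except FileNotFoundError:
            # The directory is already gone, so there is nothing left to do.
            return True
        except PermissionError:
            # On Windows this is typically WinError 32: some handle is still
            # open. Wait briefly and try again unless this was the last try.
            if attempt < attempts - 1:
                time.sleep(delay_secs)
    return False


if __name__ == '__main__':
    cache_dir = tempfile.mkdtemp()
    # The with-block closes the file handle before cleanup starts.
    with open(os.path.join(cache_dir, 'full'), 'w') as f:
        f.write('cached elements')
    print(remove_tree_with_retry(cache_dir))
```

Retrying only hides the symptom if a handle is leaked for good; closing the writer before calling cleanup is the more direct remedy.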
[jira] [Created] (BEAM-11151) Add ToString support
Sam Rohde created BEAM-11151: Summary: Add ToString support Key: BEAM-11151 URL: https://issues.apache.org/jira/browse/BEAM-11151 Project: Beam Issue Type: New Feature Components: beam-model Reporter: Sam Rohde Assignee: Sam Rohde This tracks work on adding the ToString transform to Beam. The new transform takes an element and returns a human-readable string. Design doc: https://docs.google.com/document/d/1v7iWj0LIum04mYwRM_Cvze915tATwmEzLrqj_uVBkCE/edit# -- This message was sent by Atlassian Jira (v8.3.4#803005)
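As a rough illustration of what "takes an element and returns a human-readable string" could mean, here is a plain-Python sketch. The function names and the comma delimiter are assumptions made for this example; the design doc, not this sketch, defines the actual transform and its API.

```python
def element_to_string(element):
    """Render a single element with its str() form."""
    return str(element)


def kv_to_string(kv, delimiter=','):
    """Render a (key, value) pair as 'key<delimiter>value'."""
    key, value = kv
    return '%s%s%s' % (key, delimiter, value)


def iterable_to_string(values, delimiter=','):
    """Render every item of an iterable, joined by the delimiter."""
    return delimiter.join(str(v) for v in values)


if __name__ == '__main__':
    print(element_to_string(42))
    print(kv_to_string(('user', 7)))
    print(iterable_to_string([1, 2, 3]))
```

In a pipeline, each of these functions would be applied per element, for example through a Map step.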
[jira] [Commented] (BEAM-10970) apache_beam.runners.interactive.recording_manager_test.ElementStreamTest is flaky on Windows
[ https://issues.apache.org/jira/browse/BEAM-10970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17219235#comment-17219235 ] Sam Rohde commented on BEAM-10970: -- This is a dupe of https://issues.apache.org/jira/browse/BEAM-10921 > apache_beam.runners.interactive.recording_manager_test.ElementStreamTest is > flaky on Windows > > > Key: BEAM-10970 > URL: https://issues.apache.org/jira/browse/BEAM-10970 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Robin Qiu >Assignee: Sam Rohde >Priority: P1 > Labels: currently-failing > > There are two test suites that are failing now: > > Python Unit Tests (macos-latest, 3.7, py37): > [https://github.com/apache/beam/pull/12935/checks?check_run_id=1163443154] > > Python Unit Tests (windows-latest, 3.7, py37): > [https://github.com/apache/beam/pull/12935/checks?check_run_id=1163443198] > > I am not very familiar with the Python tests suites in GitHub check, but I > think this failure should be a release blocker? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10970) apache_beam.runners.interactive.recording_manager_test.ElementStreamTest is flaky on Windows
[ https://issues.apache.org/jira/browse/BEAM-10970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10970: - Status: Resolved (was: Resolved) > apache_beam.runners.interactive.recording_manager_test.ElementStreamTest is > flaky on Windows > > > Key: BEAM-10970 > URL: https://issues.apache.org/jira/browse/BEAM-10970 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Robin Qiu >Assignee: Sam Rohde >Priority: P1 > Labels: currently-failing > > There are two test suites that are failing now: > > Python Unit Tests (macos-latest, 3.7, py37): > [https://github.com/apache/beam/pull/12935/checks?check_run_id=1163443154] > > Python Unit Tests (windows-latest, 3.7, py37): > [https://github.com/apache/beam/pull/12935/checks?check_run_id=1163443198] > > I am not very familiar with the Python tests suites in GitHub check, but I > think this failure should be a release blocker? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10970) apache_beam.runners.interactive.recording_manager_test.ElementStreamTest is flaky on Windows
[ https://issues.apache.org/jira/browse/BEAM-10970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10970: - Status: Resolved (was: Triage Needed) > apache_beam.runners.interactive.recording_manager_test.ElementStreamTest is > flaky on Windows > > > Key: BEAM-10970 > URL: https://issues.apache.org/jira/browse/BEAM-10970 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Robin Qiu >Assignee: Sam Rohde >Priority: P1 > Labels: currently-failing > > There are two test suites that are failing now: > > Python Unit Tests (macos-latest, 3.7, py37): > [https://github.com/apache/beam/pull/12935/checks?check_run_id=1163443154] > > Python Unit Tests (windows-latest, 3.7, py37): > [https://github.com/apache/beam/pull/12935/checks?check_run_id=1163443198] > > I am not very familiar with the Python tests suites in GitHub check, but I > think this failure should be a release blocker? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-10921) Python unit tests on windows flaky
[ https://issues.apache.org/jira/browse/BEAM-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217151#comment-17217151 ] Sam Rohde commented on BEAM-10921: -- Ack, taking a look > Python unit tests on windows flaky > -- > > Key: BEAM-10921 > URL: https://issues.apache.org/jira/browse/BEAM-10921 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Brian Hulette >Assignee: Sam Rohde >Priority: P1 > Labels: currently-failing, flake > Fix For: Not applicable > > Time Spent: 1h 50m > Remaining Estimate: 0h > > Over the past few days python unit tests have been failing frequently. The > errors always seem to occur when cleaning up the interactive environment: > {code} > ... > [100%] > == FAILURES > === > _ > PipelineInstrumentTest.test_able_to_cache_intermediate_unbounded_source_pcollection > _ > [gw2] win32 -- Python 3.5.4 > d:\a\beam\beam\sdks\python\target\.tox\py35-win\scripts\python.exe > self = > testMethod=test_able_to_cache_intermediate_unbounded_source_pcollection> > def setUp(self): > > ie.new_env() > apache_beam\runners\interactive\pipeline_instrument_test.py:46: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam\runners\interactive\interactive_environment.py:117: in new_env > _interactive_beam_env.cleanup() > apache_beam\runners\interactive\interactive_environment.py:273: in cleanup > cache_manager.cleanup() > apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup > shutil.rmtree(self._cache_dir) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:494: in rmtree > return _rmtree_unsafe(path, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:384: in > _rmtree_unsafe > _rmtree_unsafe(fullname, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:389: in > _rmtree_unsafe > onerror(os.unlink, fullname, sys.exc_info()) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ > _ > path = > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full' > onerror = .onerror at 0x01D6C3E5C7B8> > def _rmtree_unsafe(path, onerror): > try: > if os.path.islink(path): > # symlinks to directories are forbidden, see bug #1669 > raise OSError("Cannot call rmtree on a symbolic link") > except OSError: > onerror(os.path.islink, path, sys.exc_info()) > # can't continue even if onerror hook returns > return > names = [] > try: > names = os.listdir(path) > except OSError: > onerror(os.listdir, path, sys.exc_info()) > for name in names: > fullname = os.path.join(path, name) > try: > mode = os.lstat(fullname).st_mode > except OSError: > mode = 0 > if stat.S_ISDIR(mode): > _rmtree_unsafe(fullname, onerror) > else: > try: > > os.unlink(fullname) > E PermissionError: [WinError 32] The process cannot access > the file because it is being used by another process: > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full\\ac8879590f-2021876280456-2021876278608-2021914046928' > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:387: PermissionError > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10994) Add Hot Key Logging in Dataflow Runner
Sam Rohde created BEAM-10994: Summary: Add Hot Key Logging in Dataflow Runner Key: BEAM-10994 URL: https://issues.apache.org/jira/browse/BEAM-10994 Project: Beam Issue Type: New Feature Components: runner-dataflow Reporter: Sam Rohde Assignee: Sam Rohde This lets users enable logging of the contents of hot keys. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10956) PipelineInstrumentTest is flaky
[ https://issues.apache.org/jira/browse/BEAM-10956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10956: - Status: Resolved (was: Open) > PipelineInstrumentTest is flaky > --- > > Key: BEAM-10956 > URL: https://issues.apache.org/jira/browse/BEAM-10956 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Time Spent: 1h 10m > Remaining Estimate: 0h > > h3. Error Message > AttributeError: 'Pipeline' object has no attribute 'pipeline' > h3. Stacktrace > self = > testMethod=test_pipeline_pruned_when_input_pcoll_is_cached> def > test_pipeline_pruned_when_input_pcoll_is_cached(self): > user_pipeline, > init_pcoll, _ = self._example_pipeline() > apache_beam/runners/interactive/pipeline_instrument_test.py:765: _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > apache_beam/runners/interactive/pipeline_instrument_test.py:223: in > _example_pipeline ie.current_env().set_cache_manager(InMemoryCache(), p) > apache_beam/runners/interactive/interactive_environment.py:324: in > set_cache_manager self.cleanup(pipeline) > apache_beam/runners/interactive/interactive_environment.py:277: in cleanup > self.evict_computed_pcollections(pipeline) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = > object at 0x7f8698c89cf8> pipeline = at 0x7f86cc1539e8> def evict_computed_pcollections(self, pipeline=None): > """Evicts all computed PCollections for the given pipeline. If no pipeline is > specified, evicts for all pipelines. """ if pipeline: discarded = set() for > pcoll in self._computed_pcolls: > if pcoll.pipeline is pipeline: E > AttributeError: 'Pipeline' object has no attribute 'pipeline' > apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10956) PipelineInstrumentTest is flaky
[ https://issues.apache.org/jira/browse/BEAM-10956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10956: - Status: Resolved (was: Resolved) > PipelineInstrumentTest is flaky > --- > > Key: BEAM-10956 > URL: https://issues.apache.org/jira/browse/BEAM-10956 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Time Spent: 1h 10m > Remaining Estimate: 0h > > h3. Error Message > AttributeError: 'Pipeline' object has no attribute 'pipeline' > h3. Stacktrace > self = > testMethod=test_pipeline_pruned_when_input_pcoll_is_cached> def > test_pipeline_pruned_when_input_pcoll_is_cached(self): > user_pipeline, > init_pcoll, _ = self._example_pipeline() > apache_beam/runners/interactive/pipeline_instrument_test.py:765: _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > apache_beam/runners/interactive/pipeline_instrument_test.py:223: in > _example_pipeline ie.current_env().set_cache_manager(InMemoryCache(), p) > apache_beam/runners/interactive/interactive_environment.py:324: in > set_cache_manager self.cleanup(pipeline) > apache_beam/runners/interactive/interactive_environment.py:277: in cleanup > self.evict_computed_pcollections(pipeline) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = > object at 0x7f8698c89cf8> pipeline = at 0x7f86cc1539e8> def evict_computed_pcollections(self, pipeline=None): > """Evicts all computed PCollections for the given pipeline. If no pipeline is > specified, evicts for all pipelines. """ if pipeline: discarded = set() for > pcoll in self._computed_pcolls: > if pcoll.pipeline is pipeline: E > AttributeError: 'Pipeline' object has no attribute 'pipeline' > apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-10954) PipelineInstrumentTest.test_not_has_unbounded_source flaky on ubuntu
[ https://issues.apache.org/jira/browse/BEAM-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201170#comment-17201170 ] Sam Rohde commented on BEAM-10954: -- Dupe of https://issues.apache.org/jira/browse/BEAM-10956 which has a PR attached > PipelineInstrumentTest.test_not_has_unbounded_source flaky on ubuntu > > > Key: BEAM-10954 > URL: https://issues.apache.org/jira/browse/BEAM-10954 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Kyle Weaver >Assignee: Sam Rohde >Priority: P1 > Labels: flaky-test > > https://github.com/apache/beam/pull/12910/checks?check_run_id=1152559818 > 2020-09-23T01:21:12.2148103Z _ > PipelineInstrumentTest.test_not_has_unbounded_source _ > 2020-09-23T01:21:12.2149918Z [gw4] linux -- Python 3.6.12 > /home/runner/work/beam/beam/sdks/python/target/.tox/py36/bin/python > 2020-09-23T01:21:12.2150650Z > 2020-09-23T01:21:12.2151861Z self = > testMethod=test_not_has_unbounded_source> > 2020-09-23T01:21:12.2152973Z > 2020-09-23T01:21:12.2153509Z def test_not_has_unbounded_source(self): > 2020-09-23T01:21:12.2154356Z p = > beam.Pipeline(interactive_runner.InteractiveRunner()) > 2020-09-23T01:21:12.2155524Z > > ie.current_env().set_cache_manager(InMemoryCache(), p) > 2020-09-23T01:21:12.2156066Z > 2020-09-23T01:21:12.2156694Z > apache_beam/runners/interactive/pipeline_instrument_test.py:173: > 2020-09-23T01:21:12.2157514Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-09-23T01:21:12.2158325Z > apache_beam/runners/interactive/interactive_environment.py:324: in > set_cache_manager > 2020-09-23T01:21:12.2159184Z self.cleanup(pipeline) > 2020-09-23T01:21:12.2159949Z > apache_beam/runners/interactive/interactive_environment.py:277: in cleanup > 2020-09-23T01:21:12.2160810Z self.evict_computed_pcollections(pipeline) > 2020-09-23T01:21:12.2161404Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 
2020-09-23T01:21:12.2161789Z > 2020-09-23T01:21:12.2163024Z self = > object at 0x7fb3459b02e8> > 2020-09-23T01:21:12.2164547Z pipeline = at 0x7fb344f774a8> > 2020-09-23T01:21:12.2165174Z > 2020-09-23T01:21:12.2165764Z def evict_computed_pcollections(self, > pipeline=None): > 2020-09-23T01:21:12.2166603Z """Evicts all computed PCollections for > the given pipeline. If no pipeline > 2020-09-23T01:21:12.2167382Z is specified, evicts for all pipelines. > 2020-09-23T01:21:12.2167916Z """ > 2020-09-23T01:21:12.2168492Z if pipeline: > 2020-09-23T01:21:12.2169634Z discarded = set() > 2020-09-23T01:21:12.2170161Z for pcoll in self._computed_pcolls: > 2020-09-23T01:21:12.2170725Z > if pcoll.pipeline is pipeline: > 2020-09-23T01:21:12.2171757Z E AttributeError: 'Pipeline' object has > no attribute 'pipeline' > 2020-09-23T01:21:12.2172289Z > 2020-09-23T01:21:12.2172939Z > apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10954) PipelineInstrumentTest.test_not_has_unbounded_source flaky on ubuntu
[ https://issues.apache.org/jira/browse/BEAM-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10954: - Status: Resolved (was: Resolved) > PipelineInstrumentTest.test_not_has_unbounded_source flaky on ubuntu > > > Key: BEAM-10954 > URL: https://issues.apache.org/jira/browse/BEAM-10954 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Kyle Weaver >Assignee: Sam Rohde >Priority: P1 > Labels: flaky-test > > https://github.com/apache/beam/pull/12910/checks?check_run_id=1152559818 > 2020-09-23T01:21:12.2148103Z _ > PipelineInstrumentTest.test_not_has_unbounded_source _ > 2020-09-23T01:21:12.2149918Z [gw4] linux -- Python 3.6.12 > /home/runner/work/beam/beam/sdks/python/target/.tox/py36/bin/python > 2020-09-23T01:21:12.2150650Z > 2020-09-23T01:21:12.2151861Z self = > testMethod=test_not_has_unbounded_source> > 2020-09-23T01:21:12.2152973Z > 2020-09-23T01:21:12.2153509Z def test_not_has_unbounded_source(self): > 2020-09-23T01:21:12.2154356Z p = > beam.Pipeline(interactive_runner.InteractiveRunner()) > 2020-09-23T01:21:12.2155524Z > > ie.current_env().set_cache_manager(InMemoryCache(), p) > 2020-09-23T01:21:12.2156066Z > 2020-09-23T01:21:12.2156694Z > apache_beam/runners/interactive/pipeline_instrument_test.py:173: > 2020-09-23T01:21:12.2157514Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-09-23T01:21:12.2158325Z > apache_beam/runners/interactive/interactive_environment.py:324: in > set_cache_manager > 2020-09-23T01:21:12.2159184Z self.cleanup(pipeline) > 2020-09-23T01:21:12.2159949Z > apache_beam/runners/interactive/interactive_environment.py:277: in cleanup > 2020-09-23T01:21:12.2160810Z self.evict_computed_pcollections(pipeline) > 2020-09-23T01:21:12.2161404Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-09-23T01:21:12.2161789Z > 2020-09-23T01:21:12.2163024Z self = > object at 0x7fb3459b02e8> > 
2020-09-23T01:21:12.2164547Z pipeline = at 0x7fb344f774a8> > 2020-09-23T01:21:12.2165174Z > 2020-09-23T01:21:12.2165764Z def evict_computed_pcollections(self, > pipeline=None): > 2020-09-23T01:21:12.2166603Z """Evicts all computed PCollections for > the given pipeline. If no pipeline > 2020-09-23T01:21:12.2167382Z is specified, evicts for all pipelines. > 2020-09-23T01:21:12.2167916Z """ > 2020-09-23T01:21:12.2168492Z if pipeline: > 2020-09-23T01:21:12.2169634Z discarded = set() > 2020-09-23T01:21:12.2170161Z for pcoll in self._computed_pcolls: > 2020-09-23T01:21:12.2170725Z > if pcoll.pipeline is pipeline: > 2020-09-23T01:21:12.2171757Z E AttributeError: 'Pipeline' object has > no attribute 'pipeline' > 2020-09-23T01:21:12.2172289Z > 2020-09-23T01:21:12.2172939Z > apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10954) PipelineInstrumentTest.test_not_has_unbounded_source flaky on ubuntu
[ https://issues.apache.org/jira/browse/BEAM-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10954: - Status: Resolved (was: Open) > PipelineInstrumentTest.test_not_has_unbounded_source flaky on ubuntu > > > Key: BEAM-10954 > URL: https://issues.apache.org/jira/browse/BEAM-10954 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Kyle Weaver >Assignee: Sam Rohde >Priority: P1 > Labels: flaky-test > > https://github.com/apache/beam/pull/12910/checks?check_run_id=1152559818 > 2020-09-23T01:21:12.2148103Z _ > PipelineInstrumentTest.test_not_has_unbounded_source _ > 2020-09-23T01:21:12.2149918Z [gw4] linux -- Python 3.6.12 > /home/runner/work/beam/beam/sdks/python/target/.tox/py36/bin/python > 2020-09-23T01:21:12.2150650Z > 2020-09-23T01:21:12.2151861Z self = > testMethod=test_not_has_unbounded_source> > 2020-09-23T01:21:12.2152973Z > 2020-09-23T01:21:12.2153509Z def test_not_has_unbounded_source(self): > 2020-09-23T01:21:12.2154356Z p = > beam.Pipeline(interactive_runner.InteractiveRunner()) > 2020-09-23T01:21:12.2155524Z > > ie.current_env().set_cache_manager(InMemoryCache(), p) > 2020-09-23T01:21:12.2156066Z > 2020-09-23T01:21:12.2156694Z > apache_beam/runners/interactive/pipeline_instrument_test.py:173: > 2020-09-23T01:21:12.2157514Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-09-23T01:21:12.2158325Z > apache_beam/runners/interactive/interactive_environment.py:324: in > set_cache_manager > 2020-09-23T01:21:12.2159184Z self.cleanup(pipeline) > 2020-09-23T01:21:12.2159949Z > apache_beam/runners/interactive/interactive_environment.py:277: in cleanup > 2020-09-23T01:21:12.2160810Z self.evict_computed_pcollections(pipeline) > 2020-09-23T01:21:12.2161404Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-09-23T01:21:12.2161789Z > 2020-09-23T01:21:12.2163024Z self = > object at 0x7fb3459b02e8> > 
2020-09-23T01:21:12.2164547Z pipeline = at 0x7fb344f774a8> > 2020-09-23T01:21:12.2165174Z > 2020-09-23T01:21:12.2165764Z def evict_computed_pcollections(self, > pipeline=None): > 2020-09-23T01:21:12.2166603Z """Evicts all computed PCollections for > the given pipeline. If no pipeline > 2020-09-23T01:21:12.2167382Z is specified, evicts for all pipelines. > 2020-09-23T01:21:12.2167916Z """ > 2020-09-23T01:21:12.2168492Z if pipeline: > 2020-09-23T01:21:12.2169634Z discarded = set() > 2020-09-23T01:21:12.2170161Z for pcoll in self._computed_pcolls: > 2020-09-23T01:21:12.2170725Z > if pcoll.pipeline is pipeline: > 2020-09-23T01:21:12.2171757Z E AttributeError: 'Pipeline' object has > no attribute 'pipeline' > 2020-09-23T01:21:12.2172289Z > 2020-09-23T01:21:12.2172939Z > apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-10956) PipelineInstrumentTest is flaky
[ https://issues.apache.org/jira/browse/BEAM-10956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201048#comment-17201048 ] Sam Rohde commented on BEAM-10956: -- Able to reproduce with `for i in {1..100}; do python pipeline_instrument_test.py & done` > PipelineInstrumentTest is flaky > --- > > Key: BEAM-10956 > URL: https://issues.apache.org/jira/browse/BEAM-10956 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > > h3. Error Message > AttributeError: 'Pipeline' object has no attribute 'pipeline' > h3. Stacktrace > self = > testMethod=test_pipeline_pruned_when_input_pcoll_is_cached> def > test_pipeline_pruned_when_input_pcoll_is_cached(self): > user_pipeline, > init_pcoll, _ = self._example_pipeline() > apache_beam/runners/interactive/pipeline_instrument_test.py:765: _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > apache_beam/runners/interactive/pipeline_instrument_test.py:223: in > _example_pipeline ie.current_env().set_cache_manager(InMemoryCache(), p) > apache_beam/runners/interactive/interactive_environment.py:324: in > set_cache_manager self.cleanup(pipeline) > apache_beam/runners/interactive/interactive_environment.py:277: in cleanup > self.evict_computed_pcollections(pipeline) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = > object at 0x7f8698c89cf8> pipeline = at 0x7f86cc1539e8> def evict_computed_pcollections(self, pipeline=None): > """Evicts all computed PCollections for the given pipeline. If no pipeline is > specified, evicts for all pipelines. """ if pipeline: discarded = set() for > pcoll in self._computed_pcolls: > if pcoll.pipeline is pipeline: E > AttributeError: 'Pipeline' object has no attribute 'pipeline' > apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
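The shell loop in the comment above starts 100 copies of the test file in the background at the same time. A Python equivalent can be sketched as follows, which also makes it easy to count how many runs failed. The helper name run_concurrently is an assumption made for this example and is not part of Beam or its test tooling.

```python
import subprocess
import sys


def run_concurrently(cmd, copies):
    """Start `copies` processes of `cmd` at once and return their exit codes.

    Hypothetical helper. All processes are launched before any is awaited,
    so they overlap in time, as with `cmd &` in a shell loop.
    """
    procs = [
        subprocess.Popen(
            cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        for _ in range(copies)
    ]
    # wait() blocks until each process exits and returns its exit code.
    return [proc.wait() for proc in procs]


def count_failures(exit_codes):
    """Count the runs that ended with a non-zero exit code."""
    return sum(1 for code in exit_codes if code != 0)
```

Run from the directory that contains the test file, count_failures(run_concurrently([sys.executable, 'pipeline_instrument_test.py'], 100)) gives the number of failing runs out of 100.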
[jira] [Created] (BEAM-10956) PipelineInstrumentTest is flaky
Sam Rohde created BEAM-10956: Summary: PipelineInstrumentTest is flaky Key: BEAM-10956 URL: https://issues.apache.org/jira/browse/BEAM-10956 Project: Beam Issue Type: Improvement Components: runner-py-interactive Reporter: Sam Rohde Assignee: Sam Rohde h3. Error Message AttributeError: 'Pipeline' object has no attribute 'pipeline' h3. Stacktrace self = def test_pipeline_pruned_when_input_pcoll_is_cached(self): > user_pipeline, init_pcoll, _ = self._example_pipeline() apache_beam/runners/interactive/pipeline_instrument_test.py:765: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/runners/interactive/pipeline_instrument_test.py:223: in _example_pipeline ie.current_env().set_cache_manager(InMemoryCache(), p) apache_beam/runners/interactive/interactive_environment.py:324: in set_cache_manager self.cleanup(pipeline) apache_beam/runners/interactive/interactive_environment.py:277: in cleanup self.evict_computed_pcollections(pipeline) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = pipeline = def evict_computed_pcollections(self, pipeline=None): """Evicts all computed PCollections for the given pipeline. If no pipeline is specified, evicts for all pipelines. """ if pipeline: discarded = set() for pcoll in self._computed_pcolls: > if pcoll.pipeline is pipeline: E AttributeError: 'Pipeline' object has no attribute 'pipeline' apache_beam/runners/interactive/interactive_environment.py:506: AttributeError -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10921) Python unit tests on windows flaky
[ https://issues.apache.org/jira/browse/BEAM-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10921: - Status: Resolved (was: Resolved) > Python unit tests on windows flaky > -- > > Key: BEAM-10921 > URL: https://issues.apache.org/jira/browse/BEAM-10921 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Brian Hulette >Assignee: Sam Rohde >Priority: P1 > Labels: flake > Fix For: Not applicable > > Time Spent: 1h 50m > Remaining Estimate: 0h > > Over the past few days python unit tests have been failing frequently. The > errors always seem to occur when cleaning up the interactive environment: > {code} > ... > [100%] > == FAILURES > === > _ > PipelineInstrumentTest.test_able_to_cache_intermediate_unbounded_source_pcollection > _ > [gw2] win32 -- Python 3.5.4 > d:\a\beam\beam\sdks\python\target\.tox\py35-win\scripts\python.exe > self = > testMethod=test_able_to_cache_intermediate_unbounded_source_pcollection> > def setUp(self): > > ie.new_env() > apache_beam\runners\interactive\pipeline_instrument_test.py:46: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam\runners\interactive\interactive_environment.py:117: in new_env > _interactive_beam_env.cleanup() > apache_beam\runners\interactive\interactive_environment.py:273: in cleanup > cache_manager.cleanup() > apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup > shutil.rmtree(self._cache_dir) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:494: in rmtree > return _rmtree_unsafe(path, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:384: in > _rmtree_unsafe > _rmtree_unsafe(fullname, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:389: in > _rmtree_unsafe > onerror(os.unlink, fullname, sys.exc_info()) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > path = > 
'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full' > onerror = .onerror at 0x01D6C3E5C7B8> > def _rmtree_unsafe(path, onerror): > try: > if os.path.islink(path): > # symlinks to directories are forbidden, see bug #1669 > raise OSError("Cannot call rmtree on a symbolic link") > except OSError: > onerror(os.path.islink, path, sys.exc_info()) > # can't continue even if onerror hook returns > return > names = [] > try: > names = os.listdir(path) > except OSError: > onerror(os.listdir, path, sys.exc_info()) > for name in names: > fullname = os.path.join(path, name) > try: > mode = os.lstat(fullname).st_mode > except OSError: > mode = 0 > if stat.S_ISDIR(mode): > _rmtree_unsafe(fullname, onerror) > else: > try: > > os.unlink(fullname) > E PermissionError: [WinError 32] The process cannot access > the file because it is being used by another process: > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full\\ac8879590f-2021876280456-2021876278608-2021914046928' > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:387: PermissionError > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10695) Cannot evict PCollections from InteractiveEnvironment
[ https://issues.apache.org/jira/browse/BEAM-10695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10695: - Status: Resolved (was: Resolved) > Cannot evict PCollections from InteractiveEnvironment > - > > Key: BEAM-10695 > URL: https://issues.apache.org/jira/browse/BEAM-10695 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Priority: P2 > Time Spent: 1h > Remaining Estimate: 0h > > The code in InteractiveEnvironment modifies the _computed_pcollections > set while iterating over it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10695) Cannot evict PCollections from InteractiveEnvironment
[ https://issues.apache.org/jira/browse/BEAM-10695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10695: - Status: Resolved (was: Open) > Cannot evict PCollections from InteractiveEnvironment > - > > Key: BEAM-10695 > URL: https://issues.apache.org/jira/browse/BEAM-10695 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Priority: P2 > Time Spent: 1h > Remaining Estimate: 0h > > The code in InteractiveEnvironment modifies the _computed_pcollections > set while iterating over it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
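For context on BEAM-10695: in Python, removing items from a set while a for loop is iterating over that same set raises RuntimeError. The sketch below shows the failing pattern next to a collect-then-remove pattern. Pipeline and PColl are stand-in classes written for this example, and the function names are hypothetical; this is not the code in interactive_environment.py.

```python
class Pipeline:
    """Stand-in for a pipeline; only object identity matters here."""


class PColl:
    """Stand-in for a PCollection that remembers its pipeline."""

    def __init__(self, pipeline):
        self.pipeline = pipeline


def evict_unsafe(computed, pipeline):
    # Removing from the set that is being iterated raises
    # "RuntimeError: Set changed size during iteration".
    for pcoll in computed:
        if pcoll.pipeline is pipeline:
            computed.remove(pcoll)


def evict_safe(computed, pipeline):
    # Collect the matches first, then remove them all in one step.
    discarded = {pcoll for pcoll in computed if pcoll.pipeline is pipeline}
    computed -= discarded  # In-place difference; mutates the caller's set.
    return discarded
```

Iterating over a copy, as in `for pcoll in list(computed)`, would also work; collecting the discarded items has the advantage that the caller can see what was evicted.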
[jira] [Commented] (BEAM-10921) Python unit tests on windows flaky
[ https://issues.apache.org/jira/browse/BEAM-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197874#comment-17197874 ] Sam Rohde commented on BEAM-10921: -- Made [https://github.com/apache/beam/pull/12866] for fix > Python unit tests on windows flaky > -- > > Key: BEAM-10921 > URL: https://issues.apache.org/jira/browse/BEAM-10921 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Brian Hulette >Assignee: Sam Rohde >Priority: P1 > Labels: flake > Time Spent: 0.5h > Remaining Estimate: 0h > > Over the past few days python unit tests have been failing frequently. The > errors always seem to occur when cleaning up the interactive environment: > {code} > ... > [100%] > == FAILURES > === > _ > PipelineInstrumentTest.test_able_to_cache_intermediate_unbounded_source_pcollection > _ > [gw2] win32 -- Python 3.5.4 > d:\a\beam\beam\sdks\python\target\.tox\py35-win\scripts\python.exe > self = > testMethod=test_able_to_cache_intermediate_unbounded_source_pcollection> > def setUp(self): > > ie.new_env() > apache_beam\runners\interactive\pipeline_instrument_test.py:46: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam\runners\interactive\interactive_environment.py:117: in new_env > _interactive_beam_env.cleanup() > apache_beam\runners\interactive\interactive_environment.py:273: in cleanup > cache_manager.cleanup() > apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup > shutil.rmtree(self._cache_dir) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:494: in rmtree > return _rmtree_unsafe(path, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:384: in > _rmtree_unsafe > _rmtree_unsafe(fullname, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:389: in > _rmtree_unsafe > onerror(os.unlink, fullname, sys.exc_info()) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > 
path = > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full' > onerror = .onerror at 0x01D6C3E5C7B8> > def _rmtree_unsafe(path, onerror): > try: > if os.path.islink(path): > # symlinks to directories are forbidden, see bug #1669 > raise OSError("Cannot call rmtree on a symbolic link") > except OSError: > onerror(os.path.islink, path, sys.exc_info()) > # can't continue even if onerror hook returns > return > names = [] > try: > names = os.listdir(path) > except OSError: > onerror(os.listdir, path, sys.exc_info()) > for name in names: > fullname = os.path.join(path, name) > try: > mode = os.lstat(fullname).st_mode > except OSError: > mode = 0 > if stat.S_ISDIR(mode): > _rmtree_unsafe(fullname, onerror) > else: > try: > > os.unlink(fullname) > E PermissionError: [WinError 32] The process cannot access > the file because it is being used by another process: > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full\\ac8879590f-2021876280456-2021876278608-2021914046928' > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:387: PermissionError > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
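The trace above shows `shutil.rmtree` failing with `[WinError 32]` because a file in the cache directory is still open elsewhere. What follows is only a sketch of a defensive cleanup that retries a few times before giving up. It is not the change made in the linked PR; the helper name `rmtree_with_retry` and its parameters are hypothetical. The durable fix is presumably to close every open handle before cleanup, and a retry only lowers the chance of a flake.

```python
import os
import shutil
import tempfile
import time


def rmtree_with_retry(path, attempts=3, delay=0.1):
    """Hypothetical helper: remove a directory tree, retrying on PermissionError.

    Returns True if the tree is gone afterwards, False if every attempt failed.
    """
    for attempt in range(attempts):
        try:
            shutil.rmtree(path)
            return True
        except FileNotFoundError:
            # Already removed, nothing left to clean up.
            return True
        except PermissionError:
            # On Windows this usually means some handle is still open.
            if attempt == attempts - 1:
                return False
            time.sleep(delay)
    return False


# Usage: create a throwaway directory with one file, then remove it.
cache_dir = tempfile.mkdtemp()
with open(os.path.join(cache_dir, 'full'), 'w') as f:
    f.write('cached data')
removed = rmtree_with_retry(cache_dir)
```

The file is written inside a `with` block so that its handle is closed before the removal, which is the condition the failing cleanup did not meet.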
[jira] [Commented] (BEAM-10921) Python unit tests on windows flaky
[ https://issues.apache.org/jira/browse/BEAM-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197861#comment-17197861 ] Sam Rohde commented on BEAM-10921: -- I noticed the flakiness yesterday and integrated a fix into an ongoing PR: [https://github.com/apache/beam/pull/12799]. I'll move the changes to their own PR. > Python unit tests on windows flaky > -- > > Key: BEAM-10921 > URL: https://issues.apache.org/jira/browse/BEAM-10921 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Brian Hulette >Assignee: Sam Rohde >Priority: P1 > Labels: flake > Time Spent: 20m > Remaining Estimate: 0h > > Over the past few days python unit tests have been failing frequently. The > errors always seem to occur when cleaning up the interactive environment: > {code} > ... > [100%] > == FAILURES > === > _ > PipelineInstrumentTest.test_able_to_cache_intermediate_unbounded_source_pcollection > _ > [gw2] win32 -- Python 3.5.4 > d:\a\beam\beam\sdks\python\target\.tox\py35-win\scripts\python.exe > self = > testMethod=test_able_to_cache_intermediate_unbounded_source_pcollection> > def setUp(self): > > ie.new_env() > apache_beam\runners\interactive\pipeline_instrument_test.py:46: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam\runners\interactive\interactive_environment.py:117: in new_env > _interactive_beam_env.cleanup() > apache_beam\runners\interactive\interactive_environment.py:273: in cleanup > cache_manager.cleanup() > apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup > shutil.rmtree(self._cache_dir) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:494: in rmtree > return _rmtree_unsafe(path, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:384: in > _rmtree_unsafe > _rmtree_unsafe(fullname, onerror) > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:389: in > _rmtree_unsafe > onerror(os.unlink, fullname, 
sys.exc_info()) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > path = > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full' > onerror = .onerror at 0x01D6C3E5C7B8> > def _rmtree_unsafe(path, onerror): > try: > if os.path.islink(path): > # symlinks to directories are forbidden, see bug #1669 > raise OSError("Cannot call rmtree on a symbolic link") > except OSError: > onerror(os.path.islink, path, sys.exc_info()) > # can't continue even if onerror hook returns > return > names = [] > try: > names = os.listdir(path) > except OSError: > onerror(os.listdir, path, sys.exc_info()) > for name in names: > fullname = os.path.join(path, name) > try: > mode = os.lstat(fullname).st_mode > except OSError: > mode = 0 > if stat.S_ISDIR(mode): > _rmtree_unsafe(fullname, onerror) > else: > try: > > os.unlink(fullname) > E PermissionError: [WinError 32] The process cannot access > the file because it is being used by another process: > 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py35-win\\tmp\\it-8vh2z7pi2021914046928\\full\\ac8879590f-2021876280456-2021876278608-2021914046928' > c:\hostedtoolcache\windows\python\3.5.4\x64\lib\shutil.py:387: PermissionError > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10695) Cannot evict PCollections from InteractiveEnvironment
Sam Rohde created BEAM-10695: Summary: Cannot evict PCollections from InteractiveEnvironment Key: BEAM-10695 URL: https://issues.apache.org/jira/browse/BEAM-10695 Project: Beam Issue Type: Improvement Components: runner-py-interactive Reporter: Sam Rohde Assignee: Sam Rohde The code in the InteractiveEnvironment manipulates the _computed_pcollections set while iterating. -- This message was sent by Atlassian Jira (v8.3.4#803005)
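To illustrate the failure mode described above, here is a minimal sketch. The class `FakeEnvironment` and its methods are hypothetical stand-ins, not the real InteractiveEnvironment code; only the attribute name `_computed_pcollections` comes from the report. Removing elements from a Python set while iterating over it raises `RuntimeError`, and computing the removal without iterating over the live set avoids that.

```python
class FakeEnvironment:
    """Hypothetical stand-in for the interactive environment."""

    def __init__(self):
        self._computed_pcollections = set()

    def mark_computed(self, pcolls):
        self._computed_pcollections.update(pcolls)

    def evict_unsafe(self, pcolls):
        # Buggy pattern: the set shrinks while it is being iterated,
        # so the iterator raises RuntimeError on its next step.
        for pcoll in self._computed_pcollections:
            if pcoll in pcolls:
                self._computed_pcollections.remove(pcoll)

    def evict(self, pcolls):
        # Safe pattern: set difference never iterates the live set in user code.
        self._computed_pcollections -= set(pcolls)


env = FakeEnvironment()
env.mark_computed(['pc1', 'pc2', 'pc3'])

try:
    env.evict_unsafe({'pc1', 'pc2', 'pc3'})
    unsafe_raised = False
except RuntimeError:
    unsafe_raised = True

safe_env = FakeEnvironment()
safe_env.mark_computed(['pc1', 'pc2', 'pc3'])
safe_env.evict(['pc1'])
remaining = safe_env._computed_pcollections
```

Iterating over a copy (`for pcoll in list(self._computed_pcollections)`) would also work; the set difference is just shorter.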
[jira] [Commented] (BEAM-10635) Incompatible google-api-core and google-cloud-bigquery
[ https://issues.apache.org/jira/browse/BEAM-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171039#comment-17171039 ] Sam Rohde commented on BEAM-10635: -- Bumped to a P0 because it breaks the build. > Incompatible google-api-core and google-cloud-bigquery > -- > > Key: BEAM-10635 > URL: https://issues.apache.org/jira/browse/BEAM-10635 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Ning Kang >Priority: P0 > > This bigquery version bump: > https://github.com/apache/beam/commit/a315672dbe2b82e013e8120ac5398c623c7b13a7 > seems to have broken the pip check: > google-cloud-bigquery 1.26.1 has requirement google-api-core<2.0dev,>=1.21.0, > but you have google-api-core 1.20.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
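The pip check message above can be read as a plain range test. The sketch below reproduces that arithmetic with tuples, under a loud simplification: it handles only dotted numeric versions and treats the upper bound `2.0dev` as `2.0`, so it is not a substitute for pip's real version matching. The helper names `parse` and `satisfies` are hypothetical.

```python
def parse(version):
    """Turn a dotted numeric version such as '1.20.0' into a tuple of ints."""
    return tuple(int(part) for part in version.split('.'))


def satisfies(installed, lower_inclusive, upper_exclusive):
    """Check lower_inclusive <= installed < upper_exclusive (numeric versions only)."""
    return parse(lower_inclusive) <= parse(installed) < parse(upper_exclusive)


# Versions taken from the pip check output in the report.
# Assumption: the '<2.0dev' bound is approximated by '<2.0'.
installed_ok = satisfies('1.20.0', '1.21.0', '2.0')
bumped_ok = satisfies('1.21.0', '1.21.0', '2.0')
```

With these inputs the installed google-api-core 1.20.0 falls below the required lower bound, while 1.21.0 would satisfy it.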
[jira] [Updated] (BEAM-10635) Incompatible google-api-core and google-cloud-bigquery
[ https://issues.apache.org/jira/browse/BEAM-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10635: - Priority: P0 (was: P1) > Incompatible google-api-core and google-cloud-bigquery > -- > > Key: BEAM-10635 > URL: https://issues.apache.org/jira/browse/BEAM-10635 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Ning Kang >Priority: P0 > > This bigquery version bump: > https://github.com/apache/beam/commit/a315672dbe2b82e013e8120ac5398c623c7b13a7 > seems to have broken the pip check: > google-cloud-bigquery 1.26.1 has requirement google-api-core<2.0dev,>=1.21.0, > but you have google-api-core 1.20.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10635) Incompatible google-api-core and google-cloud-bigquery
[ https://issues.apache.org/jira/browse/BEAM-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-10635: - Priority: P1 (was: P2) > Incompatible google-api-core and google-cloud-bigquery > -- > > Key: BEAM-10635 > URL: https://issues.apache.org/jira/browse/BEAM-10635 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Ning Kang >Priority: P1 > > This bigquery version bump: > https://github.com/apache/beam/commit/a315672dbe2b82e013e8120ac5398c623c7b13a7 > seems to have broken the pip check: > google-cloud-bigquery 1.26.1 has requirement google-api-core<2.0dev,>=1.21.0, > but you have google-api-core 1.20.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10603) Large Source Recording for Interactive Runner
Sam Rohde created BEAM-10603: Summary: Large Source Recording for Interactive Runner Key: BEAM-10603 URL: https://issues.apache.org/jira/browse/BEAM-10603 Project: Beam Issue Type: Improvement Components: runner-py-interactive Reporter: Sam Rohde Assignee: Sam Rohde This changes the Interactive Runner to create a long-running background caching job that is decoupled from the user pipeline. When a user invokes collect() or show(), it reads from the cache to compute the requested PCollections. Previously, the user had to wait for the cache to be fully written. This lets the user start experimenting immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
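The decoupling described above can be pictured with a small threading sketch. This is not the Interactive Runner implementation: the class `BackgroundCache` and its methods are hypothetical, and a plain Python iterable stands in for the source. It only shows the idea that a background job keeps writing to a cache while a reader can take whatever has been recorded so far.

```python
import threading


class BackgroundCache:
    """Hypothetical cache that is filled by a background thread."""

    def __init__(self):
        self._items = []
        self._lock = threading.Lock()

    def start(self, source):
        """Start recording `source` in the background and return the thread."""
        def record():
            for item in source:
                with self._lock:
                    self._items.append(item)

        thread = threading.Thread(target=record, daemon=True)
        thread.start()
        return thread

    def snapshot(self):
        """Return a copy of what has been recorded so far, without waiting."""
        with self._lock:
            return list(self._items)


cache = BackgroundCache()
recorder = cache.start(iter(['to', 'be', 'or', 'not']))
partial = cache.snapshot()   # may be empty or incomplete while recording runs
recorder.join()              # joined here only so the final read is deterministic
complete = cache.snapshot()
```

A reader that calls `snapshot()` before the recorder finishes simply sees a prefix of the data, which is the behaviour that lets a user start experimenting before the cache is fully written.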
[jira] [Resolved] (BEAM-9692) Clean Python DataflowRunner to use portable pipelines
[ https://issues.apache.org/jira/browse/BEAM-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-9692. - Fix Version/s: 2.23.0 Resolution: Fixed > Clean Python DataflowRunner to use portable pipelines > - > > Key: BEAM-9692 > URL: https://issues.apache.org/jira/browse/BEAM-9692 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: P2 > Fix For: 2.23.0 > > Time Spent: 6h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9767) test_streaming_wordcount flaky timeouts
[ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106529#comment-17106529 ] Sam Rohde commented on BEAM-9767: - Yep, it should be the fix. > test_streaming_wordcount flaky timeouts > --- > > Key: BEAM-9767 > URL: https://issues.apache.org/jira/browse/BEAM-9767 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Udi Meiri >Assignee: Sam Rohde >Priority: Critical > Fix For: 2.22.0 > > Time Spent: 2h 40m > Remaining Estimate: 0h > > Timed out after 600s, typically completes in 2.8s on my workstation. > https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/ > {code} > self = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. > ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1] > > # Watch the local scope for Interactive Beam so that referenced > PCollections > # will be cached. 
> ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this > here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives > the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 > elements > # or after 5 seconds have elapsed (to eliminate the possibility of > hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached. > pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > > data_df = ib.collect(data, include_window_info=True) > apache_beam/runners/interactive/interactive_runner_test.py:237: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam/runners/interactive/interactive_beam.py:451: in collect > return head(pcoll, n=-1, 
include_window_info=include_window_info) > apache_beam/runners/interactive/utils.py:204: in run_within_progress_indicator > return func(*args, **kwargs)
[jira] [Closed] (BEAM-9767) test_streaming_wordcount flaky timeouts
[ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde closed BEAM-9767. --- > test_streaming_wordcount flaky timeouts > --- > > Key: BEAM-9767 > URL: https://issues.apache.org/jira/browse/BEAM-9767 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Udi Meiri >Assignee: Sam Rohde >Priority: Critical > Fix For: 2.22.0 > > Time Spent: 2h 40m > Remaining Estimate: 0h > > Timed out after 600s, typically completes in 2.8s on my workstation. > https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/ > {code} > self = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. > ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1] > > # Watch the local scope for Interactive Beam so that referenced > PCollections > # will be cached. 
> ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this > here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives > the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 > elements > # or after 5 seconds have elapsed (to eliminate the possibility of > hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached. > pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > > data_df = ib.collect(data, include_window_info=True) > apache_beam/runners/interactive/interactive_runner_test.py:237: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam/runners/interactive/interactive_beam.py:451: in collect > return head(pcoll, n=-1, 
include_window_info=include_window_info) > apache_beam/runners/interactive/utils.py:204: in run_within_progress_indicator > return func(*args, **kwargs) > apache_beam/runners/interactive/interactive_beam.py:515: in head > result.wait_un
[jira] [Resolved] (BEAM-9767) test_streaming_wordcount flaky timeouts
[ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-9767. - Fix Version/s: 2.22.0 Resolution: Fixed > test_streaming_wordcount flaky timeouts > --- > > Key: BEAM-9767 > URL: https://issues.apache.org/jira/browse/BEAM-9767 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Udi Meiri >Assignee: Sam Rohde >Priority: Critical > Fix For: 2.22.0 > > Time Spent: 2h 40m > Remaining Estimate: 0h > > Timed out after 600s, typically completes in 2.8s on my workstation. > https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/ > {code} > self = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. > ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1] > > # Watch the local scope for Interactive Beam so that referenced > PCollections > # will be cached. 
> ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this > here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives > the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 > elements > # or after 5 seconds have elapsed (to eliminate the possibility of > hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached. > pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > > data_df = ib.collect(data, include_window_info=True) > apache_beam/runners/interactive/interactive_runner_test.py:237: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam/runners/interactive/interactive_beam.py:451: in collect > return head(pcoll, n=-1, 
include_window_info=include_window_info) > apache_beam/runners/interactive/utils.py:204: in run_within_progress_indicator > return func(*args, **kwargs) > apache_beam/runners/interactiv
[jira] [Comment Edited] (BEAM-9767) test_streaming_wordcount flaky timeouts
[ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104733#comment-17104733 ] Sam Rohde edited comment on BEAM-9767 at 5/11/20, 6:19 PM: --- Thanks for the update Brian, I have root caused it and have a PR out now. Seems to be an edge case around the StreamingCache. EDIT: PR is [https://github.com/apache/beam/pull/11663] was (Author: rohdesam): Thanks for the update Brian, I have root caused it and have a PR out now. Seems to be an edge case around the StreamingCache. > test_streaming_wordcount flaky timeouts > --- > > Key: BEAM-9767 > URL: https://issues.apache.org/jira/browse/BEAM-9767 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Udi Meiri >Assignee: Sam Rohde >Priority: Critical > Time Spent: 2h 40m > Remaining Estimate: 0h > > Timed out after 600s, typically completes in 2.8s on my workstation. > https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/ > {code} > self = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. 
> ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1] > > # Watch the local scope for Interactive Beam so that referenced > PCollections > # will be cached. > ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this > here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives > the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 > elements > # or after 5 seconds have elapsed (to eliminate the possibility of > hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached. 
> pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > > data_df = ib.collect(data, include_window_info=True) > apache_beam/runners/interactive/interactive_runner_test.py:237: > _ _ _ _ _
[jira] [Commented] (BEAM-9767) test_streaming_wordcount flaky timeouts
[ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104733#comment-17104733 ] Sam Rohde commented on BEAM-9767: - Thanks for the update Brian, I have root caused it and have a PR out now. Seems to be an edge case around the StreamingCache. > test_streaming_wordcount flaky timeouts > --- > > Key: BEAM-9767 > URL: https://issues.apache.org/jira/browse/BEAM-9767 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Udi Meiri >Assignee: Sam Rohde >Priority: Critical > Time Spent: 2h 40m > Remaining Estimate: 0h > > Timed out after 600s, typically completes in 2.8s on my workstation. > https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/ > {code} > self = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. 
> ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1] > > # Watch the local scope for Interactive Beam so that referenced > PCollections > # will be cached. > ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this > here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives > the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 > elements > # or after 5 seconds have elapsed (to eliminate the possibility of > hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached. 
> pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > > data_df = ib.collect(data, include_window_info=True) > apache_beam/runners/interactive/interactive_runner_test.py:237: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam/runners/interactive/interactive_beam.py:451: in collect > return head(pcoll, n=-1, include_window_info=include_window_info) > apache_beam/runners/interactive/utils.py:204:
[jira] [Commented] (BEAM-9767) test_streaming_wordcount flaky timeouts
[ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089912#comment-17089912 ] Sam Rohde commented on BEAM-9767: - Copying comment from Ning in duped JIRA: > We'll probably need to remove such integration test from being executed on >Jenkins. > The Jenkins machines could be executing many different things at the same > time. It's possible that the machine does nothing for the test at all in the > given 5-second timeout > test_streaming_wordcount flaky timeouts > --- > > Key: BEAM-9767 > URL: https://issues.apache.org/jira/browse/BEAM-9767 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Udi Meiri >Assignee: Sam Rohde >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > > Timed out after 600s, typically completes in 2.8s on my workstation. > https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/ > {code} > self = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. 
> ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1] > > # Watch the local scope for Interactive Beam so that referenced > PCollections > # will be cached. > ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this > here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives > the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 > elements > # or after 5 seconds have elapsed (to eliminate the possibility of > hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached. 
> pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > > data_df = ib.collect(data, include_window_info=True) > apache_beam/runners/interactive/interactive_runner_test.py:237: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apac
[jira] [Commented] (BEAM-9767) test_streaming_wordcount flaky timeouts
[ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089908#comment-17089908 ] Sam Rohde commented on BEAM-9767: - PR 11440 didn't fix the flakiness, it only added a GRPC timeout to mitigate the failure. The test will now take at most 30s if there is no activity on the GRPC channel between the test_stream_service and test_stream. > test_streaming_wordcount flaky timeouts > --- > > Key: BEAM-9767 > URL: https://issues.apache.org/jira/browse/BEAM-9767 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Udi Meiri >Assignee: Sam Rohde >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > > Timed out after 600s, typically completes in 2.8s on my workstation. > https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/ > {code} > self = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. 
> ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1])))) > > # Watch the local scope for Interactive Beam so that referenced PCollections > # will be cached. > ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 elements > # or after 5 seconds have elapsed (to eliminate the possibility of hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached.
> pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > > data_df = ib.collect(data, include_window_info=True) > apache_beam/runners/interactive/interactive_runner_test.py:237: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam/runners/interactive/interactive_beam.py:451: in collect > return head(pcoll, n=-1,
[jira] [Closed] (BEAM-9803) test_streaming_wordcount flaky
[ https://issues.apache.org/jira/browse/BEAM-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde closed BEAM-9803. --- Fix Version/s: Not applicable Resolution: Duplicate > test_streaming_wordcount flaky > -- > > Key: BEAM-9803 > URL: https://issues.apache.org/jira/browse/BEAM-9803 > Project: Beam > Issue Type: Test > Components: sdk-py-core, test-failures >Reporter: Ning Kang >Assignee: Sam Rohde >Priority: Major > Fix For: Not applicable > > > {code:java} > Regression > apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount > (from py37-cython) > Failing for the past 1 build (Since #12462) > Took 7.7 sec. > Error Message > AssertionError: DataFrame are different DataFrame shape mismatch [left]: (10, 4) [right]: (6, 4) > Stacktrace > self = > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. 
> ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1])))) > > # Watch the local scope for Interactive Beam so that referenced PCollections > # will be cached. > ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 elements > # or after 5 seconds have elapsed (to eliminate the possibility of hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached.
> pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 2000, [IntervalWindow(20, 30)], pane_info), > ('is', 2000, [IntervalWindow(20, 30)], pane_info), > ('the', 2000, [IntervalWindow(20, 30)], pane_info), > ('question', 2000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > data_df = ib.collect(data, include_window_info=True) > > pd.testing.assert_frame_equal(expected_data_df, data_df) > E AssertionError: DataFrame are different > E > E DataFrame shape mismatch > E [left]: (10, 4) > E [right]: (6, 4) > apache_beam/runners/interactive/interactive_runner_test.py:238: AssertionError > {code} -- This message was se
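The limiter arrangement used by the test (stop capturing as soon as any limiter reports that it is triggered) can be sketched without Beam. This is only a stand-alone illustration: `ToyCountLimiter`, `ToyDurationLimiter` and `any_triggered` are hypothetical names, not Interactive Beam classes, and the injectable clock is an assumption made so the behaviour can be checked deterministically.

```python
import time


class ToyCountLimiter:
    """Triggers once the shared results list holds at least max_count items."""

    def __init__(self, results, max_count):
        self._results = results
        self._max_count = max_count

    def is_triggered(self):
        return len(self._results) >= self._max_count


class ToyDurationLimiter:
    """Triggers once max_seconds have elapsed since construction."""

    def __init__(self, max_seconds, clock=time.monotonic):
        # The clock is injectable (an assumption of this sketch) so that
        # the time-based path can be exercised without sleeping.
        self._clock = clock
        self._deadline = clock() + max_seconds

    def is_triggered(self):
        return self._clock() >= self._deadline


def any_triggered(limiters):
    """Capture stops when at least one limiter is triggered."""
    return any(limiter.is_triggered() for limiter in limiters)
```

In this toy model the duration limiter can end the capture before the count limiter has seen all ten elements, which is the situation a slow or busy machine would produce.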
[jira] [Commented] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078662#comment-17078662 ] Sam Rohde commented on BEAM-9322: - We can consider the issue resolved, but let's keep the ticket open. It can be used to track removing the feature flag and doing cleanup. > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Critical > Fix For: 2.21.0 > > Time Spent: 3h 40m > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set manually on PCollections when > applying PTransforms and adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. > In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated correctly, which can be a problem when relying on the output > PCollection tags to match the user-set values. > The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids", set to False by default. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9692) Clean Python DataflowRunner to use portable pipelines
Sam Rohde created BEAM-9692: --- Summary: Clean Python DataflowRunner to use portable pipelines Key: BEAM-9692 URL: https://issues.apache.org/jira/browse/BEAM-9692 Project: Beam Issue Type: Improvement Components: runner-dataflow Reporter: Sam Rohde Assignee: Sam Rohde -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9601) Interactive test_streaming_wordcount failing
[ https://issues.apache.org/jira/browse/BEAM-9601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-9601. - Fix Version/s: 2.20.0 Resolution: Fixed > Interactive test_streaming_wordcount failing > > > Key: BEAM-9601 > URL: https://issues.apache.org/jira/browse/BEAM-9601 > Project: Beam > Issue Type: Bug > Components: runner-py-interactive, test-failures >Reporter: Pablo Estrada >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9548) Bad error handling with errors from TestStreamService when using Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-9548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-9548. - Fix Version/s: 2.20.0 Resolution: Fixed > Bad error handling with errors from TestStreamService when using Interactive > Beam > - > > Key: BEAM-9548 > URL: https://issues.apache.org/jira/browse/BEAM-9548 > Project: Beam > Issue Type: Bug > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2h > Remaining Estimate: 0h > > The error handling when an error is generated on the GRPC server side is very > verbose and hides the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-8335. - Fix Version/s: 2.20.0 Resolution: Fixed > Add streaming support to Interactive Beam > - > > Key: BEAM-8335 > URL: https://issues.apache.org/jira/browse/BEAM-8335 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 115.5h > Remaining Estimate: 0h > > This issue tracks the work items to introduce streaming support to the > Interactive Beam experience. This will allow users to: > * Write and run a streaming job in IPython > * Automatically cache records from unbounded sources > * Add a replay experience that replays all cached records to simulate the > original pipeline execution > * Add controls to play/pause/stop/step individual elements from the cached > records > * Add ability to inspect/visualize unbounded PCollections -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9013) Multi-output TestStream breaks the DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-9013. - Fix Version/s: 2.18.0 Resolution: Fixed > Multi-output TestStream breaks the DataflowRunner > - > > Key: BEAM-9013 > URL: https://issues.apache.org/jira/browse/BEAM-9013 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.17.0 >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.18.0 > > Time Spent: 1h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (BEAM-9524) ib.show() spins forever when cells are re-executed
[ https://issues.apache.org/jira/browse/BEAM-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde closed BEAM-9524. --- Fix Version/s: 2.21.0 Resolution: Fixed > ib.show() spins forever when cells are re-executed > -- > > Key: BEAM-9524 > URL: https://issues.apache.org/jira/browse/BEAM-9524 > Project: Beam > Issue Type: Bug > Components: runner-py-interactive >Reporter: Sam Rohde >Priority: Major > Fix For: 2.21.0 > > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9548) Bad error handling with errors from TestStreamService when using Interactive Beam
Sam Rohde created BEAM-9548: --- Summary: Bad error handling with errors from TestStreamService when using Interactive Beam Key: BEAM-9548 URL: https://issues.apache.org/jira/browse/BEAM-9548 Project: Beam Issue Type: Bug Components: runner-py-interactive Reporter: Sam Rohde Assignee: Sam Rohde The error handling when an error is generated on the GRPC server side is very verbose and hides the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9524) ib.show() spins forever when cells are re-executed
Sam Rohde created BEAM-9524: --- Summary: ib.show() spins forever when cells are re-executed Key: BEAM-9524 URL: https://issues.apache.org/jira/browse/BEAM-9524 Project: Beam Issue Type: Bug Components: runner-py-interactive Reporter: Sam Rohde -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-9447) Generalize the InteractiveRunner StreamingCache
[ https://issues.apache.org/jira/browse/BEAM-9447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-9447: --- Assignee: Sam Rohde > Generalize the InteractiveRunner StreamingCache > --- > > Key: BEAM-9447 > URL: https://issues.apache.org/jira/browse/BEAM-9447 > Project: Beam > Issue Type: Bug > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > > The InteractiveRunner's StreamingCache is file-based only for now. This > should be generalized to work across more source and sink types and > ported to other runners. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9447) Generalize the InteractiveRunner StreamingCache
Sam Rohde created BEAM-9447: --- Summary: Generalize the InteractiveRunner StreamingCache Key: BEAM-9447 URL: https://issues.apache.org/jira/browse/BEAM-9447 Project: Beam Issue Type: Bug Components: runner-py-interactive Reporter: Sam Rohde The InteractiveRunner's StreamingCache is file-based only for now. This should be generalized to work across more source and sink types and ported to other runners. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17043991#comment-17043991 ] Sam Rohde commented on BEAM-9322: - No, this Jira won't be resolved by the 2.20.0 version. But I will have a small fix in [https://github.com/apache/beam/pull/10934]. > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > > The Python SDK currently ignores any tags set manually on PCollections when > applying PTransforms and adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. > In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated correctly, which can be a problem when relying on the output > PCollection tags to match the user-set values. > The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids", set to False by default. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9322) Python SDK ignores manually set PCollection tags
Sam Rohde created BEAM-9322: --- Summary: Python SDK ignores manually set PCollection tags Key: BEAM-9322 URL: https://issues.apache.org/jira/browse/BEAM-9322 Project: Beam Issue Type: Bug Components: sdk-py-core Reporter: Sam Rohde Assignee: Sam Rohde The Python SDK currently ignores any tags set manually on PCollections when applying PTransforms and adding the PCollection to the PTransform [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. In the [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] method, the tag is set to None for all PValues, meaning the output tags are set to an enumeration index over the PCollection outputs. The tags are not propagated correctly, which can be a problem when relying on the output PCollection tags to match the user-set values. The fix is to correct BEAM-1833, and always pass in the tags. However, that doesn't fix the problem for nested PCollections. If you have a dict of lists of PCollections, what should their tags be correctly set to? In order to fix this, first propagate the correct tag then talk with the community about the best auto-generated tags. Some users may rely on the old implementation, so a flag will be created: "force_generated_pcollection_output_ids", set to False by default. If True, this will go to the old implementation and generate tags for PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
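The difference between the two tagging behaviours described in this ticket can be illustrated with a small Beam-free sketch. Everything here is a toy model under stated assumptions: `output_tags` is a hypothetical helper, plain strings stand in for PCollections, and only the flag name `force_generated_pcollection_output_ids` comes from the ticket.

```python
def output_tags(outputs, force_generated_pcollection_output_ids=False):
    """Maps output tags to outputs for a toy transform.

    outputs: dict of user-chosen tag -> output (strings stand in for
    PCollections in this sketch).
    """
    if force_generated_pcollection_output_ids:
        # Old behaviour as described in the ticket: user tags are dropped
        # and each output is keyed by its enumeration index.
        return {index: value for index, value in enumerate(outputs.values())}
    # Proposed behaviour: the user-set tags are passed through unchanged.
    return dict(outputs)


user_outputs = {'pc1': 'ident-output', 'pc2': 'to-str-output'}
propagated = output_tags(user_outputs)
generated = output_tags(
    user_outputs, force_generated_pcollection_output_ids=True)
```

The sketch does not address the nested case raised above (a dict of lists of PCollections), which the ticket leaves open for community discussion.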
[jira] [Assigned] (BEAM-1833) Restructure Python pipeline construction to better follow the Runner API
[ https://issues.apache.org/jira/browse/BEAM-1833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-1833: --- Assignee: Sam Rohde > Restructure Python pipeline construction to better follow the Runner API > > > Key: BEAM-1833 > URL: https://issues.apache.org/jira/browse/BEAM-1833 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Robert Bradshaw >Assignee: Sam Rohde >Priority: Major > > The most important part is removing the runner.apply overrides, but there are > also various other improvements (e.g. all inputs and outputs should be named). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9073) PipelineVisitor is topologically-order dependent
Sam Rohde created BEAM-9073: --- Summary: PipelineVisitor is topologically-order dependent Key: BEAM-9073 URL: https://issues.apache.org/jira/browse/BEAM-9073 Project: Beam Issue Type: Bug Components: sdk-py-core Reporter: Sam Rohde Assignee: Sam Rohde The Python PipelineVisitor is topologically-order dependent and can visit the same transform multiple times. -- This message was sent by Atlassian Jira (v8.3.4#803005)
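The "visits the same transform multiple times" part of this report can be reproduced on a toy graph. The sketch below does not use Beam's PipelineVisitor; `visit_naive` and `visit_once` are hypothetical helpers over a plain adjacency dict, and the sketch only addresses repeated visits, not the ordering dependence.

```python
def visit_naive(graph, node, visit):
    """Recursive walk with no bookkeeping: shared descendants are revisited."""
    visit(node)
    for child in graph.get(node, []):
        visit_naive(graph, child, visit)


def visit_once(graph, root, visit):
    """Same walk, but a 'seen' set guarantees one visit per node."""
    seen = set()

    def walk(node):
        if node in seen:
            return
        seen.add(node)
        visit(node)
        for child in graph.get(node, []):
            walk(child)

    walk(root)


# Diamond-shaped graph: 'join' consumes the outputs of both 'a' and 'b'.
diamond = {'create': ['a', 'b'], 'a': ['join'], 'b': ['join']}

naive_order = []
visit_naive(diamond, 'create', naive_order.append)

single_order = []
visit_once(diamond, 'create', single_order.append)
```

In the diamond, the naive walk reaches `join` once through each parent, while the guarded walk reaches it only once.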
[jira] [Created] (BEAM-9013) Multi-output TestStream breaks the DataflowRunner
Sam Rohde created BEAM-9013: --- Summary: Multi-output TestStream breaks the DataflowRunner Key: BEAM-9013 URL: https://issues.apache.org/jira/browse/BEAM-9013 Project: Beam Issue Type: Bug Components: runner-dataflow Affects Versions: 2.17.0 Reporter: Sam Rohde Assignee: Sam Rohde Fix For: 2.17.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (BEAM-8582) Python SDK emits duplicate records for Default and AfterWatermark triggers
[ https://issues.apache.org/jira/browse/BEAM-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde closed BEAM-8582. --- > Python SDK emits duplicate records for Default and AfterWatermark triggers > -- > > Key: BEAM-8582 > URL: https://issues.apache.org/jira/browse/BEAM-8582 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.19.0 > > > This was found after fixing https://issues.apache.org/jira/browse/BEAM-8581. > The fix for 8581 was to pass in the input watermark. Previously, it was using > MIN_TIMESTAMP for all of its EOW calculations. By giving it a proper input > watermark, this bug started to manifest. > The DefaultTrigger and AfterWatermark do not clear their timers after the > watermark passes the end of the window, leading to duplicate records being > emitted. > Fix: Clear the watermark timer when the watermark reaches the end of the > window. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8581) Python SDK labels ontime empty panes as late
[ https://issues.apache.org/jira/browse/BEAM-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-8581. - Fix Version/s: 2.19.0 Resolution: Fixed > Python SDK labels ontime empty panes as late > > > Key: BEAM-8581 > URL: https://issues.apache.org/jira/browse/BEAM-8581 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.19.0 > > Time Spent: 9h 40m > Remaining Estimate: 0h > > The GeneralTriggerDriver does not put watermark holds on timers, leading to > the ontime empty pane being considered late data. > Fix: Add a new notion of whether a trigger has an ontime pane. If it does, > then set a watermark hold to end of window - 1. -- This message was sent by Atlassian Jira (v8.3.4#803005)
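The effect of the proposed hold at "end of window - 1" can be shown with a toy watermark model. This is a sketch under assumptions, not the GeneralTriggerDriver logic: `ontime_hold`, `output_watermark` and `is_late` are hypothetical helpers, timestamps are plain integers, and a pane is treated as late when its timestamp is strictly behind the output watermark.

```python
def ontime_hold(window_end, has_ontime_pane):
    """Hold at end of window - 1 if the trigger has an on-time pane."""
    return window_end - 1 if has_ontime_pane else None


def output_watermark(input_watermark, holds):
    """The output watermark cannot advance past any active hold."""
    active = [hold for hold in holds if hold is not None]
    return min([input_watermark] + active)


def is_late(pane_timestamp, watermark):
    """Toy lateness rule: strictly behind the watermark means late."""
    return pane_timestamp < watermark


window_end = 10
pane_timestamp = window_end - 1

# Without a hold, the output watermark follows the input watermark past the
# end of the window, so the on-time pane is classified as late.
without_hold = output_watermark(15, [ontime_hold(window_end, False)])

# With the hold, the output watermark stays at end of window - 1.
with_hold = output_watermark(15, [ontime_hold(window_end, True)])
```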
[jira] [Resolved] (BEAM-8582) Python SDK emits duplicate records for Default and AfterWatermark triggers
[ https://issues.apache.org/jira/browse/BEAM-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-8582. - Fix Version/s: 2.19.0 Resolution: Fixed > Python SDK emits duplicate records for Default and AfterWatermark triggers > -- > > Key: BEAM-8582 > URL: https://issues.apache.org/jira/browse/BEAM-8582 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.19.0 > > > This was found after fixing https://issues.apache.org/jira/browse/BEAM-8581. > The fix for 8581 was to pass in the input watermark. Previously, it was using > MIN_TIMESTAMP for all of its EOW calculations. By giving it a proper input > watermark, this bug started to manifest. > The DefaultTrigger and AfterWatermark do not clear their timers after the > watermark passes the end of the window, leading to duplicate records being > emitted. > Fix: Clear the watermark timer when the watermark reaches the end of the > window. -- This message was sent by Atlassian Jira (v8.3.4#803005)
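Why an uncleared watermark timer produces duplicates can be seen in a toy trigger. The class below is a hypothetical stand-in, not Beam's DefaultTrigger or AfterWatermark: it buffers elements, fires when the watermark reaches the end of the window, and optionally clears its timer on firing.

```python
class ToyWatermarkTrigger:
    """Fires the buffered pane when the watermark reaches the window end."""

    def __init__(self, window_end, clear_timer_on_fire=True):
        self.window_end = window_end
        self.clear_timer_on_fire = clear_timer_on_fire
        self.timer_set = True
        self.buffer = []

    def add(self, element):
        self.buffer.append(element)

    def advance_watermark(self, watermark):
        """Returns the emitted pane as a list, or None if nothing fires."""
        if self.timer_set and watermark >= self.window_end:
            if self.clear_timer_on_fire:
                # The fix described above: drop the timer once it has fired.
                self.timer_set = False
            return list(self.buffer)
        return None


fixed = ToyWatermarkTrigger(window_end=10)
fixed.add('a')
fixed_panes = [fixed.advance_watermark(w) for w in (5, 10, 20)]

buggy = ToyWatermarkTrigger(window_end=10, clear_timer_on_fire=False)
buggy.add('a')
buggy_panes = [buggy.advance_watermark(w) for w in (5, 10, 20)]
```

With the timer left in place, every later watermark advance re-emits the same pane; clearing it limits the trigger to a single firing.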
[jira] [Closed] (BEAM-8581) Python SDK labels ontime empty panes as late
[ https://issues.apache.org/jira/browse/BEAM-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde closed BEAM-8581. --- > Python SDK labels ontime empty panes as late > > > Key: BEAM-8581 > URL: https://issues.apache.org/jira/browse/BEAM-8581 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.19.0 > > Time Spent: 9h 40m > Remaining Estimate: 0h > > The GeneralTriggerDriver does not put watermark holds on timers, leading to > the ontime empty pane being considered late data. > Fix: Add a new notion of whether a trigger has an ontime pane. If it does, > then set a watermark hold to end of window - 1. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-8738) Revisit timestamp and duration representation
[ https://issues.apache.org/jira/browse/BEAM-8738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-8738: Description: The current proto representation of timestamps and durations in Beam is either raw int64s or the well-known Google protobuf types "google.protobuf.timestamp" and "google.protobuf.duration". Apache Beam uses int64 MAX and MIN as sentinel values for an +inf watermark and -inf watermark. However, the google.protobuf.timestamp is compliant with RFC3339, meaning it can only represent date-times between 0001-01-01 and 9999-12-31. This is not the same as the int64 MAX and MIN representation. The questions remain: * What should the timestamp and duration representations be? * What units should the timestamps and duration be? Nanos? Micros? * What algebra is allowed when dealing with timestamps and durations? What is needed? See: * [https://lists.apache.org/thread.html/c8e7d8dc7d94487fae23fa2b74ee61aec93c94abbcbef3329ffbf3bd@%3Cdev.beam.apache.org%3E] * [https://lists.apache.org/thread.html/27fe9aa5b33dbee97db1bc924ee410048137e4fe97d9c79d131c010c@%3Cdev.beam.apache.org%3E] was: The current proto representation of timestamps and durations in Beam is either raw int64s or the well-known Google protobuf types "google.protobuf.timestamp" and "google.protobuf.duration". Apache Beam uses int64 MAX and MIN as sentinel values for an +inf watermark and -inf watermark. However, the google.protobuf.timestamp is compliant with RFC3339, meaning it can only represent date-times between 0001-01-01 and 9999-12-31. This is not the same as the int64 MAX and MIN representation. The questions remain: * What should the timestamp and duration representations be? * What units should the timestamps and duration be? Nanos? Micros? * What algebra is allowed when dealing with timestamps and durations? What is needed? 
> Revisit timestamp and duration representation > - > > Key: BEAM-8738 > URL: https://issues.apache.org/jira/browse/BEAM-8738 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Sam Rohde >Priority: Minor > > The current proto representation of timestamps and durations in Beam is > either raw int64s or the well-known Google protobuf types > "google.protobuf.timestamp" and "google.protobuf.duration". Apache Beam uses > int64 MAX and MIN as sentinel values for an +inf watermark and -inf > watermark. However, the google.protobuf.timestamp is compliant with RFC3339, > meaning it can only represent date-times between 0001-01-01 and 9999-12-31. > This is not the same as the int64 MAX and MIN representation. The questions > remain: > * What should the timestamp and duration representations be? > * What units should the timestamps and duration be? Nanos? Micros? > * What algebra is allowed when dealing with timestamps and durations? What > is needed? > See: > * > [https://lists.apache.org/thread.html/c8e7d8dc7d94487fae23fa2b74ee61aec93c94abbcbef3329ffbf3bd@%3Cdev.beam.apache.org%3E] > * > [https://lists.apache.org/thread.html/27fe9aa5b33dbee97db1bc924ee410048137e4fe97d9c79d131c010c@%3Cdev.beam.apache.org%3E] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-8738) Revisit timestamp and duration representation
[ https://issues.apache.org/jira/browse/BEAM-8738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde updated BEAM-8738: Description: The current proto representation of timestamps and durations in Beam is either raw int64s or the well-known Google protobuf types "google.protobuf.timestamp" and "google.protobuf.duration". Apache Beam uses int64 MAX and MIN as sentinel values for an +inf watermark and -inf watermark. However, the google.protobuf.timestamp is compliant with RFC3339, meaning it can only represent date-times between 0001-01-01 and 9999-12-31. This is not the same as the int64 MAX and MIN representation. The questions remain: * What should the timestamp and duration representations be? * What units should the timestamps and duration be? Nanos? Micros? * What algebra is allowed when dealing with timestamps and durations? What is needed? See: * [https://lists.apache.org/thread.html/c8e7d8dc7d94487fae23fa2b74ee61aec93c94abbcbef3329ffbf3bd@%3Cdev.beam.apache.org%3E] * [https://lists.apache.org/thread.html/27fe9aa5b33dbee97db1bc924ee410048137e4fe97d9c79d131c010c@%3Cdev.beam.apache.org%3E] was: The current proto representation of timestamps and durations in Beam is either raw int64s or the well-known Google protobuf types "google.protobuf.timestamp" and "google.protobuf.duration". Apache Beam uses int64 MAX and MIN as sentinel values for an +inf watermark and -inf watermark. However, the google.protobuf.timestamp is compliant with RFC3339, meaning it can only represent date-times between 0001-01-01 and 9999-12-31. This is not the same as the int64 MAX and MIN representation. The questions remain: * What should the timestamp and duration representations be? * What units should the timestamps and duration be? Nanos? Micros? * What algebra is allowed when dealing with timestamps and durations? What is needed? 
See: * [https://lists.apache.org/thread.html/c8e7d8dc7d94487fae23fa2b74ee61aec93c94abbcbef3329ffbf3bd@%3Cdev.beam.apache.org%3E] * [https://lists.apache.org/thread.html/27fe9aa5b33dbee97db1bc924ee410048137e4fe97d9c79d131c010c@%3Cdev.beam.apache.org%3E] > Revisit timestamp and duration representation > - > > Key: BEAM-8738 > URL: https://issues.apache.org/jira/browse/BEAM-8738 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Sam Rohde >Priority: Minor > > The current proto representation of timestamps and durations in Beam is > either raw int64s or the well-known Google protobuf types > "google.protobuf.Timestamp" and "google.protobuf.Duration". Apache Beam uses > int64 MAX and MIN as sentinel values for a +inf watermark and a -inf > watermark. However, google.protobuf.Timestamp is compliant with RFC 3339, > meaning it can only represent date-times between 0001-01-01 and 9999-12-31. > This is not the same as the int64 MAX and MIN representation. The questions > remain: > * What should the timestamp and duration representations be? > * What units should the timestamps and durations be? Nanos? Micros? > * What algebra is allowed when dealing with timestamps and durations? What > is needed? > See: > * > [https://lists.apache.org/thread.html/c8e7d8dc7d94487fae23fa2b74ee61aec93c94abbcbef3329ffbf3bd@%3Cdev.beam.apache.org%3E] > * > [https://lists.apache.org/thread.html/27fe9aa5b33dbee97db1bc924ee410048137e4fe97d9c79d131c010c@%3Cdev.beam.apache.org%3E] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8738) Revisit timestamp and duration representation
Sam Rohde created BEAM-8738: --- Summary: Revisit timestamp and duration representation Key: BEAM-8738 URL: https://issues.apache.org/jira/browse/BEAM-8738 Project: Beam Issue Type: Improvement Components: beam-model Reporter: Sam Rohde The current proto representation of timestamps and durations in Beam is either raw int64s or the well-known Google protobuf types "google.protobuf.Timestamp" and "google.protobuf.Duration". Apache Beam uses int64 MAX and MIN as sentinel values for a +inf watermark and a -inf watermark. However, google.protobuf.Timestamp is compliant with RFC 3339, meaning it can only represent date-times between 0001-01-01 and 9999-12-31. This is not the same as the int64 MAX and MIN representation. The questions remain: * What should the timestamp and duration representations be? * What units should the timestamps and durations be? Nanos? Micros? * What algebra is allowed when dealing with timestamps and durations? What is needed? -- This message was sent by Atlassian Jira (v8.3.4#803005)
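The mismatch described in BEAM-8738 can be checked with plain arithmetic. The sketch below is only an illustration, not Beam code: the helper names (`fits_in_proto_range`, `sentinel_report`) are hypothetical, and it assumes the int64 sentinels are interpreted as a count of milliseconds, microseconds, or nanoseconds since the Unix epoch, which is exactly the open question in the issue.

```python
from datetime import datetime

# int64 extremes, used in the issue as -inf / +inf watermark sentinels.
INT64_MAX = 2**63 - 1
INT64_MIN = -(2**63)

EPOCH = datetime(1970, 1, 1)
# Bounds of the 0001-01-01 .. 9999-12-31 range, in whole seconds since the epoch.
PROTO_MIN_SECONDS = int((datetime(1, 1, 1) - EPOCH).total_seconds())
PROTO_MAX_SECONDS = int((datetime(9999, 12, 31, 23, 59, 59) - EPOCH).total_seconds())

# Candidate units for the raw int64 value (an assumption of this sketch).
UNITS_PER_SECOND = {"millis": 10**3, "micros": 10**6, "nanos": 10**9}


def fits_in_proto_range(value, unit):
    """Return True if `value`, read in `unit`, lies inside the protobuf range."""
    seconds = value // UNITS_PER_SECOND[unit]  # floor to whole seconds
    return PROTO_MIN_SECONDS <= seconds <= PROTO_MAX_SECONDS


def sentinel_report():
    """Map each unit to (MIN sentinel fits, MAX sentinel fits)."""
    return {
        unit: (fits_in_proto_range(INT64_MIN, unit),
               fits_in_proto_range(INT64_MAX, unit))
        for unit in UNITS_PER_SECOND
    }


if __name__ == "__main__":
    for unit, (min_ok, max_ok) in sentinel_report().items():
        print(f"{unit}: MIN fits={min_ok}, MAX fits={max_ok}")
```

Under a millisecond or microsecond reading, neither sentinel is representable as a protobuf timestamp. Under a nanosecond reading both fit, but int64 then covers only roughly the years 1677 to 2262, so the choice of unit and the choice of sentinel cannot be made independently.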
[jira] [Closed] (BEAM-6274) Timer Backlog Bug
[ https://issues.apache.org/jira/browse/BEAM-6274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde closed BEAM-6274. --- Fix Version/s: Not applicable Resolution: Won't Fix > Timer Backlog Bug > - > > Key: BEAM-6274 > URL: https://issues.apache.org/jira/browse/BEAM-6274 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: Not applicable > > > * Move timer receiver into a new class > * Investigate what the getNextFiredTimer(window coder) parameter actually > does > * Add custom payload feature > * Set the correct "IsBounded" on the generated MainInput PCollection for > timers (CreateExecutableStageNodeFunction) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-6453) Create a single Jenkins job or Gradle task to serve for release test validation
[ https://issues.apache.org/jira/browse/BEAM-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-6453: --- Assignee: (was: Sam Rohde) > Create a single Jenkins job or Gradle task to serve for release test > validation > --- > > Key: BEAM-6453 > URL: https://issues.apache.org/jira/browse/BEAM-6453 > Project: Beam > Issue Type: Improvement > Components: project-management >Reporter: Sam Rohde >Priority: Major > > As per [https://github.com/apache/beam/pull/7509], it looks like you can only > run a single Jenkins job per phrase per comment. In addition, the list of > precommit and postcommit jobs will easily get stale. Creating a single Jenkins > job or Gradle task would address both problems. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-6672) Make bundle execution with ExecutableStage support user states
[ https://issues.apache.org/jira/browse/BEAM-6672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-6672: --- Assignee: (was: Sam Rohde) > Make bundle execution with ExecutableStage support user states > -- > > Key: BEAM-6672 > URL: https://issues.apache.org/jira/browse/BEAM-6672 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Rohde >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8491) Add ability for multiple output PCollections from composites
[ https://issues.apache.org/jira/browse/BEAM-8491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-8491. - Fix Version/s: 2.16.0 Resolution: Fixed > Add ability for multiple output PCollections from composites > > > Key: BEAM-8491 > URL: https://issues.apache.org/jira/browse/BEAM-8491 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.16.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > The Python SDK has DoOutputsTuple, which allows a single transform to have > multiple outputs. However, this does not allow a composite > transform to return multiple output PCollections from different transforms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8582) Python SDK emits duplicate records for Default and AfterWatermark triggers
Sam Rohde created BEAM-8582: --- Summary: Python SDK emits duplicate records for Default and AfterWatermark triggers Key: BEAM-8582 URL: https://issues.apache.org/jira/browse/BEAM-8582 Project: Beam Issue Type: Bug Components: sdk-py-core Reporter: Sam Rohde Assignee: Sam Rohde This was found after fixing https://issues.apache.org/jira/browse/BEAM-8581. The fix for 8581 was to pass in the input watermark. Previously, it was using MIN_TIMESTAMP for all of its EOW calculations. By giving it a proper input watermark, this bug started to manifest. The DefaultTrigger and AfterWatermark do not clear their timers after the watermark passes the end of the window, leading to duplicate records being emitted. Fix: Clear the watermark timer when the watermark reaches the end of the window. -- This message was sent by Atlassian Jira (v8.3.4#803005)
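The failure mode in BEAM-8582 can be pictured with a toy model. This is a deliberately simplified sketch and not the Python SDK's trigger code: `ToyWatermarkTrigger` and `run_trigger` are hypothetical names, and the model assumes a watermark timer that fires on every watermark advance at or past the end of the window until it is cleared.

```python
class ToyWatermarkTrigger:
    """Toy end-of-window trigger; not Beam's DefaultTrigger or AfterWatermark."""

    def __init__(self, end_of_window, clear_timer_on_fire):
        self.end_of_window = end_of_window
        self.clear_timer_on_fire = clear_timer_on_fire
        self.timer = end_of_window  # watermark timer; None once cleared
        self.buffer = []            # elements seen in the window
        self.emitted = []           # one entry per pane that fired

    def add(self, element):
        self.buffer.append(element)

    def advance_watermark(self, watermark):
        # The timer fires whenever the watermark is at or past it.
        if self.timer is not None and watermark >= self.timer:
            self.emitted.append(list(self.buffer))
            if self.clear_timer_on_fire:
                self.timer = None  # the fix proposed in the issue


def run_trigger(clear_timer_on_fire):
    """Feed two elements and three watermark advances; return emitted panes."""
    trigger = ToyWatermarkTrigger(end_of_window=10,
                                  clear_timer_on_fire=clear_timer_on_fire)
    trigger.add("a")
    trigger.add("b")
    for watermark in (5, 10, 15):
        trigger.advance_watermark(watermark)
    return trigger.emitted


if __name__ == "__main__":
    print("timer kept:   ", run_trigger(clear_timer_on_fire=False))
    print("timer cleared:", run_trigger(clear_timer_on_fire=True))
```

In this model, keeping the timer causes the same pane to be emitted again when the watermark moves from 10 to 15, while clearing it on the first firing yields a single pane.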
[jira] [Created] (BEAM-8581) Python SDK labels ontime empty panes as late
Sam Rohde created BEAM-8581: --- Summary: Python SDK labels ontime empty panes as late Key: BEAM-8581 URL: https://issues.apache.org/jira/browse/BEAM-8581 Project: Beam Issue Type: Bug Components: sdk-py-core Reporter: Sam Rohde Assignee: Sam Rohde The GeneralTriggerDriver does not put watermark holds on timers, leading to the ontime empty pane being considered late data. Fix: Add a new notion of whether a trigger has an ontime pane. If it does, then set a watermark hold to end of window - 1. -- This message was sent by Atlassian Jira (v8.3.4#803005)
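A rough way to see why the hold in BEAM-8581 matters is sketched below. This is a simplified model under stated assumptions, not the GeneralTriggerDriver's logic: all function names are hypothetical, the output watermark is assumed to be the minimum of the input watermark and any holds, and a pane is assumed to count as late when its timestamp is behind that output watermark.

```python
def output_watermark(input_watermark, holds):
    """Assumed model: holds keep the output watermark from advancing."""
    return min([input_watermark, *holds])


def pane_is_late(pane_timestamp, watermark):
    """Assumed model: a pane is late if its timestamp is behind the watermark."""
    return pane_timestamp < watermark


def ontime_pane_is_late(end_of_window, input_watermark, use_hold):
    """Classify the empty on-time pane, stamped at end of window - 1."""
    pane_timestamp = end_of_window - 1
    holds = [end_of_window - 1] if use_hold else []
    return pane_is_late(pane_timestamp,
                        output_watermark(input_watermark, holds))


if __name__ == "__main__":
    # The input watermark has just reached the end of the window (10).
    print("without hold, late:", ontime_pane_is_late(10, 10, use_hold=False))
    print("with hold, late:   ", ontime_pane_is_late(10, 10, use_hold=True))
```

Without a hold, the output watermark has already moved past the pane's timestamp when the pane is produced; with a hold at end of window - 1, it has not.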
[jira] [Created] (BEAM-8491) Add ability for multiple output PCollections from composites
Sam Rohde created BEAM-8491: --- Summary: Add ability for multiple output PCollections from composites Key: BEAM-8491 URL: https://issues.apache.org/jira/browse/BEAM-8491 Project: Beam Issue Type: Improvement Components: sdk-py-core Reporter: Sam Rohde Assignee: Sam Rohde The Python SDK has DoOutputsTuple, which allows a single transform to have multiple outputs. However, this does not allow a composite transform to return multiple output PCollections from different transforms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-8335) Add streaming support to Interactive Beam
Sam Rohde created BEAM-8335: --- Summary: Add streaming support to Interactive Beam Key: BEAM-8335 URL: https://issues.apache.org/jira/browse/BEAM-8335 Project: Beam Issue Type: Improvement Components: runner-py-interactive Reporter: Sam Rohde Assignee: Sam Rohde This issue tracks the work items to introduce streaming support to the Interactive Beam experience. This will allow users to: * Write and run a streaming job in IPython * Automatically cache records from unbounded sources * Add a replay experience that replays all cached records to simulate the original pipeline execution * Add controls to play/pause/stop/step individual elements from the cached records * Add ability to inspect/visualize unbounded PCollections -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-6442) Incomplete JobService API Semantics
[ https://issues.apache.org/jira/browse/BEAM-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde reassigned BEAM-6442: --- Assignee: (was: Sam Rohde) > Incomplete JobService API Semantics > --- > > Key: BEAM-6442 > URL: https://issues.apache.org/jira/browse/BEAM-6442 > Project: Beam > Issue Type: Test > Components: beam-model >Affects Versions: 2.9.0 >Reporter: Sam Rohde >Priority: Major > > The JobService API (beam_job_api.proto) allows for the possibility of never > seeing messages or states with Get(State|Message)Stream. This is because the > Get(State|Message)Stream calls need to have the job id which can only be > obtained from the RunJobResponse. But in order to see all messages/states the > streams need to be opened before the job starts. > This is fine in Dataflow as the preparation_id == job_id, but this is not > true in Flink. > Fix is to modify the API to only keep a single id to be used between the > preparation/run APIs. > Consumers of the API will have to be modified to meet the new semantics. > Dev list thread > (https://lists.apache.org/thread.html/3ace7585278c0545185fa4bb8d6975283d5c48c097e1bb2c2e18b9a2@%3Cdev.beam.apache.org%3E) > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
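The ordering problem in BEAM-6442 can be illustrated with a toy, synchronous stand-in for the service. Nothing here is the real JobService API: `ToyJobService`, its methods, and the state names are hypothetical, and the job is assumed to emit all of its states during `run`, which exaggerates what the issue calls a possibility into a certainty.

```python
import itertools
from collections import defaultdict


class ToyJobService:
    """Toy stand-in for a prepare/run/stream API; not beam_job_api.proto."""

    def __init__(self, single_id):
        self.single_id = single_id
        self._counter = itertools.count(1)
        self._subscribers = defaultdict(list)

    def prepare(self):
        return f"prep-{next(self._counter)}"

    def subscribe(self, job_id, callback):
        # Stands in for opening a state or message stream for a job id.
        self._subscribers[job_id].append(callback)

    def run(self, preparation_id):
        # With two ids, the job id only exists once the job has started.
        job_id = (preparation_id if self.single_id
                  else f"job-{next(self._counter)}")
        for state in ("STARTING", "RUNNING", "DONE"):
            for callback in self._subscribers[job_id]:
                callback(state)
        return job_id


def observe_with_two_ids():
    """Subscribe after run, because the job id is unknown before it."""
    service = ToyJobService(single_id=False)
    seen = []
    job_id = service.run(service.prepare())
    service.subscribe(job_id, seen.append)  # too late in this toy model
    return seen


def observe_with_single_id():
    """Subscribe before run, using the id returned by prepare."""
    service = ToyJobService(single_id=True)
    seen = []
    job_id = service.prepare()
    service.subscribe(job_id, seen.append)
    service.run(job_id)
    return seen


if __name__ == "__main__":
    print("two ids:  ", observe_with_two_ids())
    print("single id:", observe_with_single_id())
```

With a single id shared between the preparation and run calls, a consumer can open its streams before starting the job, which is the change the issue proposes.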
[jira] [Commented] (BEAM-6442) Incomplete JobService API Semantics
[ https://issues.apache.org/jira/browse/BEAM-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908507#comment-16908507 ] Sam Rohde commented on BEAM-6442: - I unfortunately don't have time to work on this. > Incomplete JobService API Semantics > --- > > Key: BEAM-6442 > URL: https://issues.apache.org/jira/browse/BEAM-6442 > Project: Beam > Issue Type: Test > Components: beam-model >Affects Versions: 2.9.0 >Reporter: Sam Rohde >Priority: Major > > The JobService API (beam_job_api.proto) allows for the possibility of never > seeing messages or states with Get(State|Message)Stream. This is because the > Get(State|Message)Stream calls need to have the job id which can only be > obtained from the RunJobResponse. But in order to see all messages/states the > streams need to be opened before the job starts. > This is fine in Dataflow as the preparation_id == job_id, but this is not > true in Flink. > Fix is to modify the API to only keep a single id to be used between the > preparation/run APIs. > Consumers of the API will have to be modified to meet the new semantics. > Dev list thread > (https://lists.apache.org/thread.html/3ace7585278c0545185fa4bb8d6975283d5c48c097e1bb2c2e18b9a2@%3Cdev.beam.apache.org%3E) > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Resolved] (BEAM-7820) Add hot key detection to Dataflow Runner
[ https://issues.apache.org/jira/browse/BEAM-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Rohde resolved BEAM-7820. - Resolution: Fixed Fix Version/s: 2.16.0 2.15.0 > Add hot key detection to Dataflow Runner > > > Key: BEAM-7820 > URL: https://issues.apache.org/jira/browse/BEAM-7820 > Project: Beam > Issue Type: New Feature > Components: runner-dataflow >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Minor > Fix For: 2.15.0, 2.16.0 > > Time Spent: 3.5h > Remaining Estimate: 0h > > This tracks adding hot key detection in the Dataflow Runner. > There are times when a user's pipeline unexpectedly slows down due to hot keys. > During these times, users cannot see what the pipeline is doing under the > hood. This adds hot key detection to show the user when their > pipeline has a hot key. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (BEAM-7820) Add hot key detection to Dataflow Runner
Sam Rohde created BEAM-7820: --- Summary: Add hot key detection to Dataflow Runner Key: BEAM-7820 URL: https://issues.apache.org/jira/browse/BEAM-7820 Project: Beam Issue Type: New Feature Components: runner-dataflow Reporter: Sam Rohde Assignee: Sam Rohde This tracks adding hot key detection in the Dataflow Runner. There are times when a user's pipeline unexpectedly slows down due to hot keys. During these times, users cannot see what the pipeline is doing under the hood. This adds hot key detection to show the user when their pipeline has a hot key. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
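For readers unfamiliar with the term, a hot key is a key that receives a disproportionate share of the records, so the work for that key cannot be spread across workers. The sketch below only illustrates the idea and makes no claim about how the Dataflow Runner detects hot keys: `find_hot_keys` and its `threshold` parameter are hypothetical.

```python
from collections import Counter


def find_hot_keys(keys, threshold=0.5):
    """Return {key: share} for keys holding at least `threshold` of the records."""
    counts = Counter(keys)
    total = sum(counts.values())
    if total == 0:
        return {}
    return {
        key: count / total
        for key, count in counts.items()
        if count / total >= threshold
    }


if __name__ == "__main__":
    # Eight of ten records share the key "a".
    sample = ["a"] * 8 + ["b", "c"]
    print(find_hot_keys(sample))
```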