[ https://issues.apache.org/jira/browse/BEAM-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot reassigned BEAM-9527: ----------------------------------- Assignee: (was: Boyuan Zhang) > apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf > is flaky > ----------------------------------------------------------------------------------------------------- > > Key: BEAM-9527 > URL: https://issues.apache.org/jira/browse/BEAM-9527 > Project: Beam > Issue Type: Bug > Components: test-failures > Reporter: Valentyn Tymofieiev > Priority: P1 > Labels: beam-fixit, flake, stale-assigned > > {noformat} > self = <apache_beam.runners.portability.fn_api_runner.BundleManager object at > 0x7fe494edb450> > split_manager = <function split_manager at 0x7fe4c2ff0c80> > inputs = {'ref_PCollection_PCollection_3_split/Read': > ['\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x08V\xff\x80\x02capache_beam....\nOffsetRange\nq\x01)\x81q\x02}q\x03(U\x04stopq\x04K\x05U\x05startq\x05K\x00ub.\x01\x00@\x14\x00\x00\x00\x00\x00\x00']} > process_bundle_id = 'bundle_2575' > def _generate_splits_for_testing(self, > split_manager, > inputs, # type: Mapping[str, > PartitionableBuffer] > process_bundle_id): > # type: (...) -> List[beam_fn_api_pb2.ProcessBundleSplitResponse] > split_results = [] # type: > List[beam_fn_api_pb2.ProcessBundleSplitResponse] > read_transform_id, buffer_data = only_element(inputs.items()) > byte_stream = b''.join(buffer_data) > num_elements = len( > list( > self._get_input_coder_impl(read_transform_id).decode_all( > byte_stream))) > > # Start the split manager in case it wants to set any breakpoints. > split_manager_generator = split_manager(num_elements) > try: > split_fraction = next(split_manager_generator) > done = False > except StopIteration: > done = True > > # Send all the data. > self._send_input_to_worker( > process_bundle_id, read_transform_id, [byte_stream]) > > assert self._worker_handler is not None > > # Execute the requested splits. > while not done: > if split_fraction is None: > split_result = None > else: > split_request = beam_fn_api_pb2.InstructionRequest( > process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest( > instruction_id=process_bundle_id, > desired_splits={ > read_transform_id: beam_fn_api_pb2. > ProcessBundleSplitRequest.DesiredSplit( > fraction_of_remainder=split_fraction, > estimated_input_elements=num_elements) > })) > split_response = self._worker_handler.control_conn.push( > split_request).get() # type: > beam_fn_api_pb2.InstructionResponse > for t in (0.05, 0.1, 0.2): > waiting = ('Instruction not running', 'not yet scheduled') > if any(msg in split_response.error for msg in waiting): > time.sleep(t) > split_response = self._worker_handler.control_conn.push( > split_request).get() > if 'Unknown process bundle' in split_response.error: > # It may have finished too fast. > split_result = None > elif split_response.error: > > raise RuntimeError(split_response.error) > E RuntimeError: Traceback (most recent call last): > E File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", > line 190, in _execute > E response = task() > E File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", > line 229, in <lambda> > E lambda: self.create_worker().do_instruction(request), request) > E File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", > line 416, in do_instruction > E getattr(request, request_type), request.instruction_id) > E File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", > line 479, in process_bundle_split > E process_bundle_split=processor.try_split(request)) > E File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py", > line 882, in try_split > E desired_split.estimated_input_elements) > E File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py", > line 250, in try_split > E keep_of_element_remainder > E File "apache_beam/runners/worker/operations.py", line 202, in > apache_beam.runners.worker.operations.SingletonConsumerSet.try_split > E return self.consumer.try_split(fraction_of_remainder) > E File "apache_beam/runners/worker/operations.py", line 804, in > apache_beam.runners.worker.operations.SdfProcessSizedElements.try_split > E split = self.dofn_runner.try_split(fraction_of_remainder) > E File "apache_beam/runners/common.py", line 973, in > apache_beam.runners.common.DoFnRunner.try_split > E return self.do_fn_invoker.try_split(fraction) > E File "apache_beam/runners/common.py", line 839, in > apache_beam.runners.common.PerWindowInvoker.try_split > E self.threadsafe_watermark_estimator.current_watermark()) > E AttributeError: 'NoneType' object has no attribute > 'current_watermark' > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)