Valentyn Tymofieiev created BEAM-9527:
-----------------------------------------

             Summary: apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf is flaky
                 Key: BEAM-9527
                 URL: https://issues.apache.org/jira/browse/BEAM-9527
             Project: Beam
          Issue Type: Bug
          Components: test-failures
            Reporter: Valentyn Tymofieiev


{noformat}
self = <apache_beam.runners.portability.fn_api_runner.BundleManager object at 
0x7fe494edb450>
split_manager = <function split_manager at 0x7fe4c2ff0c80>
inputs = {'ref_PCollection_PCollection_3_split/Read': 
['\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x08V\xff\x80\x02capache_beam....\nOffsetRange\nq\x01)\x81q\x02}q\x03(U\x04stopq\x04K\x05U\x05startq\x05K\x00ub.\x01\x00@\x14\x00\x00\x00\x00\x00\x00']}
process_bundle_id = 'bundle_2575'

    def _generate_splits_for_testing(self,
                                     split_manager,
                                     inputs,  # type: Mapping[str, PartitionableBuffer]
                                     process_bundle_id):
      # type: (...) -> List[beam_fn_api_pb2.ProcessBundleSplitResponse]
      split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
      read_transform_id, buffer_data = only_element(inputs.items())
      byte_stream = b''.join(buffer_data)
      num_elements = len(
          list(
              self._get_input_coder_impl(read_transform_id).decode_all(
                  byte_stream)))
    
      # Start the split manager in case it wants to set any breakpoints.
      split_manager_generator = split_manager(num_elements)
      try:
        split_fraction = next(split_manager_generator)
        done = False
      except StopIteration:
        done = True
    
      # Send all the data.
      self._send_input_to_worker(
          process_bundle_id, read_transform_id, [byte_stream])
    
      assert self._worker_handler is not None
    
      # Execute the requested splits.
      while not done:
        if split_fraction is None:
          split_result = None
        else:
          split_request = beam_fn_api_pb2.InstructionRequest(
              process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
                  instruction_id=process_bundle_id,
                  desired_splits={
                      read_transform_id: beam_fn_api_pb2.
                      ProcessBundleSplitRequest.DesiredSplit(
                          fraction_of_remainder=split_fraction,
                          estimated_input_elements=num_elements)
                  }))
          split_response = self._worker_handler.control_conn.push(
              split_request).get()  # type: beam_fn_api_pb2.InstructionResponse
          for t in (0.05, 0.1, 0.2):
            waiting = ('Instruction not running', 'not yet scheduled')
            if any(msg in split_response.error for msg in waiting):
              time.sleep(t)
              split_response = self._worker_handler.control_conn.push(
                  split_request).get()
          if 'Unknown process bundle' in split_response.error:
            # It may have finished too fast.
            split_result = None
          elif split_response.error:
>           raise RuntimeError(split_response.error)
E           RuntimeError: Traceback (most recent call last):
E             File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 190, in _execute
E               response = task()
E             File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 229, in <lambda>
E               lambda: self.create_worker().do_instruction(request), request)
E             File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 416, in do_instruction
E               getattr(request, request_type), request.instruction_id)
E             File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 479, in process_bundle_split
E               process_bundle_split=processor.try_split(request))
E             File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 882, in try_split
E               desired_split.estimated_input_elements)
E             File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 250, in try_split
E               keep_of_element_remainder
E             File "apache_beam/runners/worker/operations.py", line 202, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.try_split
E               return self.consumer.try_split(fraction_of_remainder)
E             File "apache_beam/runners/worker/operations.py", line 804, in 
apache_beam.runners.worker.operations.SdfProcessSizedElements.try_split
E               split = self.dofn_runner.try_split(fraction_of_remainder)
E             File "apache_beam/runners/common.py", line 973, in 
apache_beam.runners.common.DoFnRunner.try_split
E               return self.do_fn_invoker.try_split(fraction)
E             File "apache_beam/runners/common.py", line 839, in 
apache_beam.runners.common.PerWindowInvoker.try_split
E               self.threadsafe_watermark_estimator.current_watermark())
E           AttributeError: 'NoneType' object has no attribute 'current_watermark'
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to