shunping opened a new pull request, #39183:
URL: https://github.com/apache/beam/pull/39183

   Failed run:
   https://github.com/apache/beam/runs/84438206754
   
   Traceback:
   ```
   self = <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest 
testMethod=test_batch_write_with_ordering_key>
   
       @pytest.mark.it_postcommit
       def test_batch_write_with_ordering_key(self):
         """Test WriteToPubSub in batch mode with ordering keys.
       
     ...
           # Retry pulling to handle PubSub delivery delays
           received_messages = []
           received_message_ids = set()
           ack_ids = []
           deadline = time.time() + 60  # wait up to 60 seconds
           while time.time() < deadline:
             response = self.sub_client.pull(
                 request={
                     'subscription': ordering_sub.name,
                     'max_messages': 10,
                 })
             for msg in response.received_messages:
               ack_ids.append(msg.ack_id)
               # Pub/Sub guarantees at-least-once delivery, so we must 
deduplicate
               # messages by message_id to handle potential duplicate 
deliveries.
               if msg.message.message_id not in received_message_ids:
                 received_message_ids.add(msg.message.message_id)
                 received_messages.append(msg)
             if len(received_messages) >= len(test_messages):
               break
             time.sleep(5)
       
   >       self.assertEqual(len(received_messages), len(test_messages))
   E       AssertionError: 2 != 3
   
   apache_beam/io/gcp/pubsub_integration_test.py:385: AssertionError
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to