Hi Team,
We are writing polyglot SDK wrappers on top of the Flight client. With
Scala/Java I don't see any issues, but with pyarrow the behavior is
sporadic: the calls get stuck partway through, after a few loop
iterations. This happens specifically with pyarrow Flight RPC.
Requesting your help.
Thanks,
Susmit
Sharing the snippet below:
import logging

from pyarrow import flight
from pyarrow.flight import FlightCallOptions


def process_stream(client, object_store_details):
    """
    Fetches the data stream from the Flight server and displays row
    counts and top 10 rows.

    :param client: CefFlightDataClient instance to interact with the
        Flight server.
    :param object_store_details: ObjectStoreDetails instance
        containing the access and retrieval details.
    """
    for batch in client.get_stream_iterator(object_store_details):
        print("Number of rows:", batch.num_rows)
        # Optionally convert to pandas for easier handling
        df = batch.to_pandas()
        print("Top 10 rows:\n", df.head(10))


# Method of CefFlightDataClient
def get_stream_iterator(self, object_store_details: ObjectStoreDetails):
    ticket_str = build_ticket_str(object_store_details)
    ticket = flight.Ticket(ticket_str.encode('utf-8'))
    client = None
    try:
        client = self._connect_to_flight_server()  # Initialize a new client
        # timeout is given in seconds
        data_stream = client.do_get(ticket, FlightCallOptions(timeout=10))
        for chunk in data_stream:
            yield chunk.data
    except Exception as e:
        logging.error("Error processing stream: %s", e)
        # Chain the original exception so its traceback is preserved
        raise CefFlightException("Error processing stream", e) from e
    finally:
        if client is not None:
            client.close()
            logging.info("Flight client closed after streaming.")
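
For reference, here is a minimal, self-contained sketch of how the cleanup in the generator above behaves. It does not use pyarrow: FakeClient, stream_chunks, read_first_chunk and read_all_chunks are hypothetical names made up for illustration. It only shows that the finally block of a generator runs once the generator is exhausted or explicitly closed; it is not a diagnosis of the hang.

```python
from contextlib import closing


class FakeClient:
    """Hypothetical stand-in for the Flight client; records whether close() ran."""

    def __init__(self):
        self.closed = False

    def do_get(self):
        # Pretend the server streams three chunks.
        return iter(["chunk-0", "chunk-1", "chunk-2"])

    def close(self):
        self.closed = True


def stream_chunks(client):
    """Generator with the same try/finally shape as get_stream_iterator."""
    try:
        for chunk in client.do_get():
            yield chunk
    finally:
        # Runs when the generator is exhausted or explicitly closed.
        client.close()


def read_first_chunk(client):
    """Stop after one chunk; closing() calls the generator's close() on exit."""
    with closing(stream_chunks(client)) as chunks:
        for chunk in chunks:
            return chunk


def read_all_chunks(client):
    """Consume the whole stream; the finally block runs at exhaustion."""
    return list(stream_chunks(client))
```

If the consumer stops iterating early without closing the generator, the client stays open until the generator object is finalized, so wrapping the iterator in closing() on the consumer side makes the cleanup deterministic.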