Hi James,
The Flight server is written in Python; the code snippet is below.
We have overridden only the *do_get* method;
getStream() and getFlightInfo() are left untouched (default implementation).
def do_get(self, context, ticket):
    """
    Handles client requests for data. The ticket will contain:
      - access_key: S3 access key
      - secret_key: S3 secret key
      - s3_path: full S3 path (e.g., bucket_name/object_key)
      - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
      - batch_size: number of rows per batch (optional, used in 'batch' mode)
    """
    try:
        # Parse the ticket to extract credentials, S3 path, mode and batch size
        access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
    except InvalidTicketFormatError as e:
        logging.error(str(e))
        raise
    except InvalidModeError as e:
        logging.error(str(e))
        raise

    # s3fs does not need the "s3://" protocol prefix.
    if s3_path.startswith("s3://"):
        s3_path = s3_path.replace("s3://", "", 1)

    logging.info(f"Cloudian S3 override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
    logging.info(f"Cloudian S3 region: {Config.S3_REGION}")
    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")

    # Initialize the S3 handler with the supplied credentials
    try:
        s3_handler = S3Handler(
            endpoint=Config.S3_ENDPOINT_OVERRIDE,
            region=Config.S3_REGION,
            access_key=access_key,
            secret_key=secret_key
        )
    except Exception as e:
        logging.error(f"Error initializing S3 handler: {str(e)}")
        raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e

    if mode == DataRetrievalMode.BATCH:
        try:
            # Use get_parquet_data for both wildcard and non-wildcard paths
            parquet_data = s3_handler.get_parquet_data(s3_path)
            # parquet_data.schema applies when parquet_data is a ds.Dataset
            # (i.e., multiple Parquet files processed as a dataset);
            # parquet_data.schema_arrow applies when parquet_data is a
            # pyarrow.parquet.ParquetFile (a single file exposes its schema
            # via schema_arrow).
            schema = (
                parquet_data.schema
                if isinstance(parquet_data, ds.Dataset)
                else parquet_data.schema_arrow
            )
            return flight.GeneratorStream(
                schema, s3_handler.stream_parquet_batches(parquet_data, batch_size)
            )
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error streaming Parquet data: {str(e)}")
            raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e

    # Handle 'full' mode: load the entire dataset
    elif mode == DataRetrievalMode.FULL:
        try:
            # Warn if the S3 path contains a wildcard in FULL mode
            if "*" in s3_path:
                logging.warning(
                    f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
                    f"This may put pressure on memory as all files will be loaded into memory at once."
                )
            # Use the same get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)
            # Load the entire dataset into memory (risk of OOM), handling
            # Dataset vs. ParquetFile separately
            if isinstance(parquet_data, ds.Dataset):
                table = parquet_data.to_table()
            else:
                table = parquet_data.read()
            return flight.RecordBatchStream(table)
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error loading full Parquet dataset: {str(e)}")
            raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
    else:
        logging.error(
            f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'."
        )
        raise InvalidModeError()
# Helper functions.
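The parse_ticket helper referenced in do_get isn't included above; it just deserializes the ticket payload into its fields. A minimal sketch, assuming the ticket body is a JSON document carrying the fields listed in the do_get docstring (the exact wire format and exception constructors on our side may differ slightly):

import json

def parse_ticket(ticket):
    """Deserialize a Flight ticket into (access_key, secret_key, s3_path, mode, batch_size)."""
    try:
        # ticket.ticket holds the raw bytes sent by the client
        payload = json.loads(ticket.ticket.decode("utf-8"))
        access_key = payload["access_key"]
        secret_key = payload["secret_key"]
        s3_path = payload["s3_path"]
        # from_string raises InvalidModeError for anything other than 'batch'/'full'
        mode = DataRetrievalMode.from_string(payload["mode"])
        batch_size = payload.get("batch_size")  # optional; default applied downstream
    except (ValueError, KeyError) as e:
        raise InvalidTicketFormatError(f"Malformed ticket payload: {e}") from e
    return access_key, secret_key, s3_path, mode, batch_size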
def get_parquet_data(self, s3_path):
    """
    Retrieves Parquet data from S3. If the path contains a wildcard pattern,
    it lists all matching files manually. If it is a single file, it reads
    the file directly.

    :param s3_path: The S3 path, either a wildcard pattern or a direct file path.
    :return: A PyArrow Dataset for a wildcard path, or a ParquetFile for a single file.
    """
    try:
        # Check if the path contains a wildcard
        if "*" in s3_path:
            # Split into the directory and the pattern (e.g., `*.parquet`)
            directory, pattern = s3_path.rsplit("/", 1)
            # List all files in the directory and filter using the pattern
            logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
            files = self.s3_fs.get_file_info(fs.FileSelector(directory))
            # Filter files matching the pattern (e.g., *.parquet) and sort them
            # by modification time (mtime_ns)
            sorted_file_paths = [
                file.path
                for file in sorted(files, key=lambda file: file.mtime_ns)
                if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")
            ]
            if not sorted_file_paths:
                raise FileNotFoundError(
                    f"No files matching pattern {pattern} found in {directory}"
                )
            logging.info(f"Sorted files: {sorted_file_paths}")
            # Validate schemas across all files
            if not validate_schemas(sorted_file_paths, self.s3_fs):
                raise ValueError("Schema mismatch detected across files.")
            # Create a dataset from the matching files
            dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
            return dataset
        else:
            # Single-file case: read the specific Parquet file
            logging.info(f"Fetching single Parquet file: {s3_path}")
            parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
            return parquet_file
    except Exception as e:
        logging.error(f"Error fetching Parquet data from S3: {e}")
        raise
@staticmethod
def stream_parquet_batches(parquet_data, batch_size=None):
    """
    Stream the Parquet data in batches. Supports both datasets (multiple files)
    and single Parquet files.

    :param parquet_data: The Dataset or ParquetFile object to stream data from.
    :param batch_size: Number of rows per batch. Defaults to 100,000 if not provided.
    :return: Generator yielding Parquet record batches.
    """
    try:
        # Ensure batch_size is an integer; fall back to the default otherwise
        if batch_size is None or not isinstance(batch_size, int):
            batch_size = 100000
        if isinstance(parquet_data, ds.Dataset):
            # Dataset (multiple files): stream batches via to_batches()
            logging.info("Streaming Parquet data in batches from a dataset")
            for batch in parquet_data.to_batches(batch_size=batch_size):
                yield batch
        else:
            # Single file (ParquetFile): stream batches via iter_batches()
            logging.info("Streaming Parquet data in batches from a single file")
            for batch in parquet_data.iter_batches(batch_size=batch_size):
                yield batch
    except Exception as e:
        logging.error(f"Error streaming Parquet batches: {e}")
        raise
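For completeness, the server itself is just a pyarrow.flight.FlightServerBase subclass exposing the do_get above; roughly like this (a minimal sketch with a placeholder class name, host and port, since the actual startup code isn't shown here):

import logging

import pyarrow.flight as flight

class CefFlightServer(flight.FlightServerBase):
    # do_get is overridden as shown above; get_flight_info and the other
    # RPC handlers keep their default implementations.
    ...

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    location = "grpc+tcp://0.0.0.0:8815"  # placeholder host/port
    server = CefFlightServer(location)
    logging.info(f"Starting Flight server on {location}")
    server.serve()

A quick way to sanity-check the same ticket from the workstation with the Python client (placeholder endpoint and ticket values, assuming the JSON ticket format sketched above):

import json
import pyarrow.flight as flight

client = flight.FlightClient("grpc+tcp://flight-server.example.com:8815")  # placeholder endpoint
ticket = flight.Ticket(json.dumps({
    "access_key": "...",
    "secret_key": "...",
    "s3_path": "bucket_name/path/to/*.parquet",
    "mode": "batch",
    "batch_size": 100000,
}).encode("utf-8"))
reader = client.do_get(ticket)
for chunk in reader:
    print(chunk.data.num_rows)  # each chunk.data is a pyarrow RecordBatch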
On Wed, Apr 30, 2025 at 11:14 PM James Duong
<[email protected]> wrote:
> Would you be able to share the server’s getStream() and getFlightInfo()
> implementations?
>
> Note that getStream() should be written such that it doesn’t block
> the gRPC thread.
>
>
> From: Susmit Sarkar <[email protected]>
> Date: Wednesday, April 30, 2025 at 2:59 AM
> To: David Li <[email protected]>
> Cc: [email protected] <[email protected]>, [email protected] <
> [email protected]>
> Subject: Re: Query on stuck Arrow Flight Client while interacting from
> local workstation (mac)
>
> Hi David
>
> Sharing the Arrow client thread dump for reference. Strangely, if we pass a
> dummy non-existent S3 path, we get a proper error from the server:
>
> cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2]
> Path does not exist
> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
> Detail: [errno 2] No such file or directory
>
> Which indicates the server is reachable, and we do see the logs on the server
> as well.
>
> It works fine if we call the client within a VM; the issue arises on the local
> workstation, where it's stuck indefinitely.
>
> Thanks,
> Susmit
>
> On Wed, Apr 30, 2025 at 12:54 PM David Li <[email protected]> wrote:
> This is not specific to Flight; use jstack or your favorite
> instrumentation tool (VisualVM etc.)
>
> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
> > Hi David,
> >
> > How do we enable thread dump logs for both the client and server code?
> >
> > As of now, I don't see any error on either client side or server side. It
> > just hangs/gets stuck.
> >
> > Thanks,
> > Nikhil
> >
> > On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <[email protected]> wrote:
> >
> >> Hi Team,
> >>
> >> We are using the below code snippet in Scala to query the Flight server,
> >> but it seems to be stuck indefinitely. This issue is seen when we are
> >> testing from our local workstation (a Mac, to be precise).
> >>
> >> Another interesting thing: it's able to propagate the error message
> >> correctly but not the FlightStream data. The same code works fine when
> >> we run inside a Linux VM.
> >>
> >> Do you folks see any issue in the code?
> >>
> >> def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
> >>   logger.info(s"Fetching data for details: ${details.toString}")
> >>   val ticketStr = buildTicketStr(details)
> >>   logger.info(s"Generated ticket string: $ticketStr")
> >>
> >>   val allocator = new RootAllocator(Long.MaxValue)
> >>   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
> >>
> >>   try {
> >>     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> >>     val stream = client.getStream(ticket)
> >>
> >>     Iterator.continually {
> >>       if (stream.next()) Some(stream) else {
> >>         // Cleanup when no more batches
> >>         close(stream, client)
> >>         None
> >>       }
> >>     }.takeWhile(_.isDefined).flatten
> >>   } catch {
> >>     case e: FlightRuntimeException =>
> >>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
> >>       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
> >>     case e: Exception =>
> >>       logger.error(s"Failed to fetch data: ${e.getMessage}")
> >>       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
> >>   }
> >> }
> >>
> >>
> >> Thanks,
> >>
> >> Susmit
> >>
> >>
>