Hi Team,
We frequently see the memory leak error below with the following setup:
*JDK 11*
"org.apache.arrow" % "arrow-flight" % "17.0.0",
"org.apache.arrow" % "arrow-vector" % "17.0.0",
"org.apache.arrow" % "flight-core" % "17.0.0",
4-10-25 15:25:06.394 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
2024-10-25 15:25:06.395 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
Outstanding child allocators :
Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
Allocator(ROOT) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
2024-10-25 15:25:06.396 [main] ERROR c.t.c.d.ArrowFlightDataFetcher - Failed to fetch data: Memory was leaked by query. Memory leaked: (10485760)
Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
Exception in thread "main" com.tesco.cef.exceptions.CefFlightServerException: Failed to fetch data from Flight Server
    at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:47)
    at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:41)
    at scala.util.Failure.recover(Try.scala:233)
    at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:47)
    at com.tesco.cef.client.CefFlightDataClient.getStream(CefFlightDataClient.scala:27)
    at com.tesco.cef.samples.client.Main$.processStream(Main.scala:120)
    at com.tesco.cef.samples.client.Main$.main(Main.scala:60)
    at com.tesco.cef.samples.client.Main.main(Main.scala)
Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
    at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:501)
    at org.apache.arrow.flight.FlightClient.close(FlightClient.java:754)
    at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:392)
    at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:391)
    at scala.util.Using$.$anonfun$apply$1(Using.scala:268)
    at scala.util.Using$.apply(Using.scala:113)
    at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream$$anonfun$2(ArrowFlightDataFetcher.scala:40)
    at scala.util.Using$.$anonfun$apply$1(Using.scala:262)
    at scala.util.Using$.apply(Using.scala:113)
    at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:41)
    ... 4 more
    Suppressed: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
  logger.info(s"Fetching data for S3 path: ${details.s3Path}")
  val ticketStr = buildTicketStr(details)
  logger.info(s"Generated ticket string: $ticketStr")
  val allocator = new RootAllocator(Long.MaxValue)
  val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
  val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
  // Using Try for proper resource cleanup
  try {
    val stream = client.getStream(ticket)
    // Collect and return an iterator for each batch
    Iterator.continually(stream).takeWhile(_.next())
  } catch {
    case e: FlightRuntimeException =>
      logger.error(s"Error communicating with Flight server: ${e.getMessage}")
      throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
    case e: Exception =>
      logger.error(s"Failed to fetch data: ${e.getMessage}")
      throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
  } finally {
    if (client != null) client.close()
    if (allocator != null) allocator.close()
  }
}
The code snippet above is shared for reference.
Are we doing anything wrong here? How can we avoid this memory leak?
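For discussion, here is a minimal sketch of one change we could try. It assumes the leak comes from the FlightStream still holding buffers when the client and allocator are closed, which we have not confirmed. The names fetchAndProcess and handleBatch are hypothetical; the Arrow calls are the ones used above plus FlightStream.getRoot. scala.util.Using.Manager (Scala 2.13+) releases resources in reverse order of registration, so the stream would be closed before the client and the allocator.

```scala
import java.nio.charset.StandardCharsets

import org.apache.arrow.flight.{FlightClient, Location, Ticket}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot

import scala.util.{Try, Using}

object FlightFetchSketch {

  // Hypothetical helper: consumes every batch while the allocator, client and
  // stream are all still open, instead of returning a lazy Iterator that would
  // only be consumed after the resources have been closed.
  def fetchAndProcess[A](host: String, port: Int, ticketStr: String)(
      handleBatch: VectorSchemaRoot => A
  ): Try[List[A]] =
    Using.Manager { use =>
      // Registered first, so released last.
      val allocator = use(new RootAllocator(Long.MaxValue))
      val client = use(
        FlightClient.builder(allocator, Location.forGrpcInsecure(host, port)).build()
      )
      val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
      // Registered last, so released first: the stream gives its buffers back
      // before the client and allocator run their leak checks on close.
      val stream = use(client.getStream(ticket))

      val results = List.newBuilder[A]
      // next() loads the following batch into the root returned by getRoot.
      while (stream.next()) {
        results += handleBatch(stream.getRoot)
      }
      results.result()
    }
}
```

A call might look like FlightFetchSketch.fetchAndProcess(serverHost, serverPort, ticketStr)(_.getRowCount). The handler would need to copy out anything it wants to keep, since the root is reused between batches and released when the stream closes.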
Thanks,
Susmit