LantaoJin opened a new pull request, #85:
URL: https://github.com/apache/datafusion-java/pull/85

   ## Which issue does this PR close?
   
   - Closes #82.
   
   ## Rationale for this change
   
   Multi-tenant DataFusion deployments need two operational signals that the 
Java binding currently does not expose:
   
   1. **Per-session memory.** `SessionContextBuilder.memoryLimit(...)` (PR #28) 
caps the global pool, but if a tenant exceeds its fair-share allocation, 
there is no way to attribute the bytes back to a session. Without per-session 
attribution, fair-share scheduling, abuse detection, and OOM root-causing all 
fall back to restarting the runtime.
   
   2. **Tokio runtime stats.** The JNI library drives a single shared 
multi-threaded Tokio runtime in `lib.rs`. Embedders that surface node-level 
health -- e.g. an OpenSearch `_nodes/stats` endpoint -- need worker count, busy 
time, queue depth, etc. Today they have to hand-roll a parallel native bridge.
   
   Both share an FFI snapshot pattern: read a small struct of counters across 
the boundary on demand. They are bundled here so the design conversation only 
happens once.
   
   ## What changes are included in this PR?
   
   Two new accessors on `SessionContext`, two new immutable POJOs:
   
   ```java
   public final class MemoryUsage { long currentBytes(); long peakBytes(); }
   public final class RuntimeStats {
     int  numWorkers();
     long liveTasksCount(), globalQueueDepth(),
          elapsedNanos(), totalBusyNanos(),
          totalParkCount(), totalPollsCount(), totalNoopCount(),
          totalStealCount(), totalLocalScheduleCount(), totalOverflowCount();
   }
   
   ctx.memoryUsage();   // always-on; thread-safe; pollable while queries run
   ctx.runtimeStats();  // requires `runtime-metrics` Cargo feature
   ```
   
   ### Per-session memory tracking
   
   `native/src/memory.rs` introduces `TrackingMemoryPool`, a thin wrapper 
around any `Arc<dyn MemoryPool>` that intercepts `grow`/`try_grow`/`shrink` to 
maintain two `AtomicU64` counters: total bytes currently held and the peak 
observed since session creation. Pool semantics (limits, eviction, spilling) 
are unchanged because `try_grow` still defers to the inner pool.
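   
   The counter bookkeeping described above can be sketched as follows. This is 
a minimal, std-only illustration, not the code in `native/src/memory.rs`: 
`SimplePool`, `LimitedPool`, and `TrackingPool` are hypothetical stand-ins, and 
DataFusion's real `MemoryPool` trait has different method signatures.
   
   ```rust
   use std::sync::atomic::{AtomicU64, Ordering};
   use std::sync::Arc;
   
   /// Hypothetical, simplified stand-in for DataFusion's `MemoryPool` trait.
   pub trait SimplePool: Send + Sync {
       fn grow(&self, bytes: u64);
       fn try_grow(&self, bytes: u64) -> Result<(), String>;
       fn shrink(&self, bytes: u64);
   }
   
   /// Hypothetical inner pool that enforces a hard byte limit.
   pub struct LimitedPool {
       limit: u64,
       used: AtomicU64,
   }
   
   impl LimitedPool {
       pub fn new(limit: u64) -> Self {
           Self { limit, used: AtomicU64::new(0) }
       }
   }
   
   impl SimplePool for LimitedPool {
       fn grow(&self, bytes: u64) {
           self.used.fetch_add(bytes, Ordering::Relaxed);
       }
   
       fn try_grow(&self, bytes: u64) -> Result<(), String> {
           // Atomically reserve the bytes only if the limit still holds.
           self.used
               .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                   let new = used.checked_add(bytes)?;
                   (new <= self.limit).then_some(new)
               })
               .map(|_| ())
               .map_err(|used| format!("limit {} exceeded ({} in use)", self.limit, used))
       }
   
       fn shrink(&self, bytes: u64) {
           self.used.fetch_sub(bytes, Ordering::Relaxed);
       }
   }
   
   /// Wrapper that defers every decision to `inner` and only counts bytes.
   pub struct TrackingPool {
       inner: Arc<dyn SimplePool>,
       current: AtomicU64,
       peak: AtomicU64,
   }
   
   impl TrackingPool {
       pub fn new(inner: Arc<dyn SimplePool>) -> Self {
           Self { inner, current: AtomicU64::new(0), peak: AtomicU64::new(0) }
       }
   
       fn record_growth(&self, bytes: u64) {
           let now = self.current.fetch_add(bytes, Ordering::Relaxed) + bytes;
           // `fetch_max` keeps the high-water mark without taking a lock.
           self.peak.fetch_max(now, Ordering::Relaxed);
       }
   
       /// Returns `(current_bytes, peak_bytes)`.
       pub fn snapshot(&self) -> (u64, u64) {
           (self.current.load(Ordering::Relaxed), self.peak.load(Ordering::Relaxed))
       }
   }
   
   impl SimplePool for TrackingPool {
       fn grow(&self, bytes: u64) {
           self.inner.grow(bytes);
           self.record_growth(bytes);
       }
   
       fn try_grow(&self, bytes: u64) -> Result<(), String> {
           // Count the bytes only after the inner pool has accepted them.
           self.inner.try_grow(bytes)?;
           self.record_growth(bytes);
           Ok(())
       }
   
       fn shrink(&self, bytes: u64) {
           self.inner.shrink(bytes);
           self.current.fetch_sub(bytes, Ordering::Relaxed);
       }
   }
   ```
   
   Because a rejected `try_grow` returns before the counters are touched, the 
tracked totals in this sketch never include bytes the inner pool refused.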
   
   The wrapper is layered on automatically by both `createSessionContext` and 
`createSessionContextWithOptions` -- callers don't opt in. If 
`SessionContextBuilder.memoryLimit(...)` configured a `GreedyMemoryPool` or 
`TrackConsumersPool`, the tracker wraps that. If it didn't, the tracker wraps 
DataFusion's default `UnboundedMemoryPool`.
   
   Java callers can't downcast `Arc<dyn MemoryPool>` back to the concrete 
tracker type (the trait does not require `Any`), so a process-wide 
`Mutex<HashMap<jlong, Arc<TrackingMemoryPool>>>` keyed by the JNI handle gives 
the snapshot path a way to find the right tracker. Entries are inserted at 
session creation and removed at session close, which adds no new failure modes.
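   
   A handle-keyed registry of this shape might look like the sketch below. It 
assumes a placeholder `Tracker` type and hypothetical `register` / `snapshot` / 
`unregister` helpers; the names and the use of `OnceLock` are illustrative and 
are not taken from the PR.
   
   ```rust
   use std::collections::HashMap;
   use std::sync::atomic::{AtomicU64, Ordering};
   use std::sync::{Arc, Mutex, OnceLock};
   
   /// Same width as a JNI `jlong`; the value is the session handle.
   type Handle = i64;
   
   /// Placeholder for the per-session tracker (hypothetical).
   #[derive(Default)]
   pub struct Tracker {
       pub current: AtomicU64,
       pub peak: AtomicU64,
   }
   
   static TRACKERS: OnceLock<Mutex<HashMap<Handle, Arc<Tracker>>>> = OnceLock::new();
   
   fn trackers() -> &'static Mutex<HashMap<Handle, Arc<Tracker>>> {
       TRACKERS.get_or_init(|| Mutex::new(HashMap::new()))
   }
   
   /// Called when a session is created.
   pub fn register(handle: Handle, tracker: Arc<Tracker>) {
       trackers().lock().unwrap().insert(handle, tracker);
   }
   
   /// Returns `(current_bytes, peak_bytes)`, or `None` for an unknown handle.
   pub fn snapshot(handle: Handle) -> Option<(u64, u64)> {
       // Clone the Arc so the lock is released before the counters are read.
       let tracker = trackers().lock().unwrap().get(&handle).cloned()?;
       Some((
           tracker.current.load(Ordering::Relaxed),
           tracker.peak.load(Ordering::Relaxed),
       ))
   }
   
   /// Called when a session is closed; returns whether an entry was removed.
   pub fn unregister(handle: Handle) -> bool {
       trackers().lock().unwrap().remove(&handle).is_some()
   }
   ```
   
   Holding the mutex only for the map lookup keeps the snapshot path from 
contending with query threads, which update the atomics directly.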
   
   **Per-session, not per-DataFrame.** A cross-engine survey (pandas / Polars / 
Spark / DuckDB / DataFusion-Rust + Python) confirmed that no engine ships 
per-DataFrame in-flight memory accounting. What pandas/Polars expose as 
`memory_usage` / `estimated_size` is data-at-rest sizing of materialised 
columns -- a different feature. Multi-tenant attribution in DataFusion is 
conventionally one session per tenant, which matches the OpenSearch prior art 
(`QueryMemoryPool` keyed off `context_id`). Per-DataFrame attribution would 
need a side-channel registry hooked into operator-time consumer creation. That 
work is not blocked by this PR and can land later if requested.
   
   ### Tokio runtime metrics
   
   `native/src/runtime_metrics.rs` is gated behind a default-off 
`runtime-metrics` Cargo feature because `tokio-metrics` requires `--cfg 
tokio_unstable` at build time. `tokio_metrics::RuntimeMonitor::intervals()` is 
a delta iterator -- each `next()` returns metrics covering the period since the 
previous call -- so the module owns a single process-wide `RuntimeAccumulator` 
that maintains running totals for documented-monotonic fields. Snapshot 
(point-in-time) fields (`workers_count`, `live_tasks_count`, 
`global_queue_depth`) pass through without accumulation.
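   
   The delta-to-total accumulation can be illustrated with a small std-only 
sketch. `IntervalDelta`, `Totals`, and `Accumulator` are hypothetical types 
that mirror only a few of the fields named above; the real module consumes 
`tokio_metrics` interval values rather than this stand-in struct.
   
   ```rust
   use std::time::Duration;
   
   /// Hypothetical stand-in for one interval reported by the monitor.
   #[derive(Clone, Copy, Debug, Default)]
   pub struct IntervalDelta {
       pub workers_count: usize,
       pub global_queue_depth: usize,
       pub total_park_count: u64,
       pub total_busy_duration: Duration,
       pub elapsed: Duration,
   }
   
   /// Running totals exposed to the snapshot path (hypothetical shape).
   #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
   pub struct Totals {
       pub num_workers: usize,
       pub global_queue_depth: usize,
       pub total_park_count: u64,
       pub total_busy_nanos: u64,
       pub elapsed_nanos: u64,
   }
   
   #[derive(Default)]
   pub struct Accumulator {
       totals: Totals,
   }
   
   fn nanos(d: Duration) -> u64 {
       // Clamp instead of wrapping if the duration does not fit in 64 bits.
       u64::try_from(d.as_nanos()).unwrap_or(u64::MAX)
   }
   
   impl Accumulator {
       /// Folds one interval into the totals and returns the updated view.
       pub fn apply(&mut self, d: &IntervalDelta) -> Totals {
           let t = &mut self.totals;
           // Point-in-time fields: the latest observation replaces the old one.
           t.num_workers = d.workers_count;
           t.global_queue_depth = d.global_queue_depth;
           // Monotonic fields: each interval is a delta, so sum them up.
           t.total_park_count = t.total_park_count.saturating_add(d.total_park_count);
           t.total_busy_nanos = t.total_busy_nanos.saturating_add(nanos(d.total_busy_duration));
           t.elapsed_nanos = t.elapsed_nanos.saturating_add(nanos(d.elapsed));
           self.totals
       }
   }
   ```
   
   Summing only the monotonic fields is what lets a caller poll at any cadence 
and still see totals that never decrease, while gauge-like fields reflect the 
most recent interval.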
   
   ```toml
   [features]
   runtime-metrics = ["dep:tokio-metrics"]
   
   [dependencies]
   tokio-metrics = { version = "0.5", optional = true }
   ```
   
   Build matrix:
   
   | invocation | runtime-metrics | build prereqs |
   |---|---|---|
   | `cargo build` (default) | off (stub handler) | none |
   | `RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics` | on | `--cfg tokio_unstable` |
   
   The Java surface is identical either way: `SessionContext.runtimeStats()` 
is always present. If the feature was compiled off, the call throws a clear 
error from the JVM: "datafusion-jni was built without the `runtime-metrics` 
Cargo feature; rebuild the native crate with 
`RUSTFLAGS=\"--cfg tokio_unstable\" cargo build --features runtime-metrics`". 
`SessionContextRuntimeStatsTest` detects this case and skips itself via JUnit's 
`Assumptions.assumeFalse(...)`, so `make test` stays green either way.
   
   A new `make native-runtime-metrics` target makes the opt-in build a 
one-liner.
   
   This is intentionally similar to PR #75's `substrait` feature handling: a 
heavy / build-prereq-bearing dependency stays out of the default build, the 
Java surface is unchanged, and a feature-off compile substitutes a stub handler 
that throws clearly.
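   
   The stub-handler pattern can be sketched with paired `cfg` attributes. The 
`Stats` type and `runtime_stats` function below are hypothetical and the 
feature-on branch is a placeholder; only the feature name and the gist of the 
error message come from this PR.
   
   ```rust
   /// Hypothetical result type; the real snapshot carries more fields.
   #[derive(Debug, Default, PartialEq, Eq)]
   pub struct Stats {
       pub num_workers: usize,
   }
   
   pub const FEATURE_OFF_MESSAGE: &str =
       "datafusion-jni was built without the `runtime-metrics` Cargo feature";
   
   /// Feature-on build: would read the process-wide accumulator.
   #[cfg(feature = "runtime-metrics")]
   pub fn runtime_stats() -> Result<Stats, String> {
       // Placeholder body for the sketch; no real metrics are collected here.
       Ok(Stats::default())
   }
   
   /// Feature-off build: same signature, but always reports a clear error.
   #[cfg(not(feature = "runtime-metrics"))]
   pub fn runtime_stats() -> Result<Stats, String> {
       Err(FEATURE_OFF_MESSAGE.to_string())
   }
   ```
   
   Because both variants share one signature, the caller on the JNI side 
compiles unchanged in either configuration and can turn the `Err` into a Java 
exception.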
   
   ## Are these changes tested?
   
   Yes -- 9 new tests across `SessionContextMemoryUsageTest` and 
`SessionContextRuntimeStatsTest`.
   
   ## Are there any user-facing changes?
   
   Yes -- purely additive. New public API:
   
   - `org.apache.datafusion.MemoryUsage` (immutable value class)
   - `org.apache.datafusion.RuntimeStats` (immutable value class)
   - `SessionContext.memoryUsage() -> MemoryUsage`
   - `SessionContext.runtimeStats() -> RuntimeStats`
   
   No API removals, no deprecations, no behavior change for existing callers. 
The default `cargo build` does **not** pull in `tokio-metrics` and adds no new 
build prerequisites. `SessionContext.memoryUsage()` is always available; 
`runtimeStats()` is always present but throws a "feature not enabled" error at 
runtime unless the native crate is rebuilt with the feature enabled.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

