SteNicholas opened a new pull request, #3664:
URL: https://github.com/apache/celeborn/pull/3664

   ### What changes were proposed in this pull request?
   
   Introduce `JavaDeserializerFilter`, a filter for Java deserialization, to 
prevent deserialization attacks (CWE-502).
   
   ### Why are the changes needed?
   
   Apache Celeborn's internal RPC transport layer deserializes arbitrary Java 
objects from unauthenticated network connections using `ObjectInputStream` with 
no class filtering (`ObjectInputFilter` / JEP 290). An attacker who can reach 
the Celeborn Master (default port 9097) or any Worker RPC port over the network 
can achieve Remote Code Execution without any credentials.
   
   While Celeborn is infrastructure software, not directly exposed to end 
users, this report demonstrates that the vulnerability is exploitable in 
realistic production deployments by tenants, compromised workloads, or any 
network-adjacent actor. The standard deployment model for Celeborn (shared 
shuffle service for Spark/Flink clusters) places it on the same network as 
user-submitted workloads, making exploitation practical.
   
   Celeborn's RPC protocol supports two serialization formats, selected by the 
first byte of the message body:
   
   - V2 (0xFF prefix): Protocol Buffers (`TransportMessage`) - safe
   - V1 (any other prefix): Java native `ObjectInputStream` - vulnerable
   
   Since Java serialization streams begin with magic bytes `0xAC 0xED`, sending 
a standard Java serialized object automatically triggers the V1 code path.
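
To make the dispatch concrete, here is a minimal Java sketch of selecting a format from the first byte of a message body. The names `WireFormatSketch`, `detect`, and `serialize` are hypothetical and are not Celeborn's actual code; only the `0xFF` marker and the `0xAC 0xED` magic bytes come from the description above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

final class WireFormatSketch {
    // Marker taken from the description above: 0xFF selects Protocol Buffers (V2).
    static final int PROTOBUF_MARKER = 0xFF;

    enum Format { V2_PROTOBUF, V1_JAVA }

    // Any first byte other than 0xFF falls through to Java native deserialization.
    static Format detect(byte[] body) {
        if (body.length == 0) {
            throw new IllegalArgumentException("empty message body");
        }
        int first = body[0] & 0xFF;
        return first == PROTOBUF_MARKER ? Format.V2_PROTOBUF : Format.V1_JAVA;
    }

    // Helper for experimenting: produces a standard Java serialization stream.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Because every stream written by `ObjectOutputStream` starts with `0xAC`, not `0xFF`, it always lands on the V1 branch, so that branch is where class filtering has to be enforced.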
   
   The deserialization in `JavaDeserializationStream` 
(`JavaSerializer.scala:62-79`) creates an `ObjectInputStream` that:
   
   1. Has **no `ObjectInputFilter`** (JEP 290 filtering)
   2. Overrides `resolveClass` only for classloading, not for security filtering
   3. Calls `readObject()` directly on untrusted network data
   
   ```scala
   // JavaSerializer.scala lines 62-79
   private[celeborn] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
     extends DeserializationStream {
     private val objIn = new ObjectInputStream(in) {
       override def resolveClass(desc: ObjectStreamClass): Class[_] =
         try {
           Class.forName(desc.getName, false, loader)  // NO FILTERING
         } catch {
           case e: ClassNotFoundException =>
             JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e)
         }
     }
     def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]  // UNRESTRICTED
   }
   ```
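
For contrast, a `resolveClass` override can also enforce an allowlist before any class is loaded. The following Java sketch illustrates that general technique under stated assumptions: `AllowlistSketch`, `AllowlistObjectInputStream`, and the allowlist contents are hypothetical, and this is not the `JavaDeserializerFilter` implementation from this pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;
import java.util.Set;

final class AllowlistSketch {
    /** ObjectInputStream that refuses to resolve any class not named in the allowlist. */
    static final class AllowlistObjectInputStream extends ObjectInputStream {
        private final Set<String> allowed;

        AllowlistObjectInputStream(InputStream in, Set<String> allowed) throws IOException {
            super(in);
            this.allowed = allowed;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Reject before the class is loaded or instantiated.
            if (!allowed.contains(desc.getName())) {
                throw new InvalidClassException(desc.getName(), "class not on allowlist");
            }
            return super.resolveClass(desc);
        }
    }

    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns the decoded object, or throws SecurityException if a class was rejected. */
    static Object deserialize(byte[] data, Set<String> allowed) {
        try (ObjectInputStream in =
                 new AllowlistObjectInputStream(new ByteArrayInputStream(data), allowed)) {
            return in.readObject();
        } catch (InvalidClassException e) {
            throw new SecurityException("rejected class: " + e.classname, e);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

An allowlist is generally preferable to a denylist here, since new gadget classes keep appearing on real classpaths and a denylist has to chase them.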
   
   A common response to deserialization vulnerabilities in infrastructure 
software is: "This only affects internal services; end users can't reach the 
port." I could agree in general, but not in this case. Celeborn is a remote 
shuffle service deployed as shared infrastructure for Apache Spark and Apache 
Flink clusters, and by design Spark/Flink executors must have direct TCP 
connectivity to the Celeborn Master and Workers. This is not optional; it is 
how Celeborn functions. The `ShuffleClientImpl` in each executor opens TCP 
connections to Celeborn's RPC ports to register shuffle partitions, push data, 
and pull results.
   
   One possible scenario is a malicious tenant in a multi-tenant cluster:
   
   - Organizations commonly run shared Spark/Flink clusters where multiple 
teams or tenants submit jobs. Services like Databricks, AWS EMR, Google 
Dataproc, and on-premise Hadoop/YARN clusters all support this model. Celeborn 
is deployed as the shared shuffle service.
   
   Attack:
   
   1. A tenant submits a Spark job containing malicious code. This is by 
design: Spark jobs execute arbitrary user code (UDFs, custom transformers, etc.).
   2. The malicious code runs inside a Spark executor, which has network access 
to Celeborn (required for shuffle operations)
   3. The executor opens a raw TCP socket to `celeborn-master:9097`
   4. It sends a crafted RPC_REQUEST frame containing a Java deserialization 
payload
   5. Celeborn Master deserializes the payload via 
`ObjectInputStream.readObject()` with no filtering
   6. Arbitrary code executes as the Celeborn Master process (typically root or 
a privileged service account)
   
   ```java
   // This code runs inside a Spark UDF, which is completely normal for tenant workloads
   // The executor already has network access to Celeborn
   Socket sock = new Socket("celeborn-master", 9097);
   sock.getOutputStream().write(craftedRpcFrame);
   // Celeborn Master now executes attacker's code
   ```
   
   Impact: The attacker escalates from tenant-level access (running code in 
their own executor) to cluster-infrastructure-level access (running code on the 
Celeborn Master). From the Master, they can:
   
   - Access all shuffle data from all tenants (data exfiltration)
   - Compromise all Celeborn Workers
   - Pivot to other infrastructure services on the same network
   - Persist access via cron jobs, SSH keys, or backdoored binaries
   
   Another scenario is a network-adjacent attacker:
   
   In on-premise data center deployments, Celeborn runs on the same network 
segment as the Hadoop/YARN cluster.
   
   Attack:
   
   1. An attacker gains access to any machine on the data center network (e.g., 
via phishing, VPN compromise, or a compromised dev workstation)
   2. They scan for open port 9097 and find the Celeborn Master
   3. They send the deserialization payload; if authentication is enabled, this 
assumes they hold valid credentials
   4. Code executes on the Celeborn Master node
   
   Even if `celeborn.auth.enabled=true` is set, the deserialization 
vulnerability remains exploitable by authenticated clients:
   - SASL QOP uses `QOP_AUTH = "auth"` (authentication only, no message 
integrity or confidentiality) 
   - `AbstractAuthRpcHandler.receive()` delegates directly to `NettyRpcHandler` 
with no message filtering 
   - Zero use of `ObjectInputFilter` in the entire codebase, so deserialization 
is unfiltered regardless of auth state
   - Any authenticated Spark/Flink application can craft a V1 payload and 
achieve RCE 
   
   Running a Docker instance with Celeborn 0.6.2, I was able to trigger the 
deserialization.
   
   The probe sends a serialized `java.lang.String` object to confirm that the 
deserialization code path is reached.
   
   ```bash
   javac CelebornExploit.java
   java -cp . CelebornExploit localhost 9097 --probe
   ```
   
   **Result:**
   ```
   [*] Probe mode: sending minimal serialized String object
   [*] Target: localhost:9097
   [*] Payload size: 27 bytes
   [*] Total frame size: 69 bytes
   
   [+] RPC_REQUEST frame sent
   [+] Received 21 bytes response
   [*] Response frame: msgSize=12, msgType=4
   [+] Server returned RPC_RESPONSE (success)
   [+] VULNERABILITY CONFIRMED
   ```
   
   The server returned `RPC_RESPONSE` (success), confirming it deserialized the 
attacker-controlled `String` object without any class filtering.
   
   This report could also be rejected, but deserializing without any filter is 
certainly not good practice, even if Celeborn is only an infrastructure 
building block in the architecture.
   
   Therefore, a filter for Java deserialization is required to prevent 
deserialization attacks (CWE-502).
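
As an illustration of what JEP 290 filtering looks like with the JDK's own API, here is a minimal Java sketch that attaches a pattern-based `ObjectInputFilter` to a stream. It assumes Java 9 or later, where `java.io.ObjectInputFilter` is public. The class name `FilterSketch` and the pattern are hypothetical, and this is not the `JavaDeserializerFilter` added by this pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

final class FilterSketch {
    // Hypothetical pattern: cap graph depth, allow Integer and its superclass
    // Number, and reject every other class with the trailing "!*".
    static final ObjectInputFilter FILTER = ObjectInputFilter.Config.createFilter(
        "maxdepth=20;java.lang.Integer;java.lang.Number;!*");

    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns the decoded object, or throws SecurityException if the filter rejected it. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            // The filter must be installed before the first readObject() call.
            in.setObjectInputFilter(FILTER);
            return in.readObject();
        } catch (InvalidClassException e) {
            throw new SecurityException("rejected by filter: " + e.getMessage(), e);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this pattern, `FilterSketch.deserialize(FilterSketch.serialize(42))` round-trips, while a serialized `java.util.HashMap` is rejected as soon as its class descriptor is read. Note that superclasses present in the stream (here `java.lang.Number`) also pass through the filter, so they need to be allowed explicitly.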
   
   ### Does this PR resolve a correctness bug?
   
   No.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   `JavaDeserializerFilterSuiteJ`

