Hi dev@,
I’d like to propose adding at-rest encryption for shuffle data in Celeborn
and would appreciate the community’s input before writing a full
implementation.
Current Gap
Celeborn encrypts data in transit (TLS, SASL) but not at rest. When a
worker flushes shuffle data to local disk, HDFS, S3, or OSS, the bytes land
as plaintext.
The only write site for local disk is LocalFlushTask.flush() in
FlushTask.scala (L66, L71 at commit a56f69a), which calls
fileChannel.write(buffer) with no cipher transform. The tiered-storage
paths (HdfsFlushTask, S3FlushTask, OssFlushTask) behave the same way: raw
bytes go to the underlying store.
Verified with:
grep -rnE 'cipher|\.encrypt|aes|envelope' worker/src/main/
grep -rn 'javax\.crypto' worker/src/main/
(both zero matches)
This matters because spark.io.encryption.enabled does *not* cover the
Celeborn path. When Celeborn’s ShuffleManager replaces Spark’s shuffle
writer, Spark’s encryption key is never consulted — confirmed by grepping
client-spark/ for IOEncryptionKey (zero matches).
Teams adopting Celeborn for performance silently lose shuffle-encryption
guarantees their compliance posture may assume.
Who Needs This
- Regulated industries (healthcare, finance, public sector) whose
auditors require application-layer encryption independent of disk/volume
encryption.
- Multi-tenant platforms needing cryptographic isolation between tenants
on shared workers.
- Teams using object-store tiering who want encryption before offload.
Proposed Approach (High Level)
1. A *StreamCipher SPI* in common/ for wrapping WritableByteChannel /
ReadableByteChannel with encrypt/decrypt. No KMS SDK in core.
2. A *KeyService SPI* for envelope encryption — generate/unwrap DEKs
using a KMS-held KEK. Implementations live in separate optional modules
(aws-kms, gcp-kms, azure-kv, vault, and a static one for dev/PoC).
3. Wire into the worker write path: LocalFlushTask wraps fileChannel
with StreamCipher.wrapForWrite(). Same for HDFS/S3/OSS flush tasks.
4. Wire into the reader path: LocalPartitionDataReader detects a 16-byte
encrypted-file header, unwraps the DEK (cached per worker+shuffle), wraps
the channel with StreamCipher.wrapForRead().
5. Opt-in via celeborn.shuffle.io.encryption.enabled=true. Default off.
Unencrypted deployments stay byte-identical to today, with no added overhead.
6. Per-shuffle DEKs by default (one KMS call per shuffle reservation,
amortized). Per-application DEK scope as an option.
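
To make steps 1-4 concrete, here is a minimal sketch of how the two SPIs could
fit together. Nothing in it is existing Celeborn code: the interface shapes, the
explicit iv parameter, and the names DataKey, StaticKeyService, and
AesCtrStreamCipher are assumptions for illustration only. It uses plain JDK
crypto, with AES/CTR as a stand-in cipher (CTR gives no integrity protection,
so the real choice of mode is open) and JDK AES key wrap standing in for a KMS
call. The 16-byte file header and DEK caching are left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;

final class ShuffleEncryptionSketch {

  /** Hypothetical KeyService SPI: envelope encryption against a KEK. */
  interface KeyService {
    DataKey generateDataKey() throws GeneralSecurityException;
    SecretKey unwrap(byte[] wrappedDek) throws GeneralSecurityException;
  }

  /** A plaintext DEK for immediate use plus its KEK-wrapped form for storage. */
  static final class DataKey {
    final SecretKey plaintext;
    final byte[] wrapped;
    DataKey(SecretKey plaintext, byte[] wrapped) {
      this.plaintext = plaintext;
      this.wrapped = wrapped;
    }
  }

  /** Dev/PoC only: the KEK lives in memory instead of a KMS. */
  static final class StaticKeyService implements KeyService {
    private final SecretKey kek;
    StaticKeyService(SecretKey kek) { this.kek = kek; }

    public DataKey generateDataKey() throws GeneralSecurityException {
      KeyGenerator gen = KeyGenerator.getInstance("AES");
      gen.init(256);
      SecretKey dek = gen.generateKey();
      Cipher wrapper = Cipher.getInstance("AESWrap");
      wrapper.init(Cipher.WRAP_MODE, kek);
      return new DataKey(dek, wrapper.wrap(dek));
    }

    public SecretKey unwrap(byte[] wrappedDek) throws GeneralSecurityException {
      Cipher wrapper = Cipher.getInstance("AESWrap");
      wrapper.init(Cipher.UNWRAP_MODE, kek);
      return (SecretKey) wrapper.unwrap(wrappedDek, "AES", Cipher.SECRET_KEY);
    }
  }

  /** Hypothetical StreamCipher SPI: wraps channels, knows nothing about KMS. */
  interface StreamCipher {
    WritableByteChannel wrapForWrite(WritableByteChannel out, SecretKey dek, byte[] iv)
        throws GeneralSecurityException;
    ReadableByteChannel wrapForRead(ReadableByteChannel in, SecretKey dek, byte[] iv)
        throws GeneralSecurityException;
  }

  /** Illustrative implementation using AES/CTR (confidentiality only). */
  static final class AesCtrStreamCipher implements StreamCipher {
    private static Cipher init(int mode, SecretKey dek, byte[] iv)
        throws GeneralSecurityException {
      Cipher c = Cipher.getInstance("AES/CTR/NoPadding");
      c.init(mode, dek, new IvParameterSpec(iv));
      return c;
    }

    public WritableByteChannel wrapForWrite(WritableByteChannel out, SecretKey dek, byte[] iv)
        throws GeneralSecurityException {
      return Channels.newChannel(
          new CipherOutputStream(Channels.newOutputStream(out), init(Cipher.ENCRYPT_MODE, dek, iv)));
    }

    public ReadableByteChannel wrapForRead(ReadableByteChannel in, SecretKey dek, byte[] iv)
        throws GeneralSecurityException {
      return Channels.newChannel(
          new CipherInputStream(Channels.newInputStream(in), init(Cipher.DECRYPT_MODE, dek, iv)));
    }
  }

  /** Drains a channel into a byte array. */
  static byte[] readAll(ReadableByteChannel ch) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    ByteBuffer buf = ByteBuffer.allocate(4096);
    while (ch.read(buf) != -1) {
      buf.flip();
      sink.write(buf.array(), 0, buf.limit());
      buf.clear();
    }
    return sink.toByteArray();
  }

  /** Returns {stored bytes, decrypted bytes}; the reader only gets the wrapped DEK. */
  static byte[][] roundTrip(byte[] plaintext) throws Exception {
    KeyGenerator gen = KeyGenerator.getInstance("AES");
    gen.init(256);
    KeyService keys = new StaticKeyService(gen.generateKey());
    StreamCipher cipher = new AesCtrStreamCipher();
    DataKey dataKey = keys.generateDataKey();
    byte[] iv = new byte[16];
    new SecureRandom().nextBytes(iv);

    ByteArrayOutputStream disk = new ByteArrayOutputStream(); // stands in for the flushed file
    try (WritableByteChannel ch =
        cipher.wrapForWrite(Channels.newChannel(disk), dataKey.plaintext, iv)) {
      ch.write(ByteBuffer.wrap(plaintext));
    }
    byte[] stored = disk.toByteArray();

    SecretKey dek = keys.unwrap(dataKey.wrapped);
    try (ReadableByteChannel ch =
        cipher.wrapForRead(Channels.newChannel(new ByteArrayInputStream(stored)), dek, iv)) {
      return new byte[][] {stored, readAll(ch)};
    }
  }
}
```

The point of the split is that StreamCipher never sees a KMS and KeyService
never sees shuffle bytes, which is what would keep KMS SDKs out of core.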
Interaction with Recent Work
CELEBORN-2301 (commit 95419e1) recently landed enhanced zero-copy sendfile
for FileRegion on native transports — a nice throughput win for the fetch
path.
Encryption and sendfile are fundamentally incompatible: sendfile(2) cannot
transform bytes, so encrypted partitions must use a buffered read path.
This is only relevant for encrypted workloads; unencrypted workloads on the
same cluster keep the full CELEBORN-2301 benefit. Per-application
encryption flags (not per-cluster) would let encrypted and unencrypted apps
coexist without regressing the latter.
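
The split between the two fetch paths can be sketched as follows. This is
plain JDK NIO, not Celeborn's Netty FileRegion code, and FetchPathSketch and
send() are hypothetical names. FileChannel.transferTo stands in for the
zero-copy path (the JDK may implement it with sendfile(2) when the target is a
socket), while the encrypted path has to read into user space, decrypt, and
write.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.GeneralSecurityException;
import javax.crypto.Cipher;

final class FetchPathSketch {

  /**
   * Sends a partition file to a client channel and returns the bytes sent.
   * A null decryptor means the file is plaintext. A non-null decryptor must
   * already be initialized in DECRYPT_MODE by the caller.
   */
  static long send(Path file, WritableByteChannel client, Cipher decryptor)
      throws IOException, GeneralSecurityException {
    try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
      if (decryptor == null) {
        // Plaintext: the kernel can move bytes without copying them to user space.
        long pos = 0;
        long size = in.size();
        while (pos < size) {
          pos += in.transferTo(pos, size - pos, client);
        }
        return pos;
      }

      // Encrypted: bytes must pass through user space to be transformed.
      ByteBuffer cipherText = ByteBuffer.allocate(64 * 1024);
      ByteBuffer plain = ByteBuffer.allocate(decryptor.getOutputSize(cipherText.capacity()));
      long sent = 0;
      while (in.read(cipherText) != -1) {
        cipherText.flip();
        decryptor.update(cipherText, plain);
        plain.flip();
        sent += plain.remaining();
        while (plain.hasRemaining()) {
          client.write(plain);
        }
        cipherText.clear();
        plain.clear();
      }
      // Flush whatever the cipher still holds (empty for stream modes such as CTR).
      ByteBuffer tail = ByteBuffer.wrap(decryptor.doFinal());
      sent += tail.remaining();
      while (tail.hasRemaining()) {
        client.write(tail);
      }
      return sent;
    }
  }
}
```

Because the choice is made per file, a plaintext partition on the same worker
never touches the buffered branch.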
Questions for the Community
I've trimmed this to the three questions I need opinions on before
writing code. Happy to take the rest up in follow-ups.
- Any prior design work or internal discussion on this topic I should
know about before proceeding?
- *Per-shuffle vs. per-application DEK scope* as the default?
Per-shuffle gives smaller blast radius and simpler lifecycle;
per-application amortizes KMS round-trips and is friendlier for
long-running jobs.
- *Key distribution path:* wrapped DEKs flow through Master metadata
(simpler, one KMS-aware role) vs. workers unwrap directly from KMS (removes
Master from the key path, but every worker needs KMS credentials).
Preference?
Tracking
JIRA: CELEBORN-2311 <https://issues.apache.org/jira/browse/CELEBORN-2311>
I have a detailed design document with source citations, threat model,
performance analysis, and phased implementation plan. Happy to share
on-list or off-list if there’s interest.
- Karthik