[
https://issues.apache.org/jira/browse/IGNITE-28848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anton Vinogradov updated IGNITE-28848:
--------------------------------------
Description:
{{ZookeeperDiscoveryImpl.marshalZip()}} sits on the two busiest control-plane
paths of ZooKeeper discovery:
* the coordinator re-marshals and rewrites the whole {{ZkDiscoveryEventsData}}
to ZooKeeper on *every* discovery event: node joins/failures and every custom
discovery message (cache create/destroy, exchange, services, snapshots, ...);
* join data of each joining node and the coordinator's data-for-joined are
marshalled on every join. In clusters with hundreds of caches this is megabytes
(large enough to be split across znodes to stay under the {{jute.maxbuffer}}
limit).
The current implementation materializes the whole uncompressed marshalled form
and only then compresses it, allocating ~6x the uncompressed size in garbage
per call and holding ~3x of it at peak. For an N-MB payload that is ~6N MB of
garbage and a ~3N MB heap spike *per call*. A mass join, a client reconnect
storm or a rolling restart fires this repeatedly, so GC pressure and heap
spikes hit the coordinator exactly when it is busiest. The resulting GC pauses
land on the discovery critical path: events are processed sequentially by a
single worker, so the whole cluster sees slower event propagation. On top of
that, {{Deflater.end()}} is never called, so zlib native memory is reclaimed
only by the Cleaner after a GC.
This change makes the compression path streaming: allocations drop by 73–82% on
the 132 KB and 1.66 MB benchmark payloads, peak memory is bounded by the
compressed output instead of ~3x the uncompressed size, and zlib native memory
is released deterministically. Per-call time stays on par with or slightly
better than the current code; the value is coordinator stability under mass
joins / topology churn, not per-call speed.
h3. Current implementation
{code:java}
return zip(U.marshal(marsh, obj));
{code}
For an uncompressed size of N bytes a single call allocates:
* in {{U.marshal}}: the marshalling buffer grown to >= N plus a trim copy of
N ({{GridByteArrayOutputStream.toByteArray()}});
* in {{zip()}}: a deflate buffer of N ({{new byte[bytes.length]}}) and an
output {{GridByteArrayOutputStream}} pre-sized to N, plus the final trim copy.
Marshalled bytes, deflate buffer and output buffer are alive simultaneously,
hence the ~3x peak.
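To make the accounting above concrete, here is a rough plain-JDK sketch of a materialize-then-compress path. It is not the Ignite source: {{MaterializedZipSketch}} and its methods are hypothetical stand-ins, Java serialization replaces {{U.marshal}}, {{ByteArrayOutputStream}} replaces {{GridByteArrayOutputStream}}, and only the buffer sizing follows the list above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.zip.Deflater;

final class MaterializedZipSketch {
    /** Stand-in for U.marshal: fills a growing buffer, then returns a trimmed copy. */
    static byte[] marshal(Object obj) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();

        try (ObjectOutputStream oos = new ObjectOutputStream(buf)) {
            oos.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return buf.toByteArray(); // Trim copy of N bytes.
    }

    /** Assumed shape of a one-shot zip helper; expects a non-empty input. */
    static byte[] zip(byte[] bytes) {
        Deflater deflater = new Deflater();

        try {
            deflater.setInput(bytes);
            deflater.finish();

            byte[] chunk = new byte[bytes.length]; // Input-sized deflate buffer.
            ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length); // Pre-sized to N.

            while (!deflater.finished())
                out.write(chunk, 0, deflater.deflate(chunk));

            return out.toByteArray(); // Final trim copy.
        }
        finally {
            // Added in this sketch; the description says the current code omits this call.
            deflater.end();
        }
    }

    /** The marshalled array, the deflate buffer and the output buffer are all live inside zip(). */
    static byte[] marshalZip(Object obj) {
        return zip(marshal(obj));
    }
}
```

The three payload-sized arrays that overlap inside {{zip()}} are what the ~3x peak refers to.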
h3. Change
Stream the marshaller output straight through the compressor, so the
uncompressed form is never materialized:
{code:java}
GridByteArrayOutputStream out = new GridByteArrayOutputStream();

try (BufferedOutputStream zipOut = new BufferedOutputStream(new DeflaterOutputStream(out))) {
    U.marshal(marsh, obj, zipOut);
}

return out.toByteArray();
{code}
* The {{BufferedOutputStream}} is essential: {{ObjectOutputStream}} writes in
~1 KB blocks, and an unbuffered {{DeflaterOutputStream}} pays a JNI deflate
call per block. The unbuffered variant was measured and rejected: +14–20% time
vs the current code (25.5 vs 21.3 us/op on a 1.6 KB payload, 20,114 vs 17,661
us/op on 1.66 MB). The default 8 KB buffer coalesces the blocks and brings the
time on par with or below the current code.
* {{DeflaterOutputStream.close()}} ends the {{Deflater}} it owns, so zlib
native memory is released deterministically instead of waiting for the Cleaner.
* The private {{zip()}} helper is removed ({{marshalZip}} was its only
caller); {{unzip()}} stays because it is still used for
{{ATTR_SECURITY_SUBJECT_V2}}. The read side ({{unmarshalZip}}) already
streams through {{InflaterInputStream}} and is not changed.
* This is the same pattern as {{DiscoveryMessageParser.marshalZip}} in the
same package.
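For readers without the Ignite sources at hand, the streaming pattern can be tried with the JDK alone. The sketch below is an approximation under stated assumptions: {{StreamingZipSketch}} is a hypothetical class, and {{ObjectOutputStream}} / {{ObjectInputStream}} stand in for the Ignite marshaller behind {{U.marshal}}.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

final class StreamingZipSketch {
    /** Serializes obj straight through the compressor; no uncompressed byte[] is built. */
    static byte[] marshalZip(Object obj) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // The BufferedOutputStream (8 KB by default) coalesces the serializer's small writes.
        // Closing the chain finishes the zlib stream and ends the stream's own Deflater.
        try (DeflaterOutputStream zip = new DeflaterOutputStream(out);
             BufferedOutputStream buf = new BufferedOutputStream(zip);
             ObjectOutputStream oos = new ObjectOutputStream(buf)) {
            oos.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return out.toByteArray();
    }

    /** Read side: inflates and deserializes in one streaming pass. */
    static Object unmarshalZip(byte[] zipped) {
        try (InflaterInputStream inflate = new InflaterInputStream(new ByteArrayInputStream(zipped));
             ObjectInputStream in = new ObjectInputStream(inflate)) {
            return in.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Each stream is declared as its own resource so that the {{Deflater}} is still ended if a later constructor in the chain throws.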
h3. Compatibility
The produced bytes are a regular zlib stream, same as before (default
{{Deflater}} settings in both versions). The format is unchanged, so old nodes
inflate the new output as is and a rolling upgrade is safe. The benchmark setup
asserts {{inflate(new) == inflate(old) == marshal(obj)}}.
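A check of that shape might look like the following. This is only a guess at the idea, not the actual benchmark setup: {{ZipCompatSketch}} and its method names are hypothetical, and Java serialization again stands in for the Ignite marshaller. It compares inflated bytes rather than compressed bytes, since the two paths are not required to emit identical compressed output.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

final class ZipCompatSketch {
    /** Plain serialized form; stands in for marshal(obj). */
    static byte[] marshal(Object obj) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return out.toByteArray();
    }

    /** "Old" path: compress an already materialized array with a default Deflater. */
    static byte[] zipOld(byte[] bytes) {
        Deflater def = new Deflater();

        try {
            def.setInput(bytes);
            def.finish();

            ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length);
            byte[] chunk = new byte[4096];

            while (!def.finished())
                out.write(chunk, 0, def.deflate(chunk));

            return out.toByteArray();
        }
        finally {
            def.end();
        }
    }

    /** "New" path: serialize straight through a DeflaterOutputStream. */
    static byte[] zipNew(Object obj) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        try (DeflaterOutputStream zip = new DeflaterOutputStream(out);
             BufferedOutputStream buf = new BufferedOutputStream(zip);
             ObjectOutputStream oos = new ObjectOutputStream(buf)) {
            oos.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return out.toByteArray();
    }

    /** Inflates a zlib stream fully (readAllBytes needs Java 9+). */
    static byte[] inflate(byte[] zipped) {
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(zipped))) {
            return in.readAllBytes();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** True when inflate(new) == inflate(old) == marshal(obj). */
    static boolean sameAfterInflate(Object obj) {
        byte[] plain = marshal(obj);

        return Arrays.equals(inflate(zipNew(obj)), plain)
            && Arrays.equals(inflate(zipOld(plain)), plain);
    }
}
```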
h3. Benchmark
JMH, avgt, JDK 17 (Corretto), Apple Silicon, fork 1, 3 warmup + 5 measurement
iterations, {{-prof gc}}; old = current code, new = this change:
||payload (marshalled -> zipped)||time old, us/op||time new, us/op||alloc old, B/op||alloc new, B/op||
|MAP_2K (1.6 KB -> 0.7 KB)|21.3 ± 11.5|15.4 ± 0.7|12,544|14,368 (+15%)|
|MAP_100K (132 KB -> 40 KB)|1,987 ± 512|1,931 ± 264|1,065,204|283,835 (−73%)|
|LIST_1M (1.66 MB -> 318 KB)|17,661 ± 3,433|16,825 ± 980|9,891,984|1,788,235 (−82%)|
The MAP_2K row is the trade-off, spelled out:
* mean time is lower even there (the old figure is noisy, ± 11.5 us/op),
because the new path skips the trim copy of the marshalled form and the
input-sized deflate buffer;
* allocations grow by a *bounded* ~1.8 KB/call: the fixed stream overhead (8 KB
{{BufferedOutputStream}} + 512 B {{DeflaterOutputStream}} buffer) replaces
buffers that used to scale with the payload. Rough break-even is ~3 KB of
marshalled data; above it the new code wins and the win grows with size;
* small payloads pass through {{marshalZip}} rarely and outside any loop
(security credentials on node join, the {{ZkJoiningNodeData}} marker when join
data is split, communication-error resolve state). The frequent call site,
the coordinator rewriting {{ZkDiscoveryEventsData}} on every discovery event,
operates on tens to hundreds of KB in clusters where this path matters at all.
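The percentages and the ~1.8 KB figure can be recomputed from the B/op columns of the table. The helper below is purely illustrative; {{AllocDeltaSketch}} and {{deltaPercent}} are hypothetical names, and the inputs are the table values copied verbatim.

```java
final class AllocDeltaSketch {
    /** Relative change of newVal versus oldVal in percent, rounded to the nearest integer. */
    static long deltaPercent(long oldVal, long newVal) {
        return Math.round(100.0 * (newVal - oldVal) / oldVal);
    }

    public static void main(String[] args) {
        System.out.println(deltaPercent(12_544, 14_368));       // MAP_2K: 15
        System.out.println(deltaPercent(1_065_204, 283_835));   // MAP_100K: -73
        System.out.println(deltaPercent(9_891_984, 1_788_235)); // LIST_1M: -82
        System.out.println(14_368 - 12_544);                    // MAP_2K extra bytes per call: 1824
    }
}
```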
was:
h3. Problem
{{ZookeeperDiscoveryImpl.marshalZip()}} materializes the whole uncompressed
marshalled form before compressing it:
{code:java}
return zip(U.marshal(marsh, obj));
{code}
For an uncompressed size of N bytes a single call allocates:
* in {{U.marshal}}: the marshalling buffer grown to >= N plus a trim copy of
N ({{GridByteArrayOutputStream.toByteArray()}});
* in {{zip()}}: a deflate buffer of N ({{new byte[bytes.length]}}) and an
output {{GridByteArrayOutputStream}} pre-sized to N, plus the final trim copy.
That is roughly 6x the uncompressed size in garbage and ~3x in peak live memory
(marshalled bytes, deflate buffer and output buffer are alive simultaneously).
{{Deflater.end()}} is never called, so zlib native memory is released only by
the Cleaner after a GC.
{{marshalZip()}} runs on the discovery control plane:
* joining node data and the coordinator's data-for-joined: megabytes in
clusters with hundreds of caches (large enough to be split across znodes by
{{jute.maxbuffer}});
* the coordinator re-marshals {{ZkDiscoveryEventsData}} on every discovery
event;
* security credentials / security subject.
h3. Change
Stream the marshaller output straight through the compressor, so the
uncompressed form is never materialized:
{code:java}
GridByteArrayOutputStream out = new GridByteArrayOutputStream();

try (BufferedOutputStream zipOut = new BufferedOutputStream(new DeflaterOutputStream(out))) {
    U.marshal(marsh, obj, zipOut);
}

return out.toByteArray();
{code}
* The {{BufferedOutputStream}} is essential: {{ObjectOutputStream}} writes in
~1 KB blocks, and an unbuffered {{DeflaterOutputStream}} pays a JNI deflate
call per block. The unbuffered variant was measured and rejected: +14-20% time
vs the current code (25.5 vs 21.3 us/op on a 1.6 KB payload, 20,114 vs 17,661
us/op on 1.66 MB). The default 8 KB buffer coalesces the blocks and brings the
time on par with or below the current code.
* {{DeflaterOutputStream.close()}} ends the {{Deflater}} it owns, so zlib
native memory is released deterministically instead of waiting for the Cleaner.
* The private {{zip()}} helper is removed ({{marshalZip}} was its only
caller); {{unzip()}} stays because it is still used for
{{ATTR_SECURITY_SUBJECT_V2}}. The read side ({{unmarshalZip}}) already
streams through {{InflaterInputStream}} and is not changed.
* Same pattern as {{DiscoveryMessageParser.marshalZip}} in the same package.
h3. Compatibility
The produced bytes are a regular zlib stream, same as before (default
{{Deflater}} settings in both versions): zlib format unchanged, old nodes
inflate it as is, so a rolling upgrade is safe. The benchmark setup asserts
{{inflate(new) == inflate(old) == marshal(obj)}}.
h3. Benchmark
JMH, avgt, JDK 17 (Corretto), Apple Silicon, fork 1, 3 warmup + 5 measurement
iterations, {{-prof gc}}; old = current code, new = this change:
||payload (marshalled -> zipped)||time old, us/op||time new, us/op||alloc old, B/op||alloc new, B/op||
|MAP_2K (1.6 KB -> 0.7 KB)|21.3 ± 11.5|15.4 ± 0.7|12,544|14,368 (+15%)|
|MAP_100K (132 KB -> 40 KB)|1,987 ± 512|1,931 ± 264|1,065,204|283,835 (−73%)|
|LIST_1M (1.66 MB -> 318 KB)|17,661 ± 3,433|16,825 ± 980|9,891,984|1,788,235 (−82%)|
The MAP_2K row is the trade-off, spelled out:
* time is better even there, because the new path skips the trim copy of the
marshalled form and the input-sized deflate buffer;
* allocations grow by a *bounded* ~1.8 KB/call: the fixed stream overhead (8 KB
{{BufferedOutputStream}} + 512 B {{DeflaterOutputStream}} buffer) replaces
buffers that used to scale with the payload. Rough break-even is ~3 KB of
marshalled data; above it the new code wins and the win grows with size;
* small payloads pass through {{marshalZip}} rarely and outside any loop
(security credentials on node join, the {{ZkJoiningNodeData}} marker when join
data is split, communication-error resolve state). The frequent call site,
the coordinator rewriting {{ZkDiscoveryEventsData}} on every discovery event,
operates on tens to hundreds of KB in clusters where this path matters at all.
h3. Expected effect
This is a control-plane path, so the point is not per-call latency (time is on
par or better): it is GC pressure and peak heap on the coordinator during mass
joins / topology churn, where join data and events data are marshalled
repeatedly. Allocations drop by 73-82% on realistic payloads, peak usage no
longer holds ~3x the uncompressed size, and zlib native memory is released
deterministically.
> ZooKeeper discovery: stream marshalZip through DeflaterOutputStream to cut
> allocations and peak memory
> ------------------------------------------------------------------------------------------------------
>
> Key: IGNITE-28848
> URL: https://issues.apache.org/jira/browse/IGNITE-28848
> Project: Ignite
> Issue Type: Task
> Reporter: Anton Vinogradov
> Assignee: Anton Vinogradov
> Priority: Major
>