This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 5c9664c4eac SOLR-17537: Manage ZK Compression through Curator (#2849)
5c9664c4eac is described below
commit 5c9664c4eac8e4d29cf048369580ca51487f93e7
Author: Houston Putman <[email protected]>
AuthorDate: Mon Mar 10 10:53:19 2025 -0500
SOLR-17537: Manage ZK Compression through Curator (#2849)
---
.../org/apache/solr/cli/ZkSubcommandsTest.java | 57 +++++++++++++-----
.../solr/cloud/overseer/ZkStateWriterTest.java | 16 ++++-
.../org/apache/solr/common/cloud/SolrZkClient.java | 53 ++---------------
.../common/cloud/SolrZkCompressionProvider.java | 69 ++++++++++++++++++++++
4 files changed, 129 insertions(+), 66 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java b/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java
index 302741b72d1..af092aa6210 100644
--- a/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java
+++ b/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java
@@ -166,10 +166,7 @@ public class ZkSubcommandsTest extends SolrTestCaseJ4 {
ZLibCompressor zLibCompressor = new ZLibCompressor();
byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
- byte[] expected =
- random().nextBoolean()
- ? zLibCompressor.compressBytes(dataBytes)
- : zLibCompressor.compressBytes(dataBytes);
+ byte[] expected = zLibCompressor.compressBytes(dataBytes);
String[] args =
new String[] {
@@ -178,7 +175,9 @@ public class ZkSubcommandsTest extends SolrTestCaseJ4 {
ZkCpTool tool = new ZkCpTool();
assertEquals(0, runTool(args, tool));
- assertArrayEquals(zkClient.getCuratorFramework().getData().forPath("/state.json"), expected);
+ assertArrayEquals(dataBytes,
zkClient.getCuratorFramework().getData().forPath("/state.json"));
+ assertArrayEquals(
+ expected,
zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json"));
// test re-put to existing
data = "my data deux";
@@ -201,13 +200,23 @@ public class ZkSubcommandsTest extends SolrTestCaseJ4 {
};
assertEquals(0, runTool(args, tool));
+ byte[] fromZkRaw =
+ zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json");
byte[] fromZk =
zkClient.getCuratorFramework().getData().forPath("/state.json");
- byte[] fromLoc =
- new ZLibCompressor()
-
.compressBytes(Files.readAllBytes(Path.of(localFile.getAbsolutePath())));
- assertArrayEquals("Should get back what we put in ZK", fromLoc, fromZk);
+ byte[] fromLocRaw =
Files.readAllBytes(Path.of(localFile.getAbsolutePath()));
+ byte[] fromLoc = new ZLibCompressor().compressBytes(fromLocRaw);
+ assertArrayEquals(
+ "When asking to not decompress, we should get back the compressed data
that what we put in ZK",
+ fromLoc,
+ fromZkRaw);
+ assertArrayEquals(
+ "When not specifying anything, we should get back what exactly we put
in ZK (not compressed)",
+ fromLocRaw,
+ fromZk);
- assertArrayEquals(zkClient.getCuratorFramework().getData().forPath("/state.json"), expected);
+ assertArrayEquals(dataBytes,
zkClient.getCuratorFramework().getData().forPath("/state.json"));
+ assertArrayEquals(
+ expected,
zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json"));
}
@Test
@@ -272,18 +281,34 @@ public class ZkSubcommandsTest extends SolrTestCaseJ4 {
ZkCpTool tool = new ZkCpTool();
assertEquals(0, runTool(args, tool));
- byte[] fromZk =
zkClient.getCuratorFramework().getData().forPath("/state.json");
Path locFile = Path.of(SOLR_HOME, "solr-stress-new.xml");
- byte[] fromLoc = new
ZLibCompressor().compressBytes(Files.readAllBytes(locFile));
- assertArrayEquals("Should get back what we put in ZK", fromLoc, fromZk);
+ byte[] fileBytes = Files.readAllBytes(locFile);
+
+ // Check raw ZK data
+ byte[] fromZk =
+ zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json");
+ byte[] fromLoc = new ZLibCompressor().compressBytes(fileBytes);
+ assertArrayEquals("Should get back a compressed version of what we put in
ZK", fromLoc, fromZk);
+
+ // Check curator output (should be decompressed)
+ fromZk = zkClient.getCuratorFramework().getData().forPath("/state.json");
+ assertArrayEquals(
+ "Should get back an uncompressed version what we put in ZK",
fileBytes, fromZk);
// Lets do it again
assertEquals(0, runTool(args, tool));
- fromZk = zkClient.getCuratorFramework().getData().forPath("/state.json");
locFile = Path.of(SOLR_HOME, "solr-stress-new.xml");
- fromLoc = new ZLibCompressor().compressBytes(Files.readAllBytes(locFile));
- assertArrayEquals("Should get back what we put in ZK", fromLoc, fromZk);
+ fileBytes = Files.readAllBytes(locFile);
+
+ fromZk =
zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json");
+ fromLoc = new ZLibCompressor().compressBytes(fileBytes);
+ assertArrayEquals("Should get back a compressed version of what we put in
ZK", fromLoc, fromZk);
+
+ // Check curator output (should be decompressed)
+ fromZk = zkClient.getCuratorFramework().getData().forPath("/state.json");
+ assertArrayEquals(
+ "Should get back an uncompressed version what we put in ZK",
fileBytes, fromZk);
}
@Test
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index 4008f694359..93a97e5bd90 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -507,13 +507,23 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
writer.enqueueUpdate(reader.getClusterState(),
Collections.singletonList(c1), null);
writer.writePendingUpdates();
- byte[] data =
+ byte[] dataCompressed =
+ zkClient
+ .getCuratorFramework()
+ .getData()
+ .undecompressed()
+ .forPath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2/state.json");
+ assertTrue(compressor.isCompressedBytes(dataCompressed));
+ Map<?, ?> map = (Map<?, ?>)
Utils.fromJSON(compressor.decompressBytes(dataCompressed));
+ assertNotNull(map.get("c2"));
+
+ byte[] dataDecompressed =
zkClient
.getCuratorFramework()
.getData()
.forPath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2/state.json");
- assertTrue(compressor.isCompressedBytes(data));
- Map<?, ?> map = (Map<?, ?>)
Utils.fromJSON(compressor.decompressBytes(data));
+ assertFalse(compressor.isCompressedBytes(dataDecompressed));
+ map = (Map<?, ?>) Utils.fromJSON(dataDecompressed);
assertNotNull(map.get("c2"));
}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 9c1fe5778c3..3583f94592c 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -58,7 +58,6 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.ReflectMapWriter;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.ZLibCompressor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -202,6 +201,9 @@ public class SolrZkClient implements Closeable {
.authorization(zkCredentialsProvider.getCredentials())
.retryPolicy(retryPolicy)
.runSafeService(curatorSafeServiceExecutor)
+ .compressionProvider(
+ new SolrZkCompressionProvider(compressor,
minStateByteLenForCompression))
+ .enableCompression()
.build();
if (onReconnect != null) {
client
@@ -231,18 +233,6 @@ public class SolrZkClient implements Closeable {
}
assert ObjectReleaseTracker.track(this);
- if (aclProvider == null) {
- this.aclProvider = useDefaultCredsAndACLs ? createACLProvider() : new
DefaultZkACLProvider();
- } else {
- this.aclProvider = aclProvider;
- }
-
- if (compressor == null) {
- this.compressor = new ZLibCompressor();
- } else {
- this.compressor = compressor;
- }
- this.minStateByteLenForCompression = minStateByteLenForCompression;
}
public CuratorFramework getCuratorFramework() {
@@ -447,18 +437,6 @@ public class SolrZkClient implements Closeable {
.storingStatIn(stat)
.usingWatcher(wrapWatcher(watcher))
.forPath(path));
- if (compressor.isCompressedBytes(result)) {
- log.debug("Zookeeper data at path {} is compressed", path);
- try {
- result = compressor.decompressBytes(result);
- } catch (Exception e) {
- throw new SolrException(
- SolrException.ErrorCode.SERVER_ERROR,
- String.format(
- Locale.ROOT, "Unable to decompress data at path: %s from
zookeeper", path),
- e);
- }
- }
metrics.reads.increment();
if (result != null) {
metrics.bytesRead.add(result.length);
@@ -473,16 +451,12 @@ public class SolrZkClient implements Closeable {
}
/** Returns node's state */
- public Stat setData(final String path, byte data[], final int version,
boolean retryOnConnLoss)
+ public Stat setData(
+ final String path, final byte[] data, final int version, boolean
retryOnConnLoss)
throws KeeperException, InterruptedException {
- if (SolrZkClient.shouldCompressData(data, path,
minStateByteLenForCompression)) {
- // state.json should be compressed before being put to ZK
- data = compressor.compressBytes(data);
- }
- final byte[] finalData = data;
Stat result =
runWithCorrectThrows(
- "setting data", () ->
client.setData().withVersion(version).forPath(path, finalData));
+ "setting data", () ->
client.setData().withVersion(version).forPath(path, data));
metrics.writes.increment();
if (data != null) {
metrics.bytesWritten.add(data.length);
@@ -678,11 +652,6 @@ public class SolrZkClient implements Closeable {
createBuilder.orSetData();
}
- if (SolrZkClient.shouldCompressData(data, path,
minStateByteLenForCompression)) {
- // state.json should be compressed before being put to ZK
- data = compressor.compressBytes(data);
- }
-
metrics.writes.increment();
if (data != null) {
metrics.bytesWritten.add(data.length);
@@ -1356,14 +1325,4 @@ public class SolrZkClient implements Closeable {
return new SolrZkClient(this);
}
}
-
- static boolean shouldCompressData(byte[] data, String path, int
minStateByteLenForCompression) {
- if (path.endsWith("state.json")
- && minStateByteLenForCompression > -1
- && data.length > minStateByteLenForCompression) {
- // state.json should be compressed before being put to ZK
- return true;
- }
- return false;
- }
}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java
new file mode 100644
index 00000000000..689127d5a14
--- /dev/null
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Locale;
+import org.apache.curator.framework.api.CompressionProvider;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.Compressor;
+import org.apache.solr.common.util.ZLibCompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SolrZkCompressionProvider implements CompressionProvider {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final Compressor compressor;
+ private final int minStateByteLenForCompression;
+
+ public SolrZkCompressionProvider(Compressor compressor, int
minStateByteLenForCompression) {
+ this.compressor = compressor != null ? compressor : new ZLibCompressor();
+ this.minStateByteLenForCompression = minStateByteLenForCompression;
+ }
+
+ @Override
+ public byte[] compress(String path, byte[] data) throws Exception {
+ if (path.endsWith("state.json")
+ && minStateByteLenForCompression > -1
+ && data.length >= minStateByteLenForCompression) {
+ // state.json should be compressed before being put to ZK
+ return compressor.compressBytes(data);
+ } else {
+ return data;
+ }
+ }
+
+ @Override
+ public byte[] decompress(String path, byte[] compressedData) throws
SolrException {
+ if (compressor.isCompressedBytes(compressedData)) {
+ log.debug("Zookeeper data at path {} is compressed", path);
+ try {
+ return compressor.decompressBytes(compressedData);
+ } catch (Exception e) {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ String.format(
+ Locale.ROOT, "Unable to decompress data at path: %s from
zookeeper", path),
+ e);
+ }
+ } else {
+ return compressedData;
+ }
+ }
+}