This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c9664c4eac SOLR-17537: Manage ZK Compression through Curator (#2849)
5c9664c4eac is described below

commit 5c9664c4eac8e4d29cf048369580ca51487f93e7
Author: Houston Putman <[email protected]>
AuthorDate: Mon Mar 10 10:53:19 2025 -0500

    SOLR-17537: Manage ZK Compression through Curator (#2849)
---
 .../org/apache/solr/cli/ZkSubcommandsTest.java     | 57 +++++++++++++-----
 .../solr/cloud/overseer/ZkStateWriterTest.java     | 16 ++++-
 .../org/apache/solr/common/cloud/SolrZkClient.java | 53 ++---------------
 .../common/cloud/SolrZkCompressionProvider.java    | 69 ++++++++++++++++++++++
 4 files changed, 129 insertions(+), 66 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java 
b/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java
index 302741b72d1..af092aa6210 100644
--- a/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java
+++ b/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java
@@ -166,10 +166,7 @@ public class ZkSubcommandsTest extends SolrTestCaseJ4 {
 
     ZLibCompressor zLibCompressor = new ZLibCompressor();
     byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
-    byte[] expected =
-        random().nextBoolean()
-            ? zLibCompressor.compressBytes(dataBytes)
-            : zLibCompressor.compressBytes(dataBytes);
+    byte[] expected = zLibCompressor.compressBytes(dataBytes);
 
     String[] args =
         new String[] {
@@ -178,7 +175,9 @@ public class ZkSubcommandsTest extends SolrTestCaseJ4 {
     ZkCpTool tool = new ZkCpTool();
     assertEquals(0, runTool(args, tool));
 
-    
assertArrayEquals(zkClient.getCuratorFramework().getData().forPath("/state.json"),
 expected);
+    assertArrayEquals(dataBytes, 
zkClient.getCuratorFramework().getData().forPath("/state.json"));
+    assertArrayEquals(
+        expected, 
zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json"));
 
     // test re-put to existing
     data = "my data deux";
@@ -201,13 +200,23 @@ public class ZkSubcommandsTest extends SolrTestCaseJ4 {
         };
     assertEquals(0, runTool(args, tool));
 
+    byte[] fromZkRaw =
+        
zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json");
     byte[] fromZk = 
zkClient.getCuratorFramework().getData().forPath("/state.json");
-    byte[] fromLoc =
-        new ZLibCompressor()
-            
.compressBytes(Files.readAllBytes(Path.of(localFile.getAbsolutePath())));
-    assertArrayEquals("Should get back what we put in ZK", fromLoc, fromZk);
+    byte[] fromLocRaw = 
Files.readAllBytes(Path.of(localFile.getAbsolutePath()));
+    byte[] fromLoc = new ZLibCompressor().compressBytes(fromLocRaw);
+    assertArrayEquals(
+        "When asking to not decompress, we should get back the compressed data 
that what we put in ZK",
+        fromLoc,
+        fromZkRaw);
+    assertArrayEquals(
+        "When not specifying anything, we should get back what exactly we put 
in ZK (not compressed)",
+        fromLocRaw,
+        fromZk);
 
-    
assertArrayEquals(zkClient.getCuratorFramework().getData().forPath("/state.json"),
 expected);
+    assertArrayEquals(dataBytes, 
zkClient.getCuratorFramework().getData().forPath("/state.json"));
+    assertArrayEquals(
+        expected, 
zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json"));
   }
 
   @Test
@@ -272,18 +281,34 @@ public class ZkSubcommandsTest extends SolrTestCaseJ4 {
     ZkCpTool tool = new ZkCpTool();
     assertEquals(0, runTool(args, tool));
 
-    byte[] fromZk = 
zkClient.getCuratorFramework().getData().forPath("/state.json");
     Path locFile = Path.of(SOLR_HOME, "solr-stress-new.xml");
-    byte[] fromLoc = new 
ZLibCompressor().compressBytes(Files.readAllBytes(locFile));
-    assertArrayEquals("Should get back what we put in ZK", fromLoc, fromZk);
+    byte[] fileBytes = Files.readAllBytes(locFile);
+
+    // Check raw ZK data
+    byte[] fromZk =
+        
zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json");
+    byte[] fromLoc = new ZLibCompressor().compressBytes(fileBytes);
+    assertArrayEquals("Should get back a compressed version of what we put in 
ZK", fromLoc, fromZk);
+
+    // Check curator output (should be decompressed)
+    fromZk = zkClient.getCuratorFramework().getData().forPath("/state.json");
+    assertArrayEquals(
+        "Should get back an uncompressed version what we put in ZK", 
fileBytes, fromZk);
 
     // Lets do it again
     assertEquals(0, runTool(args, tool));
 
-    fromZk = zkClient.getCuratorFramework().getData().forPath("/state.json");
     locFile = Path.of(SOLR_HOME, "solr-stress-new.xml");
-    fromLoc = new ZLibCompressor().compressBytes(Files.readAllBytes(locFile));
-    assertArrayEquals("Should get back what we put in ZK", fromLoc, fromZk);
+    fileBytes = Files.readAllBytes(locFile);
+
+    fromZk = 
zkClient.getCuratorFramework().getData().undecompressed().forPath("/state.json");
+    fromLoc = new ZLibCompressor().compressBytes(fileBytes);
+    assertArrayEquals("Should get back a compressed version of what we put in 
ZK", fromLoc, fromZk);
+
+    // Check curator output (should be decompressed)
+    fromZk = zkClient.getCuratorFramework().getData().forPath("/state.json");
+    assertArrayEquals(
+        "Should get back an uncompressed version what we put in ZK", 
fileBytes, fromZk);
   }
 
   @Test
diff --git 
a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java 
b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index 4008f694359..93a97e5bd90 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -507,13 +507,23 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         writer.enqueueUpdate(reader.getClusterState(), 
Collections.singletonList(c1), null);
         writer.writePendingUpdates();
 
-        byte[] data =
+        byte[] dataCompressed =
+            zkClient
+                .getCuratorFramework()
+                .getData()
+                .undecompressed()
+                .forPath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2/state.json");
+        assertTrue(compressor.isCompressedBytes(dataCompressed));
+        Map<?, ?> map = (Map<?, ?>) 
Utils.fromJSON(compressor.decompressBytes(dataCompressed));
+        assertNotNull(map.get("c2"));
+
+        byte[] dataDecompressed =
             zkClient
                 .getCuratorFramework()
                 .getData()
                 .forPath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2/state.json");
-        assertTrue(compressor.isCompressedBytes(data));
-        Map<?, ?> map = (Map<?, ?>) 
Utils.fromJSON(compressor.decompressBytes(data));
+        assertFalse(compressor.isCompressedBytes(dataDecompressed));
+        map = (Map<?, ?>) Utils.fromJSON(dataDecompressed);
         assertNotNull(map.get("c2"));
       }
 
diff --git 
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java 
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 9c1fe5778c3..3583f94592c 100644
--- 
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ 
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -58,7 +58,6 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.ReflectMapWriter;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.ZLibCompressor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -202,6 +201,9 @@ public class SolrZkClient implements Closeable {
             .authorization(zkCredentialsProvider.getCredentials())
             .retryPolicy(retryPolicy)
             .runSafeService(curatorSafeServiceExecutor)
+            .compressionProvider(
+                new SolrZkCompressionProvider(compressor, 
minStateByteLenForCompression))
+            .enableCompression()
             .build();
     if (onReconnect != null) {
       client
@@ -231,18 +233,6 @@ public class SolrZkClient implements Closeable {
     }
 
     assert ObjectReleaseTracker.track(this);
-    if (aclProvider == null) {
-      this.aclProvider = useDefaultCredsAndACLs ? createACLProvider() : new 
DefaultZkACLProvider();
-    } else {
-      this.aclProvider = aclProvider;
-    }
-
-    if (compressor == null) {
-      this.compressor = new ZLibCompressor();
-    } else {
-      this.compressor = compressor;
-    }
-    this.minStateByteLenForCompression = minStateByteLenForCompression;
   }
 
   public CuratorFramework getCuratorFramework() {
@@ -447,18 +437,6 @@ public class SolrZkClient implements Closeable {
                     .storingStatIn(stat)
                     .usingWatcher(wrapWatcher(watcher))
                     .forPath(path));
-    if (compressor.isCompressedBytes(result)) {
-      log.debug("Zookeeper data at path {} is compressed", path);
-      try {
-        result = compressor.decompressBytes(result);
-      } catch (Exception e) {
-        throw new SolrException(
-            SolrException.ErrorCode.SERVER_ERROR,
-            String.format(
-                Locale.ROOT, "Unable to decompress data at path: %s from 
zookeeper", path),
-            e);
-      }
-    }
     metrics.reads.increment();
     if (result != null) {
       metrics.bytesRead.add(result.length);
@@ -473,16 +451,12 @@ public class SolrZkClient implements Closeable {
   }
 
   /** Returns node's state */
-  public Stat setData(final String path, byte data[], final int version, 
boolean retryOnConnLoss)
+  public Stat setData(
+      final String path, final byte[] data, final int version, boolean 
retryOnConnLoss)
       throws KeeperException, InterruptedException {
-    if (SolrZkClient.shouldCompressData(data, path, 
minStateByteLenForCompression)) {
-      // state.json should be compressed before being put to ZK
-      data = compressor.compressBytes(data);
-    }
-    final byte[] finalData = data;
     Stat result =
         runWithCorrectThrows(
-            "setting data", () -> 
client.setData().withVersion(version).forPath(path, finalData));
+            "setting data", () -> 
client.setData().withVersion(version).forPath(path, data));
     metrics.writes.increment();
     if (data != null) {
       metrics.bytesWritten.add(data.length);
@@ -678,11 +652,6 @@ public class SolrZkClient implements Closeable {
       createBuilder.orSetData();
     }
 
-    if (SolrZkClient.shouldCompressData(data, path, 
minStateByteLenForCompression)) {
-      // state.json should be compressed before being put to ZK
-      data = compressor.compressBytes(data);
-    }
-
     metrics.writes.increment();
     if (data != null) {
       metrics.bytesWritten.add(data.length);
@@ -1356,14 +1325,4 @@ public class SolrZkClient implements Closeable {
       return new SolrZkClient(this);
     }
   }
-
-  static boolean shouldCompressData(byte[] data, String path, int 
minStateByteLenForCompression) {
-    if (path.endsWith("state.json")
-        && minStateByteLenForCompression > -1
-        && data.length > minStateByteLenForCompression) {
-      // state.json should be compressed before being put to ZK
-      return true;
-    }
-    return false;
-  }
 }
diff --git 
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java
 
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java
new file mode 100644
index 00000000000..689127d5a14
--- /dev/null
+++ 
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Locale;
+import org.apache.curator.framework.api.CompressionProvider;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.Compressor;
+import org.apache.solr.common.util.ZLibCompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Curator {@link CompressionProvider} that applies Solr's state.json compression policy.
+ *
+ * <p>On write, only paths ending in {@code state.json} are compressed, and only when {@code
+ * minStateByteLenForCompression} is non-negative and the payload is at least that many bytes. On
+ * read, any payload that the configured {@link Compressor} recognizes as compressed is
+ * decompressed, regardless of path; everything else is passed through unchanged.
+ */
+public class SolrZkCompressionProvider implements CompressionProvider {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Codec used to compress state.json writes and to detect/decompress compressed reads. */
+  private final Compressor compressor;
+
+  /**
+   * Minimum payload size, in bytes, at which a state.json write is compressed. Any negative value
+   * disables compression on write.
+   */
+  private final int minStateByteLenForCompression;
+
+  /**
+   * @param compressor codec to use; when {@code null}, a {@link ZLibCompressor} is used
+   * @param minStateByteLenForCompression minimum state.json size in bytes to compress; negative
+   *     values disable compression on write (decompression on read does not depend on it)
+   */
+  public SolrZkCompressionProvider(Compressor compressor, int 
minStateByteLenForCompression) {
+    // Fall back to zlib when no compressor was configured.
+    this.compressor = compressor != null ? compressor : new ZLibCompressor();
+    this.minStateByteLenForCompression = minStateByteLenForCompression;
+  }
+
+  /**
+   * Compresses {@code data} when it is written to a {@code state.json} node and meets the size
+   * threshold. Otherwise returns the same array unchanged.
+   *
+   * <p>NOTE(review): this threshold check is inclusive ({@code >=}). The {@code
+   * shouldCompressData} helper that this change removes from SolrZkClient used a strict {@code
+   * >}. As a result, a payload exactly {@code minStateByteLenForCompression} bytes long (e.g. an
+   * empty payload when the threshold is 0) is now compressed where it previously was not. Confirm
+   * this is intended.
+   *
+   * <p>NOTE(review): {@code data} is dereferenced without a null check when the path matches.
+   * Presumably callers never write a null payload to a state.json node; verify.
+   *
+   * @param path ZooKeeper path being written
+   * @param data payload to store
+   * @return the compressed payload, or {@code data} itself when compression does not apply
+   */
+  @Override
+  public byte[] compress(String path, byte[] data) throws Exception {
+    if (path.endsWith("state.json")
+        && minStateByteLenForCompression > -1
+        && data.length >= minStateByteLenForCompression) {
+      // state.json should be compressed before being put to ZK
+      return compressor.compressBytes(data);
+    } else {
+      return data;
+    }
+  }
+
+  /**
+   * Decompresses {@code compressedData} if the configured compressor recognizes it as compressed.
+   * Otherwise returns it unchanged. {@code path} is used only for the debug log and the error
+   * message.
+   *
+   * @param path ZooKeeper path the data was read from
+   * @param compressedData bytes read from ZooKeeper, which may or may not actually be compressed
+   * @return the decompressed bytes, or {@code compressedData} itself when it is not compressed
+   * @throws SolrException with {@code SERVER_ERROR} if decompression fails; the underlying
+   *     exception is attached as the cause
+   */
+  @Override
+  public byte[] decompress(String path, byte[] compressedData) throws 
SolrException {
+    if (compressor.isCompressedBytes(compressedData)) {
+      log.debug("Zookeeper data at path {} is compressed", path);
+      try {
+        return compressor.decompressBytes(compressedData);
+      } catch (Exception e) {
+        // Surface codec failures as a server error that names the offending znode.
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR,
+            String.format(
+                Locale.ROOT, "Unable to decompress data at path: %s from 
zookeeper", path),
+            e);
+      }
+    } else {
+      // Not recognized as compressed (e.g. written below the threshold): pass through as-is.
+      return compressedData;
+    }
+  }
+}

Reply via email to