This is an automated email from the ASF dual-hosted git repository.

noob-se7en pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ac1eb38bbaf ZK multi path transaction API support (#18380)
ac1eb38bbaf is described below

commit ac1eb38bbafb4d7147450c7173eb55b30ce43ea7
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed May 13 12:23:41 2026 +0530

    ZK multi path transaction API support (#18380)
    
    * ZK multi path transaction API support
    
    * [refactor] Collapse ZK multi-op API to a fluent ZkMultiWriteBuilder
    
    Replace PinotZkOp + PinotZkMultiResult + ZkMultiWriter with a single
    ZkMultiWriteBuilder; PinotHelixResourceManager.multiWriteZK() returns a
    fresh builder. execute() returns void and throws KeeperException on
    atomic rollback, so callers branch on BadVersionException /
    NoNodeException / NodeExistsException for the retry-vs-hard-error
    trichotomy without a custom result type.
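The retry-vs-hard-error branching described above might look roughly like the sketch below. It is a minimal, self-contained illustration, not Pinot code. `TxnException`, `BadVersion`, `NoNode`, `NodeExists`, `Attempt` and `executeWithRetry` are hypothetical stand-ins for the `KeeperException` subtypes and for a caller's transaction logic. Retrying only on a version conflict is an assumed policy; the commit leaves that mapping to callers. Each attempt is meant to re-read versions and build a fresh builder, because a builder is single-use.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class MultiWriteRetrySketch {
  // Hypothetical stand-ins for KeeperException.BadVersionException / NoNodeException /
  // NodeExistsException, so this sketch compiles and runs without a ZooKeeper dependency.
  static class TxnException extends Exception {
    TxnException(String code) {
      super(code);
    }
  }

  static final class BadVersion extends TxnException {
    BadVersion() {
      super("BADVERSION");
    }
  }

  static final class NoNode extends TxnException {
    NoNode() {
      super("NONODE");
    }
  }

  static final class NodeExists extends TxnException {
    NodeExists() {
      super("NODEEXISTS");
    }
  }

  /** One attempt: re-read current versions, build a fresh (single-use) transaction, execute it. */
  @FunctionalInterface
  interface Attempt {
    void run() throws TxnException;
  }

  enum Outcome { COMMITTED, RETRIES_EXHAUSTED, HARD_ERROR }

  static Outcome executeWithRetry(Attempt attempt, int maxAttempts) {
    for (int i = 0; i < maxAttempts; i++) {
      try {
        attempt.run();
        return Outcome.COMMITTED;
      } catch (BadVersion e) {
        // A concurrent writer moved a version. Nothing was applied, so loop and retry
        // with freshly read versions.
      } catch (NoNode | NodeExists e) {
        // Assumed policy: structural conflicts are treated as hard errors, not retried.
        return Outcome.HARD_ERROR;
      } catch (TxnException e) {
        return Outcome.HARD_ERROR;
      }
    }
    return Outcome.RETRIES_EXHAUSTED;
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    // Simulated transaction that loses the version race twice, then commits.
    Outcome outcome = executeWithRetry(() -> {
      if (calls.incrementAndGet() < 3) {
        throw new BadVersion();
      }
    }, 5);
    System.out.println(outcome + " after " + calls.get() + " attempts"); // COMMITTED after 3 attempts
  }
}
```

Bounding the loop with `maxAttempts` keeps a hot, contended znode from spinning a caller forever; what to do on `RETRIES_EXHAUSTED` is left to the caller in this sketch.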
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Address PR review: tighten ZkMultiWriteBuilder + happy-path test
    
    - Switch ZkMultiWriteBuilder ctor to concrete ZkClient (matches the
      rest of Pinot, which uses ZkClient over RealmAwareZkClient).
    - Document the builder is single-use and not thread-safe.
    - Restrict multiWriteZK to Helix property-store paths: builder takes a
      propertyStoreRoot prefix and op paths are property-store-relative
      (the same paths callers pass to ZkHelixPropertyStore). Validates
      prefix shape in the constructor.
    - Simplify PinotHelixResourceManager multi-write client setup: derive
      zkAddress from the started Helix manager instead of caching it +
      drop unused _stopped flag.
    - Add PinotHelixResourceManagerStatelessTest#testMultiWriteZkSegmentMetadataUpdates:
      pre-creates two SegmentZKMetadata znodes, updates both atomically via
      multiWriteZK(), and verifies the round-trip read through the property store.
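To make the property-store path restriction concrete, here is a plain-JDK sketch of the prefix validation and path rebasing the builder performs (the diff itself uses Guava `Preconditions`). `PropertyStorePathResolver` is a hypothetical name; the root shape `/{cluster}/PROPERTYSTORE` comes from this commit, while the cluster, table and segment names in `main` are made up.

```java
import java.util.Objects;

public final class PropertyStorePathResolver {
  private final String _propertyStoreRoot;

  public PropertyStorePathResolver(String propertyStoreRoot) {
    Objects.requireNonNull(propertyStoreRoot, "propertyStoreRoot");
    // Same shape rule as the builder: empty (raw absolute paths), or absolute with no trailing slash.
    boolean ok = propertyStoreRoot.isEmpty()
        || (propertyStoreRoot.startsWith("/") && !propertyStoreRoot.endsWith("/"));
    if (!ok) {
      throw new IllegalArgumentException(
          "propertyStoreRoot must be empty or start with '/' and not end with '/': " + propertyStoreRoot);
    }
    _propertyStoreRoot = propertyStoreRoot;
  }

  /** Rebases a property-store-relative path (e.g. /SEGMENTS/t/s) onto the absolute ZK root. */
  public String resolve(String path) {
    Objects.requireNonNull(path, "path");
    if (!path.startsWith("/")) {
      throw new IllegalArgumentException("path must start with '/': " + path);
    }
    return _propertyStoreRoot + path;
  }

  public static void main(String[] args) {
    // "myCluster", "myTable_OFFLINE" and "seg_0" are hypothetical names for illustration.
    PropertyStorePathResolver resolver = new PropertyStorePathResolver("/myCluster/PROPERTYSTORE");
    // prints /myCluster/PROPERTYSTORE/SEGMENTS/myTable_OFFLINE/seg_0
    System.out.println(resolver.resolve("/SEGMENTS/myTable_OFFLINE/seg_0"));
  }
}
```

Rejecting a trailing slash on the root and requiring a leading slash on each path guarantees exactly one separator at the join point.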
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Make multiWriteZK ZkClient timeouts configurable via ControllerConf
    
    Thread ControllerConf into PinotHelixResourceManager so the dedicated
    multi-write ZkClient honors the existing zk.client.session.timeout.ms
    and zk.client.connection.timeout.ms controller config keys (same keys
    HelixSetupUtils already uses), instead of always using the hardcoded
    defaults. JVM-level ZooKeeper system properties (e.g. jute.maxbuffer)
    continue to be picked up automatically by the ZooKeeper client library
    through the shared ZkClient.Builder.
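The timeout lookup boils down to "use the controller value when a config object and the key are both present, otherwise the default". A minimal sketch, assuming a `Map<String, String>` in place of `ControllerConf`; the key strings are the ones quoted above, and the default values in `main` are placeholders, not Pinot's actual defaults.

```java
import java.util.HashMap;
import java.util.Map;

public class ZkTimeoutResolution {
  // Key names as quoted in the commit message.
  static final String SESSION_TIMEOUT_KEY = "zk.client.session.timeout.ms";
  static final String CONNECTION_TIMEOUT_KEY = "zk.client.connection.timeout.ms";

  /**
   * Returns the configured value for key, or defaultMs when there is no config at all
   * (the null-ControllerConf case) or the key is absent.
   */
  static int resolveTimeoutMs(Map<String, String> conf, String key, int defaultMs) {
    if (conf == null) {
      return defaultMs;
    }
    String raw = conf.get(key);
    return raw == null ? defaultMs : Integer.parseInt(raw.trim());
  }

  public static void main(String[] args) {
    // Placeholder defaults for illustration only.
    int defaultSessionMs = 30_000;
    int defaultConnectMs = 60_000;
    Map<String, String> conf = new HashMap<>();
    conf.put(SESSION_TIMEOUT_KEY, "45000");
    System.out.println(resolveTimeoutMs(conf, SESSION_TIMEOUT_KEY, defaultSessionMs));   // 45000
    System.out.println(resolveTimeoutMs(conf, CONNECTION_TIMEOUT_KEY, defaultConnectMs)); // 60000
    System.out.println(resolveTimeoutMs(null, SESSION_TIMEOUT_KEY, defaultSessionMs));    // 30000
  }
}
```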
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../common/utils/helix/ZkMultiWriteBuilder.java    | 170 ++++++++++++++
 .../utils/helix/ZkMultiWriteBuilderTest.java       | 255 +++++++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  87 ++++++-
 ...otHelixResourceManagerConfigValidationTest.java |  16 +-
 .../PinotHelixResourceManagerStatelessTest.java    |  47 ++++
 5 files changed, 572 insertions(+), 3 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java
new file mode 100644
index 00000000000..1053ed41484
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.helix;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.ZooDefs;
+
+
+/**
+ * Fluent builder for an atomic ZooKeeper {@code multi()} transaction over Helix property-store
+ * paths. Accumulates ops via {@link #set}/{@link #create}/{@link #delete}/{@link #check} and
+ * submits them as a single all-or-nothing batch on {@link #execute()}.
+ *
+ * <p>Paths passed to op methods are property-store-relative, i.e. the same path you would pass to
+ * {@code ZkHelixPropertyStore.set(...)}, e.g. {@code /SEGMENTS/{table}/{segment}}. The builder
+ * prepends the configured property-store root (e.g. {@code /{cluster}/PROPERTYSTORE}) before
+ * submitting to ZK. Multi-path writes outside the property store are intentionally not supported.
+ *
+ * <p>On atomic rollback (e.g. version mismatch, node missing, node already exists), {@link #execute()}
+ * throws the underlying {@link KeeperException} subtype ({@code BadVersionException},
+ * {@code NoNodeException}, {@code NodeExistsException}, ...). Callers branch on the subtype to
+ * distinguish retryable concurrent-state changes from hard errors. Per-op offender info is reachable
+ * via {@link KeeperException#getResults()}.
+ *
+ * <p>Connectivity / session failures (timeout, interrupt, session expiry) are not atomic outcomes
+ * and propagate as the original {@link ZkException}.
+ *
+ * <p>Single-use: each instance can be executed at most once, whether or not that execution
+ * succeeds. Obtain a fresh builder per transaction (typically via
+ * {@code PinotHelixResourceManager.multiWriteZK()}).
+ *
+ * <p>Not thread-safe: instance state ({@code _ops}, {@code _executed}) is mutated by every fluent
+ * call. A single builder must not be shared across threads; use a fresh builder per thread.
+ */
+public final class ZkMultiWriteBuilder {
+
+  /** Pass as {@code expectedVersion} to skip the version check on a {@link #set}/{@link #delete}. */
+  public static final int ANY_VERSION = -1;
+
+  private final ZkClient _zkClient;
+  private final String _propertyStoreRoot;
+  private final List<Op> _ops = new ArrayList<>();
+  private boolean _executed;
+
+  /**
+   * @param zkClient ZK client used to serialize records and submit the transaction.
+   * @param propertyStoreRoot absolute ZK path that all op paths are prefixed with (typically
+   *     {@code /{clusterName}/PROPERTYSTORE}). Must start with {@code /} and not end with {@code /}.
+   *     Pass {@code ""} to operate on raw absolute paths (test-only).
+   */
+  public ZkMultiWriteBuilder(ZkClient zkClient, String propertyStoreRoot) {
+    _zkClient = Preconditions.checkNotNull(zkClient, "zkClient");
+    Preconditions.checkNotNull(propertyStoreRoot, "propertyStoreRoot");
+    Preconditions.checkArgument(
+        propertyStoreRoot.isEmpty() || (propertyStoreRoot.startsWith("/") && !propertyStoreRoot.endsWith("/")),
+        "propertyStoreRoot must be empty or start with '/' and not end with '/': %s", propertyStoreRoot);
+    _propertyStoreRoot = propertyStoreRoot;
+  }
+
+  /**
+   * Set (overwrite) the znode at {@code path} (property-store-relative) to {@code record}, with a
+   * CAS check on {@code expectedVersion}. Pass {@link #ANY_VERSION} to skip the check.
+   */
+  public ZkMultiWriteBuilder set(String path, ZNRecord record, int expectedVersion) {
+    checkNotExecuted();
+    Preconditions.checkNotNull(record, "record");
+    String fullPath = resolve(path);
+    _ops.add(Op.setData(fullPath, _zkClient.serialize(record, fullPath), expectedVersion));
+    return this;
+  }
+
+  /** Set without a version check; equivalent to {@code set(path, record, ANY_VERSION)}. */
+  public ZkMultiWriteBuilder set(String path, ZNRecord record) {
+    return set(path, record, ANY_VERSION);
+  }
+
+  /**
+   * Create a persistent znode at {@code path} (property-store-relative) with {@code record} as its
+   * data. The parent znode must already exist or be created by an earlier op in the same batch.
+   */
+  public ZkMultiWriteBuilder create(String path, ZNRecord record) {
+    checkNotExecuted();
+    Preconditions.checkNotNull(record, "record");
+    String fullPath = resolve(path);
+    _ops.add(
+        Op.create(fullPath, _zkClient.serialize(record, fullPath), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+    return this;
+  }
+
+  /**
+   * Delete the znode at {@code path} (property-store-relative), with a CAS check on
+   * {@code expectedVersion}. Pass {@link #ANY_VERSION} to skip the check.
+   */
+  public ZkMultiWriteBuilder delete(String path, int expectedVersion) {
+    checkNotExecuted();
+    _ops.add(Op.delete(resolve(path), expectedVersion));
+    return this;
+  }
+
+  /** Delete without a version check; equivalent to {@code delete(path, ANY_VERSION)}. */
+  public ZkMultiWriteBuilder delete(String path) {
+    return delete(path, ANY_VERSION);
+  }
+
+  /**
+   * Assert the version of the znode at {@code path} (property-store-relative) without mutating it.
+   * This gates the other ops in the batch atomically: the whole transaction fails with
+   * {@link KeeperException.BadVersionException} if the version no longer matches.
+   */
+  public ZkMultiWriteBuilder check(String path, int expectedVersion) {
+    checkNotExecuted();
+    _ops.add(Op.check(resolve(path), expectedVersion));
+    return this;
+  }
+
+  /**
+   * Submit the accumulated ops as a single atomic ZK {@code multi()} transaction. Throws
+   * {@link KeeperException} on atomic rollback (the subtype identifies the cause). Throws
+   * {@link IllegalStateException} if called more than once or if no ops have been added. Every
+   * call, including one that fails, consumes the builder.
+   * Connectivity / session failures propagate as the original {@link ZkException}.
+   */
+  public void execute()
+      throws KeeperException {
+    checkNotExecuted();
+    _executed = true;
+    Preconditions.checkState(!_ops.isEmpty(), "no ops to execute");
+    try {
+      _zkClient.multi(_ops);
+    } catch (ZkException ze) {
+      Throwable cause = ze.getCause();
+      if (cause instanceof KeeperException) {
+        throw (KeeperException) cause;
+      }
+      throw ze;
+    }
+  }
+
+  private void checkNotExecuted() {
+    Preconditions.checkState(!_executed, "ZkMultiWriteBuilder already executed");
+  }
+
+  private String resolve(String path) {
+    Preconditions.checkNotNull(path, "path");
+    Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path);
+    return _propertyStoreRoot + path;
+  }
+}
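The all-or-nothing contract documented in the class Javadoc above can be modeled without ZooKeeper. The toy store below is a hypothetical illustration, not ZooKeeper's implementation: it applies a batch in order to a scratch copy, stops at the first failing op with a ZooKeeper-style error name, and publishes the copy only when every op passes. It deliberately ignores parent-node checks, ACLs and sessions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyAtomicMulti {
  public static final int ANY_VERSION = -1;

  enum Kind { CREATE, SET, DELETE, CHECK }

  /** One queued op; data is unused for DELETE and CHECK. */
  static final class Op {
    final Kind kind;
    final String path;
    final String data;
    final int expectedVersion;

    Op(Kind kind, String path, String data, int expectedVersion) {
      this.kind = kind;
      this.path = path;
      this.data = data;
      this.expectedVersion = expectedVersion;
    }
  }

  static final class Node {
    final String data;
    final int version;

    Node(String data, int version) {
      this.data = data;
      this.version = version;
    }
  }

  /** Whole-batch failure: code mimics ZooKeeper's error names, opIndex is the first failing op. */
  static final class RollbackException extends Exception {
    final String code;
    final int opIndex;

    RollbackException(String code, int opIndex) {
      super(code + " at op " + opIndex);
      this.code = code;
      this.opIndex = opIndex;
    }
  }

  private final Map<String, Node> _nodes = new HashMap<>();

  Node get(String path) {
    return _nodes.get(path);
  }

  /** Applies ops in order to a scratch copy and publishes it only if every op succeeds. */
  void multi(List<Op> ops) throws RollbackException {
    Map<String, Node> scratch = new HashMap<>(_nodes);
    for (int i = 0; i < ops.size(); i++) {
      Op op = ops.get(i);
      Node cur = scratch.get(op.path);
      if (op.kind == Kind.CREATE) {
        if (cur != null) {
          throw new RollbackException("NODEEXISTS", i);
        }
        scratch.put(op.path, new Node(op.data, 0));
        continue;
      }
      if (cur == null) {
        throw new RollbackException("NONODE", i);
      }
      if (op.expectedVersion != ANY_VERSION && op.expectedVersion != cur.version) {
        throw new RollbackException("BADVERSION", i);
      }
      if (op.kind == Kind.SET) {
        scratch.put(op.path, new Node(op.data, cur.version + 1));
      } else if (op.kind == Kind.DELETE) {
        scratch.remove(op.path);
      } // CHECK only validates; it never mutates.
    }
    // Reached only when every op passed: commit the whole batch at once.
    _nodes.clear();
    _nodes.putAll(scratch);
  }

  public static void main(String[] args) throws RollbackException {
    ToyAtomicMulti store = new ToyAtomicMulti();
    store.multi(List.of(new Op(Kind.CREATE, "/a", "1", 0), new Op(Kind.CREATE, "/b", "1", 0)));
    store.multi(List.of(new Op(Kind.SET, "/b", "bumped", 0))); // /b moves to version 1
    try {
      store.multi(List.of(new Op(Kind.SET, "/a", "2", 0), new Op(Kind.SET, "/b", "2", 0)));
    } catch (RollbackException e) {
      // prints: BADVERSION at op 1; /a is still 1
      System.out.println(e.getMessage() + "; /a is still " + store.get("/a").data);
    }
  }
}
```

The `main` scenario mirrors `testBadVersionAtomicRollback` in the test file: the stale version on the second op leaves the first op's target untouched.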
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java
new file mode 100644
index 00000000000..b09e316a44e
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.helix;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class ZkMultiWriteBuilderTest {
+
+  private static final String ROOT = "/ZkMultiWriteBuilderTest";
+
+  private ZkStarter.ZookeeperInstance _zk;
+  private ZkClient _client;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zk = ZkStarter.startLocalZkServer();
+    _client = new ZkClient.Builder()
+        .setZkServer(_zk.getZkUrl())
+        .setZkSerializer(new ZNRecordSerializer())
+        .build();
+    Assert.assertTrue(_client.waitUntilConnected(10_000, TimeUnit.MILLISECONDS));
+  }
+
+  @AfterClass
+  public void afterClass() {
+    if (_client != null) {
+      _client.close();
+    }
+    if (_zk != null) {
+      ZkStarter.stopLocalZkServer(_zk);
+    }
+  }
+
+  @BeforeMethod
+  public void cleanRoot() {
+    if (_client.exists(ROOT)) {
+      _client.deleteRecursively(ROOT);
+    }
+    _client.createPersistent(ROOT, true);
+  }
+
+  // -----------------------------------------------------------------------
+  // Helpers
+  // -----------------------------------------------------------------------
+
+  private static ZNRecord record(String id, String key, String value) {
+    ZNRecord r = new ZNRecord(id);
+    r.setSimpleField(key, value);
+    return r;
+  }
+
+  private void seed(String path, ZNRecord rec) {
+    _client.createPersistent(path, rec);
+  }
+
+  private ZNRecord read(String path) {
+    return _client.readData(path, true);
+  }
+
+  private int version(String path) {
+    Stat s = new Stat();
+    _client.readData(path, s);
+    return s.getVersion();
+  }
+
+  private ZkMultiWriteBuilder builder() {
+    // Tests use absolute paths under ROOT, so pass an empty prefix (no property-store rebase).
+    return new ZkMultiWriteBuilder(_client, "");
+  }
+
+  // -----------------------------------------------------------------------
+  // Tests
+  // -----------------------------------------------------------------------
+
+  @Test
+  public void testAllSetSuccess()
+      throws KeeperException {
+    String pA = ROOT + "/a";
+    String pB = ROOT + "/b";
+    seed(pA, record("a", "v", "1"));
+    seed(pB, record("b", "v", "1"));
+
+    builder()
+        .set(pA, record("a", "v", "2"), 0)
+        .set(pB, record("b", "v", "2"), 0)
+        .execute();
+
+    Assert.assertEquals(read(pA).getSimpleField("v"), "2");
+    Assert.assertEquals(read(pB).getSimpleField("v"), "2");
+    Assert.assertEquals(version(pA), 1);
+    Assert.assertEquals(version(pB), 1);
+  }
+
+  @Test
+  public void testMixedCreateSetDeleteSuccess()
+      throws KeeperException {
+    String pExisting = ROOT + "/existing";
+    String pNew = ROOT + "/new";
+    String pStale = ROOT + "/stale";
+    seed(pExisting, record("existing", "v", "1"));
+    seed(pStale, record("stale", "v", "x"));
+
+    builder()
+        .set(pExisting, record("existing", "v", "2"), 0)
+        .create(pNew, record("new", "v", "1"))
+        .delete(pStale, 0)
+        .execute();
+
+    Assert.assertEquals(read(pExisting).getSimpleField("v"), "2");
+    Assert.assertEquals(read(pNew).getSimpleField("v"), "1");
+    Assert.assertFalse(_client.exists(pStale));
+  }
+
+  @Test
+  public void testBadVersionAtomicRollback() {
+    String pA = ROOT + "/a";
+    String pB = ROOT + "/b";
+    seed(pA, record("a", "v", "1"));
+    seed(pB, record("b", "v", "1"));
+
+    // Bump version on pB so the expected-0 check on it will fail.
+    _client.writeData(pB, record("b", "v", "bumped"));
+    Assert.assertEquals(version(pB), 1);
+
+    Assert.expectThrows(KeeperException.BadVersionException.class, () ->
+        builder()
+            .set(pA, record("a", "v", "2"), 0)
+            .set(pB, record("b", "v", "2"), 0) // stale version -> BADVERSION
+            .execute());
+
+    // Atomic rollback — pA must NOT have been updated.
+    Assert.assertEquals(read(pA).getSimpleField("v"), "1");
+    Assert.assertEquals(version(pA), 0);
+    Assert.assertEquals(read(pB).getSimpleField("v"), "bumped");
+  }
+
+  @Test
+  public void testCheckOpGatesSet() {
+    String pGate = ROOT + "/gate";
+    String pTarget = ROOT + "/target";
+    seed(pGate, record("gate", "v", "1"));
+    seed(pTarget, record("target", "v", "1"));
+
+    // Bump gate's version; check(gate, 0) should fail and prevent the set.
+    _client.writeData(pGate, record("gate", "v", "bumped"));
+
+    Assert.expectThrows(KeeperException.BadVersionException.class, () ->
+        builder()
+            .check(pGate, 0)
+            .set(pTarget, record("target", "v", "2"), 0)
+            .execute());
+
+    Assert.assertEquals(read(pTarget).getSimpleField("v"), "1", "target must not have been mutated");
+  }
+
+  @Test
+  public void testDeleteNonExistentRollback() {
+    String pExisting = ROOT + "/existing";
+    String pMissing = ROOT + "/missing";
+    seed(pExisting, record("existing", "v", "1"));
+
+    Assert.expectThrows(KeeperException.NoNodeException.class, () ->
+        builder()
+            .set(pExisting, record("existing", "v", "2"), 0)
+            .delete(pMissing)
+            .execute());
+
+    // pExisting must NOT have been updated.
+    Assert.assertEquals(read(pExisting).getSimpleField("v"), "1");
+  }
+
+  @Test
+  public void testCreateExistingNodeRollback() {
+    String pA = ROOT + "/a";
+    String pB = ROOT + "/b";
+    seed(pA, record("a", "v", "1"));
+    seed(pB, record("b", "v", "existing"));
+
+    Assert.expectThrows(KeeperException.NodeExistsException.class, () ->
+        builder()
+            .set(pA, record("a", "v", "2"), 0)
+            .create(pB, record("b", "v", "fresh"))
+            .execute());
+
+    Assert.assertEquals(read(pA).getSimpleField("v"), "1");
+    Assert.assertEquals(read(pB).getSimpleField("v"), "existing");
+  }
+
+  @Test
+  public void testAnyVersionSetSucceedsRegardlessOfVersion()
+      throws KeeperException {
+    String pA = ROOT + "/a";
+    seed(pA, record("a", "v", "1"));
+    _client.writeData(pA, record("a", "v", "bumped"));
+    _client.writeData(pA, record("a", "v", "bumped-again"));
+    Assert.assertEquals(version(pA), 2);
+
+    builder().set(pA, record("a", "v", "final")).execute();
+
+    Assert.assertEquals(read(pA).getSimpleField("v"), "final");
+  }
+
+  @Test
+  public void testBuilderRejectsDoubleExecute()
+      throws KeeperException {
+    String pA = ROOT + "/a";
+    seed(pA, record("a", "v", "1"));
+
+    // After a successful execute(), the builder rejects further calls.
+    ZkMultiWriteBuilder b = builder().set(pA, record("a", "v", "2"), 0);
+    b.execute();
+    Assert.expectThrows(IllegalStateException.class, b::execute);
+    Assert.expectThrows(IllegalStateException.class, () -> b.set(pA, record("a", "v", "3"), 1));
+
+    // After a failed execute() (atomic rollback), the builder is also burned: no retry through
+    // the same instance. The caller must obtain a fresh builder for the retry tick.
+    ZkMultiWriteBuilder failed = builder().set(pA, record("a", "v", "x"), 99); // stale version
+    Assert.expectThrows(KeeperException.BadVersionException.class, failed::execute);
+    Assert.expectThrows(IllegalStateException.class, failed::execute);
+    Assert.expectThrows(IllegalStateException.class, () -> failed.set(pA, record("a", "v", "y"), 1));
+
+    // Empty execute() also burns the builder.
+    ZkMultiWriteBuilder empty = builder();
+    Assert.expectThrows(IllegalStateException.class, empty::execute);
+    Assert.expectThrows(IllegalStateException.class, empty::execute);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ddab54f6bb9..a923ec3721a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -87,6 +87,8 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
@@ -128,12 +130,14 @@ import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.LogicalTableConfigUtils;
+import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.config.TierConfigUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
+import org.apache.pinot.common.utils.helix.ZkMultiWriteBuilder;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
@@ -231,6 +235,8 @@ public class PinotHelixResourceManager {
   private final boolean _enableBatchMessageMode;
   private final int _deletedSegmentsRetentionInDays;
   private final boolean _enableTieredSegmentAssignment;
+  @Nullable
+  private final ControllerConf _controllerConf;
 
   private HelixManager _helixZkManager;
   private HelixAdmin _helixAdmin;
@@ -243,16 +249,25 @@ public class PinotHelixResourceManager {
   private TableCache _tableCache;
   private final LineageManager _lineageManager;
   private final QueryWorkloadManager _queryWorkloadManager;
+  // Dedicated ZkClient for transactional multi-path writes (atomic ZK multi()). Lazily built on
+  // the first multiWriteZK call. A dedicated session is used because Helix 1.3.2 does not expose
+  // multi() on BaseDataAccessor, and the ZkClient inside ZKHelixManager is not publicly
+  // reachable; reusing it would require reflection, which breaks on Helix point-release field
+  // renames. The extra session is consistent with the controller's existing footprint, which
+  // already holds distinct clients for _propertyStore (cache client) and _leadControllerManager
+  // (manager client).
+  private volatile ZkClient _zkClient;
 
   public PinotHelixResourceManager(String helixClusterName, @Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays,
-      boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
+      boolean enableTieredSegmentAssignment, LineageManager lineageManager,
+      @Nullable ControllerConf controllerConf) {
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
     _isSingleTenantCluster = isSingleTenantCluster;
     _enableBatchMessageMode = enableBatchMessageMode;
     _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
     _enableTieredSegmentAssignment = enableTieredSegmentAssignment;
+    _controllerConf = controllerConf;
     _instanceAdminEndpointCache =
         CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
             .build(new CacheLoader<>() {
@@ -275,7 +290,7 @@ public class PinotHelixResourceManager {
     this(controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
         controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(),
-        LineageManagerFactory.create(controllerConf));
+        LineageManagerFactory.create(controllerConf), controllerConf);
   }
 
   /**
@@ -337,6 +352,12 @@ public class PinotHelixResourceManager {
    */
   public synchronized void stop() {
     _segmentDeletionManager.stop();
+    ZkClient zkClient = _zkClient;
+    if (zkClient != null) {
+      _zkClient = null;
+      LOGGER.info("Closing dedicated multiWriteZK ZkClient");
+      ZkStarter.closeAsync(zkClient);
+    }
   }
 
   /**
@@ -2081,6 +2102,68 @@ public class PinotHelixResourceManager {
     return _helixDataAccessor.getBaseDataAccessor().set(path, record, expectedVersion, accessOption);
   }
 
+  /**
+   * Returns a fresh {@link ZkMultiWriteBuilder} for submitting an atomic ZooKeeper {@code multi()}
+   * transaction over Helix property-store paths (set / create / delete / version-check ops on any
+   * combination of property-store znodes). Either every op commits or none do.
+   * <p>Op paths are property-store-relative (e.g. {@code /SEGMENTS/{table}/{segment}}); the builder
+   * prepends {@code /{cluster}/PROPERTYSTORE} before submitting to ZK. Multi-path writes outside
+   * the property store are intentionally not supported.
+   * <p>Requires {@link #start} to have been called (the ZK address is read from the Helix manager);
+   * throws {@link IllegalStateException} otherwise. The dedicated client is built on first use; if it
+   * cannot connect within the connection timeout, a {@link RuntimeException} is thrown.
+   * <p>The builder's {@code execute()} throws {@link org.apache.zookeeper.KeeperException} on atomic
+   * rollback (the subtype identifies the cause: {@code BadVersionException}, {@code NoNodeException},
+   * {@code NodeExistsException}, ...). Connectivity / session failures propagate as the original
+   * {@link org.apache.helix.zookeeper.zkclient.exception.ZkException}.
+   * <p>The dedicated underlying {@link ZkClient} honors the controller's
+   * {@value CommonConstants.Helix.ZkClient#ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG} and
+   * {@value CommonConstants.Helix.ZkClient#ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG} overrides; JVM-level
+   * ZooKeeper system properties (e.g. {@code jute.maxbuffer}) are picked up automatically by the
+   * ZooKeeper client library itself.
+   */
+  public ZkMultiWriteBuilder multiWriteZK() {
+    return new ZkMultiWriteBuilder(getOrBuildMultiWriteZkClient(),
+        PropertyPathBuilder.propertyStore(_helixClusterName));
+  }
+
+  private ZkClient getOrBuildMultiWriteZkClient() {
+    ZkClient c = _zkClient;
+    if (c != null) {
+      return c;
+    }
+    synchronized (this) {
+      if (_zkClient == null) {
+        Preconditions.checkState(_helixZkManager != null,
+            "multiWriteZK unavailable: PinotHelixResourceManager has not been started");
+        String zkAddress = _helixZkManager.getMetadataStoreConnectionString();
+        int sessionTimeoutMs = _controllerConf != null
+            ? _controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
+                CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS)
+            : CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS;
+        int connectTimeoutMs = _controllerConf != null
+            ? _controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
+                CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS)
+            : CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS;
+        LOGGER.info("Building dedicated multiWriteZK ZkClient at {} (session={}ms, connect={}ms)",
+            zkAddress, sessionTimeoutMs, connectTimeoutMs);
+        ZkClient built = new ZkClient.Builder()
+            .setZkServer(zkAddress)
+            .setSessionTimeout(sessionTimeoutMs)
+            .setConnectionTimeout(connectTimeoutMs)
+            .setZkSerializer(new ZNRecordSerializer())
+            .build();
+        if (!built.waitUntilConnected(connectTimeoutMs, TimeUnit.MILLISECONDS)) {
+          ZkStarter.closeAsync(built);
+          throw new RuntimeException(
+              "Timed out connecting to ZK at " + zkAddress + " after " + connectTimeoutMs
+                  + "ms for multiWriteZK");
+        }
+        _zkClient = built;
+      }
+      return _zkClient;
+    }
+  }
+
   public boolean createZKNode(String path, ZNRecord record, int accessOption, long ttl) {
     return _helixDataAccessor.getBaseDataAccessor().create(path, record, accessOption, ttl);
   }
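`getOrBuildMultiWriteZkClient()` uses double-checked locking on a `volatile` field, and `stop()` clears and closes the client while holding the same monitor. A generic plain-JDK sketch of that pattern follows; the `LazyClientHolder` name and its `Supplier` factory are hypothetical, whereas the real code builds a Helix `ZkClient` and closes it via `ZkStarter.closeAsync`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyClientHolder<T extends AutoCloseable> {
  private final Supplier<T> _factory;
  // volatile so a fully built instance is safely published to readers on the lock-free fast path.
  private volatile T _instance;

  public LazyClientHolder(Supplier<T> factory) {
    _factory = factory;
  }

  public T get() {
    T local = _instance; // fast path: a single volatile read, no lock
    if (local != null) {
      return local;
    }
    synchronized (this) {
      if (_instance == null) { // re-check under the lock; another thread may have built it
        _instance = _factory.get();
      }
      return _instance;
    }
  }

  /** Shares the monitor with the slow path of get(), so a close cannot interleave with a build. */
  public synchronized void close() {
    T local = _instance;
    if (local != null) {
      _instance = null;
      try {
        local.close();
      } catch (Exception e) {
        throw new IllegalStateException("close failed", e);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger builds = new AtomicInteger();
    LazyClientHolder<AutoCloseable> holder = new LazyClientHolder<>(() -> {
      builds.incrementAndGet();
      return () -> { };
    });
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(holder::get); // all threads race on the first get()
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    System.out.println("factory calls: " + builds.get()); // factory calls: 1
    holder.close();
  }
}
```

Reading the field into a local first means the fast path performs only one volatile read; without `volatile`, another thread could observe a reference to a partially constructed client.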
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java
index c7552479cfd..a2b9f78d405 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java
@@ -52,7 +52,8 @@ public class PinotHelixResourceManagerConfigValidationTest {
   public void setUp()
       throws Exception {
     LineageManager lineageManager = Mockito.mock(LineageManager.class);
-    _resourceManager = new PinotHelixResourceManager("testCluster", null, false, false, 7, false, lineageManager);
+    _resourceManager =
+        new PinotHelixResourceManager("testCluster", null, false, false, 7, false, lineageManager, null);
 
     _helixAdmin = Mockito.mock(HelixAdmin.class);
     _helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
@@ -162,6 +163,19 @@ public class PinotHelixResourceManagerConfigValidationTest {
     verify(_helixAdmin).addInstance(eq("testCluster"), any(InstanceConfig.class));
   }
 
+  @Test
+  public void testMultiWriteZkThrowsBeforeStart() {
+    // The fixture never calls start(), so _helixZkManager is null. multiWriteZK() must refuse to
+    // build a client, because the ZK address is derived from the Helix manager.
+    try {
+      _resourceManager.multiWriteZK();
+      fail("Expected IllegalStateException");
+    } catch (IllegalStateException expected) {
+      assertTrue(expected.getMessage().contains("not been started"),
+          "Unexpected message: " + expected.getMessage());
+    }
+  }
+
   private void setField(String fieldName, Object value)
       throws Exception {
     Field field = PinotHelixResourceManager.class.getDeclaredField(fieldName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index bc643b8c92e..cd70ff00e99 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -1741,6 +1741,53 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     deleteSchema(rawTableName);
   }
 
+  /**
+   * Happy-path coverage for {@link PinotHelixResourceManager#multiWriteZK()}: pre-creates two
+   * segment ZK metadata znodes, atomically updates both via a single multi() transaction, then
+   * reads back through the property store and asserts the mutated fields round-tripped. Verifies
+   * the dedicated multi-write ZkClient is built correctly and the ZNRecord serialization /
+   * deserialization path matches what the rest of the controller uses.
+   */
+  @Test
+  public void testMultiWriteZkSegmentMetadataUpdates()
+      throws Exception {
+    String segName1 = "multiWriteZk_seg_1";
+    String segName2 = "multiWriteZk_seg_2";
+    SegmentZKMetadata seg1 = new SegmentZKMetadata(segName1);
+    seg1.setCrc(1L);
+    seg1.setTotalDocs(10L);
+    SegmentZKMetadata seg2 = new SegmentZKMetadata(segName2);
+    seg2.setCrc(2L);
+    seg2.setTotalDocs(20L);
+    assertTrue(ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, seg1));
+    assertTrue(ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, seg2));
+
+    // Mutate both and submit as a single atomic transaction.
+    seg1.setCrc(11L);
+    seg1.setTotalDocs(110L);
+    seg2.setCrc(22L);
+    seg2.setTotalDocs(220L);
+    String path1 = ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME, segName1);
+    String path2 = ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME, segName2);
+    _helixResourceManager.multiWriteZK()
+        .set(path1, seg1.toZNRecord())
+        .set(path2, seg2.toZNRecord())
+        .execute();
+
+    SegmentZKMetadata read1 = ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName1);
+    SegmentZKMetadata read2 = ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName2);
+    assertNotNull(read1);
+    assertNotNull(read2);
+    assertEquals(read1.getCrc(), 11L);
+    assertEquals(read1.getTotalDocs(), 110L);
+    assertEquals(read2.getCrc(), 22L);
+    assertEquals(read2.getTotalDocs(), 220L);
+
+    // Cleanup
+    assertTrue(ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName1));
+    assertTrue(ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName2));
+  }
+
   @AfterClass
   public void tearDown() {
     stopFakeInstances();

