This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a8a3db5ef1 [multistage] make mailboxID JSON serialized (#10154)
a8a3db5ef1 is described below

commit a8a3db5ef1ce8388349a07ff589ea4e8c093f7c8
Author: Almog Gavra <[email protected]>
AuthorDate: Fri Jan 20 09:35:36 2023 -0800

    [multistage] make mailboxID JSON serialized (#10154)
---
 .../query/mailbox/InMemorySendingMailbox.java      |   2 +-
 .../pinot/query/mailbox/JsonMailboxIdentifier.java | 132 +++++++++++++++++++++
 .../pinot/query/mailbox/MailboxIdentifier.java     |  22 +---
 .../apache/pinot/query/mailbox/ServerAddress.java  |  82 +++++++++++++
 .../query/mailbox/StringMailboxIdentifier.java     | 105 ----------------
 .../java/org/apache/pinot/query/mailbox/Utils.java |   9 +-
 .../channel/MailboxContentStreamObserver.java      |   6 +-
 .../runtime/operator/MailboxReceiveOperator.java   |   9 +-
 .../runtime/operator/MailboxSendOperator.java      |  13 +-
 .../query/mailbox/GrpcMailboxServiceTest.java      |  12 +-
 .../query/mailbox/InMemoryMailboxServiceTest.java  |   8 +-
 .../mailbox/MultiplexingMailboxServiceTest.java    |   8 +-
 .../runtime/executor/RoundRobinSchedulerTest.java  |   6 +-
 .../operator/MailboxReceiveOperatorTest.java       |  93 ++++++++++-----
 .../runtime/operator/MailboxSendOperatorTest.java  |  12 +-
 .../operator/exchange/BlockExchangeTest.java       |   6 +-
 .../operator/exchange/BroadcastExchangeTest.java   |   6 +-
 .../operator/exchange/HashExchangeTest.java        |   6 +-
 .../operator/exchange/RandomExchangeTest.java      |   6 +-
 .../operator/exchange/SingletonExchangeTest.java   |   4 +-
 20 files changed, 340 insertions(+), 207 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 9c2a713c36..ebde5b9c41 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -49,7 +49,7 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
       throw new IllegalStateException("Failed to insert into in-memory mailbox 
"
           + _mailboxId);
     }
-    _gotMailCallback.accept(new StringMailboxIdentifier(_mailboxId));
+    _gotMailCallback.accept(JsonMailboxIdentifier.parse(_mailboxId));
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
new file mode 100644
index 0000000000..7068b06ab6
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Objects;
+
+
+public class JsonMailboxIdentifier implements MailboxIdentifier {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private final String _jobId;
+  private final String _from;
+  private final String _to;
+
+  private final ServerAddress _fromAddress;
+  private final ServerAddress _toAddress;
+
+  @JsonCreator
+  public JsonMailboxIdentifier(
+      @JsonProperty(value = "jobId") String jobId,
+      @JsonProperty(value = "from") String from,
+      @JsonProperty(value = "to") String to
+  ) {
+    _jobId = jobId;
+    _from = from;
+    _to = to;
+    _fromAddress = ServerAddress.parse(_from);
+    _toAddress = ServerAddress.parse(_to);
+  }
+
+  public JsonMailboxIdentifier(
+      String jobId,
+      ServerAddress from,
+      ServerAddress to
+  ) {
+    _jobId = jobId;
+    _from = from.toString();
+    _to = to.toString();
+    _fromAddress = from;
+    _toAddress = to;
+  }
+
+  public static JsonMailboxIdentifier parse(final String mailboxId) {
+    try {
+      return MAPPER.readValue(mailboxId, JsonMailboxIdentifier.class);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException("Invalid mailboxID: '" + mailboxId + 
"'. If you see this exception it may "
+          + "be because you are doing a rolling upgrade from an old version of 
Pinot that is not backwards "
+          + "compatible with the current V2 engine.", e);
+    }
+  }
+
+  @Override
+  public String getJobId() {
+    return _jobId;
+  }
+
+  public String getFrom() {
+    return _from;
+  }
+
+  @JsonIgnore
+  @Override
+  public ServerAddress getFromHost() {
+    return _fromAddress;
+  }
+
+  public String getTo() {
+    return _to;
+  }
+
+  @JsonIgnore
+  @Override
+  public ServerAddress getToHost() {
+    return _toAddress;
+  }
+
+  @JsonIgnore
+  @Override
+  public boolean isLocal() {
+    return _fromAddress.equals(_toAddress);
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    JsonMailboxIdentifier that = (JsonMailboxIdentifier) o;
+    return Objects.equals(_jobId, that._jobId) && Objects.equals(_from, 
that._from) && Objects.equals(_to, that._to)
+        && Objects.equals(_fromAddress, that._fromAddress) && 
Objects.equals(_toAddress, that._toAddress);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_jobId, _from, _to, _fromAddress, _toAddress);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
index 8646f508ae..77647d3db2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
@@ -32,28 +32,14 @@ public interface MailboxIdentifier {
   String getJobId();
 
   /**
-   * get the sender host.
-   * @return sender host
+   * @return the sender address
    */
-  String getFromHost();
+  ServerAddress getFromHost();
 
   /**
-   * get the sender port.
-   * @return sender port
+   * @return the destination address
    */
-  int getFromPort();
-
-  /**
-   * get the receiver host.
-   * @return receiver host
-   */
-  String getToHost();
-
-  /**
-   * get the receiver port.
-   * @return receiver port
-   */
-  int getToPort();
+  ServerAddress getToHost();
 
   /**
    * Checks whether sender and receiver are in the same JVM.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
new file mode 100644
index 0000000000..cda2b332fe
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.query.mailbox;
+
+import java.util.Objects;
+
+
+public class ServerAddress {
+
+  private final String _hostname;
+  private final int _port;
+
+  public ServerAddress(String hostname, int port) {
+    _hostname = hostname;
+    _port = port;
+  }
+
+  /**
+   * Parses the standard hostname:port pattern into
+   * a {@code ServerAddress}
+   *
+   * @param address the serialized string
+   * @return the deserialized form
+   */
+  public static ServerAddress parse(String address) {
+    String[] split = address.split(":");
+    return new ServerAddress(split[0], Integer.parseInt(split[1]));
+  }
+
+  /**
+   * @return the server's hostname
+   */
+  public String hostname() {
+    return _hostname;
+  }
+
+  /**
+   * @return the server's port
+   */
+  public int port() {
+    return _port;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ServerAddress that = (ServerAddress) o;
+    return _port == that._port && Objects.equals(_hostname, that._hostname);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_hostname, _port);
+  }
+
+  @Override
+  public String toString() {
+    return _hostname + ":" + _port;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
deleted file mode 100644
index c3d3078780..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.mailbox;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
-
-public class StringMailboxIdentifier implements MailboxIdentifier {
-  private static final Joiner JOINER = Joiner.on(':');
-
-  private final String _mailboxIdString;
-  private final String _jobId;
-  private final String _fromHost;
-  private final int _fromPort;
-  private final String _toHost;
-  private final int _toPort;
-
-  public StringMailboxIdentifier(String jobId, String fromHost, int fromPort, 
String toHost,
-      int toPort) {
-    _jobId = jobId;
-    _fromHost = fromHost;
-    _fromPort = fromPort;
-    _toHost = toHost;
-    _toPort = toPort;
-    _mailboxIdString = JOINER.join(_jobId, _fromHost, _fromPort, _toHost, 
_toPort);
-  }
-
-  public StringMailboxIdentifier(String mailboxId) {
-    _mailboxIdString = mailboxId;
-    String[] splits = mailboxId.split(":");
-    Preconditions.checkState(splits.length == 5);
-    _jobId = splits[0];
-    _fromHost = splits[1];
-    _fromPort = Integer.parseInt(splits[2]);
-    _toHost = splits[3];
-    _toPort = Integer.parseInt(splits[4]);
-
-    // assert that resulting string are identical.
-    Preconditions.checkState(
-        JOINER.join(_jobId, _fromHost, _fromPort, _toHost, 
_toPort).equals(_mailboxIdString));
-  }
-
-  @Override
-  public String getJobId() {
-    return _jobId;
-  }
-
-  @Override
-  public String getFromHost() {
-    return _fromHost;
-  }
-
-  @Override
-  public int getFromPort() {
-    return _fromPort;
-  }
-
-  @Override
-  public String getToHost() {
-    return _toHost;
-  }
-
-  @Override
-  public int getToPort() {
-    return _toPort;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return _fromHost.equals(_toHost) && _fromPort == _toPort;
-  }
-
-  @Override
-  public String toString() {
-    return _mailboxIdString;
-  }
-
-  @Override
-  public int hashCode() {
-    return _mailboxIdString.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object that) {
-    return (that instanceof StringMailboxIdentifier) && 
_mailboxIdString.equals(
-        ((StringMailboxIdentifier) that)._mailboxIdString);
-  }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
index 90a4070544..60ae614a2f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
@@ -18,11 +18,7 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import com.google.common.base.Joiner;
-
-
 public final class Utils {
-  private static final Joiner JOINER = Joiner.on(':');
 
   private Utils() {
     // do not instantiate.
@@ -30,11 +26,12 @@ public final class Utils {
 
   public static String constructChannelId(String mailboxId) {
     MailboxIdentifier mailboxIdentifier = toMailboxIdentifier(mailboxId);
-    return JOINER.join(mailboxIdentifier.getToHost(), 
mailboxIdentifier.getToPort());
+    ServerAddress dest = mailboxIdentifier.getToHost();
+    return dest.toString();
   }
 
   public static MailboxIdentifier toMailboxIdentifier(String mailboxId) {
-    return new StringMailboxIdentifier(mailboxId);
+    return JsonMailboxIdentifier.parse(mailboxId);
   }
 
   public static String fromMailboxIdentifier(MailboxIdentifier mailboxId) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 9fd3a4b2ba..4a35c8d660 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -31,8 +31,8 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +71,7 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
   private ReadWriteLock _errorLock = new ReentrantReadWriteLock();
   @GuardedBy("_errorLock")
   private Mailbox.MailboxContent _errorContent = null;
-  private StringMailboxIdentifier _mailboxId;
+  private JsonMailboxIdentifier _mailboxId;
   private Consumer<MailboxIdentifier> _gotMailCallback;
 
   private void updateMaxBufferSize() {
@@ -132,7 +132,7 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
 
   @Override
   public void onNext(Mailbox.MailboxContent mailboxContent) {
-    _mailboxId = new StringMailboxIdentifier(mailboxContent.getMailboxId());
+    _mailboxId = JsonMailboxIdentifier.parse(mailboxContent.getMailboxId());
 
     GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox) 
_mailboxService.getReceivingMailbox(_mailboxId);
     _gotMailCallback = receivingMailbox.init(this);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index aa569ba56f..8fb1d145c1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -30,10 +30,11 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.mailbox.ServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.service.QueryConfig;
@@ -69,8 +70,10 @@ public class MailboxReceiveOperator extends MultiStageOperator {
 
   private static MailboxIdentifier toMailboxId(ServerInstance fromInstance, 
long jobId, long stageId,
       String receiveHostName, int receivePort) {
-    return new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), 
fromInstance.getHostname(),
-        fromInstance.getQueryMailboxPort(), receiveHostName, receivePort);
+    return new JsonMailboxIdentifier(
+        String.format("%s_%s", jobId, stageId),
+        new ServerAddress(fromInstance.getHostname(), 
fromInstance.getQueryMailboxPort()),
+        new ServerAddress(receiveHostName, receivePort));
   }
 
   // TODO: Move deadlineInNanoSeconds to OperatorContext.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 79d64bfefe..047f5fd5b2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -29,9 +29,10 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.mailbox.ServerAddress;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -159,9 +160,11 @@ public class MailboxSendOperator extends MultiStageOperator {
     return transferableBlock;
   }
 
-  private static StringMailboxIdentifier toMailboxId(
-      ServerInstance serverInstance, long jobId, int stageId, String 
serverHostName, int serverPort) {
-    return new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), 
serverHostName, serverPort,
-        serverInstance.getHostname(), serverInstance.getQueryMailboxPort());
+  private static JsonMailboxIdentifier toMailboxId(
+      ServerInstance destination, long jobId, int stageId, String sender, int 
senderPort) {
+    return new JsonMailboxIdentifier(
+        String.format("%s_%s", jobId, stageId),
+        new ServerAddress(sender, senderPort),
+        new ServerAddress(destination.getHostname(), 
destination.getQueryMailboxPort()));
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 7585c0ca87..70b7763ca9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -74,8 +74,10 @@ public class GrpcMailboxServiceTest {
   public void testHappyPath()
       throws Exception {
     // Given:
-    StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
-        "happypath", "localhost", _mailboxService1.getMailboxPort(), 
"localhost", _mailboxService2.getMailboxPort());
+    JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+        "happypath",
+        new ServerAddress("localhost", _mailboxService1.getMailboxPort()),
+        new ServerAddress("localhost", _mailboxService2.getMailboxPort()));
     SendingMailbox<TransferableBlock> sendingMailbox = 
_mailboxService1.getSendingMailbox(mailboxId);
     ReceivingMailbox<TransferableBlock> receivingMailbox = 
_mailboxService2.getReceivingMailbox(mailboxId);
     CountDownLatch gotData = new CountDownLatch(1);
@@ -104,8 +106,10 @@ public class GrpcMailboxServiceTest {
   public void testGrpcException()
       throws Exception {
     // Given:
-    StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
-        "exception", "localhost", _mailboxService1.getMailboxPort(), 
"localhost", _mailboxService2.getMailboxPort());
+    JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+        "exception",
+        new ServerAddress("localhost", _mailboxService1.getMailboxPort()),
+        new ServerAddress("localhost", _mailboxService2.getMailboxPort()));
     SendingMailbox<TransferableBlock> sendingMailbox = 
_mailboxService1.getSendingMailbox(mailboxId);
     ReceivingMailbox<TransferableBlock> receivingMailbox = 
_mailboxService2.getReceivingMailbox(mailboxId);
     CountDownLatch gotData = new CountDownLatch(1);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index 7ffb51238f..a1eae8e1ff 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -38,8 +38,8 @@ public class InMemoryMailboxServiceTest {
   public void testHappyPath()
       throws Exception {
     InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
-    final StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
-        "happyPathJob", "localhost", 0, "localhost", 0);
+    final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+        "happyPathJob", new ServerAddress("localhost", 0), new 
ServerAddress("localhost", 0));
     InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox) 
mailboxService.getReceivingMailbox(
         mailboxId);
     InMemorySendingMailbox sendingMailbox = (InMemorySendingMailbox) 
mailboxService.getSendingMailbox(mailboxId);
@@ -74,8 +74,8 @@ public class InMemoryMailboxServiceTest {
   @Test
   public void testNonLocalMailboxId() {
     InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
-    final StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
-        "happyPathJob", "localhost", 0, "localhost", 1);
+    final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+        "happyPathJob", new ServerAddress("localhost", 0), new 
ServerAddress("localhost", 1));
 
     // Test getReceivingMailbox
     try {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
index 347fcd0f59..110aeb9d74 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
@@ -24,10 +24,10 @@ import org.testng.annotations.Test;
 
 
 public class MultiplexingMailboxServiceTest {
-  private static final StringMailboxIdentifier LOCAL_MAILBOX_ID = new 
StringMailboxIdentifier(
-      "localJobId", "localhost", 0, "localhost", 0);
-  private static final StringMailboxIdentifier NON_LOCAL_MAILBOX_ID = new 
StringMailboxIdentifier(
-      "localJobId", "localhost", 0, "localhost", 1);
+  private static final JsonMailboxIdentifier LOCAL_MAILBOX_ID = new 
JsonMailboxIdentifier(
+      "localJobId", new ServerAddress("localhost", 0), new 
ServerAddress("localhost", 0));
+  private static final JsonMailboxIdentifier NON_LOCAL_MAILBOX_ID = new 
JsonMailboxIdentifier(
+      "localJobId", new ServerAddress("localhost", 0), new 
ServerAddress("localhost", 1));
 
   @Test
   public void testHappyPath() {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index bb3faf63ec..0435b50daa 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -20,8 +20,8 @@ package org.apache.pinot.query.runtime.executor;
 
 import com.google.common.collect.ImmutableList;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.mockito.Mock;
@@ -34,8 +34,8 @@ import org.testng.annotations.Test;
 
 public class RoundRobinSchedulerTest {
 
-  private static final MailboxIdentifier MAILBOX_1 = new 
StringMailboxIdentifier("1_1:foo:2:bar:3");
-  private static final MailboxIdentifier MAILBOX_2 = new 
StringMailboxIdentifier("1_2:foo:2:bar:3");
+  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1_1", "foo:2", "bar:3");
+  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1_2", "foo:2", "bar:3");
 
   @Mock
   private MultiStageOperator _operator;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 5c3008ae42..ce6777df14 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -26,9 +26,10 @@ import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.mailbox.ServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.mockito.Mock;
@@ -177,8 +178,10 @@ public class MailboxReceiveOperatorTest {
     int toPort = 8888;
     String toHost = "toHost";
 
-    StringMailboxIdentifier expectedMailboxId =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), 
serverHost, server2port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(serverHost, server2port),
+            new ServerAddress(toHost, toPort));
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(true);
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
@@ -208,8 +211,10 @@ public class MailboxReceiveOperatorTest {
     int toPort = 8888;
     String toHost = "toHost";
 
-    StringMailboxIdentifier expectedMailboxId =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), 
serverHost, server2port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(serverHost, server2port),
+            new ServerAddress(toHost, toPort));
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     // Receive null mailbox during timeout.
@@ -241,8 +246,10 @@ public class MailboxReceiveOperatorTest {
     int toPort = 8888;
     String toHost = "toHost";
 
-    StringMailboxIdentifier expectedMailboxId =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), 
serverHost, server2port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(serverHost, server2port),
+            new ServerAddress(toHost, toPort));
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -273,8 +280,10 @@ public class MailboxReceiveOperatorTest {
     int toPort = 8888;
     String toHost = "toHost";
 
-    StringMailboxIdentifier expectedMailboxId =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), 
serverHost, server2port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(serverHost, server2port),
+            new ServerAddress(toHost, toPort));
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
@@ -309,8 +318,10 @@ public class MailboxReceiveOperatorTest {
     int toPort = 8888;
     String toHost = "toHost";
 
-    StringMailboxIdentifier expectedMailboxId =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(serverHost, server2port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Exception e = new Exception("errorBlock");
@@ -341,13 +352,17 @@ public class MailboxReceiveOperatorTest {
     int toPort = 8888;
     String toHost = "toHost";
 
-    StringMailboxIdentifier expectedMailboxId1 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId1 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server1Host, server1Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(true);
 
-    StringMailboxIdentifier expectedMailboxId2 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId2 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server2Host, server2Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
@@ -379,14 +394,18 @@ public class MailboxReceiveOperatorTest {
     int toPort = 8888;
     String toHost = "toHost";
 
-    StringMailboxIdentifier expectedMailboxId1 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId1 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server1Host, server1Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Mockito.when(_mailbox.receive()).thenReturn(null);
 
-    StringMailboxIdentifier expectedMailboxId2 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId2 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server2Host, server2Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
@@ -419,8 +438,10 @@ public class MailboxReceiveOperatorTest {
     String toHost = "toHost";
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-    StringMailboxIdentifier expectedMailboxId1 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId1 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server1Host, server1Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Object[] expRow1 = new Object[]{1, 1};
@@ -430,8 +451,10 @@ public class MailboxReceiveOperatorTest {
             TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     Object[] expRow3 = new Object[]{3, 3};
-    StringMailboxIdentifier expectedMailboxId2 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId2 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server2Host, server2Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
@@ -474,16 +497,20 @@ public class MailboxReceiveOperatorTest {
     String toHost = "toHost";
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-    StringMailboxIdentifier expectedMailboxId1 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId1 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server1Host, server1Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Mockito.when(_mailbox.receive())
         .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("mailboxError")));
 
     Object[] expRow3 = new Object[]{3, 3};
-    StringMailboxIdentifier expectedMailboxId2 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId2 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server2Host, server2Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
@@ -515,15 +542,19 @@ public class MailboxReceiveOperatorTest {
     String toHost = "toHost";
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-    StringMailboxIdentifier expectedMailboxId1 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId1 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server1Host, server1Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Mockito.when(_mailbox.receive()).thenThrow(new Exception("mailboxError"));
 
     Object[] expRow3 = new Object[]{3, 3};
-    StringMailboxIdentifier expectedMailboxId2 =
-        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    JsonMailboxIdentifier expectedMailboxId2 =
+        new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+            new ServerAddress(server2Host, server2Port),
+            new ServerAddress(toHost, toPort));
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 9f6577dc66..b616efcf91 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -24,8 +24,8 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -74,7 +74,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), _exchangeFactory, 1, 2);
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
@@ -91,7 +91,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), _exchangeFactory, 1, 2);
     TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
     Mockito.when(_input.nextBlock())
         .thenReturn(errorBlock);
@@ -109,7 +109,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), _exchangeFactory, 1, 2);
     Mockito.when(_input.nextBlock())
         .thenThrow(new RuntimeException("foo!"));
     ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
@@ -128,7 +128,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), _exchangeFactory, 1, 2);
     TransferableBlock eosBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock();
     Mockito.when(_input.nextBlock())
         .thenReturn(eosBlock);
@@ -146,7 +146,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), _exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), _exchangeFactory, 1, 2);
     TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
     Mockito.when(_input.nextBlock())
         .thenReturn(dataBlock)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index 617718a566..d2cb71a04c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -26,10 +26,10 @@ import java.util.function.BiFunction;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -45,8 +45,8 @@ import org.testng.annotations.Test;
 
 public class BlockExchangeTest {
 
-  private static final MailboxIdentifier MAILBOX_1 = new StringMailboxIdentifier("1:host:1:host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new StringMailboxIdentifier("1:host:1:host:2");
+  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "host:1", "host:1");
+  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "host:1", "host:2");
 
   private AutoCloseable _mocks;
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
index 14b673dcd8..c99ea643c6 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
 import java.util.Iterator;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.mockito.Mock;
@@ -34,8 +34,8 @@ import org.testng.annotations.Test;
 
 
 public class BroadcastExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new StringMailboxIdentifier("1:host:1:host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new StringMailboxIdentifier("1:host:1:host:2");
+  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "host:1", "host:1");
+  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "host:1", "host:2");
 
   private AutoCloseable _mocks;
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
index cf5420a92f..4cd1d71075 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
@@ -21,9 +21,9 @@ package org.apache.pinot.query.runtime.operator.exchange;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import java.util.Iterator;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -38,8 +38,8 @@ import org.testng.annotations.Test;
 
 public class HashExchangeTest {
 
-  private static final MailboxIdentifier MAILBOX_1 = new StringMailboxIdentifier("1:host:1:host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new StringMailboxIdentifier("1:host:1:host:2");
+  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "host:1", "host:1");
+  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "host:1", "host:2");
 
   private AutoCloseable _mocks;
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
index cf6033cbd6..19e3db1711 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
 import java.util.Iterator;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.mockito.Mock;
@@ -33,8 +33,8 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class RandomExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new StringMailboxIdentifier("1:host:1:host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new StringMailboxIdentifier("1:host:1:host:2");
+  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "host:1", "host:1");
+  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "host:1", "host:2");
 
   private AutoCloseable _mocks;
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index 203cf7d907..e38e533a9a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
 import java.util.Iterator;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.mockito.Mock;
@@ -34,7 +34,7 @@ import org.testng.annotations.Test;
 
 
 public class SingletonExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new StringMailboxIdentifier("1:host:1:host:1");
+  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "host:1", "host:1");
 
   private AutoCloseable _mocks;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to