walterddr commented on code in PR #9773:
URL: https://github.com/apache/pinot/pull/9773#discussion_r1019322644


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -18,86 +18,86 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 
 
 /**
  * This {@code MailboxSendOperator} is created to send {@link 
TransferableBlock}s to the receiving end.
  */
 public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MailboxSendOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, 
RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, 
RelDistribution.Type.HASH_DISTRIBUTED);
-  private static final Random RANDOM = new Random();
-  // TODO: Deduct this value via grpc config maximum byte size; and make it 
configurable with override.
-  // TODO: Max block size is a soft limit. only counts fixedSize datatable 
byte buffer
-  private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final List<ServerInstance> _receivingStageInstances;
-  private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final String _serverHostName;
-  private final int _serverPort;
-  private final long _jobId;
-  private final int _stageId;
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final DataSchema _dataSchema;
-  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
-
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, 
DataSchema dataSchema,
+
+  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final BlockExchange _exchange;
+
+  interface BlockExchangeFactory {
+    BlockExchange build(MailboxService<TransferableBlock> mailboxService, 
List<MailboxIdentifier> destinations,
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> 
selector);
+  }
+
+  interface GenerateMailboxId {

Review Comment:
   Should this be renamed to `mailboxIDGenerator`?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.exchange;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.runtime.blocks.BlockSplitter;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/**
+ * A class that contains the logic for properly distributing
+ */

Review Comment:
   This Javadoc is not clear. Does it mean "properly distributing blocks"?
   If so, what is the definition of "properly"?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.util.List;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+
+
+/**
+ * Interface for splitting transferable blocks. This is used for ensuring
+ * that the blocks that are sent along the wire play nicely with the
+ * underlying transport.
+ */
+public interface BlockSplitter {
+
+  /**
+   * @return a list of blocks that was split from the original {@code block}
+   */
+  List<TransferableBlock> split(TransferableBlock block, BaseDataBlock.Type 
type, int maxBlockSize);

Review Comment:
   (major) This design is cleaner, but I also wonder whether it introduces
   unnecessary looping over the dataset.

   For example, hash distribution + block splitting loops over the entire
   data set twice instead of once: we could actually partition by key and put
   the rows into split blocks at the same time, but I am not sure how to
   achieve that with these interfaces.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to