agavra commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1018255056


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -18,26 +18,208 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
 
 
 public class AggregateOperatorTest {
 
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _input;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldHandleUpstreamErrorBlocks() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+
+    // When:
+    TransferableBlock block1 = operator.nextBlock(); // build
+
+    // Then:
+    Mockito.verify(_input, Mockito.times(1)).nextBlock();
+    Assert.assertTrue(block1.isErrorBlock(), "Input errors should propagate immediately");
+  }
+
+  @Test
+  public void shouldHandleEndOfStreamBlockWithNoOtherInputs() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+
+    // When:
+    TransferableBlock block = operator.nextBlock();
+
+    // Then:
+    Mockito.verify(_input, Mockito.times(1)).nextBlock();
+    Assert.assertTrue(block.isEndOfStreamBlock(), "EOS blocks should propagate");
+  }
+
+  @Test
+  public void shouldHandleUpstreamNoOpBlocksWhileConstructing() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
+    Mockito.when(_input.nextBlock())
+        .thenReturn(block(inSchema, new Object[]{1, 1}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);

Review Comment:
   > but i am also ok if the goal is to keep these tests isolated and easy to maintain.
   
   that's my goal :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to