This is an automated email from the ASF dual-hosted git repository.

soumyava pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c42ac8c4db Fix for latest agg to handle nulls in time column. Also adding optimi… (#14911)
5c42ac8c4db is described below

commit 5c42ac8c4dbbc8fa34ab3910145847cea2e49ec1
Author: Soumyava <[email protected]>
AuthorDate: Wed Sep 13 17:37:26 2023 -0700

    Fix for latest agg to handle nulls in time column. Also adding optimi… (#14911)
    
    * Fix for latest agg to handle nulls in time column. Also adding
      optimization for dictionary encoded string columns
    
    * One minor fix
    
    * Adding more tests for the new class
    
    * Changing the init to a putInt
---
 ...SingleStringFirstDimensionVectorAggregator.java |   2 +-
 ...SingleStringLastDimensionVectorAggregator.java} |  51 +++++-----
 .../last/StringLastAggregatorFactory.java          |  22 ++++-
 .../last/StringLastVectorAggregator.java           |  20 ++--
 .../last/StringLastVectorAggregatorTest.java       | 109 ++++++++++++++++++++-
 5 files changed, 162 insertions(+), 42 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
index 22fa50ea462..119e13464a0 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
@@ -57,7 +57,7 @@ public class SingleStringFirstDimensionVectorAggregator 
implements VectorAggrega
         position + NumericFirstVectorAggregator.NULL_OFFSET,
         useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
     );
-    buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
+    buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/SingleStringLastDimensionVectorAggregator.java
similarity index 70%
copy from 
processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
copy to 
processing/src/main/java/org/apache/druid/query/aggregation/last/SingleStringLastDimensionVectorAggregator.java
index 22fa50ea462..6b39088faa2 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/SingleStringLastDimensionVectorAggregator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.query.aggregation.first;
+package org.apache.druid.query.aggregation.last;
 
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.StringUtils;
@@ -29,15 +29,15 @@ import org.apache.druid.segment.vector.VectorValueSelector;
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 
-public class SingleStringFirstDimensionVectorAggregator implements 
VectorAggregator
+public class SingleStringLastDimensionVectorAggregator implements 
VectorAggregator
 {
   private final VectorValueSelector timeSelector;
   private final SingleValueDimensionVectorSelector 
valueDimensionVectorSelector;
-  private long firstTime;
+  private long lastTime;
   private final int maxStringBytes;
   private final boolean useDefault = NullHandling.replaceWithDefault();
 
-  public SingleStringFirstDimensionVectorAggregator(
+  public SingleStringLastDimensionVectorAggregator(
       VectorValueSelector timeSelector,
       SingleValueDimensionVectorSelector valueDimensionVectorSelector,
       int maxStringBytes
@@ -46,18 +46,18 @@ public class SingleStringFirstDimensionVectorAggregator 
implements VectorAggrega
     this.timeSelector = timeSelector;
     this.valueDimensionVectorSelector = valueDimensionVectorSelector;
     this.maxStringBytes = maxStringBytes;
-    this.firstTime = Long.MAX_VALUE;
+    this.lastTime = Long.MIN_VALUE;
   }
 
   @Override
   public void init(ByteBuffer buf, int position)
   {
-    buf.putLong(position, Long.MAX_VALUE);
+    buf.putLong(position, Long.MIN_VALUE);
     buf.put(
-        position + NumericFirstVectorAggregator.NULL_OFFSET,
+        position + NumericLastVectorAggregator.NULL_OFFSET,
         useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
     );
-    buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
+    buf.putInt(position + NumericLastVectorAggregator.VALUE_OFFSET, 0);
   }
 
   @Override
@@ -66,20 +66,20 @@ public class SingleStringFirstDimensionVectorAggregator 
implements VectorAggrega
     final long[] timeVector = timeSelector.getLongVector();
     final boolean[] nullTimeVector = timeSelector.getNullVector();
     final int[] valueVector = valueDimensionVectorSelector.getRowVector();
-    firstTime = buf.getLong(position);
+    lastTime = buf.getLong(position);
     int index;
 
-    long earliestTime;
-    for (index = startRow; index < endRow; index++) {
+    long latestTime;
+    for (index = endRow - 1; index >= startRow; index--) {
       if (nullTimeVector != null && nullTimeVector[index]) {
         continue;
       }
-      earliestTime = timeVector[index];
-      if (earliestTime < firstTime) {
-        firstTime = earliestTime;
-        buf.putLong(position, firstTime);
-        buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, 
NullHandling.IS_NOT_NULL_BYTE);
-        buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, 
valueVector[index]);
+      latestTime = timeVector[index];
+      if (latestTime > lastTime) {
+        lastTime = latestTime;
+        buf.putLong(position, lastTime);
+        buf.put(position + NumericLastVectorAggregator.NULL_OFFSET, 
NullHandling.IS_NOT_NULL_BYTE);
+        buf.putInt(position + NumericLastVectorAggregator.VALUE_OFFSET, 
valueVector[index]);
       }
     }
   }
@@ -90,28 +90,27 @@ public class SingleStringFirstDimensionVectorAggregator 
implements VectorAggrega
     final long[] timeVector = timeSelector.getLongVector();
     final boolean[] nullTimeVector = timeSelector.getNullVector();
     final int[] values = valueDimensionVectorSelector.getRowVector();
-    for (int i = 0; i < numRows; i++) {
+    for (int i = numRows - 1; i >= 0; i--) {
       if (nullTimeVector != null && nullTimeVector[i]) {
         continue;
       }
       int position = positions[i] + positionOffset;
       int row = rows == null ? i : rows[i];
-      long firstTime = buf.getLong(position);
-      if (timeVector[row] < firstTime) {
-        firstTime = timeVector[row];
-        buf.putLong(position, firstTime);
-        buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, 
NullHandling.IS_NOT_NULL_BYTE);
-        buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, 
values[row]);
+      lastTime = buf.getLong(position);
+      if (timeVector[row] > lastTime) {
+        lastTime = timeVector[row];
+        buf.putLong(position, lastTime);
+        buf.put(position + NumericLastVectorAggregator.NULL_OFFSET, 
NullHandling.IS_NOT_NULL_BYTE);
+        buf.putInt(position + NumericLastVectorAggregator.VALUE_OFFSET, 
values[row]);
       }
     }
-
   }
 
   @Nullable
   @Override
   public Object get(ByteBuffer buf, int position)
   {
-    int index = buf.getInt(position + 
NumericFirstVectorAggregator.VALUE_OFFSET);
+    int index = buf.getInt(position + 
NumericLastVectorAggregator.VALUE_OFFSET);
     long earliest = buf.getLong(position);
     String strValue = valueDimensionVectorSelector.lookupName(index);
     return new SerializablePairLongString(earliest, StringUtils.chop(strValue, 
maxStringBytes));
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
index 234f03be3f9..909b7d4971e 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -35,6 +35,7 @@ import org.apache.druid.query.aggregation.VectorAggregator;
 import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
 import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -43,6 +44,8 @@ import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.Types;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorObjectSelector;
 import org.apache.druid.segment.vector.VectorValueSelector;
@@ -160,6 +163,7 @@ public class StringLastAggregatorFactory extends 
AggregatorFactory
 
     final ColumnCapabilities capabilities = 
selectorFactory.getColumnCapabilities(fieldName);
     VectorValueSelector timeSelector = 
selectorFactory.makeValueSelector(timeColumn);
+
     if (Types.isNumeric(capabilities)) {
       VectorValueSelector valueSelector = 
selectorFactory.makeValueSelector(fieldName);
       VectorObjectSelector objectSelector = 
ExpressionVectorSelectors.castValueSelectorToObject(
@@ -171,6 +175,18 @@ public class StringLastAggregatorFactory extends 
AggregatorFactory
       );
       return new StringLastVectorAggregator(timeSelector, objectSelector, 
maxStringBytes);
     }
+
+    if (capabilities != null) {
+      if (capabilities.is(ValueType.STRING) && 
capabilities.isDictionaryEncoded().isTrue()) {
+        if (!capabilities.hasMultipleValues().isTrue()) {
+          SingleValueDimensionVectorSelector sSelector = 
selectorFactory.makeSingleValueDimensionSelector(
+              DefaultDimensionSpec.of(
+                  fieldName));
+          return new SingleStringLastDimensionVectorAggregator(timeSelector, 
sSelector, maxStringBytes);
+        }
+      }
+    }
+
     VectorObjectSelector vSelector = 
selectorFactory.makeObjectSelector(fieldName);
     if (capabilities != null) {
       return new StringLastVectorAggregator(timeSelector, vSelector, 
maxStringBytes);
@@ -296,9 +312,9 @@ public class StringLastAggregatorFactory extends 
AggregatorFactory
     }
     StringLastAggregatorFactory that = (StringLastAggregatorFactory) o;
     return maxStringBytes == that.maxStringBytes &&
-        Objects.equals(fieldName, that.fieldName) &&
-        Objects.equals(timeColumn, that.timeColumn) &&
-        Objects.equals(name, that.name);
+           Objects.equals(fieldName, that.fieldName) &&
+           Objects.equals(timeColumn, that.timeColumn) &&
+           Objects.equals(name, that.name);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java
index a18a1d4c963..00e70c78098 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java
@@ -64,8 +64,9 @@ public class StringLastVectorAggregator implements 
VectorAggregator
     if (timeSelector == null) {
       return;
     }
-    long[] times = timeSelector.getLongVector();
-    Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
+    final long[] times = timeSelector.getLongVector();
+    final boolean[] nullTimeVector = timeSelector.getNullVector();
+    final Object[] objectsWhichMightBeStrings = 
valueSelector.getObjectVector();
 
     lastTime = buf.getLong(position);
     int index;
@@ -76,6 +77,9 @@ public class StringLastVectorAggregator implements 
VectorAggregator
       if (times[i] <= lastTime) {
         continue;
       }
+      if (nullTimeVector != null && nullTimeVector[i]) {
+        continue;
+      }
       index = i;
       final boolean foldNeeded = 
StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]);
       if (foldNeeded) {
@@ -127,22 +131,24 @@ public class StringLastVectorAggregator implements 
VectorAggregator
     if (timeSelector == null) {
       return;
     }
-    long[] timeVector = timeSelector.getLongVector();
-    Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
+    final long[] timeVector = timeSelector.getLongVector();
+    final boolean[] nullTimeVector = timeSelector.getNullVector();
+    final Object[] objectsWhichMightBeStrings = 
valueSelector.getObjectVector();
 
     // iterate once over the object vector to find first non null element and
     // determine if the type is Pair or not
     boolean foldNeeded = false;
     for (Object obj : objectsWhichMightBeStrings) {
-      if (obj == null) {
-        continue;
-      } else {
+      if (obj != null) {
         foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(obj);
         break;
       }
     }
 
     for (int i = 0; i < numRows; i++) {
+      if (nullTimeVector != null && nullTimeVector[i]) {
+        continue;
+      }
       int position = positions[i] + positionOffset;
       int row = rows == null ? i : rows[i];
       long lastTime = buf.getLong(position);
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregatorTest.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregatorTest.java
index f144552d57e..da79faae3c1 100644
--- 
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregatorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregatorTest.java
@@ -23,7 +23,9 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.IdLookup;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnType;
@@ -49,11 +51,13 @@ public class StringLastVectorAggregatorTest extends 
InitializedNullHandlingTest
 {
   private static final double EPSILON = 1e-5;
   private static final String[] VALUES = new String[]{"a", "b", null, "c"};
+  private static final int[] DICT_VALUES = new int[]{1, 2, 0, 3};
   private static final long[] LONG_VALUES = new long[]{1L, 2L, 3L, 4L};
   private static final String[] STRING_VALUES = new String[]{"1", "2", "3", 
"4"};
   private static final float[] FLOAT_VALUES = new float[]{1.0f, 2.0f, 3.0f, 
4.0f};
   private static final double[] DOUBLE_VALUES = new double[]{1.0, 2.0, 3.0, 
4.0};
   private static final boolean[] NULLS = new boolean[]{false, false, true, 
false};
+  private static final boolean[] NULLS1 = new boolean[]{false, false};
   private static final String NAME = "NAME";
   private static final String FIELD_NAME = "FIELD_NAME";
   private static final String FIELD_NAME_LONG = "LONG_NAME";
@@ -74,6 +78,7 @@ public class StringLastVectorAggregatorTest extends 
InitializedNullHandlingTest
 
   private StringLastAggregatorFactory stringLastAggregatorFactory;
   private StringLastAggregatorFactory stringLastAggregatorFactory1;
+  private SingleStringLastDimensionVectorAggregator targetSingleDim;
 
   private VectorColumnSelectorFactory selectorFactory;
 
@@ -96,7 +101,7 @@ public class StringLastVectorAggregatorTest extends 
InitializedNullHandlingTest
       @Override
       public boolean[] getNullVector()
       {
-        return NULLS;
+        return null;
       }
     };
     nonStringValueSelector = new BaseLongVectorValueSelector(new 
NoFilterVectorOffset(
@@ -163,9 +168,9 @@ public class StringLastVectorAggregatorTest extends 
InitializedNullHandlingTest
       }
     };
     BaseLongVectorValueSelector timeSelectorForPairs = new 
BaseLongVectorValueSelector(new NoFilterVectorOffset(
-        times.length,
+        timesSame.length,
         0,
-        times.length
+        timesSame.length
     ))
     {
       @Override
@@ -178,7 +183,7 @@ public class StringLastVectorAggregatorTest extends 
InitializedNullHandlingTest
       @Override
       public boolean[] getNullVector()
       {
-        return new boolean[0];
+        return NULLS1;
       }
     };
     VectorObjectSelector selectorForPairs = new VectorObjectSelector()
@@ -212,7 +217,61 @@ public class StringLastVectorAggregatorTest extends 
InitializedNullHandlingTest
       @Override
       public SingleValueDimensionVectorSelector 
makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
       {
-        return null;
+        return new SingleValueDimensionVectorSelector()
+        {
+          @Override
+          public int[] getRowVector()
+          {
+            return DICT_VALUES;
+          }
+
+          @Override
+          public int getValueCardinality()
+          {
+            return DICT_VALUES.length;
+          }
+
+          @Nullable
+          @Override
+          public String lookupName(int id)
+          {
+            switch (id) {
+              case 1:
+                return "a";
+              case 2:
+                return "b";
+              case 3:
+                return "c";
+              default:
+                return null;
+            }
+          }
+
+          @Override
+          public boolean nameLookupPossibleInAdvance()
+          {
+            return false;
+          }
+
+          @Nullable
+          @Override
+          public IdLookup idLookup()
+          {
+            return null;
+          }
+
+          @Override
+          public int getMaxVectorSize()
+          {
+            return DICT_VALUES.length;
+          }
+
+          @Override
+          public int getCurrentVectorSize()
+          {
+            return DICT_VALUES.length;
+          }
+        };
       }
 
       @Override
@@ -257,6 +316,8 @@ public class StringLastVectorAggregatorTest extends 
InitializedNullHandlingTest
 
     target = new StringLastVectorAggregator(timeSelector, selector, 10);
     targetWithPairs = new StringLastVectorAggregator(timeSelectorForPairs, 
selectorForPairs, 10);
+    targetSingleDim = new 
SingleStringLastDimensionVectorAggregator(timeSelector, 
selectorFactory.makeSingleValueDimensionSelector(
+        DefaultDimensionSpec.of(FIELD_NAME)), 10);
     clearBufferForPositions(0, 0);
 
 
@@ -361,6 +422,44 @@ public class StringLastVectorAggregatorTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void aggregateSingleDim()
+  {
+    targetSingleDim.aggregate(buf, 0, 0, VALUES.length);
+    Pair<Long, String> result = (Pair<Long, String>) targetSingleDim.get(buf, 
0);
+    Assert.assertEquals(times[3], result.lhs.longValue());
+    Assert.assertEquals(VALUES[3], result.rhs);
+  }
+
+  @Test
+  public void aggregateBatchWithoutRowsSingleDim()
+  {
+    int[] positions = new int[]{0, 43, 70};
+    int positionOffset = 2;
+    clearBufferForPositions(positionOffset, positions);
+    targetSingleDim.aggregate(buf, 3, positions, null, positionOffset);
+    for (int i = 0; i < positions.length; i++) {
+      Pair<Long, String> result = (Pair<Long, String>) 
targetSingleDim.get(buf, positions[i] + positionOffset);
+      Assert.assertEquals(times[i], result.lhs.longValue());
+      Assert.assertEquals(VALUES[i], result.rhs);
+    }
+  }
+
+  @Test
+  public void aggregateBatchWithRowsSingleDim()
+  {
+    int[] positions = new int[]{0, 43, 70};
+    int[] rows = new int[]{3, 2, 0};
+    int positionOffset = 2;
+    clearBufferForPositions(positionOffset, positions);
+    targetSingleDim.aggregate(buf, 3, positions, rows, positionOffset);
+    for (int i = 0; i < positions.length; i++) {
+      Pair<Long, String> result = (Pair<Long, String>) 
targetSingleDim.get(buf, positions[i] + positionOffset);
+      Assert.assertEquals(times[rows[i]], result.lhs.longValue());
+      Assert.assertEquals(VALUES[rows[i]], result.rhs);
+    }
+  }
+
   private void clearBufferForPositions(int offset, int... positions)
   {
     for (int position : positions) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to