abhishekagarwal87 commented on a change in pull request #10949:
URL: https://github.com/apache/druid/pull/10949#discussion_r613828445



##########
File path: 
processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongSerde.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.common.config.NullHandling;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * The class serializes a Long-Long pair (SerializablePair<Long, Long>).
+ * The serialization structure is: Long:Long
+ * <p>
+ * The class is used on first/last Long aggregators to store the time and the 
first/last Long.
+ * Long:Long -> Timestamp:Long

Review comment:
       nit
   ```suggestion
    * (Long:timestamp, Long:value)
   ```

##########
File path: 
processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.ObjectStrategy;
+import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
+import org.apache.druid.segment.serde.ComplexMetricExtractor;
+import org.apache.druid.segment.serde.ComplexMetricSerde;
+import 
org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * The class serializes/deserializes a Pair<Long, ?> object for 
double/float/longFirst and double/float/longLast aggregators
+ */
+public abstract class AbstractSerializableLongObjectPairSerde<T extends 
SerializablePair<Long, ?>>
+    extends ComplexMetricSerde
+{
+  private final Class<T> pairClassObject;
+
+  public AbstractSerializableLongObjectPairSerde(Class<T> pairClassObject)
+  {
+    this.pairClassObject = pairClassObject;
+  }
+
+  @Override
+  public ComplexMetricExtractor getExtractor()
+  {
+    return new ComplexMetricExtractor()
+    {
+      @Override
+      public Class<T> extractedClass()
+      {
+        return pairClassObject;
+      }
+
+      @Override
+      public Object extractValue(InputRow inputRow, String metricName)
+      {
+        return inputRow.getRaw(metricName);
+      }
+    };
+  }
+
+  @Override
+  public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
+  {
+    final GenericIndexed column = GenericIndexed.read(buffer, 
getObjectStrategy(), columnBuilder.getFileMapper());
+    columnBuilder.setComplexColumnSupplier(new 
ComplexColumnPartSupplier(getTypeName(), column));
+  }
+
+  @Override
+  public ObjectStrategy<T> getObjectStrategy()
+  {
+    return new ObjectStrategy<T>()
+    {
+      @Override
+      public int compare(@Nullable T o1, @Nullable T o2)
+      {
+        return getLongObjectPairComparator().compare(o1, o2);

Review comment:
       It would be nice not to create a new comparator object for each 
comparison operation.

##########
File path: 
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
##########
@@ -49,16 +55,44 @@ public NumericFirstAggregator(BaseLongColumnValueSelector 
timeSelector, TSelecto
   /**
    * Store the current primitive typed 'first' value
    */
-  abstract void setCurrentValue();
+  abstract void setCurrentValue(ColumnValueSelector valueSelector);
+
+  abstract void setCurrentValue(Number number);
 
   @Override
   public void aggregate()
   {
+    if (needsFoldCheck) {
+
+      // Need to read this first (before time), just in case it's a 
SerializablePairLongString (we don't know; it's
+      // detected at query time).
+      final Object object = valueSelector.getObject();
+
+      if (object instanceof SerializablePair) {
+
+        // cast to Pair<Long, Number> to support reindex from type such as 
doubleFirst into longFirst
+        final SerializablePair<Long, Number> pair = (SerializablePair<Long, 
Number>) object;
+        if (pair.lhs < firstTime) {
+          firstTime = pair.lhs;
+
+          // rhs might be NULL under SQL-compatibility mode

Review comment:
       A bit out of my depth here: what will happen if the aggregate was stored 
as null in the segment because SQL compatibility was on in the task writing the 
segment, but SQL compatibility is then turned off when the segment data is 
being read? Should it still be read as null? 

##########
File path: 
processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java
##########
@@ -183,6 +201,40 @@ public void testSerde() throws Exception
     Assert.assertEquals(floatFirstAggregatorFactory, 
mapper.readValue(doubleSpecJson, AggregatorFactory.class));
   }
 
+  @Test
+  public void testFloatFirstAggregateCombiner()
+  {
+    AggregateCombiner floatFirstAggregateCombiner = 
combiningAggFactory.makeAggregateCombiner();
+
+    SerializablePair[] inputPairs = {
+        new SerializablePair<>(5L, 134.3f),

Review comment:
       Can you also add tests with null values as input and/or as the expected result? 

##########
File path: 
processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.ObjectStrategy;
+import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
+import org.apache.druid.segment.serde.ComplexMetricExtractor;
+import org.apache.druid.segment.serde.ComplexMetricSerde;
+import 
org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * The class serializes/deserializes a Pair<Long, ?> object for 
double/float/longFirst and double/float/longLast aggregators
+ */
+public abstract class AbstractSerializableLongObjectPairSerde<T extends 
SerializablePair<Long, ?>>
+    extends ComplexMetricSerde
+{
+  private final Class<T> pairClassObject;
+
+  public AbstractSerializableLongObjectPairSerde(Class<T> pairClassObject)
+  {
+    this.pairClassObject = pairClassObject;
+  }
+
+  @Override
+  public ComplexMetricExtractor getExtractor()
+  {
+    return new ComplexMetricExtractor()
+    {
+      @Override
+      public Class<T> extractedClass()
+      {
+        return pairClassObject;
+      }
+
+      @Override
+      public Object extractValue(InputRow inputRow, String metricName)
+      {
+        return inputRow.getRaw(metricName);
+      }
+    };
+  }
+
+  @Override
+  public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
+  {
+    final GenericIndexed column = GenericIndexed.read(buffer, 
getObjectStrategy(), columnBuilder.getFileMapper());
+    columnBuilder.setComplexColumnSupplier(new 
ComplexColumnPartSupplier(getTypeName(), column));
+  }
+
+  @Override
+  public ObjectStrategy<T> getObjectStrategy()

Review comment:
       I think it would be cleaner to have each subclass implement 
getObjectStrategy(); then we would not need three abstract methods. 

##########
File path: 
processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
##########
@@ -36,11 +36,16 @@
   private static final int NULL_VALUE = -1;
 
   /**
-   * Returns whether a given value selector *might* contain 
SerializablePairLongString objects.

Review comment:
       The class may need to be renamed now. Maybe FirstLastUtils? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to