This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 57e12df3525 Sql Single Value Aggregator for scalar queries (#15700)
57e12df3525 is described below

commit 57e12df35252a26657e4a4c306f254566c1786f5
Author: Sree Charan Manamala <[email protected]>
AuthorDate: Thu Feb 8 19:20:30 2024 +0530

    Sql Single Value Aggregator for scalar queries (#15700)
    
    Executing single value correlated queries will throw an exception today 
since single_value function is not available in druid.
    With these added classes, this provides druid, the capability to plan and 
run such queries.
---
 .../apache/druid/jackson/AggregatorsModule.java    |  24 +-
 .../druid/query/aggregation/AggregatorUtil.java    |  10 +-
 .../query/aggregation/SingleValueAggregator.java   | 103 +++++++
 .../aggregation/SingleValueAggregatorFactory.java  | 201 ++++++++++++++
 .../aggregation/SingleValueBufferAggregator.java   | 131 +++++++++
 .../aggregation/SingleValueAggregationTest.java    | 307 ++++++++++++++++++++
 .../builtin/SingleValueSqlAggregator.java          |  75 +++++
 .../sql/calcite/planner/DruidOperatorTable.java    |   2 +
 .../druid/sql/calcite/CalciteSubqueryTest.java     | 309 +++++++++++++++++++++
 9 files changed, 1152 insertions(+), 10 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java 
b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
index 93d6afb1dd9..dddf39956ed 100644
--- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
@@ -43,6 +43,7 @@ import 
org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetri
 import 
org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde;
 import 
org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde;
 import 
org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
+import org.apache.druid.query.aggregation.SingleValueAggregatorFactory;
 import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -84,11 +85,23 @@ public class AggregatorsModule extends SimpleModule
 
     ComplexMetrics.registerSerde(HyperUniquesSerde.TYPE_NAME, new 
HyperUniquesSerde());
     ComplexMetrics.registerSerde(PreComputedHyperUniquesSerde.TYPE_NAME, new 
PreComputedHyperUniquesSerde());
-    
ComplexMetrics.registerSerde(SerializablePairLongStringComplexMetricSerde.TYPE_NAME,
 new SerializablePairLongStringComplexMetricSerde());
+    ComplexMetrics.registerSerde(
+        SerializablePairLongStringComplexMetricSerde.TYPE_NAME,
+        new SerializablePairLongStringComplexMetricSerde()
+    );
 
-    
ComplexMetrics.registerSerde(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME,
 new SerializablePairLongFloatComplexMetricSerde());
-    
ComplexMetrics.registerSerde(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME,
 new SerializablePairLongDoubleComplexMetricSerde());
-    
ComplexMetrics.registerSerde(SerializablePairLongLongComplexMetricSerde.TYPE_NAME,
 new SerializablePairLongLongComplexMetricSerde());
+    ComplexMetrics.registerSerde(
+        SerializablePairLongFloatComplexMetricSerde.TYPE_NAME,
+        new SerializablePairLongFloatComplexMetricSerde()
+    );
+    ComplexMetrics.registerSerde(
+        SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME,
+        new SerializablePairLongDoubleComplexMetricSerde()
+    );
+    ComplexMetrics.registerSerde(
+        SerializablePairLongLongComplexMetricSerde.TYPE_NAME,
+        new SerializablePairLongLongComplexMetricSerde()
+    );
 
     setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
     setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
@@ -129,7 +142,8 @@ public class AggregatorsModule extends SimpleModule
       @JsonSubTypes.Type(name = "doubleAny", value = 
DoubleAnyAggregatorFactory.class),
       @JsonSubTypes.Type(name = "stringAny", value = 
StringAnyAggregatorFactory.class),
       @JsonSubTypes.Type(name = "grouping", value = 
GroupingAggregatorFactory.class),
-      @JsonSubTypes.Type(name = "expression", value = 
ExpressionLambdaAggregatorFactory.class)
+      @JsonSubTypes.Type(name = "expression", value = 
ExpressionLambdaAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "singleValue", value = 
SingleValueAggregatorFactory.class)
   })
   public interface AggregatorFactoryMixin
   {
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index 5bbb9eb16c8..2034ba21a5b 100755
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -158,6 +158,7 @@ public class AggregatorUtil
   public static final byte 
ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING_CACHE_TYPE_ID = 0x4C;
   public static final byte 
ARRAY_OF_DOUBLES_SKETCH_CONSTANT_SKETCH_CACHE_TYPE_ID = 0x4D;
   public static final byte 
ARRAY_OF_DOUBLES_SKETCH_TO_METRICS_SUM_ESTIMATE_CACHE_TYPE_ID = 0x4E;
+  public static final byte SINGLE_VALUE_CACHE_TYPE_ID = 0x4F;
 
   // DDSketch aggregator
   public static final byte DDSKETCH_CACHE_TYPE_ID = 0x50;
@@ -165,15 +166,15 @@ public class AggregatorUtil
   /**
    * Given a list of PostAggregators and the name of an output column, returns 
the minimal list of PostAggregators
    * required to compute the output column.
-   *
+   * <p>
    * If the outputColumn does not exist in the list of PostAggregators, the 
return list will be empty (under the
    * assumption that the outputColumn comes from a project, aggregation or 
really anything other than a
    * PostAggregator).
-   *
+   * <p>
    * If the outputColumn <strong>does</strong> exist in the list of 
PostAggregators, then the return list will have at
    * least one element.  If the PostAggregator with outputName depends on any 
other PostAggregators, then the returned
    * list will contain all PostAggregators required to compute the 
outputColumn.
-   *
+   * <p>
    * Note that PostAggregators are processed in list-order, meaning that for a 
PostAggregator to depend on another
    * PostAggregator, the "depender" must exist *after* the "dependee" in the 
list.  That is, if PostAggregator A
    * depends on PostAggregator B, then the list should be [B, A], such that A 
is computed after B.
@@ -181,8 +182,7 @@ public class AggregatorUtil
    * @param postAggregatorList List of postAggregator, there is a restriction 
that the list should be in an order such
    *                           that all the dependencies of any given 
aggregator should occur before that aggregator.
    *                           See 
AggregatorUtilTest.testOutOfOrderPruneDependentPostAgg for example.
-   * @param outputName        name of the postAgg on which dependency is to be 
calculated
-   *
+   * @param outputName         name of the postAgg on which dependency is to 
be calculated
    * @return the list of dependent postAggregators
    */
   public static List<PostAggregator> 
pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String 
outputName)
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java
new file mode 100644
index 00000000000..0f2f96d7c3f
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+  @Nullable
+  private Object value;
+  private boolean isAggregateInvoked = false;
+
+  public SingleValueAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one 
row");
+    }
+    value = selector.getObject();
+    isAggregateInvoked = true;
+  }
+
+  @Override
+  public Object get()
+  {
+    return value;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    assert validObjectValue();
+    return (value == null) ? NullHandling.ZERO_FLOAT : ((Number) 
value).floatValue();
+  }
+
+  @Override
+  public long getLong()
+  {
+    assert validObjectValue();
+    return (value == null) ? NullHandling.ZERO_LONG : ((Number) 
value).longValue();
+  }
+
+  @Override
+  public double getDouble()
+  {
+    assert validObjectValue();
+    return (value == null) ? NullHandling.ZERO_DOUBLE : ((Number) 
value).doubleValue();
+  }
+
+  @Override
+  public boolean isNull()
+  {
+    return NullHandling.sqlCompatible() && value == null;
+  }
+
+  private boolean validObjectValue()
+  {
+    return NullHandling.replaceWithDefault() || !isNull();
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SingleValueAggregator{" +
+           "selector=" + selector +
+           ", value=" + value +
+           ", isAggregateInvoked=" + isAggregateInvoked +
+           '}';
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java
new file mode 100644
index 00000000000..9f3557a9e0a
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression 
into a single value
+ * and is expected to throw an exception when the subquery results in more 
than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it 
on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+  @JsonProperty
+  private final String name;
+  @JsonProperty
+  private final String fieldName;
+  @JsonProperty
+  private final ColumnType columnType;
+  public static final int DEFAULT_MAX_VALUE_SIZE = 1024;
+
+  @JsonCreator
+  public SingleValueAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("columnType") final ColumnType columnType
+  )
+  {
+    this.name = Preconditions.checkNotNull(name, "name");
+    this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName");
+    this.columnType = Preconditions.checkNotNull(columnType, "columnType");
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(fieldName);
+    return new SingleValueAggregator(selector);
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
+  {
+    ColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(fieldName);
+    ColumnCapabilities columnCapabilities = 
metricFactory.getColumnCapabilities(fieldName);
+    if (columnCapabilities == null) {
+      throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+                          .ofCategory(DruidException.Category.DEFENSIVE)
+                          .build("Unable to get the capabilities of field 
[%s]", fieldName);
+    }
+    ColumnType columnType = new ColumnType(columnCapabilities.getType(), null, 
null);
+    return new SingleValueBufferAggregator(selector, columnType);
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have 
more than one row to compare");
+  }
+
+  /**
+   * Combine method would never be invoked as the broker sends the subquery to 
multiple segments
+   * and gather the results to a single value on which the single value 
aggregator is applied.
+   * Though getCombiningFactory would be invoked for understanding the 
fieldname.
+   */
+  @Override
+  @Nullable
+  public Object combine(@Nullable Object lhs, @Nullable Object rhs)
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have 
more than one row to combine");
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new SingleValueAggregatorFactory(name, name, columnType);
+  }
+
+  @Override
+  public Object deserialize(Object object)
+  {
+    return object;
+  }
+
+  @Nullable
+  @Override
+  public Object finalizeComputation(@Nullable Object object)
+  {
+    return object;
+  }
+
+  @Override
+  public ColumnType getIntermediateType()
+  {
+    return columnType;
+  }
+
+  @Override
+  public ColumnType getResultType()
+  {
+    return columnType;
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Collections.singletonList(fieldName);
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    // keeping 8 bytes for all numerics to make code look simple. This would 
store only a single value.
+    return Byte.BYTES + (columnType.isNumeric() ? Double.BYTES : 
DEFAULT_MAX_VALUE_SIZE);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new byte[]{AggregatorUtil.SINGLE_VALUE_CACHE_TYPE_ID};
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SingleValueAggregatorFactory that = (SingleValueAggregatorFactory) o;
+    return Objects.equals(name, that.name)
+           && Objects.equals(fieldName, that.fieldName)
+           && Objects.equals(columnType, that.columnType);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(name, fieldName, columnType);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SingleValueAggregatorFactory{" +
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
+           ", columnType=" + columnType +
+           '}';
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java
new file mode 100644
index 00000000000..0d26654f3a2
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  private final ColumnValueSelector selector;
+  private final ColumnType columnType;
+  private final NullableTypeStrategy typeStrategy;
+  private boolean isAggregateInvoked = false;
+
+  public SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType 
columnType)
+  {
+    this.selector = selector;
+    this.columnType = columnType;
+    this.typeStrategy = columnType.getNullableStrategy();
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, NullHandling.IS_NULL_BYTE);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one 
row");
+    }
+
+    int maxbufferSixe = Byte.BYTES + (columnType.isNumeric()
+                                      ? Double.BYTES
+                                      : 
SingleValueAggregatorFactory.DEFAULT_MAX_VALUE_SIZE);
+    int written = typeStrategy.write(
+        buf,
+        position,
+        getSelectorObject(),
+        maxbufferSixe
+    );
+    if (written < 0) {
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build("Subquery result exceeds the buffer limit 
[%s]", maxbufferSixe);
+    }
+    isAggregateInvoked = true;
+  }
+
+  @Nullable
+  private Object getSelectorObject()
+  {
+    if (columnType.isNumeric() && selector.isNull()) {
+      return null;
+    }
+    switch (columnType.getType()) {
+      case LONG:
+        return selector.getLong();
+      case FLOAT:
+        return selector.getFloat();
+      case DOUBLE:
+        return selector.getDouble();
+      default:
+        return selector.getObject();
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return typeStrategy.read(buf, position);
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    return TypeStrategies.isNullableNull(buf, position)
+           ? NullHandling.ZERO_FLOAT
+           : TypeStrategies.readNotNullNullableFloat(buf, position);
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    return TypeStrategies.isNullableNull(buf, position)
+           ? NullHandling.ZERO_DOUBLE
+           : TypeStrategies.readNotNullNullableDouble(buf, position);
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    return TypeStrategies.isNullableNull(buf, position)
+           ? NullHandling.ZERO_LONG
+           : TypeStrategies.readNotNullNullableLong(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueAggregationTest.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueAggregationTest.java
new file mode 100644
index 00000000000..cd48d5a3e09
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueAggregationTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.TestColumnSelectorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class SingleValueAggregationTest extends InitializedNullHandlingTest
+{
+  private SingleValueAggregatorFactory longAggFactory;
+  private ColumnSelectorFactory colSelectorFactoryLong;
+  private ColumnCapabilities columnCapabilitiesLong;
+  private TestLongColumnSelector selectorLong;
+
+  private SingleValueAggregatorFactory doubleAggFactory;
+  private ColumnSelectorFactory colSelectorFactoryDouble;
+  private ColumnCapabilities columnCapabilitiesDouble;
+  private TestDoubleColumnSelectorImpl selectorDouble;
+
+  private SingleValueAggregatorFactory floatAggFactory;
+  private ColumnSelectorFactory colSelectorFactoryFloat;
+  private ColumnCapabilities columnCapabilitiesFloat;
+  private TestFloatColumnSelector selectorFloat;
+
+  private SingleValueAggregatorFactory stringAggFactory;
+  private ColumnSelectorFactory colSelectorFactoryString;
+  private ColumnCapabilities columnCapabilitiesString;
+  private TestObjectColumnSelector selectorString;
+
+  private final long[] longValues = {9223372036854775802L, 
9223372036854775803L};
+  private final double[] doubleValues = {5.2d, 2.8976552d};
+  private final float[] floatValues = {5.2f, 2.89f};
+  private final String[] strValues = {"str1", "str2"};
+
+  public SingleValueAggregationTest() throws Exception
+  {
+    String longAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"lng\", 
\"fieldName\": \"lngFld\", \"columnType\": \"LONG\"}";
+    longAggFactory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, 
SingleValueAggregatorFactory.class);
+
+    String doubleAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"dbl\", 
\"fieldName\": \"dblFld\", \"columnType\": \"DOUBLE\"}";
+    doubleAggFactory = 
TestHelper.makeJsonMapper().readValue(doubleAggSpecJson, 
SingleValueAggregatorFactory.class);
+
+    String floatAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"dbl\", 
\"fieldName\": \"fltFld\", \"columnType\": \"FLOAT\"}";
+    floatAggFactory = TestHelper.makeJsonMapper().readValue(floatAggSpecJson, 
SingleValueAggregatorFactory.class);
+
+    String strAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"str\", 
\"fieldName\": \"strFld\", \"columnType\": \"STRING\"}";
+    stringAggFactory = TestHelper.makeJsonMapper().readValue(strAggSpecJson, 
SingleValueAggregatorFactory.class);
+  }
+
+  @Before
+  public void setup()
+  {
+    selectorLong = new TestLongColumnSelector(longValues);
+    columnCapabilitiesLong = 
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG);
+    colSelectorFactoryLong = new TestColumnSelectorFactory()
+        .addCapabilities("lngFld", columnCapabilitiesLong)
+        .addColumnSelector("lngFld", selectorLong);
+
+    selectorDouble = new TestDoubleColumnSelectorImpl(doubleValues);
+    columnCapabilitiesDouble = 
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE);
+    colSelectorFactoryDouble = new TestColumnSelectorFactory()
+        .addCapabilities("dblFld", columnCapabilitiesDouble)
+        .addColumnSelector("dblFld", selectorDouble);
+
+    selectorFloat = new TestFloatColumnSelector(floatValues);
+    columnCapabilitiesFloat = 
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.FLOAT);
+    colSelectorFactoryFloat = new TestColumnSelectorFactory()
+        .addCapabilities("fltFld", columnCapabilitiesFloat)
+        .addColumnSelector("fltFld", selectorFloat);
+
+    selectorString = new TestObjectColumnSelector(strValues);
+    columnCapabilitiesString = 
ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities();
+    colSelectorFactoryString = new TestColumnSelectorFactory()
+        .addCapabilities("strFld", columnCapabilitiesString)
+        .addColumnSelector("strFld", selectorString);
+  }
+
+  @Test
+  public void testLongAggregator()
+  {
+    Assert.assertEquals(ColumnType.LONG, longAggFactory.getIntermediateType());
+    Assert.assertEquals(ColumnType.LONG, longAggFactory.getResultType());
+    Assert.assertEquals("lng", longAggFactory.getName());
+    Assert.assertEquals("lngFld", longAggFactory.getFieldName());
+    Assert.assertThrows(DruidException.class, () -> 
longAggFactory.getComparator());
+
+    Aggregator agg = longAggFactory.factorize(colSelectorFactoryLong);
+    if (NullHandling.replaceWithDefault()) {
+      Assert.assertFalse(agg.isNull());
+      Assert.assertEquals(0L, agg.getLong());
+    } else {
+      Assert.assertTrue(agg.isNull());
+      Assert.assertThrows(AssertionError.class, () -> agg.getLong());
+    }
+
+    aggregate(selectorLong, agg);
+    Assert.assertEquals(longValues[0], ((Long) agg.get()).longValue());
+    Assert.assertEquals(longValues[0], agg.getLong());
+
+    Assert.assertThrows(DruidException.class, () -> aggregate(selectorLong, 
agg));
+  }
+
+  @Test
+  public void testLongBufferAggregator()
+  {
+    BufferAggregator agg = 
longAggFactory.factorizeBuffered(colSelectorFactoryLong);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[Double.BYTES + Byte.BYTES]);
+    agg.init(buffer, 0);
+    Assert.assertEquals(0L, agg.getLong(buffer, 0));
+
+    aggregate(selectorLong, agg, buffer, 0);
+    Assert.assertEquals(longValues[0], ((Long) agg.get(buffer, 
0)).longValue());
+    Assert.assertEquals(longValues[0], agg.getLong(buffer, 0));
+
+    Assert.assertThrows(DruidException.class, () -> aggregate(selectorLong, 
agg, buffer, 0));
+  }
+
+  @Test
+  public void testCombine()
+  {
+    Assert.assertThrows(DruidException.class, () -> 
longAggFactory.combine(9223372036854775800L, 9223372036854775803L));
+  }
+
+  @Test
+  public void testDoubleAggregator()
+  {
+    Aggregator agg = doubleAggFactory.factorize(colSelectorFactoryDouble);
+    if (NullHandling.replaceWithDefault()) {
+      Assert.assertEquals(0.0d, agg.getDouble(), 0.000001);
+    } else {
+      Assert.assertThrows(AssertionError.class, () -> agg.getDouble());
+    }
+
+    aggregate(selectorDouble, agg);
+    Assert.assertEquals(doubleValues[0], ((Double) agg.get()).doubleValue(), 
0.000001);
+    Assert.assertEquals(doubleValues[0], agg.getDouble(), 0.000001);
+
+    Assert.assertThrows(DruidException.class, () -> aggregate(selectorDouble, 
agg));
+  }
+
+  @Test
+  public void testDoubleBufferAggregator()
+  {
+    BufferAggregator agg = 
doubleAggFactory.factorizeBuffered(colSelectorFactoryDouble);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new 
byte[SingleValueAggregatorFactory.DEFAULT_MAX_VALUE_SIZE + Byte.BYTES]);
+    agg.init(buffer, 0);
+    Assert.assertEquals(0.0d, agg.getDouble(buffer, 0), 0.000001);
+
+    aggregate(selectorDouble, agg, buffer, 0);
+    Assert.assertEquals(doubleValues[0], ((Double) agg.get(buffer, 
0)).doubleValue(), 0.000001);
+    Assert.assertEquals(doubleValues[0], agg.getDouble(buffer, 0), 0.000001);
+
+    Assert.assertThrows(DruidException.class, () -> aggregate(selectorDouble, 
agg, buffer, 0));
+  }
+
+  @Test
+  public void testFloatAggregator()
+  {
+    Aggregator agg = floatAggFactory.factorize(colSelectorFactoryFloat);
+    if (NullHandling.replaceWithDefault()) {
+      Assert.assertEquals(0.0f, agg.getFloat(), 0.000001);
+    } else {
+      Assert.assertThrows(AssertionError.class, () -> agg.getFloat());
+    }
+
+    aggregate(selectorFloat, agg);
+    Assert.assertEquals(floatValues[0], ((Float) agg.get()).floatValue(), 
0.000001);
+    Assert.assertEquals(floatValues[0], agg.getFloat(), 0.000001);
+
+    Assert.assertThrows(DruidException.class, () -> aggregate(selectorFloat, 
agg));
+  }
+
+  @Test
+  public void testFloatBufferAggregator()
+  {
+    BufferAggregator agg = 
floatAggFactory.factorizeBuffered(colSelectorFactoryFloat);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[Double.BYTES + Byte.BYTES]);
+    agg.init(buffer, 0);
+    Assert.assertEquals(0.0f, agg.getFloat(buffer, 0), 0.000001);
+
+    aggregate(selectorFloat, agg, buffer, 0);
+    Assert.assertEquals(floatValues[0], ((Float) agg.get(buffer, 
0)).floatValue(), 0.000001);
+    Assert.assertEquals(floatValues[0], agg.getFloat(buffer, 0), 0.000001);
+
+    Assert.assertThrows(DruidException.class, () -> aggregate(selectorFloat, 
agg, buffer, 0));
+  }
+
+  @Test
+  public void testStringAggregator()
+  {
+    Aggregator agg = stringAggFactory.factorize(colSelectorFactoryString);
+
+    Assert.assertEquals(null, agg.get());
+
+    aggregate(selectorString, agg);
+    Assert.assertEquals(strValues[0], agg.get());
+
+    Assert.assertThrows(DruidException.class, () -> aggregate(selectorString, 
agg));
+  }
+
+  @Test
+  public void testStringBufferAggregator()
+  {
+    BufferAggregator agg = 
stringAggFactory.factorizeBuffered(colSelectorFactoryString);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new 
byte[SingleValueAggregatorFactory.DEFAULT_MAX_VALUE_SIZE + Byte.BYTES]);
+    agg.init(buffer, 0);
+
+    aggregate(selectorString, agg, buffer, 0);
+    Assert.assertEquals(strValues[0], agg.get(buffer, 0));
+
+    Assert.assertThrows(DruidException.class, () -> aggregate(selectorString, 
agg, buffer, 0));
+  }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    SingleValueAggregatorFactory one = new 
SingleValueAggregatorFactory("name1", "fieldName1", ColumnType.LONG);
+    SingleValueAggregatorFactory oneMore = new 
SingleValueAggregatorFactory("name1", "fieldName1", ColumnType.LONG);
+    SingleValueAggregatorFactory two = new 
SingleValueAggregatorFactory("name2", "fieldName2", ColumnType.LONG);
+
+    Assert.assertEquals(one.hashCode(), oneMore.hashCode());
+
+    Assert.assertTrue(one.equals(oneMore));
+    Assert.assertFalse(one.equals(two));
+  }
+
+  private void aggregate(TestLongColumnSelector selector, Aggregator agg)
+  {
+    agg.aggregate();
+    selector.increment();
+  }
+
+  private void aggregate(TestLongColumnSelector selector, BufferAggregator 
agg, ByteBuffer buff, int position)
+  {
+    agg.aggregate(buff, position);
+    selector.increment();
+  }
+
+  private void aggregate(TestFloatColumnSelector selector, Aggregator agg)
+  {
+    agg.aggregate();
+    selector.increment();
+  }
+
+  private void aggregate(TestFloatColumnSelector selector, BufferAggregator 
agg, ByteBuffer buff, int position)
+  {
+    agg.aggregate(buff, position);
+    selector.increment();
+  }
+
+  private void aggregate(TestDoubleColumnSelectorImpl selector, Aggregator agg)
+  {
+    agg.aggregate();
+    selector.increment();
+  }
+
+  private void aggregate(TestDoubleColumnSelectorImpl selector, 
BufferAggregator agg, ByteBuffer buff, int position)
+  {
+    agg.aggregate(buff, position);
+    selector.increment();
+  }
+
+  private void aggregate(TestObjectColumnSelector selector, Aggregator agg)
+  {
+    agg.aggregate();
+    selector.increment();
+  }
+
+  private void aggregate(TestObjectColumnSelector selector, BufferAggregator 
agg, ByteBuffer buff, int position)
+  {
+    agg.aggregate(buff, position);
+    selector.increment();
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SingleValueSqlAggregator.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SingleValueSqlAggregator.java
new file mode 100644
index 00000000000..088567e7be9
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SingleValueSqlAggregator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.aggregation.builtin;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.SingleValueAggregatorFactory;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.planner.Calcites;
+
+import javax.annotation.Nullable;
+
+/**
+ * This class serves as binding for Single Value Aggregator.
+ * Returns a single value in cases of subqueries used in expressions
+ */
+public class SingleValueSqlAggregator extends SimpleSqlAggregator
+{
+
+  @Override
+  public SqlAggFunction calciteFunction()
+  {
+    return SqlStdOperatorTable.SINGLE_VALUE;
+  }
+
+  @Override
+  @Nullable
+  Aggregation getAggregation(
+      final String name,
+      final AggregateCall aggregateCall,
+      final ExprMacroTable macroTable,
+      final String fieldName
+  )
+  {
+    final ColumnType valueType = 
Calcites.getColumnTypeForRelDataType(aggregateCall.getType());
+    if (valueType == null) {
+      return null;
+    }
+    return Aggregation.create(createSingleValueAggregatorFactory(
+        valueType,
+        name,
+        fieldName
+    ));
+  }
+
+  static AggregatorFactory createSingleValueAggregatorFactory(
+      final ColumnType aggregationType,
+      final String name,
+      final String fieldName
+  )
+  {
+    return new SingleValueAggregatorFactory(name, fieldName, aggregationType);
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
index d1851c511c2..0fda139868b 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
@@ -46,6 +46,7 @@ import 
org.apache.druid.sql.calcite.aggregation.builtin.GroupingSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.LiteralSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator;
+import 
org.apache.druid.sql.calcite.aggregation.builtin.SingleValueSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.StringSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.SumZeroSqlAggregator;
@@ -172,6 +173,7 @@ public class DruidOperatorTable implements SqlOperatorTable
                    .add(new BitwiseSqlAggregator(BitwiseSqlAggregator.Op.AND))
                    .add(new BitwiseSqlAggregator(BitwiseSqlAggregator.Op.OR))
                    .add(new BitwiseSqlAggregator(BitwiseSqlAggregator.Op.XOR))
+                   .add(new SingleValueSqlAggregator())
                    .build();
 
   // STRLEN has so many aliases.
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
index 0f7d86eb411..ea8089e4c91 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
@@ -36,16 +36,21 @@ import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.SingleValueAggregatorFactory;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.ExtractionDimensionSpec;
 import org.apache.druid.query.extraction.SubstringDimExtractionFn;
+import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
+import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
@@ -64,8 +69,10 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -1042,4 +1049,306 @@ public class CalciteSubqueryTest extends 
BaseCalciteQueryTest
         results
     );
   }
+
+  @Test
+  public void testSingleValueFloatAgg()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT count(*) FROM foo where m1 <= (select min(m1) + 4 from foo)",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(join(
+                      new TableDataSource(CalciteTests.DATASOURCE1),
+                      new QueryDataSource(GroupByQuery.builder()
+                                                      .setDataSource(new 
QueryDataSource(
+                                                          
Druids.newTimeseriesQueryBuilder()
+                                                                
.dataSource(CalciteTests.DATASOURCE1)
+                                                                
.intervals(querySegmentSpec(Filtration.eternity()))
+                                                                
.granularity(Granularities.ALL)
+                                                                
.aggregators(new FloatMinAggregatorFactory("a0", "m1"))
+                                                                .build()
+                                                      ))
+                                                      
.setInterval(querySegmentSpec(Filtration.eternity()))
+                                                      
.setGranularity(Granularities.ALL)
+                                                      
.setVirtualColumns(expressionVirtualColumn(
+                                                                             
"v0",
+                                                                             
"(\"a0\" + 4)",
+                                                                             
ColumnType.FLOAT
+                                                                         )
+                                                      )
+                                                      .setAggregatorSpecs(
+                                                          aggregators(
+                                                              new 
SingleValueAggregatorFactory(
+                                                                  "_a0",
+                                                                  "v0",
+                                                                  
ColumnType.FLOAT
+                                                              )
+                                                          )
+                                                      )
+                                                      
.setLimitSpec(NoopLimitSpec.instance())
+                                                      
.setContext(QUERY_CONTEXT_DEFAULT)
+                                                      .build()
+                      ),
+                      "j0.",
+                      "1",
+                      NullHandling.replaceWithDefault() ? JoinType.LEFT : 
JoinType.INNER
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .filters(expressionFilter("(\"m1\" <= \"j0._a0\")"))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{5L}
+        )
+    );
+  }
+
+  @Test
+  public void testSingleValueDoubleAgg()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT count(*) FROM foo where m1 >= (select max(m1) - 3.5 from foo)",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(join(
+                      new TableDataSource(CalciteTests.DATASOURCE1),
+                      new QueryDataSource(GroupByQuery.builder()
+                                                      .setDataSource(new 
QueryDataSource(
+                                                          
Druids.newTimeseriesQueryBuilder()
+                                                                
.dataSource(CalciteTests.DATASOURCE1)
+                                                                
.intervals(querySegmentSpec(Filtration.eternity()))
+                                                                
.granularity(Granularities.ALL)
+                                                                
.aggregators(new FloatMaxAggregatorFactory("a0", "m1"))
+                                                                .build()
+                                                      ))
+                                                      
.setInterval(querySegmentSpec(Filtration.eternity()))
+                                                      
.setGranularity(Granularities.ALL)
+                                                      
.setVirtualColumns(expressionVirtualColumn(
+                                                                             
"v0",
+                                                                             
"(\"a0\" - 3.5)",
+                                                                             
ColumnType.DOUBLE
+                                                                         )
+                                                      )
+                                                      .setAggregatorSpecs(
+                                                          aggregators(
+                                                              new 
SingleValueAggregatorFactory(
+                                                                  "_a0",
+                                                                  "v0",
+                                                                  
ColumnType.DOUBLE
+                                                              )
+                                                          )
+                                                      )
+                                                      
.setLimitSpec(NoopLimitSpec.instance())
+                                                      
.setContext(QUERY_CONTEXT_DEFAULT)
+                                                      .build()
+                      ),
+                      "j0.",
+                      "1",
+                      NullHandling.replaceWithDefault() ? JoinType.LEFT : 
JoinType.INNER
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .filters(expressionFilter("(\"m1\" >= \"j0._a0\")"))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{4L}
+        )
+    );
+  }
+
+  @Test
+  public void testSingleValueLongAgg()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT count(*) FROM wikipedia where __time >= (select max(__time) - 
INTERVAL '10' MINUTE from wikipedia)",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(join(
+                      new TableDataSource(CalciteTests.WIKIPEDIA),
+                      new QueryDataSource(GroupByQuery.builder()
+                                                      .setDataSource(new 
QueryDataSource(
+                                                          
Druids.newTimeseriesQueryBuilder()
+                                                                
.dataSource(CalciteTests.WIKIPEDIA)
+                                                                
.intervals(querySegmentSpec(Filtration.eternity()))
+                                                                
.granularity(Granularities.ALL)
+                                                                
.aggregators(new LongMaxAggregatorFactory(
+                                                                    "a0",
+                                                                    "__time"
+                                                                ))
+                                                                .build()
+                                                      ))
+                                                      
.setInterval(querySegmentSpec(Filtration.eternity()))
+                                                      
.setGranularity(Granularities.ALL)
+                                                      
.setVirtualColumns(expressionVirtualColumn(
+                                                                             
"v0",
+                                                                             
"(\"a0\" - 600000)",
+                                                                             
ColumnType.LONG
+                                                                         )
+                                                      )
+                                                      .setAggregatorSpecs(
+                                                          aggregators(
+                                                              new 
SingleValueAggregatorFactory(
+                                                                  "_a0",
+                                                                  "v0",
+                                                                  
ColumnType.LONG
+                                                              )
+                                                          )
+                                                      )
+                                                      
.setLimitSpec(NoopLimitSpec.instance())
+                                                      
.setContext(QUERY_CONTEXT_DEFAULT)
+                                                      .build()
+                      ),
+                      "j0.",
+                      "1",
+                      NullHandling.replaceWithDefault() ? JoinType.LEFT : 
JoinType.INNER
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .filters(expressionFilter("(\"__time\" >= \"j0._a0\")"))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{220L}
+        )
+    );
+  }
+
+  @Test
+  public void testSingleValueStringAgg()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT  count(*) FROM wikipedia where channel = (select channel from 
wikipedia order by __time desc LIMIT 1 OFFSET 6)",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(join(
+                      new TableDataSource(CalciteTests.WIKIPEDIA),
+                      new QueryDataSource(GroupByQuery.builder()
+                                                      .setDataSource(new 
QueryDataSource(
+                                                          
Druids.newScanQueryBuilder()
+                                                                
.dataSource(CalciteTests.WIKIPEDIA)
+                                                                
.intervals(querySegmentSpec(Filtration.eternity()))
+                                                                
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                                                .offset(6L)
+                                                                .limit(1L)
+                                                                
.order(ScanQuery.Order.DESCENDING)
+                                                                
.columns("__time", "channel")
+                                                                .legacy(false)
+                                                                
.context(QUERY_CONTEXT_DEFAULT)
+                                                                .build()
+                                                      ))
+                                                      
.setInterval(querySegmentSpec(Filtration.eternity()))
+                                                      
.setGranularity(Granularities.ALL)
+                                                      
.setVirtualColumns(expressionVirtualColumn(
+                                                                             
"v0",
+                                                                             
"\"channel\"",
+                                                                             
ColumnType.STRING
+                                                                         )
+                                                      )
+                                                      .setAggregatorSpecs(
+                                                          aggregators(
+                                                              new 
SingleValueAggregatorFactory(
+                                                                  "a0",
+                                                                  "v0",
+                                                                  
ColumnType.STRING
+                                                              )
+                                                          )
+                                                      )
+                                                      
.setLimitSpec(NoopLimitSpec.instance())
+                                                      
.setContext(QUERY_CONTEXT_DEFAULT)
+                                                      .build()
+                      ),
+                      "j0.",
+                      "(\"channel\" == \"j0.a0\")",
+                      JoinType.INNER
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{1256L}
+        )
+    );
+  }
+
+  @Test
+  public void testSingleValueStringMultipleRowsAgg()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQueryThrows(
+        "SELECT  count(*) FROM wikipedia where channel = (select channel from 
wikipedia order by __time desc LIMIT 2 OFFSET 6)",
+        exception -> exception.expectMessage("Subquery expression returned 
more than one row")
+    );
+  }
+
+  @Test
+  public void testSingleValueEmptyInnerAgg()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT distinct countryName FROM wikipedia where countryName = ( 
select countryName from wikipedia where channel in ('abc', 'xyz'))",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(join(
+                            new TableDataSource(CalciteTests.WIKIPEDIA),
+                            new 
QueryDataSource(Druids.newTimeseriesQueryBuilder()
+                                                      
.dataSource(CalciteTests.WIKIPEDIA)
+                                                      
.intervals(querySegmentSpec(Filtration.eternity()))
+                                                      
.granularity(Granularities.ALL)
+                                                      
.virtualColumns(expressionVirtualColumn(
+                                                                          "v0",
+                                                                          
"\"countryName\"",
+                                                                          
ColumnType.STRING
+                                                                      )
+                                                      )
+                                                      .aggregators(
+                                                          new 
SingleValueAggregatorFactory(
+                                                              "a0",
+                                                              "v0",
+                                                              ColumnType.STRING
+                                                          )
+                                                      )
+                                                      .filters(new InDimFilter(
+                                                          "channel",
+                                                          new 
HashSet<>(Arrays.asList(
+                                                              "abc",
+                                                              "xyz"
+                                                          ))
+                                                      ))
+                                                      
.context(QUERY_CONTEXT_DEFAULT)
+                                                      .build()
+                            ),
+                            "j0.",
+                            "(\"countryName\" == \"j0.a0\")",
+                            JoinType.INNER
+                        ))
+                        .addDimension(new DefaultDimensionSpec("countryName", 
"d0", ColumnType.STRING))
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of()
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to