This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9177419628 Unnest functionality for Druid (#13268)
9177419628 is described below

commit 91774196285ea51b823883394ef1bfe8f2417892
Author: somu-imply <93540295+somu-im...@users.noreply.github.com>
AuthorDate: Fri Dec 2 18:48:25 2022 -0800

    Unnest functionality for Druid (#13268)
    
    * Moving all unnest cursor code atop refactored code for unnest
    
    * Updating unnest cursor
    
    * Removing dedup and fixing up some null checks
    
    * AllowList changes
    
    * Fixing some NPEs
    
    * Using bitset for allowlist
    
    * Updating the initialization only when cursor is in non-done state
    
    * Updating code to skip rows not in allow list
    
    * Adding a flag for cases when first element is not in allowed list
    
    * Updating for a null in allowList
    
    * Splitting unnest cursor into 2 subclasses
    
    * Intercepting some apis with columnName for new unnested column
    
    * Adding test cases and renaming some stuff
    
    * checkstyle fixes
    
    * Moving to an interface for Unnest
    
    * handling null rows in a dimension
    
    * Updating cursors after comments part-1
    
    * Addressing comments and adding some more tests
    
    * Reverting a change to ScanQueryRunner and improving a comment
    
    * removing an unused function
    
    * Updating cursors after comments part 2
    
    * One last fix for review comments
    
    * Making some functions private, deleting some comments, adding a test for 
unnest of unnest with allowList
    
    * Adding an exception for a case
    
    * Closure for unnest data source
    
    * Adding some javadocs
    
    * One minor change in makeDimSelector of columnarCursor
    
    * Updating an error message
    
    * Update 
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java
    
    Co-authored-by: Abhishek Agarwal 
<1477457+abhishekagarwa...@users.noreply.github.com>
    
    * Unnesting on virtual columns was missing an object array, adding that to 
support virtual columns unnesting
    
    * Updating exceptions to use UOE
    
    * Renamed files, added column capability test on adapter, return statement 
and made unnest datasource not cacheable for the time being
    
    * Handling for null values in dim selector
    
    * Fixing a NPE for null row
    
    * Updating capabilities
    
    * Updating capabilities
    
    Co-authored-by: Abhishek Agarwal 
<1477457+abhishekagarwa...@users.noreply.github.com>
---
 .../java/org/apache/druid/query/DataSource.java    |   3 +-
 .../org/apache/druid/query/UnnestDataSource.java   | 212 +++++++
 .../druid/query/planning/DataSourceAnalysis.java   |  39 +-
 .../segment/UnnestColumnValueSelectorCursor.java   | 336 +++++++++++
 .../druid/segment/UnnestDimensionCursor.java       | 415 ++++++++++++++
 .../druid/segment/UnnestSegmentReference.java      | 115 ++++
 .../apache/druid/segment/UnnestStorageAdapter.java | 234 ++++++++
 .../java/org/apache/druid/segment/ListCursor.java  | 228 ++++++++
 .../UnnestColumnValueSelectorCursorTest.java       | 632 +++++++++++++++++++++
 .../druid/segment/UnnestStorageAdapterTest.java    | 399 +++++++++++++
 10 files changed, 2602 insertions(+), 11 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java 
b/processing/src/main/java/org/apache/druid/query/DataSource.java
index f56a3550a3..43dfb3be85 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -41,7 +41,8 @@ import java.util.function.Function;
     @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
     @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
     @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
-    @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = 
"globalTable")
+    @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = 
"globalTable"),
+    @JsonSubTypes.Type(value = UnnestDataSource.class, name = "unnest")
 })
 public interface DataSource
 {
diff --git 
a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java 
b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
new file mode 100644
index 0000000000..4623701674
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.UnnestSegmentReference;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+/**
+ * The data source for representing an unnest operation.
+ *
+ * An unnest data source has the following:
+ * a base data source which is to be unnested
+ * the column name of the MVD which will be unnested
+ * the name of the column that will hold the unnested values
+ * and an allowlist serving as a filter of which values in the MVD will be 
unnested.
+ */
+public class UnnestDataSource implements DataSource
+{
+  private final DataSource base;
+  private final String column;
+  private final String outputName;
+  private final LinkedHashSet<String> allowList;
+
+  private UnnestDataSource(
+      DataSource dataSource,
+      String columnName,
+      String outputName,
+      LinkedHashSet<String> allowList
+  )
+  {
+    this.base = dataSource;
+    this.column = columnName;
+    this.outputName = outputName;
+    this.allowList = allowList;
+  }
+
+  @JsonCreator
+  public static UnnestDataSource create(
+      @JsonProperty("base") DataSource base,
+      @JsonProperty("column") String columnName,
+      @JsonProperty("outputName") String outputName,
+      @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList
+  )
+  {
+    return new UnnestDataSource(base, columnName, outputName, allowList);
+  }
+
+  @JsonProperty("base")
+  public DataSource getBase()
+  {
+    return base;
+  }
+
+  @JsonProperty("column")
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty("outputName")
+  public String getOutputName()
+  {
+    return outputName;
+  }
+
+  @JsonProperty("allowList")
+  public LinkedHashSet<String> getAllowList()
+  {
+    return allowList;
+  }
+
+  @Override
+  public Set<String> getTableNames()
+  {
+    return base.getTableNames();
+  }
+
+  @Override
+  public List<DataSource> getChildren()
+  {
+    return ImmutableList.of(base);
+  }
+
+  @Override
+  public DataSource withChildren(List<DataSource> children)
+  {
+    if (children.size() != 1) {
+      throw new IAE("Expected [1] child, got [%d]", children.size());
+    }
+    return new UnnestDataSource(children.get(0), column, outputName, 
allowList);
+  }
+
+  @Override
+  public boolean isCacheable(boolean isBroker)
+  {
+    return false;
+  }
+
+  @Override
+  public boolean isGlobal()
+  {
+    return base.isGlobal();
+  }
+
+  @Override
+  public boolean isConcrete()
+  {
+    return base.isConcrete();
+  }
+
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTimeAccumulator
+  )
+  {
+    final Function<SegmentReference, SegmentReference> segmentMapFn = 
base.createSegmentMapFunction(
+        query,
+        cpuTimeAccumulator
+    );
+    return JvmUtils.safeAccumulateThreadCpuTime(
+        cpuTimeAccumulator,
+        () -> {
+          if (column == null) {
+            return segmentMapFn;
+          } else if (column.isEmpty()) {
+            return segmentMapFn;
+          } else {
+            return
+                baseSegment ->
+                    new UnnestSegmentReference(
+                        segmentMapFn.apply(baseSegment),
+                        column,
+                        outputName,
+                        allowList
+                    );
+          }
+        }
+    );
+
+  }
+
+  @Override
+  public DataSource withUpdatedDataSource(DataSource newSource)
+  {
+    return new UnnestDataSource(newSource, column, outputName, allowList);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    // The column being unnested would need to be part of the cache key
+    // as the results are dependent on what column is being unnested.
+    // Currently, it is not cacheable.
+    // Future development should use the table name and column name to
+    // create an appropriate cache key.
+    return null;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    UnnestDataSource that = (UnnestDataSource) o;
+    return column.equals(that.column)
+           && outputName.equals(that.outputName)
+           && base.equals(that.base);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(base, column, outputName);
+  }
+}
+
+
diff --git 
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
 
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index c329e3a570..63c2c8b815 100644
--- 
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++ 
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -28,6 +28,7 @@ import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 
@@ -112,17 +113,29 @@ public class DataSourceAnalysis
     Query<?> baseQuery = null;
     DataSource current = dataSource;
 
-    while (current instanceof QueryDataSource) {
-      final Query<?> subQuery = ((QueryDataSource) current).getQuery();
+    // This needs to be an OR condition between QueryDataSource and 
UnnestDataSource,
+    // as queries can have interleaving query and unnest data sources.
+    // Ideally, if each data source generated its own analysis object, we could 
avoid the OR here
+    // and have cleaner code. Especially as we increase the types of data 
sources in the future,
+    // these OR checks will become tedious. Future development should move the 
forDataSource method
+    // into each data source.
 
-      if (!(subQuery instanceof BaseQuery)) {
-        // We must verify that the subQuery is a BaseQuery, because it is 
required to make "getBaseQuerySegmentSpec"
-        // work properly. All built-in query types are BaseQuery, so we only 
expect this with funky extension queries.
-        throw new IAE("Cannot analyze subquery of class[%s]", 
subQuery.getClass().getName());
-      }
+    while (current instanceof QueryDataSource || current instanceof 
UnnestDataSource) {
+      if (current instanceof QueryDataSource) {
+        final Query<?> subQuery = ((QueryDataSource) current).getQuery();
+
+        if (!(subQuery instanceof BaseQuery)) {
+          // We must verify that the subQuery is a BaseQuery, because it is 
required to make "getBaseQuerySegmentSpec"
+          // work properly. All built-in query types are BaseQuery, so we only 
expect this with funky extension queries.
+          throw new IAE("Cannot analyze subquery of class[%s]", 
subQuery.getClass().getName());
+        }
 
-      baseQuery = subQuery;
-      current = subQuery.getDataSource();
+        baseQuery = subQuery;
+        current = subQuery.getDataSource();
+      } else {
+        final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
+        current = unnestDataSource.getBase();
+      }
     }
 
     if (current instanceof JoinDataSource) {
@@ -276,7 +289,8 @@ public class DataSourceAnalysis
 
   /**
    * Returns true if this datasource is concrete-based (see {@link 
#isConcreteBased()}, and the base datasource is a
-   * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of 
{@link TableDataSource}. This is an
+   * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of 
{@link TableDataSource}
+   * or an {@link UnnestDataSource} composed entirely of {@link 
TableDataSource}. This is an
    * important property, because it corresponds to datasources that can be 
handled by Druid's distributed query stack.
    */
   public boolean isConcreteTableBased()
@@ -286,6 +300,10 @@ public class DataSourceAnalysis
     // so check anyway for future-proofing.
     return isConcreteBased() && (baseDataSource instanceof TableDataSource
                                  || (baseDataSource instanceof UnionDataSource 
&&
+                                     baseDataSource.getChildren()
+                                                   .stream()
+                                                   .allMatch(ds -> ds 
instanceof TableDataSource))
+                                 || (baseDataSource instanceof 
UnnestDataSource &&
                                      baseDataSource.getChildren()
                                                    .stream()
                                                    .allMatch(ds -> ds 
instanceof TableDataSource)));
@@ -298,6 +316,7 @@ public class DataSourceAnalysis
   {
     return dataSource instanceof QueryDataSource;
   }
+  
 
   /**
    * Returns true if this datasource is made out of a join operation
diff --git 
a/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java
 
b/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java
new file mode 100644
index 0000000000..db4acb893e
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+/**
+ * The cursor to help unnest MVDs without dictionary encoding and ARRAY type 
selectors.
+ * <p>
+ * Consider a segment has 2 rows
+ * ['a', 'b', 'c']
+ * ['d', 'e']
+ * <p>
+ * The baseCursor points to the row ['a', 'b', 'c']
+ * while the unnestCursor with each call of advance() moves over individual 
elements.
+ * <p>
+ * unnestCursor.advance() -> 'a'
+ * unnestCursor.advance() -> 'b'
+ * unnestCursor.advance() -> 'c'
+ * unnestCursor.advance() -> 'd' (advances base cursor first)
+ * unnestCursor.advance() -> 'e'
+ * <p>
+ * <p>
+ * The allowSet if available helps skip over elements which are not in the 
allowList by moving the cursor to
+ * the next available match.
+ * <p>
+ * The index reference points to the index of each row that the unnest cursor 
is accessing through currentVal
+ * The index ranges from 0 to the size of the list in each row which is held 
in the unnestListForCurrentRow
+ * <p>
+ * The needInitialization flag sets up the initial values of 
unnestListForCurrentRow at the beginning of the segment
+ */
+public class UnnestColumnValueSelectorCursor implements Cursor
+{
+  private final Cursor baseCursor;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  private final ColumnValueSelector columnValueSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private int index;
+  private Object currentVal;
+  private List<Object> unnestListForCurrentRow;
+  private boolean needInitialization;
+
+  public UnnestColumnValueSelectorCursor(
+      Cursor cursor,
+      ColumnSelectorFactory baseColumSelectorFactory,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumnSelectorFactory = baseColumSelectorFactory;
+    this.columnValueSelector = 
this.baseColumnSelectorFactory.makeColumnValueSelector(columnName);
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        if (!outputName.equals(dimensionSpec.getDimension())) {
+          return 
baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+        }
+        throw new UOE("Unsupported dimension selector while using column value 
selector for column [%s]", outputName);
+      }
+
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        if (!outputName.equals(columnName)) {
+          return baseColumnSelectorFactory.makeColumnValueSelector(columnName);
+        }
+        return new ColumnValueSelector()
+        {
+          @Override
+          public double getDouble()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            if (value instanceof Number) {
+              return ((Number) value).doubleValue();
+            }
+            throw new UOE("Cannot convert object to double");
+          }
+
+          @Override
+          public float getFloat()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            if (value instanceof Number) {
+              return ((Number) value).floatValue();
+            }
+            throw new UOE("Cannot convert object to float");
+          }
+
+          @Override
+          public long getLong()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            if (value instanceof Number) {
+              return ((Number) value).longValue();
+            }
+            throw new UOE("Cannot convert object to long");
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            columnValueSelector.inspectRuntimeShape(inspector);
+          }
+
+          @Override
+          public boolean isNull()
+          {
+            return getObject() == null;
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (!unnestListForCurrentRow.isEmpty()) {
+              if (allowSet == null || allowSet.isEmpty()) {
+                return unnestListForCurrentRow.get(index);
+              } else if (allowSet.contains((String) 
unnestListForCurrentRow.get(index))) {
+                return unnestListForCurrentRow.get(index);
+              }
+            }
+            return null;
+          }
+
+          @Override
+          public Class<?> classOfObject()
+          {
+            return Object.class;
+          }
+        };
+      }
+
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        if (!outputName.equals(column)) {
+          return baseColumnSelectorFactory.getColumnCapabilities(column);
+        }
+        final ColumnCapabilities capabilities = 
baseColumnSelectorFactory.getColumnCapabilities(columnName);
+        if (capabilities.isArray()) {
+          return 
ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType());
+        }
+        if (capabilities.hasMultipleValues().isTrue()) {
+          return 
ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false);
+        }
+        return baseColumnSelectorFactory.getColumnCapabilities(columnName);
+      }
+    };
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return baseCursor.getTime();
+  }
+
+  @Override
+  public void advance()
+  {
+    advanceUninterruptibly();
+    BaseQuery.checkInterrupted();
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    do {
+      advanceAndUpdate();
+    } while (matchAndProceed());
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    if (needInitialization && !baseCursor.isDone()) {
+      initialize();
+    }
+    return baseCursor.isDone();
+  }
+
+  @Override
+  public boolean isDoneOrInterrupted()
+  {
+    if (needInitialization && !baseCursor.isDoneOrInterrupted()) {
+      initialize();
+    }
+    return baseCursor.isDoneOrInterrupted();
+  }
+
+  @Override
+  public void reset()
+  {
+    index = 0;
+    needInitialization = true;
+    baseCursor.reset();
+  }
+
+  /**
+   * This method populates the objects when the base cursor moves to the next 
row
+   *
+   * @param firstRun flag to populate one time object references to hold 
values for unnest cursor
+   */
+  private void getNextRow(boolean firstRun)
+  {
+    currentVal = this.columnValueSelector.getObject();
+    if (currentVal == null) {
+      if (!firstRun) {
+        unnestListForCurrentRow = new ArrayList<>();
+      }
+      unnestListForCurrentRow.add(null);
+    } else {
+      if (currentVal instanceof List) {
+        unnestListForCurrentRow = (List<Object>) currentVal;
+      } else if (currentVal instanceof Object[]) {
+        unnestListForCurrentRow = Arrays.asList((Object[]) currentVal);
+      } else if (currentVal.getClass().equals(String.class)) {
+        if (!firstRun) {
+          unnestListForCurrentRow = new ArrayList<>();
+        }
+        unnestListForCurrentRow.add(currentVal);
+      }
+    }
+  }
+
+  /**
+   * This initializes the unnest cursor and creates data structures
+   * to start iterating over the values to be unnested.
+   * This would also create a bitset for dictionary encoded columns to
+   * check for matching values specified in allowedList of UnnestDataSource.
+   */
+  private void initialize()
+  {
+    this.unnestListForCurrentRow = new ArrayList<>();
+    getNextRow(needInitialization);
+    if (allowSet != null) {
+      if (!allowSet.isEmpty()) {
+        if (!allowSet.contains((String) unnestListForCurrentRow.get(index))) {
+          advance();
+        }
+      }
+    }
+    needInitialization = false;
+  }
+
+  /**
+   * This advances the cursor to move to the next element to be unnested.
+   * When the last element in a row is unnested, it is also responsible
+   * to move the base cursor to the next row for unnesting and repopulates
+   * the data structures, created during initialize(), to point to the new row
+   */
+  private void advanceAndUpdate()
+  {
+    if (unnestListForCurrentRow.isEmpty() || index >= 
unnestListForCurrentRow.size() - 1) {
+      index = 0;
+      baseCursor.advance();
+      if (!baseCursor.isDone()) {
+        getNextRow(needInitialization);
+      }
+    } else {
+      index++;
+    }
+  }
+
+  /**
+   * This advances the unnest cursor in cases where an allowList is specified
+   * and the current value at the unnest cursor is not in the allowList.
+   * The cursor in such cases is moved till the next match is found.
+   *
+   * @return a boolean to indicate whether to stay or move cursor
+   */
+  private boolean matchAndProceed()
+  {
+    boolean matchStatus;
+    if (allowSet == null || allowSet.isEmpty()) {
+      matchStatus = true;
+    } else {
+      matchStatus = allowSet.contains((String) 
unnestListForCurrentRow.get(index));
+    }
+    return !baseCursor.isDone() && !matchStatus;
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java 
b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java
new file mode 100644
index 0000000000..46a2c626ca
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+
+/**
+ * The cursor to help unnest MVDs with dictionary encoding.
+ * Consider a segment has 2 rows
+ * ['a', 'b', 'c']
+ * ['d', 'c']
+ * <p>
+ * Considering dictionary encoding, these are represented as
+ * <p>
+ * 'a' -> 0
+ * 'b' -> 1
+ * 'c' -> 2
+ * 'd' -> 3
+ * <p>
+ * The baseCursor points to the row of IndexedInts [0, 1, 2]
+ * while the unnestCursor with each call of advance() moves over individual 
elements.
+ * <p>
+ * advance() -> 0 -> 'a'
+ * advance() -> 1 -> 'b'
+ * advance() -> 2 -> 'c'
+ * advance() -> 3 -> 'd' (advances base cursor first)
+ * advance() -> 2 -> 'c'
+ * <p>
+ * Total 5 advance calls above
+ * <p>
+ * The allowSet, if available, helps skip over elements that are not in the 
allowList by moving the cursor to
+ * the next available match. The hashSet is converted into a bitset (during 
initialization) for efficiency.
+ * If allowSet is ['c', 'd'] then the advance moves over to the next available 
match
+ * <p>
+ * advance() -> 2 -> 'c'
+ * advance() -> 3 -> 'd' (advances base cursor first)
+ * advance() -> 2 -> 'c'
+ * <p>
+ * Total 3 advance calls in this case
+ * <p>
+ * The index reference points to the index of each row that the unnest cursor 
is accessing
+ * The indexedInts for each row are held in the indexedIntsForCurrentRow object
+ * <p>
+ * The needInitialization flag sets up the initial values of 
indexedIntsForCurrentRow at the beginning of the segment
+ */
+public class UnnestDimensionCursor implements Cursor
+{
+  private final Cursor baseCursor;
+  private final DimensionSelector dimSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private final BitSet allowedBitSet;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  private int index;
+  private IndexedInts indexedIntsForCurrentRow;
+  private boolean needInitialization;
+  private SingleIndexInts indexIntsForRow;
+
+  public UnnestDimensionCursor(
+      Cursor cursor,
+      ColumnSelectorFactory baseColumnSelectorFactory,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumnSelectorFactory = baseColumnSelectorFactory;
+    this.dimSelector = 
this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+    this.allowedBitSet = new BitSet();
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        if (!outputName.equals(dimensionSpec.getDimension())) {
+          return 
baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+        }
+
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            // This object reference has been created
+            // during the call to initialize and referenced henceforth
+            return indexIntsForRow;
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(@Nullable String value)
+          {
+            final int idForLookup = idLookup().lookupId(value);
+            if (idForLookup < 0) {
+              return new ValueMatcher()
+              {
+                @Override
+                public boolean matches()
+                {
+                  return false;
+                }
+
+                @Override
+                public void inspectRuntimeShape(RuntimeShapeInspector 
inspector)
+                {
+
+                }
+              };
+            }
+
+            return new ValueMatcher()
+            {
+              @Override
+              public boolean matches()
+              {
+                return idForLookup == indexedIntsForCurrentRow.get(index);
+              }
+
+              @Override
+              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+              {
+                dimSelector.inspectRuntimeShape(inspector);
+              }
+            };
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+          {
+            return DimensionSelectorUtils.makeValueMatcherGeneric(this, 
predicate);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            dimSelector.inspectRuntimeShape(inspector);
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (indexedIntsForCurrentRow == null) {
+              return null;
+            }
+            if (allowedBitSet.isEmpty()) {
+              if (allowSet == null || allowSet.isEmpty()) {
+                return lookupName(indexedIntsForCurrentRow.get(index));
+              }
+            } else if (allowedBitSet.get(indexedIntsForCurrentRow.get(index))) 
{
+              return lookupName(indexedIntsForCurrentRow.get(index));
+            }
+            return null;
+          }
+
+          @Override
+          public Class<?> classOfObject()
+          {
+            return Object.class;
+          }
+
+          @Override
+          public int getValueCardinality()
+          {
+            if (!allowedBitSet.isEmpty()) {
+              return allowedBitSet.cardinality();
+            }
+            return dimSelector.getValueCardinality();
+          }
+
+          @Nullable
+          @Override
+          public String lookupName(int id)
+          {
+            return dimSelector.lookupName(id);
+          }
+
+          @Override
+          public boolean nameLookupPossibleInAdvance()
+          {
+            return dimSelector.nameLookupPossibleInAdvance();
+          }
+
+          @Nullable
+          @Override
+          public IdLookup idLookup()
+          {
+            return dimSelector.idLookup();
+          }
+        };
+      }
+
+      /*
+      This ideally should not be called. If called delegate using the 
makeDimensionSelector
+       */
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        if (!outputName.equals(columnName)) {
+          return baseColumnSelectorFactory.makeColumnValueSelector(columnName);
+        }
+        return makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+      }
+
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        if (!outputName.equals(column)) {
+          return baseColumnSelectorFactory.getColumnCapabilities(column);
+        }
+        // This currently returns the same type as of the column to be unnested
+        // This is fine for STRING types
+        // But going forward if the dimension to be unnested is of type ARRAY,
+        // this should strip down to the base type of the array
+        final ColumnCapabilities capabilities = 
baseColumnSelectorFactory.getColumnCapabilities(columnName);
+        if (capabilities.isArray()) {
+          return 
ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType());
+        }
+        if (capabilities.hasMultipleValues().isTrue()) {
+          return 
ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false);
+        }
+        return baseColumnSelectorFactory.getColumnCapabilities(columnName);
+      }
+    };
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return baseCursor.getTime();
+  }
+
+  @Override
+  public void advance()
+  {
+    advanceUninterruptibly();
+    BaseQuery.checkInterrupted();
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    do {
+      advanceAndUpdate();
+    } while (matchAndProceed());
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    if (needInitialization && !baseCursor.isDone()) {
+      initialize();
+    }
+    return baseCursor.isDone();
+  }
+
+  @Override
+  public boolean isDoneOrInterrupted()
+  {
+    if (needInitialization && !baseCursor.isDoneOrInterrupted()) {
+      initialize();
+    }
+    return baseCursor.isDoneOrInterrupted();
+  }
+
+  @Override
+  public void reset()
+  {
+    index = 0;
+    needInitialization = true;
+    baseCursor.reset();
+  }
+
+  /**
+   * This initializes the unnest cursor and creates data structures
+   * to start iterating over the values to be unnested.
+   * This would also create a bitset for dictonary encoded columns to
+   * check for matching values specified in allowedList of UnnestDataSource.
+   */
+  private void initialize()
+  {
+    IdLookup idLookup = dimSelector.idLookup();
+    this.indexIntsForRow = new SingleIndexInts();
+    if (allowSet != null && !allowSet.isEmpty() && idLookup != null) {
+      for (String s : allowSet) {
+        if (idLookup.lookupId(s) >= 0) {
+          allowedBitSet.set(idLookup.lookupId(s));
+        }
+      }
+    }
+    if (dimSelector.getObject() != null) {
+      this.indexedIntsForCurrentRow = dimSelector.getRow();
+    }
+    if (!allowedBitSet.isEmpty()) {
+      if (!allowedBitSet.get(indexedIntsForCurrentRow.get(index))) {
+        advance();
+      }
+    }
+    needInitialization = false;
+  }
+
+  /**
+   * This advances the cursor to move to the next element to be unnested.
+   * When the last element in a row is unnested, it is also responsible
+   * to move the base cursor to the next row for unnesting and repopulates
+   * the data structures, created during initialize(), to point to the new row
+   */
+  private void advanceAndUpdate()
+  {
+    if (indexedIntsForCurrentRow == null) {
+      index = 0;
+      if (!baseCursor.isDone()) {
+        baseCursor.advanceUninterruptibly();
+      }
+    } else {
+      if (index >= indexedIntsForCurrentRow.size() - 1) {
+        if (!baseCursor.isDone()) {
+          baseCursor.advanceUninterruptibly();
+        }
+        if (!baseCursor.isDone()) {
+          indexedIntsForCurrentRow = dimSelector.getRow();
+        }
+        index = 0;
+      } else {
+        ++index;
+      }
+    }
+  }
+
+  /**
+   * This advances the unnest cursor in cases where an allowList is specified
+   * and the current value at the unnest cursor is not in the allowList.
+   * The cursor in such cases is moved till the next match is found.
+   *
+   * @return a boolean to indicate whether to stay or move cursor
+   */
+  private boolean matchAndProceed()
+  {
+    boolean matchStatus;
+    if ((allowSet == null || allowSet.isEmpty()) && allowedBitSet.isEmpty()) {
+      matchStatus = true;
+    } else {
+      matchStatus = allowedBitSet.get(indexedIntsForCurrentRow.get(index));
+    }
+    return !baseCursor.isDone() && !matchStatus;
+  }
+
+  // Helper class to help in returning
+  // getRow from the dimensionSelector
+  // This is set in the initialize method
+  private class SingleIndexInts implements IndexedInts
+  {
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      //nothing to inspect
+    }
+
+    @Override
+    public int size()
+    {
+      // After unnest each row will have a single element
+      return 1;
+    }
+
+    @Override
+    public int get(int idx)
+    {
+      return indexedIntsForCurrentRow.get(index);
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java 
b/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java
new file mode 100644
index 0000000000..9da6b8132c
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CloseableUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+
+/**
+ * The segment reference for the Unnest Data Source.
+ * The input column name, output name and the allowSet follow from {@link 
org.apache.druid.query.UnnestDataSource}
+ */
+public class UnnestSegmentReference implements SegmentReference
+{
+  private static final Logger log = new Logger(UnnestSegmentReference.class);
+
+  private final SegmentReference baseSegment;
+  private final String dimension;
+  private final String renamedOutputDimension;
+  private final LinkedHashSet<String> allowSet;
+
+  public UnnestSegmentReference(SegmentReference baseSegment, String 
dimension, String outputName, LinkedHashSet<String> allowList)
+  {
+    this.baseSegment = baseSegment;
+    this.dimension = dimension;
+    this.renamedOutputDimension = outputName;
+    this.allowSet = allowList;
+  }
+
+  @Override
+  public Optional<Closeable> acquireReferences()
+  {
+    Closer closer = Closer.create();
+    try {
+      boolean acquireFailed = baseSegment.acquireReferences().map(closeable -> 
{
+        closer.register(closeable);
+        return false;
+      }).orElse(true);
+
+      if (acquireFailed) {
+        CloseableUtils.closeAndWrapExceptions(closer);
+        return Optional.empty();
+      } else {
+        return Optional.of(closer);
+      }
+    }
+    catch (Throwable e) {
+      // acquireReferences is not permitted to throw exceptions.
+      CloseableUtils.closeAndSuppressExceptions(closer, e::addSuppressed);
+      log.warn(e, "Exception encountered while trying to acquire reference");
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public SegmentId getId()
+  {
+    return baseSegment.getId();
+  }
+
+  @Override
+  public Interval getDataInterval()
+  {
+    return baseSegment.getDataInterval();
+  }
+
+  @Nullable
+  @Override
+  public QueryableIndex asQueryableIndex()
+  {
+    return null;
+  }
+
+  @Override
+  public StorageAdapter asStorageAdapter()
+  {
+    return new UnnestStorageAdapter(
+        baseSegment.asStorageAdapter(),
+        dimension,
+        renamedOutputDimension,
+        allowSet
+    );
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    baseSegment.close();
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java 
b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
new file mode 100644
index 0000000000..f76ab89270
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.filter.AndFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+
+/**
+ * Storage Adapter for the Unnest Segment, responsible for creating the unnest cursors.
+ * When the column to unnest is dictionary encoded with unique dictionary values it creates a
+ * {@link UnnestDimensionCursor}, otherwise an {@link UnnestColumnValueSelectorCursor}.
+ * Requests about the output column are answered from the column being unnested.
+ */
+public class UnnestStorageAdapter implements StorageAdapter
+{
+  private final StorageAdapter baseAdapter;
+  private final String dimensionToUnnest;
+  private final String outputColumnName;
+  private final LinkedHashSet<String> allowSet;
+
+  public UnnestStorageAdapter(
+      final StorageAdapter baseAdapter,
+      final String dimension,
+      final String outputColumnName,
+      final LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseAdapter = baseAdapter;
+    this.dimensionToUnnest = dimension;
+    this.outputColumnName = outputColumnName;
+    this.allowSet = allowSet;
+  }
+
+  /**
+   * Creates the base cursors and wraps each of them in an unnest cursor.
+   * A non-empty allowSet is additionally applied to the base cursors as an
+   * {@link InDimFilter} on the column to unnest, AND-ed with the given filter when there is one.
+   */
+  @Override
+  public Sequence<Cursor> makeCursors(
+      @Nullable Filter filter,
+      Interval interval,
+      VirtualColumns virtualColumns,
+      Granularity gran,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    final Filter updatedFilter;
+    if (allowSet == null || allowSet.isEmpty()) {
+      updatedFilter = filter;
+    } else {
+      final InDimFilter allowListFilters = new InDimFilter(dimensionToUnnest, allowSet);
+      if (filter == null) {
+        updatedFilter = allowListFilters;
+      } else {
+        updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters));
+      }
+    }
+
+    final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors(
+        updatedFilter,
+        interval,
+        virtualColumns,
+        gran,
+        descending,
+        queryMetrics
+    );
+
+    return Sequences.map(baseCursorSequence, this::wrapWithUnnestCursor);
+  }
+
+  /**
+   * Picks the unnest cursor implementation for one base cursor, based on the capabilities
+   * the cursor reports for the column to unnest.
+   */
+  private Cursor wrapWithUnnestCursor(Cursor cursor)
+  {
+    Objects.requireNonNull(cursor);
+    final ColumnCapabilities capabilities =
+        cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest);
+    final boolean hasUniqueDictionary =
+        capabilities != null
+        && capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue();
+
+    if (hasUniqueDictionary) {
+      return new UnnestDimensionCursor(
+          cursor,
+          cursor.getColumnSelectorFactory(),
+          dimensionToUnnest,
+          outputColumnName,
+          allowSet
+      );
+    }
+    // Unknown capabilities, or no unique dictionary to work with
+    return new UnnestColumnValueSelectorCursor(
+        cursor,
+        cursor.getColumnSelectorFactory(),
+        dimensionToUnnest,
+        outputColumnName,
+        allowSet
+    );
+  }
+
+  @Override
+  public Interval getInterval()
+  {
+    return baseAdapter.getInterval();
+  }
+
+  /**
+   * Returns the base dimensions followed by the output column, without duplicates.
+   */
+  @Override
+  public Indexed<String> getAvailableDimensions()
+  {
+    final LinkedHashSet<String> dimensions = new LinkedHashSet<>();
+    baseAdapter.getAvailableDimensions().forEach(dimensions::add);
+    dimensions.add(outputColumnName);
+    return new ListIndexed<>(Lists.newArrayList(dimensions));
+  }
+
+  @Override
+  public Iterable<String> getAvailableMetrics()
+  {
+    return baseAdapter.getAvailableMetrics();
+  }
+
+  @Override
+  public int getDimensionCardinality(String column)
+  {
+    return baseAdapter.getDimensionCardinality(toBaseColumn(column));
+  }
+
+  @Override
+  public DateTime getMinTime()
+  {
+    return baseAdapter.getMinTime();
+  }
+
+  @Override
+  public DateTime getMaxTime()
+  {
+    return baseAdapter.getMaxTime();
+  }
+
+  @Nullable
+  @Override
+  public Comparable getMinValue(String column)
+  {
+    return baseAdapter.getMinValue(toBaseColumn(column));
+  }
+
+  @Nullable
+  @Override
+  public Comparable getMaxValue(String column)
+  {
+    return baseAdapter.getMaxValue(toBaseColumn(column));
+  }
+
+  @Nullable
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    return baseAdapter.getColumnCapabilities(toBaseColumn(column));
+  }
+
+  @Override
+  public int getNumRows()
+  {
+    return 0;
+  }
+
+  @Override
+  public DateTime getMaxIngestedEventTime()
+  {
+    return baseAdapter.getMaxIngestedEventTime();
+  }
+
+  @Nullable
+  @Override
+  public Metadata getMetadata()
+  {
+    return baseAdapter.getMetadata();
+  }
+
+  public String getDimensionToUnnest()
+  {
+    return dimensionToUnnest;
+  }
+
+  /**
+   * Maps the output column name to the column being unnested; any other name is returned as is.
+   */
+  private String toBaseColumn(String column)
+  {
+    return outputColumnName.equals(column) ? dimensionToUnnest : column;
+  }
+}
+
diff --git a/processing/src/test/java/org/apache/druid/segment/ListCursor.java 
b/processing/src/test/java/org/apache/druid/segment/ListCursor.java
new file mode 100644
index 0000000000..666bc21be5
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/ListCursor.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+
+/**
+ * A Cursor that iterates over a user created list.
+ * This is used to test the base cursor of an UnnestCursor.
+ * Usages can be found in tests of {@link UnnestColumnValueSelectorCursor} in 
{@link UnnestColumnValueSelectorCursorTest}
+ * However this cannot help with {@link UnnestDimensionCursor}.
+ * Tests for {@link UnnestDimensionCursor} are done alongside tests for {@link 
UnnestStorageAdapterTest}
+ */
+public class ListCursor implements Cursor
+{
+  List<Object> baseList;
+  private int index;
+
+  public ListCursor(List<Object> inputList)
+  {
+    this.baseList = inputList;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            return null;
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(@Nullable String value)
+          {
+            return null;
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+          {
+            return null;
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (index < baseList.size()) {
+              return baseList.get(index);
+            }
+            return null;
+          }
+
+          @Override
+          public Class<?> classOfObject()
+          {
+            return null;
+          }
+
+          @Override
+          public int getValueCardinality()
+          {
+            return 0;
+          }
+
+          @Nullable
+          @Override
+          public String lookupName(int id)
+          {
+            return null;
+          }
+
+          @Override
+          public boolean nameLookupPossibleInAdvance()
+          {
+            return false;
+          }
+
+          @Nullable
+          @Override
+          public IdLookup idLookup()
+          {
+            return null;
+          }
+        };
+      }
+
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        return new ColumnValueSelector()
+        {
+          @Override
+          public double getDouble()
+          {
+            return 0;
+          }
+
+          @Override
+          public float getFloat()
+          {
+            return 0;
+          }
+
+          @Override
+          public long getLong()
+          {
+            return 0;
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+
+          }
+
+          @Override
+          public boolean isNull()
+          {
+            return false;
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (index < baseList.size()) {
+              return baseList.get(index);
+            }
+            return null;
+          }
+
+          @Override
+          public Class classOfObject()
+          {
+            return null;
+          }
+        };
+      }
+
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return null;
+  }
+
+  @Override
+  public void advance()
+  {
+    advanceUninterruptibly();
+    BaseQuery.checkInterrupted();
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    index++;
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    return index > baseList.size() - 1;
+  }
+
+  @Override
+  public boolean isDoneOrInterrupted()
+  {
+    return false;
+  }
+
+  @Override
+  public void reset()
+  {
+    index = 0;
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java
 
b/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java
new file mode 100644
index 0000000000..b3346e1e56
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class UnnestColumnValueSelectorCursorTest extends 
InitializedNullHandlingTest
+{
+  // Name under which the unnested values are exposed by the cursor under test.
+  private static final String OUTPUT_NAME = "unnested-column";
+  // Tests that pass this null set expect every unnested value to be emitted.
+  private static final LinkedHashSet<String> IGNORE_SET = null;
+  // Despite the name, tests that pass this set expect ONLY these values in the unnested output
+  // (see test_list_unnest_cursors_user_supplied_list_with_ignore_set).
+  private static final LinkedHashSet<String> IGNORE_SET1 = new LinkedHashSet<>(Arrays.asList("b", "f"));
+
+
+  /**
+   * Unnests a base cursor of two rows, each holding a two-element list, and verifies that the values
+   * "0" through "3" come out in order.
+   */
+  @Test
+  public void test_list_unnest_cursors()
+  {
+    List<Object> baseList = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      List<Object> newList = new ArrayList<>();
+      for (int j = 0; j < 2; j++) {
+        newList.add(String.valueOf(i * 2 + j));
+      }
+      baseList.add(newList);
+    }
+    ListCursor listCursor = new ListCursor(baseList);
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int j = 0;
+    while (!unnestCursor.isDone()) {
+      Object colSelectorVal = unnestColumnValueSelector.getObject();
+      // JUnit expects (expected, actual); keeping that order makes failure messages read correctly.
+      Assert.assertEquals(String.valueOf(j), colSelectorVal.toString());
+      j++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(4, j);
+  }
+
+  /**
+   * Unnests three rows of string lists and checks that every element is emitted once, in input order.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "b", "c"),
+        Arrays.asList("e", "f", "g", "h", "i"),
+        Collections.singletonList("j")
+    );
+
+    List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
+      k++;
+      unnestCursor.advance();
+    }
+    // The number of emitted rows must match the number of expected values (9).
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Unnests lists that contain nothing but nulls (four in total) and checks that four null rows are emitted.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_only_nulls()
+  {
+    List<Object> inputList = Arrays.asList(
+        Collections.singletonList(null),
+        Arrays.asList(null, null),
+        Collections.singletonList(null)
+    );
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Assert.assertNull(unnestColumnValueSelector.getObject());
+      k++;
+      unnestCursor.advance();
+    }
+    // 1 + 2 + 1 null entries in the input.
+    Assert.assertEquals(4, k);
+  }
+
+  /**
+   * Unnests a mix of list rows, a bare string row and null rows; a bare string is expected as a single
+   * value and each null row as a single null.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_mixed_with_nulls()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "b"),
+        Arrays.asList("b", "c"),
+        "d",
+        null,
+        null,
+        null
+    );
+
+    List<String> expectedResults = Arrays.asList("a", "b", "b", "c", "d", null, null, null);
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      // Compare as strings, mapping a null value to null so one assertion covers both cases.
+      String actual = valueSelectorVal == null ? null : valueSelectorVal.toString();
+      Assert.assertEquals(expectedResults.get(k), actual);
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Feeds rows that are plain strings rather than lists and checks that each one is emitted unchanged.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_strings_and_no_lists()
+  {
+    List<Object> inputList = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");
+
+    List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Mixes plain string rows with one list row and checks that the list is flattened in place while the
+   * strings pass through unchanged.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_strings_mixed_with_list()
+  {
+    List<Object> inputList = Arrays.asList("a", "b", "c", "e", "f", Arrays.asList("g", "h"), "i", "j");
+
+    List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Checks that a single unnest flattens exactly one level: a list nested inside a row's list is emitted
+   * as one list-valued element.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_lists_three_levels()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "b", "c"),
+        Arrays.asList("e", "f", "g", "h", "i"),
+        Arrays.asList("j", Arrays.asList("a", "b"))
+    );
+
+    List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", Arrays.asList("a", "b"));
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString());
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Stacks one unnest cursor on top of another and checks that the second pass also flattens the list
+   * that was nested one level deeper.
+   */
+  @Test
+  public void test_list_unnest_of_unnest_cursors_user_supplied_list_three_levels()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "b", "c"),
+        Arrays.asList("e", "f", "g", "h", "i"),
+        Arrays.asList("j", Arrays.asList("a", "b"))
+    );
+
+    List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", "a", "b");
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create the first unnest cursor, which publishes its values as OUTPUT_NAME
+    UnnestColumnValueSelectorCursor childCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    //Create the second unnest cursor, which unnests OUTPUT_NAME again into "tmp-out"
+    UnnestColumnValueSelectorCursor parentCursor = new UnnestColumnValueSelectorCursor(
+        childCursor,
+        childCursor.getColumnSelectorFactory(),
+        OUTPUT_NAME,
+        "tmp-out",
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        parentCursor.getColumnSelectorFactory().makeColumnValueSelector("tmp-out");
+    int k = 0;
+    while (!parentCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString());
+      k++;
+      parentCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Checks that a null element inside a row's list is emitted as a null row in its original position.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_with_nulls()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "b", "c"),
+        Arrays.asList("e", "f", "g", "h", "i", null),
+        Collections.singletonList("j")
+    );
+
+    List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", null, "j");
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      // Compare as strings, mapping a null value to null so one assertion covers both cases.
+      String actual = valueSelectorVal == null ? null : valueSelectorVal.toString();
+      Assert.assertEquals(expectedResults.get(k), actual);
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Checks that duplicate values and repeated nulls inside a row's list are all emitted, without
+   * de-duplication.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_with_dups()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "a", "a"),
+        Arrays.asList("e", "f", null, "h", "i", null),
+        Collections.singletonList("j")
+    );
+
+    List<Object> expectedResults = Arrays.asList("a", "a", "a", "e", "f", null, "h", "i", null, "j");
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      // Compare as strings, mapping a null value to null so one assertion covers both cases.
+      String actual = valueSelectorVal == null ? null : valueSelectorVal.toString();
+      Assert.assertEquals(expectedResults.get(k), actual);
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Passes a set containing "b" and "f" to the unnest cursor and checks that only those two values are
+   * emitted, i.e. the set works as an allow-list even though the constant is named IGNORE_SET1.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_with_ignore_set()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "b", "c"),
+        Arrays.asList("e", "f", "g", "h", "i"),
+        Collections.singletonList("j")
+    );
+
+    List<String> expectedResults = Arrays.asList("b", "f");
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET1
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      // Compare as strings, mapping a null value to null so one assertion covers both cases.
+      String actual = valueSelectorVal == null ? null : valueSelectorVal.toString();
+      Assert.assertEquals(expectedResults.get(k), actual);
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Unnests lists of integers and reads each value through {@code getDouble()}.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_double()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList(1, 2, 3),
+        Arrays.asList(4, 5, 6, 7, 8),
+        Collections.singletonList(9)
+    );
+
+    List<Double> expectedResults = Arrays.asList(1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d);
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      // Boxed on purpose so the comparison uses exact Double equality.
+      Double valueSelectorVal = unnestColumnValueSelector.getDouble();
+      Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Unnests lists of integers and reads each value through {@code getFloat()}.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_float()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList(1, 2, 3),
+        Arrays.asList(4, 5, 6, 7, 8),
+        Collections.singletonList(9)
+    );
+
+    List<Float> expectedResults = Arrays.asList(1f, 2f, 3f, 4f, 5f, 6f, 7f, 8f, 9f);
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      // Boxed on purpose so the comparison uses exact Float equality.
+      Float valueSelectorVal = unnestColumnValueSelector.getFloat();
+      Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Unnests lists of integers and reads each value through {@code getLong()}, also checking that
+   * {@code getObject()} is non-null for every row.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_long()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList(1, 2, 3),
+        Arrays.asList(4, 5, 6, 7, 8),
+        Collections.singletonList(9)
+    );
+
+    List<Long> expectedResults = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Assert.assertNotNull(unnestColumnValueSelector.getObject());
+      Long valueSelectorVal = unnestColumnValueSelector.getLong();
+      Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+
+  /**
+   * Runs a full single-level unnest over nested lists, then resets the cursor and checks that
+   * {@code isDoneOrInterrupted()} reports {@code false} again.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_three_level_arrays_and_methods()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "b", "c"),
+        Arrays.asList("e", "f", "g", "h", "i"),
+        Arrays.asList("j", Arrays.asList("a", "b"))
+    );
+
+    List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", Arrays.asList("a", "b"));
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString());
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+    unnestCursor.reset();
+    Assert.assertFalse(unnestCursor.isDoneOrInterrupted());
+  }
+
+  /**
+   * Requesting a dimension selector for the unnested column from this cursor type is expected to fail
+   * with {@link UOE}.
+   */
+  @Test(expected = UOE.class)
+  public void test_list_unnest_cursors_dimSelector()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList("a", "b", "c"),
+        Arrays.asList("e", "f", "g", "h", "i"),
+        Collections.singletonList("j")
+    );
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    unnestCursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_NAME));
+  }
+
+  /**
+   * Unnests lists of integers and compares each emitted object with the expected integer by string form.
+   */
+  @Test
+  public void test_list_unnest_cursors_user_supplied_list_of_integers()
+  {
+    List<Object> inputList = Arrays.asList(
+        Arrays.asList(1, 2, 3),
+        Arrays.asList(4, 5, 6, 7, 8),
+        Collections.singletonList(9)
+    );
+
+    List<Integer> expectedResults = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+    //Create base cursor
+    ListCursor listCursor = new ListCursor(inputList);
+
+    //Create unnest cursor
+    UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor(
+        listCursor,
+        listCursor.getColumnSelectorFactory(),
+        "dummy",
+        OUTPUT_NAME,
+        IGNORE_SET
+    );
+    ColumnValueSelector unnestColumnValueSelector =
+        unnestCursor.getColumnSelectorFactory().makeColumnValueSelector(OUTPUT_NAME);
+    int k = 0;
+    while (!unnestCursor.isDone()) {
+      Object valueSelectorVal = unnestColumnValueSelector.getObject();
+      Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString());
+      k++;
+      unnestCursor.advance();
+    }
+    Assert.assertEquals(expectedResults.size(), k);
+  }
+}
+
diff --git 
a/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
 
b/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
new file mode 100644
index 0000000000..35d42b82d4
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.utils.CloseableUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class UnnestStorageAdapterTest extends InitializedNullHandlingTest
+{
+  // Shared fixtures, assigned once in setup() and released in teardown().
+  private static Closer CLOSER;
+  private static IncrementalIndex INCREMENTAL_INDEX;
+  private static IncrementalIndexStorageAdapter INCREMENTAL_INDEX_STORAGE_ADAPTER;
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER;
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER1;
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER2;
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER3;
+  private static List<StorageAdapter> ADAPTERS;
+  // True constants: the column being unnested and the names given to the unnested outputs.
+  private static final String COLUMNNAME = "multi-string1";
+  private static final String OUTPUT_COLUMN_NAME = "unnested-multi-string1";
+  private static final String OUTPUT_COLUMN_NAME1 = "unnested-multi-string1-again";
+  // Despite the name, the allow-list test expects only these values to survive unnesting.
+  private static final LinkedHashSet<String> IGNORE_SET = new LinkedHashSet<>(Arrays.asList("1", "3", "5"));
+
+  /**
+   * Builds a two-row incremental index from the "expression-testbench" generator schema and wraps it in
+   * four unnest adapters: two directly over the index (without and with IGNORE_SET) and two stacked on top
+   * of those adapters.
+   */
+  @BeforeClass
+  public static void setup()
+  {
+    CLOSER = Closer.create();
+    final GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench");
+
+    final DataSegment dataSegment = DataSegment.builder()
+                                               .dataSource("foo")
+                                               .interval(schemaInfo.getDataInterval())
+                                               .version("1")
+                                               .shardSpec(new LinearShardSpec(0))
+                                               .size(0)
+                                               .build();
+    // Registered with CLOSER so teardown() releases the generator and the index.
+    final SegmentGenerator segmentGenerator = CLOSER.register(new SegmentGenerator());
+
+    final int numRows = 2;
+    INCREMENTAL_INDEX = CLOSER.register(
+        segmentGenerator.generateIncrementalIndex(dataSegment, schemaInfo, Granularities.HOUR, numRows)
+    );
+    INCREMENTAL_INDEX_STORAGE_ADAPTER = new IncrementalIndexStorageAdapter(INCREMENTAL_INDEX);
+    // Single unnest of COLUMNNAME, no value set supplied.
+    UNNEST_STORAGE_ADAPTER = new UnnestStorageAdapter(
+        INCREMENTAL_INDEX_STORAGE_ADAPTER,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME,
+        null
+    );
+    // Single unnest of COLUMNNAME restricted by IGNORE_SET.
+    UNNEST_STORAGE_ADAPTER1 = new UnnestStorageAdapter(
+        INCREMENTAL_INDEX_STORAGE_ADAPTER,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME,
+        IGNORE_SET
+    );
+    // Unnest stacked on UNNEST_STORAGE_ADAPTER, again over COLUMNNAME, written to a second output column.
+    UNNEST_STORAGE_ADAPTER2 = new UnnestStorageAdapter(
+        UNNEST_STORAGE_ADAPTER,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME1,
+        null
+    );
+    // Unnest stacked on UNNEST_STORAGE_ADAPTER1, with IGNORE_SET applied at both levels.
+    UNNEST_STORAGE_ADAPTER3 = new UnnestStorageAdapter(
+        UNNEST_STORAGE_ADAPTER1,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME1,
+        IGNORE_SET
+    );
+    ADAPTERS = ImmutableList.of(
+        UNNEST_STORAGE_ADAPTER,
+        UNNEST_STORAGE_ADAPTER1,
+        UNNEST_STORAGE_ADAPTER2,
+        UNNEST_STORAGE_ADAPTER3
+    );
+  }
+
+  /**
+   * Closes everything registered with CLOSER. Failures during close are deliberately ignored (the
+   * handler is a no-op) so that cleanup problems do not fail the test class.
+   */
+  @AfterClass
+  public static void teardown()
+  {
+    CloseableUtils.closeAndSuppressExceptions(CLOSER, throwable -> {
+    });
+  }
+
+  /**
+   * Checks the time bounds, row count, metadata, column type and unnested dimension reported by every
+   * adapter in ADAPTERS.
+   */
+  @Test
+  public void test_group_of_unnest_adapters_methods()
+  {
+    String colName = "multi-string1";
+    for (StorageAdapter adapter : ADAPTERS) {
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T23:00:00.000Z"),
+          adapter.getMaxTime()
+      );
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T12:00:00.000Z"),
+          adapter.getMinTime()
+      );
+      Assert.assertEquals(0, adapter.getNumRows());
+      Assert.assertNotNull(adapter.getMetadata());
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T23:59:59.999Z"),
+          adapter.getMaxIngestedEventTime()
+      );
+      // The unnest adapter must report the same column type for the base column as the wrapped index adapter.
+      Assert.assertEquals(
+          INCREMENTAL_INDEX_STORAGE_ADAPTER.getColumnCapabilities(colName).toColumnType(),
+          adapter.getColumnCapabilities(colName).toColumnType()
+      );
+      Assert.assertEquals(colName, ((UnnestStorageAdapter) adapter).getDimensionToUnnest());
+    }
+  }
+
+  /**
+   * Checks the value type reported for each base column and for the unnested output column of
+   * UNNEST_STORAGE_ADAPTER.
+   */
+  @Test
+  public void test_group_of_unnest_adapters_column_capabilities()
+  {
+    String colName = "multi-string1";
+    List<String> columnsInTable = Arrays.asList(
+        "string1",
+        "long1",
+        "double1",
+        "float1",
+        "multi-string1",
+        OUTPUT_COLUMN_NAME
+    );
+    List<ValueType> valueTypes = Arrays.asList(
+        ValueType.STRING,
+        ValueType.LONG,
+        ValueType.DOUBLE,
+        ValueType.FLOAT,
+        ValueType.STRING,
+        ValueType.STRING
+    );
+    UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER;
+
+    for (int i = 0; i < columnsInTable.size(); i++) {
+      String column = columnsInTable.get(i);
+      ColumnCapabilities capabilities = adapter.getColumnCapabilities(column);
+      // The column name is used as the assertion message so a failure identifies the offending column.
+      Assert.assertEquals(column, valueTypes.get(i), capabilities.getType());
+    }
+    Assert.assertEquals(colName, adapter.getDimensionToUnnest());
+  }
+
+  /**
+   * Walks the cursor produced by UNNEST_STORAGE_ADAPTER, reading the unnested dimension on every row,
+   * and checks the total number of unnested rows.
+   */
+  @Test
+  public void test_unnest_adapters_basic()
+  {
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+      int count = 0;
+      while (!cursor.isDone()) {
+        // The value itself is not checked (it may be null); reading it only exercises the selector per row.
+        dimSelector.getObject();
+        cursor.advance();
+        count++;
+      }
+      /*
+      each row has 8 entries.
+      unnest 2 rows -> 16 rows after unnest
+       */
+      Assert.assertEquals(16, count);
+      return null;
+    });
+  }
+
+  /**
+   * Walks the cursor produced by the stacked adapter UNNEST_STORAGE_ADAPTER2, reading both selector
+   * kinds on every row, and checks the row count and the dimension selector's value cardinality.
+   */
+  @Test
+  public void test_two_levels_of_unnest_adapters()
+  {
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER2.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER2.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME1));
+      ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME1);
+
+      int count = 0;
+      while (!cursor.isDone()) {
+        // Values are not checked (they may be null); reading them only exercises both selectors per row.
+        dimSelector.getObject();
+        valueSelector.getObject();
+        cursor.advance();
+        count++;
+      }
+      /*
+      each row has 8 entries.
+      unnest 2 rows -> 16 entries also the value cardinality
+      unnest of unnest -> 16*8 = 128 rows
+       */
+      Assert.assertEquals(128, count);
+      Assert.assertEquals(16, dimSelector.getValueCardinality());
+      return null;
+    });
+  }
+
+  /**
+   * Walks the cursor produced by UNNEST_STORAGE_ADAPTER1 (built with IGNORE_SET) and checks that only
+   * three rows come out and that the dimension selector reports a value cardinality of three.
+   */
+  @Test
+  public void test_unnest_adapters_with_allowList()
+  {
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER1.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+      ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME);
+
+      int count = 0;
+      while (!cursor.isDone()) {
+        // Values are not checked (they may be null); reading them only exercises both selectors per row.
+        dimSelector.getObject();
+        valueSelector.getObject();
+        cursor.advance();
+        count++;
+      }
+      /*
+      each row has 8 distinct entries.
+      allowlist has 3 entries also the value cardinality
+      unnest will have 3 distinct entries
+       */
+      Assert.assertEquals(3, count);
+      Assert.assertEquals(3, dimSelector.getValueCardinality());
+      return null;
+    });
+  }
+
+  /**
+   * Checks the adapter-level metadata of {@code UNNEST_STORAGE_ADAPTER3} (unnested dimension name,
+   * dictionary-encoding capability and min/max values of the output column), then walks its cursors
+   * and verifies the row count and value cardinality for {@code OUTPUT_COLUMN_NAME1}.
+   */
+  @Test
+  public void test_two_levels_of_unnest_adapters_with_allowList()
+  {
+    final String columnName = "multi-string1";
+
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER3.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER3.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+    UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER3;
+    // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure
+    // messages read correctly.
+    Assert.assertEquals(columnName, adapter.getDimensionToUnnest());
+    Assert.assertEquals(
+        ColumnCapabilities.Capable.TRUE,
+        adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded()
+    );
+    // The output column is expected to report the same min/max as the column being unnested.
+    Assert.assertEquals(adapter.getMaxValue(columnName), adapter.getMaxValue(OUTPUT_COLUMN_NAME));
+    Assert.assertEquals(adapter.getMinValue(columnName), adapter.getMinValue(OUTPUT_COLUMN_NAME));
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME1));
+      ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME1);
+
+      int count = 0;
+      while (!cursor.isDone()) {
+        // Null values are tolerated, so nothing is asserted about the values themselves. Both
+        // selectors are still read on every row so that a selector that throws fails the test.
+        dimSelector.getObject();
+        valueSelector.getObject();
+        cursor.advance();
+        count++;
+      }
+      /*
+      Each row has 8 distinct entries.
+      The allowlist has 3 entries, which is also the value cardinality.
+      The unnest will have 3 distinct entries.
+      The unnest of that unnest will have 3 * 3 = 9 entries.
+       */
+      Assert.assertEquals(9, count);
+      Assert.assertEquals(3, dimSelector.getValueCardinality());
+      return null;
+    });
+  }
+
+  /**
+   * Checks the adapter-level metadata of {@code UNNEST_STORAGE_ADAPTER1}, then walks its cursors and
+   * exercises the dimension selector methods ({@code idLookup}, {@code getRow},
+   * {@code makeValueMatcher}, {@code getValueCardinality}) on the unnested output column.
+   */
+  @Test
+  public void test_unnest_adapters_methods_with_allowList()
+  {
+    final String columnName = "multi-string1";
+
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER1.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+    UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER1;
+    // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure
+    // messages read correctly.
+    Assert.assertEquals(columnName, adapter.getDimensionToUnnest());
+    Assert.assertEquals(
+        ColumnCapabilities.Capable.TRUE,
+        adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded()
+    );
+    // The output column is expected to report the same min/max as the column being unnested.
+    Assert.assertEquals(adapter.getMaxValue(columnName), adapter.getMaxValue(OUTPUT_COLUMN_NAME));
+    Assert.assertEquals(adapter.getMinValue(columnName), adapter.getMinValue(OUTPUT_COLUMN_NAME));
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+      IdLookup idlookUp = dimSelector.idLookup();
+      Assert.assertFalse(dimSelector.isNull());
+      // Dictionary ids the selector is expected to produce, in cursor order.
+      int[] indices = new int[]{1, 3, 5};
+      int count = 0;
+      while (!cursor.isDone()) {
+        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException if the cursor
+        // yields more rows than there are expected ids.
+        Assert.assertTrue("cursor produced more rows than expected", count < indices.length);
+        Object dimSelectorVal = dimSelector.getObject();
+        Assert.assertEquals(indices[count], idlookUp.lookupId((String) dimSelectorVal));
+        // after unnest first entry in get row should equal the object
+        // and the row size will always be 1
+        Assert.assertEquals(indices[count], dimSelector.getRow().get(0));
+        Assert.assertEquals(1, dimSelector.getRow().size());
+        Assert.assertNotNull(dimSelector.makeValueMatcher(OUTPUT_COLUMN_NAME));
+        cursor.advance();
+        count++;
+      }
+      Assert.assertEquals(3, dimSelector.getValueCardinality());
+      Assert.assertEquals(3, count);
+      return null;
+    });
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org


Reply via email to