clintropolis commented on code in PR #13268: URL: https://github.com/apache/druid/pull/13268#discussion_r1037548430
########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +/** + * The cursor to help unnest MVDs without dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'e'] + * + * The baseCursor points to the row ['a', 'b', 'c'] + * while the unnestCursor with each call of advance() moves over individual elements. 
+ * + * unnestCursor.advance() -> 'a' + * unnestCursor.advance() -> 'b' + * unnestCursor.advance() -> 'c' + * unnestCursor.advance() -> 'd' (advances base cursor first) + * unnestCursor.advance() -> 'e' + * + * + * The allowSet if available helps skip over elements which are not in the allowList by moving the cursor to + * the next available match. + * + * The index reference points to the index of each row that the unnest cursor is accessing through currentVal + * The index ranges from 0 to the size of the list in each row which is held in the unnestListForCurrentRow + * + * The needInitialization flag sets up the initial values of unnestListForCurrentRow at the beginning of the segment + * + */ +public class ColumnarValueUnnestCursor implements Cursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + ColumnSelectorFactory baseColumSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumSelectorFactory = baseColumSelectorFactory; + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return 
baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + throw new UnsupportedOperationException( + "Dimension selector not applicable for column value selector for column " + outputName); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); Review Comment: I don't think you can count on casting to a string here, since it depends on the type of the underlying column value selector; the same applies to the other primitive numeric getters. ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +/** + * The cursor to help unnest MVDs without dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'e'] + * + * The baseCursor points to the row ['a', 'b', 'c'] + * while the unnestCursor with each call of advance() moves over individual elements. + * + * unnestCursor.advance() -> 'a' + * unnestCursor.advance() -> 'b' + * unnestCursor.advance() -> 'c' + * unnestCursor.advance() -> 'd' (advances base cursor first) + * unnestCursor.advance() -> 'e' + * + * + * The allowSet if available helps skip over elements which are not in the allowList by moving the cursor to + * the next available match. 
+ * + * The index reference points to the index of each row that the unnest cursor is accessing through currentVal + * The index ranges from 0 to the size of the list in each row which is held in the unnestListForCurrentRow + * + * The needInitialization flag sets up the initial values of unnestListForCurrentRow at the beginning of the segment + * + */ +public class ColumnarValueUnnestCursor implements Cursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + ColumnSelectorFactory baseColumSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumSelectorFactory = baseColumSelectorFactory; + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + throw new UnsupportedOperationException( + "Dimension selector not applicable for column value selector for column " + outputName); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return 
baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Float.valueOf((String) value); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Long.valueOf((String) value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + columnValueSelector.inspectRuntimeShape(inspector); + } + + @Override + public boolean isNull() + { + return getObject() == null; + } + + @Nullable + @Override + public Object getObject() + { + if (!unnestListForCurrentRow.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return unnestListForCurrentRow.get(index); + } else if (allowSet.contains((String) unnestListForCurrentRow.get(index))) { + return unnestListForCurrentRow.get(index); + } + } + return null; + } + + @Override + public Class<?> classOfObject() + { + return Object.class; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumSelectorFactory.getColumnCapabilities(column); + } + return baseColumSelectorFactory.getColumnCapabilities(columnName); Review Comment: I don't think you want to strictly pass through the underlying capabilities. If the underlying column is a multi-value string, you need to return capabilities that have multiple values set to false, since the unnested output column is no longer a multi-value string. If the underlying capabilities are an ARRAY type, you need to return the element type of the array.
########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +/** + * The cursor to help unnest MVDs with dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'c'] + * + * Considering dictionary encoding, these are represented as + * + * 'a' -> 0 + * 'b' -> 1 + * 'c' -> 2 + * 'd' -> 3 + * + * The baseCursor points to the row of IndexedInts [0, 1, 2] + * while the unnestCursor with each call of advance() moves over individual elements. 
+ * + * advance() -> 0 -> 'a' + * advance() -> 1 -> 'b' + * advance() -> 2 -> 'c' + * advance() -> 3 -> 'd' (advances base cursor first) + * advance() -> 2 -> 'c' + * + * Total 5 advance calls above + * + * The allowSet, if available, helps skip over elements that are not in the allowList by moving the cursor to + * the next available match. The hashSet is converted into a bitset (during initialization) for efficiency. + * If allowSet is ['c', 'd'] then the advance moves over to the next available match + * + * advance() -> 2 -> 'c' + * advance() -> 3 -> 'd' (advances base cursor first) + * advance() -> 2 -> 'c' + * + * Total 3 advance calls in this case + * + * The index reference points to the index of each row that the unnest cursor is accessing + * The indexedInts for each row are held in the indexedIntsForCurrentRow object + * + * The needInitialization flag sets up the initial values of indexedIntsForCurrentRow at the beginning of the segment + * + */ +public class DimensionUnnestCursor implements Cursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + private SingleIndexInts indexIntsForRow; + + public DimensionUnnestCursor( + Cursor cursor, + ColumnSelectorFactory baseColumnSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = baseColumnSelectorFactory; + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + 
this.allowSet = allowSet; + this.allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + // This object reference has been created + // during the call to initialize and referenced henceforth + return indexIntsForRow; + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + final int idForLookup = idLookup().lookupId(value); + if (idForLookup < 0) { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return false; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + }; + } + + return new ValueMatcher() + { + @Override + public boolean matches() + { + return idForLookup == indexedIntsForCurrentRow.get(index); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + dimSelector.inspectRuntimeShape(inspector); + } + }; + } + + @Override + public ValueMatcher makeValueMatcher(Predicate<String> predicate) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + dimSelector.inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public Object getObject() + { + if (allowedBitSet.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + } else if (allowedBitSet.get(indexedIntsForCurrentRow.get(index))) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + return null; + } + + @Override + public Class<?> classOfObject() + { + return 
Object.class; + } + + @Override + public int getValueCardinality() + { + if (!allowedBitSet.isEmpty()) { + return allowedBitSet.cardinality(); + } + return dimSelector.getValueCardinality(); + } + + @Nullable + @Override + public String lookupName(int id) + { + return dimSelector.lookupName(id); + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return dimSelector.nameLookupPossibleInAdvance(); + } + + @Nullable + @Override + public IdLookup idLookup() + { + return dimSelector.idLookup(); + } + }; + } + + /* + This ideally should not be called. If called delegate using the makeDimensionSelector + */ + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumnSelectorFactory.makeColumnValueSelector(columnName); + } + return makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumnSelectorFactory.getColumnCapabilities(column); + } + return baseColumnSelectorFactory.getColumnCapabilities(columnName); Review Comment: Same comment as above: adjust the returned column capabilities so that "has multiple values" is no longer set for the unnested column. ########## processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.filter.AndFilter; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.LinkedHashSet; + +public class UnnestStorageAdapter implements StorageAdapter +{ + private final StorageAdapter baseAdapter; + private final String dimensionToUnnest; + private final String outputColumnName; + private final LinkedHashSet<String> allowSet; + + public UnnestStorageAdapter( + final StorageAdapter baseAdapter, + final String dimension, + final String outputColumnName, + final LinkedHashSet<String> allowSet + ) + { + this.baseAdapter = baseAdapter; + this.dimensionToUnnest = dimension; + this.outputColumnName = outputColumnName; + this.allowSet = allowSet; + } + + @Override + public Sequence<Cursor> makeCursors( + @Nullable Filter filter, + Interval interval, + VirtualColumns virtualColumns, + Granularity gran, + boolean descending, + 
@Nullable QueryMetrics<?> queryMetrics + ) + { + Filter updatedFilter; + final InDimFilter allowListFilters; + if (allowSet != null && !allowSet.isEmpty()) { + allowListFilters = new InDimFilter(dimensionToUnnest, allowSet); + if (filter != null) { + updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters)); + } else { + updatedFilter = allowListFilters; + } + } else { + updatedFilter = filter; + } + final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors( + updatedFilter, + interval, + virtualColumns, + gran, + descending, + queryMetrics + ); + + return Sequences.map( + baseCursorSequence, + cursor -> { + assert cursor != null; + Cursor retVal = cursor; + UnnestCursor retUnnestCursor; + ColumnCapabilities capabilities = cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest); + if (capabilities.isDictionaryEncoded() == ColumnCapabilities.Capable.TRUE + && capabilities.areDictionaryValuesUnique() == ColumnCapabilities.Capable.TRUE) { + retUnnestCursor = new DimensionUnnestCursor(retVal, dimensionToUnnest, outputColumnName, allowSet); + } else { + retUnnestCursor = new ColumnarValueUnnestCursor(retVal, dimensionToUnnest, outputColumnName, allowSet); + } + return retUnnestCursor; + } + ); + } + + @Override + public Interval getInterval() + { + return baseAdapter.getInterval(); + } + + @Override + public Indexed<String> getAvailableDimensions() + { + final LinkedHashSet<String> availableDimensions = new LinkedHashSet<>(); + + for (String dim : baseAdapter.getAvailableDimensions()) { + availableDimensions.add(dim); + } + // check to see if output name provided is already + // a part of available dimensions + if (availableDimensions.contains(outputColumnName)) { + throw new IAE( + "Provided output name [%s] already exists in table to be unnested. 
Please use a different name.", + outputColumnName + ); + } else { + availableDimensions.add(outputColumnName); + } + return new ListIndexed<>(Lists.newArrayList(availableDimensions)); + } + + @Override + public Iterable<String> getAvailableMetrics() + { + return baseAdapter.getAvailableMetrics(); + } + + @Override + public int getDimensionCardinality(String column) + { + if (outputColumnName.equals(dimensionToUnnest)) { + return baseAdapter.getDimensionCardinality(column); + } + return baseAdapter.getDimensionCardinality(dimensionToUnnest); + } + + @Override + public DateTime getMinTime() + { + return baseAdapter.getMinTime(); + } + + @Override + public DateTime getMaxTime() + { + return baseAdapter.getMaxTime(); + } + + @Nullable + @Override + public Comparable getMinValue(String column) + { + if (outputColumnName.equals(dimensionToUnnest)) { + return baseAdapter.getMinValue(column); + } + return baseAdapter.getMinValue(dimensionToUnnest); + } + + @Nullable + @Override + public Comparable getMaxValue(String column) + { + if (outputColumnName.equals(dimensionToUnnest)) { + return baseAdapter.getMaxValue(column); + } + return baseAdapter.getMaxValue(dimensionToUnnest); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (outputColumnName.equals(dimensionToUnnest)) { + return baseAdapter.getColumnCapabilities(column); + } + return baseAdapter.getColumnCapabilities(dimensionToUnnest); + } + + @Override + public int getNumRows() + { + return 0; Review Comment: I think this is fine; it is only used by segment metadata and by some metrics about segment row counts. ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +/** + * The cursor to help unnest MVDs without dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'e'] + * + * The baseCursor points to the row ['a', 'b', 'c'] + * while the unnestCursor with each call of advance() moves over individual elements. + * + * unnestCursor.advance() -> 'a' + * unnestCursor.advance() -> 'b' + * unnestCursor.advance() -> 'c' + * unnestCursor.advance() -> 'd' (advances base cursor first) + * unnestCursor.advance() -> 'e' + * + * + * The allowSet if available helps skip over elements which are not in the allowList by moving the cursor to + * the next available match. 
+ * + * The index reference points to the index of each row that the unnest cursor is accessing through currentVal + * The index ranges from 0 to the size of the list in each row which is held in the unnestListForCurrentRow + * + * The needInitialization flag sets up the initial values of unnestListForCurrentRow at the beginning of the segment + * + */ +public class ColumnarValueUnnestCursor implements Cursor Review Comment: Super nitpick, but why not name this class after what it does, e.g. `UnnestColumnValueSelectorCursor`? The same goes for the other one, which could be `UnnestDimensionSelectorCursor`. ########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +/** + * The cursor to help unnest MVDs with dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'c'] + * + * Considering dictionary encoding, these are represented as + * + * 'a' -> 0 + * 'b' -> 1 + * 'c' -> 2 + * 'd' -> 3 + * + * The baseCursor points to the row of IndexedInts [0, 1, 2] + * while the unnestCursor with each call of advance() moves over individual elements. + * + * advance() -> 0 -> 'a' + * advance() -> 1 -> 'b' + * advance() -> 2 -> 'c' + * advance() -> 3 -> 'd' (advances base cursor first) + * advance() -> 2 -> 'c' + * + * Total 5 advance calls above + * + * The allowSet, if available, helps skip over elements that are not in the allowList by moving the cursor to + * the next available match. The hashSet is converted into a bitset (during initialization) for efficiency. 
+ * If allowSet is ['c', 'd'] then the advance moves over to the next available match + * + * advance() -> 2 -> 'c' + * advance() -> 3 -> 'd' (advances base cursor first) + * advance() -> 2 -> 'c' + * + * Total 3 advance calls in this case + * + * The index reference points to the index of each row that the unnest cursor is accessing + * The indexedInts for each row are held in the indexedIntsForCurrentRow object + * + * The needInitialization flag sets up the initial values of indexedIntsForCurrentRow at the beginning of the segment + * + */ +public class DimensionUnnestCursor implements Cursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + private SingleIndexInts indexIntsForRow; + + public DimensionUnnestCursor( + Cursor cursor, + ColumnSelectorFactory baseColumnSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = baseColumnSelectorFactory; + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + this.allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + return 
new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + // This object reference has been created + // during the call to initialize and referenced henceforth + return indexIntsForRow; + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + final int idForLookup = idLookup().lookupId(value); + if (idForLookup < 0) { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return false; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + }; + } + + return new ValueMatcher() + { + @Override + public boolean matches() + { + return idForLookup == indexedIntsForCurrentRow.get(index); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + dimSelector.inspectRuntimeShape(inspector); + } + }; + } + + @Override + public ValueMatcher makeValueMatcher(Predicate<String> predicate) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + dimSelector.inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public Object getObject() + { + if (allowedBitSet.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + } else if (allowedBitSet.get(indexedIntsForCurrentRow.get(index))) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + return null; + } + + @Override + public Class<?> classOfObject() + { + return Object.class; + } + + @Override + public int getValueCardinality() + { + if (!allowedBitSet.isEmpty()) { + return allowedBitSet.cardinality(); + } + return dimSelector.getValueCardinality(); + } + + @Nullable + @Override + public String lookupName(int id) + { + return dimSelector.lookupName(id); + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return dimSelector.nameLookupPossibleInAdvance(); + } + + @Nullable 
+ @Override + public IdLookup idLookup() + { + return dimSelector.idLookup(); + } + }; + } + + /* + This ideally should not be called. If called delegate using the makeDimensionSelector + */ + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumnSelectorFactory.makeColumnValueSelector(columnName); + } + return makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumnSelectorFactory.getColumnCapabilities(column); + } + return baseColumnSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && !baseCursor.isDone()) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && !baseCursor.isDoneOrInterrupted()) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + /** + * This initializes the unnest cursor and creates data structures + * to start iterating over the values to be unnested. + * This would also create a bitset for dictonary encoded columns to + * check for matching values specified in allowedList of UnnestDataSource. 
+ */ + private void initialize() + { + IdLookup idLookup = dimSelector.idLookup(); + this.indexIntsForRow = new SingleIndexInts(); + if (allowSet != null && !allowSet.isEmpty() && idLookup != null) { + for (String s : allowSet) { + if (idLookup.lookupId(s) >= 0) { + allowedBitSet.set(idLookup.lookupId(s)); + } + } + } + if (dimSelector.getObject() != null) { + this.indexedIntsForCurrentRow = dimSelector.getRow(); + } + if (!allowedBitSet.isEmpty()) { + if (!allowedBitSet.get(indexedIntsForCurrentRow.get(index))) { + advance(); + } + } + needInitialization = false; + } + + /** + * This advances the cursor to move to the next element to be unnested. + * When the last element in a row is unnested, it is also responsible + * to move the base cursor to the next row for unnesting and repopulates + * the data structures, created during initialize(), to point to the new row + */ + private void advanceAndUpdate() + { + if (index >= indexedIntsForCurrentRow.size() - 1) { + if (!baseCursor.isDone()) { + baseCursor.advanceUninterruptibly(); + } + if (!baseCursor.isDone()) { + indexedIntsForCurrentRow = dimSelector.getRow(); + } + index = 0; + } else { + ++index; + } + } + + /** + * This advances the unnest cursor in cases where an allowList is specified + * and the current value at the unnest cursor is not in the allowList. + * The cursor in such cases is moved till the next match is found. 
+ * + * @return a boolean to indicate whether to stay or move cursor + */ + private boolean matchAndProceed() + { + boolean matchStatus; + if ((allowSet == null || allowSet.isEmpty()) && allowedBitSet.isEmpty()) { + matchStatus = true; + } else { + matchStatus = allowedBitSet.get(indexedIntsForCurrentRow.get(index)); + } + return !baseCursor.isDone() && !matchStatus; + } + + // Helper class to help in returning + // getRow from the dimensionSelector + // This is set in the initialize method + private class SingleIndexInts implements IndexedInts + { + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + //nothing to inspect + } + + @Override + public int size() + { + // After unnest each row will have a single element + return 1; + } + + @Override + public int get(int idx) + { + return indexedIntsForCurrentRow.get(index); + } Review Comment: this seems a bit confusing, passing this through to the underlying row's IndexedInts... size is 1, so the only valid argument to get() here should be 0, no? I guess I'm worried about possible silent bugs from having it like this, instead of the other `SingleIndexInts`, which can only possibly expose a single value ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +/** + * The cursor to help unnest MVDs without dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'e'] + * + * The baseCursor points to the row ['a', 'b', 'c'] + * while the unnestCursor with each call of advance() moves over individual elements. + * + * unnestCursor.advance() -> 'a' + * unnestCursor.advance() -> 'b' + * unnestCursor.advance() -> 'c' + * unnestCursor.advance() -> 'd' (advances base cursor first) + * unnestCursor.advance() -> 'e' + * + * + * The allowSet if available helps skip over elements which are not in the allowList by moving the cursor to + * the next available match. 
+ * + * The index reference points to the index of each row that the unnest cursor is accessing through currentVal + * The index ranges from 0 to the size of the list in each row which is held in the unnestListForCurrentRow + * + * The needInitialization flag sets up the initial values of unnestListForCurrentRow at the beginning of the segment + * + */ +public class ColumnarValueUnnestCursor implements Cursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + ColumnSelectorFactory baseColumSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumSelectorFactory = baseColumSelectorFactory; + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + throw new UnsupportedOperationException( + "Dimension selector not applicable for column value selector for column " + outputName); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return 
baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Float.valueOf((String) value); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Long.valueOf((String) value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + columnValueSelector.inspectRuntimeShape(inspector); + } + + @Override + public boolean isNull() + { + return getObject() == null; + } + + @Nullable + @Override + public Object getObject() + { + if (!unnestListForCurrentRow.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return unnestListForCurrentRow.get(index); + } else if (allowSet.contains((String) unnestListForCurrentRow.get(index))) { + return unnestListForCurrentRow.get(index); + } + } + return null; + } + + @Override + public Class<?> classOfObject() + { + return Object.class; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumSelectorFactory.getColumnCapabilities(column); + } + return baseColumSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && !baseCursor.isDone()) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean 
isDoneOrInterrupted() + { + if (needInitialization && !baseCursor.isDoneOrInterrupted()) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + /** + * This method populates the objects when the base cursor moves to the next row + * + * @param firstRun flag to populate one time object references to hold values for unnest cursor + */ + private void getNextRow(boolean firstRun) + { + currentVal = this.columnValueSelector.getObject(); + if (currentVal == null) { + if (!firstRun) { + unnestListForCurrentRow = new ArrayList<>(); + } + unnestListForCurrentRow.add(null); + } else { + if (currentVal instanceof List) { + unnestListForCurrentRow = (List<Object>) currentVal; + } else if (currentVal.getClass().equals(String.class)) { + if (!firstRun) { + unnestListForCurrentRow = new ArrayList<>(); + } + unnestListForCurrentRow.add(currentVal); + } + } + } + + /** + * This initializes the unnest cursor and creates data structures + * to start iterating over the values to be unnested. + * This would also create a bitset for dictonary encoded columns to + * check for matching values specified in allowedList of UnnestDataSource. + */ + private void initialize() + { + this.unnestListForCurrentRow = new ArrayList<>(); + getNextRow(needInitialization); + if (allowSet != null) { + if (!allowSet.isEmpty()) { + if (!allowSet.contains((String) unnestListForCurrentRow.get(index))) { Review Comment: nit, collapse nested ifs ########## processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.filter.AndFilter; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Objects; + +/** + * This class serves as the Storage Adapter for the Unnest Segment and is responsible for creating the cursors + * If the column is dictionary encoded it creates {@link DimensionUnnestCursor} else {@link ColumnarValueUnnestCursor} + * These cursors help navigate the segments for these cases + */ +public class UnnestStorageAdapter implements StorageAdapter +{ + private final StorageAdapter baseAdapter; + private final String dimensionToUnnest; + private final String outputColumnName; + private final LinkedHashSet<String> allowSet; + + public UnnestStorageAdapter( + final StorageAdapter 
baseAdapter, + final String dimension, + final String outputColumnName, + final LinkedHashSet<String> allowSet + ) + { + this.baseAdapter = baseAdapter; + this.dimensionToUnnest = dimension; + this.outputColumnName = outputColumnName; + this.allowSet = allowSet; + } + + @Override + public Sequence<Cursor> makeCursors( + @Nullable Filter filter, + Interval interval, + VirtualColumns virtualColumns, + Granularity gran, + boolean descending, + @Nullable QueryMetrics<?> queryMetrics + ) + { + Filter updatedFilter; + if (allowSet != null && !allowSet.isEmpty()) { + final InDimFilter allowListFilters; + allowListFilters = new InDimFilter(dimensionToUnnest, allowSet); + if (filter != null) { + updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters)); + } else { + updatedFilter = allowListFilters; + } + } else { + updatedFilter = filter; + } + final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors( + updatedFilter, + interval, + virtualColumns, + gran, + descending, + queryMetrics + ); + + return Sequences.map( + baseCursorSequence, + cursor -> { + Objects.requireNonNull(cursor); + Cursor retVal = cursor; + ColumnCapabilities capabilities = cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest); + if (capabilities.isDictionaryEncoded() == ColumnCapabilities.Capable.TRUE + && capabilities.areDictionaryValuesUnique() == ColumnCapabilities.Capable.TRUE) { Review Comment: the capabilities returned here are allowed to be null, so I suggest checking for nulls. Also, the statement can be slightly simplified: ```capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue()``` ########## processing/src/main/java/org/apache/druid/query/UnnestDataSource.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.UnnestSegmentReference; +import org.apache.druid.utils.JvmUtils; + +import javax.annotation.Nullable; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class UnnestDataSource implements DataSource +{ + private final DataSource base; + private final String column; + private final String outputName; + private final LinkedHashSet<String> allowList; + + private UnnestDataSource( + DataSource dataSource, + String columnName, + String outputName, + LinkedHashSet<String> allowList + ) + { + this.base = dataSource; + this.column = columnName; + this.outputName = outputName; + this.allowList = allowList; + } + + @JsonCreator + public static UnnestDataSource create( + @JsonProperty("base") DataSource base, + @JsonProperty("column") String columnName, + @JsonProperty("outputName") String outputName, + @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList + ) + { + return new UnnestDataSource(base, 
columnName, outputName, allowList); + } + + @JsonProperty("base") + public DataSource getBase() + { + return base; + } + + @JsonProperty("column") + public String getColumn() + { + return column; + } + + @JsonProperty("outputName") + public String getOutputName() + { + return outputName; + } + + @JsonProperty("allowList") + public LinkedHashSet<String> getAllowList() + { + return allowList; + } + + @Override + public Set<String> getTableNames() + { + return base.getTableNames(); + } + + @Override + public List<DataSource> getChildren() + { + return ImmutableList.of(base); + } + + @Override + public DataSource withChildren(List<DataSource> children) + { + if (children.size() != 1) { + throw new IAE("Expected [1] child, got [%d]", children.size()); + } + return new UnnestDataSource(children.get(0), column, outputName, allowList); + } + + @Override + public boolean isCacheable(boolean isBroker) + { + return base.isCacheable(isBroker); + } + + @Override + public boolean isGlobal() + { + return base.isGlobal(); + } + + @Override + public boolean isConcrete() + { + return base.isConcrete(); + } + + @Override + public Function<SegmentReference, SegmentReference> createSegmentMapFunction( + Query query, + AtomicLong cpuTimeAccumulator + ) + { + final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction( + query, + cpuTimeAccumulator + ); + return JvmUtils.safeAccumulateThreadCpuTime( + cpuTimeAccumulator, + () -> { + if (column == null) { + return segmentMapFn; + } else if (column.isEmpty()) { + return segmentMapFn; + } else { + return + segmentMapFn.andThen( + baseSegment -> + new UnnestSegmentReference( + baseSegment, + column, + outputName, + allowList + ) + ); + } + } + ); + + } + + @Override + public DataSource withUpdatedDataSource(DataSource newSource) + { + return new UnnestDataSource(newSource, column, outputName, allowList); + } + + @Override + public byte[] getCacheKey() + { + return null; Review Comment: i think the column 
being unnested would need to be part of the cache key. Table datasources can get away with an empty cache key only because the datasource name is already part of the segmentId; here, however, the results also depend on which column is being unnested, so we can't rely on the datasource name alone, and the cache key would need to be non-empty ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +/** + * The cursor to help unnest MVDs without dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'e'] + * + * The baseCursor points to the row ['a', 'b', 'c'] + * while the unnestCursor with each call of advance() moves over individual elements.
+ * + * unnestCursor.advance() -> 'a' + * unnestCursor.advance() -> 'b' + * unnestCursor.advance() -> 'c' + * unnestCursor.advance() -> 'd' (advances base cursor first) + * unnestCursor.advance() -> 'e' + * + * + * The allowSet if available helps skip over elements which are not in the allowList by moving the cursor to + * the next available match. + * + * The index reference points to the index of each row that the unnest cursor is accessing through currentVal + * The index ranges from 0 to the size of the list in each row which is held in the unnestListForCurrentRow + * + * The needInitialization flag sets up the initial values of unnestListForCurrentRow at the beginning of the segment + * + */ +public class ColumnarValueUnnestCursor implements Cursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + ColumnSelectorFactory baseColumSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumSelectorFactory = baseColumSelectorFactory; + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return 
baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + throw new UnsupportedOperationException( + "Dimension selector not applicable for column value selector for column " + outputName); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Float.valueOf((String) value); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Long.valueOf((String) value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + columnValueSelector.inspectRuntimeShape(inspector); + } + + @Override + public boolean isNull() + { + return getObject() == null; + } + + @Nullable + @Override + public Object getObject() + { + if (!unnestListForCurrentRow.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return unnestListForCurrentRow.get(index); + } else if (allowSet.contains((String) unnestListForCurrentRow.get(index))) { + return unnestListForCurrentRow.get(index); + } + } + return null; + } + + @Override + public Class<?> classOfObject() + { + return Object.class; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumSelectorFactory.getColumnCapabilities(column); + } + return baseColumSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + 
advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && !baseCursor.isDone()) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && !baseCursor.isDoneOrInterrupted()) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + /** + * This method populates the objects when the base cursor moves to the next row + * + * @param firstRun flag to populate one time object references to hold values for unnest cursor + */ + private void getNextRow(boolean firstRun) + { + currentVal = this.columnValueSelector.getObject(); + if (currentVal == null) { + if (!firstRun) { + unnestListForCurrentRow = new ArrayList<>(); + } + unnestListForCurrentRow.add(null); + } else { + if (currentVal instanceof List) { Review Comment: I think you'll want to check for `Object[]` too, since that is the type we have been standardizing `ARRAY` types to deal in ########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +/** + * The cursor to help unnest MVDs with dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'c'] + * + * Considering dictionary encoding, these are represented as + * + * 'a' -> 0 + * 'b' -> 1 + * 'c' -> 2 + * 'd' -> 3 + * + * The baseCursor points to the row of IndexedInts [0, 1, 2] + * while the unnestCursor with each call of advance() moves over individual elements. + * + * advance() -> 0 -> 'a' + * advance() -> 1 -> 'b' + * advance() -> 2 -> 'c' + * advance() -> 3 -> 'd' (advances base cursor first) + * advance() -> 2 -> 'c' + * + * Total 5 advance calls above + * + * The allowSet, if available, helps skip over elements that are not in the allowList by moving the cursor to + * the next available match. The hashSet is converted into a bitset (during initialization) for efficiency. 
+ * If allowSet is ['c', 'd'] then the advance moves over to the next available match + * + * advance() -> 2 -> 'c' + * advance() -> 3 -> 'd' (advances base cursor first) + * advance() -> 2 -> 'c' + * + * Total 3 advance calls in this case + * + * The index reference points to the index of each row that the unnest cursor is accessing + * The indexedInts for each row are held in the indexedIntsForCurrentRow object + * + * The needInitialization flag sets up the initial values of indexedIntsForCurrentRow at the beginning of the segment + * + */ +public class DimensionUnnestCursor implements Cursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + private SingleIndexInts indexIntsForRow; + + public DimensionUnnestCursor( + Cursor cursor, + ColumnSelectorFactory baseColumnSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = baseColumnSelectorFactory; + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + this.allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + return 
new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + // This object reference has been created + // during the call to initialize and referenced henceforth + return indexIntsForRow; + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + final int idForLookup = idLookup().lookupId(value); + if (idForLookup < 0) { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return false; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + }; + } + + return new ValueMatcher() + { + @Override + public boolean matches() + { + return idForLookup == indexedIntsForCurrentRow.get(index); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + dimSelector.inspectRuntimeShape(inspector); + } + }; + } + + @Override + public ValueMatcher makeValueMatcher(Predicate<String> predicate) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + dimSelector.inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public Object getObject() + { + if (allowedBitSet.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + } else if (allowedBitSet.get(indexedIntsForCurrentRow.get(index))) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + return null; + } + + @Override + public Class<?> classOfObject() + { + return Object.class; + } + + @Override + public int getValueCardinality() + { + if (!allowedBitSet.isEmpty()) { + return allowedBitSet.cardinality(); + } + return dimSelector.getValueCardinality(); + } + + @Nullable + @Override + public String lookupName(int id) + { + return dimSelector.lookupName(id); + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return dimSelector.nameLookupPossibleInAdvance(); + } + + @Nullable 
+ @Override + public IdLookup idLookup() + { + return dimSelector.idLookup(); + } + }; + } + + /* + This ideally should not be called. If called delegate using the makeDimensionSelector + */ + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumnSelectorFactory.makeColumnValueSelector(columnName); + } + return makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumnSelectorFactory.getColumnCapabilities(column); + } + return baseColumnSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && !baseCursor.isDone()) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && !baseCursor.isDoneOrInterrupted()) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + /** + * This initializes the unnest cursor and creates data structures + * to start iterating over the values to be unnested. + * This would also create a bitset for dictonary encoded columns to + * check for matching values specified in allowedList of UnnestDataSource. 
+ */ + private void initialize() + { + IdLookup idLookup = dimSelector.idLookup(); + this.indexIntsForRow = new SingleIndexInts(); + if (allowSet != null && !allowSet.isEmpty() && idLookup != null) { + for (String s : allowSet) { + if (idLookup.lookupId(s) >= 0) { + allowedBitSet.set(idLookup.lookupId(s)); + } + } + } + if (dimSelector.getObject() != null) { + this.indexedIntsForCurrentRow = dimSelector.getRow(); + } + if (!allowedBitSet.isEmpty()) { + if (!allowedBitSet.get(indexedIntsForCurrentRow.get(index))) { + advance(); + } + } + needInitialization = false; + } + + /** + * This advances the cursor to move to the next element to be unnested. + * When the last element in a row is unnested, it is also responsible + * to move the base cursor to the next row for unnesting and repopulates + * the data structures, created during initialize(), to point to the new row + */ + private void advanceAndUpdate() + { + if (index >= indexedIntsForCurrentRow.size() - 1) { + if (!baseCursor.isDone()) { + baseCursor.advanceUninterruptibly(); + } + if (!baseCursor.isDone()) { + indexedIntsForCurrentRow = dimSelector.getRow(); + } + index = 0; + } else { + ++index; + } + } + + /** + * This advances the unnest cursor in cases where an allowList is specified + * and the current value at the unnest cursor is not in the allowList. + * The cursor in such cases is moved till the next match is found. 
+ * + * @return a boolean to indicate whether to stay or move cursor + */ + private boolean matchAndProceed() + { + boolean matchStatus; + if ((allowSet == null || allowSet.isEmpty()) && allowedBitSet.isEmpty()) { + matchStatus = true; + } else { + matchStatus = allowedBitSet.get(indexedIntsForCurrentRow.get(index)); + } + return !baseCursor.isDone() && !matchStatus; + } + + // Helper class to help in returning + // getRow from the dimensionSelector + // This is set in the initialize method + private class SingleIndexInts implements IndexedInts Review Comment: why not https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/data/SingleIndexedInt.java (apologies if this was discussed somewhere and I missed it....) ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; + +/** + * The cursor to help unnest MVDs without dictionary encoding. Review Comment: this isn't specific to multi-value dimensions, since this also handles `ARRAY` typed selectors -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
