This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c9201ad6583 Minor refactors to processing module (#17136)
c9201ad6583 is described below
commit c9201ad65839994306de544d355e0fb1d9949835
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Mon Oct 7 13:18:35 2024 +0530
Minor refactors to processing module (#17136)
Refactors a few things.
- Adds SemanticUtils maps to columns.
- Adds some addAll functions to reduce duplication and for future reuse.
- Refactors VariantColumnAndIndexSupplier to take only a SmooshedFileMapper
instead of a ColumnBuilder.
- Refactors LongColumnSerializerV2 to have separate functions for
serializing a value and a null.
---
.../java/util/common/io/smoosh/FileSmoosher.java | 8 ++++++
.../druid/segment/LongColumnSerializerV2.java | 25 +++++++++++++++---
.../column/StringUtf8DictionaryEncodedColumn.java | 15 +++++++++++
.../data/BlockLayoutColumnarDoublesSupplier.java | 4 +--
.../data/BlockLayoutColumnarLongsSupplier.java | 15 +++++++++++
.../apache/druid/segment/data/ColumnarDoubles.java | 7 ++++-
.../segment/data/ColumnarDoublesSerializer.java | 9 +++++++
.../data/CompressedVSizeColumnarIntsSupplier.java | 8 +++++-
.../segment/data/LongsLongEncodingWriter.java | 13 ++++++++++
.../data/SingleValueColumnarIntsSerializer.java | 7 +++++
.../nested/CompressedNestedDataComplexColumn.java | 15 +++++++++++
.../nested/NestedCommonFormatColumnSerializer.java | 14 +++-------
.../segment/nested/NestedDataColumnSerializer.java | 23 +++++++++++------
.../druid/segment/nested/ScalarDoubleColumn.java | 15 +++++++++++
.../druid/segment/nested/ScalarLongColumn.java | 15 +++++++++++
.../apache/druid/segment/nested/VariantColumn.java | 15 +++++++++++
.../nested/VariantColumnAndIndexSupplier.java | 26 +++++++++----------
.../druid/segment/serde/ColumnSerializerUtils.java | 30 +++++++++++++++-------
.../serde/NestedCommonFormatColumnPartSerde.java | 2 +-
.../data/CompressedColumnarIntsSerializerTest.java | 4 +--
.../segment/data/CompressedDoublesSerdeTest.java | 4 +--
.../segment/nested/VariantColumnSupplierTest.java | 4 +--
22 files changed, 219 insertions(+), 59 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
index f4103ec1847..edb7589fb82 100644
---
a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
+++
b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
@@ -31,6 +31,7 @@ import
org.apache.druid.java.util.common.MappedByteBufferHandler;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.serde.Serializer;
import java.io.BufferedWriter;
import java.io.Closeable;
@@ -161,6 +162,13 @@ public class FileSmoosher implements Closeable
}
}
+ public void serializeAs(String name, Serializer serializer) throws
IOException
+ {
+ try (SmooshedWriter smooshChannel = addWithSmooshedWriter(name,
serializer.getSerializedSize())) {
+ serializer.writeTo(smooshChannel, this);
+ }
+ }
+
public SmooshedWriter addWithSmooshedWriter(final String name, final long
size) throws IOException
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
index 226cd8bafb0..0611d96e9c0 100644
---
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
+++
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
@@ -121,12 +121,29 @@ public class LongColumnSerializerV2 implements
GenericColumnSerializer<Object>
public void serialize(ColumnValueSelector<?> selector) throws IOException
{
if (selector.isNull()) {
- nullRowsBitmap.add(rowCount);
- writer.add(0L);
+ serializeNull();
} else {
- writer.add(selector.getLong());
+ serializeValue(selector.getLong());
}
- rowCount++;
+ }
+
+ /**
+ * Serializes a null value at the rowCount position, and increments the
current rowCount.
+ */
+ public void serializeNull() throws IOException
+ {
+ nullRowsBitmap.add(rowCount);
+ writer.add(0L);
+ ++rowCount;
+ }
+
+ /**
+ * Serializes a value of val at the rowCount position, and increments the
current rowCount.
+ */
+ public void serializeValue(long val) throws IOException
+ {
+ writer.add(val);
+ ++rowCount;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
index a5966096a6e..bed58a43675 100644
---
a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.column;
import com.google.common.collect.Lists;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate;
@@ -52,6 +53,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
/**
* {@link DictionaryEncodedColumn<String>} for a column which has a {@link
ByteBuffer} based UTF-8 dictionary.
@@ -62,6 +65,9 @@ import java.util.List;
*/
public class StringUtf8DictionaryEncodedColumn implements
DictionaryEncodedColumn<String>, NestedCommonFormatColumn
{
+ private static final Map<Class<?>,
Function<StringUtf8DictionaryEncodedColumn, ?>> AS_MAP =
+ SemanticUtils.makeAsMap(StringUtf8DictionaryEncodedColumn.class);
+
@Nullable
private final ColumnarInts column;
@Nullable
@@ -498,6 +504,15 @@ public class StringUtf8DictionaryEncodedColumn implements
DictionaryEncodedColum
}
}
+ @SuppressWarnings("unchecked")
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ //noinspection ReturnOfNull
+ return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+ }
+
@Override
public void close() throws IOException
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
index e3699ea3db8..4473fa77d46 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
@@ -112,7 +112,7 @@ public class BlockLayoutColumnarDoublesSupplier implements
Supplier<ColumnarDoub
}
@Override
- public void get(final double[] out, final int start, final int length)
+ public void get(final double[] out, int offset, final int start, final int
length)
{
// division + remainder is optimized by the compiler so keep those
together
int bufferNum = start / sizePer;
@@ -129,7 +129,7 @@ public class BlockLayoutColumnarDoublesSupplier implements
Supplier<ColumnarDoub
final int oldPosition = doubleBuffer.position();
try {
doubleBuffer.position(bufferIndex);
- doubleBuffer.get(out, p, limit);
+ doubleBuffer.get(out, offset + p, limit);
}
finally {
doubleBuffer.position(oldPosition);
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
index 36dbf5f5309..2b46d9aa6e2 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
@@ -21,14 +21,20 @@ package org.apache.druid.segment.data;
import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.common.semantic.SemanticUtils;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
+import java.util.Map;
+import java.util.function.Function;
public class BlockLayoutColumnarLongsSupplier implements
Supplier<ColumnarLongs>
{
+ private static final Map<Class<?>, Function<BlockLayoutColumnarLongs, ?>>
AS_MAP =
+ SemanticUtils.makeAsMap(BlockLayoutColumnarLongs.class);
+
private final GenericIndexed<ResourceHolder<ByteBuffer>> baseLongBuffers;
// The number of rows in this column.
@@ -222,6 +228,15 @@ public class BlockLayoutColumnarLongsSupplier implements
Supplier<ColumnarLongs>
}
}
+ @SuppressWarnings("unchecked")
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ //noinspection ReturnOfNull
+ return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+ }
+
@Override
public String toString()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
index 9e098673da3..4c8ee6f0651 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
@@ -46,9 +46,14 @@ public interface ColumnarDoubles extends Closeable
double get(int index);
default void get(double[] out, int start, int length)
+ {
+ get(out, 0, start, length);
+ }
+
+ default void get(double[] out, int offset, int start, int length)
{
for (int i = 0; i < length; i++) {
- out[i] = get(i + start);
+ out[offset + i] = get(i + start);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
index f33cdee4c4e..3d3fd4287d1 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
@@ -29,6 +29,15 @@ import java.io.IOException;
public interface ColumnarDoublesSerializer extends Serializer
{
void open() throws IOException;
+
int size();
+
void add(double value) throws IOException;
+
+ default void addAll(double[] values, int start, int end) throws IOException
+ {
+ for (int i = start; i < end; ++i) {
+ add(values[i]);
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index 503d4f65fe9..eaf0b2a47a9 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -354,6 +354,12 @@ public class CompressedVSizeColumnarIntsSupplier
implements WritableSupplier<Col
@Override
public void get(int[] out, int start, int length)
+ {
+ get(out, 0, start, length);
+ }
+
+ @Override
+ public void get(int[] out, int offset, int start, int length)
{
int p = 0;
@@ -374,7 +380,7 @@ public class CompressedVSizeColumnarIntsSupplier implements
WritableSupplier<Col
break;
}
- out[i] = _get(buffer, bigEndian, index - currBufferStart);
+ out[offset + i] = _get(buffer, bigEndian, index - currBufferStart);
}
assert i > p;
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
index 728a50aa2fc..f2b198b7e7a 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
@@ -74,6 +74,19 @@ public class LongsLongEncodingWriter implements
CompressionFactory.LongEncodingW
}
}
+ @Override
+ public void write(long[] values, int offset, int length) throws IOException
+ {
+ if (outBuffer != null) {
+ outBuffer.asLongBuffer().put(values, offset, length);
+ outBuffer.position(outBuffer.position() + (length * Long.BYTES));
+ } else {
+ for (int i = offset; i < length; ++i) {
+ write(values[i]);
+ }
+ }
+ }
+
@Override
public void flush()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
index 5f4fad50814..892549638f8 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
@@ -27,4 +27,11 @@ import java.io.IOException;
public abstract class SingleValueColumnarIntsSerializer implements
ColumnarIntsSerializer
{
public abstract void addValue(int val) throws IOException;
+
+ public void addValues(int[] vals, int start, int stop) throws IOException
+ {
+ for (int i = start; i < stop; ++i) {
+ addValue(vals[i]);
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
index 9821155e178..c2e46cec107 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -84,10 +85,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
/**
* Implementation of {@link NestedDataComplexColumn} which uses a {@link
CompressedVariableSizedBlobColumn} for the
@@ -104,6 +107,9 @@ import java.util.concurrent.ConcurrentHashMap;
public abstract class CompressedNestedDataComplexColumn<TStringDictionary
extends Indexed<ByteBuffer>>
extends NestedDataComplexColumn implements NestedCommonFormatColumn
{
+ private static final Map<Class<?>,
Function<CompressedNestedDataComplexColumn, ?>> AS_MAP =
+ SemanticUtils.makeAsMap(CompressedNestedDataComplexColumn.class);
+
private static final ObjectStrategy<Object> STRATEGY =
NestedDataComplexTypeSerde.INSTANCE.getObjectStrategy();
public static final IntTypeStrategy INT_TYPE_STRATEGY = new
IntTypeStrategy();
private final ColumnConfig columnConfig;
@@ -903,6 +909,15 @@ public abstract class
CompressedNestedDataComplexColumn<TStringDictionary extend
return getColumnHolder(field, fieldIndex).getCapabilities().isNumeric();
}
+ @SuppressWarnings("unchecked")
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ //noinspection ReturnOfNull
+ return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+ }
+
private ColumnHolder getColumnHolder(String field, int fieldIndex)
{
return columns.computeIfAbsent(fieldIndex, (f) ->
readNestedFieldColumn(field, fieldIndex));
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
index 50bca997735..ab5b09417c5 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
@@ -19,16 +19,13 @@
package org.apache.druid.segment.nested;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.GenericColumnSerializer;
-import org.apache.druid.segment.data.VByte;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.Serializer;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.SortedMap;
@@ -85,7 +82,7 @@ public abstract class NestedCommonFormatColumnSerializer
implements GenericColum
ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(),
fileName);
}
- protected void writeV0Header(WritableByteChannel channel, ByteBuffer
columnNameBuffer) throws IOException
+ public static void writeV0Header(WritableByteChannel channel, ByteBuffer
columnNameBuffer) throws IOException
{
channel.write(ByteBuffer.wrap(new
byte[]{NestedCommonFormatColumnSerializer.V0}));
channel.write(columnNameBuffer);
@@ -93,12 +90,7 @@ public abstract class NestedCommonFormatColumnSerializer
implements GenericColum
protected ByteBuffer computeFilenameBytes()
{
- final byte[] bytes = StringUtils.toUtf8(getColumnName());
- final int length = VByte.computeIntSize(bytes.length);
- final ByteBuffer buffer = ByteBuffer.allocate(length +
bytes.length).order(ByteOrder.nativeOrder());
- VByte.writeInt(buffer, bytes.length);
- buffer.put(bytes);
- buffer.flip();
- return buffer;
+ final String columnName = getColumnName();
+ return ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 6a3405e58fc..cad28eada06 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -24,9 +24,8 @@ import com.google.common.collect.Maps;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@@ -109,7 +108,7 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
return ProcessedValue.NULL_LITERAL;
}
catch (IOException e) {
- throw new RE(e, "Failed to write field [%s], unhandled value",
fieldPath);
+ throw DruidException.defensive(e, "Failed to write field [%s],
unhandled value", fieldPath);
}
}
return ProcessedValue.NULL_LITERAL;
@@ -134,7 +133,7 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
return ProcessedValue.NULL_LITERAL;
}
catch (IOException e) {
- throw new RE(e, "Failed to write field [%s] value [%s]",
fieldPath, array);
+ throw DruidException.defensive(e, "Failed to write field [%s]
value [%s]", fieldPath, array);
}
}
}
@@ -318,7 +317,7 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
globalDictionaryIdLookup
);
} else {
- throw new ISE("Invalid field type [%s], how did this happen?", type);
+ throw DruidException.defensive("Invalid field type [%s], how did
this happen?", type);
}
} else {
writer = new VariantFieldColumnWriter(
@@ -343,7 +342,9 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
) throws IOException
{
if (dictionarySerialized) {
- throw new ISE("String dictionary already serialized for column [%s],
cannot serialize again", name);
+ throw DruidException.defensive(
+ "String dictionary already serialized for column [%s], cannot
serialize again", name
+ );
}
// null is always 0
@@ -383,11 +384,17 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
@Override
public void serialize(ColumnValueSelector<? extends StructuredData>
selector) throws IOException
+ {
+ serialize(StructuredData.wrap(selector.getObject()));
+ }
+
+ public void serialize(StructuredData data) throws IOException
{
if (!dictionarySerialized) {
- throw new ISE("Must serialize value dictionaries before serializing
values for column [%s]", name);
+ throw DruidException.defensive(
+ "Must serialize value dictionaries before serializing values for
column [%s]", name
+ );
}
- StructuredData data = StructuredData.wrap(selector.getObject());
if (data == null) {
nullRowsBitmap.add(rowCount);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
index 689ad57cf7a..6ade80b6e83 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector;
@@ -37,12 +38,17 @@ import org.apache.druid.segment.vector.VectorValueSelector;
import org.roaringbitmap.PeekableIntIterator;
import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.function.Function;
/**
* {@link NestedCommonFormatColumn} for {@link ColumnType#DOUBLE}
*/
public class ScalarDoubleColumn implements NestedCommonFormatColumn
{
+ private static final Map<Class<?>, Function<ScalarDoubleColumn, ?>> AS_MAP =
+ SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
+
private final FixedIndexed<Double> doubleDictionary;
private final ColumnarDoubles valueColumn;
private final ImmutableBitmap nullValueIndex;
@@ -181,4 +187,13 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
{
valueColumn.close();
}
+
+ @SuppressWarnings("unchecked")
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ //noinspection ReturnOfNull
+ return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
index 6002b87c126..8a54ff31278 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.LongColumnSelector;
@@ -37,12 +38,17 @@ import org.apache.druid.segment.vector.VectorValueSelector;
import org.roaringbitmap.PeekableIntIterator;
import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.function.Function;
/**
* {@link NestedCommonFormatColumn} for {@link ColumnType#LONG}
*/
public class ScalarLongColumn implements NestedCommonFormatColumn
{
+ private static final Map<Class<?>, Function<ScalarLongColumn, ?>> AS_MAP =
+ SemanticUtils.makeAsMap(ScalarLongColumn.class);
+
private final FixedIndexed<Long> longDictionary;
private final ColumnarLongs valueColumn;
private final ImmutableBitmap nullValueIndex;
@@ -182,4 +188,13 @@ public class ScalarLongColumn implements
NestedCommonFormatColumn
{
valueColumn.close();
}
+
+ @SuppressWarnings("unchecked")
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ //noinspection ReturnOfNull
+ return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
index 3eef2ac36e7..dfd6307148d 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
@@ -69,8 +70,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Iterator;
+import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.function.Function;
/**
* {@link NestedCommonFormatColumn} for single type array columns, and mixed
type columns. If {@link #variantTypes}
@@ -80,6 +83,9 @@ import java.util.TreeMap;
public class VariantColumn<TStringDictionary extends Indexed<ByteBuffer>>
implements DictionaryEncodedColumn<String>, NestedCommonFormatColumn
{
+ private static final Map<Class<?>, Function<VariantColumn, ?>> AS_MAP =
+ SemanticUtils.makeAsMap(VariantColumn.class);
+
private final TStringDictionary stringDictionary;
private final FixedIndexed<Long> longDictionary;
private final FixedIndexed<Double> doubleDictionary;
@@ -1008,4 +1014,13 @@ public class VariantColumn<TStringDictionary extends
Indexed<ByteBuffer>>
return offset.getCurrentVectorSize();
}
}
+
+ @SuppressWarnings("unchecked")
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ //noinspection ReturnOfNull
+ return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
index 23555b2ea2d..b93235b5bab 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
@@ -31,7 +31,6 @@ import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.BitmapResultFactory;
-import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.StringEncodingStrategies;
@@ -68,7 +67,7 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
ByteOrder byteOrder,
BitmapSerdeFactory bitmapSerdeFactory,
ByteBuffer bb,
- ColumnBuilder columnBuilder,
+ SmooshedFileMapper fileMapper,
@Nullable VariantColumnAndIndexSupplier parent
)
{
@@ -89,7 +88,6 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
if (version == NestedCommonFormatColumnSerializer.V0) {
try {
- final SmooshedFileMapper mapper = columnBuilder.getFileMapper();
final Supplier<? extends Indexed<ByteBuffer>> stringDictionarySupplier;
final Supplier<FixedIndexed<Long>> longDictionarySupplier;
final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
@@ -104,33 +102,33 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
arrayElementDictionarySupplier =
parent.arrayElementDictionarySupplier;
} else {
final ByteBuffer stringDictionaryBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
- mapper,
+ fileMapper,
columnName,
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);
final ByteBuffer longDictionaryBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
- mapper,
+ fileMapper,
columnName,
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
);
final ByteBuffer doubleDictionaryBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
- mapper,
+ fileMapper,
columnName,
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
);
final ByteBuffer arrayDictionarybuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
- mapper,
+ fileMapper,
columnName,
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
);
final ByteBuffer arrayElementDictionaryBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
- mapper,
+ fileMapper,
columnName,
ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME
);
stringDictionarySupplier =
StringEncodingStrategies.getStringDictionarySupplier(
- mapper,
+ fileMapper,
stringDictionaryBuffer,
byteOrder
);
@@ -159,7 +157,7 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
}
final ByteBuffer encodedValueColumn =
NestedCommonFormatColumnPartSerde.loadInternalFile(
- mapper,
+ fileMapper,
columnName,
ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
);
@@ -168,24 +166,24 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
byteOrder
);
final ByteBuffer valueIndexBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
- mapper,
+ fileMapper,
columnName,
ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME
);
final GenericIndexed<ImmutableBitmap> valueIndexes =
GenericIndexed.read(
valueIndexBuffer,
bitmapSerdeFactory.getObjectStrategy(),
- columnBuilder.getFileMapper()
+ fileMapper
);
final ByteBuffer elementIndexBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
- mapper,
+ fileMapper,
columnName,
ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME
);
final GenericIndexed<ImmutableBitmap> arrayElementIndexes =
GenericIndexed.read(
elementIndexBuffer,
bitmapSerdeFactory.getObjectStrategy(),
- columnBuilder.getFileMapper()
+ fileMapper
);
return new VariantColumnAndIndexSupplier(
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
index 8396f07fd35..c8974f5e6f4 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
@@ -24,11 +24,13 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import org.apache.druid.segment.data.VByte;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
public class ColumnSerializerUtils
{
@@ -59,23 +61,33 @@ public class ColumnSerializerUtils
public static void writeInternal(FileSmoosher smoosher, Serializer
serializer, String columnName, String fileName)
throws IOException
{
- final String internalName = getInternalFileName(columnName, fileName);
- try (SmooshedWriter smooshChannel =
smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) {
- serializer.writeTo(smooshChannel, smoosher);
- }
+ smoosher.serializeAs(getInternalFileName(columnName, fileName),
serializer);
}
public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer,
String columnName, String fileName)
throws IOException
{
- final String internalName = getInternalFileName(columnName, fileName);
- try (SmooshedWriter smooshChannel =
smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
- smooshChannel.write(buffer);
- }
+ smoosher.add(getInternalFileName(columnName, fileName), buffer);
}
public static String getInternalFileName(String fileNameBase, String field)
{
return fileNameBase + "." + field;
}
+
+ /**
+ * Convert a String to a ByteBuffer with a variable size length prepended to
it.
+ * @param stringVal the value to store in the ByteBuffer
+ * @return ByteBuffer with the string converted to utf8 bytes and stored
with a variable size length int prepended
+ */
+ public static ByteBuffer stringToUtf8InVSizeByteBuffer(String stringVal)
+ {
+ final byte[] bytes = StringUtils.toUtf8(stringVal);
+ final int length = VByte.computeIntSize(bytes.length);
+ final ByteBuffer buffer = ByteBuffer.allocate(length +
bytes.length).order(ByteOrder.nativeOrder());
+ VByte.writeInt(buffer, bytes.length);
+ buffer.put(bytes);
+ buffer.flip();
+ return buffer;
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
index d923de51d09..6e1f0967c83 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
@@ -281,7 +281,7 @@ public class NestedCommonFormatColumnPartSerde implements
ColumnPartSerde
byteOrder,
bitmapSerdeFactory,
buffer,
- builder,
+ builder.getFileMapper(),
parent == null ? null : (VariantColumnAndIndexSupplier)
parent.getColumnSupplier()
);
ColumnCapabilitiesImpl capabilitiesBuilder =
builder.getCapabilitiesBuilder();
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
index 6e1c4229084..2d213eddb5f 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
@@ -253,9 +253,7 @@ public class CompressedColumnarIntsSerializerTest
);
writer.open();
- for (int val : vals) {
- writer.addValue(val);
- }
+ writer.addValues(vals, 0, vals.length);
final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test",
writer.getSerializedSize());
writer.writeTo(channel, smoosher);
channel.close();
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
index be472a71ed9..613809b836f 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
@@ -172,9 +172,7 @@ public class CompressedDoublesSerdeTest
);
serializer.open();
- for (double value : values) {
- serializer.add(value);
- }
+ serializer.addAll(values, 0, values.length);
Assert.assertEquals(values.length, serializer.size());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 6aea2ace234..0598552d519 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -317,7 +317,7 @@ public class VariantColumnSupplierTest extends
InitializedNullHandlingTest
ByteOrder.nativeOrder(),
bitmapSerdeFactory,
baseBuffer,
- bob,
+ bob.getFileMapper(),
null
);
try (VariantColumn<?> column = (VariantColumn<?>) supplier.get()) {
@@ -336,7 +336,7 @@ public class VariantColumnSupplierTest extends
InitializedNullHandlingTest
ByteOrder.nativeOrder(),
bitmapSerdeFactory,
baseBuffer,
- bob,
+ bob.getFileMapper(),
null
);
final String expectedReason = "none";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]