This is an automated email from the ASF dual-hosted git repository.

adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c9201ad6583 Minor refactors to processing module (#17136)
c9201ad6583 is described below

commit c9201ad65839994306de544d355e0fb1d9949835
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Mon Oct 7 13:18:35 2024 +0530

    Minor refactors to processing module (#17136)
    
    Refactors a few things.
    
    - Add SemanticUtils maps to columns.
    - Add some addAll functions to reduce duplication, and for future reuse.
    - Refactor VariantColumnAndIndexSupplier to take a SmooshedFileMapper instead of a ColumnBuilder.
    - Refactor LongColumnSerializerV2 to have separate functions for 
serializing a value and null.
---
 .../java/util/common/io/smoosh/FileSmoosher.java   |  8 ++++++
 .../druid/segment/LongColumnSerializerV2.java      | 25 +++++++++++++++---
 .../column/StringUtf8DictionaryEncodedColumn.java  | 15 +++++++++++
 .../data/BlockLayoutColumnarDoublesSupplier.java   |  4 +--
 .../data/BlockLayoutColumnarLongsSupplier.java     | 15 +++++++++++
 .../apache/druid/segment/data/ColumnarDoubles.java |  7 ++++-
 .../segment/data/ColumnarDoublesSerializer.java    |  9 +++++++
 .../data/CompressedVSizeColumnarIntsSupplier.java  |  8 +++++-
 .../segment/data/LongsLongEncodingWriter.java      | 13 ++++++++++
 .../data/SingleValueColumnarIntsSerializer.java    |  7 +++++
 .../nested/CompressedNestedDataComplexColumn.java  | 15 +++++++++++
 .../nested/NestedCommonFormatColumnSerializer.java | 14 +++-------
 .../segment/nested/NestedDataColumnSerializer.java | 23 +++++++++++------
 .../druid/segment/nested/ScalarDoubleColumn.java   | 15 +++++++++++
 .../druid/segment/nested/ScalarLongColumn.java     | 15 +++++++++++
 .../apache/druid/segment/nested/VariantColumn.java | 15 +++++++++++
 .../nested/VariantColumnAndIndexSupplier.java      | 26 +++++++++----------
 .../druid/segment/serde/ColumnSerializerUtils.java | 30 +++++++++++++++-------
 .../serde/NestedCommonFormatColumnPartSerde.java   |  2 +-
 .../data/CompressedColumnarIntsSerializerTest.java |  4 +--
 .../segment/data/CompressedDoublesSerdeTest.java   |  4 +--
 .../segment/nested/VariantColumnSupplierTest.java  |  4 +--
 22 files changed, 219 insertions(+), 59 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
index f4103ec1847..edb7589fb82 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
@@ -31,6 +31,7 @@ import 
org.apache.druid.java.util.common.MappedByteBufferHandler;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.serde.Serializer;
 
 import java.io.BufferedWriter;
 import java.io.Closeable;
@@ -161,6 +162,13 @@ public class FileSmoosher implements Closeable
     }
   }
 
+  public void serializeAs(String name, Serializer serializer) throws 
IOException
+  {
+    try (SmooshedWriter smooshChannel = addWithSmooshedWriter(name, 
serializer.getSerializedSize())) {
+      serializer.writeTo(smooshChannel, this);
+    }
+  }
+
   public SmooshedWriter addWithSmooshedWriter(final String name, final long 
size) throws IOException
   {
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java 
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
index 226cd8bafb0..0611d96e9c0 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
@@ -121,12 +121,29 @@ public class LongColumnSerializerV2 implements 
GenericColumnSerializer<Object>
   public void serialize(ColumnValueSelector<?> selector) throws IOException
   {
     if (selector.isNull()) {
-      nullRowsBitmap.add(rowCount);
-      writer.add(0L);
+      serializeNull();
     } else {
-      writer.add(selector.getLong());
+      serializeValue(selector.getLong());
     }
-    rowCount++;
+  }
+
+  /**
+   * Serializes a null value at the rowCount position, and increments the 
current rowCount.
+   */
+  public void serializeNull() throws IOException
+  {
+    nullRowsBitmap.add(rowCount);
+    writer.add(0L);
+    ++rowCount;
+  }
+
+  /**
+   * Serializes a value of val at the rowCount position, and increments the 
current rowCount.
+   */
+  public void serializeValue(long val) throws IOException
+  {
+    writer.add(val);
+    ++rowCount;
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
index a5966096a6e..bed58a43675 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.column;
 
 import com.google.common.collect.Lists;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.DruidObjectPredicate;
@@ -52,6 +53,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
 
 /**
  * {@link DictionaryEncodedColumn<String>} for a column which has a {@link 
ByteBuffer} based UTF-8 dictionary.
@@ -62,6 +65,9 @@ import java.util.List;
  */
 public class StringUtf8DictionaryEncodedColumn implements 
DictionaryEncodedColumn<String>, NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, 
Function<StringUtf8DictionaryEncodedColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(StringUtf8DictionaryEncodedColumn.class);
+
   @Nullable
   private final ColumnarInts column;
   @Nullable
@@ -498,6 +504,15 @@ public class StringUtf8DictionaryEncodedColumn implements 
DictionaryEncodedColum
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<? extends T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
+
   @Override
   public void close() throws IOException
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
index e3699ea3db8..4473fa77d46 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
@@ -112,7 +112,7 @@ public class BlockLayoutColumnarDoublesSupplier implements 
Supplier<ColumnarDoub
     }
 
     @Override
-    public void get(final double[] out, final int start, final int length)
+    public void get(final double[] out, int offset, final int start, final int 
length)
     {
       // division + remainder is optimized by the compiler so keep those 
together
       int bufferNum = start / sizePer;
@@ -129,7 +129,7 @@ public class BlockLayoutColumnarDoublesSupplier implements 
Supplier<ColumnarDoub
         final int oldPosition = doubleBuffer.position();
         try {
           doubleBuffer.position(bufferIndex);
-          doubleBuffer.get(out, p, limit);
+          doubleBuffer.get(out, offset + p, limit);
         }
         finally {
           doubleBuffer.position(oldPosition);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
index 36dbf5f5309..2b46d9aa6e2 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
@@ -21,14 +21,20 @@ package org.apache.druid.segment.data;
 
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.common.semantic.SemanticUtils;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.LongBuffer;
+import java.util.Map;
+import java.util.function.Function;
 
 public class BlockLayoutColumnarLongsSupplier implements 
Supplier<ColumnarLongs>
 {
+  private static final Map<Class<?>, Function<BlockLayoutColumnarLongs, ?>> 
AS_MAP =
+      SemanticUtils.makeAsMap(BlockLayoutColumnarLongs.class);
+
   private final GenericIndexed<ResourceHolder<ByteBuffer>> baseLongBuffers;
 
   // The number of rows in this column.
@@ -222,6 +228,15 @@ public class BlockLayoutColumnarLongsSupplier implements 
Supplier<ColumnarLongs>
       }
     }
 
+    @SuppressWarnings("unchecked")
+    @Nullable
+    @Override
+    public <T> T as(Class<? extends T> clazz)
+    {
+      //noinspection ReturnOfNull
+      return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+    }
+
     @Override
     public String toString()
     {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java 
b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
index 9e098673da3..4c8ee6f0651 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoubles.java
@@ -46,9 +46,14 @@ public interface ColumnarDoubles extends Closeable
   double get(int index);
 
   default void get(double[] out, int start, int length)
+  {
+    get(out, 0, start, length);
+  }
+
+  default void get(double[] out, int offset, int start, int length)
   {
     for (int i = 0; i < length; i++) {
-      out[i] = get(i + start);
+      out[offset + i] = get(i + start);
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
index f33cdee4c4e..3d3fd4287d1 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
@@ -29,6 +29,15 @@ import java.io.IOException;
 public interface ColumnarDoublesSerializer extends Serializer
 {
   void open() throws IOException;
+
   int size();
+
   void add(double value) throws IOException;
+
+  default void addAll(double[] values, int start, int end) throws IOException
+  {
+    for (int i = start; i < end; ++i) {
+      add(values[i]);
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index 503d4f65fe9..eaf0b2a47a9 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -354,6 +354,12 @@ public class CompressedVSizeColumnarIntsSupplier 
implements WritableSupplier<Col
 
     @Override
     public void get(int[] out, int start, int length)
+    {
+      get(out, 0, start, length);
+    }
+
+    @Override
+    public void get(int[] out, int offset, int start, int length)
     {
       int p = 0;
 
@@ -374,7 +380,7 @@ public class CompressedVSizeColumnarIntsSupplier implements 
WritableSupplier<Col
             break;
           }
 
-          out[i] = _get(buffer, bigEndian, index - currBufferStart);
+          out[offset + i] = _get(buffer, bigEndian, index - currBufferStart);
         }
 
         assert i > p;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
index 728a50aa2fc..f2b198b7e7a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java
@@ -74,6 +74,19 @@ public class LongsLongEncodingWriter implements 
CompressionFactory.LongEncodingW
     }
   }
 
+  @Override
+  public void write(long[] values, int offset, int length) throws IOException
+  {
+    if (outBuffer != null) {
+      outBuffer.asLongBuffer().put(values, offset, length);
+      outBuffer.position(outBuffer.position() + (length * Long.BYTES));
+    } else {
+      for (int i = offset; i < offset + length; ++i) { // length is a count, not an end index
+        write(values[i]);
+      }
+    }
+  }
+
   @Override
   public void flush()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
index 5f4fad50814..892549638f8 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/SingleValueColumnarIntsSerializer.java
@@ -27,4 +27,11 @@ import java.io.IOException;
 public abstract class SingleValueColumnarIntsSerializer implements 
ColumnarIntsSerializer
 {
   public abstract void addValue(int val) throws IOException;
+
+  public void addValues(int[] vals, int start, int stop) throws IOException
+  {
+    for (int i = start; i < stop; ++i) {
+      addValue(vals[i]);
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
index 9821155e178..c2e46cec107 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Doubles;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
@@ -84,10 +85,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 /**
  * Implementation of {@link NestedDataComplexColumn} which uses a {@link 
CompressedVariableSizedBlobColumn} for the
@@ -104,6 +107,9 @@ import java.util.concurrent.ConcurrentHashMap;
 public abstract class CompressedNestedDataComplexColumn<TStringDictionary 
extends Indexed<ByteBuffer>>
     extends NestedDataComplexColumn implements NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, 
Function<CompressedNestedDataComplexColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(CompressedNestedDataComplexColumn.class);
+
   private static final ObjectStrategy<Object> STRATEGY = 
NestedDataComplexTypeSerde.INSTANCE.getObjectStrategy();
   public static final IntTypeStrategy INT_TYPE_STRATEGY = new 
IntTypeStrategy();
   private final ColumnConfig columnConfig;
@@ -903,6 +909,15 @@ public abstract class 
CompressedNestedDataComplexColumn<TStringDictionary extend
     return getColumnHolder(field, fieldIndex).getCapabilities().isNumeric();
   }
 
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<? extends T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
+
   private ColumnHolder getColumnHolder(String field, int fieldIndex)
   {
     return columns.computeIfAbsent(fieldIndex, (f) -> 
readNestedFieldColumn(field, fieldIndex));
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
index 50bca997735..ab5b09417c5 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
@@ -19,16 +19,13 @@
 
 package org.apache.druid.segment.nested;
 
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.GenericColumnSerializer;
-import org.apache.druid.segment.data.VByte;
 import org.apache.druid.segment.serde.ColumnSerializerUtils;
 import org.apache.druid.segment.serde.Serializer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
 import java.util.SortedMap;
 
@@ -85,7 +82,7 @@ public abstract class NestedCommonFormatColumnSerializer 
implements GenericColum
     ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(), 
fileName);
   }
 
-  protected void writeV0Header(WritableByteChannel channel, ByteBuffer 
columnNameBuffer) throws IOException
+  public static void writeV0Header(WritableByteChannel channel, ByteBuffer 
columnNameBuffer) throws IOException
   {
     channel.write(ByteBuffer.wrap(new 
byte[]{NestedCommonFormatColumnSerializer.V0}));
     channel.write(columnNameBuffer);
@@ -93,12 +90,7 @@ public abstract class NestedCommonFormatColumnSerializer 
implements GenericColum
 
   protected ByteBuffer computeFilenameBytes()
   {
-    final byte[] bytes = StringUtils.toUtf8(getColumnName());
-    final int length = VByte.computeIntSize(bytes.length);
-    final ByteBuffer buffer = ByteBuffer.allocate(length + 
bytes.length).order(ByteOrder.nativeOrder());
-    VByte.writeInt(buffer, bytes.length);
-    buffer.put(bytes);
-    buffer.flip();
-    return buffer;
+    final String columnName = getColumnName();
+    return ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 6a3405e58fc..cad28eada06 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -24,9 +24,8 @@ import com.google.common.collect.Maps;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@@ -109,7 +108,7 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
           return ProcessedValue.NULL_LITERAL;
         }
         catch (IOException e) {
-          throw new RE(e, "Failed to write field [%s], unhandled value", 
fieldPath);
+          throw DruidException.defensive(e, "Failed to write field [%s], 
unhandled value", fieldPath);
         }
       }
       return ProcessedValue.NULL_LITERAL;
@@ -134,7 +133,7 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
             return ProcessedValue.NULL_LITERAL;
           }
           catch (IOException e) {
-            throw new RE(e, "Failed to write field [%s] value [%s]", 
fieldPath, array);
+            throw DruidException.defensive(e, "Failed to write field [%s] 
value [%s]", fieldPath, array);
           }
         }
       }
@@ -318,7 +317,7 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
               globalDictionaryIdLookup
           );
         } else {
-          throw new ISE("Invalid field type [%s], how did this happen?", type);
+          throw DruidException.defensive("Invalid field type [%s], how did 
this happen?", type);
         }
       } else {
         writer = new VariantFieldColumnWriter(
@@ -343,7 +342,9 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
   ) throws IOException
   {
     if (dictionarySerialized) {
-      throw new ISE("String dictionary already serialized for column [%s], 
cannot serialize again", name);
+      throw DruidException.defensive(
+          "String dictionary already serialized for column [%s], cannot 
serialize again", name
+      );
     }
 
     // null is always 0
@@ -383,11 +384,17 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
 
   @Override
   public void serialize(ColumnValueSelector<? extends StructuredData> 
selector) throws IOException
+  {
+    serialize(StructuredData.wrap(selector.getObject()));
+  }
+
+  public void serialize(StructuredData data) throws IOException
   {
     if (!dictionarySerialized) {
-      throw new ISE("Must serialize value dictionaries before serializing 
values for column [%s]", name);
+      throw DruidException.defensive(
+          "Must serialize value dictionaries before serializing values for 
column [%s]", name
+      );
     }
-    StructuredData data = StructuredData.wrap(selector.getObject());
     if (data == null) {
       nullRowsBitmap.add(rowCount);
     }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
index 689ad57cf7a..6ade80b6e83 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
 
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DoubleColumnSelector;
@@ -37,12 +38,17 @@ import org.apache.druid.segment.vector.VectorValueSelector;
 import org.roaringbitmap.PeekableIntIterator;
 
 import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.function.Function;
 
 /**
  * {@link NestedCommonFormatColumn} for {@link ColumnType#DOUBLE}
  */
 public class ScalarDoubleColumn implements NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, Function<ScalarDoubleColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
+
   private final FixedIndexed<Double> doubleDictionary;
   private final ColumnarDoubles valueColumn;
   private final ImmutableBitmap nullValueIndex;
@@ -181,4 +187,13 @@ public class ScalarDoubleColumn implements 
NestedCommonFormatColumn
   {
     valueColumn.close();
   }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<? extends T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
index 6002b87c126..8a54ff31278 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
 
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.LongColumnSelector;
@@ -37,12 +38,17 @@ import org.apache.druid.segment.vector.VectorValueSelector;
 import org.roaringbitmap.PeekableIntIterator;
 
 import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.function.Function;
 
 /**
  * {@link NestedCommonFormatColumn} for {@link ColumnType#LONG}
  */
 public class ScalarLongColumn implements NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, Function<ScalarLongColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(ScalarLongColumn.class);
+
   private final FixedIndexed<Long> longDictionary;
   private final ColumnarLongs valueColumn;
   private final ImmutableBitmap nullValueIndex;
@@ -182,4 +188,13 @@ public class ScalarLongColumn implements 
NestedCommonFormatColumn
   {
     valueColumn.close();
   }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<? extends T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
index 3eef2ac36e7..dfd6307148d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntArraySet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -69,8 +70,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.Function;
 
 /**
  * {@link NestedCommonFormatColumn} for single type array columns, and mixed 
type columns. If {@link #variantTypes}
@@ -80,6 +83,9 @@ import java.util.TreeMap;
 public class VariantColumn<TStringDictionary extends Indexed<ByteBuffer>>
     implements DictionaryEncodedColumn<String>, NestedCommonFormatColumn
 {
+  private static final Map<Class<?>, Function<VariantColumn, ?>> AS_MAP =
+      SemanticUtils.makeAsMap(VariantColumn.class);
+
   private final TStringDictionary stringDictionary;
   private final FixedIndexed<Long> longDictionary;
   private final FixedIndexed<Double> doubleDictionary;
@@ -1008,4 +1014,13 @@ public class VariantColumn<TStringDictionary extends 
Indexed<ByteBuffer>>
       return offset.getCurrentVectorSize();
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(Class<? extends T> clazz)
+  {
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
index 23555b2ea2d..b93235b5bab 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
@@ -31,7 +31,6 @@ import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.math.expr.ExprType;
 import org.apache.druid.math.expr.ExpressionType;
 import org.apache.druid.query.BitmapResultFactory;
-import org.apache.druid.segment.column.ColumnBuilder;
 import org.apache.druid.segment.column.ColumnIndexSupplier;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.StringEncodingStrategies;
@@ -68,7 +67,7 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
       ByteOrder byteOrder,
       BitmapSerdeFactory bitmapSerdeFactory,
       ByteBuffer bb,
-      ColumnBuilder columnBuilder,
+      SmooshedFileMapper fileMapper,
       @Nullable VariantColumnAndIndexSupplier parent
   )
   {
@@ -89,7 +88,6 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
 
     if (version == NestedCommonFormatColumnSerializer.V0) {
       try {
-        final SmooshedFileMapper mapper = columnBuilder.getFileMapper();
         final Supplier<? extends Indexed<ByteBuffer>> stringDictionarySupplier;
         final Supplier<FixedIndexed<Long>> longDictionarySupplier;
         final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
@@ -104,33 +102,33 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
           arrayElementDictionarySupplier = 
parent.arrayElementDictionarySupplier;
         } else {
           final ByteBuffer stringDictionaryBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
-              mapper,
+              fileMapper,
               columnName,
               ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
           );
           final ByteBuffer longDictionaryBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
-              mapper,
+              fileMapper,
               columnName,
               ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
           );
           final ByteBuffer doubleDictionaryBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
-              mapper,
+              fileMapper,
               columnName,
               ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
           );
           final ByteBuffer arrayDictionarybuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
-              mapper,
+              fileMapper,
               columnName,
               ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
           );
           final ByteBuffer arrayElementDictionaryBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
-              mapper,
+              fileMapper,
               columnName,
               ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME
           );
 
           stringDictionarySupplier = 
StringEncodingStrategies.getStringDictionarySupplier(
-              mapper,
+              fileMapper,
               stringDictionaryBuffer,
               byteOrder
           );
@@ -159,7 +157,7 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
         }
 
         final ByteBuffer encodedValueColumn = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
-            mapper,
+            fileMapper,
             columnName,
             ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
         );
@@ -168,24 +166,24 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
             byteOrder
         );
         final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
-            mapper,
+            fileMapper,
             columnName,
             ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME
         );
         final GenericIndexed<ImmutableBitmap> valueIndexes = GenericIndexed.read(
             valueIndexBuffer,
             bitmapSerdeFactory.getObjectStrategy(),
-            columnBuilder.getFileMapper()
+            fileMapper
         );
         final ByteBuffer elementIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
-            mapper,
+            fileMapper,
             columnName,
             ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME
         );
         final GenericIndexed<ImmutableBitmap> arrayElementIndexes = GenericIndexed.read(
             elementIndexBuffer,
             bitmapSerdeFactory.getObjectStrategy(),
-            columnBuilder.getFileMapper()
+            fileMapper
         );
 
         return new VariantColumnAndIndexSupplier(
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
index 8396f07fd35..c8974f5e6f4 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
@@ -24,11 +24,13 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
 import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import org.apache.druid.segment.data.VByte;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 public class ColumnSerializerUtils
 {
@@ -59,23 +61,33 @@ public class ColumnSerializerUtils
   public static void writeInternal(FileSmoosher smoosher, Serializer serializer, String columnName, String fileName)
       throws IOException
   {
-    final String internalName = getInternalFileName(columnName, fileName);
-    try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) {
-      serializer.writeTo(smooshChannel, smoosher);
-    }
+    smoosher.serializeAs(getInternalFileName(columnName, fileName), serializer);
   }
 
   public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String columnName, String fileName)
       throws IOException
   {
-    final String internalName = getInternalFileName(columnName, fileName);
-    try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
-      smooshChannel.write(buffer);
-    }
+    smoosher.add(getInternalFileName(columnName, fileName), buffer);
   }
 
   public static String getInternalFileName(String fileNameBase, String field)
   {
     return fileNameBase + "." + field;
   }
+
+  /**
+   * Convert a String to a ByteBuffer with a variable size length prepended to it.
+   * @param stringVal the value to store in the ByteBuffer
+   * @return ByteBuffer with the string converted to utf8 bytes and stored with a variable size length int prepended
+   */
+  public static ByteBuffer stringToUtf8InVSizeByteBuffer(String stringVal)
+  {
+    final byte[] bytes = StringUtils.toUtf8(stringVal);
+    final int length = VByte.computeIntSize(bytes.length);
+    final ByteBuffer buffer = ByteBuffer.allocate(length + bytes.length).order(ByteOrder.nativeOrder());
+    VByte.writeInt(buffer, bytes.length);
+    buffer.put(bytes);
+    buffer.flip();
+    return buffer;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
index d923de51d09..6e1f0967c83 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
@@ -281,7 +281,7 @@ public class NestedCommonFormatColumnPartSerde implements ColumnPartSerde
           byteOrder,
           bitmapSerdeFactory,
           buffer,
-          builder,
+          builder.getFileMapper(),
           parent == null ? null : (VariantColumnAndIndexSupplier) parent.getColumnSupplier()
       );
       ColumnCapabilitiesImpl capabilitiesBuilder = builder.getCapabilitiesBuilder();
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
index 6e1c4229084..2d213eddb5f 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
@@ -253,9 +253,7 @@ public class CompressedColumnarIntsSerializerTest
     );
 
     writer.open();
-    for (int val : vals) {
-      writer.addValue(val);
-    }
+    writer.addValues(vals, 0, vals.length);
     final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize());
     writer.writeTo(channel, smoosher);
     channel.close();
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
index be472a71ed9..613809b836f 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
@@ -172,9 +172,7 @@ public class CompressedDoublesSerdeTest
     );
     serializer.open();
 
-    for (double value : values) {
-      serializer.add(value);
-    }
+    serializer.addAll(values, 0, values.length);
     Assert.assertEquals(values.length, serializer.size());
 
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 6aea2ace234..0598552d519 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -317,7 +317,7 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
         ByteOrder.nativeOrder(),
         bitmapSerdeFactory,
         baseBuffer,
-        bob,
+        bob.getFileMapper(),
         null
     );
     try (VariantColumn<?> column = (VariantColumn<?>) supplier.get()) {
@@ -336,7 +336,7 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
         ByteOrder.nativeOrder(),
         bitmapSerdeFactory,
         baseBuffer,
-        bob,
+        bob.getFileMapper(),
         null
     );
     final String expectedReason = "none";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to