This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b645d09c5dc move long and double nested field serialization to later 
phase of serialization (#16769)
b645d09c5dc is described below

commit b645d09c5dc5791e538532efbd44e203e005a9fb
Author: Clint Wylie <cwy...@apache.org>
AuthorDate: Mon Jul 22 21:14:30 2024 -0700

    move long and double nested field serialization to later phase of 
serialization (#16769)
    
    changes:
    * moves value column serializer initialization and the call to the `writeValue` 
method into `GlobalDictionaryEncodedFieldColumnWriter.writeTo`, instead of 
performing them during `GlobalDictionaryEncodedFieldColumnWriter.addValue`. This 
shift means these numeric value columns are now serialized in the per-field 
section that happens after serializing the nested column raw data, so only a 
single compression buffer and temp file are needed at a time, instead of one for 
each of the nested literal fields present in the col [...]
---
 .../druid/segment/nested/DictionaryIdLookup.java   | 158 +++++++++++++--------
 .../GlobalDictionaryEncodedFieldColumnWriter.java  |  10 +-
 .../nested/ScalarDoubleFieldColumnWriter.java      |  26 ++--
 .../nested/ScalarLongFieldColumnWriter.java        |  26 ++--
 4 files changed, 128 insertions(+), 92 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
index f4176db220c..6827497f7a6 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
@@ -99,42 +99,27 @@ public final class DictionaryIdLookup implements Closeable
     this.arrayDictionaryWriter = arrayDictionaryWriter;
   }
 
-  public int lookupString(@Nullable String value)
+  @Nullable
+  public Object getDictionaryValue(int id)
   {
-    if (stringDictionary == null) {
-      // GenericIndexed v2 can write to multiple files if the dictionary is 
larger than 2gb, so we use a smooshfile
-      // for strings because of this. if other type dictionary writers could 
potentially use multiple internal files
-      // in the future, we should transition them to using this approach as 
well (or build a combination smoosher and
-      // mapper so that we can have a mutable smoosh)
-      File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, 
StringUtils.urlEncode(name) + "__stringTempSmoosh");
-      stringDictionaryFile = stringSmoosh.toPath();
-      final String fileName = 
NestedCommonFormatColumnSerializer.getInternalFileName(
-          name,
-          NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
-      );
-
-      try (
-          final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
-          final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
-              fileName,
-              stringDictionaryWriter.getSerializedSize()
-          )
-      ) {
-        stringDictionaryWriter.writeTo(writer, smoosher);
-        writer.close();
-        smoosher.close();
-        stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
-        final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
-        stringDictionary = 
StringEncodingStrategies.getStringDictionarySupplier(
-            stringBufferMapper,
-            stringBuffer,
-            ByteOrder.nativeOrder()
-        ).get();
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+    ensureStringDictionaryLoaded();
+    ensureLongDictionaryLoaded();
+    ensureDoubleDictionaryLoaded();
+    ensureArrayDictionaryLoaded();
+    if (id < longOffset()) {
+      return StringUtils.fromUtf8Nullable(stringDictionary.get(id));
+    } else if (id < doubleOffset()) {
+      return longDictionary.get(id - longOffset());
+    } else if (id < arrayOffset()) {
+      return doubleDictionary.get(id - doubleOffset());
+    } else {
+      return arrayDictionary.get(id - arrayOffset());
     }
+  }
+
+  public int lookupString(@Nullable String value)
+  {
+    ensureStringDictionaryLoaded();
     final byte[] bytes = StringUtils.toUtf8Nullable(value);
     final int index = stringDictionary.indexOf(bytes == null ? null : 
ByteBuffer.wrap(bytes));
     if (index < 0) {
@@ -145,13 +130,7 @@ public final class DictionaryIdLookup implements Closeable
 
   public int lookupLong(@Nullable Long value)
   {
-    if (longDictionary == null) {
-      longDictionaryFile = makeTempFile(name + 
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
-      longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
-      longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, 
ByteOrder.nativeOrder(), Long.BYTES).get();
-      // reset position
-      longBuffer.position(0);
-    }
+    ensureLongDictionaryLoaded();
     final int index = longDictionary.indexOf(value);
     if (index < 0) {
       throw DruidException.defensive("Value not found in column[%s] long 
dictionary", name);
@@ -161,18 +140,7 @@ public final class DictionaryIdLookup implements Closeable
 
   public int lookupDouble(@Nullable Double value)
   {
-    if (doubleDictionary == null) {
-      doubleDictionaryFile = makeTempFile(name + 
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
-      doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
-      doubleDictionary = FixedIndexed.read(
-          doubleBuffer,
-          TypeStrategies.DOUBLE,
-          ByteOrder.nativeOrder(),
-          Double.BYTES
-      ).get();
-      // reset position
-      doubleBuffer.position(0);
-    }
+    ensureDoubleDictionaryLoaded();
     final int index = doubleDictionary.indexOf(value);
     if (index < 0) {
       throw DruidException.defensive("Value not found in column[%s] double 
dictionary", name);
@@ -182,13 +150,7 @@ public final class DictionaryIdLookup implements Closeable
 
   public int lookupArray(@Nullable int[] value)
   {
-    if (arrayDictionary == null) {
-      arrayDictionaryFile = makeTempFile(name + 
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
-      arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
-      arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, 
ByteOrder.nativeOrder()).get();
-      // reset position
-      arrayBuffer.position(0);
-    }
+    ensureArrayDictionaryLoaded();
     final int index = arrayDictionary.indexOf(value);
     if (index < 0) {
       throw DruidException.defensive("Value not found in column[%s] array 
dictionary", name);
@@ -256,6 +218,82 @@ public final class DictionaryIdLookup implements Closeable
     return doubleOffset() + (doubleDictionaryWriter != null ? 
doubleDictionaryWriter.getCardinality() : 0);
   }
 
+  private void ensureStringDictionaryLoaded()
+  {
+    if (stringDictionary == null) {
+      // GenericIndexed v2 can write to multiple files if the dictionary is 
larger than 2gb, so we use a smooshfile
+      // for strings because of this. if other type dictionary writers could 
potentially use multiple internal files
+      // in the future, we should transition them to using this approach as 
well (or build a combination smoosher and
+      // mapper so that we can have a mutable smoosh)
+      File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, 
StringUtils.urlEncode(name) + "__stringTempSmoosh");
+      stringDictionaryFile = stringSmoosh.toPath();
+      final String fileName = 
NestedCommonFormatColumnSerializer.getInternalFileName(
+          name,
+          NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
+      );
+
+      try (
+          final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
+          final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
+              fileName,
+              stringDictionaryWriter.getSerializedSize()
+          )
+      ) {
+        stringDictionaryWriter.writeTo(writer, smoosher);
+        writer.close();
+        smoosher.close();
+        stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
+        final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
+        stringDictionary = 
StringEncodingStrategies.getStringDictionarySupplier(
+            stringBufferMapper,
+            stringBuffer,
+            ByteOrder.nativeOrder()
+        ).get();
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private void ensureLongDictionaryLoaded()
+  {
+    if (longDictionary == null) {
+      longDictionaryFile = makeTempFile(name + 
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
+      longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
+      longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, 
ByteOrder.nativeOrder(), Long.BYTES).get();
+      // reset position
+      longBuffer.position(0);
+    }
+  }
+
+  private void ensureDoubleDictionaryLoaded()
+  {
+    if (doubleDictionary == null) {
+      doubleDictionaryFile = makeTempFile(name + 
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
+      doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
+      doubleDictionary = FixedIndexed.read(
+          doubleBuffer,
+          TypeStrategies.DOUBLE,
+          ByteOrder.nativeOrder(),
+          Double.BYTES
+      ).get();
+      // reset position
+      doubleBuffer.position(0);
+    }
+  }
+
+  private void ensureArrayDictionaryLoaded()
+  {
+    if (arrayDictionary == null && arrayDictionaryWriter != null) {
+      arrayDictionaryFile = makeTempFile(name + 
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
+      arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
+      arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, 
ByteOrder.nativeOrder()).get();
+      // reset position
+      arrayBuffer.position(0);
+    }
+  }
+
   private Path makeTempFile(String name)
   {
     try {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
index aa6a71ae754..d9f00bb2321 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -117,8 +117,8 @@ public abstract class 
GlobalDictionaryEncodedFieldColumnWriter<T>
   }
 
   /**
-   * Hook to allow implementors the chance to do additional operations during 
{@link #addValue(int, Object)}, such as
-   * writing an additional value column
+   * Hook to allow implementors the chance to do additional operations during 
{@link #writeTo(int, FileSmoosher)}, such
+   * as writing an additional value column
    */
   void writeValue(@Nullable T value) throws IOException
   {
@@ -159,7 +159,6 @@ public abstract class 
GlobalDictionaryEncodedFieldColumnWriter<T>
       localId = localDictionary.add(globalId);
     }
     intermediateValueWriter.write(localId);
-    writeValue(value);
     cursorPosition++;
   }
 
@@ -168,11 +167,9 @@ public abstract class 
GlobalDictionaryEncodedFieldColumnWriter<T>
    */
   private void fillNull(int row) throws IOException
   {
-    final T value = processValue(row, null);
     final int localId = localDictionary.add(0);
     while (cursorPosition < row) {
       intermediateValueWriter.write(localId);
-      writeValue(value);
       cursorPosition++;
     }
   }
@@ -252,6 +249,7 @@ public abstract class 
GlobalDictionaryEncodedFieldColumnWriter<T>
       final int unsortedLocalId = rows.nextInt();
       final int sortedLocalId = unsortedToSorted[unsortedLocalId];
       encodedValueSerializer.addValue(sortedLocalId);
+      writeValue((T) 
globalDictionaryIdLookup.getDictionaryValue(unsortedToGlobal[unsortedLocalId]));
       bitmaps[sortedLocalId].add(rowCount++);
     }
 
@@ -307,7 +305,7 @@ public abstract class 
GlobalDictionaryEncodedFieldColumnWriter<T>
     }
   }
 
-  private void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) 
throws IOException
+  public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) 
throws IOException
   {
     if (indexSpec.getDimensionCompression() != 
CompressionStrategy.UNCOMPRESSED) {
       this.version = DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
index 8ccd528715b..09e8dc121c8 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
@@ -59,12 +59,22 @@ public final class ScalarDoubleFieldColumnWriter extends 
GlobalDictionaryEncoded
   }
 
   @Override
-  public void open() throws IOException
+  void writeValue(@Nullable Double value) throws IOException
+  {
+    if (value == null) {
+      doublesSerializer.add(0.0);
+    } else {
+      doublesSerializer.add(value);
+    }
+  }
+
+  @Override
+  public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) 
throws IOException
   {
-    super.open();
+    super.openColumnSerializer(medium, maxId);
     doublesSerializer = CompressionFactory.getDoubleSerializer(
         fieldName,
-        segmentWriteOutMedium,
+        medium,
         StringUtils.format("%s.double_column", fieldName),
         ByteOrder.nativeOrder(),
         indexSpec.getDimensionCompression(),
@@ -73,16 +83,6 @@ public final class ScalarDoubleFieldColumnWriter extends 
GlobalDictionaryEncoded
     doublesSerializer.open();
   }
 
-  @Override
-  void writeValue(@Nullable Double value) throws IOException
-  {
-    if (value == null) {
-      doublesSerializer.add(0.0);
-    } else {
-      doublesSerializer.add(value);
-    }
-  }
-
   @Override
   void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) 
throws IOException
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
index 66b5eca18d9..d9191c4e805 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
@@ -59,12 +59,22 @@ public final class ScalarLongFieldColumnWriter extends 
GlobalDictionaryEncodedFi
   }
 
   @Override
-  public void open() throws IOException
+  void writeValue(@Nullable Long value) throws IOException
+  {
+    if (value == null) {
+      longsSerializer.add(0L);
+    } else {
+      longsSerializer.add(value);
+    }
+  }
+
+  @Override
+  public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) 
throws IOException
   {
-    super.open();
+    super.openColumnSerializer(medium, maxId);
     longsSerializer = CompressionFactory.getLongSerializer(
         fieldName,
-        segmentWriteOutMedium,
+        medium,
         StringUtils.format("%s.long_column", fieldName),
         ByteOrder.nativeOrder(),
         indexSpec.getLongEncoding(),
@@ -74,16 +84,6 @@ public final class ScalarLongFieldColumnWriter extends 
GlobalDictionaryEncodedFi
     longsSerializer.open();
   }
 
-  @Override
-  void writeValue(@Nullable Long value) throws IOException
-  {
-    if (value == null) {
-      longsSerializer.add(0L);
-    } else {
-      longsSerializer.add(value);
-    }
-  }
-
   @Override
   void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher) 
throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to