This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/28.0.0 by this push:
new 72e607c6677 use datasketches-java 4.2.0 (#15257) (#15267)
72e607c6677 is described below
commit 72e607c6677a7e2ac17fe9399dcb844a813e92b7
Author: Laksh Singla <[email protected]>
AuthorDate: Fri Oct 27 13:56:38 2023 +0530
use datasketches-java 4.2.0 (#15257) (#15267)
Backport of: https://github.com/apache/druid/pull/15257
---------
Co-authored-by: Alexander Saydakov <[email protected]>
Co-authored-by: AlexanderSaydakov <[email protected]>
Co-authored-by: Gian Merlino <[email protected]>
Co-authored-by: Adarsh Sanjeev <[email protected]>
---
.../kll/KllDoublesSketchAggregatorFactory.java | 4 +-
.../kll/KllFloatsSketchAggregatorFactory.java | 4 +-
.../KllDoublesSketchComplexMetricSerdeTest.java | 6 +-
.../kll/KllFloatsSketchComplexMetricSerdeTest.java | 4 +-
.../statistics/QuantilesSketchKeyCollector.java | 3 +-
.../QuantilesSketchKeyCollectorFactory.java | 60 +++++++++++++--
.../org/apache/druid/msq/exec/MSQSelectTest.java | 28 ++++---
.../resources/SqlMSQStatementResourcePostTest.java | 25 ++++--
.../druid/msq/statistics/ByteRowKeySerdeTest.java | 83 ++++++++++++++++++++
.../distribution/ArrayOfStringTuplesSerDe.java | 81 +++++++++++++++++---
.../distribution/ArrayOfStringsNullSafeSerde.java | 12 ++-
.../batch/parallel/distribution/StringSketch.java | 3 +-
.../distribution/ArrayOfStringTuplesSerDeTest.java | 89 ++++++++++++++++++++++
.../ArrayOfStringsNullSafeSerdeTest.java | 3 +-
licenses.yaml | 2 +-
pom.xml | 2 +-
16 files changed, 359 insertions(+), 50 deletions(-)
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
index 815227adf55..267953e2364 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
@@ -22,6 +22,8 @@ package org.apache.druid.query.aggregation.datasketches.kll;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.datasketches.kll.KllSketch;
+import org.apache.datasketches.kll.KllSketch.SketchType;
import org.apache.druid.query.aggregation.AggregatorFactory;
import
org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -124,7 +126,7 @@ public class KllDoublesSketchAggregatorFactory extends
KllSketchAggregatorFactor
@Override
int getMaxSerializedSizeBytes(final int k, final long n)
{
- return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, true);
+ return KllSketch.getMaxSerializedSizeBytes(k, n,
SketchType.DOUBLES_SKETCH, true);
}
@Override
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
index 9cc61524615..bdd672ab125 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
@@ -22,6 +22,8 @@ package org.apache.druid.query.aggregation.datasketches.kll;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.kll.KllFloatsSketch;
+import org.apache.datasketches.kll.KllSketch;
+import org.apache.datasketches.kll.KllSketch.SketchType;
import org.apache.druid.query.aggregation.AggregatorFactory;
import
org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -124,7 +126,7 @@ public class KllFloatsSketchAggregatorFactory extends
KllSketchAggregatorFactory
@Override
int getMaxSerializedSizeBytes(final int k, final long n)
{
- return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, true);
+ return KllSketch.getMaxSerializedSizeBytes(k, n, SketchType.FLOATS_SKETCH,
true);
}
@Override
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
index d0a26307990..730fb54c541 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
@@ -114,7 +114,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
objectStrategy.fromByteBufferSafe(buf, bytes.length).toByteArray();
// corrupted sketch should fail with a regular java buffer exception, not
all subsets actually fail with the same
- // index out of bounds exceptions, but at least this many do
+ // sketches exceptions, but at least this many do
for (int subset = 3; subset < 24; subset++) {
final byte[] garbage2 = new byte[subset];
for (int i = 0; i < garbage2.length; i++) {
@@ -123,7 +123,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
final ByteBuffer buf2 =
ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
- IndexOutOfBoundsException.class,
+ Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf2,
garbage2.length).toByteArray()
);
}
@@ -132,7 +132,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
final byte[] garbage = new byte[]{0x01, 0x02};
final ByteBuffer buf3 =
ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
- IndexOutOfBoundsException.class,
+ Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf3,
garbage.length).toByteArray()
);
}
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
index 56a39778990..ee505fe65b8 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
@@ -123,7 +123,7 @@ public class KllFloatsSketchComplexMetricSerdeTest
final ByteBuffer buf2 =
ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
- IndexOutOfBoundsException.class,
+ Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf2,
garbage2.length).toByteArray()
);
}
@@ -132,7 +132,7 @@ public class KllFloatsSketchComplexMetricSerdeTest
final byte[] garbage = new byte[]{0x01, 0x02};
final ByteBuffer buf3 =
ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
- IndexOutOfBoundsException.class,
+ Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf3,
garbage.length).toByteArray()
);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
index 607265367c2..a20ff40cc87 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
@@ -23,6 +23,7 @@ import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.datasketches.quantiles.ItemsUnion;
+import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
import org.apache.druid.frame.key.ClusterByPartition;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.frame.key.RowKey;
@@ -149,7 +150,7 @@ public class QuantilesSketchKeyCollector implements
KeyCollector<QuantilesSketch
final int numPartitions = Ints.checkedCast(LongMath.divide(sketch.getN(),
targetWeight, RoundingMode.CEILING));
- final byte[][] quantiles =
(sketch.getPartitionBoundaries(numPartitions)).boundaries;
+ final byte[][] quantiles = (sketch.getPartitionBoundaries(numPartitions,
QuantileSearchCriteria.EXCLUSIVE)).boundaries;
final List<ClusterByPartition> partitions = new ArrayList<>();
for (int i = 0; i < numPartitions; i++) {
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
index 3192813cfe1..674dfe15acb 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
+import org.apache.datasketches.common.ByteArrayUtil;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.ItemsSketch;
@@ -32,6 +33,7 @@ import org.apache.druid.java.util.common.StringUtils;
import java.io.IOException;
import java.nio.ByteOrder;
+import java.util.Arrays;
import java.util.Comparator;
public class QuantilesSketchKeyCollectorFactory
@@ -91,9 +93,9 @@ public class QuantilesSketchKeyCollectorFactory
return new QuantilesSketchKeyCollector(comparator, sketch,
snapshot.getAverageKeyLength());
}
- private static class ByteRowKeySerde extends ArrayOfItemsSerDe<byte[]>
+ static class ByteRowKeySerde extends ArrayOfItemsSerDe<byte[]>
{
- private static final ByteRowKeySerde INSTANCE = new ByteRowKeySerde();
+ static final ByteRowKeySerde INSTANCE = new ByteRowKeySerde();
private ByteRowKeySerde()
{
@@ -126,22 +128,66 @@ public class QuantilesSketchKeyCollectorFactory
}
@Override
- public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
+ public byte[][] deserializeFromMemory(final Memory mem, long offsetBytes,
final int numItems)
{
final byte[][] keys = new byte[numItems][];
- long keyPosition = (long) Integer.BYTES * numItems;
+ final long start = offsetBytes;
+ offsetBytes += (long) Integer.BYTES * numItems;
for (int i = 0; i < numItems; i++) {
- final int keyLength = mem.getInt((long) Integer.BYTES * i);
+ final int keyLength = mem.getInt(start + (long) Integer.BYTES * i);
final byte[] keyBytes = new byte[keyLength];
- mem.getByteArray(keyPosition, keyBytes, 0, keyLength);
+ mem.getByteArray(offsetBytes, keyBytes, 0, keyLength);
keys[i] = keyBytes;
- keyPosition += keyLength;
+ offsetBytes += keyLength;
}
return keys;
}
+
+ @Override
+ public byte[] serializeToByteArray(final byte[] item)
+ {
+ final byte[] bytes = new byte[Integer.BYTES + item.length];
+ ByteArrayUtil.putIntLE(bytes, 0, item.length);
+ ByteArrayUtil.copyBytes(item, 0, bytes, Integer.BYTES, item.length);
+ return bytes;
+ }
+
+ @Override
+ public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
+ {
+ return deserializeFromMemory(mem, 0, numItems);
+ }
+
+ @Override
+ public int sizeOf(final byte[] item)
+ {
+ return Integer.BYTES + item.length;
+ }
+
+ @Override
+ public int sizeOf(final Memory mem, long offsetBytes, final int numItems)
+ {
+ int length = Integer.BYTES * numItems;
+ for (int i = 0; i < numItems; i++) {
+ length += mem.getInt(offsetBytes + (long) Integer.BYTES * i);
+ }
+ return length;
+ }
+
+ @Override
+ public String toString(final byte[] item)
+ {
+ return Arrays.toString(item);
+ }
+
+ @Override
+ public Class<?> getClassOfT()
+ {
+ return byte[].class;
+ }
}
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index b63ee479e20..441c98b91d8 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -226,7 +226,7 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
- .rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new
long[]{6})
+ .rows(isPageSizeLimited() ? new long[]{2, 2, 2} : new
long[]{6})
.frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new
long[]{1}),
0, 0, "shuffle"
)
@@ -290,7 +290,9 @@ public class MSQSelectTest extends MSQTestBase
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
- .with().rows(3).frames(1),
+ .with()
+ .rows(isPageSizeLimited() ? new long[]{1L, 2L} : new
long[]{3L})
+ .frames(isPageSizeLimited() ? new long[]{1L, 1L} : new
long[]{1L}),
0, 0, "shuffle"
)
.verifyResults();
@@ -353,7 +355,7 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
- .rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new
long[]{6})
+ .rows(isPageSizeLimited() ? new long[]{2, 2, 2} : new
long[]{6})
.frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new
long[]{1}),
0, 0, "shuffle"
)
@@ -1442,8 +1444,8 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
- .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new
long[]{5L})
- .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new
long[]{1L}),
+ .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} :
new long[]{5L})
+ .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} :
new long[]{1L}),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
@@ -1459,27 +1461,27 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
- .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new
long[]{5L})
- .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new
long[]{1L}),
+ .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} :
new long[]{5L})
+ .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} :
new long[]{1L}),
0, 1, "shuffle"
);
// adding result stage counter checks
if (isPageSizeLimited()) {
selectTester.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
- .with().rows(2, 0, 2),
+ .with().rows(2, 0, 2, 0, 2),
1, 0, "input0"
).setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
- .with().rows(2, 0, 2),
+ .with().rows(2, 0, 2, 0, 2),
1, 0, "output"
).setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
- .with().rows(0, 2, 0, 4),
+ .with().rows(0, 2, 0, 2),
1, 1, "input0"
).setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
- .with().rows(0, 2, 0, 4),
+ .with().rows(0, 2, 0, 2),
1, 1, "output"
);
}
@@ -1600,7 +1602,9 @@ public class MSQSelectTest extends MSQTestBase
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
- .with().rows(3).frames(1),
+ .with()
+ .rows(isPageSizeLimited() ? new long[]{1, 2} : new long[]{3})
+ .frames(isPageSizeLimited() ? new long[]{1, 1} : new
long[]{1}),
0, 0, "shuffle"
)
.verifyResults();
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
index 6650c778555..070b3ae46a7 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
@@ -325,9 +325,9 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
).getEntity();
Assert.assertEquals(ImmutableList.of(
- new PageInformation(0, 1L, 75L, 0, 0),
- new PageInformation(1, 2L, 121L, 0, 1),
- new PageInformation(2, 3L, 164L, 0, 2)
+ new PageInformation(0, 2L, 120L, 0, 0),
+ new PageInformation(1, 2L, 118L, 0, 1),
+ new PageInformation(2, 2L, 122L, 0, 2)
), sqlStatementResult.getResultSetInformation().getPages());
assertExpectedResults(
@@ -348,7 +348,9 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
);
assertExpectedResults(
- "{\"cnt\":1,\"dim1\":\"\"}\n\n",
+ "{\"cnt\":1,\"dim1\":\"\"}\n"
+ + "{\"cnt\":1,\"dim1\":\"10.1\"}\n"
+ + "\n",
resource.doGetResults(
sqlStatementResult.getQueryId(),
0L,
@@ -359,8 +361,7 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
);
assertExpectedResults(
- "{\"cnt\":1,\"dim1\":\"1\"}\n"
- + "{\"cnt\":1,\"dim1\":\"def\"}\n"
+ "{\"cnt\":1,\"dim1\":\"def\"}\n"
+ "{\"cnt\":1,\"dim1\":\"abc\"}\n"
+ "\n",
resource.doGetResults(
@@ -412,7 +413,8 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
new PageInformation(0, 2L, 128L, 0, 0),
new PageInformation(1, 2L, 132L, 1, 1),
new PageInformation(2, 2L, 128L, 0, 2),
- new PageInformation(3, 4L, 228L, 1, 3)
+ new PageInformation(3, 2L, 132L, 1, 3),
+ new PageInformation(4, 2L, 130L, 0, 4)
), sqlStatementResult.getResultSetInformation().getPages());
@@ -457,12 +459,19 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
SqlStatementResourceTest.makeOkRequest()
)));
- Assert.assertEquals(rows.subList(6, 10),
SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
+ Assert.assertEquals(rows.subList(6, 8),
SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
3L,
ResultFormat.ARRAY.name(),
SqlStatementResourceTest.makeOkRequest()
)));
+
+ Assert.assertEquals(rows.subList(8, 10),
SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
+ sqlStatementResult.getQueryId(),
+ 4L,
+ ResultFormat.ARRAY.name(),
+ SqlStatementResourceTest.makeOkRequest()
+ )));
}
@Test
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java
new file mode 100644
index 00000000000..9226ab4c81f
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.key.KeyTestUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ByteRowKeySerdeTest extends InitializedNullHandlingTest
+{
+ private final QuantilesSketchKeyCollectorFactory.ByteRowKeySerde serde =
+ QuantilesSketchKeyCollectorFactory.ByteRowKeySerde.INSTANCE;
+
+ @Test
+ public void testByteArraySerde()
+ {
+ testSerde(new byte[]{1, 5, 9, 3});
+ testSerde(new byte[][]{new byte[]{1, 5}, new byte[]{2, 3}, new byte[]{6,
7}});
+ }
+
+ @Test
+ public void testSerdeWithRowKeys()
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("x", ColumnType.LONG)
+ .add("y", ColumnType.LONG)
+ .build();
+
+ testSerde(KeyTestUtils.createKey(rowSignature, 2, 4).array());
+ }
+
+ @Test
+ public void testEmptyArray()
+ {
+ testSerde(new byte[][]{});
+ testSerde(new byte[][]{new byte[]{1, 5}, new byte[]{}, new byte[]{2, 3}});
+ }
+
+ private void testSerde(byte[] byteRowKey)
+ {
+ byte[] bytes = serde.serializeToByteArray(byteRowKey);
+ Assert.assertEquals(serde.sizeOf(byteRowKey), bytes.length);
+
+ Memory wrappedMemory = Memory.wrap(bytes);
+ Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, 1), bytes.length);
+
+ byte[][] deserialized = serde.deserializeFromMemory(wrappedMemory, 1);
+ Assert.assertArrayEquals(new byte[][]{byteRowKey}, deserialized);
+ }
+
+ private void testSerde(byte[][] inputArray)
+ {
+ byte[] bytes = serde.serializeToByteArray(inputArray);
+ Assert.assertEquals(serde.sizeOf(inputArray), bytes.length);
+
+ Memory wrappedMemory = Memory.wrap(bytes);
+ Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, inputArray.length),
bytes.length);
+
+ byte[][] deserialized = serde.deserializeFromMemory(wrappedMemory,
inputArray.length);
+ Assert.assertArrayEquals(inputArray, deserialized);
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
index e3f76b5b92f..8bba9f65aaf 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
@@ -21,8 +21,9 @@ package
org.apache.druid.indexing.common.task.batch.parallel.distribution;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.common.ByteArrayUtil;
+import org.apache.datasketches.common.Util;
import org.apache.datasketches.memory.Memory;
-import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.memory.internal.UnsafeUtil;
import org.apache.druid.data.input.StringTuple;
@@ -36,7 +37,7 @@ public class ArrayOfStringTuplesSerDe extends
ArrayOfItemsSerDe<StringTuple>
private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new
ArrayOfStringsNullSafeSerde();
@Override
- public byte[] serializeToByteArray(StringTuple[] items)
+ public byte[] serializeToByteArray(final StringTuple[] items)
{
int length = 0;
final byte[][] itemsBytes = new byte[items.length][];
@@ -49,29 +50,27 @@ public class ArrayOfStringTuplesSerDe extends
ArrayOfItemsSerDe<StringTuple>
}
final byte[] bytes = new byte[length];
- final WritableMemory mem = WritableMemory.writableWrap(bytes);
- long offsetBytes = 0;
+ int offsetBytes = 0;
for (int i = 0; i < items.length; i++) {
// Add the number of items in the StringTuple
- mem.putInt(offsetBytes, items[i].size());
+ ByteArrayUtil.putIntLE(bytes, offsetBytes, items[i].size());
offsetBytes += Integer.BYTES;
// Add the size of byte content for the StringTuple
- mem.putInt(offsetBytes, itemsBytes[i].length);
+ ByteArrayUtil.putIntLE(bytes, offsetBytes, itemsBytes[i].length);
offsetBytes += Integer.BYTES;
// Add the byte contents of the StringTuple
- mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
+ ByteArrayUtil.copyBytes(itemsBytes[i], 0, bytes, offsetBytes,
itemsBytes[i].length);
offsetBytes += itemsBytes[i].length;
}
return bytes;
}
@Override
- public StringTuple[] deserializeFromMemory(Memory mem, int numItems)
+ public StringTuple[] deserializeFromMemory(final Memory mem, long
offsetBytes, final int numItems)
{
final StringTuple[] array = new StringTuple[numItems];
- long offsetBytes = 0;
for (int i = 0; i < numItems; i++) {
// Read the number of items in the StringTuple
UnsafeUtil.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
@@ -96,4 +95,68 @@ public class ArrayOfStringTuplesSerDe extends
ArrayOfItemsSerDe<StringTuple>
}
return array;
}
+
+ @Override
+ public byte[] serializeToByteArray(final StringTuple item)
+ {
+ final byte[] itemBytes =
STRINGS_SERDE.serializeToByteArray(item.toArray());
+ final byte[] bytes = new byte[Integer.BYTES * 2 + itemBytes.length];
+ int offsetBytes = 0;
+ ByteArrayUtil.putIntLE(bytes, offsetBytes, item.size());
+ offsetBytes += Integer.BYTES;
+ ByteArrayUtil.putIntLE(bytes, offsetBytes, itemBytes.length);
+ offsetBytes += Integer.BYTES;
+ ByteArrayUtil.copyBytes(itemBytes, 0, bytes, offsetBytes,
itemBytes.length);
+ return bytes;
+ }
+
+ @Override
+ public StringTuple[] deserializeFromMemory(final Memory mem, final int
numItems)
+ {
+ return deserializeFromMemory(mem, 0, numItems);
+ }
+
+ @Override
+ public int sizeOf(final StringTuple item)
+ {
+ // Two integers to store number of strings in the tuple and the size of
the byte array
+ int length = 2 * Integer.BYTES;
+ for (final String s : item.toArray()) {
+ length += STRINGS_SERDE.sizeOf(s);
+ }
+ return length;
+ }
+
+ @Override
+ public int sizeOf(final Memory mem, long offsetBytes, final int numItems)
+ {
+ final long start = offsetBytes;
+ for (int i = 0; i < numItems; i++) {
+ // Skip the number of items in the StringTuple
+ Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+ offsetBytes += Integer.BYTES;
+
+ // Read the size of byte content
+ Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+ final int byteContentSize = mem.getInt(offsetBytes);
+ offsetBytes += Integer.BYTES;
+
+ // Skip the byte content
+ Util.checkBounds(offsetBytes, byteContentSize, mem.getCapacity());
+ offsetBytes += byteContentSize;
+ }
+ return (int) (offsetBytes - start);
+ }
+
+ @Override
+ public String toString(final StringTuple item)
+ {
+ return item.toString();
+ }
+
+ @Override
+ public Class<?> getClassOfT()
+ {
+ return StringTuple.class;
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
index b5a8393b172..cd105741717 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel.distribution;
-import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.ArrayOfStringsSerDe;
import org.apache.datasketches.common.Util;
import org.apache.datasketches.memory.Memory;
@@ -35,7 +34,7 @@ import java.nio.charset.StandardCharsets;
* The implementation is the same as {@link ArrayOfStringsSerDe}, except this
* class handles null String values as well.
*/
-public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String>
+public class ArrayOfStringsNullSafeSerde extends ArrayOfStringsSerDe
{
private static final int NULL_STRING_LENGTH = -1;
@@ -106,5 +105,14 @@ public class ArrayOfStringsNullSafeSerde extends
ArrayOfItemsSerDe<String>
return array;
}
+ @Override
+ public int sizeOf(String item)
+ {
+ if (item == null) {
+ return Integer.BYTES;
+ } else {
+ return super.sizeOf(item);
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
index e47fcf78c13..e2794f6b1ca 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
@@ -33,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.quantiles.ItemsSketch;
+import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.timeline.partition.PartitionBoundaries;
@@ -128,7 +129,7 @@ public class StringSketch implements StringDistribution
if (delegate.isEmpty()) {
return new PartitionBoundaries(new StringTuple[0]);
}
- return new
PartitionBoundaries((delegate.getPartitionBoundaries(evenPartitionCount)).boundaries);
+ return new
PartitionBoundaries((delegate.getPartitionBoundaries(evenPartitionCount,
QuantileSearchCriteria.EXCLUSIVE)).boundaries);
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java
new file mode 100644
index 00000000000..9b909948ad0
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel.distribution;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.data.input.StringTuple;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ArrayOfStringTuplesSerDeTest
+{
+
+ private final ArrayOfStringTuplesSerDe serde = new
ArrayOfStringTuplesSerDe();
+
+ @Test
+ public void testStringTupleSerde()
+ {
+ testSerde(StringTuple.create("abc"));
+ testSerde(StringTuple.create("abc", "def", "xyz"));
+ testSerde(new StringTuple[]{StringTuple.create("abc"),
StringTuple.create("def", "efg"), StringTuple.create("z")});
+ }
+
+ @Test
+ public void testEmptyTuple()
+ {
+ testSerde(StringTuple.create());
+ testSerde(new StringTuple[]{});
+ }
+
+ @Test
+ public void testArrayWithNullAndEmptyString()
+ {
+ testSerde(StringTuple.create(""));
+ testSerde(StringTuple.create("abc", "def", ""));
+ testSerde(StringTuple.create("abc", null, "def"));
+ testSerde(new StringTuple[]{StringTuple.create(null, null),
StringTuple.create(null, null)});
+ testSerde(new StringTuple[]{StringTuple.create("", ""),
StringTuple.create("")});
+ testSerde(StringTuple.create("", null, "abc"));
+ }
+
+ @Test
+ public void testSizeOf()
+ {
+ StringTuple stringTuple = StringTuple.create("a", "b");
+ Assert.assertEquals(serde.sizeOf(stringTuple),
serde.serializeToByteArray(stringTuple).length);
+ }
+
+ private void testSerde(StringTuple stringTuple)
+ {
+ byte[] bytes = serde.serializeToByteArray(stringTuple);
+ Assert.assertEquals(serde.sizeOf(stringTuple), bytes.length);
+
+ Memory wrappedMemory = Memory.wrap(bytes);
+ Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, 1), bytes.length);
+
+ StringTuple[] deserialized = serde.deserializeFromMemory(wrappedMemory, 1);
+ Assert.assertArrayEquals(new StringTuple[]{stringTuple}, deserialized);
+ }
+
+ private void testSerde(StringTuple[] inputArray)
+ {
+ byte[] bytes = serde.serializeToByteArray(inputArray);
+ Assert.assertEquals(serde.sizeOf(inputArray), bytes.length);
+
+ Memory wrappedMemory = Memory.wrap(bytes);
+ Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, inputArray.length),
bytes.length);
+
+ StringTuple[] deserialized = serde.deserializeFromMemory(wrappedMemory,
inputArray.length);
+ Assert.assertArrayEquals(inputArray, deserialized);
+ }
+
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
index 927f311e4f9..bce82ee0dbc 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
@@ -84,8 +84,9 @@ public class ArrayOfStringsNullSafeSerdeTest
private void testSerde(String... inputArray)
{
byte[] bytes = serde.serializeToByteArray(inputArray);
+ Assert.assertEquals(serde.sizeOf(inputArray), bytes.length);
String[] deserialized = serde.deserializeFromMemory(Memory.wrap(bytes),
inputArray.length);
- Assert.assertEquals(inputArray, deserialized);
+ Assert.assertArrayEquals(inputArray, deserialized);
}
}
diff --git a/licenses.yaml b/licenses.yaml
index f2c01acaffd..0632e36f7fd 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -3477,7 +3477,7 @@ name: DataSketches
license_category: binary
module: java-core
license_name: Apache License version 2.0
-version: 4.1.0
+version: 4.2.0
libraries:
- org.apache.datasketches: datasketches-java
diff --git a/pom.xml b/pom.xml
index bf7a43e09d8..5fb1f49b2f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,7 @@
default_config.fmpp
-->
<calcite.version>1.35.0</calcite.version>
- <datasketches.version>4.1.0</datasketches.version>
+ <datasketches.version>4.2.0</datasketches.version>
<datasketches.memory.version>2.2.0</datasketches.memory.version>
<derby.version>10.14.2.0</derby.version>
<dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]