This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b1c3e38da3 BugFix: Handle BYTES column type for partitioning. (#10110)
b1c3e38da3 is described below
commit b1c3e38da368b3aef543809edc3a4ddc5e44ac96
Author: Mayank Shrivastava <[email protected]>
AuthorDate: Wed Jan 11 13:38:49 2023 -0800
BugFix: Handle BYTES column type for partitioning. (#10110)
The MurmurPartition function does not work with the byte[] data type because it
first converts the value to a String (`value.toString().getBytes(UTF_8)`), and
calling toString() on a byte[] yields an identity-based string rather than the
array's contents. In the offline flow, byte[] values are already converted to
ByteArray so that the MurmurPartition function handles them correctly. This PR
applies the same ByteArray conversion in the real-time ingestion path.
---
.../local/indexsegment/mutable/MutableSegmentImpl.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 456b95d275..0efa6395e5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -349,8 +349,8 @@ public class MutableSegmentImpl implements MutableSegment {
// it is beyond the scope of realtime index pluggability to do this
refactoring, so realtime
// text indexes remain statically defined. Revisit this after this
refactoring has been done.
RealtimeLuceneTextIndex luceneTextIndex =
- new RealtimeLuceneTextIndex(column, new
File(config.getConsumerDir()), _segmentName,
- stopWordsInclude, stopWordsExclude);
+ new RealtimeLuceneTextIndex(column, new
File(config.getConsumerDir()), _segmentName, stopWordsInclude,
+ stopWordsExclude);
if (_realtimeLuceneReaders == null) {
_realtimeLuceneReaders = new
RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName);
}
@@ -640,15 +640,20 @@ public class MutableSegmentImpl implements MutableSegment
{
// results, hence the extra care. A metric will already have been
emitted when trying to update the dictionary.
continue;
}
+
FieldSpec fieldSpec = indexContainer._fieldSpec;
+ DataType dataType = fieldSpec.getDataType();
+
if (fieldSpec.isSingleValueField()) {
// Single-value column
// Check partitions
if (column.equals(_partitionColumn)) {
- int partition = _partitionFunction.getPartition(value);
+ Object valueToPartition = (dataType == BYTES) ? new
ByteArray((byte[]) value) : value;
+ int partition = _partitionFunction.getPartition(valueToPartition);
if (indexContainer._partitions.add(partition)) {
- _logger.warn("Found new partition: {} from partition column: {},
value: {}", partition, column, value);
+ _logger.warn("Found new partition: {} from partition column: {},
value: {}", partition, column,
+ valueToPartition);
if (_serverMetrics != null) {
_serverMetrics.addMeteredTableValue(_realtimeTableName,
ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
}
@@ -680,7 +685,6 @@ public class MutableSegmentImpl implements MutableSegment {
// Single-value column with raw index
// Update forward index
- DataType dataType = fieldSpec.getDataType();
switch (dataType.getStoredType()) {
case INT:
forwardIndex.setInt(docId, (Integer) value);
@@ -787,8 +791,6 @@ public class MutableSegmentImpl implements MutableSegment {
} else {
// Raw MV columns
- // Update forward index and numValues info
- DataType dataType = fieldSpec.getDataType();
switch (dataType.getStoredType()) {
case INT:
Object[] values = (Object[]) value;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]