This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch nested-object-indexing-1 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a230c647551bf5de33646f58edeb690be2e54186 Author: kishore gopalakrishna <g.kish...@gmail.com> AuthorDate: Tue Feb 19 09:29:40 2019 -0800 Adding support for bytes type in realtime + nested object indexing --- .../org/apache/pinot/common/data/FieldSpec.java | 1 + .../org/apache/pinot/common/data/PinotObject.java | 2 +- .../pinot/common/data/PinotObjectFactory.java | 67 +++++++++ .../org/apache/pinot/core/common/Predicate.java | 28 +--- .../realtime/LLRealtimeSegmentDataManager.java | 3 +- .../indexsegment/mutable/MutableSegmentImpl.java | 149 ++++++++++++++++----- ...VarByteSingleColumnSingleValueReaderWriter.java | 143 ++++++++++++++++++++ .../core/realtime/impl/RealtimeSegmentConfig.java | 18 ++- .../invertedindex/RealtimeInvertedIndexReader.java | 41 +++++- .../creator/impl/inv/LuceneIndexCreator.java | 14 +- .../loader/invertedindex/InvertedIndexHandler.java | 29 +--- .../index/readers/LuceneInvertedIndexReader.java | 9 +- ...yteSingleColumnSingleValueReaderWriterTest.java | 31 +++++ 13 files changed, 433 insertions(+), 102 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java index ad5a5a3..cb43a4c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java @@ -457,6 +457,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, ConfigNodeLife case DOUBLE: return Double.BYTES; case BYTES: + case STRING: // TODO: Metric size is only used for Star-tree generation, which is not supported yet. return MetricFieldSpec.UNDEFINED_METRIC_SIZE; default: diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObject.java b/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObject.java index 3f1ca33..9b68f73 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObject.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObject.java @@ -50,7 +50,7 @@ public interface PinotObject { List<String> getPropertyNames(); /** - * @param fieldName + * @param propertyName * @return the value of the property, it can be a single object or a list of objects. */ Object getProperty(String propertyName); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObjectFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObjectFactory.java new file mode 100644 index 0000000..b15dc58 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObjectFactory.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.data; + +import java.util.List; +import org.apache.pinot.common.data.objects.JSONObject; +import org.apache.pinot.common.data.objects.MapObject; +import org.apache.pinot.common.data.objects.TextObject; +import org.apache.pinot.common.segment.fetcher.HdfsSegmentFetcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Factory class that create PinotObject from bytes + */ +public class PinotObjectFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(PinotObjectFactory.class); + + public static PinotObject create(FieldSpec spec, byte[] buf) { + return create(spec.getObjectType(), buf); + } + + public static PinotObject create(String objectType, byte[] buf) { + + Class<? extends PinotObject> pinotObjectClazz; + PinotObject pinotObject = null; + try { + switch (objectType.toUpperCase()) { + case "MAP": + pinotObjectClazz = MapObject.class; + break; + case "JSON": + pinotObjectClazz = JSONObject.class; + break; + case "TEXT": + pinotObjectClazz = TextObject.class; + break; + default: + // custom object type. + pinotObjectClazz = (Class<? extends PinotObject>) Class.forName(objectType); + } + pinotObject = pinotObjectClazz.getConstructor(new Class[]{}).newInstance(new Object[]{}); + pinotObject.init(buf); + } catch (Exception e) { + LOGGER.error("Error pinot object for type:{}. Skipping inverted index creation", objectType); + } + return pinotObject; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java index 302dd0c..c7fa371 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java @@ -78,7 +78,6 @@ public abstract class Predicate { Predicate predicate; switch (filterType) { -<<<<<<< HEAD case EQUALITY: predicate = new EqPredicate(column, value); break; @@ -97,36 +96,11 @@ public abstract class Predicate { case IN: predicate = new InPredicate(column, value); break; - case TEXT_MATCH: + case MATCHES: predicate = new MatchesPredicate(column, value); break; default: throw new UnsupportedOperationException("Unsupported filterType:" + filterType); -======= - case EQUALITY: - predicate = new EqPredicate(column, value); - break; - case RANGE: - predicate = new RangePredicate(column, value); - break; - case REGEXP_LIKE: - predicate = new RegexpLikePredicate(column, value); - break; - case NOT: - predicate = new NEqPredicate(column, value); - break; - case NOT_IN: - predicate = new NotInPredicate(column, value); - break; - case IN: - predicate = new InPredicate(column, value); - break; - case MATCHES: - predicate = new MatchesPredicate(column, value); - break; - default: - throw new UnsupportedOperationException("Unsupported filterType:" + filterType); ->>>>>>> Enhancing PQL to support MATCHES predicate, can be used for searching within text, map, json and other complex objects } return predicate; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 18b1409..683bfec 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1067,7 +1067,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata) .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager) .setStatsHistory(realtimeTableDataManager.getStatsHistory()) - .setAggregateMetrics(indexingConfig.isAggregateMetrics()); + .setAggregateMetrics(indexingConfig.isAggregateMetrics()) + .setConsumerDir(realtimeTableDataManager.getConsumerDir()); // Create message decoder _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java index 1b554e8..6d1170f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java @@ -28,15 +28,21 @@ import java.util.Map; import java.util.Set; import org.apache.pinot.common.config.SegmentPartitionConfig; import org.apache.pinot.common.data.FieldSpec; +import org.apache.pinot.common.data.PinotObject; +import org.apache.pinot.common.data.PinotObjectFactory; import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.segment.SegmentMetadata; import org.apache.pinot.common.utils.NetUtil; import org.apache.pinot.core.data.GenericRow; import org.apache.pinot.core.indexsegment.IndexSegmentUtils; import org.apache.pinot.core.io.reader.DataFileReader; +import org.apache.pinot.core.io.readerwriter.BaseSingleColumnMultiValueReaderWriter; +import org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter; +import org.apache.pinot.core.io.readerwriter.BaseSingleValueMultiColumnReaderWriter; import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager; import org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnMultiValueReaderWriter; import org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnSingleValueReaderWriter; +import org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter; import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionary; @@ -85,6 +91,9 @@ public class MutableSegmentImpl implements MutableSegment { private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = new HashMap<>(); private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>(); private final IdMap<FixedIntArray> _recordIdMap; + private final Set<String> _noDictionaryColumns; + private final Set<String> _invertedIndexColumns; + private boolean _aggregateMetrics; private volatile int _numDocsIndexed = 0; @@ -120,9 +129,9 @@ public class MutableSegmentImpl implements MutableSegment { _logger = LoggerFactory.getLogger(MutableSegmentImpl.class.getName() + "_" + _segmentName + "_" + config.getStreamName()); - Set<String> noDictionaryColumns = config.getNoDictionaryColumns(); + _noDictionaryColumns = config.getNoDictionaryColumns(); - Set<String> invertedIndexColumns = config.getInvertedIndexColumns(); + _invertedIndexColumns = config.getInvertedIndexColumns(); int avgNumMultiValues = config.getAvgNumMultiValues(); // Initialize for each column @@ -134,52 +143,85 @@ public class MutableSegmentImpl implements MutableSegment { // Only support generating raw index on single-value non-string columns that do not have inverted index while // consuming. After consumption completes and the segment is built, all single-value columns can have raw index FieldSpec.DataType dataType = fieldSpec.getDataType(); - int indexColumnSize = FieldSpec.DataType.INT.size(); - if (noDictionaryColumns.contains(column) && fieldSpec.isSingleValueField() - && dataType != FieldSpec.DataType.STRING && !invertedIndexColumns.contains(column)) { - // No dictionary - indexColumnSize = dataType.size(); - } else { + DataFileReader indexReaderWriter; + boolean createDictionary = shouldCreateDictionary(fieldSpec); + if (createDictionary) { int dictionaryColumnSize; if (dataType == FieldSpec.DataType.STRING) { dictionaryColumnSize = _statsHistory.getEstimatedAvgColSize(column); } else { dictionaryColumnSize = dataType.size(); } - String allocationContext = buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION); + String dictAllocationContext = buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION); MutableDictionary dictionary = MutableDictionaryFactory .getMutableDictionary(dataType, _offHeap, _memoryManager, dictionaryColumnSize, - Math.min(_statsHistory.getEstimatedCardinality(column), _capacity), allocationContext); + Math.min(_statsHistory.getEstimatedCardinality(column), _capacity), dictAllocationContext); _dictionaryMap.put(column, dictionary); // Even though the column is defined as 'no-dictionary' in the config, we did create dictionary for consuming segment. - noDictionaryColumns.remove(column); + _noDictionaryColumns.remove(column); } - DataFileReader indexReaderWriter; - if (fieldSpec.isSingleValueField()) { - String allocationContext = - buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION); - indexReaderWriter = new FixedByteSingleColumnSingleValueReaderWriter(_capacity, indexColumnSize, _memoryManager, - allocationContext); + int fwdIndexColumnSize; + if (createDictionary) { + //dictionary will always be int + fwdIndexColumnSize = FieldSpec.DataType.INT.size(); + } else { + fwdIndexColumnSize = fieldSpec.getDataType().size(); + } + //The size is FIXED for primitive data types (INT, LONG, FLOAT, DOUBLE) + if (fwdIndexColumnSize > 0) { + if (fieldSpec.isSingleValueField()) { + String svAllocationContext = buildAllocationContext(_segmentName, column, + V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION); + indexReaderWriter = + new FixedByteSingleColumnSingleValueReaderWriter(_capacity, fwdIndexColumnSize, _memoryManager, + svAllocationContext); + } else { + // TODO: Fix the bug in MultiValueReaderWriter + String mvAllocationContext = buildAllocationContext(_segmentName, column, + V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION); + indexReaderWriter = + new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, avgNumMultiValues, _capacity, + fwdIndexColumnSize, _memoryManager, mvAllocationContext); + } } else { - // TODO: Start with a smaller capacity on FixedByteSingleColumnMultiValueReaderWriter and let it expand - String allocationContext = - buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION); - indexReaderWriter = - new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, avgNumMultiValues, _capacity, - indexColumnSize, _memoryManager, allocationContext); + //TODO: Get it from stats + int avgColumnSizeInBytes = -1; + //For STRING and BYTES, use varByte implementation + if (fieldSpec.isSingleValueField()) { + String allocationContext = buildAllocationContext(_segmentName, column, + V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION); + indexReaderWriter = + new VarByteSingleColumnSingleValueReaderWriter(_capacity, avgColumnSizeInBytes, _memoryManager, + allocationContext); + } else { + // TODO: Fix the bug in MultiValueReaderWriter + String mvAllocationContext = buildAllocationContext(_segmentName, column, + V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION); + indexReaderWriter = + new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, avgNumMultiValues, _capacity, + avgColumnSizeInBytes, _memoryManager, mvAllocationContext); + } } _indexReaderWriterMap.put(column, indexReaderWriter); - if (invertedIndexColumns.contains(column)) { - _invertedIndexMap.put(column, new RealtimeInvertedIndexReader()); + if (_invertedIndexColumns.contains(column)) { + _invertedIndexMap.put(column, new RealtimeInvertedIndexReader(fieldSpec, config.getConsumerDir())); } } // Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary, // and no metrics have dictionary. If not enabled, the map returned is null. - _recordIdMap = enableMetricsAggregationIfPossible(config, _schema, noDictionaryColumns); + _recordIdMap = enableMetricsAggregationIfPossible(config, _schema, _noDictionaryColumns); + } + + private boolean shouldCreateDictionary(FieldSpec fieldSpec) { + boolean createDictionary = true; + String columnName = fieldSpec.getName(); + //if user has explicitly specified not to createDictionary + createDictionary = createDictionary && !_noDictionaryColumns.contains(columnName); + return createDictionary; } public SegmentPartitionConfig getSegmentPartitionConfig() { @@ -209,7 +251,7 @@ public class MutableSegmentImpl implements MutableSegment { if (docId == numDocs) { // Add forward and inverted indices for new document. addForwardIndex(row, docId, dictIdMap); - addInvertedIndex(docId, dictIdMap); + addInvertedIndex(row, docId, dictIdMap); // Update number of document indexed at last to make the latest record queryable return _numDocsIndexed++ < _capacity; } else { @@ -270,8 +312,8 @@ public class MutableSegmentImpl implements MutableSegment { String column = fieldSpec.getName(); Object value = row.getValue(column); if (fieldSpec.isSingleValueField()) { - FixedByteSingleColumnSingleValueReaderWriter indexReaderWriter = - (FixedByteSingleColumnSingleValueReaderWriter) _indexReaderWriterMap.get(column); + BaseSingleColumnSingleValueReaderWriter indexReaderWriter = + (BaseSingleColumnSingleValueReaderWriter) _indexReaderWriterMap.get(column); Integer dictId = (Integer) dictIdMap.get(column); if (dictId != null) { // Column with dictionary @@ -292,6 +334,12 @@ public class MutableSegmentImpl implements MutableSegment { case DOUBLE: indexReaderWriter.setDouble(docId, (Double) value); break; + case STRING: + indexReaderWriter.setString(docId, (String) value); + break; + case BYTES: + indexReaderWriter.setBytes(docId, (byte[]) value); + break; default: throw new UnsupportedOperationException( "Unsupported data type: " + dataType + " for no-dictionary column: " + column); @@ -299,19 +347,56 @@ public class MutableSegmentImpl implements MutableSegment { } } else { int[] dictIds = (int[]) dictIdMap.get(column); - ((FixedByteSingleColumnMultiValueReaderWriter) _indexReaderWriterMap.get(column)).setIntArray(docId, dictIds); + if (dictIds != null) { + ((FixedByteSingleColumnMultiValueReaderWriter) _indexReaderWriterMap.get(column)).setIntArray(docId, dictIds); + } else { + BaseSingleColumnMultiValueReaderWriter indexReaderWriter = + (BaseSingleColumnMultiValueReaderWriter) _indexReaderWriterMap.get(column); + // No-dictionary column + FieldSpec.DataType dataType = fieldSpec.getDataType(); + switch (dataType) { + case INT: + indexReaderWriter.setIntArray(docId, (int[]) value); + break; + case LONG: + indexReaderWriter.setLongArray(docId, (long[]) value); + break; + case FLOAT: + indexReaderWriter.setFloatArray(docId, (float[]) value); + break; + case DOUBLE: + indexReaderWriter.setDoubleArray(docId, (double[]) value); + break; + case STRING: + indexReaderWriter.setStringArray(docId, (String[]) value); + break; + case BYTES: + indexReaderWriter.setBytesArray(docId, (byte[][]) value); + break; + default: + throw new UnsupportedOperationException( + "Unsupported data type: " + dataType + " for no-dictionary column: " + column); + } + } } } } - private void addInvertedIndex(int docId, Map<String, Object> dictIdMap) { + private void addInvertedIndex(GenericRow row, int docId, Map<String, Object> dictIdMap) { // Update inverted index at last // NOTE: inverted index have to be updated at last because once it gets updated, the latest record will become // queryable for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) { String column = fieldSpec.getName(); RealtimeInvertedIndexReader invertedIndex = _invertedIndexMap.get(column); - if (invertedIndex != null) { + if (invertedIndex == null) { + continue; + } + if (fieldSpec.getObjectType() != null ) { + byte[] value = (byte[]) row.getValue(column); + PinotObject pinotObject = PinotObjectFactory.create(fieldSpec, value); + invertedIndex.add(docId, pinotObject); + } else { if (fieldSpec.isSingleValueField()) { invertedIndex.add(((Integer) dictIdMap.get(column)), docId); } else { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java new file mode 100644 index 0000000..687391c --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.io.readerwriter.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter; +import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager; +import org.apache.pinot.core.io.writer.impl.DirectMemoryManager; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class VarByteSingleColumnSingleValueReaderWriter extends BaseSingleColumnSingleValueReaderWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(FixedByteSingleColumnSingleValueReaderWriter.class); + + private static int DEFAULT_COLUMN_SIZE_IN_BYTES = 100; + private final PinotDataBufferMemoryManager _memoryManager; + private final String _allocationContext; + + private FixedByteSingleValueMultiColumnReaderWriter _headerReaderWriter; + private List<PinotDataBuffer> _dataBuffers; + + //capacity of the chunk, it can adjust itself based on the avgSize + private long _chunkCapacityInBytes = 0; + private final int _numRowsPerChunk; + private int _avgColumnSizeInBytes; + + //amount of data written to the current buffer + private int _currentDataSize = 0; + //number of rows written to the current buffer + private int _currentBufferRows = 0; + //PinotDataBuffer where the actual bytes are written to + private PinotDataBuffer _currentDataBuffer; + //index pointing to the element in _dataBuffers + private int _currentDataBufferId; + + /** + * @param numRowsPerChunk Number of rows to pack in one chunk before a new chunk is created. + * @param avgColumnSizeInBytes Max Size of column value in bytes. Set this to -1 if its unknown + * @param memoryManager Memory manager to be used for allocating memory. + * @param allocationContext Allocation allocationContext. + */ + public VarByteSingleColumnSingleValueReaderWriter(int numRowsPerChunk, int avgColumnSizeInBytes, + PinotDataBufferMemoryManager memoryManager, String allocationContext) { + _avgColumnSizeInBytes = avgColumnSizeInBytes; + if (avgColumnSizeInBytes < 0) { + _avgColumnSizeInBytes = DEFAULT_COLUMN_SIZE_IN_BYTES; + } + _numRowsPerChunk = numRowsPerChunk; + _memoryManager = memoryManager; + _allocationContext = allocationContext; + _dataBuffers = new ArrayList<>(); + //bufferId, Offset, length for each row + //we can eliminate the length as an optimization later + _headerReaderWriter = + new FixedByteSingleValueMultiColumnReaderWriter(_numRowsPerChunk, new int[]{4, 4, 4}, _memoryManager, + _allocationContext); + } + + @Override + public void close() + throws IOException { + for (PinotDataBuffer buffer : _dataBuffers) { + buffer.close(); + } + } + + @Override + public void setString(int row, String value) { + setBytes(row, StringUtil.encodeUtf8(value)); + } + + @Override + public String getString(int row) { + return StringUtil.decodeUtf8(getBytes(row)); + } + + @Override + public void setBytes(int row, byte[] buf) { + + if (_currentDataSize + buf.length >= _chunkCapacityInBytes) { + addDataBuffer(); + System.out.println("Added data buffer row:" + row + " numDataBuffers:" + _dataBuffers.size()); + } + try { + _headerReaderWriter.setInt(row, 0, _currentDataBufferId); + _headerReaderWriter.setInt(row, 1, _currentDataSize); + _headerReaderWriter.setInt(row, 2, buf.length); + _currentDataBuffer.readFrom(_currentDataSize, buf); + } catch (Exception e) { + e.printStackTrace(); + } + _currentDataSize = _currentDataSize + buf.length; + } + + @Override + public byte[] getBytes(int row) { + int dataBufferId = _headerReaderWriter.getInt(row, 0); + int startOffset = _headerReaderWriter.getInt(row, 1); + int length = _headerReaderWriter.getInt(row, 2); + byte[] buf = new byte[length]; + PinotDataBuffer dataBuffer = _dataBuffers.get(dataBufferId); + dataBuffer.copyTo(startOffset, buf); + return buf; + } + + private void addDataBuffer() { + //set the avgColumnSize based on the data seen so far. + if (_currentDataSize > 0 && _currentBufferRows > 0) { + _avgColumnSizeInBytes = _currentDataSize / _currentBufferRows; + } + _chunkCapacityInBytes = _numRowsPerChunk * _avgColumnSizeInBytes; + LOGGER.info("Allocating bytes for: {}, dataBufferSize: {} ", _allocationContext, _chunkCapacityInBytes); + PinotDataBuffer dataBuffer = _memoryManager.allocate(_chunkCapacityInBytes, _allocationContext); + _dataBuffers.add(dataBuffer); + _currentDataBuffer = dataBuffer; + //start from 0 + _currentDataBufferId = _dataBuffers.size() - 1; + _currentDataSize = 0; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java index 63ebbbb..6388617 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java @@ -39,12 +39,13 @@ public class RealtimeSegmentConfig { private final RealtimeSegmentStatsHistory _statsHistory; private final SegmentPartitionConfig _segmentPartitionConfig; private final boolean _aggregateMetrics; + private String _consumerDir; private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> invertedIndexColumns, RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, SegmentPartitionConfig segmentPartitionConfig, - boolean aggregateMetrics) { + boolean aggregateMetrics, String consumerDir) { _segmentName = segmentName; _streamName = streamName; _schema = schema; @@ -58,6 +59,7 @@ public class RealtimeSegmentConfig { _statsHistory = statsHistory; _segmentPartitionConfig = segmentPartitionConfig; _aggregateMetrics = aggregateMetrics; + _consumerDir = consumerDir; } public String getSegmentName() { @@ -112,6 +114,10 @@ public class RealtimeSegmentConfig { return _aggregateMetrics; } + public String getConsumerDir() { + return _consumerDir; + } + public static class Builder { private String _segmentName; private String _streamName; @@ -126,6 +132,7 @@ public class RealtimeSegmentConfig { private RealtimeSegmentStatsHistory _statsHistory; private SegmentPartitionConfig _segmentPartitionConfig; private boolean _aggregateMetrics = false; + private String _consumerDir; public Builder() { } @@ -195,10 +202,17 @@ public class RealtimeSegmentConfig { return this; } + public Builder setConsumerDir(String consumerDir) { + _consumerDir = consumerDir; + return this; + } + public RealtimeSegmentConfig build() { return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _capacity, _avgNumMultiValues, _noDictionaryColumns, _invertedIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager, - _statsHistory, _segmentPartitionConfig, _aggregateMetrics); + _statsHistory, _segmentPartitionConfig, _aggregateMetrics, _consumerDir); } + + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java index 4c45850..484eeb3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java @@ -18,24 +18,56 @@ */ package org.apache.pinot.core.realtime.impl.invertedindex; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; - +import org.apache.lucene.index.DirectoryReader; +import org.apache.pinot.common.data.FieldSpec; +import org.apache.pinot.common.data.PinotObject; import org.apache.pinot.core.common.Predicate; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.creator.impl.inv.LuceneIndexCreator; import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; +import org.apache.pinot.core.segment.index.readers.LuceneInvertedIndexReader; import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RealtimeInvertedIndexReader implements InvertedIndexReader<MutableRoaringBitmap> { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeInvertedIndexReader.class); + private final List<ThreadSafeMutableRoaringBitmap> _bitmaps = new ArrayList<>(); private final ReentrantReadWriteLock.ReadLock _readLock; private final ReentrantReadWriteLock.WriteLock _writeLock; + private LuceneInvertedIndexReader _reader; + private LuceneIndexCreator _creator; + boolean isLuceneInitialized; - public RealtimeInvertedIndexReader() { + public RealtimeInvertedIndexReader(FieldSpec spec, String indexDir) { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); _readLock = readWriteLock.readLock(); _writeLock = readWriteLock.writeLock(); + if (spec.getObjectType() != null) { + try { + File outputDirectory = new File(indexDir, spec.getName() + V1Constants.Indexes.LUCENE_INVERTED_INDEX_DIR); + _creator = new LuceneIndexCreator(spec.getObjectType(), outputDirectory); + _reader = new LuceneInvertedIndexReader(DirectoryReader.open(_creator.getIndexDirectory())); + LOGGER.info("Initializing Lucene for column:{}", spec.getName()); + isLuceneInitialized = true; + } catch (IOException e) { + LOGGER.error("Error initializing Lucene for column:{}", spec.getName(), e); + } + } + } + + /** + * Add the document id to the bitmap for the given dictionary id. + */ + public void add(int docId, PinotObject pinotObject) { + _creator.add(pinotObject); } /** @@ -56,11 +88,12 @@ public class RealtimeInvertedIndexReader implements InvertedIndexReader<MutableR _bitmaps.get(dictId).checkAndAdd(docId); } } - + @Override public MutableRoaringBitmap getDocIds(Predicate predicate) { - throw new UnsupportedOperationException("Predicate:"+ predicate + " is not supported"); + return _reader.getDocIds(predicate); } + @Override public MutableRoaringBitmap getDocIds(int dictId) { ThreadSafeMutableRoaringBitmap bitmap; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java index 6622b57..87f9f56 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java @@ -52,17 +52,16 @@ public class LuceneIndexCreator implements InvertedIndexCreator { private final IndexWriter _writer; private final IndexWriterConfig _indexWriterConfig; private final Directory _indexDirectory; - // TODO:Figure out a way to avoid this - boolean _isText = false; + private String _objectType; - public LuceneIndexCreator(ColumnMetadata columnMetadata, File outputDirectory) { + public LuceneIndexCreator(String objectType, File outputDirectory) { + _objectType = objectType; // TODO: Get IndexConfig and set the different analyzer for each field by default we set // StandardAnalyzer and use TextField. This can be expensive and inefficient if all we need is // exact match. See keyword analyzer _analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer()); _indexWriterConfig = new IndexWriterConfig(_analyzer); _indexWriterConfig.setRAMBufferSizeMB(MAX_BUFFER_SIZE_MB); - _isText = "TEXT".equalsIgnoreCase(columnMetadata.getObjectType()); try { _indexDirectory = FSDirectory.open(outputDirectory.toPath()); _writer = new IndexWriter(_indexDirectory, _indexWriterConfig); @@ -72,6 +71,10 @@ public class LuceneIndexCreator implements InvertedIndexCreator { } } + public Directory getIndexDirectory() { + return _indexDirectory; + } + @Override public void add(int dictId) { throw new UnsupportedOperationException( @@ -91,9 +94,6 @@ public class LuceneIndexCreator implements InvertedIndexCreator { List<String> propertyNames = object.getPropertyNames(); for (String propertyName : propertyNames) { Field field; - // TODO: Figure out a way to avoid special casing Text, have a way to get propertyType from - // pinotObject? - // TODO: Handle list field Object value = object.getProperty(propertyName); if (value.getClass().isAssignableFrom(List.class)) { List<?> list = (List<?>) value; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java index b9a2bec..d0e688d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java @@ -26,6 +26,7 @@ import java.util.Set; import javax.annotation.Nonnull; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.data.PinotObject; +import org.apache.pinot.common.data.PinotObjectFactory; import org.apache.pinot.common.data.objects.JSONObject; import org.apache.pinot.common.data.objects.MapObject; import org.apache.pinot.common.data.objects.TextObject; @@ -117,38 +118,16 @@ public class InvertedIndexHandler { LOGGER.info("Creating new lucene based inverted index for segment: {}, column: {}", _segmentName, column); int numDocs = columnMetadata.getTotalDocs(); String objectType = columnMetadata.getObjectType(); - Class<? extends PinotObject> pinotObjectClazz; - PinotObject pinotObject = null; - try { - switch (objectType.toUpperCase()) { - case "MAP": - pinotObjectClazz = MapObject.class; - break; - case "JSON": - pinotObjectClazz = JSONObject.class; - break; - case "TEXT": - pinotObjectClazz = TextObject.class; - break; - default: - // custom object type. - pinotObjectClazz = (Class<? extends PinotObject>) Class.forName(objectType); - } - pinotObject = pinotObjectClazz.getConstructor(new Class[]{}).newInstance(new Object[]{}); - } catch (Exception e) { - LOGGER.error("Error pinot object for type:{}. Skipping inverted index creation", objectType); - return; - } - try (LuceneIndexCreator luceneIndexCreator = new LuceneIndexCreator(columnMetadata, invertedIndexDir)) { + + try (LuceneIndexCreator luceneIndexCreator = new LuceneIndexCreator(objectType, invertedIndexDir)) { try (DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, _segmentWriter)) { if (columnMetadata.isSingleValue()) { // Single-value column. VarByteChunkSingleValueReader svFwdIndex = (VarByteChunkSingleValueReader) fwdIndex; for (int i = 0; i < numDocs; i++) { byte[] bytes = svFwdIndex.getBytes(i); - - pinotObject.init(bytes); + PinotObject pinotObject = PinotObjectFactory.create(objectType, bytes); luceneIndexCreator.add(pinotObject); } } else { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java index 576bcc4..e4b8850 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java @@ -65,8 +65,7 @@ public class LuceneInvertedIndexReader implements InvertedIndexReader<MutableRoa /** * TODO: Change this to take PinotDataBuffer, work around for now since Lucene needs actual * directory - * @param metadata - * @param indexDir + * @param segmentIndexDir * @param metadata */ public LuceneInvertedIndexReader(File segmentIndexDir, ColumnMetadata metadata) { @@ -83,6 +82,10 @@ public class LuceneInvertedIndexReader implements InvertedIndexReader<MutableRoa } } + public LuceneInvertedIndexReader(IndexReader reader) { + _searcher = new IndexSearcher(reader); + } + @Override public void close() throws IOException { _index.close(); @@ -104,7 +107,7 @@ public class LuceneInvertedIndexReader implements InvertedIndexReader<MutableRoa public MutableRoaringBitmap getDocIds(String queryStr, String options) { QueryParser queryParser = new QueryParser(TextObject.DEFAULT_FIELD, _analyzer); - Query query = null; + Query query; try { query = queryParser.parse(queryStr); } catch (ParseException e) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/io/writer/impl/VarByteSingleColumnSingleValueReaderWriterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/io/writer/impl/VarByteSingleColumnSingleValueReaderWriterTest.java new file mode 100644 index 0000000..0fc33db --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/io/writer/impl/VarByteSingleColumnSingleValueReaderWriterTest.java @@ -0,0 +1,31 @@ +package org.apache.pinot.core.io.writer.impl; + +import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager; +import org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class VarByteSingleColumnSingleValueReaderWriterTest { + + @Test + public void testSimple() { + VarByteSingleColumnSingleValueReaderWriter readerWriter; + PinotDataBufferMemoryManager mem = new DirectMemoryManager("test"); + readerWriter = new VarByteSingleColumnSingleValueReaderWriter(100, -1, mem, "test"); + + for (int i = 0; i < 10000; i++) { + String data = "TEST-" + i; + readerWriter.setBytes(i, data.getBytes()); + } + boolean passed = true; + for (int i = 0; i < 10000; i++) { + byte[] data = readerWriter.getBytes(i); + if (!new String(data).equals("TEST-" + i)) { + passed = false; + break; + } + } + Assert.assertTrue(passed); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org