This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch nested-object-indexing-1
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a230c647551bf5de33646f58edeb690be2e54186
Author: kishore gopalakrishna <g.kish...@gmail.com>
AuthorDate: Tue Feb 19 09:29:40 2019 -0800

    Adding support for bytes type in realtime + nested object indexing
---
 .../org/apache/pinot/common/data/FieldSpec.java    |   1 +
 .../org/apache/pinot/common/data/PinotObject.java  |   2 +-
 .../pinot/common/data/PinotObjectFactory.java      |  67 +++++++++
 .../org/apache/pinot/core/common/Predicate.java    |  28 +---
 .../realtime/LLRealtimeSegmentDataManager.java     |   3 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 149 ++++++++++++++++-----
 ...VarByteSingleColumnSingleValueReaderWriter.java | 143 ++++++++++++++++++++
 .../core/realtime/impl/RealtimeSegmentConfig.java  |  18 ++-
 .../invertedindex/RealtimeInvertedIndexReader.java |  41 +++++-
 .../creator/impl/inv/LuceneIndexCreator.java       |  14 +-
 .../loader/invertedindex/InvertedIndexHandler.java |  29 +---
 .../index/readers/LuceneInvertedIndexReader.java   |   9 +-
 ...yteSingleColumnSingleValueReaderWriterTest.java |  31 +++++
 13 files changed, 433 insertions(+), 102 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
index ad5a5a3..cb43a4c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
@@ -457,6 +457,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, ConfigNodeLife
         case DOUBLE:
           return Double.BYTES;
         case BYTES:
+        case STRING:
          // TODO: Metric size is only used for Star-tree generation, which is not supported yet.
           return MetricFieldSpec.UNDEFINED_METRIC_SIZE;
         default:
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObject.java b/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObject.java
index 3f1ca33..9b68f73 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObject.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObject.java
@@ -50,7 +50,7 @@ public interface PinotObject {
   List<String> getPropertyNames();
 
   /**
-   * @param fieldName
+   * @param propertyName
   * @return the value of the property, it can be a single object or a list of objects.
    */
   Object getProperty(String propertyName);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObjectFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObjectFactory.java
new file mode 100644
index 0000000..b15dc58
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/PinotObjectFactory.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.data;
+
+import java.util.List;
+import org.apache.pinot.common.data.objects.JSONObject;
+import org.apache.pinot.common.data.objects.MapObject;
+import org.apache.pinot.common.data.objects.TextObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory class that creates a {@link PinotObject} from serialized bytes.
+ */
+public class PinotObjectFactory {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotObjectFactory.class);
+
+  public static PinotObject create(FieldSpec spec, byte[] buf) {
+    return create(spec.getObjectType(), buf);
+  }
+
+  public static PinotObject create(String objectType, byte[] buf) {
+    Class<? extends PinotObject> pinotObjectClazz;
+    PinotObject pinotObject = null;
+    try {
+      switch (objectType.toUpperCase()) {
+        case "MAP":
+          pinotObjectClazz = MapObject.class;
+          break;
+        case "JSON":
+          pinotObjectClazz = JSONObject.class;
+          break;
+        case "TEXT":
+          pinotObjectClazz = TextObject.class;
+          break;
+        default:
+          // Custom object type: treat it as a fully-qualified class name.
+          pinotObjectClazz = (Class<? extends PinotObject>) Class.forName(objectType);
+      }
+      pinotObject = pinotObjectClazz.getConstructor().newInstance();
+      pinotObject.init(buf);
+    } catch (Exception e) {
+      LOGGER.error("Error creating PinotObject of type: {}. Skipping inverted index creation", objectType, e);
+    }
+    return pinotObject;
+  }
+}
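Usage sketch for the factory above (not part of the commit) -- the payload and the "JSON" type name are hypothetical, and it assumes JSONObject.init() parses the raw bytes:

    // Rebuild a PinotObject from the serialized bytes of an object-typed column.
    byte[] buf = "{\"name\":\"pinot\"}".getBytes();
    PinotObject obj = PinotObjectFactory.create("JSON", buf);
    if (obj != null) { // create() returns null if instantiation fails
      for (String prop : obj.getPropertyNames()) {
        System.out.println(prop + " = " + obj.getProperty(prop));
      }
    }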
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
index 302dd0c..c7fa371 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
@@ -78,7 +78,6 @@ public abstract class Predicate {
 
     Predicate predicate;
     switch (filterType) {
-<<<<<<< HEAD
       case EQUALITY:
         predicate = new EqPredicate(column, value);
         break;
@@ -97,36 +96,11 @@ public abstract class Predicate {
       case IN:
         predicate = new InPredicate(column, value);
         break;
-      case TEXT_MATCH:
+      case MATCHES:
         predicate = new MatchesPredicate(column, value);
         break;
       default:
        throw new UnsupportedOperationException("Unsupported filterType:" + filterType);
-=======
-    case EQUALITY:
-      predicate = new EqPredicate(column, value);
-      break;
-    case RANGE:
-      predicate = new RangePredicate(column, value);
-      break;
-    case REGEXP_LIKE:
-      predicate = new RegexpLikePredicate(column, value);
-      break;
-    case NOT:
-      predicate = new NEqPredicate(column, value);
-      break;
-    case NOT_IN:
-      predicate = new NotInPredicate(column, value);
-      break;
-    case IN:
-      predicate = new InPredicate(column, value);
-      break;
-    case MATCHES:
-      predicate = new MatchesPredicate(column, value);
-      break;
-    default:
-      throw new UnsupportedOperationException("Unsupported filterType:" + filterType);
->>>>>>> Enhancing PQL to support MATCHES predicate, can be used for searching within text, map, json and other complex objects
     }
     return predicate;
   }
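The resolved switch keeps MATCHES as the filter type for searching within text, map, JSON and other complex objects. A hedged construction sketch mirroring the dispatch above (imports: java.util.Collections, java.util.List; the List<String> shape of the value argument is an assumption, and the PQL in the comment is indicative only):

    // Indicative PQL: SELECT ... FROM myTable WHERE MATCHES(bio, 'machine learning')
    String column = "bio"; // assumed column name
    List<String> value = Collections.singletonList("machine learning");
    Predicate predicate = new MatchesPredicate(column, value);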
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 18b1409..683bfec 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1067,7 +1067,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
             .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
-            .setAggregateMetrics(indexingConfig.isAggregateMetrics());
+            .setAggregateMetrics(indexingConfig.isAggregateMetrics())
+            .setConsumerDir(realtimeTableDataManager.getConsumerDir());
 
     // Create message decoder
     _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 1b554e8..6d1170f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -28,15 +28,21 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.PinotObject;
+import org.apache.pinot.common.data.PinotObjectFactory;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.indexsegment.IndexSegmentUtils;
 import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.readerwriter.BaseSingleColumnMultiValueReaderWriter;
+import org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
+import org.apache.pinot.core.io.readerwriter.BaseSingleValueMultiColumnReaderWriter;
 import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
 import org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnMultiValueReaderWriter;
 import org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnSingleValueReaderWriter;
+import org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionary;
@@ -85,6 +91,9 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = new HashMap<>();
   private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
   private final IdMap<FixedIntArray> _recordIdMap;
+  private final Set<String> _noDictionaryColumns;
+  private final Set<String> _invertedIndexColumns;
+
   private boolean _aggregateMetrics;
 
   private volatile int _numDocsIndexed = 0;
@@ -120,9 +129,9 @@ public class MutableSegmentImpl implements MutableSegment {
     _logger =
         LoggerFactory.getLogger(MutableSegmentImpl.class.getName() + "_" + _segmentName + "_" + config.getStreamName());
 
-    Set<String> noDictionaryColumns = config.getNoDictionaryColumns();
+    _noDictionaryColumns = config.getNoDictionaryColumns();
 
-    Set<String> invertedIndexColumns = config.getInvertedIndexColumns();
+    _invertedIndexColumns = config.getInvertedIndexColumns();
     int avgNumMultiValues = config.getAvgNumMultiValues();
 
     // Initialize for each column
@@ -134,52 +143,85 @@ public class MutableSegmentImpl implements MutableSegment {
       // Only support generating raw index on single-value non-string columns that do not have inverted index while
       // consuming. After consumption completes and the segment is built, all single-value columns can have raw index
       FieldSpec.DataType dataType = fieldSpec.getDataType();
-      int indexColumnSize = FieldSpec.DataType.INT.size();
-      if (noDictionaryColumns.contains(column) && fieldSpec.isSingleValueField()
-          && dataType != FieldSpec.DataType.STRING && !invertedIndexColumns.contains(column)) {
-        // No dictionary
-        indexColumnSize = dataType.size();
-      } else {
+      DataFileReader indexReaderWriter;
+      boolean createDictionary = shouldCreateDictionary(fieldSpec);
+      if (createDictionary) {
         int dictionaryColumnSize;
         if (dataType == FieldSpec.DataType.STRING) {
           dictionaryColumnSize = _statsHistory.getEstimatedAvgColSize(column);
         } else {
           dictionaryColumnSize = dataType.size();
         }
-        String allocationContext = buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION);
+        String dictAllocationContext = buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION);
         MutableDictionary dictionary = MutableDictionaryFactory
             .getMutableDictionary(dataType, _offHeap, _memoryManager, dictionaryColumnSize,
-                Math.min(_statsHistory.getEstimatedCardinality(column), _capacity), allocationContext);
+                Math.min(_statsHistory.getEstimatedCardinality(column), _capacity), dictAllocationContext);
         _dictionaryMap.put(column, dictionary);
 
         // Even though the column is defined as 'no-dictionary' in the config, we did create dictionary for consuming segment.
-        noDictionaryColumns.remove(column);
+        _noDictionaryColumns.remove(column);
       }
 
-      DataFileReader indexReaderWriter;
-      if (fieldSpec.isSingleValueField()) {
-        String allocationContext =
-            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
-        indexReaderWriter = new FixedByteSingleColumnSingleValueReaderWriter(_capacity, indexColumnSize, _memoryManager,
-            allocationContext);
+      int fwdIndexColumnSize;
+      if (createDictionary) {
+        // The forward index of a dictionary-encoded column stores int dictionary ids
+        fwdIndexColumnSize = FieldSpec.DataType.INT.size();
+      } else {
+        fwdIndexColumnSize = fieldSpec.getDataType().size();
+      }
+      // The size is fixed for primitive data types (INT, LONG, FLOAT, DOUBLE)
+      if (fwdIndexColumnSize > 0) {
+        if (fieldSpec.isSingleValueField()) {
+          String svAllocationContext = buildAllocationContext(_segmentName, column,
+              V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+          indexReaderWriter = new FixedByteSingleColumnSingleValueReaderWriter(_capacity, fwdIndexColumnSize,
+              _memoryManager, svAllocationContext);
+        } else {
+          // TODO: Fix the bug in MultiValueReaderWriter
+          String mvAllocationContext = buildAllocationContext(_segmentName, column,
+              V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+          indexReaderWriter = new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW,
+              avgNumMultiValues, _capacity, fwdIndexColumnSize, _memoryManager, mvAllocationContext);
+        }
       } else {
-        // TODO: Start with a smaller capacity on FixedByteSingleColumnMultiValueReaderWriter and let it expand
-        String allocationContext =
-            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
-        indexReaderWriter =
-            new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, avgNumMultiValues, _capacity,
-                indexColumnSize, _memoryManager, allocationContext);
+        // TODO: Get the average column size from stats
+        int avgColumnSizeInBytes = -1;
+        // For variable-width types (STRING, BYTES), use the var-byte implementation
+        if (fieldSpec.isSingleValueField()) {
+          String allocationContext = buildAllocationContext(_segmentName, column,
+              V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+          indexReaderWriter = new VarByteSingleColumnSingleValueReaderWriter(_capacity, avgColumnSizeInBytes,
+              _memoryManager, allocationContext);
+        } else {
+          // TODO: Fix the bug in MultiValueReaderWriter
+          String mvAllocationContext = buildAllocationContext(_segmentName, column,
+              V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+          indexReaderWriter = new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW,
+              avgNumMultiValues, _capacity, avgColumnSizeInBytes, _memoryManager, mvAllocationContext);
+        }
       }
       _indexReaderWriterMap.put(column, indexReaderWriter);
 
-      if (invertedIndexColumns.contains(column)) {
-        _invertedIndexMap.put(column, new RealtimeInvertedIndexReader());
+      if (_invertedIndexColumns.contains(column)) {
+        _invertedIndexMap.put(column, new RealtimeInvertedIndexReader(fieldSpec, config.getConsumerDir()));
       }
     }
 
     // Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary,
     // and no metrics have dictionary. If not enabled, the map returned is null.
-    _recordIdMap = enableMetricsAggregationIfPossible(config, _schema, noDictionaryColumns);
+    _recordIdMap = enableMetricsAggregationIfPossible(config, _schema, _noDictionaryColumns);
+  }
+
+  private boolean shouldCreateDictionary(FieldSpec fieldSpec) {
+    // Create a dictionary unless the user explicitly marked the column as no-dictionary
+    return !_noDictionaryColumns.contains(fieldSpec.getName());
   }
 
   public SegmentPartitionConfig getSegmentPartitionConfig() {
@@ -209,7 +251,7 @@ public class MutableSegmentImpl implements MutableSegment {
     if (docId == numDocs) {
       // Add forward and inverted indices for new document.
       addForwardIndex(row, docId, dictIdMap);
-      addInvertedIndex(docId, dictIdMap);
+      addInvertedIndex(row, docId, dictIdMap);
       // Update number of document indexed at last to make the latest record queryable
       return _numDocsIndexed++ < _capacity;
     } else {
@@ -270,8 +312,8 @@ public class MutableSegmentImpl implements MutableSegment {
       String column = fieldSpec.getName();
       Object value = row.getValue(column);
       if (fieldSpec.isSingleValueField()) {
-        FixedByteSingleColumnSingleValueReaderWriter indexReaderWriter =
-            (FixedByteSingleColumnSingleValueReaderWriter) _indexReaderWriterMap.get(column);
+        BaseSingleColumnSingleValueReaderWriter indexReaderWriter =
+            (BaseSingleColumnSingleValueReaderWriter) _indexReaderWriterMap.get(column);
         Integer dictId = (Integer) dictIdMap.get(column);
         if (dictId != null) {
           // Column with dictionary
@@ -292,6 +334,12 @@ public class MutableSegmentImpl implements MutableSegment {
             case DOUBLE:
               indexReaderWriter.setDouble(docId, (Double) value);
               break;
+            case STRING:
+              indexReaderWriter.setString(docId, (String) value);
+              break;
+            case BYTES:
+              indexReaderWriter.setBytes(docId, (byte[]) value);
+              break;
             default:
               throw new UnsupportedOperationException(
                   "Unsupported data type: " + dataType + " for no-dictionary 
column: " + column);
@@ -299,19 +347,56 @@ public class MutableSegmentImpl implements MutableSegment {
         }
       } else {
         int[] dictIds = (int[]) dictIdMap.get(column);
-        ((FixedByteSingleColumnMultiValueReaderWriter) _indexReaderWriterMap.get(column)).setIntArray(docId, dictIds);
+        if (dictIds != null) {
+          ((FixedByteSingleColumnMultiValueReaderWriter) _indexReaderWriterMap.get(column)).setIntArray(docId, dictIds);
+        } else {
+          BaseSingleColumnMultiValueReaderWriter indexReaderWriter =
+              (BaseSingleColumnMultiValueReaderWriter) _indexReaderWriterMap.get(column);
+          // No-dictionary column
+          FieldSpec.DataType dataType = fieldSpec.getDataType();
+          switch (dataType) {
+            case INT:
+              indexReaderWriter.setIntArray(docId, (int[]) value);
+              break;
+            case LONG:
+              indexReaderWriter.setLongArray(docId, (long[]) value);
+              break;
+            case FLOAT:
+              indexReaderWriter.setFloatArray(docId, (float[]) value);
+              break;
+            case DOUBLE:
+              indexReaderWriter.setDoubleArray(docId, (double[]) value);
+              break;
+            case STRING:
+              indexReaderWriter.setStringArray(docId, (String[]) value);
+              break;
+            case BYTES:
+              indexReaderWriter.setBytesArray(docId, (byte[][]) value);
+              break;
+            default:
+              throw new UnsupportedOperationException(
+                  "Unsupported data type: " + dataType + " for no-dictionary 
column: " + column);
+          }
+        }
       }
     }
   }
 
-  private void addInvertedIndex(int docId, Map<String, Object> dictIdMap) {
+  private void addInvertedIndex(GenericRow row, int docId, Map<String, Object> dictIdMap) {
     // Update inverted index at last
     // NOTE: inverted index have to be updated at last because once it gets updated, the latest record will become
     // queryable
     for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
       String column = fieldSpec.getName();
       RealtimeInvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
-      if (invertedIndex != null) {
+      if (invertedIndex == null) {
+        continue;
+      }
+      if (fieldSpec.getObjectType() != null) {
+        byte[] value = (byte[]) row.getValue(column);
+        PinotObject pinotObject = PinotObjectFactory.create(fieldSpec, value);
+        invertedIndex.add(docId, pinotObject);
+      } else {
         if (fieldSpec.isSingleValueField()) {
           invertedIndex.add(((Integer) dictIdMap.get(column)), docId);
         } else {
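Net effect of the forward-index changes above, condensed into a sketch (assumption: DataType.size() is non-positive for variable-width types, which is what the fwdIndexColumnSize > 0 test implies; capacity, memoryManager and context stand in for the real arguments):

    // Dictionary-encoded columns store int dict ids; raw columns store the value itself.
    int width = createDictionary ? FieldSpec.DataType.INT.size() : fieldSpec.getDataType().size();
    DataFileReader rw = (width > 0)
        // fixed-width: INT, LONG, FLOAT, DOUBLE, or any dict-encoded column
        ? new FixedByteSingleColumnSingleValueReaderWriter(capacity, width, memoryManager, context)
        // variable-width: STRING, BYTES; -1 means "average size unknown, start with the default"
        : new VarByteSingleColumnSingleValueReaderWriter(capacity, -1, memoryManager, context);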
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java
new file mode 100644
index 0000000..687391c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.readerwriter.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class VarByteSingleColumnSingleValueReaderWriter extends BaseSingleColumnSingleValueReaderWriter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(VarByteSingleColumnSingleValueReaderWriter.class);
+
+  private static final int DEFAULT_COLUMN_SIZE_IN_BYTES = 100;
+  private final PinotDataBufferMemoryManager _memoryManager;
+  private final String _allocationContext;
+
+  private FixedByteSingleValueMultiColumnReaderWriter _headerReaderWriter;
+  private List<PinotDataBuffer> _dataBuffers;
+
+  //capacity of the chunk, it can adjust itself based on the avgSize
+  private long _chunkCapacityInBytes = 0;
+  private final int _numRowsPerChunk;
+  private int _avgColumnSizeInBytes;
+
+  //amount of data written to the current buffer
+  private int _currentDataSize = 0;
+  //number of rows written to the current buffer
+  private int _currentBufferRows = 0;
+  //PinotDataBuffer where the actual bytes are written to
+  private PinotDataBuffer _currentDataBuffer;
+  //index pointing to the element in _dataBuffers
+  private int _currentDataBufferId;
+
+  /**
+   * @param numRowsPerChunk Number of rows to pack in one chunk before a new chunk is created.
+   * @param avgColumnSizeInBytes Average size of a column value in bytes; set this to -1 if it is unknown.
+   * @param memoryManager Memory manager to be used for allocating memory.
+   * @param allocationContext Allocation context.
+   */
+  public VarByteSingleColumnSingleValueReaderWriter(int numRowsPerChunk, int avgColumnSizeInBytes,
+      PinotDataBufferMemoryManager memoryManager, String allocationContext) {
+    _avgColumnSizeInBytes = avgColumnSizeInBytes;
+    if (avgColumnSizeInBytes < 0) {
+      _avgColumnSizeInBytes = DEFAULT_COLUMN_SIZE_IN_BYTES;
+    }
+    _numRowsPerChunk = numRowsPerChunk;
+    _memoryManager = memoryManager;
+    _allocationContext = allocationContext;
+    _dataBuffers = new ArrayList<>();
+    //bufferId, offset, length for each row
+    //we can eliminate the length as an optimization later
+    _headerReaderWriter = new FixedByteSingleValueMultiColumnReaderWriter(_numRowsPerChunk,
+        new int[]{4, 4, 4}, _memoryManager, _allocationContext);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    for (PinotDataBuffer buffer : _dataBuffers) {
+      buffer.close();
+    }
+  }
+
+  @Override
+  public void setString(int row, String value) {
+    setBytes(row, StringUtil.encodeUtf8(value));
+  }
+
+  @Override
+  public String getString(int row) {
+    return StringUtil.decodeUtf8(getBytes(row));
+  }
+
+  @Override
+  public void setBytes(int row, byte[] buf) {
+    if (_currentDataSize + buf.length >= _chunkCapacityInBytes) {
+      // Current chunk is full (or absent): allocate a new one sized from the observed average
+      addDataBuffer();
+    }
+    _headerReaderWriter.setInt(row, 0, _currentDataBufferId);
+    _headerReaderWriter.setInt(row, 1, _currentDataSize);
+    _headerReaderWriter.setInt(row, 2, buf.length);
+    _currentDataBuffer.readFrom(_currentDataSize, buf);
+    _currentDataSize = _currentDataSize + buf.length;
+    _currentBufferRows++;
+  }
+
+  @Override
+  public byte[] getBytes(int row) {
+    int dataBufferId = _headerReaderWriter.getInt(row, 0);
+    int startOffset = _headerReaderWriter.getInt(row, 1);
+    int length = _headerReaderWriter.getInt(row, 2);
+    byte[] buf = new byte[length];
+    PinotDataBuffer dataBuffer = _dataBuffers.get(dataBufferId);
+    dataBuffer.copyTo(startOffset, buf);
+    return buf;
+  }
+
+  private void addDataBuffer() {
+    //set the avgColumnSize based on the data seen so far
+    if (_currentDataSize > 0 && _currentBufferRows > 0) {
+      _avgColumnSizeInBytes = _currentDataSize / _currentBufferRows;
+    }
+    _chunkCapacityInBytes = _numRowsPerChunk * _avgColumnSizeInBytes;
+    LOGGER.info("Allocating bytes for: {}, dataBufferSize: {}", _allocationContext, _chunkCapacityInBytes);
+    PinotDataBuffer dataBuffer = _memoryManager.allocate(_chunkCapacityInBytes, _allocationContext);
+    _dataBuffers.add(dataBuffer);
+    _currentDataBuffer = dataBuffer;
+    //buffer ids start from 0
+    _currentDataBufferId = _dataBuffers.size() - 1;
+    _currentDataSize = 0;
+    _currentBufferRows = 0;
+  }
+}
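The layout above: a fixed-byte header with three ints per row (data-buffer id, start offset, length) plus a chain of variable-size data chunks. Reading a row, spelled out (same calls as getBytes(), with the header columns named):

    int bufferId = _headerReaderWriter.getInt(row, 0); // which chunk holds the value
    int offset   = _headerReaderWriter.getInt(row, 1); // where it starts in that chunk
    int length   = _headerReaderWriter.getInt(row, 2); // how many bytes it spans
    byte[] out = new byte[length];
    _dataBuffers.get(bufferId).copyTo(offset, out);

Chunk sizing adapts: the first chunk is numRowsPerChunk * 100 bytes (the default average); each subsequent chunk is sized from the average value size observed in the previous one.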
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
index 63ebbbb..6388617 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
@@ -39,12 +39,13 @@ public class RealtimeSegmentConfig {
   private final RealtimeSegmentStatsHistory _statsHistory;
   private final SegmentPartitionConfig _segmentPartitionConfig;
   private final boolean _aggregateMetrics;
+  private final String _consumerDir;
 
   private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, int capacity,
       int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
       RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
       RealtimeSegmentStatsHistory statsHistory, SegmentPartitionConfig segmentPartitionConfig,
-      boolean aggregateMetrics) {
+      boolean aggregateMetrics, String consumerDir) {
     _segmentName = segmentName;
     _streamName = streamName;
     _schema = schema;
@@ -58,6 +59,7 @@ public class RealtimeSegmentConfig {
     _statsHistory = statsHistory;
     _segmentPartitionConfig = segmentPartitionConfig;
     _aggregateMetrics = aggregateMetrics;
+    _consumerDir = consumerDir;
   }
 
   public String getSegmentName() {
@@ -112,6 +114,10 @@ public class RealtimeSegmentConfig {
     return _aggregateMetrics;
   }
 
+  public String getConsumerDir() {
+    return _consumerDir;
+  }
+
   public static class Builder {
     private String _segmentName;
     private String _streamName;
@@ -126,6 +132,7 @@ public class RealtimeSegmentConfig {
     private RealtimeSegmentStatsHistory _statsHistory;
     private SegmentPartitionConfig _segmentPartitionConfig;
     private boolean _aggregateMetrics = false;
+    private String _consumerDir;
 
     public Builder() {
     }
@@ -195,10 +202,17 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setConsumerDir(String consumerDir) {
+      _consumerDir = consumerDir;
+      return this;
+    }
+
     public RealtimeSegmentConfig build() {
       return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _capacity, _avgNumMultiValues,
           _noDictionaryColumns, _invertedIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
-          _statsHistory, _segmentPartitionConfig, _aggregateMetrics);
+          _statsHistory, _segmentPartitionConfig, _aggregateMetrics, _consumerDir);
     }
   }
 }
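Builder usage, matching the new setter (the path and values are placeholders; the other required setters seen in LLRealtimeSegmentDataManager are elided):

    RealtimeSegmentConfig config = new RealtimeSegmentConfig.Builder()
        .setAggregateMetrics(false)
        .setConsumerDir("/var/pinot/consumerDir") // new in this commit; hosts the per-column Lucene index dirs
        // ... segment name, schema, capacity, memory manager, etc. ...
        .build();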
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
index 4c45850..484eeb3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
@@ -18,24 +18,56 @@
  */
 package org.apache.pinot.core.realtime.impl.invertedindex;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.PinotObject;
 import org.apache.pinot.core.common.Predicate;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.inv.LuceneIndexCreator;
 import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.LuceneInvertedIndexReader;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class RealtimeInvertedIndexReader implements InvertedIndexReader<MutableRoaringBitmap> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeInvertedIndexReader.class);
+
   private final List<ThreadSafeMutableRoaringBitmap> _bitmaps = new ArrayList<>();
   private final ReentrantReadWriteLock.ReadLock _readLock;
   private final ReentrantReadWriteLock.WriteLock _writeLock;
+  private LuceneInvertedIndexReader _reader;
+  private LuceneIndexCreator _creator;
+  private boolean _isLuceneInitialized;
 
-  public RealtimeInvertedIndexReader() {
+  public RealtimeInvertedIndexReader(FieldSpec spec, String indexDir) {
     ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     _readLock = readWriteLock.readLock();
     _writeLock = readWriteLock.writeLock();
+    if (spec.getObjectType() != null) {
+      try {
+        File outputDirectory = new File(indexDir, spec.getName() + V1Constants.Indexes.LUCENE_INVERTED_INDEX_DIR);
+        _creator = new LuceneIndexCreator(spec.getObjectType(), outputDirectory);
+        _reader = new LuceneInvertedIndexReader(DirectoryReader.open(_creator.getIndexDirectory()));
+        LOGGER.info("Initialized Lucene for column: {}", spec.getName());
+        _isLuceneInitialized = true;
+      } catch (IOException e) {
+        LOGGER.error("Error initializing Lucene for column: {}", spec.getName(), e);
+      }
+    }
+  }
+
+  /**
+   * Add the given PinotObject to the Lucene index for the given document id.
+   */
+  public void add(int docId, PinotObject pinotObject) {
+    _creator.add(pinotObject);
   }
 
   /**
@@ -56,11 +88,12 @@ public class RealtimeInvertedIndexReader implements InvertedIndexReader<MutableR
       _bitmaps.get(dictId).checkAndAdd(docId);
     }
   }
-  
+
   @Override
   public MutableRoaringBitmap getDocIds(Predicate predicate) {
-    throw new UnsupportedOperationException("Predicate:"+ predicate + " is not 
supported");
+    return _reader.getDocIds(predicate);
   }
+
   @Override
   public MutableRoaringBitmap getDocIds(int dictId) {
     ThreadSafeMutableRoaringBitmap bitmap;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java
index 6622b57..87f9f56 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java
@@ -52,17 +52,16 @@ public class LuceneIndexCreator implements InvertedIndexCreator {
   private final IndexWriter _writer;
   private final IndexWriterConfig _indexWriterConfig;
   private final Directory _indexDirectory;
-  // TODO:Figure out a way to avoid this
-  boolean _isText = false;
+  private final String _objectType;
 
-  public LuceneIndexCreator(ColumnMetadata columnMetadata, File outputDirectory) {
+  public LuceneIndexCreator(String objectType, File outputDirectory) {
+    _objectType = objectType;
     // TODO: Get IndexConfig and set the different analyzer for each field by default we set
     // StandardAnalyzer and use TextField. This can be expensive and inefficient if all we need is
     // exact match. See keyword analyzer
     _analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer());
     _indexWriterConfig = new IndexWriterConfig(_analyzer);
     _indexWriterConfig.setRAMBufferSizeMB(MAX_BUFFER_SIZE_MB);
-    _isText = "TEXT".equalsIgnoreCase(columnMetadata.getObjectType());
     try {
       _indexDirectory = FSDirectory.open(outputDirectory.toPath());
       _writer = new IndexWriter(_indexDirectory, _indexWriterConfig);
@@ -72,6 +71,10 @@ public class LuceneIndexCreator implements InvertedIndexCreator {
     }
   }
 
+  public Directory getIndexDirectory() {
+    return _indexDirectory;
+  }
+
   @Override
   public void add(int dictId) {
     throw new UnsupportedOperationException(
@@ -91,9 +94,6 @@ public class LuceneIndexCreator implements InvertedIndexCreator {
     List<String> propertyNames = object.getPropertyNames();
     for (String propertyName : propertyNames) {
       Field field;
-      // TODO: Figure out a way to avoid special casing Text, have a way to get propertyType from
-      // pinotObject?
-      // TODO: Handle list field
       Object value = object.getProperty(propertyName);
       if (value.getClass().isAssignableFrom(List.class)) {
         List<?> list = (List<?>) value;
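The add(PinotObject) path (truncated above) maps each object property to a Lucene field and hands the document to the IndexWriter. A sketch of that idea against the stock Lucene API (imports: org.apache.lucene.document.Document, Field, TextField) -- the field and storage choices here are assumptions, not necessarily what this class does:

    Document doc = new Document();
    for (String propertyName : object.getPropertyNames()) {
      Object value = object.getProperty(propertyName);
      // One analyzed, unstored text field per property; a list value would
      // add one field per element under the same property name.
      doc.add(new TextField(propertyName, String.valueOf(value), Field.Store.NO));
    }
    _writer.addDocument(doc); // throws IOException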
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
index b9a2bec..d0e688d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.PinotObject;
+import org.apache.pinot.common.data.PinotObjectFactory;
 import org.apache.pinot.common.data.objects.JSONObject;
 import org.apache.pinot.common.data.objects.MapObject;
 import org.apache.pinot.common.data.objects.TextObject;
@@ -117,38 +118,16 @@ public class InvertedIndexHandler {
     LOGGER.info("Creating new lucene based inverted index for segment: {}, 
column: {}", _segmentName, column);
     int numDocs = columnMetadata.getTotalDocs();
     String objectType = columnMetadata.getObjectType();
-    Class<? extends PinotObject> pinotObjectClazz;
-    PinotObject pinotObject = null;
-    try {
-      switch (objectType.toUpperCase()) {
-        case "MAP":
-          pinotObjectClazz = MapObject.class;
-          break;
-        case "JSON":
-          pinotObjectClazz = JSONObject.class;
-          break;
-        case "TEXT":
-          pinotObjectClazz = TextObject.class;
-          break;
-        default:
-          // custom object type.
-          pinotObjectClazz = (Class<? extends PinotObject>) Class.forName(objectType);
-      }
-      pinotObject = pinotObjectClazz.getConstructor(new Class[]{}).newInstance(new Object[]{});
-    } catch (Exception e) {
-      LOGGER.error("Error pinot object  for type:{}. Skipping inverted index creation", objectType);
-      return;
-    }
 
-    try (LuceneIndexCreator luceneIndexCreator = new LuceneIndexCreator(columnMetadata, invertedIndexDir)) {
+
+    try (LuceneIndexCreator luceneIndexCreator = new LuceneIndexCreator(objectType, invertedIndexDir)) {
       try (DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, _segmentWriter)) {
         if (columnMetadata.isSingleValue()) {
           // Single-value column.
           VarByteChunkSingleValueReader svFwdIndex = (VarByteChunkSingleValueReader) fwdIndex;
           for (int i = 0; i < numDocs; i++) {
             byte[] bytes = svFwdIndex.getBytes(i);
-
-            pinotObject.init(bytes);
+            PinotObject pinotObject = PinotObjectFactory.create(objectType, bytes);
             luceneIndexCreator.add(pinotObject);
           }
         } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java
index 576bcc4..e4b8850 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java
@@ -65,8 +65,7 @@ public class LuceneInvertedIndexReader implements InvertedIndexReader<MutableRoa
   /**
    * TODO: Change this to take PinotDataBuffer, work around for now since Lucene needs actual
    * directory
-   * @param metadata
-   * @param indexDir
+   * @param segmentIndexDir
    * @param metadata
    */
   public LuceneInvertedIndexReader(File segmentIndexDir, ColumnMetadata metadata) {
@@ -83,6 +82,10 @@ public class LuceneInvertedIndexReader implements InvertedIndexReader<MutableRoa
     }
   }
 
+  public LuceneInvertedIndexReader(IndexReader reader) {
+    _searcher = new IndexSearcher(reader);
+  }
+
   @Override
   public void close() throws IOException {
     _index.close();
@@ -104,7 +107,7 @@ public class LuceneInvertedIndexReader implements InvertedIndexReader<MutableRoa
 
   public MutableRoaringBitmap getDocIds(String queryStr, String options) {
     QueryParser queryParser = new QueryParser(TextObject.DEFAULT_FIELD, _analyzer);
-    Query query = null;
+    Query query;
     try {
       query = queryParser.parse(queryStr);
     } catch (ParseException e) {
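After the parse above, the matching Lucene doc ids presumably get collected into the returned MutableRoaringBitmap. A hedged sketch using Lucene 7.x's SimpleCollector API (imports: org.apache.lucene.index.LeafReaderContext, org.apache.lucene.search.SimpleCollector; the mapping from Lucene doc ids back to Pinot doc ids is glossed over):

    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
    _searcher.search(query, new SimpleCollector() {
      private int _docBase;

      @Override
      protected void doSetNextReader(LeafReaderContext context) {
        _docBase = context.docBase; // doc ids are relative to each index segment
      }

      @Override
      public void collect(int doc) {
        bitmap.add(_docBase + doc);
      }

      @Override
      public boolean needsScores() {
        return false; // membership only, no ranking
      }
    });
    return bitmap;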
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/io/writer/impl/VarByteSingleColumnSingleValueReaderWriterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/io/writer/impl/VarByteSingleColumnSingleValueReaderWriterTest.java
new file mode 100644
index 0000000..0fc33db
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/io/writer/impl/VarByteSingleColumnSingleValueReaderWriterTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.writer.impl;
+
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class VarByteSingleColumnSingleValueReaderWriterTest {
+
+  @Test
+  public void testSimple() {
+    PinotDataBufferMemoryManager mem = new DirectMemoryManager("test");
+    VarByteSingleColumnSingleValueReaderWriter readerWriter =
+        new VarByteSingleColumnSingleValueReaderWriter(100, -1, mem, "test");
+
+    for (int i = 0; i < 10000; i++) {
+      String data = "TEST-" + i;
+      readerWriter.setBytes(i, data.getBytes());
+    }
+    for (int i = 0; i < 10000; i++) {
+      byte[] data = readerWriter.getBytes(i);
+      Assert.assertEquals(new String(data), "TEST-" + i);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
