Jackie-Jiang commented on code in PR #8636: URL: https://github.com/apache/pinot/pull/8636#discussion_r868468538
########## pinot-core/src/test/java/org/apache/pinot/queries/LuceneFSTVsNativeMutableFSTTest.java: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; +import org.apache.pinot.segment.spi.IndexSegment; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class LuceneFSTVsNativeMutableFSTTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RealTimeNativeVsLuceneTest"); + private static final String TEXT_COLUMN_NAME = "testColumnName"; + + private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; + private NativeMutableTextIndex _nativeMutableTextIndex; + + private List<String> 
getTextData() { + return Arrays.asList("Prince Andrew kept looking with an amused smile from Pierre", + "vicomte and from the vicomte to their hostess. In the first moment of", + "Pierre’s outburst Anna Pávlovna, despite her social experience, was", + "horror-struck. But when she saw that Pierre’s sacrilegious words", + "had not exasperated the vicomte, and had convinced herself that it was", + "impossible to stop him, she rallied her forces and joined the vicomte in", "a vigorous attack on the orator", + "horror-struck. But when she", "she rallied her forces and joined", "outburst Anna Pávlovna", + "she rallied her forces and", "despite her social experience", "had not exasperated the vicomte", + " despite her social experience", "impossible to stop him", "despite her social experience"); + } + + @BeforeClass + public void setUp() + throws Exception { + _realtimeLuceneTextIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar"); + _nativeMutableTextIndex = new NativeMutableTextIndex(TEXT_COLUMN_NAME); + List<String> documents = getTextData(); + + for (String doc : documents) { + _realtimeLuceneTextIndex.add(doc); + _nativeMutableTextIndex.add(doc); + } + + SearcherManager searcherManager = _realtimeLuceneTextIndex.getSearcherManager(); + try { + searcherManager.maybeRefresh(); + } catch (Exception e) { + // we should never be here since the locking semantics between MutableSegmentImpl::destroy() Review Comment: This is a test, and we do want to catch the unexpected exception. 
Let's remove the try-catch block and let the test fail if it happens ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java: ########## @@ -201,16 +219,35 @@ public void testTextSearchCountQuery() //Lucene index on consuming segments to update the latest records TestUtils.waitForCondition(aVoid -> { try { - return getTextColumnQueryResult() == NUM_MATCHING_RECORDS; + return getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY) == NUM_MATCHING_RECORDS; } catch (Exception e) { fail("Caught exception while getting text column query result"); return false; } }, 10_000L, "Failed to reach expected number of matching records"); } - private long getTextColumnQueryResult() + @Test + public void testTextSearchCountQueryNative() + throws Exception { + // Keep posting queries until all records are consumed + long previousResult = 0; + while (getCurrentCountStarResult() < NUM_RECORDS) { + long result = getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE); + assertTrue(result >= previousResult); + previousResult = result; + Thread.sleep(100); + } + + try { + Assert.assertTrue(getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE) == NUM_MATCHING_RECORDS_NATIVE); + } catch (Exception e) { + fail("Caught exception while getting text column query result"); + } Review Comment: No need to catch the exception ```suggestion assertTrue(getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE) == NUM_MATCHING_RECORDS_NATIVE); ``` ########## pinot-core/src/test/java/org/apache/pinot/queries/LuceneFSTVsNativeMutableFSTTest.java: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; +import org.apache.pinot.segment.spi.IndexSegment; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class LuceneFSTVsNativeMutableFSTTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RealTimeNativeVsLuceneTest"); + private static final String TEXT_COLUMN_NAME = "testColumnName"; + + private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; + private NativeMutableTextIndex _nativeMutableTextIndex; + + private List<String> getTextData() { + return Arrays.asList("Prince Andrew kept looking with an amused smile from Pierre", + "vicomte and from the vicomte to their hostess. In the first moment of", + "Pierre’s outburst Anna Pávlovna, despite her social experience, was", + "horror-struck. 
But when she saw that Pierre’s sacrilegious words", + "had not exasperated the vicomte, and had convinced herself that it was", + "impossible to stop him, she rallied her forces and joined the vicomte in", "a vigorous attack on the orator", + "horror-struck. But when she", "she rallied her forces and joined", "outburst Anna Pávlovna", + "she rallied her forces and", "despite her social experience", "had not exasperated the vicomte", + " despite her social experience", "impossible to stop him", "despite her social experience"); + } + + @BeforeClass + public void setUp() Review Comment: Add a `tearDown()` method to delete the `INDEX_DIR` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import java.io.IOException; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.text.TermsParser; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFSTImpl; +import org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher; +import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +public class NativeMutableTextIndex implements MutableTextIndex { + private final String _column; + private final MutableFST _mutableFST; + private final RealtimeInvertedIndex _invertedIndex; + //TODO: Move to mutable dictionary + private final Object2IntOpenHashMap<String> _termToDictIdMapping; + private final ReentrantReadWriteLock.ReadLock _readLock; + private final ReentrantReadWriteLock.WriteLock _writeLock; + private final TermsParser _termsParser; + + private int _nextDocId = 0; + private int _nextDictId = 0; + + public NativeMutableTextIndex(String column) { + _column = column; + _mutableFST = new MutableFSTImpl(); + _termToDictIdMapping = new Object2IntOpenHashMap<>(); + _invertedIndex = new RealtimeInvertedIndex(); + + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + _readLock = readWriteLock.readLock(); + _writeLock = readWriteLock.writeLock(); + _termsParser = new TermsParser(new StandardAnalyzer(LuceneTextIndexCreator.ENGLISH_STOP_WORDS_SET), _column); + } + + @Override + public void add(String document) { + Iterable<String> tokens; + + tokens = 
_termsParser.parse(document); Review Comment: I tried this block of code and it works ``` List<String> tokens = new ArrayList<>(); try (TokenStream tokenStream = _analyzer.tokenStream(_column, document)) { tokenStream.reset(); CharTermAttribute attribute = tokenStream.getAttribute(CharTermAttribute.class); while (tokenStream.incrementToken()) { tokens.add(attribute.toString()); } tokenStream.end(); } catch (IOException e) { throw new RuntimeException("Caught exception while tokenizing the document for column: " + _column, e); } ``` ########## pinot-core/src/test/java/org/apache/pinot/queries/LuceneFSTVsNativeMutableFSTTest.java: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.queries; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; +import org.apache.pinot.segment.spi.IndexSegment; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class LuceneFSTVsNativeMutableFSTTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RealTimeNativeVsLuceneTest"); + private static final String TEXT_COLUMN_NAME = "testColumnName"; + + private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; + private NativeMutableTextIndex _nativeMutableTextIndex; + + private List<String> getTextData() { + return Arrays.asList("Prince Andrew kept looking with an amused smile from Pierre", + "vicomte and from the vicomte to their hostess. In the first moment of", + "Pierre’s outburst Anna Pávlovna, despite her social experience, was", + "horror-struck. But when she saw that Pierre’s sacrilegious words", + "had not exasperated the vicomte, and had convinced herself that it was", + "impossible to stop him, she rallied her forces and joined the vicomte in", "a vigorous attack on the orator", + "horror-struck. 
But when she", "she rallied her forces and joined", "outburst Anna Pávlovna", + "she rallied her forces and", "despite her social experience", "had not exasperated the vicomte", + " despite her social experience", "impossible to stop him", "despite her social experience"); + } + + @BeforeClass + public void setUp() + throws Exception { + _realtimeLuceneTextIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar"); + _nativeMutableTextIndex = new NativeMutableTextIndex(TEXT_COLUMN_NAME); + List<String> documents = getTextData(); + + for (String doc : documents) { + _realtimeLuceneTextIndex.add(doc); + _nativeMutableTextIndex.add(doc); + } + + SearcherManager searcherManager = _realtimeLuceneTextIndex.getSearcherManager(); + try { + searcherManager.maybeRefresh(); + } catch (Exception e) { + // we should never be here since the locking semantics between MutableSegmentImpl::destroy() + // and this code along with volatile state "isSegmentDestroyed" protect against the cases + // where this thread might attempt to refresh a realtime lucene reader after it has already + // been closed duing segment destroy. 
+ } + } + + @Override + protected String getFilter() { + return null; + } + + @Override + protected IndexSegment getIndexSegment() { + return null; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return null; + } + + @Test + public void testQueries() { + String nativeQuery = "vico.*"; + String luceneQuery = "vico*"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "vicomte"; + luceneQuery = "vicomte"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "s.*"; + luceneQuery = "s*"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "impossible"; + luceneQuery = "impossible"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "forc.*s"; + luceneQuery = "forc*s"; + testSelectionResults(nativeQuery, luceneQuery); + } + + private void testSelectionResults(String nativeQuery, String luceneQuery) { + MutableRoaringBitmap resultset = _realtimeLuceneTextIndex.getDocIds(luceneQuery); + Assert.assertNotNull(resultset); Review Comment: You can directly compare the bitmaps ```suggestion assertEquals(_nativeMutableTextIndex.getDocIds(nativeQuery), _realtimeLuceneTextIndex.getDocIds(luceneQuery)); ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java: ########## @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFSTImpl; +import org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher; +import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +public class NativeMutableTextIndex implements MutableTextIndex { + private final String _column; + private final MutableFST _mutableFST; + private final RealtimeInvertedIndex _invertedIndex; + //TODO: Move to mutable dictionary + private final Object2IntOpenHashMap<String> _termToDictIdMapping; + private final ReentrantReadWriteLock.ReadLock _readLock; + private final ReentrantReadWriteLock.WriteLock _writeLock; + + private int _nextDocId = 0; + private int _nextDictId = 0; + + public NativeMutableTextIndex(String column) { + _column = column; + 
_mutableFST = new MutableFSTImpl(); + _termToDictIdMapping = new Object2IntOpenHashMap<>(); + _invertedIndex = new RealtimeInvertedIndex(); + + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + _readLock = readWriteLock.readLock(); + _writeLock = readWriteLock.writeLock(); + } + + @Override + public void add(String document) { + List<String> tokens; + try { + tokens = analyze(document, new StandardAnalyzer(LuceneTextIndexCreator.ENGLISH_STOP_WORDS_SET)); Review Comment: I tried reusing the analyzer and was able to do that without causing issues. Here is the code I tried: ``` List<String> tokens = new ArrayList<>(); try (TokenStream tokenStream = _analyzer.tokenStream(_column, document)) { tokenStream.reset(); CharTermAttribute attribute = tokenStream.getAttribute(CharTermAttribute.class); while (tokenStream.incrementToken()) { tokens.add(attribute.toString()); } tokenStream.end(); } catch (IOException e) { throw new RuntimeException("Caught exception while tokenizing the document for column: " + _column, e); } ``` If I remove the `end()` and `close()` calls on the `TokenStream`, it hangs, and I think that's why you ran into the issue with the previous version of your code. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import java.io.IOException; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.text.TermsParser; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFSTImpl; +import org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher; +import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +public class NativeMutableTextIndex implements MutableTextIndex { + private final String _column; + private final MutableFST _mutableFST; + private final RealtimeInvertedIndex _invertedIndex; + //TODO: Move to mutable dictionary + private final Object2IntOpenHashMap<String> _termToDictIdMapping; + private final ReentrantReadWriteLock.ReadLock _readLock; + private final ReentrantReadWriteLock.WriteLock _writeLock; + private final TermsParser _termsParser; + + private int _nextDocId = 0; + private int _nextDictId = 0; + + public NativeMutableTextIndex(String column) { + _column = column; + _mutableFST = new MutableFSTImpl(); + _termToDictIdMapping = new Object2IntOpenHashMap<>(); + 
_invertedIndex = new RealtimeInvertedIndex(); + + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + _readLock = readWriteLock.readLock(); + _writeLock = readWriteLock.writeLock(); + _termsParser = new TermsParser(new StandardAnalyzer(LuceneTextIndexCreator.ENGLISH_STOP_WORDS_SET), _column); + } + + @Override + public void add(String document) { + Iterable<String> tokens; + + tokens = _termsParser.parse(document); + _writeLock.lock(); + try { + for (String token : tokens) { + Integer currentDictId = _termToDictIdMapping.computeIntIfAbsent(token, k -> { + int localDictId = _nextDictId++; + _mutableFST.addPath(token, localDictId); + return localDictId; + }); + _invertedIndex.add(currentDictId, _nextDocId); + } + _nextDocId++; + } finally { + _writeLock.unlock(); + } + } + + @Override + public ImmutableRoaringBitmap getDictIds(String searchQuery) { + throw new UnsupportedOperationException(); + } + + @Override + public MutableRoaringBitmap getDocIds(String searchQuery) { + MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap(); + _readLock.lock(); + try { + RealTimeRegexpMatcher.regexMatch(searchQuery, _mutableFST, + dictId -> matchingDocIds.or(_invertedIndex.getDocIds(dictId))); + return matchingDocIds; + } finally { + _readLock.unlock(); + } + } + + @Override + public void close() + throws IOException { Review Comment: We should close the analyzer ########## pinot-core/src/test/java/org/apache/pinot/queries/LuceneFSTVsNativeMutableFSTTest.java: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; +import org.apache.pinot.segment.spi.IndexSegment; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class LuceneFSTVsNativeMutableFSTTest extends BaseQueriesTest { Review Comment: Don't extend `BaseQueriesTest` because it is not a queries test. Suggest renaming it to `NativeAndLuceneMutableTextIndexTest` (this is not only testing FST either) and move it under the package `org.apache.pinot.segment.local.realtime.impl.invertedindex` ########## pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexConcurrentTest.java: ########## @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class NativeMutableTextIndexConcurrentTest { + private ExecutorService _threadPool; + private Set<String> _resultSet; + + @BeforeClass + private void setup() { + _threadPool = Executors.newFixedThreadPool(10); + _resultSet = new ConcurrentSkipListSet<>(); + } + + @AfterClass + private void shutDown() { + _threadPool.shutdown(); + } + + @Test + public void testConcurrentWriteAndRead() + throws InterruptedException, IOException { + CountDownLatch countDownLatch = new CountDownLatch(2); + List<String> words = new ArrayList<>(); + words.add("ab"); + words.add("abba"); + words.add("aba"); + words.add("bab"); + words.add("cdd"); + words.add("efg"); + + try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) { + _threadPool.submit(() -> { + try { + performReads(textIndex, words, 20, 200, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + 
performWrites(textIndex, words, 10, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + countDownLatch.await(); + } + + assertEquals(_resultSet.size(), words.size()); + + assertTrue(_resultSet.contains("ab"), "ab not found in result set"); + assertTrue(_resultSet.contains("abba"), "abba not found in result set"); + assertTrue(_resultSet.contains("aba"), "aba not found in result set"); + assertTrue(_resultSet.contains("bab"), "bab not found in result set"); + assertTrue(_resultSet.contains("cdd"), "cdd not found in result set"); + assertTrue(_resultSet.contains("efg"), "efg not found in result set"); + } + + @Test + public void testConcurrentWriteWithMultipleThreads() + throws InterruptedException, IOException { + List<String> firstThreadWords = new ArrayList<>(); + List<String> secondThreadWords = new ArrayList<>(); + List<String> mergedThreadWords = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(3); + firstThreadWords.add("ab"); + firstThreadWords.add("abba"); + firstThreadWords.add("aba"); + secondThreadWords.add("bab"); + secondThreadWords.add("cdd"); + secondThreadWords.add("efg"); + + mergedThreadWords.addAll(firstThreadWords); + mergedThreadWords.addAll(secondThreadWords); + + try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) { + _threadPool.submit(() -> { + try { + performReads(textIndex, mergedThreadWords, 20, 200, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + performWrites(textIndex, firstThreadWords, 10, countDownLatch); Review Comment: We don't need to test the multiple-writer scenario (although it seems to work). For mutable indexes, Pinot always follows the single-writer, multiple-reader model.
This gives us a lot of optimization opportunities since we don't need to worry about concurrent modification of the index ########## pinot-core/src/test/java/org/apache/pinot/queries/LuceneFSTVsNativeMutableFSTTest.java: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.queries; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; +import org.apache.pinot.segment.spi.IndexSegment; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class LuceneFSTVsNativeMutableFSTTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RealTimeNativeVsLuceneTest"); + private static final String TEXT_COLUMN_NAME = "testColumnName"; + + private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; + private NativeMutableTextIndex _nativeMutableTextIndex; + + private List<String> getTextData() { + return Arrays.asList("Prince Andrew kept looking with an amused smile from Pierre", + "vicomte and from the vicomte to their hostess. In the first moment of", + "Pierre’s outburst Anna Pávlovna, despite her social experience, was", + "horror-struck. But when she saw that Pierre’s sacrilegious words", + "had not exasperated the vicomte, and had convinced herself that it was", + "impossible to stop him, she rallied her forces and joined the vicomte in", "a vigorous attack on the orator", + "horror-struck. 
But when she", "she rallied her forces and joined", "outburst Anna Pávlovna", + "she rallied her forces and", "despite her social experience", "had not exasperated the vicomte", + " despite her social experience", "impossible to stop him", "despite her social experience"); + } + + @BeforeClass + public void setUp() + throws Exception { + _realtimeLuceneTextIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar"); + _nativeMutableTextIndex = new NativeMutableTextIndex(TEXT_COLUMN_NAME); + List<String> documents = getTextData(); + + for (String doc : documents) { + _realtimeLuceneTextIndex.add(doc); + _nativeMutableTextIndex.add(doc); + } + + SearcherManager searcherManager = _realtimeLuceneTextIndex.getSearcherManager(); + try { + searcherManager.maybeRefresh(); + } catch (Exception e) { + // we should never be here since the locking semantics between MutableSegmentImpl::destroy() + // and this code along with volatile state "isSegmentDestroyed" protect against the cases + // where this thread might attempt to refresh a realtime lucene reader after it has already + // been closed during segment destroy.
+ } + } + + @Override + protected String getFilter() { + return null; + } + + @Override + protected IndexSegment getIndexSegment() { + return null; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return null; + } + + @Test + public void testQueries() { + String nativeQuery = "vico.*"; + String luceneQuery = "vico*"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "vicomte"; + luceneQuery = "vicomte"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "s.*"; + luceneQuery = "s*"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "impossible"; + luceneQuery = "impossible"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "forc.*s"; + luceneQuery = "forc*s"; + testSelectionResults(nativeQuery, luceneQuery); + } + + private void testSelectionResults(String nativeQuery, String luceneQuery) { + MutableRoaringBitmap resultset = _realtimeLuceneTextIndex.getDocIds(luceneQuery); + Assert.assertNotNull(resultset); Review Comment: (nit) Use static import for asserts in test ########## pinot-core/src/test/java/org/apache/pinot/queries/LuceneFSTVsNativeMutableFSTTest.java: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; +import org.apache.pinot.segment.spi.IndexSegment; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class LuceneFSTVsNativeMutableFSTTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RealTimeNativeVsLuceneTest"); Review Comment: (minor) Change the file name to match the test name ########## pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java: ########## @@ -102,8 +100,54 @@ public void testConcurrentWriteAndRead() assertTrue("efg not found in result set", _resultSet.contains("efg")); } - private void performReads(MutableFST fst, List<String> words, int count, - long sleepTime) + @Test + public void testConcurrentLongWriteAndRead() + throws InterruptedException { + MutableFST mutableFST = new MutableFSTImpl(); + List<String> words = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(2); + + mutableFST.addPath("ab", 1); + + List<Pair<String, Integer>> wordsWithMetadata = new ArrayList<>(); + + // Add some write pressure + wordsWithMetadata.add(Pair.of("egegdgrbsbrsegzgzegzegegjntnmtj", 2)); + wordsWithMetadata.add(Pair.of("hrwbwefweg4wreghrtbrassregfesfefefefzew4ere", 2)); + wordsWithMetadata.add(Pair.of("easzegfegrertegbxzzez3erfezgzeddzdewstfefed", 2)); + 
wordsWithMetadata.add(Pair.of("tjntrhndsrsgezgrsxzetgteszetgezfzezedrefzdzdzdzdz", 2)); + wordsWithMetadata.add(Pair.of("abacxcvbnmlkjjhgfsaqwertyuioopzxcvbnmllkjshfgsfawieeiuefgeurfeoafa", 2)); + + words.add("abacxcvbnmlkjjhgfsaqwertyuioopzxcvbnmllkjshfgsfawieeiuefgeurfeoafa"); + + _threadPool.submit(() -> { + try { + performReads(mutableFST, words, 10, 10, countDownLatch); + } catch (InterruptedException e) { + e.printStackTrace(); Review Comment: Please don't swallow the exception in the test. We want to catch them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
