aoli-al opened a new issue, #13593:
URL: https://github.com/apache/lucene/issues/13593
### Description
I saw a failure in the test `testMaxMergeCount` due to concurrency issues.
```
𞤃𞤮𞤪𞤧𞤮 19, 2024 12:08:03 𞤇𞤎
com.carrotsearch.randomizedtesting.RandomizedRunner$QueueUncaughtExceptionsHandler
uncaughtException
WARNING: Uncaught exception in thread: Thread[#25,Lucene Merge Thread
#2,5,TGRP-TestConcurrentMergeScheduler]
org.apache.lucene.index.MergePolicy$MergeException:
java.lang.RuntimeException: java.lang.AssertionError: count=2 vs maxMergeCount=1
at __randomizedtesting.SeedInfo.seed([852F062F4E7F50BA]:0)
at
org.apache.lucene.index.ConcurrentMergeScheduler.handleMergeException(ConcurrentMergeScheduler.java:774)
at
org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:766)
Caused by: java.lang.RuntimeException: java.lang.AssertionError: count=2 vs
maxMergeCount=1
at
org.apache.lucene.index.TestConcurrentMergeScheduler$3.doMerge(TestConcurrentMergeScheduler.java:372)
at
org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:739)
Caused by: java.lang.AssertionError: count=2 vs maxMergeCount=1
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.assertTrue(Assert.java:42)
at
org.apache.lucene.index.TestConcurrentMergeScheduler$3.doMerge(TestConcurrentMergeScheduler.java:347)
... 1 more
```
The root cause is that a race could happen in the merge method.
https://github.com/apache/lucene/blob/cc3b412183c7acbb4a6c498e9babffba4e7d1f28/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java#L569-L573
The method calls `maybeStall` to check if the merge should be stalled due to
the thread limit. However, the loop will break if
`mergeSource.hasPendingMerges()` returns false. So, a concurrency issue may
arise if `mergeSource` adds new pending merges after this method is evaluated.
This will allow `ConcurrentMergeScheduler` to create more merge threads than
specified.
https://github.com/apache/lucene/blob/cc3b412183c7acbb4a6c498e9babffba4e7d1f28/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java#L618
Here is the code to reproduce the failure:
```
diff --git
a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index d0e79375ea6..a479637155e 100644
---
a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++
b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -541,6 +541,7 @@ public class ConcurrentMergeScheduler extends
MergeScheduler {
}
}
+ public static boolean threadMergeIssued = false;
@Override
public synchronized void merge(MergeSource mergeSource, MergeTrigger
trigger) throws IOException {
@@ -562,6 +563,7 @@ public class ConcurrentMergeScheduler extends
MergeScheduler {
message(" index(source): " + mergeSource.toString());
}
+ boolean mergeThreadCreated = false;
// Iterate, pulling from the IndexWriter's queue of
// pending merges, until it's empty:
while (true) {
@@ -569,6 +571,13 @@ public class ConcurrentMergeScheduler extends
MergeScheduler {
if (maybeStall(mergeSource) == false) {
break;
}
+ if (Thread.currentThread().getName().contains("Merge Thread") &&
mergeThreadCreated) {
+ threadMergeIssued = true;
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ }
+ }
OneMerge merge = mergeSource.getNextMerge();
if (merge == null) {
@@ -591,6 +600,7 @@ public class ConcurrentMergeScheduler extends
MergeScheduler {
message(" launch new thread [" + newMergeThread.getName() +
"]");
}
+ mergeThreadCreated = true;
newMergeThread.start();
updateMergeThreads();
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 44d8bee8460..f83c4cac519 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2305,10 +2305,25 @@ public class IndexWriter
maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT,
UNBOUNDED_MAX_MERGE_SEGMENTS);
}
+ int index = 0;
private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger
trigger, int maxNumSegments)
throws IOException {
ensureOpen(false);
- if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
+ boolean shouldWait = false;
+ if (index == 1) {
+ while (!ConcurrentMergeScheduler.threadMergeIssued) {
+ Thread.yield();
+ shouldWait = true;
+ }
+ }
+ MergePolicy.MergeSpecification result =
updatePendingMerges(mergePolicy, trigger, maxNumSegments);
+ if (result != null) {
+ index = 1;
+ if (shouldWait) {
+ try {
+ Thread.sleep(50000);
+ } catch (Exception e) {}
+ }
executeMerge(trigger);
}
}
diff --git
a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
index 5f85b5d3774..407e6f5f4bb 100644
---
a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
+++
b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
@@ -321,8 +321,8 @@ public class TestConcurrentMergeScheduler extends
LuceneTestCase {
IndexWriterConfig iwc =
new IndexWriterConfig(new
MockAnalyzer(random())).setCommitOnClose(false);
- final int maxMergeCount = TestUtil.nextInt(random(), 1, 5);
- final int maxMergeThreads = TestUtil.nextInt(random(), 1,
maxMergeCount);
+ final int maxMergeCount = 1;
+ final int maxMergeThreads = 1;
final CountDownLatch enoughMergesWaiting = new
CountDownLatch(maxMergeCount);
final AtomicInteger runningMergeCount = new AtomicInteger(0);
final AtomicBoolean failed = new AtomicBoolean();
@@ -331,6 +331,7 @@ public class TestConcurrentMergeScheduler extends
LuceneTestCase {
System.out.println(
"TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" +
maxMergeThreads);
}
+ int[] index = new int[] {0};
ConcurrentMergeScheduler cms =
new ConcurrentMergeScheduler() {
@@ -358,6 +359,9 @@ public class TestConcurrentMergeScheduler extends
LuceneTestCase {
// Then sleep a bit to give a chance for the bug
// (too many pending merges) to appear:
Thread.sleep(20);
+ if (Thread.currentThread().getName().contains("#1")) {
+ Thread.sleep(50000);
+ }
super.doMerge(mergeSource, merge);
} finally {
runningMergeCount.decrementAndGet();
@@ -381,17 +385,17 @@ public class TestConcurrentMergeScheduler extends
LuceneTestCase {
IndexWriter w = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(newField("field", "field", TextField.TYPE_NOT_STORED));
- while (enoughMergesWaiting.getCount() != 0 && !failed.get()) {
+ while (true) {
for (int i = 0; i < 10; i++) {
w.addDocument(doc);
}
}
- try {
- w.commit();
- } finally {
- w.close();
- }
- dir.close();
+// try {
+// w.commit();
+// } finally {
+// w.close();
+// }
+// dir.close();
}
public void testSmallMergesDonNotGetThreads() throws IOException {
```
With the patch `TestConcurrentMergeScheduler#testMaxMergeCount` will raise
assertion failure.
### Version and environment details
Version: 33a4c1d8ef999902dacedde9c7f04a3c7e2e78c9
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]