Author: mkataria
Date: Thu Jul 23 09:25:39 2020
New Revision: 1880194

URL: http://svn.apache.org/viewvc?rev=1880194&view=rev
Log:
OAK-9146: Elastic indexes - Updates get lost if connection is not available 
(Patch by averma21)

Modified:
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java?rev=1880194&r1=1880193&r2=1880194&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java Thu Jul 23 09:25:39 2020
@@ -76,6 +76,11 @@ class ElasticBulkProcessorHandler {
     private final Phaser phaser = new Phaser(1); // register main controller
 
     /**
+     * IOException object wrapping any error/exception which occurred while trying to update index in elasticsearch.
+     */
+    private volatile IOException ioException;
+
+    /**
      * Key-value structure to keep the history of bulk requests. Keys are the bulk execution ids, the boolean
      * value is {@code true} when at least an update is performed, otherwise {@code false}.
      */
@@ -148,7 +153,7 @@ class ElasticBulkProcessorHandler {
         totalOperations++;
     }
 
-    public boolean close() {
+    public boolean close() throws IOException {
         LOG.trace("Calling close on bulk processor {}", bulkProcessor);
         bulkProcessor.close();
         LOG.trace("Bulk Processor {} closed", bulkProcessor);
@@ -167,6 +172,10 @@ class ElasticBulkProcessorHandler {
             LOG.error("Error waiting for bulk requests to return", e);
         }
 
+        if (ioException != null) {
+            throw ioException;
+        }
+
         if (LOG.isTraceEnabled()) {
             LOG.trace("Bulk identifier -> update status = {}", updatesMap);
         }
@@ -250,6 +259,7 @@ class ElasticBulkProcessorHandler {
         @Override
         public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
             LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} threw an error", executionId, throwable);
+            ElasticBulkProcessorHandler.this.ioException = new IOException(throwable);
             phaser.arriveAndDeregister();
         }
     }
@@ -284,7 +294,7 @@ class ElasticBulkProcessorHandler {
         }
 
         @Override
-        public boolean close() {
+        public boolean close() throws IOException {
             isClosed.set(true);
             // calling super closes the bulk processor. If not empty it calls #requestConsumer for the last time
             boolean closed = super.close();

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java?rev=1880194&r1=1880193&r2=1880194&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java Thu Jul 23 09:25:39 2020
@@ -93,7 +93,7 @@ class ElasticIndexWriter implements Full
     }
 
     @Override
-    public boolean close(long timestamp) {
+    public boolean close(long timestamp) throws IOException {
         return bulkProcessorHandler.close();
     }
 

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java?rev=1880194&r1=1880193&r2=1880194&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java Thu Jul 23 09:25:39 2020
@@ -27,6 +27,8 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import java.io.IOException;
+
 import static org.apache.jackrabbit.oak.plugins.index.elastic.ElasticTestUtils.randomString;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -106,7 +108,7 @@ public class ElasticIndexWriterTest {
     }
 
     @Test
-    public void closeBulkProcessor() {
+    public void closeBulkProcessor() throws IOException {
         indexWriter.close(System.currentTimeMillis());
         verify(bulkProcessorHandlerMock).close();
     }


Reply via email to