Author: mkataria Date: Thu Jul 23 09:25:39 2020 New Revision: 1880194 URL: http://svn.apache.org/viewvc?rev=1880194&view=rev Log: OAK-9146: Elastic indexes - Updates get lost if connection is not available (Patch by averma21)
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java?rev=1880194&r1=1880193&r2=1880194&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java (original) +++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java Thu Jul 23 09:25:39 2020 @@ -76,6 +76,11 @@ class ElasticBulkProcessorHandler { private final Phaser phaser = new Phaser(1); // register main controller /** + * IOException object wrapping any error/exception which occurred while trying to update index in elasticsearch. + */ + private volatile IOException ioException; + + /** * Key-value structure to keep the history of bulk requests. Keys are the bulk execution ids, the boolean * value is {@code true} when at least an update is performed, otherwise {@code false}. */ @@ -148,7 +153,7 @@ class ElasticBulkProcessorHandler { totalOperations++; } - public boolean close() { + public boolean close() throws IOException { LOG.trace("Calling close on bulk processor {}", bulkProcessor); bulkProcessor.close(); LOG.trace("Bulk Processor {} closed", bulkProcessor); @@ -167,6 +172,10 @@ class ElasticBulkProcessorHandler { LOG.error("Error waiting for bulk requests to return", e); } + if (ioException != null) { + throw ioException; + } + if (LOG.isTraceEnabled()) { LOG.trace("Bulk identifier -> update status = {}", updatesMap); } @@ -250,6 +259,7 @@ class ElasticBulkProcessorHandler { @Override public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) { LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} threw an error", executionId, throwable); + ElasticBulkProcessorHandler.this.ioException = new IOException(throwable); phaser.arriveAndDeregister(); } } @@ -284,7 +294,7 @@ class ElasticBulkProcessorHandler { } @Override - public boolean close() { + public boolean close() throws IOException { isClosed.set(true); // calling super closes the bulk processor. If not empty it calls #requestConsumer for the last time boolean closed = super.close(); Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java?rev=1880194&r1=1880193&r2=1880194&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java (original) +++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java Thu Jul 23 09:25:39 2020 @@ -93,7 +93,7 @@ class ElasticIndexWriter implements Full } @Override - public boolean close(long timestamp) { + public boolean close(long timestamp) throws IOException { return bulkProcessorHandler.close(); } Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java?rev=1880194&r1=1880193&r2=1880194&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java (original) +++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java Thu Jul 23 09:25:39 2020 @@ -27,6 +27,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import java.io.IOException; + import static org.apache.jackrabbit.oak.plugins.index.elastic.ElasticTestUtils.randomString; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; @@ -106,7 +108,7 @@ public class ElasticIndexWriterTest { } @Test - public void closeBulkProcessor() { + public void closeBulkProcessor() throws IOException { indexWriter.close(System.currentTimeMillis()); verify(bulkProcessorHandlerMock).close(); }