[ 
https://issues.apache.org/jira/browse/BEAM-3848?focusedWorklogId=85677&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85677
 ]

ASF GitHub Bot logged work on BEAM-3848:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Mar/18 15:15
            Start Date: 29/Mar/18 15:15
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #4905: 
[BEAM-3848] Enables ability to retry Solr writes on error (SolrIO)
URL: https://github.com/apache/beam/pull/4905#discussion_r178089696
 
 

 ##########
 File path: 
sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
 ##########
 @@ -263,4 +276,109 @@ public void testSplit() throws Exception {
     // therefore, can not exist an empty shard.
     assertEquals("Wrong number of empty splits", expectedNumSplits, 
nonEmptySplits);
   }
+
+  /**
+   * Ensure that the retrying is ignored under success conditions.
+   */
+  @Test
+  public void testWriteDefaultRetrySuccess() throws Exception {
+    SolrIO.Write write = mock(SolrIO.Write.class);
+    when(write.getRetryConfiguration())
+        .thenReturn(SolrIO.RetryConfiguration.create(10, 
Duration.standardSeconds(10)));
+    SolrIO.Write.WriteFn writeFn = new SolrIO.Write.WriteFn(write);
+    AuthorizedSolrClient solrClient = mock(AuthorizedSolrClient.class);
+
+    // simulate success
+    when(solrClient.process(any(String.class), any(SolrRequest.class)))
+        .thenReturn(mock(SolrResponse.class));
+
+    List<SolrInputDocument> batch = SolrIOTestUtils.createDocuments(1);
+    writeFn.flushBatch(solrClient, batch);
+    verify(solrClient, times(1)).process(any(String.class), 
any(SolrRequest.class));
+  }
+
+  /**
+   * Ensure that the default retrying behavior surfaces errors immediately 
under failure conditions.
+   */
+  @Test
+  public void testWriteRetryFail() throws Exception {
+    SolrIO.Write write = mock(SolrIO.Write.class);
+    
when(write.getRetryConfiguration()).thenReturn(SolrIO.DEFAULT_RETRY_CONFIGURATION);
+    SolrIO.Write.WriteFn writeFn = new SolrIO.Write.WriteFn(write);
+    AuthorizedSolrClient solrClient = mock(AuthorizedSolrClient.class);
+
+    // simulate failure
+    when(solrClient.process(any(String.class), any(SolrRequest.class)))
+        .thenThrow(new SolrServerException("Fail"));
+
+    List<SolrInputDocument> batch = SolrIOTestUtils.createDocuments(1);
+    try {
+      writeFn.flushBatch(solrClient, batch);
+      fail("Error should have been surfaced when flushing batch");
+    } catch (IOException e) {
+      verify(solrClient, times(1)).process(any(String.class), 
any(SolrRequest.class));
+    }
+  }
+
+  /**
+   * Ensure that a time bounded retrying is observed.
+   */
+  @Test
+  public void testWriteRetryTimeBound() throws Exception {
+    SolrIO.Write write = mock(SolrIO.Write.class);
+    when(write.getRetryConfiguration())
+        .thenReturn(
+            SolrIO.RetryConfiguration.create(Integer.MAX_VALUE, 
Duration.standardSeconds(3)));
+    SolrIO.Write.WriteFn writeFn = new SolrIO.Write.WriteFn(write);
+    AuthorizedSolrClient solrClient = mock(AuthorizedSolrClient.class);
+
+    // simulate failure
+    when(solrClient.process(any(String.class), any(SolrRequest.class)))
+        .thenThrow(
+            new HttpSolrClient.RemoteSolrException(
+                "localhost", 1, "ignore", new IOException("Network")));
+
+    List<SolrInputDocument> batch = SolrIOTestUtils.createDocuments(1);
+    Stopwatch stopwatch = Stopwatch.createStarted();
+
+    try {
+      writeFn.flushBatch(solrClient, batch);
+      fail("Error should have been surfaced when flushing batch");
+    } catch (IOException e) {
+      // at least two attempts must be made
+      verify(solrClient, Mockito.atLeast(2)).process(any(String.class), 
any(SolrRequest.class));
+      long seconds = stopwatch.elapsed(TimeUnit.SECONDS);
+      assertTrue(
+          "Retrying should have executed for at least 3 seconds but was " + 
seconds,
+          seconds >= 3);
+    }
+  }
+
+  /**
+   * Ensure that retries are initiated up to a limited number.
+   */
+  @Test
+  public void testWriteRetryAttemptBound() throws Exception {
+    SolrIO.Write write = mock(SolrIO.Write.class);
 
 Review comment:
   And about the second alternative of stopping the server, I am afraid that 
this introduces some flakiness, so better to have not to do it, It is better to 
have more reproducible (consistent) tests.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 85677)
    Time Spent: 5h 10m  (was: 5h)

> SolrIO: Improve retrying mechanism in client writes
> ---------------------------------------------------
>
>                 Key: BEAM-3848
>                 URL: https://issues.apache.org/jira/browse/BEAM-3848
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-solr
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Tim Robertson
>            Assignee: Tim Robertson
>            Priority: Minor
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> A busy SOLR server is prone to return RemoteSOLRException on writing which 
> currently failsĀ a complete task (e.g. a partition of a spark RDD being 
> written to SOLR).
> A good addition would be the ability to provide a retrying mechanism for the 
> batch in flight, rather than failingĀ fast, which will most likely trigger a 
> much larger retry of more writes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to