This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 84ee70e HIVE-23659: Add Retry for Ranger Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha) 84ee70e is described below commit 84ee70e8ae6e9cd41e36feab16cbcc5afad50a0b Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Wed Jun 10 19:49:44 2020 +0530 HIVE-23659: Add Retry for Ranger Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha) --- .../ql/exec/repl/ranger/RangerRestClientImpl.java | 171 ++++++++++++--------- .../apache/hadoop/hive/metastore/utils/Retry.java | 21 ++- .../hadoop/hive/metastore/utils/RetryTest.java | 69 ++++++++- 3 files changed, 187 insertions(+), 74 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java index 1b17632..13d3836 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java @@ -72,7 +72,6 @@ public class RangerRestClientImpl implements RangerRestClient { String dbName, String rangerHiveServiceName)throws SemanticException { LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint); - ClientResponse clientResp; String uri; if (StringUtils.isEmpty(rangerHiveServiceName)) { throw new SemanticException("Ranger Service Name cannot be empty"); @@ -85,33 +84,43 @@ public class RangerRestClientImpl implements RangerRestClient { } String url = sourceRangerEndpoint + (uri.startsWith("/") ? uri : ("/" + uri)); LOG.debug("Url to export policies from source Ranger: {}", url); - RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList(); - WebResource.Builder builder = getRangerResourceBuilder(url); - clientResp = builder.get(ClientResponse.class); - - String response = null; - if (clientResp != null) { - if (clientResp.getStatus() == HttpServletResponse.SC_OK) { - Gson gson = new GsonBuilder().create(); - response = clientResp.getEntity(String.class); - LOG.debug("Response received for ranger export {} ", response); - if (StringUtils.isNotEmpty(response)) { - rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class); - return rangerExportPolicyList; + + Retry<RangerExportPolicyList> retriable = new Retry<RangerExportPolicyList>(Exception.class) { + @Override + public RangerExportPolicyList execute() throws Exception { + WebResource.Builder builder = getRangerResourceBuilder(url); + RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList(); + ClientResponse clientResp = builder.get(ClientResponse.class); + String response = null; + if (clientResp != null) { + if (clientResp.getStatus() == HttpServletResponse.SC_OK) { + Gson gson = new GsonBuilder().create(); + response = clientResp.getEntity(String.class); + LOG.debug("Response received for ranger export {} ", response); + if (StringUtils.isNotEmpty(response)) { + rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class); + return rangerExportPolicyList; + } + } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { + LOG.debug("Ranger policy export request returned empty list"); + return rangerExportPolicyList; + } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { + throw new SemanticException("Authentication Failure while communicating to Ranger admin"); + } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) { + throw new SemanticException("Authorization Failure while communicating to Ranger admin"); + } } - } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { - LOG.debug("Ranger policy export request returned empty list"); - return rangerExportPolicyList; - } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { - throw new SemanticException("Authentication Failure while communicating to Ranger admin"); - } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) { - throw new SemanticException("Authorization Failure while communicating to Ranger admin"); + if (StringUtils.isEmpty(response)) { + LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs."); + } + return null; } + }; + try { + return retriable.runWithDelay(); + } catch (Exception e) { + throw new SemanticException(e); } - if (StringUtils.isEmpty(response)) { - LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs."); - } - return rangerExportPolicyList; } public List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies) { @@ -170,50 +179,60 @@ public class RangerRestClientImpl implements RangerRestClient { + (uri.startsWith("/") ? uri : ("/" + uri)); LOG.debug("URL to import policies on target Ranger: {}", url); - ClientResponse clientResp = null; - - StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file", - new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)), - rangerPoliciesJsonFileName); - StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson", - new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName); - - FormDataMultiPart formDataMultiPart = new FormDataMultiPart(); - MultiPart multipartEntity = null; - try { - multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap); - WebResource.Builder builder = getRangerResourceBuilder(url); - clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA) - .post(ClientResponse.class, multipartEntity); - if (clientResp != null) { - if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { - LOG.debug("Ranger policy import finished successfully"); - - } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { - throw new Exception("Authentication Failure while communicating to Ranger admin"); - } else { - throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs."); - } - } - } finally { - try { - if (filePartPolicies != null) { - filePartPolicies.cleanup(); - } - if (filePartServiceMap != null) { - filePartServiceMap.cleanup(); - } - if (formDataMultiPart != null) { - formDataMultiPart.close(); - } - if (multipartEntity != null) { - multipartEntity.close(); + Retry<RangerExportPolicyList> retriable = new Retry<RangerExportPolicyList>(Exception.class) { + @Override + public RangerExportPolicyList execute() throws Exception { + ClientResponse clientResp = null; + + StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file", + new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)), + rangerPoliciesJsonFileName); + StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson", + new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName); + + FormDataMultiPart formDataMultiPart = new FormDataMultiPart(); + MultiPart multipartEntity = null; + try { + multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap); + WebResource.Builder builder = getRangerResourceBuilder(url); + clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA) + .post(ClientResponse.class, multipartEntity); + if (clientResp != null) { + if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { + LOG.debug("Ranger policy import finished successfully"); + + } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { + throw new Exception("Authentication Failure while communicating to Ranger admin"); + } else { + throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs."); + } + } + } finally { + try { + if (filePartPolicies != null) { + filePartPolicies.cleanup(); + } + if (filePartServiceMap != null) { + filePartServiceMap.cleanup(); + } + if (formDataMultiPart != null) { + formDataMultiPart.close(); + } + if (multipartEntity != null) { + multipartEntity.close(); + } + } catch (IOException e) { + LOG.error("Exception occurred while closing resources: {}", e); + } } - } catch (IOException e) { - LOG.error("Exception occurred while closing resources: {}", e); + return rangerExportPolicyList; } + }; + try { + return retriable.runWithDelay(); + } catch (Exception e) { + throw new SemanticException(e); } - return rangerExportPolicyList; } private synchronized Client getRangerClient() { @@ -342,11 +361,21 @@ public class RangerRestClientImpl implements RangerRestClient { } @Override - public boolean checkConnection(String url) { - WebResource.Builder builder; - builder = getRangerResourceBuilder(url); - ClientResponse clientResp = builder.get(ClientResponse.class); - return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED); + public boolean checkConnection(String url) throws SemanticException { + Retry<Boolean> retriable = new Retry<Boolean>(Exception.class) { + @Override + public Boolean execute() throws Exception { + WebResource.Builder builder; + builder = getRangerResourceBuilder(url); + ClientResponse clientResp = builder.get(ClientResponse.class); + return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED); + } + }; + try { + return retriable.runWithDelay(); + } catch (Exception e) { + throw new SemanticException(e); + } } @Override diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java index bdb269a..032eaf4 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java @@ -23,7 +23,8 @@ package org.apache.hadoop.hive.metastore.utils; */ public abstract class Retry<T> { - public static final int MAX_RETRIES = 3; + public static final int MAX_RETRIES = 4; + public static final int DELAY = 30 * 1000; private int tries = 0; private Class retryExceptionType; @@ -49,4 +50,22 @@ public abstract class Retry<T> { } } } + + public T runWithDelay() throws Exception { + try { + return execute(); + } catch(Exception e) { + if (e.getClass().equals(retryExceptionType)){ + tries++; + if (MAX_RETRIES == tries) { + throw e; + } else { + Thread.sleep(DELAY * tries); + return runWithDelay(); + } + } else { + throw e; + } + } + } } diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java index 67bd658..8cff68d 100644 --- a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java +++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java @@ -28,15 +28,21 @@ public class RetryTest { @Test public void testRetrySuccess() { Retry<Void> retriable = new Retry<Void>(NullPointerException.class) { + private int count = 0; @Override public Void execute() { - throw new NullPointerException(); + if (count < 1) { + count++; + throw new NullPointerException(); + } else { + return null; + } } }; try { retriable.run(); } catch (Exception e) { - Assert.assertEquals(NullPointerException.class, e.getClass()); + Assert.fail(); } } @@ -50,8 +56,67 @@ public class RetryTest { }; try { retriable.run(); + Assert.fail(); } catch (Exception e) { Assert.assertEquals(RuntimeException.class, e.getClass()); } } + + @Test + public void testRetryFailureWithDelay() { + Retry<Void> retriable = new Retry<Void>(NullPointerException.class) { + @Override + public Void execute() { + throw new RuntimeException(); + } + }; + try { + retriable.runWithDelay(); + Assert.fail(); + } catch (Exception e) { + Assert.assertEquals(RuntimeException.class, e.getClass()); + } + } + + @Test + public void testRetrySuccessWithDelay() { + Retry<Void> retriable = new Retry<Void>(NullPointerException.class) { + private long startTime = System.currentTimeMillis(); + @Override + public Void execute() { + executeWithDelay(startTime); + return null; + } + }; + try { + retriable.runWithDelay(); + } catch (Exception e) { + Assert.fail(); + } + } + + private void executeWithDelay(long startTime) { + long currentTime = System.currentTimeMillis(); + if (currentTime - startTime < 40 * 1000) { + throw new NullPointerException(); + } + } + + @Test + public void testRetryFailureWithDelayMoreThanTimeout() { + Retry<Void> retriable = new Retry<Void>(NullPointerException.class) { + @Override + public Void execute() { + throw new NullPointerException(); + } + }; + long startTime = System.currentTimeMillis(); + try { + retriable.runWithDelay(); + Assert.fail(); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class, e.getClass()); + Assert.assertTrue(System.currentTimeMillis() - startTime > 180 * 1000); + } + } }