ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f8dd5dcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f8dd5dcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f8dd5dcb Branch: refs/heads/branch-0.6-incubating Commit: f8dd5dcb358b0328d34f57d5b1c6fcd730fb48e1 Parents: 6dfbda1 Author: Suma Shivaprasad <sumasai.shivapra...@gmail.com> Authored: Mon Dec 14 16:29:47 2015 +0530 Committer: Suma Shivaprasad <sumasai.shivapra...@gmail.com> Committed: Mon Dec 14 16:29:47 2015 +0530 ---------------------------------------------------------------------- client/pom.xml | 6 +++ .../main/java/org/apache/atlas/AtlasClient.java | 22 +++++++++- .../notification/NotificationHookConsumer.java | 42 ++++++++++++++++++++ pom.xml | 2 +- release-log.txt | 1 + .../atlas/web/listeners/GuiceServletConfig.java | 2 +- 6 files changed, 72 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index 279d894..d41b5bf 100755 --- a/client/pom.xml +++ b/client/pom.xml @@ -67,5 +67,11 @@ <groupId>org.testng</groupId> <artifactId>testng</artifactId> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index becc4db..b108b25 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ 
b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -19,6 +19,7 @@ package org.apache.atlas; import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; @@ -67,6 +68,7 @@ public class AtlasClient { public static final String DATATYPE = "dataType"; public static final String BASE_URI = "api/atlas/"; + public static final String ADMIN_VERSION = "admin/version"; public static final String TYPES = "types"; public static final String URI_ENTITY = "entities"; public static final String URI_SEARCH = "discovery/search"; @@ -126,11 +128,29 @@ public class AtlasClient { service = client.resource(UriBuilder.fromUri(baseUrl).build()); } + // for testing + AtlasClient(WebResource service) { + this.service = service; + } + protected Configuration getClientProperties() throws AtlasException { return ApplicationProperties.get(); } - enum API { + public boolean isServerReady() throws AtlasServiceException { + WebResource resource = getResource(API.VERSION); + try { + callAPIWithResource(API.VERSION, resource); + return true; + } catch (ClientHandlerException che) { + return false; + } + } + + public enum API { + + //Admin operations + VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK), //Type operations CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index ffeb406..1bee26f 100644 --- 
a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -22,6 +22,7 @@ import com.google.inject.Singleton; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; import org.codehaus.jettison.json.JSONArray; @@ -42,6 +43,7 @@ public class NotificationHookConsumer implements Service { public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address"; + public static final int SERVER_READY_WAIT_TIME_MS = 1000; @Inject private NotificationInterface notificationInterface; @@ -77,15 +79,32 @@ public class NotificationHookConsumer implements Service { } } + static class Timer { + public void sleep(int interval) throws InterruptedException { + Thread.sleep(interval); + } + } + class HookConsumer implements Runnable { private final NotificationConsumer<JSONArray> consumer; + private final AtlasClient client; public HookConsumer(NotificationConsumer<JSONArray> consumer) { + this(atlasClient, consumer); + } + + public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) { + this.client = client; this.consumer = consumer; } @Override public void run() { + + if (!serverAvailable(new NotificationHookConsumer.Timer())) { + return; + } + while(consumer.hasNext()) { JSONArray entityJson = consumer.next(); LOG.info("Processing message {}", entityJson); @@ -98,5 +117,28 @@ public class NotificationHookConsumer implements Service { } } } + + boolean serverAvailable(Timer timer) { + try { + while (!client.isServerReady()) { + try { + LOG.info("Atlas Server is not ready. 
Waiting for {} milliseconds to retry...", + SERVER_READY_WAIT_TIME_MS); + timer.sleep(SERVER_READY_WAIT_TIME_MS); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for Atlas Server to become ready, " + + "exiting consumer thread.", e); + return false; + } + } + } catch (AtlasServiceException e) { + LOG.info( + "Handled AtlasServiceException while waiting for Atlas Server to become ready, " + + "exiting consumer thread.", e); + return false; + } + LOG.info("Atlas Server is ready, can start reading Kafka events."); + return true; + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 929d255..d9c3df1 100755 --- a/pom.xml +++ b/pom.xml @@ -323,7 +323,7 @@ <node.version>v0.10.30</node.version> <slf4j.version>1.7.7</slf4j.version> <jetty.version>9.2.12.v20150709</jetty.version> - <jersey.version>1.10</jersey.version> + <jersey.version>1.19</jersey.version> <jackson.version>1.8.3</jackson.version> <tinkerpop.version>2.6.0</tinkerpop.version> <titan.version>0.5.4</titan.version> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 07a3e9b..ffd69ea 100644 --- a/release-log.txt +++ b/release-log.txt @@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags) ALL CHANGES: +ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai) ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai) ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai) ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai) 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java index f0d80cb..c1f6a9b 100755 --- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java +++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java @@ -124,7 +124,7 @@ public class GuiceServletConfig extends GuiceServletContextListener { } protected void startServices() { - LOG.debug("Starting services"); + LOG.info("Starting services"); Services services = injector.getInstance(Services.class); services.start(); }