OOZIE-2781 HCat partition available notification is not sent to coordinator actions if the coordinator job uses a different hostname (CNAME, IP address, etc.) for the HCat URL
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c52967df Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c52967df Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c52967df Branch: refs/heads/oya Commit: c52967dfb6883bb6469567a2f951bab0c7948855 Parents: 9035d91 Author: puru <puru.s...@gmail.com> Authored: Sun Jan 29 19:09:45 2017 -0800 Committer: puru <puru.s...@gmail.com> Committed: Sun Jan 29 19:09:45 2017 -0800 ---------------------------------------------------------------------- .../hcat/EhcacheHCatDependencyCache.java | 42 +++++++++---- .../hcat/SimpleHCatDependencyCache.java | 63 ++++++++++++++++++-- core/src/main/resources/oozie-default.xml | 11 ++++ .../TestPartitionDependencyManagerService.java | 55 +++++++++++++++-- release-log.txt | 1 + 5 files changed, 148 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java index 3bc4675..6f0abf6 100644 --- a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java +++ b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java @@ -18,6 +18,7 @@ package org.apache.oozie.dependency.hcat; +import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; @@ -40,6 +41,7 @@ import net.sf.ehcache.config.CacheConfiguration; import net.sf.ehcache.event.CacheEventListener; import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.service.ConfigurationService; import 
org.apache.oozie.service.HCatAccessorService; import org.apache.oozie.service.PartitionDependencyManagerService; import org.apache.oozie.service.Services; @@ -56,6 +58,8 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve private CacheManager cacheManager; + private boolean useCanonicalHostName = false; + /** * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of * WaitingAction) which is Serializable (for overflowToDisk) @@ -100,18 +104,20 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve missingDepsByServer = new ConcurrentHashMap<String, Cache>(); partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>(); availableDeps = new ConcurrentHashMap<String, Collection<String>>(); + useCanonicalHostName = ConfigurationService.getBoolean(SimpleHCatDependencyCache.USE_CANONICAL_HOSTNAME); + } @Override public void addMissingDependency(HCatURI hcatURI, String actionID) { - + String serverName = canonicalizeHostname(hcatURI.getServer()); // Create cache for the server if we don't have one - Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); + Cache missingCache = missingDepsByServer.get(serverName); if (missingCache == null) { CacheConfiguration clonedConfig = cacheConfig.clone(); - clonedConfig.setName(hcatURI.getServer()); + clonedConfig.setName(serverName); missingCache = new Cache(clonedConfig); - Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache); + Cache exists = missingDepsByServer.putIfAbsent(serverName, missingCache); if (exists == null) { cacheManager.addCache(missingCache); missingCache.getCacheEventNotificationService().registerListener(this); @@ -148,7 +154,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve // Increment count for the partition key pattern if (newlyAdded) { - String tableKey = hcatURI.getServer() + TABLE_DELIMITER + 
hcatURI.getDb() + TABLE_DELIMITER + String tableKey = canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable(); synchronized (partKeyPatterns) { ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey); @@ -170,7 +176,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve @Override public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { - Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); + Cache missingCache = missingDepsByServer.get(canonicalizeHostname(hcatURI.getServer())); if (missingCache == null) { LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}", hcatURI.toURIString(), actionID); @@ -202,7 +208,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve } // Decrement partition key pattern count if the cache entry is removed if (decrement) { - String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER + String tableKey = canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable(); decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString()); } @@ -212,7 +218,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve @Override public Collection<String> getWaitingActions(HCatURI hcatURI) { Collection<String> actionIDs = null; - Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); + Cache missingCache = missingDepsByServer.get(canonicalizeHostname(hcatURI.getServer())); if (missingCache != null) { SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER @@ -221,7 +227,15 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve if (element != null) { WaitingActions 
waitingActions = (WaitingActions) element.getObjectValue(); actionIDs = new ArrayList<String>(); - String uriString = hcatURI.getURI().toString(); + URI uri = hcatURI.getURI(); + String uriString = null; + try { + uriString = new URI(uri.getScheme(), canonicalizeHostname(uri.getAuthority()), uri.getPath(), + uri.getQuery(), uri.getFragment()).toString(); + } + catch (URISyntaxException e) { + uriString = hcatURI.toURIString(); + } for (WaitingAction action : waitingActions.getWaitingActions()) { if (action.getDependencyURI().equals(uriString)) { actionIDs.add(action.getActionID()); @@ -235,7 +249,7 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve @Override public Collection<String> markDependencyAvailable(String server, String db, String table, Map<String, String> partitions) { - String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table; + String tableKey = canonicalizeHostname(server) + TABLE_DELIMITER + db + TABLE_DELIMITER + table; synchronized (partKeyPatterns) { Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey); if (patternCounts == null) { @@ -512,8 +526,8 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve // Decrement partition key pattern count if the cache entry is removed SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); String partKeys = sortedPKV.getPartKeys(); - String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER - + hcatURI.getTable(); + String tableKey = canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb() + + TABLE_DELIMITER + hcatURI.getTable(); String hcatURIStr = hcatURI.toURIString(); decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr); } @@ -527,4 +541,8 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve // to be implemented when reverse-lookup data structure for purging is added } + public String 
canonicalizeHostname(String name) { + return SimpleHCatDependencyCache.canonicalizeHostname(name, useCanonicalHostName); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java index 9e24c9a..1b2bd24 100644 --- a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java +++ b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java @@ -18,6 +18,9 @@ package org.apache.oozie.dependency.hcat; +import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; @@ -31,8 +34,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; - import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HCatAccessorService; import org.apache.oozie.service.Services; import org.apache.oozie.util.HCatURI; @@ -43,6 +46,11 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class); private static String DELIMITER = ";"; + public static final String USE_CANONICAL_HOSTNAME ="oozie.service.HCatAccessorService.jms.use.canonical.hostname"; + private boolean useCanonicalHostName = false; + + + /** * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as @@ -65,11 +73,13 @@ public class SimpleHCatDependencyCache implements 
HCatDependencyCache { missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>(); availableDeps = new ConcurrentHashMap<String, Collection<String>>(); actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>(); + useCanonicalHostName = ConfigurationService.getBoolean(USE_CANONICAL_HOSTNAME); } @Override public void addMissingDependency(HCatURI hcatURI, String actionID) { - String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); + String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER + + hcatURI.getTable(); SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); // Partition keys seperated by ;. For eg: date;country;state String partKey = sortedPKV.getPartKeys(); @@ -119,7 +129,8 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { @Override public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { - String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); + String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER + + hcatURI.getTable(); SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); String partKey = sortedPKV.getPartKeys(); String partVal = sortedPKV.getPartVals(); @@ -181,7 +192,8 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { @Override public Collection<String> getWaitingActions(HCatURI hcatURI) { - String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); + String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER + + hcatURI.getTable(); SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); String partKey = sortedPKV.getPartKeys(); String partVal = sortedPKV.getPartVals(); @@ -198,7 +210,16 @@ public class 
SimpleHCatDependencyCache implements HCatDependencyCache { return null; } Collection<String> actionIDs = new ArrayList<String>(); - String uriString = hcatURI.toURIString(); + URI uri = hcatURI.getURI(); + String uriString = null; + try { + uriString = new URI(uri.getScheme(), canonicalizeHostname(uri.getAuthority()), uri.getPath(), + uri.getQuery(), uri.getFragment()).toString(); + } + catch (URISyntaxException e) { + uriString = hcatURI.toURIString(); + } + for (WaitingAction action : waitingActions) { if (action.getDependencyURI().equals(uriString)) { actionIDs.add(action.getActionID()); @@ -210,7 +231,7 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { @Override public Collection<String> markDependencyAvailable(String server, String db, String table, Map<String, String> partitions) { - String tableKey = server + DELIMITER + db + DELIMITER + table; + String tableKey = canonicalizeHostname(server) + DELIMITER + db + DELIMITER + table; Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); if (partKeyPatterns == null) { LOG.warn("Got partition available notification for " + tableKey @@ -411,4 +432,34 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { public void removeCoordActionWithDependenciesAvailable(String coordAction) { actionPartitionMap.remove(coordAction); } + + public String canonicalizeHostname(String name) { + return canonicalizeHostname(name, useCanonicalHostName); + } + + public static String canonicalizeHostname(String name, boolean useCanonicalHostName) { + if (useCanonicalHostName) { + String hostname = name; + String port = null; + if (name.contains(":")) { + hostname = name.split(":")[0]; + port = name.split(":")[1]; + } + try { + InetAddress address = InetAddress.getByName(hostname); + String canonicalHostName = address.getCanonicalHostName(); + if (null != port) { + return canonicalHostName + ":" + port; + } + return canonicalHostName; + } + catch 
(IOException ex) { + LOG.error(ex); + return name; + } + } + else { + return name; + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index ff9da58..95e0c36 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -211,6 +211,17 @@ </description> </property> + <!-- HCatAccessorService --> + <property> + <name>oozie.service.HCatAccessorService.jms.use.canonical.hostname</name> + <value>false</value> + <description>The JMS messages published from an HCat server usually contain the canonical hostname of the HCat server + in standalone mode, or the canonical name of the VIP in the case of multiple nodes in an HA setup. This setting is used + to translate the HCat server hostname or its aliases specified by the user in the HCat URIs of the coordinator dependencies + to its canonical name so that they can be exactly matched with the JMS dependency availability notifications.
+ </description> + </property> + <!-- TopicService --> <property> http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java index a5d2ed9..6996779 100644 --- a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java +++ b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java @@ -20,16 +20,13 @@ package org.apache.oozie.service; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; +import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.client.CoordinatorAction.Status; -import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.dependency.hcat.EhcacheHCatDependencyCache; import org.apache.oozie.dependency.hcat.HCatMessageHandler; -import org.apache.oozie.executor.jpa.BatchQueryExecutor; +import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache; import org.apache.oozie.jms.JMSConnectionInfo; import org.apache.oozie.service.Services; import org.apache.oozie.test.XDataTestCase; @@ -137,6 +134,39 @@ public class TestPartitionDependencyManagerService extends XDataTestCase { assertFalse(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." 
+ dep3.getTable())); } + @Test + public void testHCatCanonicalHostName() throws Exception { + ConfigurationService.setBoolean(SimpleHCatDependencyCache.USE_CANONICAL_HOSTNAME, true); + ConfigurationService.set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL, + SimpleHCatDependencyCacheExtended.class.getName()); + services.init(); + + // Test all APIs related to dependency caching + String actionId1 = "1"; + + String server1 = "hcat-server1-A:5080"; + String server2 = "hcat-server1-B:5080"; + String db = "mydb"; + String table1 = "mytbl1"; + HCatURI dep1 = new HCatURI(new URI("hcat://" + server1 + "/" + db + "/" + table1 + "/dt=20120101;country=us")); + HCatURI dep2 = new HCatURI(new URI("hcat://" + server2 + "/" + db + "/" + table1 + "/dt=20120101;country=us")); + + PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class); + addMissingDependencyAndRegister(dep1, actionId1, pdms); + assertTrue(pdms.getWaitingActions(dep1).contains(actionId1)); + assertTrue(pdms.getWaitingActions(dep2).contains(actionId1)); + + ConfigurationService.set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL, + EhcacheHCatDependencyCacheExtended.class.getName()); + services.init(); + + pdms = Services.get().get(PartitionDependencyManagerService.class); + addMissingDependencyAndRegister(dep1, actionId1, pdms); + assertTrue(pdms.getWaitingActions(dep1).contains(actionId1)); + assertTrue(pdms.getWaitingActions(dep2).contains(actionId1)); + + } + protected void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId, PartitionDependencyManagerService pdms) { pdms.addMissingDependency(hcatURI, actionId); HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); @@ -198,3 +228,16 @@ public class TestPartitionDependencyManagerService extends XDataTestCase { } } } + +class SimpleHCatDependencyCacheExtended extends SimpleHCatDependencyCache { + public String canonicalizeHostname(String name) { + return 
name.replace("-B", "-A"); + } + +} + +class EhcacheHCatDependencyCacheExtended extends EhcacheHCatDependencyCache { + public String canonicalizeHostname(String name) { + return name.replace("-B", "-A"); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 081ead8..2b806fc 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.4.0 release (trunk - unreleased) +OOZIE-2781 HCat partition available notification is not sent to coordinator actions if the coordinator job uses a different hostname (CNAME, IP address, etc.) for the HCat URL (puru) OOZIE-2770 Show missing dependencies for coord actions (puru) OOZIE-2630 Oozie Coordinator EL Functions to get first day of the week/month (satishsaley) OOZIE-2771 Allow retrieving keystore and truststore passwords from Hadoop Credential Provider (asasvari via abhishekbafna)