OOZIE-2781 HCat partition available notification is not sent to coordinator 
actions if coordinator job is using a different hostname (cname, IP address, 
etc. ) for HCat URL


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c52967df
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c52967df
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c52967df

Branch: refs/heads/oya
Commit: c52967dfb6883bb6469567a2f951bab0c7948855
Parents: 9035d91
Author: puru <puru.s...@gmail.com>
Authored: Sun Jan 29 19:09:45 2017 -0800
Committer: puru <puru.s...@gmail.com>
Committed: Sun Jan 29 19:09:45 2017 -0800

----------------------------------------------------------------------
 .../hcat/EhcacheHCatDependencyCache.java        | 42 +++++++++----
 .../hcat/SimpleHCatDependencyCache.java         | 63 ++++++++++++++++++--
 core/src/main/resources/oozie-default.xml       | 11 ++++
 .../TestPartitionDependencyManagerService.java  | 55 +++++++++++++++--
 release-log.txt                                 |  1 +
 5 files changed, 148 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
 
b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
index 3bc4675..6f0abf6 100644
--- 
a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
+++ 
b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.dependency.hcat;
 
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -40,6 +41,7 @@ import net.sf.ehcache.config.CacheConfiguration;
 import net.sf.ehcache.event.CacheEventListener;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HCatAccessorService;
 import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
@@ -56,6 +58,8 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
 
     private CacheManager cacheManager;
 
+    private boolean useCanonicalHostName = false;
+
     /**
      * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and 
value as WaitingActions (list of
      * WaitingAction) which is Serializable (for overflowToDisk)
@@ -100,18 +104,20 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
         missingDepsByServer = new ConcurrentHashMap<String, Cache>();
         partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, 
SettableInteger>>();
         availableDeps = new ConcurrentHashMap<String, Collection<String>>();
+        useCanonicalHostName = 
ConfigurationService.getBoolean(SimpleHCatDependencyCache.USE_CANONICAL_HOSTNAME);
+
     }
 
     @Override
     public void addMissingDependency(HCatURI hcatURI, String actionID) {
-
+        String serverName = canonicalizeHostname(hcatURI.getServer());
         // Create cache for the server if we don't have one
-        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+        Cache missingCache = missingDepsByServer.get(serverName);
         if (missingCache == null) {
             CacheConfiguration clonedConfig = cacheConfig.clone();
-            clonedConfig.setName(hcatURI.getServer());
+            clonedConfig.setName(serverName);
             missingCache = new Cache(clonedConfig);
-            Cache exists = 
missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
+            Cache exists = missingDepsByServer.putIfAbsent(serverName, 
missingCache);
             if (exists == null) {
                 cacheManager.addCache(missingCache);
                 
missingCache.getCacheEventNotificationService().registerListener(this);
@@ -148,7 +154,7 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
 
         // Increment count for the partition key pattern
         if (newlyAdded) {
-            String tableKey = hcatURI.getServer() + TABLE_DELIMITER + 
hcatURI.getDb() + TABLE_DELIMITER
+            String tableKey = canonicalizeHostname(hcatURI.getServer()) + 
TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
                     + hcatURI.getTable();
             synchronized (partKeyPatterns) {
                 ConcurrentMap<String, SettableInteger> patternCounts = 
partKeyPatterns.get(tableKey);
@@ -170,7 +176,7 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
     @Override
     public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
 
-        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+        Cache missingCache = 
missingDepsByServer.get(canonicalizeHostname(hcatURI.getServer()));
         if (missingCache == null) {
             LOG.warn("Remove missing dependency - Missing cache entry for 
server - uri={0}, actionID={1}",
                     hcatURI.toURIString(), actionID);
@@ -202,7 +208,7 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
         }
         // Decrement partition key pattern count if the cache entry is removed
         if (decrement) {
-            String tableKey = hcatURI.getServer() + TABLE_DELIMITER + 
hcatURI.getDb() + TABLE_DELIMITER
+            String tableKey = canonicalizeHostname(hcatURI.getServer()) + 
TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
                     + hcatURI.getTable();
             decrementPartKeyPatternCount(tableKey, partKeys, 
hcatURI.toURIString());
         }
@@ -212,7 +218,7 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
     @Override
     public Collection<String> getWaitingActions(HCatURI hcatURI) {
         Collection<String> actionIDs = null;
-        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+        Cache missingCache = 
missingDepsByServer.get(canonicalizeHostname(hcatURI.getServer()));
         if (missingCache != null) {
             SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
             String missingKey = hcatURI.getDb() + TABLE_DELIMITER + 
hcatURI.getTable() + TABLE_DELIMITER
@@ -221,7 +227,15 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
             if (element != null) {
                 WaitingActions waitingActions = (WaitingActions) 
element.getObjectValue();
                 actionIDs = new ArrayList<String>();
-                String uriString = hcatURI.getURI().toString();
+                URI uri = hcatURI.getURI();
+                String uriString = null;
+                try {
+                    uriString = new URI(uri.getScheme(), 
canonicalizeHostname(uri.getAuthority()), uri.getPath(),
+                            uri.getQuery(), uri.getFragment()).toString();
+                }
+                catch (URISyntaxException e) {
+                    uriString = hcatURI.toURIString();
+                }
                 for (WaitingAction action : 
waitingActions.getWaitingActions()) {
                     if (action.getDependencyURI().equals(uriString)) {
                         actionIDs.add(action.getActionID());
@@ -235,7 +249,7 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
     @Override
     public Collection<String> markDependencyAvailable(String server, String 
db, String table,
             Map<String, String> partitions) {
-        String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + 
table;
+        String tableKey = canonicalizeHostname(server) + TABLE_DELIMITER + db 
+ TABLE_DELIMITER + table;
         synchronized (partKeyPatterns) {
             Map<String, SettableInteger> patternCounts = 
partKeyPatterns.get(tableKey);
             if (patternCounts == null) {
@@ -512,8 +526,8 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
                         // Decrement partition key pattern count if the cache 
entry is removed
                         SortedPKV sortedPKV = new 
SortedPKV(hcatURI.getPartitionMap());
                         String partKeys = sortedPKV.getPartKeys();
-                        String tableKey = hcatURI.getServer() + 
TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
-                                + hcatURI.getTable();
+                        String tableKey = 
canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb()
+                                + TABLE_DELIMITER + hcatURI.getTable();
                         String hcatURIStr = hcatURI.toURIString();
                         decrementPartKeyPatternCount(tableKey, partKeys, 
hcatURIStr);
                     }
@@ -527,4 +541,8 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve
         // to be implemented when reverse-lookup data structure for purging is 
added
     }
 
+    public String canonicalizeHostname(String name) {
+        return SimpleHCatDependencyCache.canonicalizeHostname(name, 
useCanonicalHostName);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
 
b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
index 9e24c9a..1b2bd24 100644
--- 
a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
+++ 
b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
@@ -18,6 +18,9 @@
 
 package org.apache.oozie.dependency.hcat;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,8 +34,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HCatAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.HCatURI;
@@ -43,6 +46,11 @@ public class SimpleHCatDependencyCache implements 
HCatDependencyCache {
     private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class);
     private static String DELIMITER = ";";
 
+    public static final String USE_CANONICAL_HOSTNAME 
="oozie.service.HCatAccessorService.jms.use.canonical.hostname";
+    private boolean useCanonicalHostName = false;
+
+
+
     /**
      * Map of server;db;table - sorter partition key order (country;dt;state) 
- sorted partition
      * value (us;20120101;CA) - Collection of waiting actions (actionID and 
original hcat uri as
@@ -65,11 +73,13 @@ public class SimpleHCatDependencyCache implements 
HCatDependencyCache {
         missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, 
Map<String, Collection<WaitingAction>>>>();
         availableDeps = new ConcurrentHashMap<String, Collection<String>>();
         actionPartitionMap = new ConcurrentHashMap<String, 
ConcurrentMap<String, Collection<String>>>();
+        useCanonicalHostName = 
ConfigurationService.getBoolean(USE_CANONICAL_HOSTNAME);
     }
 
     @Override
     public void addMissingDependency(HCatURI hcatURI, String actionID) {
-        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + 
DELIMITER + hcatURI.getTable();
+        String tableKey = canonicalizeHostname(hcatURI.getServer()) + 
DELIMITER + hcatURI.getDb() + DELIMITER
+                + hcatURI.getTable();
         SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
         // Partition keys seperated by ;. For eg: date;country;state
         String partKey = sortedPKV.getPartKeys();
@@ -119,7 +129,8 @@ public class SimpleHCatDependencyCache implements 
HCatDependencyCache {
 
     @Override
     public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
-        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + 
DELIMITER + hcatURI.getTable();
+        String tableKey = canonicalizeHostname(hcatURI.getServer()) + 
DELIMITER + hcatURI.getDb() + DELIMITER
+                + hcatURI.getTable();
         SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
         String partKey = sortedPKV.getPartKeys();
         String partVal = sortedPKV.getPartVals();
@@ -181,7 +192,8 @@ public class SimpleHCatDependencyCache implements 
HCatDependencyCache {
 
     @Override
     public Collection<String> getWaitingActions(HCatURI hcatURI) {
-        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + 
DELIMITER + hcatURI.getTable();
+        String tableKey = canonicalizeHostname(hcatURI.getServer()) + 
DELIMITER + hcatURI.getDb() + DELIMITER
+                + hcatURI.getTable();
         SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
         String partKey = sortedPKV.getPartKeys();
         String partVal = sortedPKV.getPartVals();
@@ -198,7 +210,16 @@ public class SimpleHCatDependencyCache implements 
HCatDependencyCache {
             return null;
         }
         Collection<String> actionIDs = new ArrayList<String>();
-        String uriString = hcatURI.toURIString();
+        URI uri = hcatURI.getURI();
+        String uriString = null;
+        try {
+            uriString = new URI(uri.getScheme(), 
canonicalizeHostname(uri.getAuthority()), uri.getPath(),
+                    uri.getQuery(), uri.getFragment()).toString();
+        }
+        catch (URISyntaxException e) {
+            uriString = hcatURI.toURIString();
+        }
+
         for (WaitingAction action : waitingActions) {
             if (action.getDependencyURI().equals(uriString)) {
                 actionIDs.add(action.getActionID());
@@ -210,7 +231,7 @@ public class SimpleHCatDependencyCache implements 
HCatDependencyCache {
     @Override
     public Collection<String> markDependencyAvailable(String server, String 
db, String table,
             Map<String, String> partitions) {
-        String tableKey = server + DELIMITER + db + DELIMITER + table;
+        String tableKey = canonicalizeHostname(server) + DELIMITER + db + 
DELIMITER + table;
         Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = 
missingDeps.get(tableKey);
         if (partKeyPatterns == null) {
             LOG.warn("Got partition available notification for " + tableKey
@@ -411,4 +432,34 @@ public class SimpleHCatDependencyCache implements 
HCatDependencyCache {
     public void removeCoordActionWithDependenciesAvailable(String coordAction) 
{
         actionPartitionMap.remove(coordAction);
     }
+
+    public String canonicalizeHostname(String name) {
+        return canonicalizeHostname(name, useCanonicalHostName);
+    }
+
+    /**
+     * Translate the host part of a server name to its canonical hostname, so that different
+     * names for the same host (alias, IP address, etc.) produce the same string.
+     * <p>
+     * {@code name} may be {@code host} or {@code host:port}; everything from the colon onwards
+     * is appended unchanged to the canonical hostname. The name is returned exactly as given
+     * when canonicalization is disabled, when it is null or empty, when it is not in one of
+     * those two forms (empty host, or more than one colon as in an IPv6 literal), or when the
+     * host cannot be resolved.
+     *
+     * @param name server name, optionally followed by {@code :port}
+     * @param useCanonicalHostName whether to perform the translation at all
+     * @return the canonicalized server name, or {@code name} itself if it was not translated
+     */
+    public static String canonicalizeHostname(String name, boolean useCanonicalHostName) {
+        if (!useCanonicalHostName || name == null || name.isEmpty()) {
+            return name;
+        }
+        int colon = name.indexOf(':');
+        if (colon == 0 || colon != name.lastIndexOf(':')) {
+            // Empty host or more than one colon: not host[:port], so leave it untouched
+            return name;
+        }
+        // indexOf/substring rather than split(":"): split drops a trailing empty token, so
+        // "host:" would make split(":")[1] throw ArrayIndexOutOfBoundsException
+        String hostname = (colon < 0) ? name : name.substring(0, colon);
+        String portSuffix = (colon < 0) ? "" : name.substring(colon);
+        try {
+            return InetAddress.getByName(hostname).getCanonicalHostName() + portSuffix;
+        }
+        catch (IOException ex) {
+            LOG.error("Unable to resolve canonical hostname for [{0}], using the name as given", name, ex);
+            return name;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index ff9da58..95e0c36 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -211,6 +211,17 @@
         </description>
    </property>
 
+ <!-- HCatAccessorService -->
+   <property>
+        
<name>oozie.service.HCatAccessorService.jms.use.canonical.hostname</name>
+        <value>false</value>
+        <description>The JMS messages published from a HCat server usually 
contain the canonical hostname of the HCat server
+        in standalone mode or the canonical name of the VIP in the case of 
multiple nodes in a HA setup. This setting is used
+        to translate the HCat server hostname or its aliases specified by the 
user in the HCat URIs of the coordinator dependencies
+        to its canonical name so that they can be exactly matched with the JMS 
dependency availability notifications.
+        </description>
+   </property>
+
     <!-- TopicService -->
 
    <property>

http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
 
b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
index a5d2ed9..6996779 100644
--- 
a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
+++ 
b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
@@ -20,16 +20,13 @@ package org.apache.oozie.service;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
+import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.client.CoordinatorAction.Status;
-import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.dependency.hcat.EhcacheHCatDependencyCache;
 import org.apache.oozie.dependency.hcat.HCatMessageHandler;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
 import org.apache.oozie.jms.JMSConnectionInfo;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
@@ -137,6 +134,39 @@ public class TestPartitionDependencyManagerService extends 
XDataTestCase {
         assertFalse(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." 
+ dep3.getTable()));
     }
 
+    @Test
+    public void testHCatCanonicalHostName() throws Exception {
+        
ConfigurationService.setBoolean(SimpleHCatDependencyCache.USE_CANONICAL_HOSTNAME,
 true);
+        
ConfigurationService.set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL,
+                SimpleHCatDependencyCacheExtended.class.getName());
+        services.init();
+
+        // Test all APIs related to dependency caching
+        String actionId1 = "1";
+
+        String server1 = "hcat-server1-A:5080";
+        String server2 = "hcat-server1-B:5080";
+        String db = "mydb";
+        String table1 = "mytbl1";
+        HCatURI dep1 = new HCatURI(new URI("hcat://" + server1 + "/" + db + 
"/" + table1 + "/dt=20120101;country=us"));
+        HCatURI dep2 = new HCatURI(new URI("hcat://" + server2 + "/" + db + 
"/" + table1 + "/dt=20120101;country=us"));
+
+        PartitionDependencyManagerService pdms = 
Services.get().get(PartitionDependencyManagerService.class);
+        addMissingDependencyAndRegister(dep1, actionId1, pdms);
+        assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
+        assertTrue(pdms.getWaitingActions(dep2).contains(actionId1));
+
+        
ConfigurationService.set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL,
+                EhcacheHCatDependencyCacheExtended.class.getName());
+        services.init();
+
+        pdms = Services.get().get(PartitionDependencyManagerService.class);
+        addMissingDependencyAndRegister(dep1, actionId1, pdms);
+        assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
+        assertTrue(pdms.getWaitingActions(dep2).contains(actionId1));
+
+    }
+
     protected void addMissingDependencyAndRegister(HCatURI hcatURI, String 
actionId, PartitionDependencyManagerService pdms) {
         pdms.addMissingDependency(hcatURI, actionId);
         HCatAccessorService hcatService = 
Services.get().get(HCatAccessorService.class);
@@ -198,3 +228,16 @@ public class TestPartitionDependencyManagerService extends 
XDataTestCase {
         }
     }
 }
+
+/**
+ * SimpleHCatDependencyCache with hostname canonicalization stubbed out for tests: every "-B"
+ * in the server name is rewritten to "-A" instead of resolving the name through DNS.
+ */
+class SimpleHCatDependencyCacheExtended extends SimpleHCatDependencyCache {
+    // @Override makes the compiler reject this stub if the parent signature ever changes;
+    // without it the test would silently fall back to the real lookup.
+    @Override
+    public String canonicalizeHostname(String name) {
+        return name.replace("-B", "-A");
+    }
+}
+
+/**
+ * EhcacheHCatDependencyCache with hostname canonicalization stubbed out for tests: every "-B"
+ * in the server name is rewritten to "-A" instead of resolving the name through DNS.
+ */
+class EhcacheHCatDependencyCacheExtended extends EhcacheHCatDependencyCache {
+    // @Override makes the compiler reject this stub if the parent signature ever changes;
+    // without it the test would silently fall back to the real lookup.
+    @Override
+    public String canonicalizeHostname(String name) {
+        return name.replace("-B", "-A");
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/c52967df/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 081ead8..2b806fc 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2781 HCat partition available notification is not sent to coordinator 
actions if coordinator job is using a different hostname (cname, IP address, 
etc. ) for HCat URL (puru)
 OOZIE-2770 Show missing dependencies for coord actions (puru)
 OOZIE-2630 Oozie Coordinator EL Functions to get first day of the week/month 
(satishsaley)
 OOZIE-2771 Allow retrieving keystore and truststore passwords from Hadoop 
Credential Provider (asasvari via abhishekbafna)

Reply via email to