Repository: incubator-eagle
Updated Branches:
  refs/heads/master 9f4e7633d -> 5da9df822


[EAGLE-823] Refactor zkroot and lockpath to solve the problem that occurred 
when adding the zookeeper lock

EAGLE-823 Refactor zkroot and lockpath to solve the problem that occurred 
when adding the zookeeper lock

- Refactor zkroot and lockpath to a uniform structure.
- Add unit test for eagle-security-hive.

https://issues.apache.org/jira/browse/EAGLE-823

Author: chitin <chitin1...@gmail.com>

Closes #713 from chitin/EAGLE-823.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/5da9df82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/5da9df82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/5da9df82

Branch: refs/heads/master
Commit: 5da9df8221a51252b5e91127d20dea3165553b68
Parents: 9f4e763
Author: chitin <chitin1...@gmail.com>
Authored: Tue Dec 6 17:06:18 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Tue Dec 6 17:06:18 2016 +0800

----------------------------------------------------------------------
 .../jpm/mr/running/MRRunningJobConfig.java      |  6 +-
 .../jpm/mr/running/parser/MRJobParserTest.java  |  4 +-
 .../spark/running/SparkRunningJobAppConfig.java |  3 +-
 .../java/org/apache/eagle/jpm/util/Utils.java   |  2 +-
 .../org/apache/eagle/jpm/util/UtilsTest.java    |  4 +-
 ...HiveJobRunningSourcedStormSpoutProvider.java |  5 +-
 .../hive/jobrunning/TestHiveJobFetchSpout.java  | 93 ++++++++++++++++++++
 7 files changed, 107 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 119867d..f733b95 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -32,6 +32,8 @@ public class MRRunningJobConfig implements Serializable {
 
     private static final String ZK_ROOT_PREFIX = "/apps/mr/running";
 
+    private static final String JOB_SYMBOL = "/jobs";
+
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -109,8 +111,8 @@ public class MRRunningJobConfig implements Serializable {
         this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("zookeeper.zkSessionTimeoutMs");
         this.zkStateConfig.zkRetryTimes = 
config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkRetryInterval = 
config.getInt("zookeeper.zkRetryInterval");
-        this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + 
config.getString("siteId");
-        this.zkStateConfig.zkLockPath = 
Utils.makeLockPath(this.zkStateConfig.zkRoot);
+        this.zkStateConfig.zkLockPath = Utils.makeLockPath(ZK_ROOT_PREFIX + 
"/" + config.getString("siteId"));
+        this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + 
config.getString("siteId") + JOB_SYMBOL;
 
         // parse eagle service endpoint
         this.eagleServiceConfig.eagleServiceHost = 
config.getString("service.host");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index 3a71384..f2e581c 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -63,8 +63,8 @@ import static 
org.powermock.api.mockito.PowerMockito.mockStatic;
 @PrepareForTest({InputStreamUtils.class, MRJobParser.class, 
URLConnectionUtils.class, Math.class, MRJobEntityCreationHandler.class})
 @PowerMockIgnore({"javax.*", "org.w3c.*", 
"com.sun.org.apache.xerces.*","org.apache.xerces.*"})
 public class MRJobParserTest {
-    private static final String ZK_JOB_PATH = 
"/apps/mr/running/sandbox/application_1479206441898_30784/job_1479206441898_30784";
-    private static final String ZK_APP_PATH = 
"/apps/mr/running/sandbox/application_1479206441898_30784";
+    private static final String ZK_JOB_PATH = 
"/apps/mr/running/sandbox/jobs/application_1479206441898_30784/job_1479206441898_30784";
+    private static final String ZK_APP_PATH = 
"/apps/mr/running/sandbox/jobs/application_1479206441898_30784";
     private static final String JOB_CONF_URL = 
"http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/conf?anonymous=true";
     private static final String JOB_COUNT_URL = 
"http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/counters?anonymous=true";
     private static final String JOB_ID = "job_1479206441898_30784";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index c5ec6ce..b48784f 100644
--- 
a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -34,6 +34,7 @@ public class SparkRunningJobAppConfig implements Serializable 
{
     static final String JOB_PARSE_BOLT_NAME = "sparkRunningJobParseBolt";
 
     static final String DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT = 
"/apps/spark/running";
+    static final String JOB_SYMBOL = "/jobs";
 
     ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
@@ -120,8 +121,8 @@ public class SparkRunningJobAppConfig implements 
Serializable {
         this.zkStateConfig.zkRetryInterval = 
config.getInt("zookeeper.zkRetryInterval");
         this.zkStateConfig.zkRetryTimes = 
config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("zookeeper.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT;
         this.zkStateConfig.zkLockPath = 
Utils.makeLockPath(DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + "/" + 
config.getString("siteId"));
+        this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + 
"/" + config.getString("siteId") + JOB_SYMBOL;
         if (config.hasPath("zookeeper.zkRoot")) {
             this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 7fb6643..9025d36 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -116,6 +116,6 @@ public class Utils {
 
     public static String makeLockPath(String zkrootWithSiteId) {
         Preconditions.checkArgument(StringUtils.isNotBlank(zkrootWithSiteId), 
"zkrootWithSiteId must not be blank");
-        return "/locks" + zkrootWithSiteId.toLowerCase();
+        return zkrootWithSiteId.toLowerCase() + "/locks";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
 
b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
index 75d46e9..19c0f00 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
@@ -67,8 +67,8 @@ public class UtilsTest {
 
     @Test
     public void testMakeLockPath() {
-        String lockpath = Utils.makeLockPath("/apps/mr/running/sitdId");
-        Assert.assertEquals("/locks/apps/mr/running/sitdid", lockpath);
+        String lockpath = Utils.makeLockPath("/apps/mr/jobs/sitdId");
+        Assert.assertEquals("/apps/mr/jobs/sitdid/locks", lockpath);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
 
b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index c8b1f61..597593b 100644
--- 
a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ 
b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 public class HiveJobRunningSourcedStormSpoutProvider {
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class);
+    private static final String JOB_SYMBOL = "/jobs";
 
     public BaseRichSpout getSpout(Config config, int parallelism) {
         RunningJobEndpointConfig endPointConfig = new 
RunningJobEndpointConfig();
@@ -59,11 +60,11 @@ public class HiveJobRunningSourcedStormSpoutProvider {
         //controlConfig.numTotalPartitions = parallelism == null ? 1 : 
parallelism;
         ZKStateConfig zkStateConfig = new ZKStateConfig();
         zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
-        zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+        zkStateConfig.zkLockPath = 
Utils.makeLockPath(config.getString("dataSourceConfig.zkRoot") + "/" + 
config.getString("siteId"));
+        zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot") + 
"/" + config.getString("siteId") + JOB_SYMBOL;
         zkStateConfig.zkSessionTimeoutMs = 
config.getInt("dataSourceConfig.zkSessionTimeoutMs");
         zkStateConfig.zkRetryTimes = 
config.getInt("dataSourceConfig.zkRetryTimes");
         zkStateConfig.zkRetryInterval = 
config.getInt("dataSourceConfig.zkRetryInterval");
-        zkStateConfig.zkLockPath = Utils.makeLockPath(zkStateConfig.zkRoot + 
"/" + config.getString("siteId"));
         RunningJobCrawlConfig crawlConfig = new 
RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
 
b/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
new file mode 100644
index 0000000..d674e02
--- /dev/null
+++ 
b/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hive.jobrunning;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.junit.*;
+import static org.mockito.Mockito.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @Since 12/5/16.
+ */
+public class TestHiveJobFetchSpout {
+
+    private static TestingServer zk;
+    private static com.typesafe.config.Config config;
+    private static CuratorFramework curator;
+    private static final String SHARE_RESOURCES = 
"/apps/hive/running/sanbox/jobs/0/lastFinishTime";
+
+    @BeforeClass
+    public static void setupZookeeper() throws Exception {
+        zk = new TestingServer();
+        curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new 
ExponentialBackoffRetry(1000, 3));
+        curator.start();
+        
curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(SHARE_RESOURCES);
+        config = ConfigFactory.parseMap(new HashMap<String, Object>() {{
+            put("dataSourceConfig.RMEndPoints", 
"http://server.eagle.apache.org:8088");
+            put("dataSourceConfig.HSEndPoint", 
"http://server.eagle.apache.org:19888");
+            put("dataSourceConfig.zkQuorum", zk.getConnectString());
+            put("dataSourceConfig.zkRoot", "/apps/hive/running");
+            put("dataSourceConfig.zkSessionTimeoutMs", 15000);
+            put("dataSourceConfig.zkRetryTimes", 3);
+            put("dataSourceConfig.zkRetryInterval", 2000);
+            put("dataSourceConfig.partitionerCls", 
"org.apache.eagle.job.DefaultJobPartitionerImpl");
+            put("siteId", "sanbox");
+        }});
+    }
+
+    @AfterClass
+    public static void teardownZookeeper() throws Exception {
+        if(curator.checkExists().forPath(SHARE_RESOURCES) != null) {
+            
curator.delete().deletingChildrenIfNeeded().forPath(SHARE_RESOURCES);
+        }
+        CloseableUtils.closeQuietly(curator);
+        CloseableUtils.closeQuietly(zk);
+    }
+
+    @Before
+    public void setDefaultValues() throws Exception {
+        curator.setData().forPath(SHARE_RESOURCES, 
String.valueOf(0).getBytes());
+    }
+
+    @Test
+    public void testOpen() throws Exception {
+        Map conf = mock(HashMap.class);
+        TopologyContext context = mock(TopologyContext.class);
+        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+        when(context.getThisTaskId()).thenReturn(1);
+        when(context.getComponentTasks(anyString())).thenReturn(new 
ArrayList<Integer>() {{
+            add(1);
+        }});
+        HiveJobRunningSourcedStormSpoutProvider provider = new 
HiveJobRunningSourcedStormSpoutProvider();
+        HiveJobFetchSpout spout = (HiveJobFetchSpout)provider.getSpout(config, 
1);
+        spout.open(conf, context, collector);
+        Long yesterday = Long.valueOf(new 
String(curator.getData().forPath(SHARE_RESOURCES)));
+        Assert.assertTrue(System.currentTimeMillis() > yesterday);
+    }
+}

Reply via email to