Add Zookeeper Lock

Signed-off-by: shaofengshi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/24cae5c7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/24cae5c7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/24cae5c7

Branch: refs/heads/KYLIN-2374
Commit: 24cae5c73889dcca4e18b2de4e12b68415ef2d62
Parents: 0f88801
Author: xiefan46 <[email protected]>
Authored: Fri Jan 20 09:48:17 2017 +0800
Committer: shaofengshi <[email protected]>
Committed: Fri Jan 20 17:43:47 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  13 +-
 .../storage/hdfs/ITHDFSResourceStoreTest.java   |  65 +-----
 .../kylin/storage/hdfs/ITLockManagerTest.java   | 206 +++++++++++++++++++
 .../kylin/storage/hbase/HBaseResourceStore.java |   1 +
 .../org/apache/kylin/storage/hdfs/HDFSLock.java |  41 ----
 .../kylin/storage/hdfs/HDFSLockManager.java     |  45 ----
 .../kylin/storage/hdfs/HDFSResourceStore.java   |  90 ++++++--
 .../apache/kylin/storage/hdfs/LockManager.java  | 117 +++++++++++
 .../apache/kylin/storage/hdfs/ResourceLock.java |  51 +++++
 .../kylin/storage/hdfs/ZookeeperConfig.java     |  27 +++
 10 files changed, 483 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 29ad5eb..0602e9d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -193,6 +193,14 @@ abstract public class KylinConfigBase implements 
Serializable {
         return new 
StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', 
'-')).append("/").toString();
     }
 
+    public String getRawHdfsWorkingDirectory() {
+        String root = getRequired("kylin.env.hdfs-working-dir");
+        if (!root.endsWith("/")) {
+            root += "/";
+        }
+        return root;
+    }
+
     // 
============================================================================
     // METADATA
     // 
============================================================================
@@ -201,11 +209,6 @@ abstract public class KylinConfigBase implements 
Serializable {
         return getOptional("kylin.metadata.url");
     }
 
-    //for hdfs resource store
-    public String getHDFSMetadataUrl() {
-        return getOptional("kylin.metadata.hdfs.url", 
"kylin_default_instance_hdfs@hdfs");
-    }
-
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
index ef04957..41bbcc0 100644
--- 
a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ 
b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -17,10 +17,8 @@
 */
 
 package org.apache.kylin.storage.hdfs;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.ResourceStoreTest;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -29,11 +27,6 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
 
 /**
  * Created by xiefan on 17-1-10.
@@ -53,65 +46,13 @@ public class ITHDFSResourceStoreTest extends 
HBaseMetadataTestCase {
         this.cleanupTestMetadata();
     }
 
-    @Ignore
-    @Test
-    public void testHDFSUrl() throws Exception {
-        assertEquals("kylin_default_instance_hdfs@hdfs", 
kylinConfig.getHDFSMetadataUrl());
-        System.out.println("hdfs working dir : " + 
kylinConfig.getHdfsWorkingDirectory());
-    }
-
 
-    @Ignore
     @Test
-    public void testMultiThreadWriteHDFS() throws Exception{
-        //System.out.println(kylinConfig.getHdfsWorkingDirectory());
-        final Path testDir = new Path("hdfs:///test123");
-        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
-        final String fileName = "test.json";
-        int threadNum = 3;
-        ExecutorService service = Executors.newFixedThreadPool(threadNum);
-        final CountDownLatch latch = new CountDownLatch(threadNum);
-        Path p = new Path(testDir,fileName);
-        fs.deleteOnExit(p);
-        fs.createNewFile(p);
-        for(int i=0;i<threadNum;i++) {
-            service.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        long id = Thread.currentThread().getId();
-                        Path p = new Path(testDir, fileName);
-                        /*while(fs.exists(p)){
-                            System.out.println("Thread id : " + id + " can not 
get lock,sleep a while");
-                            Thread.currentThread().sleep(1000);
-                        }*/
-                        while(!fs.createNewFile(p)){
-                            System.out.println("Thread id : " + id + " can not 
get lock,sleep a while");
-                            Thread.currentThread().sleep(1000);
-                        }
-                        System.out.println("Thread id : " + id + " get lock, 
sleep a while");
-                        Thread.currentThread().sleep(1000);
-                        fs.delete(p,true);
-                        System.out.println("Thread id : " + id + " release 
lock");
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-        Thread.currentThread().sleep(1000);
-        fs.delete(p,true);
-        System.out.println("main thread release lock.Waiting threads down");
-        System.out.println("file still exist : " + fs.exists(p));
-        latch.await();
-    }
-
-    @Test
-    public void testHDFSStore() throws Exception {
+    public void testResourceStoreBasic() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         ResourceStore store = new HDFSResourceStore(config);
         ResourceStoreTest.testAStore(store);
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java 
b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
new file mode 100644
index 0000000..56347e4
--- /dev/null
+++ 
b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+public class ITLockManagerTest extends HBaseMetadataTestCase {
+
+
+    private String zkConnection = "sandbox:2181";
+
+    private KylinConfig kylinConfig;
+
+    private CuratorFramework zkClient;
+
+    private static final String lockRootPath = "/test_lock";
+
+    private LockManager manager;
+
+    private static final int QTY = 5;
+
+    private static final int REPETITIONS = QTY * 10;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnection, 
retryPolicy);
+        zkClient.start();
+        manager = new LockManager(kylinConfig, lockRootPath);
+        System.out.println("nodes in lock root : " + 
zkClient.getChildren().forPath(lockRootPath));
+
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        zkClient.delete().deletingChildrenIfNeeded().forPath(lockRootPath);
+        List<String> nodes = zkClient.getChildren().forPath("/");
+        System.out.println("nodes in zk after delete : " + nodes);
+        manager.close();
+    }
+
+    @Test
+    public void testCreateLock() throws Exception {
+
+        ResourceLock lock = manager.getLock("/dictionary/numberdict.json");
+        lock.acquire();
+        manager.releaseLock(lock);
+        System.out.println(zkClient.getChildren().forPath(lockRootPath + 
"/dictionary"));
+        List<String> nodes = zkClient.getChildren().forPath(lockRootPath + 
"/dictionary");
+        assertEquals(1, nodes.size());
+        assertEquals("numberdict.json", nodes.get(0));
+    }
+
+    @Test
+    public void testLockSafty() throws Exception {
+        // all of the useful sample code is in the ExampleClientThatLocks inner class below
+
+        // FakeLimitedResource simulates some external resource that can only 
be accessed by one process at a time
+        final FakeLimitedResource resource = new FakeLimitedResource();
+        ExecutorService service = Executors.newFixedThreadPool(QTY);
+        final TestingServer server = new TestingServer(zkConnection);
+        final List<FutureTask<Void>> tasks = new ArrayList<>();
+        try {
+            for (int i = 0; i < QTY; ++i) {
+                final int index = i;
+                FutureTask<Void> task = new FutureTask<Void>(new 
Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        LockManager threadLocalLockManager = new 
LockManager(kylinConfig, lockRootPath);
+                        try {
+                            ExampleClientThatLocks example = new 
ExampleClientThatLocks(threadLocalLockManager, lockRootPath, resource, "Client 
" + index);
+                            for (int j = 0; j < REPETITIONS; ++j) {
+                                example.doWork(10, TimeUnit.SECONDS);
+                            }
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            // log or do something
+                        } finally {
+                            threadLocalLockManager.close();
+                        }
+                        return null;
+                    }
+                });
+                tasks.add(task);
+                service.submit(task);
+            }
+            for (FutureTask<Void> task : tasks) {
+                task.get();
+            }
+        } finally {
+            CloseableUtils.closeQuietly(server);
+        }
+    }
+
+    class FakeLimitedResource {
+        private final AtomicBoolean inUse = new AtomicBoolean(false);
+
+        public void use() throws InterruptedException {
+            // in a real application this would be accessing/manipulating a 
shared resource
+
+            if (!inUse.compareAndSet(false, true)) {
+                throw new IllegalStateException("Needs to be used by one 
client at a time");
+            }
+
+            try {
+                Thread.sleep((long) (3 * Math.random()));
+            } finally {
+                inUse.set(false);
+            }
+        }
+    }
+
+    class TestingServer implements Closeable {
+
+        private String connectionString;
+
+        public TestingServer(String connectionStr) {
+            this.connectionString = connectionStr;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+
+        public String getConnectString() {
+            return connectionString;
+        }
+    }
+
+    class ExampleClientThatLocks {
+
+        private final FakeLimitedResource resource;
+
+        private final String clientName;
+
+        private LockManager lockManager;
+
+        private String lockPath;
+
+        public ExampleClientThatLocks(LockManager lockManager, String 
lockPath, FakeLimitedResource resource, String clientName) {
+            this.resource = resource;
+            this.clientName = clientName;
+            this.lockManager = lockManager;
+            this.lockPath = lockPath;
+        }
+
+        public void doWork(long time, TimeUnit unit) throws Exception {
+            ResourceLock lock = lockManager.getLock(lockPath);
+            if (!lock.acquire(time, unit)) {
+                throw new IllegalStateException(clientName + " could not 
acquire the lock");
+            }
+            try {
+                System.out.println(clientName + " has the lock");
+                resource.use();
+            } finally {
+                System.out.println(clientName + " releasing the lock");
+                lock.release(); // always release the lock in a finally block
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 6217350..5980cb5 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -314,6 +314,7 @@ public class HBaseResourceStore extends ResourceStore {
         } finally {
             IOUtils.closeQuietly(table);
         }
+
     }
 
     private Result internalGetFromHTable(HTableInterface table, String path, 
boolean fetchContent, boolean fetchTimestamp) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
deleted file mode 100644
index 8710edf..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLock {
-
-    private Path rawLock;
-
-    private static final Logger logger = 
LoggerFactory.getLogger(HDFSLock.class);
-
-    protected HDFSLock(String resourceFullPath) {
-        this.rawLock = new Path(resourceFullPath);
-    }
-
-    public boolean init(FileSystem fs) throws IOException, 
InterruptedException {
-        if (!fs.isFile(rawLock)) {
-            logger.info("Not support directory lock yet");
-            return false;
-        }
-        while (!fs.createNewFile(rawLock)) {
-            Thread.currentThread().sleep(1000);
-        }
-        return true;
-    }
-
-    public boolean release(FileSystem fs) throws IOException, 
InterruptedException {
-        while (!fs.delete(rawLock, false)) {
-            Thread.currentThread().sleep(1000);
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
deleted file mode 100644
index 1cd0800..0000000
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
-
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLockManager {
-
-    private static final String LOCK_HOME = "LOCK_HOME";
-
-    private Path lockPath;
-
-    private FileSystem fs;
-
-    public HDFSLockManager(String hdfsWorkingDir) throws IOException{
-        this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME);
-        this.fs = HadoopUtil.getFileSystem(lockPath);
-        if(!fs.exists(lockPath)){
-            fs.create(lockPath);
-        }
-    }
-
-    public HDFSLock getLock(String resourceFullPath) throws 
IOException,InterruptedException,IllegalStateException{
-        HDFSLock lock = new HDFSLock(resourceFullPath);
-        boolean success = lock.init(fs);
-        if(success){
-            return lock;
-        }else{
-            throw new IllegalStateException("Try get lock fail. Resourse path 
: " + resourceFullPath);
-        }
-    }
-
-    public void releaseLock(HDFSLock lock) throws 
IOException,InterruptedException,IllegalStateException{
-        boolean success = lock.release(fs);
-        if(!success)
-            throw new IllegalStateException("Release lock fail");
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index c7f0f25..8cb9f0d 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,44 +39,58 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
-/**
- * Created by xiefan on 17-1-10.
- */
 public class HDFSResourceStore extends ResourceStore {
 
-    private static final String DEFAULT_TABLE_NAME = 
"kylin_default_instance_hdfs";
+    private static final String DEFAULT_FOLDER_NAME = "kylin_default_instance";
 
     private Path hdfsMetaPath;
 
     private FileSystem fs;
 
-    private HDFSLockManager lockManager;
-
     private static final Logger logger = 
LoggerFactory.getLogger(HDFSResourceStore.class);
 
+    private LockManager lockManager;
+
     //public for test. Normal should be protected
-    public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+    public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
         super(kylinConfig);
-        String metadataUrl = kylinConfig.getHDFSMetadataUrl();
-        // split TABLE@HBASE_URL
+        String metadataUrl = kylinConfig.getMetadataUrl();
         int cut = metadataUrl.indexOf('@');
-        String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : 
metadataUrl.substring(0, cut);
+        String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : 
metadataUrl.substring(0, cut);
+        metaDirName += "/hdfs_metadata";
+        logger.info("meta dir name :" + metaDirName);
         createMetaFolder(metaDirName, kylinConfig);
     }
 
-    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) 
throws IOException {
+    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) 
throws Exception {
         String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
         fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+        logger.info("hdfs working dir : " + hdfsWorkingDir);
         Path hdfsWorkingPath = new Path(hdfsWorkingDir);
         if (!fs.exists(hdfsWorkingPath)) {
             throw new IOException("HDFS working dir not exist");
         }
+        //create lock manager
+        this.lockManager = new 
LockManager(kylinConfig,kylinConfig.getRawHdfsWorkingDirectory() + metaDirName);
+        //create hdfs meta path
         hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
         if (!fs.exists(hdfsMetaPath)) {
-            fs.create(hdfsMetaPath, true);
+            ResourceLock lock = 
lockManager.getLock(lockManager.getLockPath("/"));
+            try {
+                if (lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, 
TimeUnit.MINUTES)) {
+                    logger.info("get root lock successfully");
+                    if (!fs.exists(hdfsMetaPath)) {
+                        fs.mkdirs(hdfsMetaPath);
+                        logger.info("create hdfs meta path");
+                    }
+                }
+            } finally {
+                lockManager.releaseLock(lock);
+            }
         }
-        lockManager = new HDFSLockManager(hdfsWorkingDir);
+        logger.info("hdfs meta path : " + hdfsMetaPath.toString());
     }
 
     @Override
@@ -132,7 +146,8 @@ public class HDFSResourceStore extends ResourceStore {
                 logger.warn("Zero length file: " + p.toString());
             }
             FSDataInputStream in = fs.open(p);
-            return new RawResource(fs.open(p), getResourceTimestamp(resPath));
+            long t = in.readLong();
+            return new RawResource(in, t);
         } else {
             return null;
         }
@@ -144,19 +159,42 @@ public class HDFSResourceStore extends ResourceStore {
         if (!fs.exists(p) || !fs.isFile(p)) {
             return 0;
         }
-        FileStatus status = fs.getFileStatus(p);
-        return status.getModificationTime();
+        FSDataInputStream in = null;
+        ResourceLock lock = null;
+        try {
+            lock = lockManager.getLock(resPath);
+            lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
+            in = fs.open(p);
+            long t = in.readLong();
+            return t;
+        } catch (Exception e) {
+            throw new IOException("Put resource fail", e);
+        } finally {
+            IOUtils.closeQuietly(in);
+            lockManager.releaseLock(lock);
+        }
+
     }
 
     @Override
     protected void putResourceImpl(String resPath, InputStream content, long 
ts) throws IOException {
+        logger.info("res path : " + resPath);
         Path p = getRealHDFSPath(resPath);
+        logger.info("put resource : " + p.toUri());
         FSDataOutputStream out = null;
+        ResourceLock lock = null;
         try {
+            lock = lockManager.getLock(resPath);
+            lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
             out = fs.create(p, true);
+            out.writeLong(ts);
             IOUtils.copy(content, out);
+
+        } catch (Exception e) {
+            throw new IOException("Put resource fail", e);
         } finally {
             IOUtils.closeQuietly(out);
+            lockManager.releaseLock(lock);
         }
     }
 
@@ -180,9 +218,18 @@ public class HDFSResourceStore extends ResourceStore {
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        Path p = getRealHDFSPath(resPath);
-        if (fs.exists(p)) {
-            fs.delete(p, true);
+        ResourceLock lock = null;
+        try {
+            lock = lockManager.getLock(resPath);
+            lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
+            Path p = getRealHDFSPath(resPath);
+            if (fs.exists(p)) {
+                fs.delete(p, true);
+            }
+        } catch (Exception e) {
+            throw new IOException("Delete resource fail", e);
+        } finally {
+            lockManager.releaseLock(lock);
         }
     }
 
@@ -192,7 +239,10 @@ public class HDFSResourceStore extends ResourceStore {
     }
 
     private Path getRealHDFSPath(String resourcePath) {
+        if (resourcePath.startsWith("/") && resourcePath.length() > 1)
+            resourcePath = resourcePath.substring(1, resourcePath.length());
         return new Path(this.hdfsMetaPath, resourcePath);
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
new file mode 100644
index 0000000..a4d0080
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+public class LockManager {
+
+    private static Logger logger = 
LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
+
+    final private KylinConfig config;
+
+    final CuratorFramework zkClient;
+
+    private String lockRootPath;
+
+    public LockManager(String lockRootPath) throws Exception{
+
+        this(KylinConfig.getInstanceFromEnv(),lockRootPath);
+    }
+
+    public LockManager(KylinConfig config,String lockRootPath) throws 
Exception{
+        this.config = config;
+        this.lockRootPath = lockRootPath;
+        String zkConnectString = getZKConnectString();
+        logger.info("zk connection string:" + zkConnectString);
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, 
retryPolicy);
+        zkClient.start();
+        if(zkClient.checkExists().forPath(lockRootPath) == null)
+            zkClient.create().creatingParentsIfNeeded().forPath(lockRootPath);
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                close();
+            }
+        }));
+    }
+
+    public ResourceLock getLock(String name) throws Exception {
+        String lockPath = getLockPath(name);
+        InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
+        return new ResourceLock(lockPath, lock);
+    }
+
+    public void releaseLock(ResourceLock lock) {
+        try {
+            if (lock != null)
+                lock.release();
+        } catch (Exception e) {
+            logger.error("Fail to release lock");
+            e.printStackTrace();
+        }
+    }
+
+    private static String getZKConnectString() {
+        final String serverList = ZookeeperConfig.DEFAULT_ZK_HOST;
+        final String port = ZookeeperConfig.DEFAULT_ZK_PORT;
+        return 
StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new 
Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+
+    public String getLockPath(String resourceName) {
+        if (!resourceName.startsWith("/"))
+            resourceName = "/" + resourceName;
+        if (resourceName.endsWith("/"))
+            resourceName = resourceName.substring(0, resourceName.length() - 
1);
+        return lockRootPath + resourceName;
+    }
+
+    public void close() {
+        try {
+            zkClient.close();
+        } catch (Exception e) {
+            logger.error("error occurred to close PathChildrenCache", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
new file mode 100644
index 0000000..9d51871
--- /dev/null
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.concurrent.TimeUnit;
+
+
+public class ResourceLock {
+
+    private String resourcePath;
+
+    private InterProcessMutex lock;
+
+    protected ResourceLock(String resourcePath, InterProcessMutex lock) {
+        this.resourcePath = resourcePath;
+        this.lock = lock;
+    }
+
+    public boolean acquire(long time, TimeUnit unit) throws Exception {
+        return lock.acquire(time, unit);
+    }
+
+    public void acquire() throws Exception{
+       lock.acquire();
+    }
+
+    protected void release() throws Exception {
+        lock.release();
+    }
+
+    public String getResourcePath() {
+        return resourcePath;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
new file mode 100644
index 0000000..3f67e8d
--- /dev/null
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+
+public interface ZookeeperConfig {
+    String DEFAULT_ZK_HOST = "sandbox";
+
+    String DEFAULT_ZK_PORT = "2181";
+
+    long DEFAULT_TIMEOUT = 10;
+}

Reply via email to