Add Zookeeper Lock Signed-off-by: shaofengshi <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/870cbf26 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/870cbf26 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/870cbf26 Branch: refs/heads/KYLIN-2374 Commit: 870cbf2697f91b068b56064c302a67d09e45a12e Parents: a594da7 Author: xiefan46 <[email protected]> Authored: Fri Jan 20 09:48:17 2017 +0800 Committer: shaofengshi <[email protected]> Committed: Fri Jan 20 16:13:05 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 2 +- .../storage/hdfs/ITHDFSResourceStoreTest.java | 117 ----------------- .../kylin/storage/hbase/HBaseResourceStore.java | 1 + .../org/apache/kylin/storage/hdfs/Config.java | 29 +++++ .../org/apache/kylin/storage/hdfs/HDFSLock.java | 41 ------ .../kylin/storage/hdfs/HDFSLockManager.java | 45 ------- .../kylin/storage/hdfs/HDFSResourceStore.java | 78 ++++++++--- .../apache/kylin/storage/hdfs/LockManager.java | 117 +++++++++++++++++ .../apache/kylin/storage/hdfs/ResourceLock.java | 53 ++++++++ .../storage/hdfs/ExampleClientThatLocks.java | 50 +++++++ .../kylin/storage/hdfs/FakeLimitedResource.java | 41 ++++++ .../storage/hdfs/HDFSResourceStoreTest.java | 118 +++++++++++++++++ .../kylin/storage/hdfs/LockManagerTest.java | 69 ++++++++++ .../kylin/storage/hdfs/TestingServer.java | 42 ++++++ .../org/apache/kylin/storage/hdfs/ZkDemo.java | 129 +++++++++++++++++++ 15 files changed, 713 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 
29ad5eb..a4d6ca0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -203,7 +203,7 @@ abstract public class KylinConfigBase implements Serializable { //for hdfs resource store public String getHDFSMetadataUrl() { - return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs"); + return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs_meta@hdfs"); } // for test only http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java deleted file mode 100644 index ef04957..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.storage.hdfs; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.ResourceStoreTest; -import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; - -/** - * Created by xiefan on 17-1-10. - */ -public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase { - - KylinConfig kylinConfig; - - @Before - public void setup() throws Exception { - this.createTestMetadata(); - kylinConfig = KylinConfig.getInstanceFromEnv(); - } - - @After - public void after() throws Exception { - this.cleanupTestMetadata(); - } - - @Ignore - @Test - public void testHDFSUrl() throws Exception { - assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl()); - System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory()); - } - - - @Ignore - @Test - public void testMultiThreadWriteHDFS() throws Exception{ - //System.out.println(kylinConfig.getHdfsWorkingDirectory()); - final Path testDir = new Path("hdfs:///test123"); - final FileSystem fs = HadoopUtil.getFileSystem(testDir); - final String fileName = "test.json"; - int threadNum = 3; - ExecutorService service = Executors.newFixedThreadPool(threadNum); - final CountDownLatch latch = new CountDownLatch(threadNum); - Path p = new Path(testDir,fileName); - fs.deleteOnExit(p); - fs.createNewFile(p); - for(int i=0;i<threadNum;i++) { - service.execute(new Runnable() { - @Override - public void run() { - try { - long id = Thread.currentThread().getId(); - Path p = new Path(testDir, 
fileName); - /*while(fs.exists(p)){ - System.out.println("Thread id : " + id + " can not get lock,sleep a while"); - Thread.currentThread().sleep(1000); - }*/ - while(!fs.createNewFile(p)){ - System.out.println("Thread id : " + id + " can not get lock,sleep a while"); - Thread.currentThread().sleep(1000); - } - System.out.println("Thread id : " + id + " get lock, sleep a while"); - Thread.currentThread().sleep(1000); - fs.delete(p,true); - System.out.println("Thread id : " + id + " release lock"); - latch.countDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - Thread.currentThread().sleep(1000); - fs.delete(p,true); - System.out.println("main thread release lock.Waiting threads down"); - System.out.println("file still exist : " + fs.exists(p)); - latch.await(); - } - - @Test - public void testHDFSStore() throws Exception { - KylinConfig config = KylinConfig.getInstanceFromEnv(); - ResourceStore store = new HDFSResourceStore(config); - ResourceStoreTest.testAStore(store); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 6217350..5980cb5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -314,6 +314,7 @@ public class HBaseResourceStore extends ResourceStore { } finally { IOUtils.closeQuietly(table); } + } private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/Config.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/Config.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/Config.java new file mode 100644 index 0000000..c9b50ae --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/Config.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.storage.hdfs; + +/** + * Created by xiefan on 17-1-18. 
+ */ +public interface Config { + String ZK_HOST = "sandbox"; + + String ZK_PORT = "2181"; + + long TIME = 10; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java deleted file mode 100644 index 8710edf..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.apache.kylin.storage.hdfs; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; - -/** - * Created by xiefan on 17-1-17. - */ -public class HDFSLock { - - private Path rawLock; - - private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class); - - protected HDFSLock(String resourceFullPath) { - this.rawLock = new Path(resourceFullPath); - } - - public boolean init(FileSystem fs) throws IOException, InterruptedException { - if (!fs.isFile(rawLock)) { - logger.info("Not support directory lock yet"); - return false; - } - while (!fs.createNewFile(rawLock)) { - Thread.currentThread().sleep(1000); - } - return true; - } - - public boolean release(FileSystem fs) throws IOException, InterruptedException { - while (!fs.delete(rawLock, false)) { - Thread.currentThread().sleep(1000); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java deleted 
file mode 100644 index 1cd0800..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.apache.kylin.storage.hdfs; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.engine.mr.HadoopUtil; - -import java.io.IOException; - -/** - * Created by xiefan on 17-1-17. - */ -public class HDFSLockManager { - - private static final String LOCK_HOME = "LOCK_HOME"; - - private Path lockPath; - - private FileSystem fs; - - public HDFSLockManager(String hdfsWorkingDir) throws IOException{ - this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME); - this.fs = HadoopUtil.getFileSystem(lockPath); - if(!fs.exists(lockPath)){ - fs.create(lockPath); - } - } - - public HDFSLock getLock(String resourceFullPath) throws IOException,InterruptedException,IllegalStateException{ - HDFSLock lock = new HDFSLock(resourceFullPath); - boolean success = lock.init(fs); - if(success){ - return lock; - }else{ - throw new IllegalStateException("Try get lock fail. 
Resourse path : " + resourceFullPath); - } - } - - public void releaseLock(HDFSLock lock) throws IOException,InterruptedException,IllegalStateException{ - boolean success = lock.release(fs); - if(!success) - throw new IllegalStateException("Release lock fail"); - } - - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java index c7f0f25..717c27f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,44 +39,57 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; /** * Created by xiefan on 17-1-10. */ public class HDFSResourceStore extends ResourceStore { - private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs"; + private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs_meta"; private Path hdfsMetaPath; private FileSystem fs; - private HDFSLockManager lockManager; - private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class); + private LockManager lockManager; + //public for test. 
Normal should be protected - public HDFSResourceStore(KylinConfig kylinConfig) throws IOException { + public HDFSResourceStore(KylinConfig kylinConfig) throws Exception { super(kylinConfig); String metadataUrl = kylinConfig.getHDFSMetadataUrl(); - // split TABLE@HBASE_URL int cut = metadataUrl.indexOf('@'); String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); + this.lockManager = new LockManager(); createMetaFolder(metaDirName, kylinConfig); } - private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException { + private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception { String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory(); fs = HadoopUtil.getFileSystem(hdfsWorkingDir); + logger.info("hdfs working dir : " + hdfsWorkingDir); Path hdfsWorkingPath = new Path(hdfsWorkingDir); if (!fs.exists(hdfsWorkingPath)) { throw new IOException("HDFS working dir not exist"); } hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName); if (!fs.exists(hdfsMetaPath)) { - fs.create(hdfsMetaPath, true); + ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/")); + try { + if (lock.acquire(Config.TIME, TimeUnit.MINUTES)) { + logger.info("get root lock successfully"); + if (!fs.exists(hdfsMetaPath)) { + fs.mkdirs(hdfsMetaPath); + logger.info("create hdfs meta path"); + } + } + } finally { + lockManager.releaseLock(lock); + } } - lockManager = new HDFSLockManager(hdfsWorkingDir); + logger.info("hdfs meta path : " + hdfsMetaPath.toString()); } @Override @@ -132,7 +145,8 @@ public class HDFSResourceStore extends ResourceStore { logger.warn("Zero length file: " + p.toString()); } FSDataInputStream in = fs.open(p); - return new RawResource(fs.open(p), getResourceTimestamp(resPath)); + long t = in.readLong(); + return new RawResource(in, t); } else { return null; } @@ -144,19 +158,42 @@ public class HDFSResourceStore extends ResourceStore { if (!fs.exists(p) || !fs.isFile(p)) { return 
0; } - FileStatus status = fs.getFileStatus(p); - return status.getModificationTime(); + FSDataInputStream in = null; + ResourceLock lock = null; + try { + lock = lockManager.getLock(resPath); + lock.acquire(Config.TIME, TimeUnit.MINUTES); + in = fs.open(p); + long t = in.readLong(); + return t; + } catch (Exception e) { + throw new IOException("Put resource fail", e); + } finally { + IOUtils.closeQuietly(in); + lockManager.releaseLock(lock); + } + } @Override protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { + logger.info("res path : " + resPath); Path p = getRealHDFSPath(resPath); + logger.info("put resource : " + p.toUri()); FSDataOutputStream out = null; + ResourceLock lock = null; try { + lock = lockManager.getLock(resPath); + lock.acquire(Config.TIME, TimeUnit.MINUTES); out = fs.create(p, true); + out.writeLong(ts); IOUtils.copy(content, out); + + } catch (Exception e) { + throw new IOException("Put resource fail", e); } finally { IOUtils.closeQuietly(out); + lockManager.releaseLock(lock); } } @@ -180,9 +217,18 @@ public class HDFSResourceStore extends ResourceStore { @Override protected void deleteResourceImpl(String resPath) throws IOException { - Path p = getRealHDFSPath(resPath); - if (fs.exists(p)) { - fs.delete(p, true); + ResourceLock lock = null; + try { + lock = lockManager.getLock(resPath); + lock.acquire(Config.TIME, TimeUnit.MINUTES); + Path p = getRealHDFSPath(resPath); + if (fs.exists(p)) { + fs.delete(p, true); + } + } catch (Exception e) { + throw new IOException("Delete resource fail", e); + } finally { + lockManager.releaseLock(lock); } } @@ -192,6 +238,8 @@ public class HDFSResourceStore extends ResourceStore { } private Path getRealHDFSPath(String resourcePath) { + if (resourcePath.startsWith("/") && resourcePath.length() > 1) + resourcePath = resourcePath.substring(1, resourcePath.length()); return new Path(this.hdfsMetaPath, resourcePath); } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java new file mode 100644 index 0000000..9b18749 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.storage.hdfs; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.Arrays; + +/** + * Created by xiefan on 17-1-18. + */ +public class LockManager { + + private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class); + + public static final String ZOOKEEPER_LOCK_PATH = "/kylin/hdfs_meta/lock"; + + final private KylinConfig config; + + final CuratorFramework zkClient; + + public LockManager() { + this(KylinConfig.getInstanceFromEnv()); + } + + public LockManager(KylinConfig config) { + this.config = config; + + String zkConnectString = getZKConnectString(); + logger.info("zk connection string:" + zkConnectString); + if (StringUtils.isEmpty(zkConnectString)) { + throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!"); + } + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy); + zkClient.start(); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + close(); + } + })); + } + + public ResourceLock getLock(String name) throws Exception { + String lockPath = getLockPath(name); + InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath); + return new ResourceLock(lockPath, lock); + } + + public void releaseLock(ResourceLock lock) { + try { + if (lock != null) + 
lock.release(); + } catch (Exception e) { + logger.error("Fail to release lock"); + e.printStackTrace(); + } + } + + private static String getZKConnectString() { + final String serverList = Config.ZK_HOST; + final String port = Config.ZK_PORT; + return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { + @Nullable + @Override + public String apply(String input) { + return input + ":" + port; + } + }), ","); + } + + public String getLockPath(String resourceName) { + if (!resourceName.startsWith("/")) + resourceName = "/" + resourceName; + if (resourceName.endsWith("/")) + resourceName = resourceName.substring(0, resourceName.length() - 1); + return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + resourceName; + } + + public void close() { + try { + zkClient.close(); + } catch (Exception e) { + logger.error("error occurred to close PathChildrenCache", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java new file mode 100644 index 0000000..dc99a50 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.storage.hdfs; + +import org.apache.curator.framework.recipes.locks.InterProcessMutex; + +import java.util.concurrent.TimeUnit; + +/** + * Created by xiefan on 17-1-18. + */ +public class ResourceLock { + + private String resourcePath; + + private InterProcessMutex lock; + + protected ResourceLock(String resourcePath, InterProcessMutex lock) { + this.resourcePath = resourcePath; + this.lock = lock; + } + + public boolean acquire(long time, TimeUnit unit) throws Exception { + return lock.acquire(time, unit); + } + + public void acquire() throws Exception{ + lock.acquire(); + } + + protected void release() throws Exception { + lock.release(); + } + + public String getResourcePath() { + return resourcePath; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ExampleClientThatLocks.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ExampleClientThatLocks.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ExampleClientThatLocks.java new file mode 100644 index 0000000..fbb2c85 --- /dev/null +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ExampleClientThatLocks.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.storage.hdfs; + +/** + * Created by xiefan on 17-1-18. + */ +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import java.util.concurrent.TimeUnit; + +public class ExampleClientThatLocks { + private final InterProcessMutex lock; + private final FakeLimitedResource resource; + private final String clientName; + + public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { + this.resource = resource; + this.clientName = clientName; + lock = new InterProcessMutex(client, lockPath); + } + + public void doWork(long time, TimeUnit unit) throws Exception { + if (!lock.acquire(time, unit)) { + throw new IllegalStateException(clientName + " could not acquire the lock"); + } + try { + System.out.println(clientName + " has the lock"); + resource.use(); + } finally { + System.out.println(clientName + " releasing the lock"); + lock.release(); // always release the lock in a finally block + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/FakeLimitedResource.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/FakeLimitedResource.java 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/FakeLimitedResource.java new file mode 100644 index 0000000..28c69ad --- /dev/null +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/FakeLimitedResource.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.storage.hdfs; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Simulates some external resource that can only be access by one process at a time + */ +public class FakeLimitedResource { + private final AtomicBoolean inUse = new AtomicBoolean(false); + + public void use() throws InterruptedException { + // in a real application this would be accessing/manipulating a shared resource + + if (!inUse.compareAndSet(false, true)) { + throw new IllegalStateException("Needs to be used by one client at a time"); + } + + try { + Thread.sleep((long) (3 * Math.random())); + } finally { + inUse.set(false); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/HDFSResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/HDFSResourceStoreTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/HDFSResourceStoreTest.java new file mode 100644 index 0000000..b844a60 --- /dev/null +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/HDFSResourceStoreTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hdfs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.ResourceStoreTest; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.common.util.HadoopUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; + +/** + * Created by xiefan on 17-1-10. + */ +@Ignore +public class HDFSResourceStoreTest extends HBaseMetadataTestCase { + + KylinConfig kylinConfig; + + @Before + public void setup() throws Exception { + this.createTestMetadata(); + kylinConfig = KylinConfig.getInstanceFromEnv(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Ignore + @Test + public void testHDFSUrl() throws Exception { + assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl()); + System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory()); + } + + @Ignore + @Test + public void testMultiThreadWriteHDFS() throws Exception { + //System.out.println(kylinConfig.getHdfsWorkingDirectory()); + final Path testDir = new Path("hdfs:///test123"); + final FileSystem fs = HadoopUtil.getFileSystem(testDir); + final String fileName = "test.json"; + int threadNum = 3; + ExecutorService service = Executors.newFixedThreadPool(threadNum); + final CountDownLatch latch = new CountDownLatch(threadNum); + Path p = new Path(testDir, fileName); + fs.deleteOnExit(p); + fs.createNewFile(p); + for (int i = 0; i < threadNum; i++) { + service.execute(new 
Runnable() { + @Override + public void run() { + try { + long id = Thread.currentThread().getId(); + Path p = new Path(testDir, fileName); + /*while(fs.exists(p)){ + System.out.println("Thread id : " + id + " can not get lock,sleep a while"); + Thread.currentThread().sleep(1000); + }*/ + while (!fs.createNewFile(p)) { + System.out.println("Thread id : " + id + " can not get lock,sleep a while"); + Thread.currentThread().sleep(1000); + } + System.out.println("Thread id : " + id + " get lock, sleep a while"); + Thread.currentThread().sleep(1000); + fs.delete(p, true); + System.out.println("Thread id : " + id + " release lock"); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + Thread.currentThread().sleep(1000); + fs.delete(p, true); + System.out.println("main thread release lock.Waiting threads down"); + System.out.println("file still exist : " + fs.exists(p)); + latch.await(); + } + + @Test + public void testHDFSStore() throws Exception { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + ResourceStore store = new HDFSResourceStore(config); + ResourceStoreTest.testAStore(store); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/LockManagerTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/LockManagerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/LockManagerTest.java new file mode 100644 index 0000000..1f239d9 --- /dev/null +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/LockManagerTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.storage.hdfs; + +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Created by xiefan on 17-1-20. + */ +@Ignore +public class LockManagerTest extends HBaseMetadataTestCase{ + + public static String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox"; + + private String zkConnection = "sandbox:2181"; + + @Before + public void setup() throws Exception { + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testCreateLock() throws Exception{ + LockManager manager = new LockManager(); + ResourceLock lock = manager.getLock("/dictionary/numberdict.json"); + lock.acquire(); + manager.releaseLock(lock); + } + + @Test + public void testConnect() throws Exception { + ZooKeeper zk = new ZooKeeper(zkConnection, 60000, new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + System.out.println("EVENT:" + watchedEvent.getType()); + } + }); + System.out.println("ls / => " + zk.getChildren("/kylin/hdfs_meta/lock/kylin_default_instance/dictionary", true)); + } + + +} 
/**
 * Minimal holder for a ZooKeeper connection string. It starts no server of its own,
 * so closing it is a no-op.
 *
 * Created by xiefan on 17-1-18.
 */
public class TestingServer implements Closeable {

    private final String connectionString;

    /**
     * @param connectionStr connection string returned unchanged by {@link #getConnectString()}
     */
    public TestingServer(String connectionStr) {
        connectionString = connectionStr;
    }

    /** @return the connection string supplied at construction time */
    public String getConnectString() {
        return connectionString;
    }

    /** Does nothing: this class holds no resources. */
    @Override
    public void close() throws IOException {
        // intentionally empty
    }
}
+*/ +package org.apache.kylin.storage.hdfs; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +/** + * Created by xiefan on 17-1-18. + */ +@Ignore +public class ZkDemo { + + private String zkConnection = "sandbox:2181"; + + private static final int QTY = 5; + + private static final int REPETITIONS = QTY * 10; + + private static final String PATH = "/examples/lock"; + + @Test + public void testConnect() throws Exception { + ZooKeeper zk = new ZooKeeper(zkConnection, 60000, new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + System.out.println("EVENT:" + watchedEvent.getType()); + } + }); + System.out.println("ls / => " + zk.getChildren("/kylin", true)); + } + + @Test + public void testCreateNode() throws Exception { + ZooKeeper zk = new ZooKeeper(zkConnection, 60000, new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + System.out.println("EVENT:" + watchedEvent.getType()); + } + }); + zk.create(PATH, PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + System.out.println("ls / => " + zk.getChildren("/examples", true)); + } + + @Test + public void testStartCurator() throws Exception { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework client = 
CuratorFrameworkFactory.newClient(zkConnection, retryPolicy); + client.start(); + } + + @Test + public void testCuratorLock() throws Exception { + // all of the useful sample code is in ExampleClientThatLocks.java + + // FakeLimitedResource simulates some external resource that can only be access by one process at a time + final FakeLimitedResource resource = new FakeLimitedResource(); + ExecutorService service = Executors.newFixedThreadPool(QTY); + final TestingServer server = new TestingServer(zkConnection); + final List<FutureTask<Void>> tasks = new ArrayList<>(); + try { + for (int i = 0; i < QTY; ++i) { + final int index = i; + FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() { + @Override + public Void call() throws Exception { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); + try { + client.start(); + + ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index); + for (int j = 0; j < REPETITIONS; ++j) { + example.doWork(10, TimeUnit.SECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + e.printStackTrace(); + // log or do something + } finally { + CloseableUtils.closeQuietly(client); + } + return null; + } + }); + tasks.add(task); + service.submit(task); + } + for (FutureTask<Void> task : tasks) { + task.get(); + } + } finally { + CloseableUtils.closeQuietly(server); + } + } +}
