Repository: incubator-kylin Updated Branches: refs/heads/KYLIN-942-test e44bc25aa -> db65a1c67
KYLIN-957 Support HBase in a separate cluster Signed-off-by: shaofengshi <shaofeng...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8581df4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8581df4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8581df4a Branch: refs/heads/KYLIN-942-test Commit: 8581df4a252c313e303d5bd6f93272a5e189ee4e Parents: d7c37e0 Author: sunyerui <sunye...@gmail.com> Authored: Wed Sep 16 00:03:29 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Sep 22 11:17:10 2015 +0800 ---------------------------------------------------------------------- build/conf/kylin.properties | 4 +++ .../org/apache/kylin/common/KylinConfig.java | 6 ++++ engine-mr/pom.xml | 5 +++ .../org/apache/kylin/engine/mr/HadoopUtil.java | 32 +++++++++++++++++++- .../kylin/storage/hbase/HBaseConnection.java | 8 +++++ .../kylin/storage/hbase/HBaseResourceStore.java | 4 +-- .../kylin/storage/hbase/steps/BulkLoadJob.java | 3 +- .../storage/hbase/steps/CreateHTableJob.java | 2 +- .../kylin/storage/hbase/steps/HBaseMRSteps.java | 5 +-- .../hbase/util/DeployCoprocessorCLI.java | 2 +- .../hbase/steps/ITHBaseResourceStoreTest.java | 2 +- 11 files changed, 64 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index 8dfb05b..5b56f31 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -12,6 +12,10 @@ kylin.storage.url=hbase # Temp folder in hdfs, make sure user has the right access to the hdfs directory kylin.hdfs.working.dir=/kylin +# FileSystem of the cluster that serves HBase, in the format hdfs://hbase-cluster:8020 
+# Leave empty if HBase runs on the same cluster as Hive and MapReduce +kylin.hbase.cluster.fs= + kylin.job.mapreduce.default.reduce.input.mb=500 # If true, job engine will not assume that hadoop CLI reside on the same server as it self http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index 43b8c4d..376327a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -104,6 +104,8 @@ public class KylinConfig implements Serializable { public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir"; + public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs"; + public static final String HIVE_DATABASE_FOR_INTERMEDIATE_TABLE = "kylin.job.hive.database.for.intermediatetable"; public static final String HIVE_PASSWORD = "hive.password"; @@ -293,6 +295,10 @@ public class KylinConfig implements Serializable { return root + getMetadataUrlPrefix() + "/"; } + public String getHBaseClusterFs() { + return getOptional(KYLIN_HBASE_CLUSTER_FS, ""); + } + public String getKylinJobLogDir() { return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs"); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/engine-mr/pom.xml ---------------------------------------------------------------------- diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml index e00a693..7a2bfe5 100644 --- a/engine-mr/pom.xml +++ b/engine-mr/pom.xml @@ -102,6 +102,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> 
<groupId>org.apache.mrunit</groupId> <artifactId>mrunit</artifactId> <classifier>hadoop2</classifier> http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java index 1c00993..7fcbf1f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java @@ -24,16 +24,25 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.kylin.common.KylinConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HadoopUtil { + private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class); private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>(); + private static ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>(); + public static void setCurrentConfiguration(Configuration conf) { hadoopConfig.set(conf); } @@ -45,6 +54,18 @@ public class HadoopUtil { return hadoopConfig.get(); } + public static Configuration getCurrentHBaseConfiguration() { + if (hbaseConfig.get() == null) { + Configuration configuration = HBaseConfiguration.create(new Configuration()); + String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs(); + if (StringUtils.isNotEmpty(hbaseClusterFs)) { + configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs); + } + 
hbaseConfig.set(configuration); + } + return hbaseConfig.get(); + } + public static FileSystem getFileSystem(String path) throws IOException { return FileSystem.get(makeURI(path), getCurrentConfiguration()); } @@ -57,6 +78,15 @@ public class HadoopUtil { } } + public static String makeQualifiedPathInHBaseCluster(String path) { + try { + FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration()); + return fs.makeQualified(new Path(path)).toString(); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e); + } + } + public static String fixWindowsPath(String path) { // fix windows path if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) { @@ -87,7 +117,7 @@ public class HadoopUtil { } public static void deletePath(Configuration conf, Path path) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(path.toUri(), conf); if (fs.exists(path)) { fs.delete(path, true); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index d1bb216..16bb30a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -36,6 +37,7 @@ import 
org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.StorageException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +82,12 @@ public class HBaseConnection { conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000"); conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5"); conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000"); + + String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs(); + if (StringUtils.isNotEmpty(hbaseClusterFs)) { + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs); + } + // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true"); if (StringUtils.isEmpty(url)) { return conf; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 59f9e84..baec4b8 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -185,7 +185,7 @@ public class HBaseResourceStore extends ResourceStore { byte[] value = r.getValue(B_FAMILY, B_COLUMN); if (value.length == 0) { Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); return fileSystem.open(redirectPath); @@ -304,7 +304,7 @@ public class HBaseResourceStore extends ResourceStore { private Path 
writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); if (fileSystem.exists(redirectPath)) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java index 2be61f4..cbddfae 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java @@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +61,7 @@ public class BulkLoadJob extends AbstractHadoopJob { // end with "/" String input = getOptionValue(OPTION_INPUT_PATH); - Configuration conf = HBaseConfiguration.create(getConf()); + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fs = FileSystem.get(conf); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index 35a35c1..eb1256c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -103,7 +103,7 @@ public class CreateHTableJob extends AbstractHadoopJob { CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); - Configuration conf = HBaseConfiguration.create(getConf()); + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); try { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index 03b4361..dfb4f33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -4,6 +4,7 @@ import java.util.List; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; @@ -129,11 +130,11 @@ public class HBaseMRSteps extends JobBuilderSupport { } public String getHFilePath(String jobId) { - return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/"; + return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + 
"/" + seg.getCubeInstance().getName() + "/hfile/"); } public String getRowkeyDistributionOutputPath(String jobId) { - return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; + return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index 5c7d46e..21d7c38 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -108,7 +108,7 @@ public class DeployCoprocessorCLI { private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java index e1976cb..ba95176 100644 --- 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java @@ -80,7 +80,7 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase { assertEquals(content, t); Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); assertTrue(fileSystem.exists(redirectPath));