http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 0901b54..6382559 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -30,15 +30,14 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -70,7 +69,7 @@ public class HBaseResourceStore extends ResourceStore { final String tableNameBase; final String hbaseUrl; - Connection getConnection() throws IOException { + HConnection getConnection() throws IOException { return HBaseConnection.get(hbaseUrl); } @@ -122,7 +121,7 @@ public class HBaseResourceStore extends ResourceStore { byte[] endRow = Bytes.toBytes(lookForPrefix); endRow[endRow.length - 1]++; - Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); Scan scan = new Scan(startRow, endRow); if ((filter != null && filter instanceof KeyOnlyFilter) == false) { scan.addColumn(B_FAMILY, B_COLUMN_TS); @@ -239,12 +238,13 @@ public class HBaseResourceStore extends ResourceStore { IOUtils.copy(content, bout); bout.close(); - Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); try { byte[] row = Bytes.toBytes(resPath); Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); table.put(put); + table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -252,7 +252,7 @@ public class HBaseResourceStore extends ResourceStore { @Override protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); try { byte[] row = Bytes.toBytes(resPath); byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); @@ -265,6 +265,8 @@ public class HBaseResourceStore extends ResourceStore { throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); } + table.flushCommits(); + return newTS; } finally { IOUtils.closeQuietly(table); @@ -273,7 +275,7 @@ public class HBaseResourceStore extends ResourceStore { @Override protected void deleteResourceImpl(String resPath) throws IOException { - Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); try { boolean hdfsResourceExist = false; Result result = internalGetFromHTable(table, resPath, true, false); @@ -286,6 +288,7 @@ public class HBaseResourceStore extends ResourceStore { Delete del = new Delete(Bytes.toBytes(resPath)); table.delete(del); + table.flushCommits(); if (hdfsResourceExist) { // remove hdfs cell value Path redirectPath = bigCellHDFSPath(resPath); @@ -306,7 +309,7 @@ public class HBaseResourceStore extends ResourceStore { } private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { - Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); try { return internalGetFromHTable(table, path, fetchContent, fetchTimestamp); } finally { @@ -315,7 +318,7 @@ public class HBaseResourceStore extends ResourceStore { } - private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { + private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { byte[] rowkey = Bytes.toBytes(path); Get get = new Get(rowkey); @@ -334,7 +337,7 @@ public class HBaseResourceStore extends ResourceStore { return exists ? result : null; } - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException { + private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(); @@ -359,7 +362,7 @@ public class HBaseResourceStore extends ResourceStore { return redirectPath; } - private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException { + private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { int kvSizeLimit = Integer.parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760")); if (content.length > kvSizeLimit) { writeLargeCellToHdfs(resPath, content, table); @@ -367,8 +370,8 @@ public class HBaseResourceStore extends ResourceStore { } Put put = new Put(row); - put.addColumn(B_FAMILY, B_COLUMN, content); - put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); + put.add(B_FAMILY, B_COLUMN, content); + put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); return put; }
http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java index f63d9c2..b141190 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java @@ -26,13 +26,12 @@ import java.util.NoSuchElementException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.kv.RowConstants; @@ -87,13 +86,14 @@ public class SimpleHBaseStore implements IGTStore { } private class Writer implements IGTWriter { - final BufferedMutator table; + final HTableInterface table; final ByteBuffer rowkey = ByteBuffer.allocate(50); final ByteBuffer value = ByteBuffer.allocate(50); Writer() throws IOException { - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - table = conn.getBufferedMutator(htableName); + HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + table = conn.getTable(htableName); + table.setAutoFlush(false, true); } @Override @@ -113,24 +113,24 @@ public class SimpleHBaseStore implements IGTStore { Put put = new Put(rowkey); put.addImmutable(CF_B, ByteBuffer.wrap(COL_B), HConstants.LATEST_TIMESTAMP, value); - table.mutate(put); + table.put(put); } @Override public void close() throws IOException { - table.flush(); + table.flushCommits(); table.close(); } } class Reader implements IGTScanner { - final Table table; + final HTableInterface table; final ResultScanner scanner; int count = 0; Reader() throws IOException { - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); table = conn.getTable(htableName); Scan scan = new Scan(); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index cad5a3f..df1817e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -26,9 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.DataFormatException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -52,10 +51,10 @@ import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest; -import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse; -import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitService; +import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList; +import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,7 +117,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); // globally shared connection, does not require close - final Connection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); + final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList(); List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); @@ -173,7 +172,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final boolean[] abnormalFinish = new boolean[1]; try { - Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool()); + HTableInterface table = conn.getTable(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool()); final CubeVisitRequest request = builder.build(); final byte[] startKey = epRange.getFirst(); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index a52af90..3cefc5f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -24,12 +24,11 @@ import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.ShardingHash; @@ -155,8 +154,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { // primary key (also the 0th column block) is always selected final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); // globally shared connection, does not require close - Connection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); - final Table hbaseTable = hbaseConn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier())); + HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); + final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 810747f..21a0efb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -142,7 +142,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement if (shardLength == 0) { return; } - byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey(); + byte[] regionStartKey = ArrayUtils.isEmpty(region.getStartKey()) ? new byte[shardLength] : region.getStartKey(); Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength); Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength); } @@ -179,7 +179,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) { this.serviceStartTime = System.currentTimeMillis(); - region = (HRegion)env.getRegion(); + region = env.getRegion(); region.startRegionOperation(); // if user change kylin.properties on kylin server, need to manually redeploy coprocessor jar to update KylinConfig of Env. http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java index feb4842..2814ad6 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java @@ -26,8 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.BloomType; @@ -80,8 +79,7 @@ public class CubeHTableUtil { tableDesc.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString()); Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); - Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl()); - Admin admin = conn.getAdmin(); + HBaseAdmin admin = new HBaseAdmin(conf); try { if (User.isHBaseSecurityEnabled(conf)) { @@ -94,7 +92,7 @@ public class CubeHTableUtil { tableDesc.addFamily(cf); } - if (admin.tableExists(TableName.valueOf(tableName))) { + if (admin.tableExists(tableName)) { // admin.disableTable(tableName); // admin.deleteTable(tableName); throw new RuntimeException("HBase table " + tableName + " exists!"); @@ -103,7 +101,7 @@ public class CubeHTableUtil { DeployCoprocessorCLI.deployCoprocessor(tableDesc); admin.createTable(tableDesc, splitKeys); - Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons"); + Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons"); logger.info("create hbase table " + tableName + " done."); } finally { IOUtils.closeQuietly(admin); @@ -112,7 +110,8 @@ public class CubeHTableUtil { } public static void deleteHTable(TableName tableName) throws IOException { - Admin admin = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getAdmin(); + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + HBaseAdmin admin = new HBaseAdmin(conf); try { if (admin.tableExists(tableName)) { logger.info("disabling hbase table " + tableName); @@ -127,7 +126,8 @@ public class CubeHTableUtil { /** create a HTable that has the same performance settings as normal cube table, for benchmark purpose */ public static void createBenchmarkHTable(TableName tableName, String cfName) throws IOException { - Admin admin = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getAdmin(); + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + HBaseAdmin admin = new HBaseAdmin(conf); try { if (admin.tableExists(tableName)) { logger.info("disabling hbase table " + tableName); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java index df3cf08..eacff9f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java @@ -25,13 +25,13 @@ import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.HiveCmdBuilder; import org.apache.kylin.job.exception.ExecuteException; @@ -100,21 +100,19 @@ public class DeprecatedGCStep extends AbstractExecutable { List<String> oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); - Admin admin = null; + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + HBaseAdmin admin = null; try { - - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - admin = conn.getAdmin(); - + admin = new HBaseAdmin(conf); for (String table : oldTables) { - if (admin.tableExists(TableName.valueOf(table))) { - HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table)); + if (admin.tableExists(table)) { + HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { - if (admin.isTableEnabled(TableName.valueOf(table))) { - admin.disableTable(TableName.valueOf(table)); + if (admin.isTableEnabled(table)) { + admin.disableTable(table); } - admin.deleteTable(TableName.valueOf(table)); + admin.deleteTable(table); logger.debug("Dropped HBase table " + table); output.append("Dropped HBase table " + table + " \n"); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java index 6587d4e..d5b36df 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java @@ -23,8 +23,8 @@ import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; @@ -49,7 +49,7 @@ public class HBaseCuboidWriter implements ICuboidWriter { private final List<KeyValueCreator> keyValueCreators; private final int nColumns; - private final Table hTable; + private final HTableInterface hTable; private final CubeDesc cubeDesc; private final CubeSegment cubeSegment; private final Object[] measureValues; @@ -58,7 +58,7 @@ public class HBaseCuboidWriter implements ICuboidWriter { private AbstractRowKeyEncoder rowKeyEncoder; private byte[] keybuf; - public HBaseCuboidWriter(CubeSegment segment, Table hTable) { + public HBaseCuboidWriter(CubeSegment segment, HTableInterface hTable) { this.keyValueCreators = Lists.newArrayList(); this.cubeSegment = segment; this.cubeDesc = cubeSegment.getCubeDesc(); @@ -117,6 +117,7 @@ public class HBaseCuboidWriter implements ICuboidWriter { long t = System.currentTimeMillis(); if (hTable != null) { hTable.put(puts); + hTable.flushCommits(); } logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms"); puts.clear(); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java index 2f7e164..5b2441c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java @@ -24,11 +24,11 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -69,20 +69,19 @@ public class MergeGCStep extends AbstractExecutable { List<String> oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); - Admin admin = null; + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + HBaseAdmin admin = null; try { - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - admin = conn.getAdmin(); - + admin = new HBaseAdmin(conf); for (String table : oldTables) { - if (admin.tableExists(TableName.valueOf(table))) { - HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table))); + if (admin.tableExists(table)) { + HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { - if (admin.isTableEnabled(TableName.valueOf(table))) { - admin.disableTable(TableName.valueOf(table)); + if (admin.isTableEnabled(table)) { + admin.disableTable(table); } - admin.deleteTable(TableName.valueOf(table)); + admin.deleteTable(table); logger.debug("Dropped htable: " + table); output.append("HBase table " + table + " is dropped. \n"); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java index 56f867a..a150607 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java @@ -21,11 +21,9 @@ package org.apache.kylin.storage.hbase.util; import java.io.IOException; import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.kylin.common.KylinConfig; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.metadata.realization.IRealizationConstants; @@ -40,8 +38,8 @@ public class CleanHtableCLI extends AbstractApplication { protected static final Logger logger = LoggerFactory.getLogger(CleanHtableCLI.class); private void clean() throws IOException { - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - Admin hbaseAdmin = conn.getAdmin(); + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); for (HTableDescriptor descriptor : hbaseAdmin.listTables()) { String name = descriptor.getNameAsString().toLowerCase(); @@ -52,7 +50,7 @@ public class CleanHtableCLI extends AbstractApplication { System.out.println(); descriptor.setValue(IRealizationConstants.HTableOwner, "dl-ebay-ky...@ebay.com"); - hbaseAdmin.modifyTable(TableName.valueOf(descriptor.getNameAsString()), descriptor); + hbaseAdmin.modifyTable(descriptor.getNameAsString(), descriptor); } } hbaseAdmin.close(); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 581de38..68c0a39 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -26,19 +26,19 @@ import java.util.Map; import java.util.Set; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RawResource; @@ -89,7 +89,7 @@ public class CubeMigrationCLI { private static ResourceStore srcStore; private static ResourceStore dstStore; private static FileSystem hdfsFS; - private static Admin hbaseAdmin; + private static HBaseAdmin hbaseAdmin; public static final String ACL_INFO_FAMILY = "i"; private static final String ACL_TABLE_NAME = "_acl"; @@ -134,8 +134,8 @@ public class CubeMigrationCLI { checkAndGetHbaseUrl(); - Connection conn = HBaseConnection.get(srcConfig.getStorageUrl()); - hbaseAdmin = conn.getAdmin(); + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + hbaseAdmin = new HBaseAdmin(conf); hdfsFS = HadoopUtil.getWorkingFileSystem(); @@ -233,7 +233,6 @@ public class CubeMigrationCLI { operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() })); } } - private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException { String projectResPath = ProjectInstance.concatResourcePath(projectName); if (!dstStore.exists(projectResPath)) @@ -327,8 +326,8 @@ public class CubeMigrationCLI { switch (opt.type) { case CHANGE_HTABLE_HOST: { - TableName tableName = TableName.valueOf((String) opt.params[0]); - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(tableName); + String tableName = (String) opt.params[0]; + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); hbaseAdmin.disableTable(tableName); desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); hbaseAdmin.modifyTable(tableName, desc); @@ -450,11 +449,11 @@ public class CubeMigrationCLI { Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); String projUUID = project.getUuid(); - Table srcAclHtable = null; - Table destAclHtable = null; + HTableInterface srcAclHtable = null; + HTableInterface destAclHtable = null; try { - srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); // cube acl Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); @@ -474,6 +473,7 @@ public class CubeMigrationCLI { destAclHtable.put(put); } } + destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(srcAclHtable); IOUtils.closeQuietly(destAclHtable); @@ -504,8 +504,8 @@ public class CubeMigrationCLI { switch (opt.type) { case CHANGE_HTABLE_HOST: { - TableName tableName = TableName.valueOf((String) opt.params[0]); - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(tableName); + String tableName = (String) opt.params[0]; + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); hbaseAdmin.disableTable(tableName); desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); hbaseAdmin.modifyTable(tableName, desc); @@ -539,12 +539,13 @@ public class CubeMigrationCLI { case COPY_ACL: { String cubeId = (String) opt.params[0]; String modelId = (String) opt.params[1]; - Table destAclHtable = null; + HTableInterface destAclHtable = null; try { - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); destAclHtable.delete(new Delete(Bytes.toBytes(modelId))); + destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(destAclHtable); } @@ -561,7 +562,7 @@ public class CubeMigrationCLI { } } - private static void updateMeta(KylinConfig config) { + private static void updateMeta(KylinConfig config){ String[] nodes = config.getRestServers(); for (String node : nodes) { RestClient restClient = new RestClient(node); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java index 20d0f7d..8bd4abf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java @@ -26,10 +26,10 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.cube.CubeInstance; @@ -61,7 +61,7 @@ public class CubeMigrationCheckCLI { private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The name of cube migrated").create("cube"); private KylinConfig dstCfg; - private Admin hbaseAdmin; + private HBaseAdmin hbaseAdmin; private List<String> issueExistHTables; private List<String> inconsistentHTables; @@ -130,8 +130,9 @@ public class CubeMigrationCheckCLI { this.dstCfg = kylinConfig; this.ifFix = isFix; - Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl()); - hbaseAdmin = conn.getAdmin(); + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + hbaseAdmin = new HBaseAdmin(conf); + issueExistHTables = Lists.newArrayList(); inconsistentHTables = Lists.newArrayList(); } @@ -188,10 +189,10 @@ public class CubeMigrationCheckCLI { String[] sepNameList = segFullName.split(","); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0])); logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " + dstCfg.getMetadataUrlPrefix()); - hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0])); + hbaseAdmin.disableTable(sepNameList[0]); desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), desc); - hbaseAdmin.enableTable(TableName.valueOf(sepNameList[0])); + hbaseAdmin.modifyTable(sepNameList[0], desc); + hbaseAdmin.enableTable(sepNameList[0]); } } else { logger.info("------ Inconsistent HTables Needed To Be Fixed ------"); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index e72859d..c8410f9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -44,8 +44,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinVersion; @@ -82,8 +81,7 @@ public class DeployCoprocessorCLI { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); - Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl()); - Admin hbaseAdmin = conn.getAdmin(); + HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf); String localCoprocessorJar; if ("default".equals(args[0])) { @@ -167,10 +165,10 @@ public class DeployCoprocessorCLI { public static void deployCoprocessor(HTableDescriptor tableDesc) { try { initHTableCoprocessor(tableDesc); - logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor."); + logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor."); } catch (Exception ex) { - logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex); + logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex); logger.error("Will try creating the table without coprocessor."); } } @@ -191,7 +189,7 @@ public class DeployCoprocessorCLI { desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null); } - public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { + public static boolean resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); @@ -206,7 +204,7 @@ public class DeployCoprocessorCLI { logger.info("reset coprocessor on " + tableName); logger.info("Disable " + tableName); - hbaseAdmin.disableTable(TableName.valueOf(tableName)); + hbaseAdmin.disableTable(tableName); while (desc.hasCoprocessor(CubeObserverClassOld2)) { desc.removeCoprocessor(CubeObserverClassOld2); @@ -232,15 +230,16 @@ public class DeployCoprocessorCLI { desc.setValue(IRealizationConstants.HTableGitTag, commitInfo); } - hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.modifyTable(tableName, desc); logger.info("Enable " + tableName); - hbaseAdmin.enableTable(TableName.valueOf(tableName)); + hbaseAdmin.enableTable(tableName); return true; } - private static List<String> resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { + + private static List<String> resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { List<String> processedTables = Collections.synchronizedList(new ArrayList<String>()); ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); CountDownLatch countDownLatch = new CountDownLatch(tableNames.size()); @@ -261,12 +260,12 @@ public class DeployCoprocessorCLI { private static class ResetCoprocessorWorker implements Runnable { private final CountDownLatch countDownLatch; - private final Admin hbaseAdmin; + private final HBaseAdmin hbaseAdmin; private final Path hdfsCoprocessorJar; private final String tableName; private final List<String> processedTables; - public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables) { + public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables) { this.countDownLatch = countDownLatch; this.hbaseAdmin = hbaseAdmin; this.hdfsCoprocessorJar = hdfsCoprocessorJar; @@ -387,7 +386,7 @@ public class DeployCoprocessorCLI { return coprocessorDir; } - private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException { + private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException { HashSet<String> result = new HashSet<String>(); for (String tableName : tableNames) { http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java index 1cdb2f8..61c73d5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java @@ -25,11 +25,10 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; @@ -236,9 +235,9 @@ public class ExtendCubeToHybridCLI { Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); ProjectInstance project = store.getResource(projectResPath, ProjectInstance.class, projectSerializer); String projUUID = project.getUuid(); - Table aclHtable = null; + HTableInterface aclHtable = null; try { - aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + "_acl")); + aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(kylinConfig.getMetadataUrlPrefix() + "_acl"); // cube acl Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId))); @@ -258,6 +257,7 @@ public class ExtendCubeToHybridCLI { aclHtable.put(put); } } + aclHtable.flushCommits(); } finally { IOUtils.closeQuietly(aclHtable); } http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java index dd5f8fa..86ba22f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java @@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Pair; @@ -75,7 +75,7 @@ public class GridTableHBaseBenchmark { System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio); String hbaseUrl = "hbase"; // use hbase-site.xml on classpath - Connection conn = HBaseConnection.get(hbaseUrl); + HConnection conn = HBaseConnection.get(hbaseUrl); createHTableIfNeeded(conn, TEST_TABLE); prepareData(conn); @@ -91,10 +91,10 @@ public class GridTableHBaseBenchmark { } - private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException { + private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException { Stats stats = new Stats("COLUMN_SCAN"); - Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); + HTableInterface table = conn.getTable(TEST_TABLE); try { stats.markStart(); @@ -122,20 +122,20 @@ public class GridTableHBaseBenchmark { } } - private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException { + private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException { fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL")); } - private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException { + private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException { jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP")); } - private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException { + private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException { jumpScan(conn, hits, new Stats("ROW_SCAN_IDX")); } - private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException { - Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); + private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { + HTableInterface table = conn.getTable(TEST_TABLE); try { stats.markStart(); @@ -156,11 +156,11 @@ public class GridTableHBaseBenchmark { } } - private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException { + private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience - Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); + HTableInterface table = conn.getTable(TEST_TABLE); try { stats.markStart(); @@ -204,8 +204,8 @@ public class GridTableHBaseBenchmark { } } - private static void prepareData(Connection conn) throws IOException { - Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); + private static void prepareData(HConnection conn) throws IOException { + HTableInterface table = conn.getTable(TEST_TABLE); try { // check how many rows existing @@ -258,8 +258,8 @@ public class GridTableHBaseBenchmark { return bytes; } - private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException { - Admin hbase = conn.getAdmin(); + private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException { + HBaseAdmin hbase = new HBaseAdmin(conn); try { boolean tableExist = false; http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java index 940d64a..6749d6c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java @@ -24,11 +24,9 @@ import java.util.List; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.kylin.common.KylinConfig; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.metadata.realization.IRealizationConstants; @@ -57,8 +55,8 @@ public class HBaseClean extends AbstractApplication { private void cleanUp() { try { // get all kylin hbase tables - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - Admin hbaseAdmin = conn.getAdmin(); + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); List<String> allTablesNeedToBeDropped = Lists.newArrayList(); @@ -73,12 +71,12 @@ public class HBaseClean extends AbstractApplication { // drop tables for (String htableName : allTablesNeedToBeDropped) { logger.info("Deleting HBase table " + htableName); - if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) { - if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) { - hbaseAdmin.disableTable(TableName.valueOf(htableName)); + if (hbaseAdmin.tableExists(htableName)) { + if (hbaseAdmin.isTableEnabled(htableName)) { + hbaseAdmin.disableTable(htableName); } - hbaseAdmin.deleteTable(TableName.valueOf(htableName)); + hbaseAdmin.deleteTable(htableName); logger.info("Deleted HBase table " + htableName); } else { logger.info("HBase table" + htableName + " does not exist"); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java index 1daca0a..937b65f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -32,15 +31,12 @@ import java.util.TreeSet; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.kylin.common.util.Pair; import org.slf4j.Logger; @@ -62,31 +58,30 @@ public class HBaseRegionSizeCalculator { /** * Computes size of each region for table and given column families. * */ - public HBaseRegionSizeCalculator(String tableName, Connection hbaseConnection) throws IOException { + public HBaseRegionSizeCalculator(HTable table) throws IOException { + this(table, new HBaseAdmin(table.getConfiguration())); + } - Table table = null; - Admin admin = null; - try { - table = hbaseConnection.getTable(TableName.valueOf(tableName)); - admin = hbaseConnection.getAdmin(); + /** Constructor for unit testing */ + HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException { + try { if (!enabled(table.getConfiguration())) { logger.info("Region size calculation disabled."); return; } - logger.info("Calculating region sizes for table \"" + table.getName() + "\"."); + logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\"."); // Get regions for table. - RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName()); - List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations(); + Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet(); Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); - for (HRegionLocation hRegionLocation : regionLocationList) { - tableRegions.add(hRegionLocation.getRegionInfo().getRegionName()); + for (HRegionInfo regionInfo : tableRegionInfos) { + tableRegions.add(regionInfo.getRegionName()); } - ClusterStatus clusterStatus = admin.getClusterStatus(); + ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus(); Collection<ServerName> servers = clusterStatus.getServers(); final long megaByte = 1024L * 1024L; @@ -110,7 +105,7 @@ public class HBaseRegionSizeCalculator { } } } finally { - IOUtils.closeQuietly(admin); + IOUtils.closeQuietly(hBaseAdmin); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java index a2f60d4..266f7e7 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java @@ -23,10 +23,9 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.kylin.common.KylinConfig; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.metadata.realization.IRealizationConstants; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -43,8 +42,8 @@ public class HBaseUsage { Map<String, List<String>> envs = Maps.newHashMap(); // get all kylin hbase tables - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - Admin hbaseAdmin = conn.getAdmin(); + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); for (HTableDescriptor desc : tableDescriptors) { http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java index 8dd2164..1db60fb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java @@ -32,15 +32,15 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; @@ -58,11 +58,11 @@ public class HbaseStreamingInput { private static final byte[] QN = "C".getBytes(); public static void createTable(String tableName) throws IOException { - Connection conn = getConnection(); - Admin hadmin = conn.getAdmin(); + HConnection conn = getConnection(); + HBaseAdmin hadmin = new HBaseAdmin(conn); try { - boolean tableExist = hadmin.tableExists(TableName.valueOf(tableName)); + boolean tableExist = hadmin.tableExists(tableName); if (tableExist) { logger.info("HTable '" + tableName + "' already exists"); return; @@ -120,8 +120,8 @@ public class HbaseStreamingInput { e.printStackTrace(); } - Connection conn = getConnection(); - Table table = conn.getTable(TableName.valueOf(tableName)); + HConnection conn = getConnection(); + HTableInterface table = conn.getTable(tableName); byte[] key = new byte[8 + 4];//time + id @@ -136,7 +136,7 @@ public class HbaseStreamingInput { Bytes.putInt(key, 8, i); Put put = new Put(key); byte[] cell = randomBytes(CELL_SIZE); - put.addColumn(CF, QN, cell); + put.add(CF, QN, cell); buffer.add(put); } table.put(buffer); @@ -172,8 +172,8 @@ public class HbaseStreamingInput { } Random r = new Random(); - Connection conn = getConnection(); - Table table = conn.getTable(TableName.valueOf(tableName)); + HConnection conn = getConnection(); + HTableInterface table = conn.getTable(tableName); long leftBound = getFirstKeyTime(table); long rightBound = System.currentTimeMillis(); @@ -208,7 +208,7 @@ public class HbaseStreamingInput { } } - private static long getFirstKeyTime(Table table) throws IOException { + private static long getFirstKeyTime(HTableInterface table) throws IOException { long startTime = 0; Scan scan = new Scan(); @@ -226,8 +226,8 @@ public class HbaseStreamingInput { } - private static Connection getConnection() throws IOException { - return HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + private static HConnection getConnection() throws IOException { + return HConnectionManager.createConnection(HBaseConnection.getCurrentHBaseConfiguration()); } private static String formatTime(long time) { http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java index ea05ab2..ca1a060 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java @@ -23,11 +23,10 @@ import java.io.IOException; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.kylin.common.KylinConfig; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.engine.mr.common.BatchConstants; @@ -51,8 +50,8 @@ public class HtableAlterMetadataCLI extends AbstractApplication { String metadataValue; private void alter() throws IOException { - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - Admin hbaseAdmin = conn.getAdmin(); + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); hbaseAdmin.disableTable(table.getTableName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java index df4e912..8ff5b0f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java @@ -30,14 +30,10 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.kylin.common.KylinConfig; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.metadata.realization.IRealizationConstants; -import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,9 +52,9 @@ public class OrphanHBaseCleanJob extends AbstractApplication { Set<String> metastoreWhitelistSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); private void cleanUnusedHBaseTables(Configuration conf) throws IOException { - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + // get all kylin hbase tables - Admin hbaseAdmin = conn.getAdmin(); + HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); List<String> allTablesNeedToBeDropped = new ArrayList<String>(); @@ -77,13 +73,12 @@ public class OrphanHBaseCleanJob extends AbstractApplication { // drop tables for (String htableName : allTablesNeedToBeDropped) { logger.info("Deleting HBase table " + htableName); - TableName tableName = TableName.valueOf(htableName); - if (hbaseAdmin.tableExists(tableName)) { - if (hbaseAdmin.isTableEnabled(tableName)) { - hbaseAdmin.disableTable(tableName); + if (hbaseAdmin.tableExists(htableName)) { + if (hbaseAdmin.isTableEnabled(htableName)) { + hbaseAdmin.disableTable(htableName); } - hbaseAdmin.deleteTable(tableName); + hbaseAdmin.deleteTable(htableName); logger.info("Deleted HBase table " + htableName); } else { logger.info("HBase table" + htableName + " does not exist"); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java index bba6745..1ea8e8d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java @@ -22,13 +22,12 @@ import java.io.IOException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -60,12 +59,12 @@ public class PingHBaseCLI { Scan scan = new Scan(); int limit = 20; - Connection conn = null; - Table table = null; + HConnection conn = null; + HTableInterface table = null; ResultScanner scanner = null; try { - conn = ConnectionFactory.createConnection(hconf); - table = conn.getTable(TableName.valueOf(hbaseTable)); + conn = HConnectionManager.createConnection(hconf); + table = conn.getTable(hbaseTable); scanner = table.getScanner(scan); int count = 0; for (Result r : scanner) { http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java index db516bb..01edb1f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java @@ -22,12 +22,11 @@ import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -71,8 +70,8 @@ public class RowCounterCLI { logger.info("My Scan " + scan.toString()); - Connection conn = ConnectionFactory.createConnection(conf); - Table tableInterface = conn.getTable(TableName.valueOf(htableName)); + HConnection conn = HConnectionManager.createConnection(conf); + HTableInterface tableInterface = conn.getTable(htableName); Iterator<Result> iterator = tableInterface.getScanner(scan).iterator(); int counter = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index f6b65ab..23e7e10 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -40,9 +40,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.CliCommandExecutor; @@ -59,7 +57,6 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.metadata.realization.IRealizationConstants; -import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,8 +77,7 @@ public class StorageCleanupJob extends AbstractApplication { private void cleanUnusedHBaseTables(Configuration conf) throws IOException { CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); // get all kylin hbase tables - Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - Admin hbaseAdmin = conn.getAdmin(); + HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); List<String> allTablesNeedToBeDropped = new ArrayList<String>(); @@ -157,22 +153,22 @@ public class StorageCleanupJob extends AbstractApplication { } class DeleteHTableRunnable implements Callable { - Admin hbaseAdmin; + HBaseAdmin hbaseAdmin; String htableName; - DeleteHTableRunnable(Admin hbaseAdmin, String htableName) { + DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) { this.hbaseAdmin = hbaseAdmin; this.htableName = htableName; } public Object call() throws Exception { logger.info("Deleting HBase table " + htableName); - if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) { - if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) { - hbaseAdmin.disableTable(TableName.valueOf(htableName)); + if (hbaseAdmin.tableExists(htableName)) { + if (hbaseAdmin.isTableEnabled(htableName)) { + hbaseAdmin.disableTable(htableName); } - hbaseAdmin.deleteTable(TableName.valueOf(htableName)); + hbaseAdmin.deleteTable(htableName); logger.info("Deleted HBase table " + htableName); } else { logger.info("HBase table" + htableName + " does not exist"); http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java index 42a54c8..e36f662 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java @@ -24,18 +24,16 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,15 +49,14 @@ public class UpdateHTableHostCLI { private List<String> errorMsgs = Lists.newArrayList(); private List<String> htables; - private Admin hbaseAdmin; + private HBaseAdmin hbaseAdmin; private KylinConfig kylinConfig; private String oldHostValue; public UpdateHTableHostCLI(List<String> htables, String oldHostValue) throws IOException { this.htables = htables; this.oldHostValue = oldHostValue; - Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create()); - hbaseAdmin = conn.getAdmin(); + this.hbaseAdmin = new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()); this.kylinConfig = KylinConfig.getInstanceFromEnv(); } @@ -169,9 +166,9 @@ public class UpdateHTableHostCLI { HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); if (oldHostValue.equals(desc.getValue(IRealizationConstants.HTableTag))) { desc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); - hbaseAdmin.disableTable(TableName.valueOf(tableName)); - hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); - hbaseAdmin.enableTable(TableName.valueOf(tableName)); + hbaseAdmin.disableTable(tableName); + hbaseAdmin.modifyTable(tableName, desc); + hbaseAdmin.enableTable(tableName); updatedResources.add(tableName); } http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/tool/pom.xml ---------------------------------------------------------------------- diff --git a/tool/pom.xml b/tool/pom.xml index 91040d4..919a903 100644 --- a/tool/pom.xml +++ b/tool/pom.xml @@ -60,16 +60,6 @@ <artifactId>hbase-client</artifactId> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> <!-- Env & Test --> <dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/198b2f9e/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index c0042f3..c8bff89 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -36,9 +36,9 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RawResource; @@ -231,7 +231,6 @@ public class CubeMigrationCLI { operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() })); } } - private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException { String projectResPath = ProjectInstance.concatResourcePath(projectName); if (!dstStore.exists(projectResPath)) @@ -448,11 +447,11 @@ public class CubeMigrationCLI { Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); String projUUID = project.getUuid(); - Table srcAclHtable = null; - Table destAclHtable = null; + HTableInterface srcAclHtable = null; + HTableInterface destAclHtable = null; try { - srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); // cube acl Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); @@ -472,6 +471,7 @@ public class CubeMigrationCLI { destAclHtable.put(put); } } + destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(srcAclHtable); IOUtils.closeQuietly(destAclHtable); @@ -537,12 +537,13 @@ public class CubeMigrationCLI { case COPY_ACL: { String cubeId = (String) opt.params[0]; String modelId = (String) opt.params[1]; - Table destAclHtable = null; + HTableInterface destAclHtable = null; try { - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); destAclHtable.delete(new Delete(Bytes.toBytes(modelId))); + destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(destAclHtable); } @@ -559,7 +560,7 @@ public class CubeMigrationCLI { } } - private static void updateMeta(KylinConfig config) { + private static void updateMeta(KylinConfig config){ String[] nodes = config.getRestServers(); for (String node : nodes) { RestClient restClient = new RestClient(node);