http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java deleted file mode 100644 index 2c0d6fe..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.util.Arrays; - -import org.apache.hadoop.hbase.util.VersionInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HBaseCompatLoader { - - private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class); - - private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1"; - - private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1"; - - private static HBaseCompat cachedCompat; - - public synchronized static HBaseCompat getCompat(String classOverride) { - - if (null != cachedCompat) { - log.debug("Returning cached HBase compatibility layer: {}", cachedCompat); - return cachedCompat; - } - - HBaseCompat compat; - String className = null; - String classNameSource = null; - - if (null != classOverride) { - className = classOverride; - classNameSource = "from explicit configuration"; - } else { - String hbaseVersion = VersionInfo.getVersion(); - for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) { - if (hbaseVersion.startsWith(supportedVersion + ".")) { - className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_"); - classNameSource = "supporting runtime HBase version " + hbaseVersion; - break; - } - } - if (null == className) { - log.info("The HBase version {} is not explicitly supported by Titan. " + - "Loading Titan's compatibility layer for its most recent supported HBase version ({})", - hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION); - className = DEFAULT_HBASE_CLASS_NAME; - classNameSource = " by default"; - } - } - - final String errTemplate = " when instantiating HBase compatibility class " + className; - - try { - compat = (HBaseCompat)Class.forName(className).newInstance(); - log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName()); - } catch (IllegalAccessException e) { - throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); - } catch (InstantiationException e) { - throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); - } - - return cachedCompat = compat; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java deleted file mode 100644 index c5f6e0d..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java +++ /dev/null @@ -1,425 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.thinkaurelius.titan.core.attribute.Duration; -import com.thinkaurelius.titan.diskstorage.*; -import com.thinkaurelius.titan.diskstorage.configuration.Configuration; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; -import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; -import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException; -import com.thinkaurelius.titan.diskstorage.util.KeyColumn; -import com.thinkaurelius.titan.diskstorage.util.RecordIterator; -import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; -import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; -import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList; -import com.thinkaurelius.titan.diskstorage.util.time.Timepoint; -import com.thinkaurelius.titan.diskstorage.util.time.Timestamps; -import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; -import com.thinkaurelius.titan.util.system.IOUtils; - -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.filter.ColumnPaginationFilter; -import org.apache.hadoop.hbase.filter.ColumnRangeFilter; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.util.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.Closeable; -import java.io.IOException; -import java.util.*; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * Here are some areas that might need work: - * <p/> - * - batching? (consider HTable#batch, HTable#setAutoFlush(false) - * - tuning HTable#setWriteBufferSize (?) - * - writing a server-side filter to replace ColumnCountGetFilter, which drops - * all columns on the row where it reaches its limit. This requires getSlice, - * currently, to impose its limit on the client side. That obviously won't - * scale. - * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this) - * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache - * <p/> - * There may be other problem areas. These are just the ones of which I'm aware. - */ -public class HBaseKeyColumnValueStore implements KeyColumnValueStore { - - private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class); - - private final String tableName; - private final HBaseStoreManager storeManager; - - // When using shortened CF names, columnFamily is the shortname and storeName is the longname - // When not using shortened CF names, they are the same - //private final String columnFamily; - private final String storeName; - // This is columnFamily.getBytes() - private final byte[] columnFamilyBytes; - private final HBaseGetter entryGetter; - - private final ConnectionMask cnx; - - private LocalLockMediator<StoreTransaction> localLockMediator; - - private final Duration lockExpiryTimeMs; - private final Duration lockMaxWaitTimeMs; - private final Integer lockMaxRetries; - - HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) { - this.storeManager = storeManager; - this.cnx = cnx; - this.tableName = tableName; - //this.columnFamily = columnFamily; - this.storeName = storeName; - this.columnFamilyBytes = columnFamily.getBytes(); - this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName)); - this.localLockMediator = llm; - Configuration storageConfig = storeManager.getStorageConfig(); - this.lockExpiryTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE); - this.lockMaxWaitTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT); - this.lockMaxRetries = storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY); - } - - @Override - public void close() throws BackendException { - } - - @Override - public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { - Map<StaticBuffer, EntryList> result = getHelper(Arrays.asList(query.getKey()), getFilter(query)); - return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST); - } - - @Override - public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException { - return getHelper(keys, getFilter(query)); - } - - @Override - public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException { - Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions)); - mutateMany(mutations, txh); - } - - @Override - public void acquireLock(StaticBuffer key, - StaticBuffer column, - StaticBuffer expectedValue, - StoreTransaction txh) throws BackendException { - - KeyColumn lockID = new KeyColumn(key, column); - logger.debug("Attempting to acquireLock on {} ", lockID); - int trialCount = 0; - boolean locked; - while (trialCount < lockMaxRetries) { - final Timepoint lockStartTime = Timestamps.MILLI.getTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS); - locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTimeMs)); - trialCount++; - if (!locked) { - handleLockFailure(txh, lockID, trialCount); - } else { - logger.debug("Acquired lock on {}, {}", lockID, txh); - break; - } - } - ((HBaseTransaction) txh).updateLocks(lockID, expectedValue); - } - - void handleLockFailure(StoreTransaction txh, KeyColumn lockID, int trialCount) throws PermanentLockingException { - if (trialCount < lockMaxRetries) { - try { - Thread.sleep(lockMaxWaitTimeMs.getLength(TimeUnit.DAYS.MILLISECONDS)); - } catch (InterruptedException e) { - throw new PermanentLockingException( - "Interrupted while waiting for acquiring lock for transaction " - + txh + " lockID " + lockID + " on retry " + trialCount, e); - } - } else { - throw new PermanentLockingException("Could not lock the keyColumn " + - lockID + " on CF {} " + Bytes.toString(columnFamilyBytes)); - } - } - - @Override - public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { - return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY), - query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY), - new FilterList(FilterList.Operator.MUST_PASS_ALL), - query); - } - - @Override - public String getName() { - return storeName; - } - - @Override - public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException { - return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query); - } - - public static Filter getFilter(SliceQuery query) { - byte[] colStartBytes = query.getSliceEnd().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null; - byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null; - - Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false); - - if (query.hasLimit()) { - filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, - filter, - new ColumnPaginationFilter(query.getLimit(), 0)); - } - - logger.debug("Generated HBase Filter {}", filter); - - return filter; - } - - private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException { - List<Get> requests = new ArrayList<Get>(keys.size()); - { - for (StaticBuffer key : keys) { - Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter); - try { - g.setTimeRange(0, Long.MAX_VALUE); - } catch (IOException e) { - throw new PermanentBackendException(e); - } - requests.add(g); - } - } - - Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size()); - - try { - TableMask table = null; - Result[] results = null; - - try { - table = cnx.getTable(tableName); - logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size()); - results = table.get(requests); - logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size()); - } finally { - IOUtils.closeQuietly(table); - } - - if (results == null) - return KCVSUtil.emptyResults(keys); - - assert results.length==keys.size(); - - for (int i = 0; i < results.length; i++) { - Result result = results[i]; - NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap(); - - if (f == null) { // no result for this key - resultMap.put(keys.get(i), EntryList.EMPTY_LIST); - continue; - } - - // actual key with <timestamp, value> - NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes); - resultMap.put(keys.get(i), (r == null) - ? EntryList.EMPTY_LIST - : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter)); - } - - return resultMap; - } catch (IOException e) { - throw new TemporaryBackendException(e); - } - } - - private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException { - storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh); - } - - private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException { - return executeKeySliceQuery(null, null, filters, columnSlice); - } - - private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey, - @Nullable byte[] endKey, - FilterList filters, - @Nullable SliceQuery columnSlice) throws BackendException { - Scan scan = new Scan().addFamily(columnFamilyBytes); - - try { - scan.setTimeRange(0, Long.MAX_VALUE); - } catch (IOException e) { - throw new PermanentBackendException(e); - } - - if (startKey != null) - scan.setStartRow(startKey); - - if (endKey != null) - scan.setStopRow(endKey); - - if (columnSlice != null) { - filters.addFilter(getFilter(columnSlice)); - } - - TableMask table = null; - - logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey)); - try { - table = cnx.getTable(tableName); - return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes); - } catch (IOException e) { - IOUtils.closeQuietly(table); - throw new PermanentBackendException(e); - } - } - - private class RowIterator implements KeyIterator { - private final Closeable table; - private final Iterator<Result> rows; - private final byte[] columnFamilyBytes; - - private Result currentRow; - private boolean isClosed; - - public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) { - this.table = table; - this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length); - this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() { - @Override - public boolean apply(@Nullable Result result) { - if (result == null) - return false; - - try { - StaticBuffer id = StaticArrayBuffer.of(result.getRow()); - id.getLong(0); - } catch (NumberFormatException e) { - return false; - } - - return true; - } - }); - } - - @Override - public RecordIterator<Entry> getEntries() { - ensureOpen(); - - return new RecordIterator<Entry>() { - private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> currentMap = currentRow.getMap(); - private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentMap == null ? null : currentMap.get(columnFamilyBytes).entrySet().iterator(); - - @Override - public boolean hasNext() { - ensureOpen(); - return kv == null ? false : kv.hasNext(); - } - - @Override - public Entry next() { - ensureOpen(); - return kv == null ? null : StaticArrayEntry.ofBytes(kv.next(), entryGetter); - } - - @Override - public void close() { - isClosed = true; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public boolean hasNext() { - ensureOpen(); - return rows.hasNext(); - } - - @Override - public StaticBuffer next() { - ensureOpen(); - - currentRow = rows.next(); - return StaticArrayBuffer.of(currentRow.getRow()); - } - - @Override - public void close() { - IOUtils.closeQuietly(table); - isClosed = true; - logger.debug("RowIterator closed table {}", table); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - private void ensureOpen() { - if (isClosed) - throw new IllegalStateException("Iterator has been closed."); - } - } - - private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> { - - private final EntryMetaData[] schema; - - private HBaseGetter(EntryMetaData[] schema) { - this.schema = schema; - } - - @Override - public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) { - return element.getKey(); - } - - @Override - public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) { - return element.getValue().lastEntry().getValue(); - } - - @Override - public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) { - return schema; - } - - @Override - public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) { - switch(meta) { - case TIMESTAMP: - return element.getValue().lastEntry().getKey(); - default: - throw new UnsupportedOperationException("Unsupported meta data: " + meta); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java deleted file mode 100644 index a94a7e4..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java +++ /dev/null @@ -1,935 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import com.thinkaurelius.titan.diskstorage.Backend; -import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.CustomizeStoreKCVSManager; -import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; -import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediators; -import com.thinkaurelius.titan.diskstorage.util.time.Timestamps; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableNotEnabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.VersionInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.BiMap; -import com.google.common.collect.ImmutableBiMap; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; -import com.thinkaurelius.titan.core.TitanException; -import com.thinkaurelius.titan.diskstorage.BackendException; -import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; -import com.thinkaurelius.titan.diskstorage.Entry; -import com.thinkaurelius.titan.diskstorage.PermanentBackendException; -import com.thinkaurelius.titan.diskstorage.StaticBuffer; -import com.thinkaurelius.titan.diskstorage.TemporaryBackendException; -import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager; -import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; -import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; -import com.thinkaurelius.titan.diskstorage.configuration.Configuration; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; -import com.thinkaurelius.titan.diskstorage.util.BufferUtil; -import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; -import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; -import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; -import com.thinkaurelius.titan.util.system.IOUtils; -import com.thinkaurelius.titan.util.system.NetworkUtil; - -/** - * Storage Manager for HBase - * - * @author Dan LaRocque <[email protected]> - */ -@PreInitializeConfigOptions -public class HBaseStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager, CustomizeStoreKCVSManager { - - private static final Logger logger = LoggerFactory.getLogger(HBaseStoreManager.class); - - public static final ConfigNamespace HBASE_NS = - new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options"); - - public static final ConfigOption<Boolean> SHORT_CF_NAMES = - new ConfigOption<Boolean>(HBASE_NS, "short-cf-names", - "Whether to shorten the names of Titan's column families to one-character mnemonics " + - "to conserve storage space", ConfigOption.Type.FIXED, true); - - public static final String COMPRESSION_DEFAULT = "-DEFAULT-"; - - public static final ConfigOption<String> COMPRESSION = - new ConfigOption<String>(HBASE_NS, "compression-algorithm", - "An HBase Compression.Algorithm enum string which will be applied to newly created column families. " + - "The compression algorithm must be installed and available on the HBase cluster. Titan cannot install " + - "and configure new compression algorithms on the HBase cluster by itself.", - ConfigOption.Type.MASKABLE, "GZ"); - - public static final ConfigOption<Boolean> SKIP_SCHEMA_CHECK = - new ConfigOption<Boolean>(HBASE_NS, "skip-schema-check", - "Assume that Titan's HBase table and column families already exist. " + - "When this is true, Titan will not check for the existence of its table/CFs, " + - "nor will it attempt to create them under any circumstances. This is useful " + - "when running Titan without HBase admin privileges.", - ConfigOption.Type.MASKABLE, false); - - public static final ConfigOption<String> HBASE_TABLE = - new ConfigOption<String>(HBASE_NS, "table", - "The name of the table Titan will use. When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) + - " is false, Titan will automatically create this table if it does not already exist.", - ConfigOption.Type.LOCAL, "titan"); - - /** - * Related bug fixed in 0.98.0, 0.94.7, 0.95.0: - * - * https://issues.apache.org/jira/browse/HBASE-8170 - */ - public static final int MIN_REGION_COUNT = 3; - - /** - * The total number of HBase regions to create with Titan's table. This - * setting only effects table creation; this normally happens just once when - * Titan connects to an HBase backend for the first time. - */ - public static final ConfigOption<Integer> REGION_COUNT = - new ConfigOption<Integer>(HBASE_NS, "region-count", - "The number of initial regions set when creating Titan's HBase table", - ConfigOption.Type.MASKABLE, Integer.class, new Predicate<Integer>() { - @Override - public boolean apply(Integer input) { - return null != input && MIN_REGION_COUNT <= input; - } - } - ); - - /** - * This setting is used only when {@link #REGION_COUNT} is unset. - * <p/> - * If Titan's HBase table does not exist, then it will be created with total - * region count = (number of servers reported by ClusterStatus) * (this - * value). - * <p/> - * The Apache HBase manual suggests an order-of-magnitude range of potential - * values for this setting: - * - * <ul> - * <li> - * <a href="https://hbase.apache.org/book/important_configurations.html#disable.splitting">2.5.2.7. Managed Splitting</a>: - * <blockquote> - * What's the optimal number of pre-split regions to create? Mileage will - * vary depending upon your application. You could start low with 10 - * pre-split regions / server and watch as data grows over time. It's - * better to err on the side of too little regions and rolling split later. - * </blockquote> - * </li> - * <li> - * <a href="https://hbase.apache.org/book/regions.arch.html">9.7 Regions</a>: - * <blockquote> - * In general, HBase is designed to run with a small (20-200) number of - * relatively large (5-20Gb) regions per server... Typically you want to - * keep your region count low on HBase for numerous reasons. Usually - * right around 100 regions per RegionServer has yielded the best results. - * </blockquote> - * </li> - * </ul> - * - * These considerations may differ for other HBase implementations (e.g. MapR). - */ - public static final ConfigOption<Integer> REGIONS_PER_SERVER = - new ConfigOption<Integer>(HBASE_NS, "regions-per-server", - "The number of regions per regionserver to set when creating Titan's HBase table", - ConfigOption.Type.MASKABLE, Integer.class); - - /** - * If this key is present in either the JVM system properties or the process - * environment (checked in the listed order, first hit wins), then its value - * must be the full package and class name of an implementation of - * {@link HBaseCompat} that has a no-arg public constructor. - * <p> - * When this <b>is not</b> set, Titan attempts to automatically detect the - * HBase runtime version by calling {@link VersionInfo#getVersion()}. Titan - * then checks the returned version string against a hard-coded list of - * supported version prefixes and instantiates the associated compat layer - * if a match is found. - * <p> - * When this <b>is</b> set, Titan will not call - * {@code VersionInfo.getVersion()} or read its hard-coded list of supported - * version prefixes. Titan will instead attempt to instantiate the class - * specified (via the no-arg constructor which must exist) and then attempt - * to cast it to HBaseCompat and use it as such. Titan will assume the - * supplied implementation is compatible with the runtime HBase version and - * make no attempt to verify that assumption. - * <p> - * Setting this key incorrectly could cause runtime exceptions at best or - * silent data corruption at worst. This setting is intended for users - * running exotic HBase implementations that don't support VersionInfo or - * implementations which return values from {@code VersionInfo.getVersion()} - * that are inconsistent with Apache's versioning convention. It may also be - * useful to users who want to run against a new release of HBase that Titan - * doesn't yet officially support. - * - */ - public static final ConfigOption<String> COMPAT_CLASS = - new ConfigOption<String>(HBASE_NS, "compat-class", - "The package and class name of the HBaseCompat implementation. HBaseCompat masks version-specific HBase API differences. " + - "When this option is unset, Titan calls HBase's VersionInfo.getVersion() and loads the matching compat class " + - "at runtime. Setting this option forces Titan to instead reflectively load and instantiate the specified class.", - ConfigOption.Type.MASKABLE, String.class); - - public static final int PORT_DEFAULT = 9160; - - public static final Timestamps PREFERRED_TIMESTAMPS = Timestamps.MILLI; - - public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE = - new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true); - - private static final BiMap<String, String> SHORT_CF_NAME_MAP = - ImmutableBiMap.<String, String>builder() - .put(Backend.INDEXSTORE_NAME, "g") - .put(Backend.INDEXSTORE_NAME + Backend.LOCK_STORE_SUFFIX, "h") - .put(Backend.ID_STORE_NAME, "i") - .put(Backend.EDGESTORE_NAME, "e") - .put(Backend.EDGESTORE_NAME + Backend.LOCK_STORE_SUFFIX, "f") - .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME, "s") - .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME + Backend.LOCK_STORE_SUFFIX, "t") - .put(Backend.SYSTEM_MGMT_LOG_NAME, "m") - .put(Backend.SYSTEM_TX_LOG_NAME, "l") - .build(); - - private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4); - - static { - // Verify that shortCfNameMap is injective - // Should be guaranteed by Guava BiMap, but it doesn't hurt to check - Preconditions.checkArgument(null != SHORT_CF_NAME_MAP); - Collection<String> shorts = SHORT_CF_NAME_MAP.values(); - Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size()); - } - - // Immutable instance fields - private final String tableName; - private final String compression; - private final int regionCount; - private final int regionsPerServer; - private final ConnectionMask cnx; - private final org.apache.hadoop.conf.Configuration hconf; - private final boolean shortCfNames; - private final boolean skipSchemaCheck; - private final String compatClass; - private final HBaseCompat compat; - - private static final ConcurrentHashMap<HBaseStoreManager, Throwable> openManagers = - new ConcurrentHashMap<HBaseStoreManager, Throwable>(); - - // Mutable instance state - private final ConcurrentMap<String, HBaseKeyColumnValueStore> openStores; - - private LocalLockMediator<StoreTransaction> llm; - - public HBaseStoreManager(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) throws BackendException { - super(config, PORT_DEFAULT); - - checkConfigDeprecation(config); - - this.tableName = config.get(HBASE_TABLE); - this.compression = config.get(COMPRESSION); - this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1; - this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1; - this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK); - this.compatClass = config.has(COMPAT_CLASS) ? config.get(COMPAT_CLASS) : null; - this.compat = HBaseCompatLoader.getCompat(compatClass); - - /* - * Specifying both region count options is permitted but may be - * indicative of a misunderstanding, so issue a warning. - */ - if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) { - logger.warn("Both {} and {} are set in Titan's configuration, but " - + "the former takes precedence and the latter will be ignored.", - REGION_COUNT, REGIONS_PER_SERVER); - } - - /* This static factory calls HBaseConfiguration.addHbaseResources(), - * which in turn applies the contents of hbase-default.xml and then - * applies the contents of hbase-site.xml. - */ - this.hconf = HBaseConfiguration.create(); - - // Copy a subset of our commons config into a Hadoop config - int keysLoaded=0; - Map<String,Object> configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE); - for (Map.Entry<String,Object> entry : configSub.entrySet()) { - logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue()); - if (entry.getValue()==null) continue; - hconf.set(entry.getKey(), entry.getValue().toString()); - keysLoaded++; - } - - // Special case for STORAGE_HOSTS - if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) { - String zkQuorumKey = "hbase.zookeeper.quorum"; - String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS)); - hconf.set(zkQuorumKey, csHostList); - logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList); - } - - logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded); - - this.shortCfNames = config.get(SHORT_CF_NAMES); - - try { - //this.cnx = HConnectionManager.createConnection(hconf); - this.cnx = compat.createConnection(hconf); - } catch (IOException e) { - throw new PermanentBackendException(e); - } - - if (logger.isTraceEnabled()) { - openManagers.put(this, new Throwable("Manager Opened")); - dumpOpenManagers(); - } - - logger.debug("Dumping HBase config key=value pairs"); - for (Map.Entry<String, String> entry : hconf) { - logger.debug("[HBaseConfig] " + entry.getKey() + "=" + entry.getValue()); - } - logger.debug("End of HBase config key=value pairs"); - - openStores = new ConcurrentHashMap<String, HBaseKeyColumnValueStore>(); - } - - @Override - public Deployment getDeployment() { - return Deployment.REMOTE; - - /* If just one of the regions for titan table is in the localhost, - * this method returns Deployment.LOCAL - which does not sound right. - * - List<KeyRange> local; - try { - local = getLocalKeyPartition(); - return null != local && !local.isEmpty() ? Deployment.LOCAL : Deployment.REMOTE; - } catch (BackendException e) { - // propagating StorageException might be a better approach - throw new RuntimeException(e); - } - * - */ - } - - @Override - public String toString() { - return "hbase[" + tableName + "@" + super.toString() + "]"; - } - - public void dumpOpenManagers() { - int estimatedSize = openManagers.size(); - logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize); - for (HBaseStoreManager m : openManagers.keySet()) { - logger.trace("Manager {} opened at:", m, openManagers.get(m)); - } - logger.trace("---- End open HBase store manager list ({} managers) ----", estimatedSize); - } - - @Override - public void close() { - openStores.clear(); - if (logger.isTraceEnabled()) - openManagers.remove(this); - IOUtils.closeQuietly(cnx); - } - - @Override - public StoreFeatures getFeatures() { - - Configuration c = GraphDatabaseConfiguration.buildConfiguration(); - - StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder() - .orderedScan(true).unorderedScan(true).batchMutation(true) - .multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true) - .timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS) - .locking(true) - .keyConsistent(c); - - try { - fb.localKeyPartition(getDeployment() == Deployment.LOCAL); - } catch (Exception e) { - logger.warn("Unexpected exception during getDeployment()", e); - } - - return fb.build(); - } - - @Override - public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException { - logger.debug("Enter mutateMany"); - final MaskedTimestamp commitTime = new MaskedTimestamp(txh); - // In case of an addition and deletion with identical timestamps, the - // deletion tombstone wins. - // http://hbase.apache.org/book/versions.html#d244e4250 - Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey = - convertToCommands( - mutations, - commitTime.getAdditionTime(times.getUnit()), - commitTime.getDeletionTime(times.getUnit())); - - List<Row> batch = new ArrayList<Row>(commandsPerKey.size()); // actual batch operation - - // convert sorted commands into representation required for 'batch' operation - for (Pair<Put, Delete> commands : commandsPerKey.values()) { - if (commands.getFirst() != null) - batch.add(commands.getFirst()); - - if (commands.getSecond() != null) - batch.add(commands.getSecond()); - } - - try { - TableMask table = null; - - try { - table = cnx.getTable(tableName); - logger.debug("mutateMany : batch mutate started size {} ", batch.size()); - table.batch(batch, new Object[batch.size()]); - logger.debug("mutateMany : batch mutate finished {} ", batch.size()); - } finally { - IOUtils.closeQuietly(table); - } - } catch (IOException e) { - throw new TemporaryBackendException(e); - } catch (InterruptedException e) { - throw new TemporaryBackendException(e); - } - - sleepAfterWrite(txh, commitTime); - } - - @Override - public KeyColumnValueStore openDatabase(String longName) throws BackendException { - - return openDatabase(longName, -1); - } - - @Override - public KeyColumnValueStore openDatabase(final String longName, int ttlInSeconds) throws BackendException { - - HBaseKeyColumnValueStore store = openStores.get(longName); - - if (store == null) { - final String cfName = shortCfNames ? shortenCfName(longName) : longName; - - final String llmPrefix = getName(); - llm = LocalLockMediators.INSTANCE.<StoreTransaction>get(llmPrefix, times); - HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName, llm); - - store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we loose to other thread - - if (store == null) { - if (!skipSchemaCheck) - ensureColumnFamilyExists(tableName, cfName, ttlInSeconds); - - store = newStore; - } - logger.info("Loaded 1.x Hbase Client Store Manager"); - } - - return store; - } - - - @Override - public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException { - return new HBaseTransaction(config, llm); - } - - @Override - public String getName() { - return tableName; - } - - /** - * Deletes the specified table with all its columns. - * ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss. - */ - @Override - public void clearStorage() throws BackendException { - try (AdminMask adm = getAdminInterface()) { - adm.clearTable(tableName, times.getTime().getNativeTimestamp()); - } catch (IOException e) - { - throw new TemporaryBackendException(e); - } - } - - @Override - public List<KeyRange> getLocalKeyPartition() throws BackendException { - - List<KeyRange> result = new LinkedList<KeyRange>(); - - TableMask table = null; - try { - ensureTableExists(tableName, getCfNameForStoreName(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME), 0); - - table = cnx.getTable(tableName); - - HTable hTable = (HTable)table.getTableObject(); - - Map<KeyRange, ServerName> normed = - normalizeKeyBounds(hTable.getRegionLocations()); - - for (Map.Entry<KeyRange, ServerName> e : normed.entrySet()) { - if (NetworkUtil.isLocalConnection(e.getValue().getHostname())) { - result.add(e.getKey()); - logger.debug("Found local key/row partition {} on host {}", e.getKey(), e.getValue()); - } else { - logger.debug("Discarding remote {}", e.getValue()); - } - } - } catch (MasterNotRunningException e) { - logger.warn("Unexpected MasterNotRunningException", e); - } catch (ZooKeeperConnectionException e) { - logger.warn("Unexpected ZooKeeperConnectionException", e); - } catch (IOException e) { - logger.warn("Unexpected IOException", e); - } finally { - IOUtils.closeQuietly(table); - } - return result; - } - - /** - * Given a map produced by {@link HTable#getRegionLocations()}, transform - * each key from an {@link HRegionInfo} to a {@link KeyRange} expressing the - * region's start and end key bounds using Titan-partitioning-friendly - * conventions (start inclusive, end exclusive, zero bytes appended where - * necessary to make all keys at least 4 bytes long). - * <p/> - * This method iterates over the entries in its map parameter and performs - * the following conditional conversions on its keys. "Require" below means - * either a {@link Preconditions} invocation or an assertion. HRegionInfo - * sometimes returns start and end keys of zero length; this method replaces - * zero length keys with null before doing any of the checks described - * below. The parameter map and the values it contains are only read and - * never modified. - * - * <ul> - * <li>If an entry's HRegionInfo has null start and end keys, then first - * require that the parameter map is a singleton, and then return a - * single-entry map whose {@code KeyRange} has start and end buffers that - * are both four bytes of zeros.</li> - * <li>If the entry has a null end key (but non-null start key), put an - * equivalent entry in the result map with a start key identical to the - * input, except that zeros are appended to values less than 4 bytes long, - * and an end key that is four bytes of zeros. - * <li>If the entry has a null start key (but non-null end key), put an - * equivalent entry in the result map where the start key is four bytes of - * zeros, and the end key has zeros appended, if necessary, to make it at - * least 4 bytes long, after which one is added to the padded value in - * unsigned 32-bit arithmetic with overflow allowed.</li> - * <li>Any entry which matches none of the above criteria results in an - * equivalent entry in the returned map, except that zeros are appended to - * both keys to make each at least 4 bytes long, and the end key is then - * incremented as described in the last bullet point.</li> - * </ul> - * - * After iterating over the parameter map, this method checks that it either - * saw no entries with null keys, one entry with a null start key and a - * different entry with a null end key, or one entry with both start and end - * keys null. If any null keys are observed besides these three cases, the - * method will die with a precondition failure. - * - * @param raw - * A map of HRegionInfo and ServerName from HBase - * @return Titan-friendly expression of each region's rowkey boundaries - */ - private Map<KeyRange, ServerName> normalizeKeyBounds(NavigableMap<HRegionInfo, ServerName> raw) { - - Map.Entry<HRegionInfo, ServerName> nullStart = null; - Map.Entry<HRegionInfo, ServerName> nullEnd = null; - - ImmutableMap.Builder<KeyRange, ServerName> b = ImmutableMap.builder(); - - for (Map.Entry<HRegionInfo, ServerName> e : raw.entrySet()) { - HRegionInfo regionInfo = e.getKey(); - byte startKey[] = regionInfo.getStartKey(); - byte endKey[] = regionInfo.getEndKey(); - - if (0 == startKey.length) { - startKey = null; - logger.trace("Converted zero-length HBase startKey byte array to null"); - } - - if (0 == endKey.length) { - endKey = null; - logger.trace("Converted zero-length HBase endKey byte array to null"); - } - - if (null == startKey && null == endKey) { - Preconditions.checkState(1 == raw.size()); - logger.debug("HBase table {} has a single region {}", tableName, regionInfo); - // Choose arbitrary shared value = startKey = endKey - return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), e.getValue()).build(); - } else if (null == startKey) { - logger.debug("Found HRegionInfo with null startKey on server {}: {}", e.getValue(), regionInfo); - Preconditions.checkState(null == nullStart); - nullStart = e; - // I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive - StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey)); - // Replace null start key with zeroes - b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), e.getValue()); - } else if (null == endKey) { - logger.debug("Found HRegionInfo with null endKey on server {}: {}", e.getValue(), regionInfo); - Preconditions.checkState(null == nullEnd); - nullEnd = e; - // Replace null end key with zeroes - b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), e.getValue()); - } else { - Preconditions.checkState(null != startKey); - Preconditions.checkState(null != endKey); - - // Convert HBase's inclusive end keys into exclusive Titan end keys - StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey)); - StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey)); - - KeyRange kr = new KeyRange(startBuf, endBuf); - b.put(kr, e.getValue()); - logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", e.getValue(), regionInfo); - } - } - - // Require either no null key bounds or a pair of them - Preconditions.checkState(!(null == nullStart ^ null == nullEnd)); - - // Check that every key in the result is at least 4 bytes long - Map<KeyRange, ServerName> result = b.build(); - for (KeyRange kr : result.keySet()) { - Preconditions.checkState(4 <= kr.getStart().length()); - Preconditions.checkState(4 <= kr.getEnd().length()); - } - - return result; - } - - /** - * If the parameter is shorter than 4 bytes, then create and return a new 4 - * byte array with the input array's bytes followed by zero bytes. Otherwise - * return the parameter. - * - * @param dataToPad non-null but possibly zero-length byte array - * @return either the parameter or a new array - */ - private final byte[] zeroExtend(byte[] dataToPad) { - assert null != dataToPad; - - final int targetLength = 4; - - if (targetLength <= dataToPad.length) - return dataToPad; - - byte padded[] = new byte[targetLength]; - - for (int i = 0; i < dataToPad.length; i++) - padded[i] = dataToPad[i]; - - for (int i = dataToPad.length; i < padded.length; i++) - padded[i] = (byte)0; - - return padded; - } - - public static String shortenCfName(String longName) throws PermanentBackendException { - final String s; - if (SHORT_CF_NAME_MAP.containsKey(longName)) { - s = SHORT_CF_NAME_MAP.get(longName); - Preconditions.checkNotNull(s); - logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, s); - } else { - if (SHORT_CF_NAME_MAP.containsValue(longName)) { - String fmt = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true"; - String msg = String.format(fmt, SHORT_CF_NAME_MAP.inverse().get(longName), longName, SHORT_CF_NAMES.getName()); - throw new PermanentBackendException(msg); - } - s = longName; - logger.debug("Kept default CF name \"{}\" because it has no associated short form", s); - } - return s; - } - - private HTableDescriptor ensureTableExists(String tableName, String initialCFName, int ttlInSeconds) throws BackendException { - AdminMask adm = null; - - HTableDescriptor desc; - - try { // Create our table, if necessary - adm = getAdminInterface(); - /* - * Some HBase versions/impls respond badly to attempts to create a - * table without at least one CF. See #661. Creating a CF along with - * the table avoids HBase carping. - */ - if (adm.tableExists(tableName)) { - desc = adm.getTableDescriptor(tableName); - } else { - desc = createTable(tableName, initialCFName, ttlInSeconds, adm); - } - } catch (IOException e) { - throw new TemporaryBackendException(e); - } finally { - IOUtils.closeQuietly(adm); - } - - return desc; - } - - private HTableDescriptor createTable(String tableName, String cfName, int ttlInSeconds, AdminMask adm) throws IOException { - HTableDescriptor desc = compat.newTableDescriptor(tableName); - - HColumnDescriptor cdesc = new HColumnDescriptor(cfName); - setCFOptions(cdesc, ttlInSeconds); - - compat.addColumnFamilyToTableDescriptor(desc, cdesc); - - int count; // total regions to create - String src; - - if (MIN_REGION_COUNT <= (count = regionCount)) { - src = "region count configuration"; - } else if (0 < regionsPerServer && - MIN_REGION_COUNT <= (count = regionsPerServer * adm.getEstimatedRegionServerCount())) { - src = "ClusterStatus server count"; - } else { - count = -1; - src = "default"; - } - - if (MIN_REGION_COUNT < count) { - adm.createTable(desc, getStartKey(count), getEndKey(count), count); - logger.debug("Created table {} with region count {} from {}", tableName, count, src); - } else { - adm.createTable(desc); - logger.debug("Created table {} with default start key, end key, and region count", tableName); - } - - return desc; - } - - /** - * This method generates the second argument to - * {@link HBaseAdmin#createTable(HTableDescriptor, byte[], byte[], int)}. - * <p/> - * From the {@code createTable} javadoc: - * "The start key specified will become the end key of the first region of - * the table, and the end key specified will become the start key of the - * last region of the table (the first region has a null start key and - * the last region has a null end key)" - * <p/> - * To summarize, the {@code createTable} argument called "startKey" is - * actually the end key of the first region. - */ - private byte[] getStartKey(int regionCount) { - ByteBuffer regionWidth = ByteBuffer.allocate(4); - regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip(); - return StaticArrayBuffer.of(regionWidth).getBytes(0, 4); - } - - /** - * Companion to {@link #getStartKey(int)}. See its javadoc for details. - */ - private byte[] getEndKey(int regionCount) { - ByteBuffer regionWidth = ByteBuffer.allocate(4); - regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip(); - return StaticArrayBuffer.of(regionWidth).getBytes(0, 4); - } - - private void ensureColumnFamilyExists(String tableName, String columnFamily, int ttlInSeconds) throws BackendException { - AdminMask adm = null; - try { - adm = getAdminInterface(); - HTableDescriptor desc = ensureTableExists(tableName, columnFamily, ttlInSeconds); - - Preconditions.checkNotNull(desc); - - HColumnDescriptor cf = desc.getFamily(columnFamily.getBytes()); - - // Create our column family, if necessary - if (cf == null) { - try { - if (!adm.isTableDisabled(tableName)) { - adm.disableTable(tableName); - } - } catch (TableNotEnabledException e) { - logger.debug("Table {} already disabled", tableName); - } catch (IOException e) { - throw new TemporaryBackendException(e); - } - - try { - HColumnDescriptor cdesc = new HColumnDescriptor(columnFamily); - - setCFOptions(cdesc, ttlInSeconds); - - adm.addColumn(tableName, cdesc); - - logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. to propogate.", columnFamily); - - adm.enableTable(tableName); - } catch (TableNotFoundException ee) { - logger.error("TableNotFoundException", ee); - throw new PermanentBackendException(ee); - } catch (org.apache.hadoop.hbase.TableExistsException ee) { - logger.debug("Swallowing exception {}", ee); - } catch (IOException ee) { - throw new TemporaryBackendException(ee); - } - } - } finally { - IOUtils.closeQuietly(adm); - } - } - - private void setCFOptions(HColumnDescriptor cdesc, int ttlInSeconds) { - if (null != compression && !compression.equals(COMPRESSION_DEFAULT)) - compat.setCompression(cdesc, compression); - - if (ttlInSeconds > 0) - cdesc.setTimeToLive(ttlInSeconds); - - cdesc.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); - } - - /** - * Convert Titan internal Mutation representation into HBase native commands. - * - * @param mutations Mutations to convert into HBase commands. - * @param putTimestamp The timestamp to use for Put commands. - * @param delTimestamp The timestamp to use for Delete commands. - * @return Commands sorted by key converted from Titan internal representation. - * @throws com.thinkaurelius.titan.diskstorage.PermanentBackendException - */ - private Map<StaticBuffer, Pair<Put, Delete>> convertToCommands(Map<String, Map<StaticBuffer, KCVMutation>> mutations, - final long putTimestamp, - final long delTimestamp) throws PermanentBackendException { - Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey = new HashMap<StaticBuffer, Pair<Put, Delete>>(); - - for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> entry : mutations.entrySet()) { - - String cfString = getCfNameForStoreName(entry.getKey()); - byte[] cfName = cfString.getBytes(); - - for (Map.Entry<StaticBuffer, KCVMutation> m : entry.getValue().entrySet()) { - byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY); - KCVMutation mutation = m.getValue(); - - Pair<Put, Delete> commands = commandsPerKey.get(m.getKey()); - - if (commands == null) { - commands = new Pair<Put, Delete>(); - commandsPerKey.put(m.getKey(), commands); - } - - if (mutation.hasDeletions()) { - if (commands.getSecond() == null) { - Delete d = new Delete(key); - compat.setTimestamp(d, delTimestamp); - commands.setSecond(d); - } - - for (StaticBuffer b : mutation.getDeletions()) { - commands.getSecond().deleteColumns(cfName, b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp); - } - } - - if (mutation.hasAdditions()) { - if (commands.getFirst() == null) { - Put p = new Put(key, putTimestamp); - commands.setFirst(p); - } - - for (Entry e : mutation.getAdditions()) { - commands.getFirst().add(cfName, - e.getColumnAs(StaticBuffer.ARRAY_FACTORY), - putTimestamp, - e.getValueAs(StaticBuffer.ARRAY_FACTORY)); - } - } - } - } - - return commandsPerKey; - } - - private String getCfNameForStoreName(String storeName) throws PermanentBackendException { - return shortCfNames ? shortenCfName(storeName) : storeName; - } - - private void checkConfigDeprecation(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) { - if (config.has(GraphDatabaseConfiguration.STORAGE_PORT)) { - logger.warn("The configuration property {} is ignored for HBase. Set hbase.zookeeper.property.clientPort in hbase-site.xml or {}.hbase.zookeeper.property.clientPort in Titan's configuration file.", - ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT), ConfigElement.getPath(HBASE_CONFIGURATION_NAMESPACE)); - } - } - - private AdminMask getAdminInterface() { - try { - return cnx.getAdmin(); - } catch (IOException e) { - throw new TitanException(e); - } - } - - /** - * Similar to {@link Function}, except that the {@code apply} method is allowed - * to throw {@link BackendException}. - */ - private static interface BackendFunction<F, T> { - - T apply(F input) throws BackendException; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java deleted file mode 100644 index e13593f..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import com.thinkaurelius.titan.diskstorage.BackendException; -import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; -import com.thinkaurelius.titan.diskstorage.StaticBuffer; -import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; -import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; -import com.thinkaurelius.titan.diskstorage.util.KeyColumn; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedHashSet; -import java.util.Set; - -/** - * This class overrides and adds nothing compared with - * {@link com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction}; however, it creates a transaction type specific - * to HBase, which lets us check for user errors like passing a Cassandra - * transaction into a HBase method. - * - * @author Dan LaRocque <[email protected]> - */ -public class HBaseTransaction extends AbstractStoreTransaction { - - private static final Logger log = LoggerFactory.getLogger(HBaseTransaction.class); - - LocalLockMediator<StoreTransaction> llm; - - Set<KeyColumn> keyColumnLocks = new LinkedHashSet<>(); - - public HBaseTransaction(final BaseTransactionConfig config, LocalLockMediator<StoreTransaction> llm) { - super(config); - this.llm = llm; - } - - @Override - public synchronized void rollback() throws BackendException { - super.rollback(); - log.debug("Rolled back transaction"); - deleteAllLocks(); - } - - @Override - public synchronized void commit() throws BackendException { - super.commit(); - log.debug("Committed transaction"); - deleteAllLocks(); - } - - public void updateLocks(KeyColumn lockID, StaticBuffer expectedValue) { - keyColumnLocks.add(lockID); - } - - private void deleteAllLocks() { - for(KeyColumn kc : keyColumnLocks) { - log.debug("Removed lock {} ", kc); - llm.unlock(kc, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java deleted file mode 100644 index 8660644..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; - -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; - -public class HConnection0_98 implements ConnectionMask -{ - - private final HConnection cnx; - - public HConnection0_98(HConnection cnx) - { - this.cnx = cnx; - } - - @Override - public TableMask getTable(String name) throws IOException - { - return new HTable0_98(cnx.getTable(name)); - } - - @Override - public AdminMask getAdmin() throws IOException - { - return new HBaseAdmin0_98(new HBaseAdmin(cnx)); - } - - @Override - public void close() throws IOException - { - cnx.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java deleted file mode 100644 index 91e5026..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.HBaseAdmin; - -public class HConnection1_0 implements ConnectionMask -{ - - private final Connection cnx; - - public HConnection1_0(Connection cnx) - { - this.cnx = cnx; - } - - @Override - public TableMask getTable(String name) throws IOException - { - return new HTable1_0(cnx.getTable(TableName.valueOf(name))); - } - - @Override - public AdminMask getAdmin() throws IOException - { - return new HBaseAdmin1_0(new HBaseAdmin(cnx)); - } - - @Override - public void close() throws IOException - { - cnx.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java deleted file mode 100644 index b11532a..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.Scan; - -public class HTable0_98 implements TableMask -{ - private final HTableInterface table; - - public HTable0_98(HTableInterface table) - { - this.table = table; - } - - @Override - public ResultScanner getScanner(Scan filter) throws IOException - { - return table.getScanner(filter); - } - - @Override - public Result[] get(List<Get> gets) throws IOException - { - return table.get(gets); - } - - @Override - public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException - { - table.batch(writes, results); - table.flushCommits(); - } - - @Override - public void close() throws IOException - { - table.close(); - } - - @Override - public Object getTableObject() { - return table; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java deleted file mode 100644 index 5c90617..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; - -public class HTable1_0 implements TableMask -{ - private final Table table; - - public HTable1_0(Table table) - { - this.table = table; - } - - @Override - public ResultScanner getScanner(Scan filter) throws IOException - { - return table.getScanner(filter); - } - - @Override - public Result[] get(List<Get> gets) throws IOException - { - return table.get(gets); - } - - @Override - public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException - { - table.batch(writes, results); - /* table.flushCommits(); not needed anymore */ - } - - @Override - public void close() throws IOException - { - table.close(); - } - - @Override - public Object getTableObject() { - return table; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java deleted file mode 100644 index 54f8743..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.Scan; - -/** - * This interface hides ABI/API breaking changes that HBase has made to its Table/HTableInterface over the course - * of development from 0.94 to 1.0 and beyond. - */ -public interface TableMask extends Closeable -{ - - ResultScanner getScanner(Scan filter) throws IOException; - - Result[] get(List<Get> gets) throws IOException; - - void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException; - - Object getTableObject(); -}
