Repository: phoenix Updated Branches: refs/heads/txn 826ebf5ce -> 5a558e16c
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index eb117bf..1e28766 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -18,91 +18,41 @@ package org.apache.phoenix.index; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.phoenix.compile.ScanRanges; -import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.schema.types.PVarbinary; -import org.apache.phoenix.util.ScanUtil; -import org.apache.phoenix.util.SchemaUtil; - -import com.google.common.collect.Lists; +import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; +import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.util.IndexUtil; /** * Index builder for covered-columns index that ties into phoenix for faster use. 
*/ -public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder { +public class PhoenixIndexBuilder extends NonTxIndexBuilder { + + @Override + public void setup(RegionCoprocessorEnvironment env) throws IOException { + super.setup(env); + Configuration conf = env.getConfiguration(); + // Install handler that will attempt to disable the index first before killing the region + // server + conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, + PhoenixIndexFailurePolicy.class.getName()); + } @Override public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { // The entire purpose of this method impl is to get the existing rows for the // table rows being indexed into the block cache, as the index maintenance code - // does a point scan per row - List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size()); - Map<ImmutableBytesWritable, IndexMaintainer> maintainers = - new HashMap<ImmutableBytesWritable, IndexMaintainer>(); - ImmutableBytesWritable indexTableName = new ImmutableBytesWritable(); - for (int i = 0; i < miniBatchOp.size(); i++) { - Mutation m = miniBatchOp.getOperation(i); - keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow())); - List<IndexMaintainer> indexMaintainers = getCodec().getIndexMetaData(m.getAttributesMap()).getIndexMaintainers(); - - for(IndexMaintainer indexMaintainer: indexMaintainers) { - if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue; - indexTableName.set(indexMaintainer.getIndexTableName()); - if (maintainers.get(indexTableName) != null) continue; - maintainers.put(indexTableName, indexMaintainer); - } - - } - if (maintainers.isEmpty()) return; - Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values())); - ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); - scanRanges.initializeScan(scan); - 
scan.setFilter(scanRanges.getSkipScanFilter()); - HRegion region = this.env.getRegion(); - RegionScanner scanner = region.getScanner(scan); - // Run through the scanner using internal nextRaw method - region.startRegionOperation(); - try { - synchronized (scanner) { - boolean hasMore; - do { - List<Cell> results = Lists.newArrayList(); - // Results are potentially returned even when the return value of s.next is - // false since this is an indication of whether or not there are more values - // after the ones returned - hasMore = scanner.nextRaw(results); - } while (hasMore); - } - } finally { - try { - scanner.close(); - } finally { - region.closeRegionOperation(); - } - } + // does a point scan per row. + // TODO: provide a means for the transactional case to just return the Scanner + // for when this is executed as it seems like that would be more efficient. + IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache()); } private PhoenixIndexCodec getCodec() { return (PhoenixIndexCodec)this.codec; } - - @Override - public byte[] getBatchId(Mutation m){ - return this.codec.getBatchId(m); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 8b507b6..7b77c37 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -1,19 +1,11 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
*/ package org.apache.phoenix.index; @@ -25,7 +17,6 @@ import java.util.Map; import co.cask.tephra.Transaction; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -41,13 +32,12 @@ import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.IndexCodec; import org.apache.phoenix.hbase.index.covered.IndexUpdate; import org.apache.phoenix.hbase.index.covered.TableState; +import org.apache.phoenix.hbase.index.covered.TxIndexBuilder; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ServerUtil; @@ -63,42 +53,28 @@ import com.google.common.collect.Lists; public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MD = "IdxMD"; public static final String INDEX_UUID = "IdxUUID"; + public static final String INDEX_MAINTAINERS = "IndexMaintainers"; private RegionCoprocessorEnvironment env; - private KeyValueBuilder kvBuilder; + private KeyValueBuilder kvBuilder = GenericKeyValueBuilder.INSTANCE;; @Override - public void initialize(RegionCoprocessorEnvironment env) { + public void initialize(RegionCoprocessorEnvironment env) throws IOException { + super.initialize(env); this.env = env; - Configuration conf = env.getConfiguration(); - // Install handler that will attempt to disable the index first before killing the region - // server - 
conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, - PhoenixIndexFailurePolicy.class.getName()); - // Use the GenericKeyValueBuilder, as it's been shown in perf testing that ClientKeyValue doesn't help - // TODO: Jesse to investigate more - this.kvBuilder = GenericKeyValueBuilder.INSTANCE; } boolean hasIndexMaintainers(Map<String, byte[]> attributes) { - if (attributes == null) { - return false; - } + if (attributes == null) { return false; } byte[] uuid = attributes.get(INDEX_UUID); - if (uuid == null) { - return false; - } + if (uuid == null) { return false; } return true; } - - IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException{ - if (attributes == null) { - return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; - } + + public IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException { + if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } byte[] uuid = attributes.get(INDEX_UUID); - if (uuid == null) { - return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; - } + if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } byte[] md = attributes.get(INDEX_MD); byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); if (md != null) { @@ -107,8 +83,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { return new IndexMetaDataCache() { @Override - public void close() throws IOException { - } + public void close() throws IOException {} @Override public List<IndexMaintainer> getIndexMaintainers() { @@ -119,26 +94,24 @@ public class PhoenixIndexCodec extends BaseIndexCodec { public Transaction getTransaction() { return txn; } - + }; } else { byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB); - ImmutableBytesWritable tenantId = - tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes); + ImmutableBytesWritable tenantId = tenantIdBytes == null ? 
null : new ImmutableBytesWritable(tenantIdBytes); TenantCache cache = GlobalCache.getTenantCache(env, tenantId); - IndexMetaDataCache indexCache = - (IndexMetaDataCache) cache.getServerCache(new ImmutableBytesPtr(uuid)); + IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid)); if (indexCache == null) { - String msg = "key="+ServerCacheClient.idToString(uuid) + " region=" + env.getRegion(); + String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion(); SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND) - .setMessage(msg).build().buildException(); + .setMessage(msg).build().buildException(); ServerUtil.throwIOException("Index update failed", e); // will not return } return indexCache; } - + } - + @Override public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException { return getIndexUpdates(state, true); @@ -150,18 +123,16 @@ public class PhoenixIndexCodec extends BaseIndexCodec { } /** - * * @param state - * @param upsert prepare index upserts if it's true otherwise prepare index deletes. + * @param upsert + * prepare index upserts if it's true otherwise prepare index deletes. 
* @return * @throws IOException */ private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException { - IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes()); - List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers(); - if (indexMaintainers.isEmpty()) { - return Collections.emptyList(); - } + @SuppressWarnings("unchecked") + List<IndexMaintainer> indexMaintainers = (List<IndexMaintainer>)state.getContext().get(INDEX_MAINTAINERS); + if (indexMaintainers.isEmpty()) { return Collections.emptyList(); } List<IndexUpdate> indexUpdates = Lists.newArrayList(); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy @@ -171,7 +142,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { ValueGetter valueGetter = null; Scanner scanner = null; for (IndexMaintainer maintainer : indexMaintainers) { - if(upsert) { + if (upsert) { // Short-circuit building state when we know it's a row deletion if (maintainer.isRowDeleted(state.getPendingUpdate())) { continue; @@ -180,31 +151,25 @@ public class PhoenixIndexCodec extends BaseIndexCodec { IndexUpdate indexUpdate = null; if (maintainer.isImmutableRows()) { indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns())); - if(maintainer.isLocalIndex()) { + if (maintainer.isLocalIndex()) { indexUpdate.setTable(localIndexTableName); } else { indexUpdate.setTable(maintainer.getIndexTableName()); } valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate()); } else { - // TODO: if more efficient, I could do this just once with all columns in all indexes - Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns()); - scanner = statePair.getFirst(); + Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns()); + valueGetter = 
statePair.getFirst(); indexUpdate = statePair.getSecond(); indexUpdate.setTable(maintainer.getIndexTableName()); - valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey); } Mutation mutation = null; if (upsert) { - mutation = - maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state - .getCurrentTimestamp(), env.getRegion().getStartKey(), env - .getRegion().getEndKey()); + mutation = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), env + .getRegion().getStartKey(), env.getRegion().getEndKey()); } else { - mutation = - maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state - .getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion() - .getStartKey(), env.getRegion().getEndKey()); + mutation = maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(), + state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey()); } indexUpdate.setUpdate(mutation); if (scanner != null) { @@ -215,15 +180,25 @@ public class PhoenixIndexCodec extends BaseIndexCodec { } return indexUpdates; } - - @Override - public boolean isEnabled(Mutation m) throws IOException { - return !hasIndexMaintainers(m.getAttributesMap()); - } - - @Override - public byte[] getBatchId(Mutation m) { - Map<String, byte[]> attributes = m.getAttributesMap(); - return attributes.get(INDEX_UUID); - } + + @Override + public boolean isEnabled(Mutation m) throws IOException { + return !hasIndexMaintainers(m.getAttributesMap()); + } + + @Override + public byte[] getBatchId(Mutation m) { + Map<String, byte[]> attributes = m.getAttributesMap(); + return attributes.get(INDEX_UUID); + } + + @Override + public void setContext(TableState state, Mutation mutation) throws IOException { + IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes()); + List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers(); + Map<String,Object> context = 
state.getContext(); + context.clear(); + context.put(INDEX_MAINTAINERS, indexMaintainers); + context.put(TxIndexBuilder.TRANSACTION, indexCache.getTransaction()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 2fd168a..806a20a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -75,7 +75,7 @@ import com.google.common.collect.Multimap; * * @since 2.1 */ -public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy { +public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy { private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class); private RegionCoprocessorEnvironment env; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java new file mode 100644 index 0000000..c471df6 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.index; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.phoenix.hbase.index.covered.TxIndexBuilder; +import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.util.IndexUtil; + +public class PhoenixTxIndexBuilder extends TxIndexBuilder { + @Override + public void setup(RegionCoprocessorEnvironment env) throws IOException { + super.setup(env); + Configuration conf = env.getConfiguration(); + // Install failure policy that just re-throws exception instead of killing RS + // or disabling the index + conf.set(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, PhoenixTxIndexFailurePolicy.class.getName()); + } + + @Override + public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + // The entire purpose of this method impl is to get the existing rows for the + // table rows being indexed into the block cache, as the index maintenance code + // does a point scan per row. + // TODO: provide a means for the transactional case to just return the Scanner + // for when this is executed as it seems like that would be more efficient. 
+ IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache()); + } + + private PhoenixIndexCodec getCodec() { + return (PhoenixIndexCodec)this.codec; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java new file mode 100644 index 0000000..fa70cc9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
+ */ +package org.apache.phoenix.index; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.hbase.index.write.IndexFailurePolicy; + +import com.google.common.collect.Multimap; + +public class PhoenixTxIndexFailurePolicy implements IndexFailurePolicy { + private Stoppable parent; + + @Override + public void stop(String why) { + if (parent != null) { + parent.stop(why); + } + } + + @Override + public boolean isStopped() { + return parent == null ? false : parent.isStopped(); + } + + @Override + public void setup(Stoppable parent, RegionCoprocessorEnvironment env) { + this.parent = parent; + } + + @Override + public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) + throws IOException { + if (cause instanceof IOException) { + throw (IOException)cause; + } else if (cause instanceof RuntimeException) { throw (RuntimeException)cause; } + throw new IOException(cause); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 5abf151..c14aa60 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -107,11 +107,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy; import 
org.apache.phoenix.hbase.index.Indexer; -import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; +import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.index.PhoenixTxIndexBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; @@ -696,6 +697,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); } + boolean isTransactional = Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())); // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use @@ -705,8 +707,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement && !SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(Indexer.class.getName())) { Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); - opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); - Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority); + opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); + Indexer.enableIndexing(descriptor, isTransactional ? 
PhoenixTxIndexBuilder.class : PhoenixIndexBuilder.class, opts, priority); } if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) { descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(), @@ -743,7 +745,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } - if (Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) { + if (isTransactional && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) { descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java index 6fc480e..1f271f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.schema.tuple; -import static org.apache.phoenix.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary; - import java.io.IOException; import org.apache.hadoop.hbase.HConstants; @@ -28,7 +26,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** * @@ -56,13 +53,22 @@ public class ValueGetterTuple extends BaseTuple { @Override public KeyValue getValue(byte[] family, byte[] qualifier) { 
- ImmutableBytesPtr value = null; + ImmutableBytesWritable value = null; try { value = valueGetter.getLatestValue(new ColumnReference(family, qualifier)); } catch (IOException e) { throw new RuntimeException(e); } - return new KeyValue(valueGetter.getRowKey(), family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value!=null? copyBytesIfNecessary(value) : null); + byte[] rowKey = valueGetter.getRowKey(); + int valueOffset = 0; + int valueLength = 0; + byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY; + if (value != null) { + valueBytes = value.get(); + valueOffset = value.getOffset(); + valueLength = value.getLength(); + } + return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, qualifier, 0, qualifier.length, HConstants.LATEST_TIMESTAMP, Type.Put, valueBytes, valueOffset, valueLength); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 49956f9..1fcf16a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -24,6 +24,8 @@ import java.io.DataInputStream; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,12 +43,15 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import 
org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexStatementRewriter; +import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -60,13 +65,16 @@ import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; @@ -235,7 +243,7 @@ public class IndexUtil { } @Override - public ImmutableBytesPtr getLatestValue(ColumnReference ref) { + public ImmutableBytesWritable getLatestValue(ColumnReference ref) { // Always return null for our empty key value, as this will cause the index // maintainer to always treat this Put as a new row. if (isEmptyKeyValue(table, ref)) { @@ -636,4 +644,58 @@ public class IndexUtil { return col.getExpressionStr() == null ? 
IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString()) : col.getExpressionStr(); }
+
+    /*
+     * The entire purpose of this method is to get the existing rows for the table rows being indexed into the
+     * block cache, as the index maintenance code does a point scan per row. Though for the transactional
+     * case we may be loading more than we need, since we're not applying the transaction filters, that
+     * should still be ok.
+     */
+    public static void loadMutatingRowsIntoBlockCache(HRegion region, PhoenixIndexCodec codec, MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean useRawScan)
+            throws IOException {
+        // One point key range per mutation in the batch; these drive the skip scan built below.
+        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
+        Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
+                new HashMap<ImmutableBytesWritable, IndexMaintainer>();
+        // Reused wrapper for map lookups so we do not allocate one per maintainer.
+        ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
+            List<IndexMaintainer> indexMaintainers = codec.getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
+
+            for(IndexMaintainer indexMaintainer: indexMaintainers) {
+                // NOTE(review): immutable-rows local indexes are excluded from the warming scan --
+                // presumably their maintenance never reads prior row state; confirm against IndexMaintainer.
+                if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
+                // De-dupe by index table name: only one maintainer per index table is kept.
+                indexTableName.set(indexMaintainer.getIndexTableName());
+                if (maintainers.get(indexTableName) != null) continue;
+                maintainers.put(indexTableName, indexMaintainer);
+            }
+
+        }
+        // No maintainer was retained, so no existing row state is needed -- nothing to pre-warm.
+        if (maintainers.isEmpty()) return;
+        Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
+        // Raw mode (per HBase Scan.setRaw) also returns delete markers and not-yet-collected
+        // deleted cells when the caller requests it.
+        scan.setRaw(useRawScan);
+        ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+        scanRanges.initializeScan(scan);
+        // The skip-scan filter restricts the scan to exactly the mutated row keys collected above.
+        scan.setFilter(scanRanges.getSkipScanFilter());
+        RegionScanner scanner = region.getScanner(scan);
+        // Run through the scanner using internal nextRaw method
+        // Drain the scanner purely for its side effect of pulling the underlying blocks into
+        // the block cache; the returned Cells themselves are discarded.
+        region.startRegionOperation();
+        try {
+            // NOTE(review): RegionScanner.nextRaw requires the caller to hold the scanner lock --
+            // hence the synchronized block; confirm against the HBase version in use.
+            synchronized (scanner) {
+                boolean hasMore;
+                do {
+                    List<Cell> results = Lists.newArrayList();
+                    // Results are potentially returned even when the return value of s.next is
+                    // false since this is an indication of whether or not there are more values
+                    // after the ones returned
+                    hasMore = scanner.nextRaw(results);
+                } while (hasMore);
+            }
+        } finally {
+            // Always close the scanner before releasing the region operation, even on failure.
+            try {
+                scanner.close();
+            } finally {
+                region.closeRegionOperation();
+            }
+        }
+    }
 }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index cb63380..7e73a81 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. */ package org.apache.phoenix.hbase.index.covered; @@ -24,51 +16,50 @@ import java.util.List; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; - -import org.apache.phoenix.hbase.index.covered.IndexCodec; -import org.apache.phoenix.hbase.index.covered.IndexUpdate; -import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.index.BaseIndexCodec; /** - * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless - * of the current tables' state. + * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless of the current + * tables' state. */ public class CoveredIndexCodecForTesting extends BaseIndexCodec { - private List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); - private List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); + private List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); + private List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); + + public void addIndexDelete(IndexUpdate... 
deletes) { + this.deletes.addAll(Arrays.asList(deletes)); + } + + public void addIndexUpserts(IndexUpdate... updates) { + this.updates.addAll(Arrays.asList(updates)); + } + + public void clear() { + this.deletes.clear(); + this.updates.clear(); + } - public void addIndexDelete(IndexUpdate... deletes) { - this.deletes.addAll(Arrays.asList(deletes)); - } - - public void addIndexUpserts(IndexUpdate... updates) { - this.updates.addAll(Arrays.asList(updates)); - } + @Override + public Iterable<IndexUpdate> getIndexDeletes(TableState state) { + return this.deletes; + } - public void clear() { - this.deletes.clear(); - this.updates.clear(); - } - - @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) { - return this.deletes; - } + @Override + public Iterable<IndexUpdate> getIndexUpserts(TableState state) { + return this.updates; + } - @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) { - return this.updates; - } + @Override + public void initialize(RegionCoprocessorEnvironment env) throws IOException { + // noop + } - @Override - public void initialize(RegionCoprocessorEnvironment env) throws IOException { - // noop - } + @Override + public boolean isEnabled(Mutation m) { + return true; + } - @Override - public boolean isEnabled(Mutation m) { - return true; - } + @Override + public void setContext(TableState state, Mutation mutation) throws IOException {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java index 8c15551..3024820 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java +++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java @@ -23,9 +23,10 @@ import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -33,17 +34,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import org.apache.phoenix.hbase.index.covered.IndexUpdate; -import org.apache.phoenix.hbase.index.covered.LocalTableState; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.Scanner; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * @@ -86,7 +84,7 @@ public class TestLocalTableState { LocalHBaseState state = new LocalTable(env); LocalTableState table = new LocalTableState(env, state, m); //add the kvs from the mutation - table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); + table.addPendingUpdates(m.get(fam, qual)); // setup the lookup ColumnReference col = new ColumnReference(fam, qual); @@ -128,8 +126,8 @@ public class TestLocalTableState { LocalHBaseState state = new LocalTable(env); LocalTableState table = new LocalTableState(env, 
state, m); // add the kvs from the mutation - KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0)); - kv.setMvccVersion(0); + Cell kv = m.get(fam, qual).get(0); + KeyValueUtil.ensureKeyValue(kv).setMvccVersion(0); table.addPendingUpdates(kv); // setup the lookup http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java index 1b61ef0..f0c1483 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java @@ -32,26 +32,22 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; -import org.mockito.Mockito; - -import com.google.common.collect.Lists; import org.apache.phoenix.hbase.index.covered.IndexCodec; import org.apache.phoenix.hbase.index.covered.IndexUpdate; import org.apache.phoenix.hbase.index.covered.LocalTableState; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; -import org.apache.phoenix.hbase.index.covered.example.ColumnGroup; -import org.apache.phoenix.hbase.index.covered.example.CoveredColumn; 
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec; import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec.ColumnEntry; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Lists; public class TestCoveredColumnIndexCodec { private static final byte[] PK = new byte[] { 'a' }; @@ -190,7 +186,7 @@ public class TestCoveredColumnIndexCodec { // get the updates with the pending update state.setCurrentTimestamp(1); - state.addPendingUpdates(KeyValueUtil.ensureKeyValues(kvs)); + state.addPendingUpdates(kvs); updates = codec.getIndexUpserts(state); assertTrue("Didn't find index updates for pending primary table update!", updates.iterator() .hasNext()); @@ -243,7 +239,7 @@ public class TestCoveredColumnIndexCodec { LocalTableState state = new LocalTableState(env, table, d); state.setCurrentTimestamp(d.getTimeStamp()); // now we shouldn't see anything when getting the index update - state.addPendingUpdates(KeyValueUtil.ensureKeyValues(d.getFamilyCellMap().get(FAMILY))); + state.addPendingUpdates(d.getFamilyCellMap().get(FAMILY)); Iterable<IndexUpdate> updates = codec.getIndexUpserts(state); for (IndexUpdate update : updates) { assertFalse("Had some index updates, though it should have been covered by the delete", http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java index 592ac7c..6e1c28f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java @@ -73,7 +73,7 @@ public class 
IndexMaintainerTest extends BaseConnectionlessQueryTest { return new ValueGetter() { @Override - public ImmutableBytesPtr getLatestValue(ColumnReference ref) { + public ImmutableBytesWritable getLatestValue(ColumnReference ref) { return new ImmutableBytesPtr(valueMap.get(ref)); }
