http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java index 1f0191a,0000000..1dadf20 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java @@@ -1,265 -1,0 +1,265 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.cassandra.hints;

import java.io.File;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import com.google.common.util.concurrent.RateLimiter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.service.StorageService;

/**
 * A multi-threaded (by default) executor for dispatching hints.
 *
 * Most of dispatch is triggered by {@link HintsDispatchTrigger} running every ~10 seconds.
 *
 * At most one dispatch task is scheduled per destination host id at a time: the pending task's
 * {@link Future} is tracked in {@code scheduledDispatches} and removed when the task finishes.
 */
final class HintsDispatchExecutor
{
    private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);

    // Delay between the first and the second (final) attempt of a full hints transfer.
    private static final int TRANSFER_RETRY_DELAY_SECONDS = 10;

    private final File hintsDirectory;
    private final ExecutorService executor;
    private final AtomicBoolean isPaused;
    private final Map<UUID, Future> scheduledDispatches;

    /**
     * @param hintsDirectory directory containing the hints files to dispatch
     * @param maxThreads     maximum number of dispatch tasks allowed to run concurrently
     * @param isPaused       shared flag checked by dispatch tasks before each hints file (and handed to
     *                       {@link HintsDispatcher}); while set, dispatch stops early
     */
    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused)
    {
        this.hintsDirectory = hintsDirectory;
        this.isPaused = isPaused;

        scheduledDispatches = new ConcurrentHashMap<>();

        // With an unbounded work queue a ThreadPoolExecutor never creates more than corePoolSize threads
        // (maximumPoolSize has no effect), so the core size itself has to be maxThreads for dispatch to
        // actually run in parallel. Core threads are allowed to time out so an idle pool still shrinks
        // after the keep-alive, as it would have with a single core thread.
        JMXEnabledThreadPoolExecutor pool = new JMXEnabledThreadPoolExecutor(maxThreads,
                                                                              maxThreads,
                                                                              1,
                                                                              TimeUnit.MINUTES,
                                                                              new LinkedBlockingQueue<>(),
                                                                              new NamedThreadFactory("HintsDispatcher", Thread.MIN_PRIORITY),
                                                                              "internal");
        pool.allowCoreThreadTimeOut(true);
        executor = pool;
    }

    /**
     * Deschedules all pending dispatches, attempts to stop the ones in progress, and waits (up to one
     * minute) for the executor to terminate.
     *
     * It's safe to terminate dispatch in progress and to deschedule dispatch.
     */
    void shutdownBlocking()
    {
        scheduledDispatches.clear();
        executor.shutdownNow();
        try
        {
            // without this wait the method returned while dispatch tasks could still be running
            if (!executor.awaitTermination(1, TimeUnit.MINUTES))
                logger.warn("Hints dispatch executor did not terminate within 1 minute");
        }
        catch (InterruptedException e)
        {
            // preserve the interrupt for the caller instead of swallowing it
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return true if a dispatch task for the store's host id is currently scheduled or running
     */
    boolean isScheduled(HintsStore store)
    {
        return scheduledDispatches.containsKey(store.hostId);
    }

    /**
     * Schedules dispatch of the store's hints to the store's own host (unless one is already scheduled).
     */
    Future dispatch(HintsStore store)
    {
        return dispatch(store, store.hostId);
    }

    /**
     * Schedules dispatch of the store's hints to {@code hostId}, unless a dispatch for that host id is
     * already scheduled, in which case the existing future is returned.
     */
    Future dispatch(HintsStore store, UUID hostId)
    {
        /*
         * It is safe to perform dispatch for the same host id concurrently in two or more threads,
         * however there is nothing to win from it - so we don't.
         *
         * Additionally, having just one dispatch task per host id ensures that we'll never violate our per-destination
         * rate limit, without having to share a ratelimiter between threads.
         *
         * It also simplifies reasoning about dispatch sessions.
         */
        return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
    }

    /**
     * Submits a task that streams the hints of every store in the catalog to a host chosen by
     * {@code hostIdSupplier}, retrying once (possibly against a different host) on failure.
     */
    Future transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
    {
        return executor.submit(new TransferHintsTask(catalog, hostIdSupplier));
    }

    /**
     * Blocks until the currently scheduled dispatch for the store's host id (if any) completes.
     *
     * @throws RuntimeException wrapping the task's failure, or an interruption of the waiting thread
     *                          (in which case the thread's interrupt status is restored)
     */
    void completeDispatchBlockingly(HintsStore store)
    {
        Future future = scheduledDispatches.get(store.hostId);
        if (future == null)
            return;

        try
        {
            future.get();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e)
        {
            throw new RuntimeException(e);
        }
    }

    private final class TransferHintsTask implements Runnable
    {
        private final HintsCatalog catalog;

        /*
         * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest.
         * We use a supplier here to be able to get a new host if the current one dies during streaming.
         */
        private final Supplier<UUID> hostIdSupplier;

        private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
        {
            this.catalog = catalog;
            this.hostIdSupplier = hostIdSupplier;
        }

        /**
         * Attempts a full transfer; on failure waits and retries once against a freshly supplied host.
         *
         * @throws RuntimeException if the second attempt also leaves hints files behind, or if the
         *                          retry delay is interrupted (interrupt status is restored)
         */
        @Override
        public void run()
        {
            UUID hostId = hostIdSupplier.get();
            logger.info("Transferring all hints to {}", hostId);
            if (transfer(hostId))
                return;

            logger.warn("Failed to transfer all hints to {}; will retry in {} seconds", hostId, TRANSFER_RETRY_DELAY_SECONDS);

            try
            {
                TimeUnit.SECONDS.sleep(TRANSFER_RETRY_DELAY_SECONDS);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }

            hostId = hostIdSupplier.get();
            logger.info("Transferring all hints to {}", hostId);
            if (!transfer(hostId))
            {
                logger.error("Failed to transfer all hints to {}", hostId);
                throw new RuntimeException("Failed to transfer all hints to " + hostId);
            }
        }

        /**
         * Synchronously dispatches every store's hints to {@code hostId} on the current thread.
         *
         * @return true if no hints files remain in the catalog afterwards
         */
        private boolean transfer(UUID hostId)
        {
            // NOTE(review): DispatchHintsTask.run() unconditionally removes scheduledDispatches[hostId] when it
            // finishes, so running these tasks inline can drop the entry of a regular dispatch that is
            // concurrently scheduled for the same target host. Confirm whether that can happen in practice.
            catalog.stores()
                   .map(store -> new DispatchHintsTask(store, hostId))
                   .forEach(Runnable::run);

            return !catalog.hasFiles();
        }
    }

    private final class DispatchHintsTask implements Runnable
    {
        private final HintsStore store;
        private final UUID hostId;
        private final RateLimiter rateLimiter;

        DispatchHintsTask(HintsStore store, UUID hostId)
        {
            this.store = store;
            this.hostId = hostId;

            // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
            // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
            // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
            // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
            // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
            int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
            int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount;
            // multiply in floating point: int arithmetic (throttleInKB * 1024) overflows for throttles >= 2 GiB/s
            this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024.0);
        }

        /**
         * Dispatches the store's hints files and always deregisters this host id from
         * {@code scheduledDispatches} on exit, even if dispatch throws.
         */
        public void run()
        {
            try
            {
                dispatch();
            }
            finally
            {
                scheduledDispatches.remove(hostId);
            }
        }

        /*
         * Polls and dispatches hints files one by one until dispatch is paused, the store has no more files,
         * or a file could not be fully delivered. A corrupted file is cleaned up and blacklisted, then the
         * FSReadError is rethrown.
         */
        private void dispatch()
        {
            while (true)
            {
                if (isPaused.get())
                    break;

                HintsDescriptor descriptor = store.poll();
                if (descriptor == null)
                    break;

                try
                {
                    if (!dispatch(descriptor))
                        break;
                }
                catch (FSReadError e)
                {
                    logger.error("Failed to dispatch hints file {}: file is corrupted ({})", descriptor.fileName(), e);
                    store.cleanUp(descriptor);
                    store.blacklist(descriptor);
                    throw e;
                }
            }
        }

        /*
         * Will return true if dispatch was successful, false if we hit a failure (destination node went down, for example).
         * On partial delivery the reached offset is recorded and the descriptor is put back at the head of the store,
         * so the next attempt resumes from where this one stopped.
         */
        private boolean dispatch(HintsDescriptor descriptor)
        {
            logger.trace("Dispatching hints file {}", descriptor.fileName());

            File file = new File(hintsDirectory, descriptor.fileName());
            Long offset = store.getDispatchOffset(descriptor).orElse(null);

            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused))
            {
                if (offset != null)
                    dispatcher.seek(offset);

                if (dispatcher.dispatch())
                {
                    if (!file.delete())
                        logger.error("Failed to delete hints file {}", descriptor.fileName());
                    store.cleanUp(descriptor);
                    logger.info("Finished hinted handoff of file {} to endpoint {}", descriptor.fileName(), hostId);
                    return true;
                }
                else
                {
                    store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
                    store.offerFirst(descriptor);
                    logger.info("Finished hinted handoff of file {} to endpoint {}, partially", descriptor.fileName(), hostId);
                    return false;
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 47364f6,0000000..3daf147 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@@ -1,1002 -1,0 +1,1002 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index; + +import java.lang.reflect.Constructor; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.internal.CassandraIndex; +import org.apache.cassandra.index.transactions.*; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Refs; + +/** + * Handles the core maintenance functionality associated with indexes: 
adding/removing them to or from + * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata + * and so on. + * + * The Index interface defines a number of methods which return Callable<?>. These are primarily the + * management tasks for an index implementation. Most of them are currently executed in a blocking + * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty + * much all cases, as tasks like flushing an index needs to be executed synchronously to avoid potentially + * deadlocking on the FlushWriter or PostFlusher. Several of these Callable<?> returning methods on Index could + * then be defined with as void and called directly from SIM (rather than being run via the executor service). + * Separating the task defintion from execution gives us greater flexibility though, so that in future, for example, + * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously. + * + * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may + * involve a significant effort, building a new index over any existing data. We perform this task asynchronously; + * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom + * indexes is performed on the CompactionManager. + * + * This class also provides instances of processors which listen to updates to the base table and forward to + * registered Indexes the info required to keep those indexes up to date. + * There are two variants of these processors, each with a factory method provided by SIM: + * IndexTransaction: deals with updates generated on the regular write path. + * CleanupTransaction: used when partitions are modified during compaction or cleanup operations. + * Further details on their usage and lifecycles can be found in the interface definitions below. 
+ * + * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able + * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object + * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle. + * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for + * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on + * a target replica. + */ +public class SecondaryIndexManager implements IndexRegistry +{ + private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class); + + private Map<String, Index> indexes = Maps.newConcurrentMap(); + + // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built + private static final ExecutorService asyncExecutor = + new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("SecondaryIndexManagement"), + "internal"); + + // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc + private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService(); + + /** + * The underlying column family containing the source data for these indexes + */ + public final ColumnFamilyStore baseCfs; + + public SecondaryIndexManager(ColumnFamilyStore baseCfs) + { + this.baseCfs = baseCfs; + } + + + /** + * Drops and adds new indexes associated with the underlying CF + */ + public void reload() + { + // figure out what needs to be added and dropped. 
+ Indexes tableIndexes = baseCfs.metadata.getIndexes(); + indexes.keySet() + .stream() + .filter(indexName -> !tableIndexes.has(indexName)) + .forEach(this::removeIndex); + + // we call add for every index definition in the collection as + // some may not have been created here yet, only added to schema + for (IndexMetadata tableIndex : tableIndexes) + addIndex(tableIndex); + } + + private Future<?> reloadIndex(IndexMetadata indexDef) + { + // if the index metadata has changed, reload the index + IndexMetadata registered = indexes.get(indexDef.name).getIndexMetadata(); + if (!registered.equals(indexDef)) + { + Index index = indexes.remove(registered.name); + index.register(this); + return blockingExecutor.submit(index.getMetadataReloadTask(indexDef)); + } + + // otherwise, nothing to do + return Futures.immediateFuture(null); + } + + private Future<?> createIndex(IndexMetadata indexDef) + { + Index index = createInstance(indexDef); + index.register(this); + final Callable<?> initialBuildTask = index.getInitializationTask(); + return initialBuildTask == null + ? 
Futures.immediateFuture(null) + : asyncExecutor.submit(initialBuildTask); + } + + /** + * Adds and builds a index + * @param indexDef the IndexMetadata describing the index + */ + public synchronized Future<?> addIndex(IndexMetadata indexDef) + { + if (indexes.containsKey(indexDef.name)) + return reloadIndex(indexDef); + else + return createIndex(indexDef); + } + + public synchronized void removeIndex(String indexName) + { + Index index = indexes.remove(indexName); + if (null != index) + { + executeBlocking(index.getInvalidateTask()); + unregisterIndex(index); + } + } + + + public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column) + { + if (indexes.isEmpty()) + return Collections.emptySet(); + + Set<IndexMetadata> dependentIndexes = new HashSet<>(); + for (Index index : indexes.values()) + if (index.dependsOn(column)) + dependentIndexes.add(index.getIndexMetadata()); + + return dependentIndexes; + } + + + /** + * Called when dropping a Table + */ + public void markAllIndexesRemoved() + { + getBuiltIndexNames().forEach(this::markIndexRemoved); + } + + /** + * Does a full, blocking rebuild of the indexes specified by columns from the sstables. + * Caller must acquire and release references to the sstables used here. 
+ * Note also that only this method of (re)building indexes: + * a) takes a set of index *names* rather than Indexers + * b) marks exsiting indexes removed prior to rebuilding + * + * @param sstables the data to build from + * @param indexNames the list of indexes to be rebuilt + */ + public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames) + { + Set<Index> toRebuild = indexes.values().stream() + .filter(index -> indexNames.contains(index.getIndexName())) + .filter(Index::shouldBuildBlocking) + .collect(Collectors.toSet()); + if (toRebuild.isEmpty()) + { + logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames)); + return; + } + + toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexName())); + + buildIndexesBlocking(sstables, toRebuild); + + toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexName())); + } + + public void buildAllIndexesBlocking(Collection<SSTableReader> sstables) + { + buildIndexesBlocking(sstables, indexes.values() + .stream() + .filter(Index::shouldBuildBlocking) + .collect(Collectors.toSet())); + } + + // For convenience, may be called directly from Index impls + public void buildIndexBlocking(Index index) + { + if (index.shouldBuildBlocking()) + { + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + Refs<SSTableReader> sstables = viewFragment.refs) + { + buildIndexesBlocking(sstables, Collections.singleton(index)); + markIndexBuilt(index.getIndexName()); + } + } + } + + /** + * Checks if the specified {@link ColumnFamilyStore} is a secondary index. + * + * @param cfs the <code>ColumnFamilyStore</code> to check. + * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index, + * <code>false</code> otherwise. 
+ */ + public static boolean isIndexColumnFamilyStore(ColumnFamilyStore cfs) + { + return isIndexColumnFamily(cfs.name); + } + + /** + * Checks if the specified {@link ColumnFamilyStore} is the one secondary index. + * + * @param cfs the name of the <code>ColumnFamilyStore</code> to check. + * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index, + * <code>false</code> otherwise. + */ + public static boolean isIndexColumnFamily(String cfName) + { + return cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR); + } + + /** + * Returns the parent of the specified {@link ColumnFamilyStore}. + * + * @param cfs the <code>ColumnFamilyStore</code> + * @return the parent of the specified <code>ColumnFamilyStore</code> + */ + public static ColumnFamilyStore getParentCfs(ColumnFamilyStore cfs) + { + String parentCfs = getParentCfsName(cfs.name); + return cfs.keyspace.getColumnFamilyStore(parentCfs); + } + + /** + * Returns the parent name of the specified {@link ColumnFamilyStore}. 
+ * + * @param cfName the <code>ColumnFamilyStore</code> name + * @return the parent name of the specified <code>ColumnFamilyStore</code> + */ + public static String getParentCfsName(String cfName) + { + assert isIndexColumnFamily(cfName); + return StringUtils.substringBefore(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR); + } + + /** + * Returns the index name + * + * @param cfName the <code>ColumnFamilyStore</code> name + * @return the index name + */ + public static String getIndexName(ColumnFamilyStore cfs) + { + return getIndexName(cfs.name); + } + + /** + * Returns the index name + * + * @param cfName the <code>ColumnFamilyStore</code> name + * @return the index name + */ + public static String getIndexName(String cfName) + { + assert isIndexColumnFamily(cfName); + return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR); + } + + private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes) + { + if (indexes.isEmpty()) + return; + + logger.info("Submitting index build of {} for data in {}", + indexes.stream().map(Index::getIndexName).collect(Collectors.joining(",")), + sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","))); + + SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, + indexes, + new ReducingKeyIterator(sstables)); + Future<?> future = CompactionManager.instance.submitIndexBuild(builder); + FBUtilities.waitOnFuture(future); + + flushIndexesBlocking(indexes); + logger.info("Index build of {} complete", + indexes.stream().map(Index::getIndexName).collect(Collectors.joining(","))); + } + + private void markIndexBuilt(String indexName) + { + SystemKeyspace.setIndexBuilt(baseCfs.name, indexName); + } + + private void markIndexRemoved(String indexName) + { + SystemKeyspace.setIndexRemoved(baseCfs.name, indexName); + } + + + public Index getIndexByName(String indexName) + { + return indexes.get(indexName); + } + + private Index 
createInstance(IndexMetadata indexDef) + { + Index newIndex; + if (indexDef.isCustom()) + { + assert indexDef.options != null; + String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME); + assert ! Strings.isNullOrEmpty(className); + try + { + Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index"); + Constructor ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class); + newIndex = (Index)ctor.newInstance(baseCfs, indexDef); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + else + { + newIndex = CassandraIndex.newIndex(baseCfs, indexDef); + } + return newIndex; + } + + /** + * Truncate all indexes + */ + public void truncateAllIndexesBlocking(final long truncatedAt) + { + executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt)); + } + + /** + * Remove all indexes + */ + public void invalidateAllIndexesBlocking() + { + executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask); + } + + /** + * Perform a blocking flush all indexes + */ + public void flushAllIndexesBlocking() + { + flushIndexesBlocking(ImmutableSet.copyOf(indexes.values())); + } + + /** + * Perform a blocking flush of selected indexes + */ + public void flushIndexesBlocking(Set<Index> indexes) + { + if (indexes.isEmpty()) + return; + + List<Future<?>> wait = new ArrayList<>(); + List<Index> nonCfsIndexes = new ArrayList<>(); + + // for each CFS backed index, submit a flush task which we'll wait on for completion + // for the non-CFS backed indexes, we'll flush those while we wait. 
+ synchronized (baseCfs.getTracker()) + { + indexes.forEach(index -> + index.getBackingTable() + .map(cfs -> wait.add(cfs.forceFlush())) + .orElseGet(() -> nonCfsIndexes.add(index))); + } + + executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask); + FBUtilities.waitOnFutures(wait); + } + + /** + * Performs a blocking flush of all custom indexes + */ + public void flushAllNonCFSBackedIndexesBlocking() + { + Set<Index> customIndexers = indexes.values().stream() + .filter(index -> !(index.getBackingTable().isPresent())) + .collect(Collectors.toSet()); + flushIndexesBlocking(customIndexers); + } + + /** + * @return all indexes which are marked as built and ready to use + */ + public List<String> getBuiltIndexNames() + { + Set<String> allIndexNames = new HashSet<>(); + indexes.values().stream() + .map(Index::getIndexName) + .forEach(allIndexNames::add); + return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames); + } + + /** + * @return all backing Tables used by registered indexes + */ + public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores() + { + Set<ColumnFamilyStore> backingTables = new HashSet<>(); + indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add)); + return backingTables; + } + + /** + * @return if there are ANY indexes registered for this table + */ + public boolean hasIndexes() + { + return !indexes.isEmpty(); + } + + /** + * When building an index against existing data in sstables, add the given partition to the index + */ + public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec) + { + if (!indexes.isEmpty()) + { + DecoratedKey key = partition.partitionKey(); + Set<Index.Indexer> indexers = indexes.stream() + .map(index -> index.indexerFor(key, + nowInSec, + opGroup, + IndexTransaction.Type.UPDATE)) + .collect(Collectors.toSet()); + + indexers.forEach(Index.Indexer::begin); + + try (RowIterator filtered = 
UnfilteredRowIterators.filter(partition, nowInSec)) + { + if (!filtered.staticRow().isEmpty()) + indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow())); + + while (filtered.hasNext()) + { + Row row = filtered.next(); + indexers.forEach(indexer -> indexer.insertRow(row)); + } + } + + indexers.forEach(Index.Indexer::finish); + } + } + + /** + * Delete all data from all indexes for this partition. + * For when cleanup rips a partition out entirely. + * + * TODO : improve cleanup transaction to batch updates & perform them async + */ + public void deletePartition(UnfilteredRowIterator partition, int nowInSec) + { + // we need to acquire memtable lock because secondary index deletion may + // cause a race (see CASSANDRA-3712). This is done internally by the + // index transaction when it commits + CleanupTransaction indexTransaction = newCleanupTransaction(partition.partitionKey(), + partition.columns(), + nowInSec); + indexTransaction.start(); + indexTransaction.onPartitionDeletion(partition.partitionLevelDeletion()); + indexTransaction.commit(); + + while (partition.hasNext()) + { + Unfiltered unfiltered = partition.next(); + if (unfiltered.kind() != Unfiltered.Kind.ROW) + continue; + + indexTransaction = newCleanupTransaction(partition.partitionKey(), + partition.columns(), + nowInSec); + indexTransaction.start(); + indexTransaction.onRowDelete((Row)unfiltered); + indexTransaction.commit(); + } + } + + /** + * Called at query time to choose which (if any) of the registered index implementations to use for a given query. + * + * This is a two step processes, firstly compiling the set of searchable indexes then choosing the one which reduces + * the search space the most. + * + * In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they + * specify are automatically included. 
Following that, the registered indexes are filtered to include only those + * which support the standard expressions in the RowFilter. + * + * The filtered set then sorted by selectivity, as reported by the Index implementations' getEstimatedResultRows + * method. + * + * Implementation specific validation of the target expression, either custom or standard, by the selected + * index should be performed in the searcherFor method to ensure that we pick the right index regardless of + * the validity of the expression. + * + * This method is only called once during the lifecycle of a ReadCommand and the result is + * cached for future use when obtaining a Searcher, getting the index's underlying CFS for + * ReadOrderGroup, or an estimate of the result size from an average index query. + * + * @param command ReadCommand to be executed + * @return an Index instance, ready to use during execution of the command, or null if none + * of the registered indexes can support the command. + */ + public Index getBestIndexFor(ReadCommand command) + { + if (indexes.isEmpty() || command.rowFilter().isEmpty()) + return null; + + List<Index> searchableIndexes = new ArrayList<>(); + for (RowFilter.Expression expression : command.rowFilter()) + { + if (expression.isCustom()) + { + RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression; + searchableIndexes.add(indexes.get(customExpression.getTargetIndex().name)); + } + else + { + indexes.values().stream() + .filter(index -> index.supportsExpression(expression.column(), expression.operator())) + .forEach(searchableIndexes::add); + } + } + + if (searchableIndexes.isEmpty()) + { - logger.debug("No applicable indexes found"); ++ logger.trace("No applicable indexes found"); + Tracing.trace("No applicable indexes found"); + return null; + } + + Index selected = searchableIndexes.size() == 1 + ? 
searchableIndexes.get(0) + : searchableIndexes.stream() + .max((a, b) -> Longs.compare(a.getEstimatedResultRows(), + b.getEstimatedResultRows())) + .orElseThrow(() -> new AssertionError("Could not select most selective index")); + + // pay for an additional threadlocal get() rather than build the strings unnecessarily + if (Tracing.isTracing()) + { + Tracing.trace("Index mean cardinalities are {}. Scanning with {}.", + searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows()) + .collect(Collectors.joining(",")), + selected.getIndexName()); + } + return selected; + } + + /** + * Called at write time to ensure that values present in the update + * are valid according to the rules of all registered indexes which + * will process it. The partition key as well as the clustering and + * cell values for each row in the update may be checked by index + * implementations + * @param update PartitionUpdate containing the values to be validated by registered Index implementations + * @throws InvalidRequestException + */ + public void validate(PartitionUpdate update) throws InvalidRequestException + { + indexes.values() + .stream() + .filter(i -> i.indexes(update.columns())) + .forEach(i -> i.validate(update)); + } + + /** + * IndexRegistry methods + */ + public void registerIndex(Index index) + { + indexes.put(index.getIndexMetadata().name, index); - logger.debug("Registered index {}", index.getIndexMetadata().name); ++ logger.trace("Registered index {}", index.getIndexMetadata().name); + } + + public void unregisterIndex(Index index) + { + Index removed = indexes.remove(index.getIndexMetadata().name); - logger.debug(removed == null ? "Index {} was not registered" : "Removed index {} from registry", ++ logger.trace(removed == null ? 
"Index {} was not registered" : "Removed index {} from registry", + index.getIndexMetadata().name); + } + + public Index getIndex(IndexMetadata metadata) + { + return indexes.get(metadata.name); + } + + public Collection<Index> listIndexes() + { + return ImmutableSet.copyOf(indexes.values()); + } + + /** + * Handling of index updates. + * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data + * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances. + */ + + /** + * Transaction for updates on the write path. + */ + public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec) + { + if (!hasIndexes()) + return UpdateTransaction.NO_OP; + + // todo : optimize lookup, we can probably cache quite a bit of stuff, rather than doing + // a linear scan every time. Holding off that though until CASSANDRA-7771 to figure out + // exactly how indexes are to be identified & associated with a given partition update + Index.Indexer[] indexers = indexes.values().stream() + .filter(i -> i.indexes(update.columns())) + .map(i -> i.indexerFor(update.partitionKey(), + nowInSec, + opGroup, + IndexTransaction.Type.UPDATE)) + .toArray(Index.Indexer[]::new); + + return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers); + } + + /** + * Transaction for use when merging rows during compaction + */ + public CompactionTransaction newCompactionTransaction(DecoratedKey key, + PartitionColumns partitionColumns, + int versions, + int nowInSec) + { + // the check for whether there are any registered indexes is already done in CompactionIterator + + Index[] interestedIndexes = indexes.values().stream() + .filter(i -> i.indexes(partitionColumns)) + .toArray(Index[]::new); + + return interestedIndexes.length == 0 + ? 
CompactionTransaction.NO_OP + : new IndexGCTransaction(key, versions, nowInSec, interestedIndexes); + } + + /** + * Transaction for use when removing partitions during cleanup + */ + public CleanupTransaction newCleanupTransaction(DecoratedKey key, + PartitionColumns partitionColumns, + int nowInSec) + { + // + if (!hasIndexes()) + return CleanupTransaction.NO_OP; + + Index[] interestedIndexes = indexes.values().stream() + .filter(i -> i.indexes(partitionColumns)) + .toArray(Index[]::new); + + return interestedIndexes.length == 0 + ? CleanupTransaction.NO_OP + : new CleanupGCTransaction(key, nowInSec, interestedIndexes); + } + + /** + * A single use transaction for processing a partition update on the regular write path + */ + private static final class WriteTimeTransaction implements UpdateTransaction + { + private final Index.Indexer[] indexers; + + private WriteTimeTransaction(Index.Indexer...indexers) + { + // don't allow null indexers, if we don't need any use a NullUpdater object + for (Index.Indexer indexer : indexers) assert indexer != null; + this.indexers = indexers; + } + + public void start() + { + for (Index.Indexer indexer : indexers) + indexer.begin(); + } + + public void onPartitionDeletion(DeletionTime deletionTime) + { + for (Index.Indexer indexer : indexers) + indexer.partitionDelete(deletionTime); + } + + public void onRangeTombstone(RangeTombstone tombstone) + { + for (Index.Indexer indexer : indexers) + indexer.rangeTombstone(tombstone); + } + + public void onInserted(Row row) + { + Arrays.stream(indexers).forEach(h -> h.insertRow(row)); + } + + public void onUpdated(Row existing, Row updated) + { + final Row.Builder toRemove = BTreeRow.sortedBuilder(); + toRemove.newRow(existing.clustering()); + final Row.Builder toInsert = BTreeRow.sortedBuilder(); + toInsert.newRow(updated.clustering()); + // diff listener collates the columns to be added & removed from the indexes + RowDiffListener diffListener = new RowDiffListener() + { + public void 
onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) + { + if (merged != null && merged != original) + toInsert.addPrimaryKeyLivenessInfo(merged); + } + + public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) + { + } + + public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original) + { + } + + public void onCell(int i, Clustering clustering, Cell merged, Cell original) + { + if (merged != null && merged != original) + toInsert.addCell(merged); + + if (merged == null || (original != null && shouldCleanupOldValue(original, merged))) + toRemove.addCell(original); + + } + }; + Rows.diff(diffListener, updated, existing); + Row oldRow = toRemove.build(); + Row newRow = toInsert.build(); + for (Index.Indexer indexer : indexers) + indexer.updateRow(oldRow, newRow); + } + + public void commit() + { + for (Index.Indexer indexer : indexers) + indexer.finish(); + } + + private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell) + { + // If either the value or timestamp is different, then we + // should delete from the index. If not, then we can infer that + // at least one of the cells is an ExpiringColumn and that the + // difference is in the expiry time. In this case, we don't want to + // delete the old value from the index as the tombstone we insert + // will just hide the inserted value. 
+ // Completely identical cells (including expiring columns with + // identical ttl & localExpirationTime) will not get this far due + // to the oldCell.equals(newCell) in StandardUpdater.update + return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp(); + } + } + + /** + * A single-use transaction for updating indexes for a single partition during compaction where the only + * operation is to merge rows + * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in + * a single partition + */ + private static final class IndexGCTransaction implements CompactionTransaction + { + private final DecoratedKey key; + private final int versions; + private final int nowInSec; + private final Index[] indexes; + + private Row[] rows; + + private IndexGCTransaction(DecoratedKey key, + int versions, + int nowInSec, + Index...indexes) + { + // don't allow null indexers, if we don't have any, use a noop transaction + for (Index index : indexes) assert index != null; + + this.key = key; + this.versions = versions; + this.indexes = indexes; + this.nowInSec = nowInSec; + } + + public void start() + { + if (versions > 0) + rows = new Row[versions]; + } + + public void onRowMerge(Row merged, Row...versions) + { + // Diff listener constructs rows representing deltas between the merged and original versions + // These delta rows are then passed to registered indexes for removal processing + final Row.Builder[] builders = new Row.Builder[versions.length]; + RowDiffListener diffListener = new RowDiffListener() + { + public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) + { + } + + public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) + { + } + + public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original) + { + } + + public void 
onCell(int i, Clustering clustering, Cell merged, Cell original) + { + if (original != null && merged == null) + { + if (builders[i] == null) + { + builders[i] = BTreeRow.sortedBuilder(); + builders[i].newRow(clustering); + } + builders[i].addCell(original); + } + } + }; + + Rows.diff(diffListener, merged, versions); + + for(int i = 0; i < builders.length; i++) + if (builders[i] != null) + rows[i] = builders[i].build(); + } + + public void commit() + { + if (rows == null) + return; + + try (OpOrder.Group opGroup = Keyspace.writeOrder.start()) + { + for (Index index : indexes) + { + Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.COMPACTION); + indexer.begin(); + for (Row row : rows) + if (row != null) + indexer.removeRow(row); + indexer.finish(); + } + } + } + } + + /** + * A single-use transaction for updating indexes for a single partition during cleanup, where + * partitions and rows are only removed + * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in + * a single partition + */ + private static final class CleanupGCTransaction implements CleanupTransaction + { + private final DecoratedKey key; + private final int nowInSec; + private final Index[] indexes; + + private Row row; + private DeletionTime partitionDelete; + + private CleanupGCTransaction(DecoratedKey key, + int nowInSec, + Index...indexes) + { + // don't allow null indexers, if we don't have any, use a noop transaction + for (Index index : indexes) assert index != null; + + this.key = key; + this.indexes = indexes; + this.nowInSec = nowInSec; + } + + public void start() + { + } + + public void onPartitionDeletion(DeletionTime deletionTime) + { + partitionDelete = deletionTime; + } + + public void onRowDelete(Row row) + { + this.row = row; + } + + public void commit() + { + if (row == null && partitionDelete == null) + return; + + try (OpOrder.Group opGroup = Keyspace.writeOrder.start()) + { + for (Index index : indexes) 
+ { + Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.CLEANUP); + indexer.begin(); + if (row != null) + indexer.removeRow(row); + indexer.finish(); + } + } + } + } + + private static void executeBlocking(Callable<?> task) + { + if (null != task) + FBUtilities.waitOnFuture(blockingExecutor.submit(task)); + } + + private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function) + { + List<Future<?>> waitFor = new ArrayList<>(); + indexers.forEach(indexer -> { + Callable<?> task = function.apply(indexer); + if (null != task) + waitFor.add(blockingExecutor.submit(task)); + }); + FBUtilities.waitOnFutures(waitFor); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java index f6a10e5,0000000..93f5d61 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@@ -1,826 -1,0 +1,826 @@@ +package org.apache.cassandra.index.internal; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.db.*; +import 
org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.IndexRegistry; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.index.internal.composites.CompositesSearcher; +import org.apache.cassandra.index.internal.keys.KeysSearcher; +import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Refs; + +/** + * Index implementation which indexes the values for a single column in the base + * table and which stores its index data in a local, hidden table. 
+ */ +public abstract class CassandraIndex implements Index +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class); + + public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$"); + + public final ColumnFamilyStore baseCfs; + protected IndexMetadata metadata; + protected ColumnFamilyStore indexCfs; + protected ColumnDefinition indexedColumn; + protected CassandraIndexFunctions functions; + + protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + this.baseCfs = baseCfs; + setMetadata(indexDef); + } + + /** + * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value] + * @param indexedColumn + * @param operator + * @return + */ + protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator) + { + return operator == Operator.EQ; + } + + /** + * Used to construct an the clustering for an entry in the index table based on values from the base data. + * The clustering columns in the index table encode the values required to retrieve the correct data from the base + * table and varies depending on the kind of the indexed column. See indexCfsMetadata for more details + * Used whenever a row in the index table is written or deleted. + * @param partitionKey from the base data being indexed + * @param prefix from the base data being indexed + * @param path from the base data being indexed + * @return a clustering prefix to be used to insert into the index table + */ + protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey, + ClusteringPrefix prefix, + CellPath path); + + /** + * Used at search time to convert a row in the index table into a simple struct containing the values required + * to retrieve the corresponding row from the base table. + * @param indexedValue the partition key of the indexed table (i.e. 
the value that was indexed) + * @param indexEntry a row from the index table + * @return + */ + public abstract IndexEntry decodeEntry(DecoratedKey indexedValue, + Row indexEntry); + + /** + * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table. + * Used at read time to identify out of date index entries so that they can be excluded from search results and + * repaired + * @param row the current row from the primary data table + * @param indexValue the value we retrieved from the index + * @param nowInSec + * @return true if the index is out of date and the entry should be dropped + */ + public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec); + + /** + * Extract the value to be inserted into the index from the components of the base data + * @param partitionKey from the primary data + * @param clustering from the primary data + * @param path from the primary data + * @param cellValue from the primary data + * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition + * key in the index table + */ + protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey, + Clustering clustering, + CellPath path, + ByteBuffer cellValue); + + public ColumnDefinition getIndexedColumn() + { + return indexedColumn; + } + + public ClusteringComparator getIndexComparator() + { + return indexCfs.metadata.comparator; + } + + public ColumnFamilyStore getIndexCfs() + { + return indexCfs; + } + + public void register(IndexRegistry registry) + { + registry.registerIndex(this); + } + + public Callable<?> getInitializationTask() + { + // if we're just linking in the index on an already-built index post-restart + // we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder + return isBuilt() ? 
null : getBuildIndexTask(); + } + + public IndexMetadata getIndexMetadata() + { + return metadata; + } + + public String getIndexName() + { + return metadata.name; + } + + public Optional<ColumnFamilyStore> getBackingTable() + { + return indexCfs == null ? Optional.empty() : Optional.of(indexCfs); + } + + public Callable<Void> getBlockingFlushTask() + { + return () -> { + indexCfs.forceBlockingFlush(); + return null; + }; + } + + public Callable<?> getInvalidateTask() + { + return () -> { + markRemoved(); + invalidate(); + return null; + }; + } + + public Callable<?> getMetadataReloadTask(IndexMetadata indexDef) + { + setMetadata(indexDef); + return () -> { + indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata); + indexCfs.reload(); + return null; + }; + } + + private void setMetadata(IndexMetadata indexDef) + { + metadata = indexDef; + Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef); + functions = getFunctions(indexDef, target); + CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef); + indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace, + cfm.cfName, + cfm, + baseCfs.getTracker().loadsstables); + indexedColumn = target.left; + } + + public Callable<?> getTruncateTask(final long truncatedAt) + { + return () -> { + indexCfs.discardSSTables(truncatedAt); + return null; + }; + } + + public boolean shouldBuildBlocking() + { + // built-in indexes are always included in builds initiated from SecondaryIndexManager + return true; + } + + public boolean indexes(PartitionColumns columns) + { + // if we have indexes on the partition key or clustering columns, return true + return isPrimaryKeyIndex() || columns.contains(indexedColumn); + } + + public boolean dependsOn(ColumnDefinition column) + { + return indexedColumn.name.equals(column.name); + } + + public boolean supportsExpression(ColumnDefinition column, Operator operator) + { + return indexedColumn.name.equals(column.name) + && 
supportsOperator(indexedColumn, operator); + } + + private boolean supportsExpression(RowFilter.Expression expression) + { + return supportsExpression(expression.column(), expression.operator()); + } + + public AbstractType<?> customExpressionValueType() + { + return null; + } + + public long getEstimatedResultRows() + { + return indexCfs.getMeanColumns(); + } + + /** + * No post processing of query results, just return them unchanged + */ + public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command) + { + return (partitionIterator, readCommand) -> partitionIterator; + } + + public RowFilter getPostIndexQueryFilter(RowFilter filter) + { + return getTargetExpression(filter.getExpressions()).map(filter::without) + .orElse(filter); + } + + private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions) + { + return expressions.stream().filter(this::supportsExpression).findFirst(); + } + + public Index.Searcher searcherFor(ReadCommand command) + { + Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions()); + + if (target.isPresent()) + { + target.get().validateForIndexing(); + switch (getIndexMetadata().kind) + { + case COMPOSITES: + return new CompositesSearcher(command, target.get(), this); + case KEYS: + return new KeysSearcher(command, target.get(), this); + default: + throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s", + metadata.kind, + metadata.name, + indexedColumn.name.toString())); + } + } + + return null; + + } + + public void validate(PartitionUpdate update) throws InvalidRequestException + { + switch (indexedColumn.kind) + { + case PARTITION_KEY: + validatePartitionKey(update.partitionKey()); + break; + case CLUSTERING: + validateClusterings(update); + break; + case REGULAR: + validateRows(update); + break; + case STATIC: + validateRows(Collections.singleton(update.staticRow())); + break; + } + 
} + + public Indexer indexerFor(final DecoratedKey key, + final int nowInSec, + final OpOrder.Group opGroup, + final IndexTransaction.Type transactionType) + { + return new Indexer() + { + public void begin() + { + } + + public void partitionDelete(DeletionTime deletionTime) + { + } + + public void rangeTombstone(RangeTombstone tombstone) + { + } + + public void insertRow(Row row) + { + if (isPrimaryKeyIndex()) + { + indexPrimaryKey(row.clustering(), + getPrimaryKeyIndexLiveness(row), + row.deletion()); + } + else + { + if (indexedColumn.isComplex()) + indexCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + indexCell(row.clustering(), row.getCell(indexedColumn)); + } + } + + public void removeRow(Row row) + { + if (isPrimaryKeyIndex()) + indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion()); + + if (indexedColumn.isComplex()) + removeCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + removeCell(row.clustering(), row.getCell(indexedColumn)); + } + + + public void updateRow(Row oldRow, Row newRow) + { + if (isPrimaryKeyIndex()) + indexPrimaryKey(newRow.clustering(), + newRow.primaryKeyLivenessInfo(), + newRow.deletion()); + + if (indexedColumn.isComplex()) + { + indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn)); + removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn)); + } + else + { + indexCell(newRow.clustering(), newRow.getCell(indexedColumn)); + removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn)); + } + } + + public void finish() + { + } + + private void indexCells(Clustering clustering, Iterable<Cell> cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + indexCell(clustering, cell); + } + + private void indexCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + insert(key.getKey(), + clustering, + cell, + LivenessInfo.create(cell.timestamp(), cell.ttl(), 
cell.localDeletionTime()), + opGroup); + } + + private void removeCells(Clustering clustering, Iterable<Cell> cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + removeCell(clustering, cell); + } + + private void removeCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + delete(key.getKey(), clustering, cell, opGroup, nowInSec); + } + + private void indexPrimaryKey(final Clustering clustering, + final LivenessInfo liveness, + final Row.Deletion deletion) + { + if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP) + insert(key.getKey(), clustering, null, liveness, opGroup); + + if (!deletion.isLive()) + delete(key.getKey(), clustering, deletion.time(), opGroup); + } + + private LivenessInfo getPrimaryKeyIndexLiveness(Row row) + { + long timestamp = row.primaryKeyLivenessInfo().timestamp(); + int ttl = row.primaryKeyLivenessInfo().ttl(); + for (Cell cell : row.cells()) + { + long cellTimestamp = cell.timestamp(); + if (cell.isLive(nowInSec)) + { + if (cellTimestamp > timestamp) + { + timestamp = cellTimestamp; + ttl = cell.ttl(); + } + } + } + return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec); + } + }; + } + + /** + * Specific to internal indexes, this is called by a + * searcher when it encounters a stale entry in the index + * @param indexKey the partition key in the index table + * @param indexClustering the clustering in the index table + * @param deletion deletion timestamp etc + * @param opGroup the operation under which to perform the deletion + */ + public void deleteStaleEntry(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + doDelete(indexKey, indexClustering, deletion, opGroup); - logger.debug("Removed index entry for stale value {}", indexKey); ++ logger.trace("Removed index entry for stale value {}", indexKey); + } + + /** + * Called when adding a new entry to the index + */ + private void 
insert(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + LivenessInfo info, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info); + PartitionUpdate upd = partitionUpdate(valueKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); - logger.debug("Inserted entry into index for value {}", valueKey); ++ logger.trace("Inserted entry into index for value {}", valueKey); + } + + /** + * Called when deleting entries on non-primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + OpOrder.Group opGroup, + int nowInSec) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, cell), + new DeletionTime(cell.timestamp(), nowInSec), + opGroup); + } + + /** + * Called when deleting entries from indexes on primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + null)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, null), + deletion, + opGroup); + } + + private void doDelete(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion)); + PartitionUpdate upd = partitionUpdate(indexKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); - logger.debug("Removed index entry for value {}", indexKey); ++ logger.trace("Removed index entry for value {}", indexKey); + } + + private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException + { + assert indexedColumn.isPartitionKey(); + 
validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null)); + } + + private void validateClusterings(PartitionUpdate update) throws InvalidRequestException + { + assert indexedColumn.isClusteringColumn(); + for (Row row : update) + validateIndexedValue(getIndexedValue(null, row.clustering(), null)); + } + + private void validateRows(Iterable<Row> rows) + { + assert !indexedColumn.isPrimaryKeyColumn(); + for (Row row : rows) + { + if (indexedColumn.isComplex()) + { + ComplexColumnData data = row.getComplexColumnData(indexedColumn); + if (data != null) + { + for (Cell cell : data) + { + validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value())); + } + } + } + else + { + validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn))); + } + } + } + + private void validateIndexedValue(ByteBuffer value) + { + if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format( + "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)", + value.remaining(), + getIndexName(), + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + indexedColumn.name.toString(), + FBUtilities.MAX_UNSIGNED_SHORT)); + } + + private ByteBuffer getIndexedValue(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return getIndexedValue(rowKey, + clustering, + cell == null ? null : cell.path(), + cell == null ? null : cell.value() + ); + } + + private Clustering buildIndexClustering(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return buildIndexClusteringPrefix(rowKey, + clustering, + cell == null ? 
null : cell.path()).build(); + } + + private DecoratedKey getIndexKeyFor(ByteBuffer value) + { + return indexCfs.decorateKey(value); + } + + private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row) + { + return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); + } + + private void invalidate() + { + // interrupt in-progress compactions + Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs); + CompactionManager.instance.interruptCompactionForCFs(cfss, true); + CompactionManager.instance.waitForCessation(cfss); + Keyspace.writeOrder.awaitNewBarrier(); + indexCfs.forceBlockingFlush(); + indexCfs.readOrdering.awaitNewBarrier(); + indexCfs.invalidate(); + } + + private boolean isBuilt() + { + return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getIndexName()); + } + + private void markBuilt() + { + SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getIndexName()); + } + + private void markRemoved() + { + SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getIndexName()); + } + + private boolean isPrimaryKeyIndex() + { + return indexedColumn.isPrimaryKeyColumn(); + } + + private Callable<?> getBuildIndexTask() + { + return () -> { + buildBlocking(); + return null; + }; + } + + private void buildBlocking() + { + baseCfs.forceBlockingFlush(); + + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + Refs<SSTableReader> sstables = viewFragment.refs) + { + if (sstables.isEmpty()) + { + logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built", + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + getIndexName()); + markBuilt(); + return; + } + + logger.info("Submitting index build of {} for data in {}", + getIndexName(), + getSSTableNames(sstables)); + + SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, + Collections.singleton(this), + new ReducingKeyIterator(sstables)); + Future<?> 
future = CompactionManager.instance.submitIndexBuild(builder); + FBUtilities.waitOnFuture(future); + indexCfs.forceBlockingFlush(); + markBuilt(); + } + logger.info("Index build of {} complete", getIndexName()); + } + + private static String getSSTableNames(Collection<SSTableReader> sstables) + { + return StreamSupport.stream(sstables.spliterator(), false) + .map(SSTableReader::toString) + .collect(Collectors.joining(", ")); + } + + /** + * Construct the CFMetadata for an index table, the clustering columns in the index table + * vary dependent on the kind of the indexed value. + * @param baseCfsMetadata + * @param indexMetadata + * @return + */ + public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata) + { + Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata); + CassandraIndexFunctions utils = getFunctions(indexMetadata, target); + ColumnDefinition indexedColumn = target.left; + AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn); + CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName, + baseCfsMetadata.indexColumnFamilyName(indexMetadata)) + .withId(baseCfsMetadata.cfId) + .withPartitioner(new LocalPartitioner(indexedValueType)) + .addPartitionKey(indexedColumn.name, indexedColumn.type); + + builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering()); + builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn); + return builder.build().reloadIndexMetadataProperties(baseCfsMetadata); + } + + /** + * Factory method for new CassandraIndex instances + * @param baseCfs + * @param indexMetadata + * @return + */ + public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata) + { + return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata); + } + + // Public because it's also used to 
convert index metadata into a thrift-compatible format + public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm, + IndexMetadata indexDef) + { + String target = indexDef.options.get("target"); + assert target != null : String.format("No target definition found for index %s", indexDef.name); + + // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc + // if not, then it must be a simple column name and implictly its type is VALUES + Matcher matcher = TARGET_REGEX.matcher(target); + String columnName; + IndexTarget.Type targetType; + if (matcher.matches()) + { + targetType = IndexTarget.Type.fromString(matcher.group(1)); + columnName = matcher.group(2); + } + else + { + columnName = target; + targetType = IndexTarget.Type.VALUES; + } + + // in the case of a quoted column name the name in the target string + // will be enclosed in quotes, which we need to unwrap. It may also + // include quote characters internally, escaped like so: + // abc"def -> abc""def. 
+ // Because the target string is stored in a CQL compatible form, we + // need to un-escape any such quotes to get the actual column name + if (columnName.startsWith("\"")) + { + columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1); + columnName = columnName.replaceAll("\"\"", "\""); + } + + // if it's not a CQL table, we can't assume that the column name is utf8, so + // in that case we have to do a linear scan of the cfm's columns to get the matching one + if (cfm.isCQLTable()) + return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType); + else + for (ColumnDefinition column : cfm.allColumns()) + if (column.name.toString().equals(columnName)) + return Pair.create(column, targetType); + + throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target)); + } + + static CassandraIndexFunctions getFunctions(IndexMetadata indexDef, + Pair<ColumnDefinition, IndexTarget.Type> target) + { + if (indexDef.isKeys()) + return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS; + + ColumnDefinition indexedColumn = target.left; + if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell()) + { + switch (((CollectionType)indexedColumn.type).kind) + { + case LIST: + return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS; + case SET: + return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS; + case MAP: + switch (target.right) + { + case KEYS: + return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS; + case KEYS_AND_VALUES: + return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS; + case VALUES: + return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS; + } + throw new AssertionError(); + } + } + + switch (indexedColumn.kind) + { + case CLUSTERING: + return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS; + case REGULAR: + return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS; + case PARTITION_KEY: + return 
CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS; + //case COMPACT_VALUE: + // return new CompositesIndexOnCompactValue(); + } + throw new AssertionError(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTable.java index d66638e,b0aa89e..923ef82 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@@ -111,11 -113,9 +111,11 @@@ public abstract class SSTabl FileUtils.deleteWithConfirm(desc.filenameFor(component)); } - FileUtils.delete(desc.filenameFor(Component.SUMMARY)); + + if (components.contains(Component.SUMMARY)) + FileUtils.delete(desc.filenameFor(Component.SUMMARY)); - logger.debug("Deleted {}", desc); + logger.trace("Deleted {}", desc); return true; }