http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index e64ec75..4cb7470 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -32,23 +32,25 @@ import javax.management.ObjectName; import com.google.common.util.concurrent.Futures; -import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.cache.*; import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.partitions.CachedBTreePartition; import org.apache.cassandra.db.partitions.CachedPartition; -import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -265,13 +267,13 @@ public class CacheService implements CacheServiceMBean keyCache.clear(); } - public void invalidateKeyCacheForCf(Pair<String, String> ksAndCFName) + public void invalidateKeyCacheForCf(TableMetadata tableMetadata) { Iterator<KeyCacheKey> keyCacheIterator = keyCache.keyIterator(); while (keyCacheIterator.hasNext()) { KeyCacheKey key = keyCacheIterator.next(); - if (key.ksAndCFName.equals(ksAndCFName)) + if (key.sameTable(tableMetadata)) keyCacheIterator.remove(); } } @@ -281,24 +283,24 @@ public class CacheService implements CacheServiceMBean rowCache.clear(); } - public void invalidateRowCacheForCf(Pair<String, String> ksAndCFName) + public void invalidateRowCacheForCf(TableMetadata tableMetadata) { Iterator<RowCacheKey> rowCacheIterator = rowCache.keyIterator(); while (rowCacheIterator.hasNext()) { - RowCacheKey rowCacheKey = rowCacheIterator.next(); - if (rowCacheKey.ksAndCFName.equals(ksAndCFName)) + RowCacheKey key = rowCacheIterator.next(); + if (key.sameTable(tableMetadata)) rowCacheIterator.remove(); } } - public void invalidateCounterCacheForCf(Pair<String, String> ksAndCFName) + public void invalidateCounterCacheForCf(TableMetadata tableMetadata) { Iterator<CounterCacheKey> counterCacheIterator = counterCache.keyIterator(); while (counterCacheIterator.hasNext()) { - CounterCacheKey counterCacheKey = counterCacheIterator.next(); - if (counterCacheKey.ksAndCFName.equals(ksAndCFName)) + CounterCacheKey key = counterCacheIterator.next(); + if (key.sameTable(tableMetadata)) counterCacheIterator.remove(); } } @@ -353,8 +355,10 @@ public class CacheService implements CacheServiceMBean { public void serialize(CounterCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException { - assert(cfs.metadata.isCounter()); - out.write(cfs.metadata.ksAndCFBytes); + assert(cfs.metadata().isCounter()); + TableMetadata tableMetadata = cfs.metadata(); + tableMetadata.id.serialize(out); + out.writeUTF(tableMetadata.indexName().orElse("")); key.write(out); } @@ -362,8 +366,10 @@ public class CacheService implements CacheServiceMBean { //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a //parameter so they aren't deserialized here, even though they are serialized by this serializer - final CounterCacheKey cacheKey = CounterCacheKey.read(cfs.metadata.ksAndCFName, in); - if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled()) + if (cfs == null) + return null; + final CounterCacheKey cacheKey = CounterCacheKey.read(cfs.metadata(), in); + if (!cfs.metadata().isCounter() || !cfs.isCounterCacheEnabled()) return null; return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>() @@ -384,7 +390,9 @@ public class CacheService implements CacheServiceMBean public void serialize(RowCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException { assert(!cfs.isIndex());//Shouldn't have row cache entries for indexes - out.write(cfs.metadata.ksAndCFBytes); + TableMetadata tableMetadata = cfs.metadata(); + tableMetadata.id.serialize(out); + out.writeUTF(tableMetadata.indexName().orElse("")); ByteBufferUtil.writeWithLength(key.key, out); } @@ -395,7 +403,7 @@ public class CacheService implements CacheServiceMBean final ByteBuffer buffer = ByteBufferUtil.readWithLength(in); if (cfs == null || !cfs.isRowCacheEnabled()) return null; - final int rowsToCache = cfs.metadata.params.caching.rowsPerPartitionToCache(); + final int rowsToCache = cfs.metadata().params.caching.rowsPerPartitionToCache(); assert(!cfs.isIndex());//Shouldn't have row cache entries for indexes return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>() @@ -404,11 +412,11 @@ public class CacheService implements CacheServiceMBean { DecoratedKey key = cfs.decorateKey(buffer); int nowInSec = FBUtilities.nowInSeconds(); - SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata(), nowInSec, key); try (ReadExecutionController controller = cmd.executionController(); UnfilteredRowIterator iter = cmd.queryMemtableAndDisk(cfs, controller)) { CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec); - return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache); + return Pair.create(new RowCacheKey(cfs.metadata(), key), toCache); } } }); @@ -423,13 +431,15 @@ public class CacheService implements CacheServiceMBean if (entry == null) return; - out.write(cfs.metadata.ksAndCFBytes); + TableMetadata tableMetadata = cfs.metadata(); + tableMetadata.id.serialize(out); + out.writeUTF(tableMetadata.indexName().orElse("")); ByteBufferUtil.writeWithLength(key.key, out); out.writeInt(key.desc.generation); out.writeBoolean(true); - SerializationHeader header = new SerializationHeader(false, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS); - key.desc.getFormat().getIndexSerializer(cfs.metadata, key.desc.version, header).serializeForCache(entry, out); + SerializationHeader header = new SerializationHeader(false, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS); + key.desc.getFormat().getIndexSerializer(cfs.metadata(), key.desc.version, header).serializeForCache(entry, out); } public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException @@ -455,11 +465,11 @@ public class CacheService implements CacheServiceMBean RowIndexEntry.Serializer.skipForCache(input); return null; } - RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata, + RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata(), reader.descriptor.version, reader.header); RowIndexEntry<?> entry = indexSerializer.deserializeForCache(input); - return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry)); + return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata(), reader.descriptor, key), entry)); } private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index a026d6e..2beb039 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -46,10 +46,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.functions.ThreadAwareSecurityManager; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.*; @@ -244,7 +245,7 @@ public class CassandraDaemon if (keyspaceName.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME)) continue; - for (CFMetaData cfm : Schema.instance.getTablesAndViews(keyspaceName)) + for (TableMetadata cfm : Schema.instance.getTablesAndViews(keyspaceName)) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index 52e71ae..80cf810 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -28,13 +28,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.*; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.Validation; import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.AuthenticationException; @@ -255,7 +255,7 @@ public class ClientState { // Skip keyspace validation for non-authenticated users. Apparently, some client libraries // call set_keyspace() before calling login(), and we have to handle that. - if (user != null && Schema.instance.getKSMetaData(ks) == null) + if (user != null && Schema.instance.getKeyspaceMetadata(ks) == null) throw new InvalidRequestException("Keyspace '" + ks + "' does not exist"); keyspace = ks; } @@ -290,14 +290,20 @@ public class ClientState public void hasColumnFamilyAccess(String keyspace, String columnFamily, Permission perm) throws UnauthorizedException, InvalidRequestException { - Validation.validateColumnFamily(keyspace, columnFamily); + Schema.instance.validateTable(keyspace, columnFamily); hasAccess(keyspace, perm, DataResource.table(keyspace, columnFamily)); } - public void hasColumnFamilyAccess(CFMetaData cfm, Permission perm) + public void hasColumnFamilyAccess(TableMetadataRef tableRef, Permission perm) throws UnauthorizedException, InvalidRequestException { - hasAccess(cfm.ksName, perm, cfm.resource); + hasColumnFamilyAccess(tableRef.get(), perm); + } + + public void hasColumnFamilyAccess(TableMetadata table, Permission perm) + throws UnauthorizedException, InvalidRequestException + { + hasAccess(table.keyspace, perm, table.resource); } private void hasAccess(String keyspace, Permission perm, DataResource resource) http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 991c768..cc7ab2d 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -25,8 +25,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ClusteringIndexFilter; @@ -122,7 +122,7 @@ public class DataResolver extends ResponseResolver return new MergeListener(partitionKey, columns(versions), isReversed(versions)); } - private PartitionColumns columns(List<UnfilteredRowIterator> versions) + private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions) { Columns statics = Columns.NONE; Columns regulars = Columns.NONE; @@ -131,11 +131,11 @@ public class DataResolver extends ResponseResolver if (iter == null) continue; - PartitionColumns cols = iter.columns(); + RegularAndStaticColumns cols = iter.columns(); statics = statics.mergeTo(cols.statics); regulars = regulars.mergeTo(cols.regulars); } - return new PartitionColumns(statics, regulars); + return new RegularAndStaticColumns(statics, regulars); } private boolean isReversed(List<UnfilteredRowIterator> versions) @@ -175,7 +175,7 @@ public class DataResolver extends ResponseResolver private class MergeListener implements UnfilteredRowIterators.MergeListener { private final DecoratedKey partitionKey; - private final PartitionColumns columns; + private final RegularAndStaticColumns columns; private final boolean isReversed; private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length]; @@ -191,7 +191,7 @@ public class DataResolver extends ResponseResolver // For each source, record if there is an open range to send as repair, and from where. private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length]; - public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed) + public MergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed) { this.partitionKey = partitionKey; this.columns = columns; @@ -211,7 +211,7 @@ public class DataResolver extends ResponseResolver currentRow(i, clustering).addRowDeletion(merged); } - public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original) + public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original) { if (merged != null && !merged.equals(original)) currentRow(i, clustering).addComplexDeletion(column, merged); @@ -231,7 +231,7 @@ public class DataResolver extends ResponseResolver // semantic (making sure we can always distinguish between a row that doesn't exist from one that do exist but has /// no value for the column requested by the user) and so it won't be unexpected by the user that those columns are // not repaired. - ColumnDefinition column = cell.column(); + ColumnMetadata column = cell.column(); ColumnFilter filter = command.columnFilter(); return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column); } @@ -422,12 +422,12 @@ public class DataResolver extends ResponseResolver private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator> { - final CFMetaData metadata; + final TableMetadata metadata; final DecoratedKey partitionKey; Clustering lastClustering; int lastCount = 0; - private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey) + private ShortReadRowProtection(TableMetadata metadata, DecoratedKey partitionKey) { this.metadata = metadata; this.partitionKey = partitionKey; http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/MigrationListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java deleted file mode 100644 index 19d2592..0000000 --- a/src/java/org/apache/cassandra/service/MigrationListener.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import java.util.List; - -import org.apache.cassandra.db.marshal.AbstractType; - -public abstract class MigrationListener -{ - public void onCreateKeyspace(String ksName) - { - } - - public void onCreateColumnFamily(String ksName, String cfName) - { - } - - public void onCreateView(String ksName, String viewName) - { - onCreateColumnFamily(ksName, viewName); - } - - public void onCreateUserType(String ksName, String typeName) - { - } - - public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) - { - } - - public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) - { - } - - public void onUpdateKeyspace(String ksName) - { - } - - // the boolean flag indicates whether the change that triggered this event may have a substantive - // impact on statements using the column family. - public void onUpdateColumnFamily(String ksName, String cfName, boolean affectsStatements) - { - } - - public void onUpdateView(String ksName, String viewName, boolean columnsDidChange) - { - onUpdateColumnFamily(ksName, viewName, columnsDidChange); - } - - public void onUpdateUserType(String ksName, String typeName) - { - } - - public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) - { - } - - public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) - { - } - - public void onDropKeyspace(String ksName) - { - } - - public void onDropColumnFamily(String ksName, String cfName) - { - } - - public void onDropView(String ksName, String viewName) - { - onDropColumnFamily(ksName, viewName); - } - - public void onDropUserType(String ksName, String typeName) - { - } - - public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) - { - } - - public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) - { - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java deleted file mode 100644 index f9e4aff..0000000 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ /dev/null @@ -1,626 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import java.io.IOException; -import java.net.InetAddress; -import java.util.*; -import java.util.concurrent.*; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; -import org.apache.cassandra.config.ViewDefinition; -import org.apache.cassandra.cql3.functions.UDAggregate; -import org.apache.cassandra.cql3.functions.UDFunction; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.exceptions.AlreadyExistsException; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.gms.*; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.SchemaKeyspace; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.WrappedRunnable; - -public class MigrationManager -{ - private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); - - public static final MigrationManager instance = new MigrationManager(); - - private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); - - public static final int MIGRATION_DELAY_IN_MS = 60000; - - private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1")); - - private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>(); - - private MigrationManager() {} - - public void register(MigrationListener listener) - { - listeners.add(listener); - } - - public void unregister(MigrationListener listener) - { - listeners.remove(listener); - } - - public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state) - { - VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); - - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) - maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); - } - - /** - * If versions differ this node sends request with local migration list to the endpoint - * and expecting to receive a list of migrations to apply locally. - */ - private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) - { - if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) - { - logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); - return; - } - - if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) - { - // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately - logger.debug("Submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); - } - else - { - // Include a delay to make sure we have a chance to apply any changes being - // pushed out simultaneously. See CASSANDRA-5025 - Runnable runnable = () -> - { - // grab the latest version of the schema since it may have changed again since the initial scheduling - EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - if (epState == null) - { - logger.debug("epState vanished for {}, not submitting migration task", endpoint); - return; - } - VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); - UUID currentVersion = UUID.fromString(value.value); - if (Schema.instance.getVersion().equals(currentVersion)) - { - logger.debug("not submitting migration task for {} because our versions match", endpoint); - return; - } - logger.debug("submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); - }; - ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); - } - } - - private static Future<?> submitMigrationTask(InetAddress endpoint) - { - /* - * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are - * running in the gossip stage. - */ - return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); - } - - public static boolean shouldPullSchemaFrom(InetAddress endpoint) - { - /* - * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) - * Don't request schema from fat clients - */ - return MessagingService.instance().knowsVersion(endpoint) - && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version - && !Gossiper.instance.isGossipOnlyMember(endpoint); - } - - public static boolean isReadyForBootstrap() - { - return MigrationTask.getInflightTasks().isEmpty(); - } - - public static void waitUntilReadyForBootstrap() - { - CountDownLatch completionLatch; - while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null) - { - try - { - if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) - logger.error("Migration task failed to complete"); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - logger.error("Migration task was interrupted"); - } - } - } - - public void notifyCreateKeyspace(KeyspaceMetadata ksm) - { - for (MigrationListener listener : listeners) - listener.onCreateKeyspace(ksm.name); - } - - public void notifyCreateColumnFamily(CFMetaData cfm) - { - for (MigrationListener listener : listeners) - listener.onCreateColumnFamily(cfm.ksName, cfm.cfName); - } - - public void notifyCreateView(ViewDefinition view) - { - for (MigrationListener listener : listeners) - listener.onCreateView(view.ksName, view.viewName); - } - - public void notifyCreateUserType(UserType ut) - { - for (MigrationListener listener : listeners) - listener.onCreateUserType(ut.keyspace, ut.getNameAsString()); - } - - public void notifyCreateFunction(UDFunction udf) - { - for (MigrationListener listener : listeners) - listener.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes()); - } - - public void notifyCreateAggregate(UDAggregate udf) - { - for (MigrationListener listener : listeners) - listener.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()); - } - - public void notifyUpdateKeyspace(KeyspaceMetadata ksm) - { - for (MigrationListener listener : listeners) - listener.onUpdateKeyspace(ksm.name); - } - - public void notifyUpdateColumnFamily(CFMetaData cfm, boolean columnsDidChange) - { - for (MigrationListener listener : listeners) - listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName, columnsDidChange); - } - - public void notifyUpdateView(ViewDefinition view, boolean columnsDidChange) - { - for (MigrationListener listener : listeners) - listener.onUpdateView(view.ksName, view.viewName, columnsDidChange); - } - - public void notifyUpdateUserType(UserType ut) - { - for (MigrationListener listener : listeners) - listener.onUpdateUserType(ut.keyspace, ut.getNameAsString()); - - // FIXME: remove when we get rid of AbstractType in metadata. Doesn't really belong anywhere. - Schema.instance.getKSMetaData(ut.keyspace).functions.udfs().forEach(f -> f.userTypeUpdated(ut.keyspace, ut.getNameAsString())); - } - - public void notifyUpdateFunction(UDFunction udf) - { - for (MigrationListener listener : listeners) - listener.onUpdateFunction(udf.name().keyspace, udf.name().name, udf.argTypes()); - } - - public void notifyUpdateAggregate(UDAggregate udf) - { - for (MigrationListener listener : listeners) - listener.onUpdateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()); - } - - public void notifyDropKeyspace(KeyspaceMetadata ksm) - { - for (MigrationListener listener : listeners) - listener.onDropKeyspace(ksm.name); - } - - public void notifyDropColumnFamily(CFMetaData cfm) - { - for (MigrationListener listener : listeners) - listener.onDropColumnFamily(cfm.ksName, cfm.cfName); - } - - public void notifyDropView(ViewDefinition view) - { - for (MigrationListener listener : listeners) - listener.onDropView(view.ksName, view.viewName); - } - - public void notifyDropUserType(UserType ut) - { - for (MigrationListener listener : listeners) - listener.onDropUserType(ut.keyspace, ut.getNameAsString()); - } - - public void notifyDropFunction(UDFunction udf) - { - for (MigrationListener listener : listeners) - listener.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes()); - } - - public void notifyDropAggregate(UDAggregate udf) - { - for (MigrationListener listener : listeners) - listener.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()); - } - - public static void announceNewKeyspace(KeyspaceMetadata ksm) throws ConfigurationException - { - announceNewKeyspace(ksm, false); - } - - public static void announceNewKeyspace(KeyspaceMetadata ksm, boolean announceLocally) throws ConfigurationException - { - announceNewKeyspace(ksm, FBUtilities.timestampMicros(), announceLocally); - } - - public static void announceNewKeyspace(KeyspaceMetadata ksm, long timestamp, boolean announceLocally) throws ConfigurationException - { - ksm.validate(); - - if (Schema.instance.getKSMetaData(ksm.name) != null) - throw new AlreadyExistsException(ksm.name); - - logger.info("Create new Keyspace: {}", ksm); - announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally); - } - - public static void announceNewColumnFamily(CFMetaData cfm) throws ConfigurationException - { - announceNewColumnFamily(cfm, false); - } - - public static void announceNewColumnFamily(CFMetaData cfm, boolean announceLocally) throws ConfigurationException - { - announceNewColumnFamily(cfm, announceLocally, true); - } - - /** - * Announces the table even if the definition is already know locally. - * This should generally be avoided but is used internally when we want to force the most up to date version of - * a system table schema (Note that we don't know if the schema we force _is_ the most recent version or not, we - * just rely on idempotency to basically ignore that announce if it's not. That's why we can't use announceUpdateColumnFamily, - * it would for instance delete new columns if this is not called with the most up-to-date version) - * - * Note that this is only safe for system tables where we know the cfId is fixed and will be the same whatever version - * of the definition is used. - */ - public static void forceAnnounceNewColumnFamily(CFMetaData cfm) throws ConfigurationException - { - announceNewColumnFamily(cfm, false, false); - } - - private static void announceNewColumnFamily(CFMetaData cfm, boolean announceLocally, boolean throwOnDuplicate) throws ConfigurationException - { - cfm.validate(); - - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(cfm.ksName); - if (ksm == null) - throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName)); - // If we have a table or a view which has the same name, we can't add a new one - else if (throwOnDuplicate && ksm.getTableOrViewNullable(cfm.cfName) != null) - throw new AlreadyExistsException(cfm.ksName, cfm.cfName); - - logger.info("Create new table: {}", cfm); - announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewView(ViewDefinition view, boolean announceLocally) throws ConfigurationException - { - view.metadata.validate(); - - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(view.ksName); - if (ksm == null) - throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", view.viewName, view.ksName)); - else if (ksm.getTableOrViewNullable(view.viewName) != null) - throw new AlreadyExistsException(view.ksName, view.viewName); - - logger.info("Create new view: {}", view); - announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewType(UserType newType, boolean announceLocally) - { - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(newType.keyspace); - announce(SchemaKeyspace.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewFunction(UDFunction udf, boolean announceLocally) - { - logger.info("Create scalar function '{}'", udf.name()); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(udf.name().keyspace); - announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) - { - logger.info("Create aggregate function '{}'", udf.name()); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(udf.name().keyspace); - announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceKeyspaceUpdate(KeyspaceMetadata ksm) throws ConfigurationException - { - announceKeyspaceUpdate(ksm, false); - } - - public static void announceKeyspaceUpdate(KeyspaceMetadata ksm, boolean announceLocally) throws ConfigurationException - { - ksm.validate(); - - KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(ksm.name); - if (oldKsm == null) - throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); - - logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, ksm); - announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, ksm.params, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceColumnFamilyUpdate(CFMetaData cfm) throws ConfigurationException - { - announceColumnFamilyUpdate(cfm, false); - } - - public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean announceLocally) throws ConfigurationException - { - cfm.validate(); - - CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName); - if (oldCfm == null) - throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName)); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(cfm.ksName); - - oldCfm.validateCompatibility(cfm); - - logger.info("Update table '{}/{}' From {} To {}", cfm.ksName, cfm.cfName, oldCfm, cfm); - announce(SchemaKeyspace.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceViewUpdate(ViewDefinition view, boolean announceLocally) throws ConfigurationException - { - view.metadata.validate(); - - ViewDefinition oldView = Schema.instance.getView(view.ksName, view.viewName); - if (oldView == null) - throw new ConfigurationException(String.format("Cannot update non existing materialized view '%s' in keyspace '%s'.", view.viewName, view.ksName)); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(view.ksName); - - oldView.metadata.validateCompatibility(view.metadata); - - logger.info("Update view '{}/{}' From {} To {}", view.ksName, view.viewName, oldView, view); - announce(SchemaKeyspace.makeUpdateViewMutation(ksm, oldView, view, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceTypeUpdate(UserType updatedType, boolean announceLocally) - { - logger.info("Update type '{}.{}' to {}", updatedType.keyspace, updatedType.getNameAsString(), updatedType); - announceNewType(updatedType, announceLocally); - } - - public static void announceKeyspaceDrop(String ksName) throws ConfigurationException - { - announceKeyspaceDrop(ksName, false); - } - - public static void announceKeyspaceDrop(String ksName, boolean announceLocally) throws ConfigurationException - { - KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(ksName); - if (oldKsm == null) - throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); - - logger.info("Drop Keyspace '{}'", oldKsm.name); - announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceColumnFamilyDrop(String ksName, String cfName) throws ConfigurationException - { - announceColumnFamilyDrop(ksName, cfName, false); - } - - public static void announceColumnFamilyDrop(String ksName, String cfName, boolean announceLocally) throws ConfigurationException - { - CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName); - if (oldCfm == null) - throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName)); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName); - - logger.info("Drop table '{}/{}'", oldCfm.ksName, oldCfm.cfName); - announce(SchemaKeyspace.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceViewDrop(String ksName, String viewName, boolean announceLocally) throws ConfigurationException - { - ViewDefinition view = Schema.instance.getView(ksName, viewName); - if (view == null) - throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", viewName, ksName)); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName); - - logger.info("Drop table '{}/{}'", view.ksName, view.viewName); - announce(SchemaKeyspace.makeDropViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceTypeDrop(UserType droppedType) - { - announceTypeDrop(droppedType, false); - } - - public static void announceTypeDrop(UserType droppedType, boolean announceLocally) - { - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(droppedType.keyspace); - announce(SchemaKeyspace.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceFunctionDrop(UDFunction udf, boolean announceLocally) - { - logger.info("Drop scalar function overload '{}' args '{}'", udf.name(), udf.argTypes()); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(udf.name().keyspace); - announce(SchemaKeyspace.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally) - { - logger.info("Drop aggregate function overload '{}' args '{}'", udf.name(), udf.argTypes()); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(udf.name().keyspace); - announce(SchemaKeyspace.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - /** - * actively announce a new version to active hosts via rpc - * @param schema The schema mutation to be applied - */ - private static void announce(Mutation.SimpleBuilder schema, boolean announceLocally) - { - List<Mutation> mutations = Collections.singletonList(schema.build()); - - if (announceLocally) - SchemaKeyspace.mergeSchema(mutations); - else - FBUtilities.waitOnFuture(announce(mutations)); - } - - private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema) - { - MessageOut<Collection<Mutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE, - schema, - MigrationsSerializer.instance); - MessagingService.instance().sendOneWay(msg, endpoint); - } - - // Returns a future on the local application of the schema - private static Future<?> announce(final Collection<Mutation> schema) - { - Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable() - { - protected void runMayThrow() throws ConfigurationException - { - SchemaKeyspace.mergeSchemaAndAnnounceVersion(schema); - } - }); - - for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) - { - // only push schema to nodes with known and equal versions - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && - MessagingService.instance().knowsVersion(endpoint) && - MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) - pushSchemaMutation(endpoint, schema); - } - - return f; - } - - /** - * Announce my version passively over gossip. - * Used to notify nodes as they arrive in the cluster. - * - * @param version The schema version to announce - */ - public static void passiveAnnounce(UUID version) - { - Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); - logger.debug("Gossiping my schema version {}", version); - } - - /** - * Clear all locally stored schema information and reset schema to initial state. - * Called by user (via JMX) who wants to get rid of schema disagreement. - */ - public static void resetLocalSchema() - { - logger.info("Starting local schema reset..."); - - logger.debug("Truncating schema tables..."); - - SchemaKeyspace.truncate(); - - logger.debug("Clearing local schema keyspace definitions..."); - - Schema.instance.clear(); - - Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers(); - liveEndpoints.remove(FBUtilities.getBroadcastAddress()); - - // force migration if there are nodes around - for (InetAddress node : liveEndpoints) - { - if (shouldPullSchemaFrom(node)) - { - logger.debug("Requesting schema from {}", node); - FBUtilities.waitOnFuture(submitMigrationTask(node)); - break; - } - } - - logger.info("Local schema reset is complete."); - } - - public static class MigrationsSerializer implements IVersionedSerializer<Collection<Mutation>> - { - public static MigrationsSerializer instance = new MigrationsSerializer(); - - public void serialize(Collection<Mutation> schema, DataOutputPlus out, int version) throws IOException - { - out.writeInt(schema.size()); - for (Mutation mutation : schema) - Mutation.serializer.serialize(mutation, out, version); - } - - public Collection<Mutation> deserialize(DataInputPlus in, int version) throws IOException - { - int count = in.readInt(); - Collection<Mutation> schema = new ArrayList<>(count); - - for (int i = 0; i < count; i++) - schema.add(Mutation.serializer.deserialize(in, version)); - - return schema; - } - - public long serializedSize(Collection<Mutation> schema, int version) - { - int size = TypeSizes.sizeof(schema.size()); - for (Mutation mutation : schema) - size += Mutation.serializer.serializedSize(mutation, version); - return size; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/MigrationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java deleted file mode 100644 index 052b89e..0000000 --- a/src/java/org/apache/cassandra/service/MigrationTask.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import java.net.InetAddress; -import java.util.Collection; -import java.util.EnumSet; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.SystemKeyspace.BootstrapState; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.net.IAsyncCallback; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.SchemaKeyspace; -import org.apache.cassandra.utils.WrappedRunnable; - - -class MigrationTask extends WrappedRunnable -{ - private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class); - - private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>(); - - private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS); - - private final InetAddress endpoint; - - MigrationTask(InetAddress endpoint) - { - this.endpoint = endpoint; - } - - public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks() - { - return inflightTasks; - } - - public void runMayThrow() throws Exception - { - // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(), - // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with - // a higher major. - if (!MigrationManager.shouldPullSchemaFrom(endpoint)) - { - logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint); - return; - } - - if (!FailureDetector.instance.isAlive(endpoint)) - { - logger.debug("Can't send schema pull request: node {} is down.", endpoint); - return; - } - - MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); - - final CountDownLatch completionLatch = new CountDownLatch(1); - - IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>() - { - @Override - public void response(MessageIn<Collection<Mutation>> message) - { - try - { - SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload); - } - catch (ConfigurationException e) - { - logger.error("Configuration exception merging remote schema", e); - } - finally - { - completionLatch.countDown(); - } - } - - public boolean isLatencyForSnitch() - { - return false; - } - }; - - // Only save the latches if we need bootstrap or are bootstrapping - if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState())) - inflightTasks.offer(completionLatch); - - MessagingService.instance().sendRR(message, endpoint, cb); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index 5b1aa0d..297774a 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -20,7 +20,7 @@ package org.apache.cassandra.service; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 440e35a..d63e860 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -77,9 +77,9 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> { this(resolver, consistencyLevel, - consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)), + consistencyLevel.blockFor(Keyspace.open(command.metadata().keyspace)), command, - Keyspace.open(command.metadata().ksName), + Keyspace.open(command.metadata().keyspace), filteredEndpoints, queryStartNanoTime); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/ReadRepairDecision.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadRepairDecision.java b/src/java/org/apache/cassandra/service/ReadRepairDecision.java new file mode 100644 index 0000000..8d2ced7 --- /dev/null +++ b/src/java/org/apache/cassandra/service/ReadRepairDecision.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +public enum ReadRepairDecision +{ + NONE, GLOBAL, DC_LOCAL; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/StartupChecks.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java index 75f7788..767bfc6 100644 --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@ -29,10 +29,11 @@ import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SystemKeyspace; @@ -319,7 +320,7 @@ public class StartupChecks // we do a one-off scrub of the system keyspace first; we can't load the list of the rest of the keyspaces, // until system keyspace is opened. - for (CFMetaData cfm : Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME)) + for (TableMetadata cfm : Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME)) ColumnFamilyStore.scrubDataDirectories(cfm); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index d5a9f47..0585717 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -44,10 +44,10 @@ import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; @@ -236,7 +236,7 @@ public class StorageProxy implements StorageProxyMBean consistencyForPaxos.validateForCas(); consistencyForCommit.validateForCasCommit(keyspaceName); - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); + TableMetadata metadata = Schema.instance.getTableMetadata(keyspaceName, cfName); long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); while (System.nanoTime() - queryStartNanoTime < timeout) @@ -348,11 +348,11 @@ public class StorageProxy implements StorageProxyMBean }; } - private static Pair<List<InetAddress>, Integer> getPaxosParticipants(CFMetaData cfm, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException + private static Pair<List<InetAddress>, Integer> getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException { Token tk = key.getToken(); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(cfm.ksName, tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, cfm.ksName); + List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk); + Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace); if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL) { // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only @@ -387,7 +387,7 @@ public class StorageProxy implements StorageProxyMBean */ private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime, DecoratedKey key, - CFMetaData metadata, + TableMetadata metadata, List<InetAddress> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, @@ -482,7 +482,7 @@ public class StorageProxy implements StorageProxyMBean } recordCasContention(contentions); - throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName))); + throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.keyspace))); } /** @@ -528,7 +528,7 @@ public class StorageProxy implements StorageProxyMBean private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint, long queryStartNanoTime) throws WriteTimeoutException { boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; - Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName); + Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace); Token tk = proposal.update.partitionKey().getToken(); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk); @@ -1513,7 +1513,7 @@ public class StorageProxy implements StorageProxyMBean private static boolean systemKeyspaceQuery(List<? extends ReadCommand> cmds) { for (ReadCommand cmd : cmds) - if (!SchemaConstants.isSystemKeyspace(cmd.metadata().ksName)) + if (!SchemaConstants.isSystemKeyspace(cmd.metadata().keyspace)) return false; return true; } @@ -1566,7 +1566,7 @@ public class StorageProxy implements StorageProxyMBean long start = System.nanoTime(); SinglePartitionReadCommand command = group.commands.get(0); - CFMetaData metadata = command.metadata(); + TableMetadata metadata = command.metadata(); DecoratedKey key = command.partitionKey(); PartitionIterator result = null; @@ -1590,7 +1590,7 @@ public class StorageProxy implements StorageProxyMBean } catch (WriteTimeoutException e) { - throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(metadata.ksName)), false); + throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(metadata.keyspace)), false); } catch (WriteFailureException e) { @@ -1626,7 +1626,7 @@ public class StorageProxy implements StorageProxyMBean readMetrics.addNano(latency); casReadMetrics.addNano(latency); readMetricsMap.get(consistencyLevel).addNano(latency); - Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); + Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); } return result; @@ -1764,7 +1764,7 @@ public class StorageProxy implements StorageProxyMBean ReadRepairMetrics.repairedBlocking.mark(); // Do a full data read to resolve the correct response (and repair node that need be) - Keyspace keyspace = Keyspace.open(command.metadata().ksName); + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size(), queryStartNanoTime); repairHandler = new ReadCallback(resolver, ConsistencyLevel.ALL, @@ -1805,7 +1805,7 @@ public class StorageProxy implements StorageProxyMBean logger.trace("Timed out waiting on digest mismatch repair requests"); // the caught exception here will have CL.ALL from the repair command, // not whatever CL the initial command was at (CASSANDRA-7947) - int blockFor = consistency.blockFor(Keyspace.open(command.metadata().ksName)); + int blockFor = consistency.blockFor(Keyspace.open(command.metadata().keyspace)); throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true); } } @@ -1903,7 +1903,7 @@ public class StorageProxy implements StorageProxyMBean */ private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace) { - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); Index index = command.getIndex(cfs); float maxExpectedResults = index == null ? command.limits().estimateTotalResults(cfs) @@ -2223,7 +2223,7 @@ public class StorageProxy implements StorageProxyMBean { Tracing.trace("Computing ranges to query"); - Keyspace keyspace = Keyspace.open(command.metadata().ksName); + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel); // our estimate of how many result rows there will be per-range @@ -2274,7 +2274,7 @@ public class StorageProxy implements StorageProxyMBean return false; } }; - // an empty message acts as a request to the SchemaCheckVerbHandler. + // an empty message acts as a request to the SchemaVersionVerbHandler. MessageOut message = new MessageOut(MessagingService.Verb.SCHEMA_CHECK); for (InetAddress endpoint : liveHosts) MessagingService.instance().sendRR(message, endpoint, cb); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 6cb67fc..62f4871 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -52,7 +52,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Appender; import ch.qos.logback.core.hook.DelayingShutdownHook; import org.apache.cassandra.auth.AuthKeyspace; -import org.apache.cassandra.auth.AuthMigrationListener; +import org.apache.cassandra.auth.AuthSchemaChangeListener; import org.apache.cassandra.batchlog.BatchRemoveVerbHandler; import org.apache.cassandra.batchlog.BatchStoreVerbHandler; import org.apache.cassandra.batchlog.BatchlogManager; @@ -60,11 +60,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; -import org.apache.cassandra.config.ViewDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; @@ -85,6 +81,15 @@ import org.apache.cassandra.repair.*; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.MigrationManager; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.SchemaPullVerbHandler; +import org.apache.cassandra.schema.SchemaPushVerbHandler; +import org.apache.cassandra.schema.SchemaVersionVerbHandler; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.schema.ViewMetadata; import org.apache.cassandra.service.paxos.CommitVerbHandler; import org.apache.cassandra.service.paxos.PrepareVerbHandler; import org.apache.cassandra.service.paxos.ProposeVerbHandler; @@ -285,9 +290,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.DEFINITIONS_UPDATE, new DefinitionsUpdateVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.DEFINITIONS_UPDATE, new SchemaPushVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SCHEMA_CHECK, new SchemaVersionVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new SchemaPullVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler()); @@ -1045,7 +1050,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE DatabaseDescriptor.getRoleManager().setup(); DatabaseDescriptor.getAuthenticator().setup(); DatabaseDescriptor.getAuthorizer().setup(); - MigrationManager.instance.register(new AuthMigrationListener()); + Schema.instance.registerListener(new AuthSchemaChangeListener()); authSetupComplete = true; } @@ -1079,23 +1084,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // (#8162 being an example), so even if the table definition exists, we still need to force the "current" // version of the schema, the one the node will be expecting. - KeyspaceMetadata defined = Schema.instance.getKSMetaData(expected.name); + KeyspaceMetadata defined = Schema.instance.getKeyspaceMetadata(expected.name); // If the keyspace doesn't exist, create it if (defined == null) { maybeAddKeyspace(expected); - defined = Schema.instance.getKSMetaData(expected.name); + defined = Schema.instance.getKeyspaceMetadata(expected.name); } // While the keyspace exists, it might miss table or have outdated one // There is also the potential for a race, as schema migrations add the bare // keyspace into Schema.instance before adding its tables, so double check that // all the expected tables are present - for (CFMetaData expectedTable : expected.tables) + for (TableMetadata expectedTable : expected.tables) { - CFMetaData definedTable = defined.tables.get(expectedTable.cfName).orElse(null); + TableMetadata definedTable = defined.tables.get(expectedTable.name).orElse(null); if (definedTable == null || !definedTable.equals(expectedTable)) - MigrationManager.forceAnnounceNewColumnFamily(expectedTable); + MigrationManager.forceAnnounceNewTable(expectedTable); } } @@ -1474,8 +1479,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void markViewsAsBuilt() { for (String keyspace : Schema.instance.getUserKeyspaces()) { - for (ViewDefinition view: Schema.instance.getKSMetaData(keyspace).views) - SystemKeyspace.finishViewBuildStatus(view.ksName, view.viewName); + for (ViewMetadata view: Schema.instance.getKeyspaceMetadata(keyspace).views) + SystemKeyspace.finishViewBuildStatus(view.keyspace, view.name); } } @@ -3616,15 +3621,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key) { - KeyspaceMetadata ksMetaData = Schema.instance.getKSMetaData(keyspaceName); + KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName); if (ksMetaData == null) throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'"); - CFMetaData cfMetaData = ksMetaData.getTableOrViewNullable(cf); - if (cfMetaData == null) + TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf); + if (metadata == null) throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'"); - return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(cfMetaData.getKeyValidator().fromString(key))); + return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))); } public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key) @@ -3760,7 +3765,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Token token = tokens.get(index); Range<Token> range = new Range<>(prevToken, token); // always return an estimate > 0 (see CASSANDRA-7322) - splits.add(Pair.create(range, Math.max(cfs.metadata.params.minIndexInterval, cfs.estimatedKeysForRange(range)))); + splits.add(Pair.create(range, Math.max(cfs.metadata().params.minIndexInterval, cfs.estimatedKeysForRange(range)))); prevToken = token; } return splits; @@ -4709,7 +4714,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public List<String> getNonSystemKeyspaces() { - return Collections.unmodifiableList(Schema.instance.getNonSystemKeyspaces()); + return Schema.instance.getNonSystemKeyspaces(); } public List<String> getNonLocalStrategyKeyspaces() @@ -4991,9 +4996,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - public CFMetaData getTableMetadata(String tableName) + public TableMetadataRef getTableMetadata(String tableName) { - return Schema.instance.getCFMetaData(keyspace, tableName); + return Schema.instance.getTableMetadataRef(keyspace, tableName); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index 8a04108..0682b15 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -17,12 +17,12 @@ */ package org.apache.cassandra.service.pager; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.transport.ProtocolVersion; @@ -77,10 +77,10 @@ abstract class AbstractQueryPager implements QueryPager return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager); } - public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize, ReadExecutionController executionController) + public UnfilteredPartitionIterator fetchPageUnfiltered(TableMetadata metadata, int pageSize, ReadExecutionController executionController) { if (isExhausted()) - return EmptyIterators.unfilteredPartition(cfm); + return EmptyIterators.unfilteredPartition(metadata); pageSize = Math.min(pageSize, remaining); UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java index f9a8cda..5ac01b2 100644 --- a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java @@ -20,7 +20,7 @@ package org.apache.cassandra.service.pager; import java.nio.ByteBuffer; import java.util.NoSuchElementException; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.aggregation.GroupingState; import org.apache.cassandra.db.filter.DataLimits; @@ -319,7 +319,7 @@ public final class AggregationQueryPager implements QueryPager this.rowIterator = delegate; } - public CFMetaData metadata() + public TableMetadata metadata() { return rowIterator.metadata(); } @@ -329,7 +329,7 @@ public final class AggregationQueryPager implements QueryPager return rowIterator.isReverseOrder(); } - public PartitionColumns columns() + public RegularAndStaticColumns columns() { return rowIterator.columns(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/pager/PagingState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java index bcf3979..f036f96 100644 --- a/src/java/org/apache/cassandra/service/pager/PagingState.java +++ b/src/java/org/apache/cassandra/service/pager/PagingState.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.CompactTables; import org.apache.cassandra.db.TypeSizes; @@ -189,7 +189,7 @@ public class PagingState this.protocolVersion = protocolVersion; } - private static List<AbstractType<?>> makeClusteringTypes(CFMetaData metadata) + private static List<AbstractType<?>> makeClusteringTypes(TableMetadata metadata) { // This is the types that will be used when serializing the clustering in the paging state. We can't really use the actual clustering // types however because we can't guarantee that there won't be a schema change between when we send the paging state and get it back, @@ -203,7 +203,7 @@ public class PagingState return l; } - public static RowMark create(CFMetaData metadata, Row row, ProtocolVersion protocolVersion) + public static RowMark create(TableMetadata metadata, Row row, ProtocolVersion protocolVersion) { ByteBuffer mark; if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3)) @@ -234,7 +234,7 @@ public class PagingState return new RowMark(mark, protocolVersion); } - public Clustering clustering(CFMetaData metadata) + public Clustering clustering(TableMetadata metadata) { if (mark == null) return null; @@ -245,7 +245,7 @@ public class PagingState } // Old (pre-3.0) encoding of cells. We need that for the protocol v3 as that is how things where encoded - private static ByteBuffer encodeCellName(CFMetaData metadata, Clustering clustering, ByteBuffer columnName, ByteBuffer collectionElement) + private static ByteBuffer encodeCellName(TableMetadata metadata, Clustering clustering, ByteBuffer columnName, ByteBuffer collectionElement) { boolean isStatic = clustering == Clustering.STATIC_CLUSTERING; @@ -302,7 +302,7 @@ public class PagingState return CompositeType.build(isStatic, values); } - private static Clustering decodeClustering(CFMetaData metadata, ByteBuffer value) + private static Clustering decodeClustering(TableMetadata metadata, ByteBuffer value) { int csize = metadata.comparator.size(); if (csize == 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java index 68547be..4f09f97 100644 --- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java @@ -42,7 +42,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager if (state != null) { - lastReturnedKey = command.metadata().decorateKey(state.partitionKey); + lastReturnedKey = command.metadata().partitioner.decorateKey(state.partitionKey); lastReturnedRow = state.rowMark; restoreState(lastReturnedKey, state.remaining, state.remainingInPartition); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/paxos/Commit.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index 3b0364c..6e23c8b 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -27,7 +27,7 @@ import java.util.UUID; import com.google.common.base.Objects; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -55,7 +55,7 @@ public class Commit this.update = update; } - public static Commit newPrepare(DecoratedKey key, CFMetaData metadata, UUID ballot) + public static Commit newPrepare(DecoratedKey key, TableMetadata metadata, UUID ballot) { return new Commit(ballot, PartitionUpdate.emptyUpdate(metadata, key)); } @@ -66,7 +66,7 @@ public class Commit return new Commit(ballot, update); } - public static Commit emptyCommit(DecoratedKey key, CFMetaData metadata) + public static Commit emptyCommit(DecoratedKey key, TableMetadata metadata) { return new Commit(UUIDGen.minTimeUUID(0), PartitionUpdate.emptyUpdate(metadata, key)); }