Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e63dacf7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e63dacf7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e63dacf7 Branch: refs/heads/trunk Commit: e63dacf793fedc8a9eed9c7fc635cde5f9fd68f3 Parents: 8b2dc1f e889ee4 Author: Robert Stupp <sn...@snazy.de> Authored: Wed Sep 16 22:00:25 2015 +0200 Committer: Robert Stupp <sn...@snazy.de> Committed: Wed Sep 16 22:00:25 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 193 ++++++++------ .../org/apache/cassandra/cache/CacheKey.java | 14 +- .../apache/cassandra/cache/CounterCacheKey.java | 26 +- .../org/apache/cassandra/cache/KeyCacheKey.java | 19 +- .../org/apache/cassandra/cache/OHCProvider.java | 17 +- .../org/apache/cassandra/cache/RowCacheKey.java | 34 +-- .../org/apache/cassandra/config/CFMetaData.java | 9 + .../cassandra/config/DatabaseDescriptor.java | 19 +- .../org/apache/cassandra/config/Schema.java | 56 +++- .../apache/cassandra/db/ColumnFamilyStore.java | 75 ++---- src/java/org/apache/cassandra/db/Keyspace.java | 4 - .../org/apache/cassandra/db/RowIndexEntry.java | 2 +- .../db/index/SecondaryIndexManager.java | 30 +-- .../io/sstable/format/SSTableReader.java | 10 +- .../io/sstable/format/big/BigTableReader.java | 2 +- .../apache/cassandra/service/CacheService.java | 58 ++-- .../cassandra/service/CassandraDaemon.java | 41 ++- .../cassandra/service/StorageService.java | 31 ++- .../org/apache/cassandra/utils/FBUtilities.java | 16 ++ .../cassandra/cache/AutoSavingCacheTest.java | 5 +- .../cassandra/cache/CacheProviderTest.java | 17 +- .../apache/cassandra/cql3/KeyCacheCqlTest.java | 266 +++++++++++++++++++ .../apache/cassandra/db/CounterCacheTest.java | 70 ++++- .../org/apache/cassandra/db/KeyCacheTest.java | 2 +- .../org/apache/cassandra/db/RowCacheTest.java | 41 ++- 26 files changed, 760 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 7deebcf,207f16a..96ec0fa --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,14 -1,5 +1,15 @@@ -2.1.10 +2.2.2 + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761) + * Cancel transaction for sstables we wont redistribute index summary + for (CASSANDRA-10270) + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) + * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222) + * Fix failure to start with space in directory path on Windows (CASSANDRA-10239) + * Fix repair hang when snapshot failed (CASSANDRA-10057) + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks + (CASSANDRA-10199) +Merged from 2.1: + * Fix cache handling of 2i and base tables (CASSANDRA-10155) * Fix NPE in nodetool compactionhistory (CASSANDRA-9758) * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410) * BATCH statement is broken in cqlsh (CASSANDRA-10272) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java index f0f4e8a,3ebbc76..3ec9d4e --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@@ -61,8 -65,16 +67,16 @@@ public class AutoSavingCache<K extends protected volatile ScheduledFuture<?> saveTask; protected final CacheService.CacheType cacheType; - private CacheSerializer<K, V> cacheLoader; + private final CacheSerializer<K, V> cacheLoader; - private static final String CURRENT_VERSION = "c"; + + /* + * CASSANDRA-10155 required a format change to fix 2i indexes and caching. + * 2.2 is already at version "c" and 3.0 is at "d". + * + * Since cache versions match exactly and there is no partial fallback just add + * a minor version letter. + */ - private static final String CURRENT_VERSION = "ba"; ++ private static final String CURRENT_VERSION = "ca"; private static volatile IStreamFactory streamFactory = new IStreamFactory() { @@@ -90,16 -102,9 +104,14 @@@ this.cacheLoader = cacheloader; } - public File getCacheDataPath(UUID cfId, String version) - public File getCachePath(String version) ++ public File getCacheDataPath(String version) { - Pair<String, String> names = Schema.instance.getCF(cfId); - return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "db"); - return DatabaseDescriptor.getSerializedCachePath(cacheType, version); ++ return DatabaseDescriptor.getSerializedCachePath( cacheType, version, "db"); + } + - public File getCacheCrcPath(UUID cfId, String version) ++ public File getCacheCrcPath(String version) + { - Pair<String, String> names = Schema.instance.getCF(cfId); - return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "crc"); ++ return DatabaseDescriptor.getSerializedCachePath( cacheType, version, "crc"); } public Writer getWriter(int keysToSave) @@@ -136,42 -170,65 +177,70 @@@ long start = System.nanoTime(); // modern format, allows both key and value (so key cache load can be purely sequential) - File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION); - File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION); - File path = getCachePath(CURRENT_VERSION); - if (path.exists()) ++ File dataPath = getCacheDataPath(CURRENT_VERSION); ++ File crcPath = getCacheCrcPath(CURRENT_VERSION); + if (dataPath.exists() && crcPath.exists()) { DataInputStream in = null; try { - logger.info(String.format("reading saved cache %s", path)); - in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length())); + logger.info(String.format("reading saved cache %s", dataPath)); + in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length())); - List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>(); + ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>(); - while (in.available() > 0) { - Future<Pair<K, V>> entry = cacheLoader.deserialize(in, cfs); + //ksname and cfname are serialized by the serializers in CacheService + //That is delegated there because there are serializer specific conditions + //where a cache key is skipped and not written + String ksname = in.readUTF(); + String cfname = in.readUTF(); + + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname)); + + Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs); // Key cache entry can return null, if the SSTable doesn't exist. - if (entry == null) + if (entryFuture == null) continue; - futures.add(entry); + + futures.offer(entryFuture); count++; + + /* + * Kind of unwise to accrue an unbounded number of pending futures + * So now there is this loop to keep a bounded number pending. + */ + do + { + while (futures.peek() != null && futures.peek().isDone()) + { + Future<Pair<K, V>> future = futures.poll(); + Pair<K, V> entry = future.get(); + if (entry != null && entry.right != null) + put(entry.left, entry.right); + } + + if (futures.size() > 1000) + Thread.yield(); + } while(futures.size() > 1000); } - for (Future<Pair<K, V>> future : futures) + Future<Pair<K, V>> future = null; + while ((future = futures.poll()) != null) { Pair<K, V> entry = future.get(); if (entry != null && entry.right != null) put(entry.left, entry.right); } } + catch (CorruptFileException e) + { + JVMStabilityInspector.inspectThrowable(e); + logger.warn(String.format("Non-fatal checksum error reading saved cache %s", dataPath.getAbsolutePath()), e); + } - catch (Exception e) + catch (Throwable t) { - JVMStabilityInspector.inspectThrowable(e); - logger.debug(String.format("harmless error reading saved cache %s", dataPath.getAbsolutePath()), e); + JVMStabilityInspector.inspectThrowable(t); - logger.info(String.format("Harmless error reading saved cache %s", path.getAbsolutePath()), t); ++ logger.info(String.format("Harmless error reading saved cache %s", dataPath.getAbsolutePath()), t); } finally { @@@ -236,11 -284,9 +305,10 @@@ public CompactionInfo getCompactionInfo() { // keyset can change in size, thus total can too - return info.forProgress(keysWritten, Math.max(keysWritten, keys.size())); + // TODO need to check for this one... was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size())); + return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate)); } - @SuppressWarnings("resource") public void saveCache() { logger.debug("Deleting old {} files.", cacheType); @@@ -254,37 -300,25 +322,26 @@@ long start = System.nanoTime(); - HashMap<UUID, DataOutputPlus> writers = new HashMap<>(); - HashMap<UUID, OutputStream> streams = new HashMap<>(); - HashMap<UUID, Pair<File, File>> paths = new HashMap<>(); - - DataOutputStreamPlus writer = null; - File tempCacheFile = tempCacheFile(); ++ WrappedDataOutputStreamPlus writer = null; ++ Pair<File, File> cacheFilePaths = tempCacheFiles(); try { + try + { - writer = new DataOutputStreamPlus(streamFactory.getOutputStream(tempCacheFile)); ++ writer = new WrappedDataOutputStreamPlus(streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right)); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + - for (K key : keys) + while (keyIterator.hasNext()) { + K key = keyIterator.next(); - UUID cfId = key.getCFId(); - if (!Schema.instance.hasCF(key.getCFId())) - continue; // the table has been dropped. - DataOutputPlus writer = writers.get(cfId); - if (writer == null) - { - Pair<File, File> cacheFilePaths = tempCacheFiles(cfId); - OutputStream stream; - try - { - stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right); - writer = new WrappedDataOutputStreamPlus(stream); - } - catch (FileNotFoundException e) - { - throw new RuntimeException(e); - } - paths.put(cfId, cacheFilePaths); - streams.put(cfId, stream); - writers.put(cfId, writer); - } + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName); + if (cfs == null) + continue; // the table or 2i has been dropped. try { @@@ -292,7 -326,7 +349,7 @@@ } catch (IOException e) { - throw new FSWriteError(e, paths.get(cfId).left); - throw new FSWriteError(e, tempCacheFile); ++ throw new FSWriteError(e, cacheFilePaths.left); } keysWritten++; @@@ -302,49 -334,24 +359,31 @@@ } finally { - if (keyIterator instanceof Closeable) - try - { - ((Closeable)keyIterator).close(); - } - catch (IOException ignored) - { - // not thrown (by OHC) - } - - for (OutputStream writer : streams.values()) - { + if (writer != null) FileUtils.closeQuietly(writer); - } } - for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet()) - { - UUID cfId = entry.getKey(); - File cacheFile = getCachePath(CURRENT_VERSION); ++ File cacheFile = getCacheDataPath(CURRENT_VERSION); ++ File crcFile = getCacheCrcPath(CURRENT_VERSION); - Pair<File, File> tmpFiles = paths.get(cfId); - File cacheFile = getCacheDataPath(cfId, CURRENT_VERSION); - File crcFile = getCacheCrcPath(cfId, CURRENT_VERSION); + cacheFile.delete(); // ignore error if it didn't exist ++ crcFile.delete(); + - cacheFile.delete(); // ignore error if it didn't exist - crcFile.delete(); ++ if (!cacheFilePaths.left.renameTo(cacheFile)) ++ logger.error("Unable to rename {} to {}", cacheFilePaths.left, cacheFile); - if (!tmpFiles.left.renameTo(cacheFile)) - logger.error("Unable to rename {} to {}", tmpFiles.left, cacheFile); - - if (!tmpFiles.right.renameTo(crcFile)) - logger.error("Unable to rename {} to {}", tmpFiles.right, crcFile); - } - if (!tempCacheFile.renameTo(cacheFile)) - logger.error("Unable to rename {} to {}", tempCacheFile, cacheFile); ++ if (!cacheFilePaths.right.renameTo(crcFile)) ++ logger.error("Unable to rename {} to {}", cacheFilePaths.right, crcFile); - logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } - private Pair<File, File> tempCacheFiles(UUID cfId) - private File tempCacheFile() ++ private Pair<File, File> tempCacheFiles() { - File dataPath = getCacheDataPath(cfId, CURRENT_VERSION); - File crcPath = getCacheCrcPath(cfId, CURRENT_VERSION); - File path = getCachePath(CURRENT_VERSION); - return FileUtils.createTempFile(path.getName(), null, path.getParentFile()); ++ File dataPath = getCacheDataPath(CURRENT_VERSION); ++ File crcPath = getCacheCrcPath(CURRENT_VERSION); + return Pair.create(FileUtils.createTempFile(dataPath.getName(), null, dataPath.getParentFile()), + FileUtils.createTempFile(crcPath.getName(), null, crcPath.getParentFile())); } private void deleteOldCacheFiles() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/OHCProvider.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cache/OHCProvider.java index e4cfb69,0000000..9b1c8cf mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cache/OHCProvider.java +++ b/src/java/org/apache/cassandra/cache/OHCProvider.java @@@ -1,282 -1,0 +1,285 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Iterator; - import java.util.UUID; + +import com.google.common.base.Function; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.Memory; +import org.apache.cassandra.net.MessagingService; ++import org.apache.cassandra.utils.Pair; +import org.caffinitas.ohc.OHCache; +import org.caffinitas.ohc.OHCacheBuilder; + +public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> +{ + public ICache<RowCacheKey, IRowCacheEntry> create() + { + OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder(); + builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024) + .keySerializer(new KeySerializer()) + .valueSerializer(new ValueSerializer()) + .throwOOME(true); + + return new OHCacheAdapter(builder.build()); + } + + private static class OHCacheAdapter implements ICache<RowCacheKey, IRowCacheEntry> + { + private final OHCache<RowCacheKey, IRowCacheEntry> ohCache; + + public OHCacheAdapter(OHCache<RowCacheKey, IRowCacheEntry> ohCache) + { + this.ohCache = ohCache; + } + + public long capacity() + { + return ohCache.capacity(); + } + + public void setCapacity(long capacity) + { + ohCache.setCapacity(capacity); + } + + public void put(RowCacheKey key, IRowCacheEntry value) + { + ohCache.put(key, value); + } + + public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value) + { + return ohCache.putIfAbsent(key, value); + } + + public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value) + { + return ohCache.addOrReplace(key, old, value); + } + + public IRowCacheEntry get(RowCacheKey key) + { + return ohCache.get(key); + } + + public void remove(RowCacheKey key) + { + ohCache.remove(key); + } + + public int size() + { + return (int) ohCache.size(); + } + + public long weightedSize() + { + return ohCache.size(); + } + + public void clear() + { + ohCache.clear(); + } + + public Iterator<RowCacheKey> hotKeyIterator(int n) + { + return ohCache.hotKeyIterator(n); + } + + public Iterator<RowCacheKey> keyIterator() + { + return ohCache.keyIterator(); + } + + public boolean containsKey(RowCacheKey key) + { + return ohCache.containsKey(key); + } + } + + private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey> + { + public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException + { - dataOutput.writeLong(rowCacheKey.cfId.getMostSignificantBits()); - dataOutput.writeLong(rowCacheKey.cfId.getLeastSignificantBits()); ++ dataOutput.writeUTF(rowCacheKey.ksAndCFName.left); ++ dataOutput.writeUTF(rowCacheKey.ksAndCFName.right); + dataOutput.writeInt(rowCacheKey.key.length); + dataOutput.write(rowCacheKey.key); + } + + public RowCacheKey deserialize(DataInput dataInput) throws IOException + { - long msb = dataInput.readLong(); - long lsb = dataInput.readLong(); ++ String ksName = dataInput.readUTF(); ++ String cfName = dataInput.readUTF(); + byte[] key = new byte[dataInput.readInt()]; + dataInput.readFully(key); - return new RowCacheKey(new UUID(msb, lsb), key); ++ return new RowCacheKey(Pair.create(ksName, cfName), key); + } + + public int serializedSize(RowCacheKey rowCacheKey) + { - return 20 + rowCacheKey.key.length; ++ return TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.left) ++ + TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.right) ++ + 4 ++ + rowCacheKey.key.length; + } + } + + private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry> + { + public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException + { + assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache + boolean isSentinel = entry instanceof RowCacheSentinel; + out.writeBoolean(isSentinel); + if (isSentinel) + out.writeLong(((RowCacheSentinel) entry).sentinelId); + else + ColumnFamily.serializer.serialize((ColumnFamily) entry, new DataOutputPlusAdapter(out), MessagingService.current_version); + } + + public IRowCacheEntry deserialize(DataInput in) throws IOException + { + boolean isSentinel = in.readBoolean(); + if (isSentinel) + return new RowCacheSentinel(in.readLong()); + return ColumnFamily.serializer.deserialize(in, MessagingService.current_version); + } + + public int serializedSize(IRowCacheEntry entry) + { + TypeSizes typeSizes = TypeSizes.NATIVE; + int size = typeSizes.sizeof(true); + if (entry instanceof RowCacheSentinel) + size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId); + else + size += ColumnFamily.serializer.serializedSize((ColumnFamily) entry, typeSizes, MessagingService.current_version); + return size; + } + } + + static class DataOutputPlusAdapter implements DataOutputPlus + { + private final DataOutput out; + + public void write(byte[] b) throws IOException + { + out.write(b); + } + + public void write(byte[] b, int off, int len) throws IOException + { + out.write(b, off, len); + } + + public void write(int b) throws IOException + { + out.write(b); + } + + public void writeBoolean(boolean v) throws IOException + { + out.writeBoolean(v); + } + + public void writeByte(int v) throws IOException + { + out.writeByte(v); + } + + public void writeBytes(String s) throws IOException + { + out.writeBytes(s); + } + + public void writeChar(int v) throws IOException + { + out.writeChar(v); + } + + public void writeChars(String s) throws IOException + { + out.writeChars(s); + } + + public void writeDouble(double v) throws IOException + { + out.writeDouble(v); + } + + public void writeFloat(float v) throws IOException + { + out.writeFloat(v); + } + + public void writeInt(int v) throws IOException + { + out.writeInt(v); + } + + public void writeLong(long v) throws IOException + { + out.writeLong(v); + } + + public void writeShort(int v) throws IOException + { + out.writeShort(v); + } + + public void writeUTF(String s) throws IOException + { + out.writeUTF(s); + } + + public DataOutputPlusAdapter(DataOutput out) + { + this.out = out; + } + + public void write(ByteBuffer buffer) throws IOException + { + if (buffer.hasArray()) + out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + else + throw new UnsupportedOperationException("IMPLEMENT ME"); + } + + public void write(Memory memory, long offset, long length) throws IOException + { + throw new UnsupportedOperationException("IMPLEMENT ME"); + } + + public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException + { + throw new UnsupportedOperationException("IMPLEMENT ME"); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/RowCacheKey.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cache/RowCacheKey.java index ccb85d8,c959fd1..e02db42 --- a/src/java/org/apache/cassandra/cache/RowCacheKey.java +++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java @@@ -33,20 -31,14 +31,20 @@@ public final class RowCacheKey extends private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER)); - public RowCacheKey(UUID cfId, byte[] key) ++ public RowCacheKey(Pair<String, String> ksAndCFName, byte[] key) + { - this.cfId = cfId; ++ super(ksAndCFName); + this.key = key; + } + - public RowCacheKey(UUID cfId, DecoratedKey key) + public RowCacheKey(Pair<String, String> ksAndCFName, DecoratedKey key) { - this(cfId, key.getKey()); + this(ksAndCFName, key.getKey()); } - public RowCacheKey(UUID cfId, ByteBuffer key) + public RowCacheKey(Pair<String, String> ksAndCFName, ByteBuffer key) { - this.cfId = cfId; + super(ksAndCFName); this.key = ByteBufferUtil.getArray(key); assert this.key != null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/CFMetaData.java index 6468973,2939f09..348eb89 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@@ -47,11 -48,15 +47,12 @@@ import org.apache.cassandra.db.marshal. import org.apache.cassandra.exceptions.*; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.compress.LZ4Compressor; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.thrift.CfDef; -import org.apache.cassandra.thrift.CqlResult; -import org.apache.cassandra.thrift.CqlRow; -import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; import org.github.jamm.Unmetered; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 545ad05,84381a0..c459b5d --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -1480,20 -1431,17 +1480,11 @@@ public class DatabaseDescripto return conf.max_hint_window_in_ms; } - public static File getSerializedCachePath(String ksName, - String cfName, - UUID cfId, - CacheService.CacheType cacheType, - String version, - String extension) - @Deprecated - public static Integer getIndexInterval() -- { - StringBuilder builder = new StringBuilder(); - builder.append(ksName).append('-'); - builder.append(cfName).append('-'); - builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-'); - builder.append(cacheType); - builder.append((version == null ? "" : "-" + version + "." + extension)); - return new File(conf.saved_caches_directory, builder.toString()); - return conf.index_interval; - } - - public static File getSerializedCachePath(CacheService.CacheType cacheType, String version) ++ public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension) + { + String name = cacheType.toString() - + (version == null ? "" : "-" + version + ".db"); ++ + (version == null ? "" : "-" + version + "." + extension); + return new File(conf.saved_caches_directory, name); } public static int getDynamicUpdateInterval() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Schema.java index 548341e,fada670..00c9358 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@@ -26,18 -28,14 +26,19 @@@ import com.google.common.collect.Sets import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.cql3.functions.Functions; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.cql3.functions.UDFunction; import org.apache.cassandra.db.*; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.UserType; + import org.apache.cassandra.db.index.SecondaryIndex; -import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.utils.ConcurrentBiMap; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 343ecee,ffaa276..a8a8910 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1665,9 -1655,9 +1626,9 @@@ public class ColumnFamilyStore implemen private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter) { assert isRowCacheEnabled() - : String.format("Row cache is not enabled on column family [" + name + "]"); + : String.format("Row cache is not enabled on table [" + name + "]"); - RowCacheKey key = new RowCacheKey(cfId, filter.key); + RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key); // attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862 @@@ -2075,23 -2026,19 +2036,23 @@@ { Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); - for (RowCacheKey key : CacheService.instance.rowCache.getKeySet()) + for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator(); + keyIter.hasNext(); ) { + RowCacheKey key = keyIter.next(); DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key)); - if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) + if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges)) invalidateCachedRow(dk); } if (metadata.isCounter()) { - for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet()) + for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator(); + keyIter.hasNext(); ) { + CounterCacheKey key = keyIter.next(); DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey)); - if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) + if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges)) CacheService.instance.counterCache.remove(key); } } @@@ -2965,13 -2955,21 +2926,13 @@@ } } - /** - * Returns the creation time of the oldest memtable not fully flushed yet. - */ - public long oldestUnflushedMemtable() - { - return data.getView().getOldestMemtable().creationTime(); - } - public boolean isEmpty() { - DataTracker.View view = data.getView(); - return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable(); + View view = data.getView(); + return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0; } - private boolean isRowCacheEnabled() + public boolean isRowCacheEnabled() { return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ----------------------------------------------------------------------