[ https://issues.apache.org/jira/browse/CASSANDRA-10237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14732071#comment-14732071 ]
Robert Stupp commented on CASSANDRA-10237: ------------------------------------------ This also breaks sstable verify (both standalone and via JMX/nodetool), in that the digest file isn't verified, because the file existence check in {{org.apache.cassandra.db.compaction.Verifier#verify}} doesn't succeed. That is, this results in no error, but the functionality is silently lost. > CFS.loadNewSSTables() broken for pre-3.0 sstables > ------------------------------------------------- > > Key: CASSANDRA-10237 > URL: https://issues.apache.org/jira/browse/CASSANDRA-10237 > Project: Cassandra > Issue Type: Bug > Reporter: Robert Stupp > > While working on CASSANDRA-10236 I discovered that {{CFS.loadNewSSTables()}} > doesn't work for pre-3.0 sstables - just for version {{ma}} sstables. > TBC: Starting C* with 2.0, 2.1 or 2.2 sstables works - but loading new > sstables during runtime doesn't. > Issues with {{CFS.loadNewSSTables()}} discovered so far: > # {{MetadataSerializer.deserialize(Descriptor,FileDataInput,EnumSet)}} > returns {{null}} for {{MetadataType.HEADER}} which results in an NPE later in > {{MetadataSerializer.serialize}} executing {{Collections.sort}}. > # After working around the previous issue, it turns out that it couldn't load > the digest file, since {{Component.DIGEST}} is a singleton which refers to > CRC32, but pre-3.0 sstables use Adler32. > # After working around that one, it fails in > {{StreamingHistogram$StreamingHistogramSerializer.deserialize}} as > {{maxBinSize==Integer.MAX_VALUE}}. > As loading legacy sstables works fine during startup, I assume my workarounds > are not correct. > For reference, [this > commit|https://github.com/snazy/cassandra/commit/2f0668a1e1d8a101e8301b9c4211b164c113afaa] > contains a ton of legacy sstables (simple, counter, clustered and > clustered+counter) for 2.0, 2.1 and 2.2. I've extended LegacySSTableTest to > read these tables using {{CFS.loadNewSSTables()}}. 
> {noformat:title=LegacySSTablesTest.txt} > diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java > b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java > index d2922cc..1be6450 100644 > --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java > +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java > @@ -18,6 +18,9 @@ > package org.apache.cassandra.io.sstable; > > import java.io.File; > +import java.io.FileInputStream; > +import java.io.FileOutputStream; > +import java.io.IOException; > import java.nio.ByteBuffer; > import java.util.ArrayList; > import java.util.HashSet; > @@ -27,10 +30,15 @@ import java.util.Set; > import org.junit.BeforeClass; > import org.junit.Test; > > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > + > import org.apache.cassandra.SchemaLoader; > import org.apache.cassandra.Util; > import org.apache.cassandra.config.CFMetaData; > +import org.apache.cassandra.cql3.QueryProcessor; > import org.apache.cassandra.db.ColumnFamilyStore; > +import org.apache.cassandra.db.ConsistencyLevel; > import org.apache.cassandra.db.DeletionTime; > import org.apache.cassandra.db.Keyspace; > import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator; > @@ -43,6 +51,7 @@ import > org.apache.cassandra.exceptions.ConfigurationException; > import org.apache.cassandra.io.sstable.format.SSTableFormat; > import org.apache.cassandra.io.sstable.format.SSTableReader; > import org.apache.cassandra.io.sstable.format.Version; > +import org.apache.cassandra.io.sstable.format.big.BigFormat; > import org.apache.cassandra.schema.KeyspaceParams; > import org.apache.cassandra.service.StorageService; > import org.apache.cassandra.streaming.StreamPlan; > @@ -57,6 +66,8 @@ import static > org.apache.cassandra.utils.ByteBufferUtil.bytes; > */ > public class LegacySSTableTest > { > + private static final Logger logger = > LoggerFactory.getLogger(LegacySSTableTest.class); > + > public static final 
String LEGACY_SSTABLE_PROP = "legacy-sstable-root"; > public static final String KSNAME = "Keyspace1"; > public static final String CFNAME = "Standard1"; > @@ -64,6 +75,8 @@ public class LegacySSTableTest > public static Set<String> TEST_DATA; > public static File LEGACY_SSTABLE_ROOT; > > + public static final String[] legacyVersions = {"jb", "ka", "la"}; > + > @BeforeClass > public static void defineSchema() throws ConfigurationException > { > @@ -208,4 +221,65 @@ public class LegacySSTableTest > throw e; > } > } > + > + @Test > + public void testLegacyCqlTables() throws Exception > + { > + QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH > replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}"); > + > + loadLegacyTables(); > + } > + > + private void loadLegacyTables() throws IOException > + { > + for (String legacyVersion : legacyVersions) > + { > + logger.info("Preparing legacy version {}", legacyVersion); > + > + QueryProcessor.executeInternal(String.format("CREATE TABLE > legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", > legacyVersion)); > + QueryProcessor.executeInternal(String.format("CREATE TABLE > legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", > legacyVersion)); > + QueryProcessor.executeInternal(String.format("CREATE TABLE > legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, > ck))", legacyVersion)); > + QueryProcessor.executeInternal(String.format("CREATE TABLE > legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY > KEY (pk, ck))", legacyVersion)); > + > + loadLegacyTable("legacy_%s_simple", legacyVersion); > + loadLegacyTable("legacy_%s_simple_counter", legacyVersion); > + loadLegacyTable("legacy_%s_clust", legacyVersion); > + loadLegacyTable("legacy_%s_clust_counter", legacyVersion); > + > + } > + } > + > + private void loadLegacyTable(String tablePattern, String legacyVersion) > throws IOException > + { > + String table = 
String.format(tablePattern, legacyVersion); > + > + logger.info("Loading legacy table {}", table); > + > + ColumnFamilyStore cfs = > Keyspace.open("legacy_tables").getColumnFamilyStore(table); > + > + for (File cfDir : cfs.getDirectories().getCFDirectories()) > + { > + copySstables(legacyVersion, table, cfDir); > + } > + > + cfs.loadNewSSTables(); > + } > + > + private static void copySstables(String legacyVersion, String table, > File cfDir) throws IOException > + { > + byte[] buf = new byte[65536]; > + > + for (File file : new File(LEGACY_SSTABLE_ROOT, > String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles()) > + { > + if (file.isFile()) > + { > + File target = new File(cfDir, file.getName()); > + int rd; > + FileInputStream is = new FileInputStream(file); > + FileOutputStream os = new FileOutputStream(target); > + while ((rd = is.read(buf)) >= 0) > + os.write(buf, 0, rd); > + } > + } > + } > } > {noformat} > {noformat:title=broken-workaround} > diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java > b/src/java/org/apache/cassandra/db/compaction/Verifier.java > index 554c782..e953b1d 100644 > --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java > +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java > @@ -96,7 +96,7 @@ public class Verifier implements Closeable > { > validator = null; > > - if (new > File(sstable.descriptor.filenameFor(Component.DIGEST)).exists()) > + if (new > File(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())).exists()) > { > validator = > DataIntegrityMetadata.fileDigestValidator(sstable.descriptor); > validator.validate(); > diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java > b/src/java/org/apache/cassandra/io/sstable/Component.java > index 54dd35b..d0405e4 100644 > --- a/src/java/org/apache/cassandra/io/sstable/Component.java > +++ b/src/java/org/apache/cassandra/io/sstable/Component.java > @@ -34,6 +34,7 @@ public class Component > public static 
final char separator = '-'; > > final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class); > + > public enum Type > { > // the base data for an sstable: the remaining components can be > regenerated > @@ -79,13 +80,17 @@ public class Component > } > } > > + private static final String DIGEST_CRC32_NAME = "Digest.crc32"; > + private static final String DIGEST_ADLER32_NAME = "Digest.adler32"; > + > // singleton components for types that don't need ids > public final static Component DATA = new Component(Type.DATA); > public final static Component PRIMARY_INDEX = new > Component(Type.PRIMARY_INDEX); > public final static Component FILTER = new Component(Type.FILTER); > public final static Component COMPRESSION_INFO = new > Component(Type.COMPRESSION_INFO); > public final static Component STATS = new Component(Type.STATS); > - public final static Component DIGEST = new Component(Type.DIGEST); > + public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, > DIGEST_CRC32_NAME); > + public final static Component DIGEST_ADLER32 = new > Component(Type.DIGEST, DIGEST_ADLER32_NAME); > public final static Component CRC = new Component(Type.CRC); > public final static Component SUMMARY = new Component(Type.SUMMARY); > public final static Component TOC = new Component(Type.TOC); > @@ -138,11 +143,23 @@ public class Component > case FILTER: component = Component.FILTER; > break; > case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; > break; > case STATS: component = Component.STATS; > break; > - case DIGEST: component = Component.DIGEST; > break; > case CRC: component = Component.CRC; > break; > case SUMMARY: component = Component.SUMMARY; > break; > case TOC: component = Component.TOC; > break; > case CUSTOM: component = new Component(Type.CUSTOM, > path.right); break; > + case DIGEST: > + switch (path.right) > + { > + case DIGEST_CRC32_NAME: > + component = Component.DIGEST_CRC32; > + break; > + case DIGEST_ADLER32_NAME: > + component = 
Component.DIGEST_ADLER32; > + break; > + default: > + throw new IllegalStateException(); > + } > + break; > default: > throw new IllegalStateException(); > } > diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java > b/src/java/org/apache/cassandra/io/sstable/Descriptor.java > index 38829df..0db6f00 100644 > --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java > +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java > @@ -32,6 +32,7 @@ import > org.apache.cassandra.io.sstable.metadata.IMetadataSerializer; > import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer; > import org.apache.cassandra.io.sstable.metadata.MetadataSerializer; > import org.apache.cassandra.utils.Pair; > +import org.apache.hadoop.mapred.JobTracker; > > import static org.apache.cassandra.io.sstable.Component.separator; > > @@ -344,4 +345,16 @@ public class Descriptor > { > return hashCode; > } > + > + public Component digestComponent() > + { > + switch (version.compressedChecksumType()) > + { > + case Adler32: > + return Component.DIGEST_ADLER32; > + case CRC32: > + return Component.DIGEST_CRC32; > + } > + throw new IllegalStateException(); > + } > } > diff --git > a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java > b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java > index bd21536..74e4b56 100644 > --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java > +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java > @@ -131,7 +131,7 @@ public abstract class SSTableWriter extends SSTable > implements Transactional > Component.STATS, > Component.SUMMARY, > Component.TOC, > - Component.DIGEST)); > + Component.DIGEST_CRC32)); > > if (metadata.params.bloomFilterFpChance < 1.0) > components.add(Component.FILTER); > diff --git > a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java > b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java > 
index 9a5eae8..a40c37a 100644 > --- > a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java > +++ > b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java > @@ -122,7 +122,12 @@ public class MetadataSerializer implements > IMetadataSerializer > in.seek(offset); > component = type.serializer.deserialize(descriptor.version, > in); > } > - components.put(type, component); > + if (component == null) > + { > + assert type != MetadataType.HEADER || > !descriptor.version.storeRows(); > + } > + else > + components.put(type, component); > } > return components; > } > diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java > b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java > index 70cd860..b88f4f2 100644 > --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java > +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java > @@ -109,7 +109,7 @@ public class DataIntegrityMetadata > { > this.descriptor = descriptor; > checksum = > descriptor.version.uncompressedChecksumType().newInstance(); > - digestReader = RandomAccessReader.open(new > File(descriptor.filenameFor(Component.DIGEST))); > + digestReader = RandomAccessReader.open(new > File(descriptor.filenameFor(descriptor.digestComponent()))); > dataReader = RandomAccessReader.open(new > File(descriptor.filenameFor(Component.DATA))); > try > { > @@ -211,7 +211,7 @@ public class DataIntegrityMetadata > > public void writeFullChecksum(Descriptor descriptor) > { > - File outFile = new > File(descriptor.filenameFor(Component.DIGEST)); > + File outFile = new > File(descriptor.filenameFor(descriptor.digestComponent())); > try (BufferedWriter out > =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8)) > { > out.write(String.valueOf(fullChecksum.getValue())); > diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java > b/src/java/org/apache/cassandra/io/util/FileUtils.java > index 920eee0..1420cae 100644 > --- 
a/src/java/org/apache/cassandra/io/util/FileUtils.java > +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java > @@ -173,7 +173,7 @@ public class FileUtils > > public static void renameWithConfirm(File from, File to) > { > - assert from.exists(); > + assert from.exists() : String.format("File to rename does not exist: > %s", from.getPath()); > if (logger.isDebugEnabled()) > logger.debug((String.format("Renaming %s to %s", from.getPath(), > to.getPath()))); > // this is not FSWE because usually when we see it it's because we > didn't close the file before renaming it, > diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java > b/test/unit/org/apache/cassandra/db/VerifyTest.java > index 13ce0c1..0233169 100644 > --- a/test/unit/org/apache/cassandra/db/VerifyTest.java > +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java > @@ -275,11 +275,11 @@ public class VerifyTest > SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); > > > - RandomAccessFile file = new > RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw"); > + RandomAccessFile file = new > RandomAccessFile(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()), > "rw"); > Long correctChecksum = Long.parseLong(file.readLine()); > file.close(); > > - writeChecksum(++correctChecksum, > sstable.descriptor.filenameFor(Component.DIGEST)); > + writeChecksum(++correctChecksum, > sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())); > > try (Verifier verifier = new Verifier(cfs, sstable, false)) > { > @@ -315,7 +315,7 @@ public class VerifyTest > file.close(); > > // Update the Digest to have the right Checksum > - writeChecksum(simpleFullChecksum(sstable.getFilename()), > sstable.descriptor.filenameFor(Component.DIGEST)); > + writeChecksum(simpleFullChecksum(sstable.getFilename()), > sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())); > > try (Verifier verifier = new Verifier(cfs, sstable, false)) > { > {noformat} -- This 
message was sent by Atlassian JIRA (v6.3.4#6332)