This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 33a9093c5c Allow sstableloader to specify table without relying on path 33a9093c5c is described below commit 33a9093c5cc2f8fcf913d1931415b697e52ec108 Author: Eduard Tudenhoefner <eduard.tudenhoef...@datastax.com> AuthorDate: Fri Jul 1 08:35:04 2022 +0200 Allow sstableloader to specify table without relying on path patch by Eduard Tudenhoefner; reviewed by Stefan Miklosovic, Brandon Williams for CASSANDRA-16584 --- CHANGES.txt | 1 + .../apache/cassandra/io/sstable/Descriptor.java | 89 +++++++++++++++++----- .../org/apache/cassandra/io/sstable/SSTable.java | 22 ++++++ .../apache/cassandra/io/sstable/SSTableLoader.java | 18 ++++- .../org/apache/cassandra/tools/BulkLoader.java | 4 +- .../org/apache/cassandra/tools/LoaderOptions.java | 27 ++++++- .../cassandra/io/sstable/SSTableLoaderTest.java | 86 +++++++++++++-------- 7 files changed, 191 insertions(+), 56 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 99f64b06a6..e5cbc8ea68 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Allow sstableloader to specify table without relying on path (CASSANDRA-16584) * Fix TestGossipingPropertyFileSnitch.test_prefer_local_reconnect_on_listen_address (CASSANDRA-17700) * Add ByteComparable API (CASSANDRA-6936) * Add guardrail for maximum replication factor (CASSANDRA-17500) diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index 83bafd4ff4..589e46b015 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -218,6 +218,7 @@ public class Descriptor /** * Parse a sstable filename, extracting both the {@code Descriptor} and {@code Component} part. + * The keyspace/table name will be extracted from the directory path. * * @param file the {@code File} object for the filename to parse. 
* @return a pair of the descriptor and component corresponding to the provided {@code file}. @@ -233,6 +234,58 @@ public class Descriptor if (!file.isAbsolute()) file = file.toAbsolute(); + SSTableInfo info = validateAndExtractInfo(file); + String name = file.name(); + + File directory = parentOf(name, file); + File tableDir = directory; + + // Check if it's a 2ndary index directory (note that this doesn't exclude it from also being a backup or snapshot) + String indexName = ""; + if (tableDir.name().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR)) + { + indexName = tableDir.name(); + tableDir = parentOf(name, tableDir); + } + + // Then it can be a backup or a snapshot + if (tableDir.name().equals(Directories.BACKUPS_SUBDIR)) + tableDir = tableDir.parent(); + else if (parentOf(name, tableDir).name().equals(Directories.SNAPSHOT_SUBDIR)) + tableDir = parentOf(name, parentOf(name, tableDir)); + + String table = tableDir.name().split("-")[0] + indexName; + String keyspace = parentOf(name, tableDir).name(); + + return Pair.create(new Descriptor(info.version, directory, keyspace, table, info.id, info.format), info.component); + } + + /** + * Parse a sstable filename, extracting both the {@code Descriptor} and {@code Component} part. + * + * @param file the {@code File} object for the filename to parse. + * @param keyspace The keyspace name of the file. If either this or {@code table} is <code>null</code>, then both + * the keyspace and table names will be extracted from the directory path. + * @param table The table name of the file. If either this or {@code keyspace} is <code>null</code>, then both the + * keyspace and table names will be extracted from the directory path. + * @return a pair of the descriptor and component corresponding to the provided {@code file}. + * @throws IllegalArgumentException if the provided {@code file} does not point to a valid sstable filename. This could + * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported + * version.
+ */ + public static Pair<Descriptor, Component> fromFilenameWithComponent(File file, String keyspace, String table) + { + if (null == keyspace || null == table) + { + return fromFilenameWithComponent(file); + } + + SSTableInfo info = validateAndExtractInfo(file); + return Pair.create(new Descriptor(info.version, parentOf(file.name(), file), keyspace, table, info.id, info.format), info.component); + } + + private static SSTableInfo validateAndExtractInfo(File file) + { String name = file.name(); List<String> tokens = filenameSplitter.splitToList(name); int size = tokens.size(); @@ -245,9 +298,7 @@ public class Descriptor // Note that we assume it's an old format sstable if it has the right number of tokens: this is not perfect // but we're just trying to be helpful, not perfect. if (size == 5 || size == 6) - throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.", - name, - tokens.get(size - 3))); + throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.", name, tokens.get(size - 3))); throw new IllegalArgumentException(String.format("Invalid sstable file %s: the name doesn't look like a supported sstable file name", name)); } @@ -282,27 +333,23 @@ public class Descriptor if (!version.isCompatible()) throw invalidSSTable(name, "incompatible sstable version (%s); you should have run upgradesstables before upgrading", versionString); - File directory = parentOf(name, file); - File tableDir = directory; + return new SSTableInfo(version, id, format, component); + } - // Check if it's a 2ndary index directory (not that it doesn't exclude it to be also a backup or snapshot) - String indexName = ""; - if (Directories.isSecondaryIndexFolder(tableDir)) + private static class SSTableInfo + { + final Version version; + final SSTableId id; + final SSTableFormat.Type format; + final Component component; + + SSTableInfo(Version version, SSTableId id, 
SSTableFormat.Type format, Component component) { - indexName = tableDir.name(); - tableDir = parentOf(name, tableDir); + this.version = version; + this.id = id; + this.format = format; + this.component = component; } - - // Then it can be a backup or a snapshot - if (tableDir.name().equals(Directories.BACKUPS_SUBDIR)) - tableDir = tableDir.parent(); - else if (parentOf(name, tableDir).name().equals(Directories.SNAPSHOT_SUBDIR)) - tableDir = parentOf(name, parentOf(name, tableDir)); - - String table = tableDir.name().split("-")[0] + indexName; - String keyspace = parentOf(name, tableDir).name(); - - return Pair.create(new Descriptor(version, directory, keyspace, table, id, format), component); } private static File parentOf(String name, File file) diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index 488a7dc45a..6a691b1f0d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -216,6 +216,28 @@ public abstract class SSTable } } + /** + * Parse a sstable filename into both a {@link Descriptor} and {@code Component} object. + * + * @param file the filename to parse. + * @param keyspace The keyspace name of the file. + * @param table The table name of the file. + * @return a pair of the {@code Descriptor} and {@code Component} corresponding to {@code file} if it corresponds to + * a valid and supported sstable filename, {@code null} otherwise. Note that components of an unknown type will be + * returned as CUSTOM ones. + */ + public static Pair<Descriptor, Component> tryComponentFromFilename(File file, String keyspace, String table) + { + try + { + return Descriptor.fromFilenameWithComponent(file, keyspace, table); + } + catch (Throwable e) + { + return null; + } + } + /** * Parse a sstable filename into a {@link Descriptor} object. 
* <p> diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 3d9e0f4c17..71bd025db8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -48,6 +48,7 @@ public class SSTableLoader implements StreamEventHandler { private final File directory; private final String keyspace; + private final String table; private final Client client; private final int connectionsPerHost; private final OutputHandler outputHandler; @@ -62,9 +63,15 @@ public class SSTableLoader implements StreamEventHandler } public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace) + { + this(directory, client, outputHandler, connectionsPerHost, targetKeyspace, null); + } + + public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace, String targetTable) { this.directory = directory; this.keyspace = targetKeyspace != null ? targetKeyspace : directory.parent().name(); + this.table = targetTable; this.client = client; this.outputHandler = outputHandler; this.connectionsPerHost = connectionsPerHost; @@ -87,7 +94,16 @@ public class SSTableLoader implements StreamEventHandler return false; } - Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(file); + Pair<Descriptor, Component> p; + if (null != keyspace && null != table) + { + p = SSTable.tryComponentFromFilename(file, keyspace, table); + } + else + { + p = SSTable.tryComponentFromFilename(file); + } + Descriptor desc = p == null ? 
null : p.left; if (p == null || !p.right.equals(Component.DATA)) return false; diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 811df7ab97..a3a296b97a 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -66,7 +66,8 @@ public class BulkLoader buildSSLOptions(options.clientEncOptions)), handler, options.connectionsPerHost, - options.targetKeyspace); + options.targetKeyspace, + options.targetTable); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle); StreamResultFuture future = null; @@ -82,7 +83,6 @@ public class BulkLoader { future = loader.stream(options.ignores, indicator); } - } catch (Exception e) { diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java index d882e5a853..27d54a7414 100644 --- a/src/java/org/apache/cassandra/tools/LoaderOptions.java +++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java @@ -66,6 +66,7 @@ public class LoaderOptions public static final String ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS = "entire-sstable-inter-dc-throttle"; public static final String TOOL_NAME = "sstableloader"; public static final String TARGET_KEYSPACE = "target-keyspace"; + public static final String TARGET_TABLE = "target-table"; /* client encryption options */ public static final String SSL_TRUSTSTORE = "truststore"; @@ -97,6 +98,7 @@ public class LoaderOptions public final Set<InetSocketAddress> hosts; public final Set<InetAddressAndPort> ignores; public final String targetKeyspace; + public final String targetTable; LoaderOptions(Builder builder) { @@ -120,6 +122,7 @@ public class LoaderOptions hosts = builder.hosts; ignores = builder.ignores; targetKeyspace = builder.targetKeyspace; + targetTable = builder.targetTable; } 
static class Builder @@ -147,6 +150,7 @@ public class LoaderOptions Set<InetSocketAddress> hosts = new HashSet<>(); Set<InetAddressAndPort> ignores = new HashSet<>(); String targetKeyspace; + String targetTable; Builder() { @@ -328,6 +332,18 @@ public class LoaderOptions return this; } + public Builder targetKeyspace(String keyspace) + { + this.targetKeyspace = keyspace; + return this; + } + + public Builder targetTable(String table) + { + this.targetKeyspace = table; + return this; + } + public Builder parseArgs(String cmdArgs[]) { CommandLineParser parser = new GnuParser(); @@ -566,10 +582,16 @@ public class LoaderOptions { targetKeyspace = cmd.getOptionValue(TARGET_KEYSPACE); if (StringUtils.isBlank(targetKeyspace)) - { errorMsg("Empty keyspace is not supported.", options); - } } + + if (cmd.hasOption(TARGET_TABLE)) + { + targetTable = cmd.getOptionValue(TARGET_TABLE); + if (StringUtils.isBlank(targetTable)) + errorMsg("Empty table is not supported.", options); + } + return this; } catch (ParseException | ConfigurationException | MalformedURLException e) @@ -678,6 +700,7 @@ public class LoaderOptions options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use"); options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL."); options.addOption("k", TARGET_KEYSPACE, "target keyspace name", "target keyspace name"); + options.addOption("tb", TARGET_TABLE, "target table name", "target table name"); return options; } diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java index c941a81db2..0af6d24a0a 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import com.google.common.io.Files; 
+import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.io.util.File; import org.junit.After; import org.junit.Before; @@ -40,7 +41,6 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.StreamEvent; @@ -93,16 +93,33 @@ public class SSTableLoaderTest @After public void cleanup() { - try { - FileUtils.deleteRecursive(tmpdir); - } catch (FSWriteError e) { + try + { + tmpdir.deleteRecursive(); + } + catch (FSWriteError e) + { /* We force a GC here to force buffer deallocation, and then try deleting the directory again. For more information, see: http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4715154 If this is not the problem, the exception will be rethrown anyway. 
*/ System.gc(); - FileUtils.deleteRecursive(tmpdir); + tmpdir.deleteRecursive(); + } + + try + { + for (String[] keyspaceTable : new String[][] { {KEYSPACE1, CF_STANDARD1}, + {KEYSPACE1, CF_STANDARD2}, + {KEYSPACE1, CF_BACKUPS}, + {KEYSPACE2, CF_STANDARD1}, + {KEYSPACE2, CF_STANDARD2}}) + StorageService.instance.truncate(keyspaceTable[0], keyspaceTable[1]); + } + catch (Exception ex) + { + throw new RuntimeException("Unable to truncate table!", ex); } } @@ -150,9 +167,11 @@ public class SSTableLoaderTest assertEquals(1, partitions.size()); assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey())); assert metadata != null; - assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1"))) - .getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))) - .buffer()); + + Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1"))); + assert row != null; + + assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer()); // The stream future is signalled when the work is complete but before releasing references. Wait for release // before cleanup (CASSANDRA-10118). 
@@ -168,7 +187,7 @@ public class SSTableLoaderTest .inDirectory(dataDir) .forTable(String.format(schema, KEYSPACE1, CF_STANDARD2)) .using(String.format(query, KEYSPACE1, CF_STANDARD2)) - .withBufferSizeInMB(1) + .withBufferSizeInMiB(1) .build(); int NB_PARTITIONS = 5000; // Enough to write >1MiB and get at least one completed sstable before we've closed the writer @@ -209,10 +228,9 @@ public class SSTableLoaderTest } @Test - public void testLoadingSSTableToDifferentKeyspace() throws Exception + public void testLoadingSSTableToDifferentKeyspaceAndTable() throws Exception { - File dataDir = new File(tmpdir.absolutePath() + File.pathSeparator() + KEYSPACE1 + File.pathSeparator() + CF_STANDARD1); - assert dataDir.tryCreateDirectories(); + File dataDir = dataDir(CF_STANDARD1); TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD1); String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; @@ -230,25 +248,31 @@ public class SSTableLoaderTest ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); Util.flush(cfs); // wait for sstables to be on disk else we won't be able to stream them - final CountDownLatch latch = new CountDownLatch(1); - SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2); - loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); + for (String table : new String[] { CF_STANDARD2, null }) + { + final CountDownLatch latch = new CountDownLatch(1); + SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2, table); + loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); - cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1); - Util.flush(cfs); + String targetTable = table == null ? 
CF_STANDARD1 : table; + cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(targetTable); + Util.flush(cfs); - List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build()); + List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build()); - assertEquals(1, partitions.size()); - assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey())); - assert metadata != null; - assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1"))) - .getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))) - .buffer()); + assertEquals(1, partitions.size()); + assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey())); + assert metadata != null; - // The stream future is signalled when the work is complete but before releasing references. Wait for release - // before cleanup (CASSANDRA-10118). - latch.await(); + Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1"))); + assert row != null; + + assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer()); + + // The stream future is signalled when the work is complete but before releasing references. Wait for release + // before cleanup (CASSANDRA-10118). 
+ latch.await(); + } } @Test @@ -278,9 +302,11 @@ public class SSTableLoaderTest assertEquals(1, partitions.size()); assertEquals("key", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey())); assert metadata != null; - assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1"))) - .getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))) - .buffer()); + + Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1"))); + assert row != null; + + assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer()); // The stream future is signalled when the work is complete but before releasing references. Wait for release // before cleanup (CASSANDRA-10118). --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org