Parallelize initial materialized view build patch by Andres de la Peña; reviewed by Paulo Motta for CASSANDRA-12245
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c80eeec Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c80eeec Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c80eeec Branch: refs/heads/trunk Commit: 4c80eeece37d79f434078224a0504400ae10a20d Parents: 88b244a Author: Andrés de la Peña <a.penya.gar...@gmail.com> Authored: Sun Jul 9 14:42:14 2017 +0100 Committer: Andrés de la Peña <a.penya.gar...@gmail.com> Committed: Fri Dec 1 14:58:12 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 8 + conf/cassandra.yaml | 3 + doc/source/cql/mvs.rst | 5 + doc/source/operating/metrics.rst | 1 + .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 13 + src/java/org/apache/cassandra/db/Keyspace.java | 2 +- .../org/apache/cassandra/db/SystemKeyspace.java | 78 +++-- .../db/compaction/CompactionManager.java | 88 +++-- .../db/compaction/CompactionManagerMBean.java | 22 ++ src/java/org/apache/cassandra/db/view/View.java | 20 +- .../apache/cassandra/db/view/ViewBuilder.java | 305 ++++++++-------- .../cassandra/db/view/ViewBuilderTask.java | 250 +++++++++++++ .../apache/cassandra/db/view/ViewManager.java | 15 +- src/java/org/apache/cassandra/dht/Splitter.java | 129 +++++++ .../apache/cassandra/io/sstable/SSTable.java | 7 + .../org/apache/cassandra/schema/Schema.java | 3 +- .../cassandra/service/StorageService.java | 14 +- .../cassandra/service/StorageServiceMBean.java | 4 +- .../org/apache/cassandra/tools/NodeProbe.java | 10 + .../org/apache/cassandra/tools/NodeTool.java | 2 + .../nodetool/GetConcurrentViewBuilders.java | 33 ++ .../nodetool/SetConcurrentViewBuilders.java | 39 +++ .../org/apache/cassandra/cql3/ViewTest.java | 23 +- .../cassandra/db/view/ViewBuilderTaskTest.java | 135 ++++++++ .../org/apache/cassandra/dht/SplitterTest.java | 347 ++++++++++++++++++- 27 files 
changed, 1315 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 009dcb5..56458f8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Parallelize initial materialized view build (CASSANDRA-12245) * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965) * Make LWTs send resultset metadata on every request (CASSANDRA-13992) * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index de7d58a..510577e 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -30,6 +30,10 @@ New features immediately upon creation via hard-linking the files. This means that incomplete segments will be available in cdc_raw rather than fully flushed. See documentation and CASSANDRA-12148 for more detail. + - The initial build of materialized views can be parallelized. The number of concurrent builder + threads is specified by the property `cassandra.yaml:concurrent_materialized_view_builders`. + This property can be modified at runtime through both JMX and the new `setconcurrentviewbuilders` + and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 for more details. Upgrading --------- @@ -74,6 +78,10 @@ Upgrading - Cassandra 4.0 allows a single port to be used for both secure and insecure connections between cassandra nodes (CASSANDRA-10404). See the yaml for specific property changes, and see the security doc for full details. 
+ - Due to the parallelization of the initial build of materialized views, + the per token range view building status is stored in the new table + `system.view_builds_in_progress`. The old table `system.views_builds_in_progress` + is no longer used and can be removed. See CASSANDRA-12245 for more details. Materialized Views ------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index e41af17..7328a01 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -754,6 +754,9 @@ column_index_cache_size_in_kb: 2 # Values less than one are interpreted as unbounded (the default) # concurrent_validations: 0 +# Number of simultaneous materialized view builder tasks to allow. +concurrent_materialized_view_builders: 1 + # Throttles compaction to the given total throughput across the entire # system. The faster you insert data, the faster you need to compact in # order to keep the sstable count down, but in general, setting this to http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/doc/source/cql/mvs.rst ---------------------------------------------------------------------- diff --git a/doc/source/cql/mvs.rst b/doc/source/cql/mvs.rst index 55ede22..200090a 100644 --- a/doc/source/cql/mvs.rst +++ b/doc/source/cql/mvs.rst @@ -62,6 +62,11 @@ Creating a materialized view has 3 main parts: Attempting to create an already existing materialized view will return an error unless the ``IF NOT EXISTS`` option is used. If it is used, the statement will be a no-op if the materialized view already exists. +.. note:: By default, materialized views are built in a single thread. The initial build can be parallelized by + increasing the number of threads specified by the property ``concurrent_materialized_view_builders`` in + ``cassandra.yaml``. 
This property can also be manipulated at runtime through both JMX and the + ``setconcurrentviewbuilders`` and ``getconcurrentviewbuilders`` nodetool commands. + .. _mv-select: MV select statement http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/doc/source/operating/metrics.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst index 6559b53..2df1cf8 100644 --- a/doc/source/operating/metrics.rst +++ b/doc/source/operating/metrics.rst @@ -227,6 +227,7 @@ PerDiskMemtableFlushWriter_0 internal Responsible for writing a spec (ther Sampler internal Responsible for re-sampling the index summaries of SStables SecondaryIndexManagement internal Performs updates to secondary indexes ValidationExecutor internal Performs validation compaction or scrubbing +ViewBuildExecutor internal Performs materialized views initial build ============================ ============== =========== .. |nbsp| unicode:: 0xA0 .. 
nonbreaking space http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index de193b0..f63d94d 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -166,6 +166,7 @@ public class Config public int min_free_space_per_drive_in_mb = 50; public volatile int concurrent_validations = Integer.MAX_VALUE; + public volatile int concurrent_materialized_view_builders = 1; /** * @deprecated retry support removed on CASSANDRA-10992 http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index af1cbde..58c0bf4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -589,6 +589,9 @@ public class DatabaseDescriptor if (conf.concurrent_compactors <= 0) throw new ConfigurationException("concurrent_compactors should be strictly greater than 0, but was " + conf.concurrent_compactors, false); + if (conf.concurrent_materialized_view_builders <= 0) + throw new ConfigurationException("concurrent_materialized_view_builders should be strictly greater than 0, but was " + conf.concurrent_materialized_view_builders, false); + if (conf.num_tokens > MAX_NUM_TOKENS) throw new ConfigurationException(String.format("A maximum number of %d tokens per node is supported", MAX_NUM_TOKENS), false); @@ -1516,6 +1519,16 @@ public class DatabaseDescriptor conf.concurrent_validations = value; } + public static int getConcurrentViewBuilders() + { + 
return conf.concurrent_materialized_view_builders; + } + + public static void setConcurrentViewBuilders(int value) + { + conf.concurrent_materialized_view_builders = value; + } + public static long getMinFreeSpacePerDriveInBytes() { return conf.min_free_space_per_drive_in_mb * 1024L * 1024L; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index d814ac7..c3e649a 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -332,7 +332,7 @@ public class Keyspace logger.trace("Initializing {}.{}", getName(), cfm.name); initCf(Schema.instance.getTableMetadataRef(cfm.id), loadSSTables); } - this.viewManager.reload(); + this.viewManager.reload(false); } private Keyspace(KeyspaceMetadata metadata) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 2ffae11..9da0f6b 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -99,7 +99,7 @@ public final class SystemKeyspace public static final String SIZE_ESTIMATES = "size_estimates"; public static final String AVAILABLE_RANGES = "available_ranges"; public static final String TRANSFERRED_RANGES = "transferred_ranges"; - public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress"; + public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress"; public static final String BUILT_VIEWS = "built_views"; public static final String PREPARED_STATEMENTS = "prepared_statements"; 
public static final String REPAIRS = "repairs"; @@ -262,15 +262,17 @@ public final class SystemKeyspace + "PRIMARY KEY ((operation, keyspace_name), peer))") .build(); - private static final TableMetadata ViewsBuildsInProgress = - parse(VIEWS_BUILDS_IN_PROGRESS, + private static final TableMetadata ViewBuildsInProgress = + parse(VIEW_BUILDS_IN_PROGRESS, "views builds current progress", "CREATE TABLE %s (" + "keyspace_name text," + "view_name text," + + "start_token varchar," + + "end_token varchar," + "last_token varchar," - + "generation_number int," - + "PRIMARY KEY ((keyspace_name), view_name))") + + "keys_built bigint," + + "PRIMARY KEY ((keyspace_name), view_name, start_token, end_token))") .build(); private static final TableMetadata BuiltViews = @@ -337,7 +339,7 @@ public final class SystemKeyspace SizeEstimates, AvailableRanges, TransferredRanges, - ViewsBuildsInProgress, + ViewBuildsInProgress, BuiltViews, PreparedStatements, Repairs); @@ -457,23 +459,15 @@ public final class SystemKeyspace public static void setViewRemoved(String keyspaceName, String viewName) { - String buildReq = "DELETE FROM %S.%s WHERE keyspace_name = ? AND view_name = ? IF EXISTS"; - executeInternal(String.format(buildReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, VIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName); - forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS); + String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?"; + executeInternal(String.format(buildReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, VIEW_BUILDS_IN_PROGRESS), keyspaceName, viewName); + forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS); String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ? 
IF EXISTS"; executeInternal(String.format(builtReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName); forceBlockingFlush(BUILT_VIEWS); } - public static void beginViewBuild(String ksname, String viewName, int generationNumber) - { - executeInternal(format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS), - ksname, - viewName, - generationNumber); - } - public static void finishViewBuildStatus(String ksname, String viewName) { // We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed @@ -482,8 +476,8 @@ public final class SystemKeyspace // Also, if writing to the built_view succeeds, but the view_builds_in_progress deletion fails, we will be able // to skip the view build next boot. setViewBuilt(ksname, viewName, false); - executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ? IF EXISTS", VIEWS_BUILDS_IN_PROGRESS), ksname, viewName); - forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS); + executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? 
AND view_name = ?", VIEW_BUILDS_IN_PROGRESS), ksname, viewName); + forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS); } public static void setViewBuiltReplicated(String ksname, String viewName) @@ -491,33 +485,41 @@ public final class SystemKeyspace setViewBuilt(ksname, viewName, true); } - public static void updateViewBuildStatus(String ksname, String viewName, Token token) + public static void updateViewBuildStatus(String ksname, String viewName, Range<Token> range, Token lastToken, long keysBuilt) { - String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)"; - Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory(); - executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token)); + String req = "INSERT INTO system.%s (keyspace_name, view_name, start_token, end_token, last_token, keys_built) VALUES (?, ?, ?, ?, ?, ?)"; + Token.TokenFactory factory = ViewBuildsInProgress.partitioner.getTokenFactory(); + executeInternal(format(req, VIEW_BUILDS_IN_PROGRESS), + ksname, + viewName, + factory.toString(range.left), + factory.toString(range.right), + factory.toString(lastToken), + keysBuilt); } - public static Pair<Integer, Token> getViewBuildStatus(String ksname, String viewName) + public static Map<Range<Token>, Pair<Token, Long>> getViewBuildStatus(String ksname, String viewName) { - String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?"; - UntypedResultSet queryResultSet = executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName); - if (queryResultSet == null || queryResultSet.isEmpty()) - return null; + String req = "SELECT start_token, end_token, last_token, keys_built FROM system.%s WHERE keyspace_name = ? 
AND view_name = ?"; + Token.TokenFactory factory = ViewBuildsInProgress.partitioner.getTokenFactory(); + UntypedResultSet rs = executeInternal(format(req, VIEW_BUILDS_IN_PROGRESS), ksname, viewName); - UntypedResultSet.Row row = queryResultSet.one(); + if (rs == null || rs.isEmpty()) + return Collections.emptyMap(); - Integer generation = null; - Token lastKey = null; - if (row.has("generation_number")) - generation = row.getInt("generation_number"); - if (row.has("last_key")) + Map<Range<Token>, Pair<Token, Long>> status = new HashMap<>(); + for (UntypedResultSet.Row row : rs) { - Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory(); - lastKey = factory.fromString(row.getString("last_key")); - } + Token start = factory.fromString(row.getString("start_token")); + Token end = factory.fromString(row.getString("end_token")); + Range<Token> range = new Range<>(start, end); + + Token lastToken = row.has("last_token") ? factory.fromString(row.getString("last_token")) : null; + long keysBuilt = row.has("keys_built") ? 
row.getLong("keys_built") : 0; - return Pair.create(generation, lastKey); + status.put(range, Pair.create(lastToken, keysBuilt)); + } + return status; } public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 3ff9c24..a615c03 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -51,7 +51,7 @@ import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.db.view.ViewBuilder; +import org.apache.cassandra.db.view.ViewBuilderTask; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -124,8 +124,9 @@ public class CompactionManager implements CompactionManagerMBean private final CompactionExecutor executor = new CompactionExecutor(); private final CompactionExecutor validationExecutor = new ValidationExecutor(); private final static CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor(); + private final CompactionExecutor viewBuildExecutor = new ViewBuildExecutor(); - private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor); + private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor, viewBuildExecutor); private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create(); private final RateLimiter 
compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE); @@ -216,6 +217,7 @@ public class CompactionManager implements CompactionManagerMBean // shutdown executors to prevent further submission executor.shutdown(); validationExecutor.shutdown(); + viewBuildExecutor.shutdown(); // interrupt compactions and validations for (Holder compactionHolder : CompactionMetrics.getCompactions()) @@ -226,7 +228,7 @@ public class CompactionManager implements CompactionManagerMBean // wait for tasks to terminate // compaction tasks are interrupted above, so it shuold be fairy quick // until not interrupted tasks to complete. - for (ExecutorService exec : Arrays.asList(executor, validationExecutor)) + for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor)) { try { @@ -1718,31 +1720,21 @@ public class CompactionManager implements CompactionManagerMBean } } - public Future<?> submitViewBuilder(final ViewBuilder builder) + public ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task) { - Runnable runnable = new Runnable() - { - public void run() + return viewBuildExecutor.submitIfRunning(() -> { + metrics.beginCompaction(task); + try { - metrics.beginCompaction(builder); - try - { - builder.run(); - } - finally - { - metrics.finishCompaction(builder); - } + return task.call(); } - }; - if (executor.isShutdown()) - { - logger.info("Compaction executor has shut down, not submitting index build"); - return null; - } - - return executor.submit(runnable); + finally + { + metrics.finishCompaction(task); + } + }, "view build"); } + public int getActiveCompactions() { return CompactionMetrics.getCompactions().size(); @@ -1817,7 +1809,7 @@ public class CompactionManager implements CompactionManagerMBean * @return the future that will deliver the task result, or a future that has already been * cancelled if the task could not be submitted. 
*/ - public ListenableFuture<?> submitIfRunning(Callable<?> task, String name) + public <T> ListenableFuture<T> submitIfRunning(Callable<T> task, String name) { if (isShutdown()) { @@ -1827,7 +1819,7 @@ public class CompactionManager implements CompactionManagerMBean try { - ListenableFutureTask ret = ListenableFutureTask.create(task); + ListenableFutureTask<T> ret = ListenableFutureTask.create(task); execute(ret); return ret; } @@ -1851,6 +1843,14 @@ public class CompactionManager implements CompactionManagerMBean } } + private static class ViewBuildExecutor extends CompactionExecutor + { + public ViewBuildExecutor() + { + super(DatabaseDescriptor.getConcurrentViewBuilders(), "ViewBuildExecutor"); + } + } + private static class CacheCleanupExecutor extends CompactionExecutor { public CacheCleanupExecutor() @@ -1974,6 +1974,22 @@ public class CompactionManager implements CompactionManagerMBean validationExecutor.setMaximumPoolSize(value); } + public void setConcurrentViewBuilders(int value) + { + if (value > viewBuildExecutor.getCorePoolSize()) + { + // we are increasing the value + viewBuildExecutor.setMaximumPoolSize(value); + viewBuildExecutor.setCorePoolSize(value); + } + else if (value < viewBuildExecutor.getCorePoolSize()) + { + // we are reducing the value + viewBuildExecutor.setCorePoolSize(value); + viewBuildExecutor.setMaximumPoolSize(value); + } + } + public int getCoreCompactorThreads() { return executor.getCorePoolSize(); @@ -2014,6 +2030,26 @@ public class CompactionManager implements CompactionManagerMBean validationExecutor.setMaximumPoolSize(number); } + public int getCoreViewBuildThreads() + { + return viewBuildExecutor.getCorePoolSize(); + } + + public void setCoreViewBuildThreads(int number) + { + viewBuildExecutor.setCorePoolSize(number); + } + + public int getMaximumViewBuildThreads() + { + return viewBuildExecutor.getMaximumPoolSize(); + } + + public void setMaximumViewBuildThreads(int number) + { + 
viewBuildExecutor.setMaximumPoolSize(number); + } + /** * Try to stop all of the compactions for given ColumnFamilies. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java index 8785b41..b98b371 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java @@ -116,4 +116,26 @@ public interface CompactionManagerMBean * @param number New maximum of validator threads */ public void setMaximumValidatorThreads(int number); + + /** + * Returns core size of view build thread pool + */ + public int getCoreViewBuildThreads(); + + /** + * Allows user to resize core size of the view build thread pool. + * @param number New core size of the view build thread pool + */ + public void setCoreViewBuildThreads(int number); + + /** + * Returns maximum size of view build thread pool + */ + public int getMaximumViewBuildThreads(); + + /** + * Allows user to resize maximum size of the view build thread pool.
+ * @param number New maximum of view build threads + */ + public void setMaximumViewBuildThreads(int number); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java index f601673..f6545b0 100644 --- a/src/java/org/apache/cassandra/db/view/View.java +++ b/src/java/org/apache/cassandra/db/view/View.java @@ -28,7 +28,6 @@ import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -192,15 +191,22 @@ public class View public synchronized void build() { - if (this.builder != null) + stopBuild(); + builder = new ViewBuilder(baseCfs, this); + builder.start(); + } + + /** + * Stops the building of this view, no-op if it isn't building. 
+ */ + synchronized void stopBuild() + { + if (builder != null) { logger.debug("Stopping current view builder due to schema change"); - this.builder.stop(); - this.builder = null; + builder.stop(); + builder = null; } - - this.builder = new ViewBuilder(baseCfs, this); - CompactionManager.instance.submitViewBuilder(builder); } @Nullable http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index fcb1e98..8187a57 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -18,217 +18,224 @@ package org.apache.cassandra.db.view; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.Nullable; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; +import java.util.stream.Collectors; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.config.DatabaseDescriptor; +import 
org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.db.rows.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.ReducingKeyIterator; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.repair.SystemDistributedKeyspace; -import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.concurrent.Refs; -public class ViewBuilder extends CompactionInfo.Holder +import static java.util.stream.Collectors.toList; + +/** + * Builds a materialized view for the local token ranges. + * <p> + * The build is split into at least {@link #NUM_TASKS} {@link ViewBuilderTask tasks}, suitable for being parallelized by + * the {@link CompactionManager} which will execute them.
+ */ +class ViewBuilder { - private final ColumnFamilyStore baseCfs; - private final View view; - private final UUID compactionId; - private volatile Token prevToken = null; + private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class); - private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class); + private static final int NUM_TASKS = Runtime.getRuntime().availableProcessors() * 4; + private final ColumnFamilyStore baseCfs; + private final View view; + private final String ksName; + private final UUID localHostId = SystemKeyspace.getLocalHostId(); + private final Set<Range<Token>> builtRanges = Sets.newConcurrentHashSet(); + private final Map<Range<Token>, Pair<Token, Long>> pendingRanges = Maps.newConcurrentMap(); + private final Set<ViewBuilderTask> tasks = Sets.newConcurrentHashSet(); + private volatile long keysBuilt = 0; private volatile boolean isStopped = false; + private volatile Future<?> future = Futures.immediateFuture(null); - public ViewBuilder(ColumnFamilyStore baseCfs, View view) + ViewBuilder(ColumnFamilyStore baseCfs, View view) { this.baseCfs = baseCfs; this.view = view; - compactionId = UUIDGen.getTimeUUID(); + ksName = baseCfs.metadata.keyspace; } - private void buildKey(DecoratedKey key) + public void start() { - ReadQuery selectQuery = view.getReadQuery(); - - if (!selectQuery.selectsKey(key)) + if (SystemKeyspace.isViewBuilt(ksName, view.name)) { - logger.trace("Skipping {}, view query filters", key); - return; + logger.debug("View already marked built for {}.{}", ksName, view.name); + if (!SystemKeyspace.isViewStatusReplicated(ksName, view.name)) + updateDistributed(); } - - int nowInSec = FBUtilities.nowInSeconds(); - SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec); - - // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates - // and pretend that there is nothing pre-existing.
- UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false); - - try (ReadExecutionController orderGroup = command.executionController(); - UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command)) + else { - Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager - .forTable(baseCfs.metadata.id) - .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true); + SystemDistributedKeyspace.startViewBuild(ksName, view.name, localHostId); + + logger.debug("Starting build of view({}.{}). Flushing base table {}.{}", + ksName, view.name, ksName, baseCfs.name); + baseCfs.forceBlockingFlush(); - AtomicLong noBase = new AtomicLong(Long.MAX_VALUE); - mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime())); + loadStatusAndBuild(); } } - public void run() + private void loadStatusAndBuild() + { + loadStatus(); + build(); + } + + private void loadStatus() { - logger.debug("Starting view builder for {}.{}", baseCfs.metadata.keyspace, view.name); - UUID localHostId = SystemKeyspace.getLocalHostId(); - String ksname = baseCfs.metadata.keyspace, viewName = view.name; + builtRanges.clear(); + pendingRanges.clear(); + SystemKeyspace.getViewBuildStatus(ksName, view.name) + .forEach((range, pair) -> + { + Token lastToken = pair.left; + if (lastToken != null && lastToken.equals(range.right)) + { + builtRanges.add(range); + keysBuilt += pair.right; + } + else + { + pendingRanges.put(range, pair); + } + }); + } - if (SystemKeyspace.isViewBuilt(ksname, viewName)) + private synchronized void build() + { + if (isStopped) { - logger.debug("View already marked built for {}.{}", baseCfs.metadata.keyspace, view.name); - if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName)) - updateDistributed(ksname, viewName, localHostId); + logger.debug("Stopped build for
view({}.{}) after covering {} keys", ksName, view.name, keysBuilt); return; } - Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.keyspace); - final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName); - Token lastToken; - Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function; - if (buildStatus == null) - { - logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.keyspace, baseCfs.name); - lastToken = null; - - //We don't track the generation number anymore since if a rebuild is stopped and - //restarted the max generation filter may yield no sstables due to compactions. - //We only care about max generation *during* a build, not across builds. - //see CASSANDRA-13405 - SystemKeyspace.beginViewBuild(ksname, viewName, 0); - } - else + // Get the local ranges for which the view has neither been built nor is currently being built + Set<Range<Token>> newRanges = StorageService.instance.getLocalRanges(ksName) + .stream() + .map(r -> r.subtractAll(builtRanges)) + .flatMap(Set::stream) + .map(r -> r.subtractAll(pendingRanges.keySet())) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + + // If there are neither new nor pending ranges we should finish the build + if (newRanges.isEmpty() && pendingRanges.isEmpty()) { - lastToken = buildStatus.right; - logger.debug("Resuming view build from token {}.
flushing base table {}.{}", lastToken, baseCfs.metadata.keyspace, baseCfs.name); + finish(); + return; } - baseCfs.forceBlockingFlush(); - function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL); - - prevToken = lastToken; - long keysBuilt = 0; - try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs; - ReducingKeyIterator iter = new ReducingKeyIterator(sstables)) + // Split the new local ranges and add them to the pending set + DatabaseDescriptor.getPartitioner() + .splitter() + .map(s -> s.split(newRanges, NUM_TASKS)) + .orElse(newRanges) + .forEach(r -> pendingRanges.put(r, Pair.<Token, Long>create(null, 0L))); + + // Submit a new view build task for each building range. + // We keep a record of all the submitted tasks so that they can be stopped. + List<ListenableFuture<Long>> futures = pendingRanges.entrySet() + .stream() + .map(e -> new ViewBuilderTask(baseCfs, + view, + e.getKey(), + e.getValue().left, + e.getValue().right)) + .peek(tasks::add) + .map(CompactionManager.instance::submitViewBuilder) + .collect(toList()); + + // Add a callback to process any new local ranges that may have appeared and mark the view as built, doing a + // delayed retry if the tasks don't succeed + ListenableFuture<List<Long>> future = Futures.allAsList(futures); + Futures.addCallback(future, new FutureCallback<List<Long>>() { - SystemDistributedKeyspace.startViewBuild(ksname, viewName, localHostId); - while (!isStopped && iter.hasNext()) + public void onSuccess(List<Long> result) { - DecoratedKey key = iter.next(); - Token token = key.getToken(); - if (lastToken == null || lastToken.compareTo(token) < 0) - { - for (Range<Token> range : ranges) - { - if (range.contains(token)) - { - buildKey(key); - ++keysBuilt; - - if (prevToken == null || prevToken.compareTo(token) != 0) - { - SystemKeyspace.updateViewBuildStatus(ksname, viewName, key.getToken()); - prevToken = token; - } - } - } - - lastToken = null; - } + keysBuilt +=
result.stream().mapToLong(x -> x).sum(); + builtRanges.addAll(pendingRanges.keySet()); + pendingRanges.clear(); + build(); } - if (!isStopped) + public void onFailure(Throwable t) { - logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt); - SystemKeyspace.finishViewBuildStatus(ksname, viewName); - updateDistributed(ksname, viewName, localHostId); - } - else - { - logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt); + if (t instanceof CompactionInterruptedException) + { + internalStop(true); + keysBuilt = tasks.stream().mapToLong(ViewBuilderTask::keysBuilt).sum(); + logger.info("Interrupted build for view({}.{}) after covering {} keys", ksName, view.name, keysBuilt); + } + else + { + ScheduledExecutors.nonPeriodicTasks.schedule(() -> loadStatusAndBuild(), 5, TimeUnit.MINUTES); + logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", t); + } } - } - catch (Exception e) - { - ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this), - 5, - TimeUnit.MINUTES); - logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e); - } + }, MoreExecutors.directExecutor()); + this.future = future; + } + + private void finish() + { + logger.debug("Marking view({}.{}) as built after covering {} keys ", ksName, view.name, keysBuilt); + SystemKeyspace.finishViewBuildStatus(ksName, view.name); + updateDistributed(); } - private void updateDistributed(String ksname, String viewName, UUID localHostId) + private void updateDistributed() { try { - SystemDistributedKeyspace.successfulViewBuild(ksname, viewName, localHostId); - SystemKeyspace.setViewBuiltReplicated(ksname, viewName); + SystemDistributedKeyspace.successfulViewBuild(ksName, view.name, localHostId); + SystemKeyspace.setViewBuiltReplicated(ksName, view.name); } catch (Exception e) { - ScheduledExecutors.nonPeriodicTasks.schedule(() ->
CompactionManager.instance.submitViewBuilder(this), - 5, - TimeUnit.MINUTES); - logger.warn("Failed to updated the distributed status of view, sleeping 5 minutes before retrying", e); + ScheduledExecutors.nonPeriodicTasks.schedule(this::updateDistributed, 5, TimeUnit.MINUTES); + logger.warn("Failed to update the distributed status of view, sleeping 5 minutes before retrying", e); } } - public CompactionInfo getCompactionInfo() + /** + * Stops the view building. + */ + synchronized void stop() { - long rangesLeft = 0, rangesTotal = 0; - Token lastToken = prevToken; - - // This approximation is not very accurate, but since we do not have a method which allows us to calculate the - // percentage of a range covered by a second range, this is the best approximation that we can calculate. - // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of - // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node - // has. - for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName())) - { - rangesLeft++; - rangesTotal++; - // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the - // end of the method.
- if (lastToken == null || range.contains(lastToken)) - rangesLeft = 0; - } - - return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId); + boolean wasStopped = isStopped; + internalStop(false); + if (!wasStopped) + FBUtilities.waitOnFuture(future); } - public void stop() + private void internalStop(boolean isCompactionInterrupted) { isStopped = true; + tasks.forEach(task -> task.stop(isCompactionInterrupted)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java new file mode 100644 index 0000000..0273c17 --- /dev/null +++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.db.view; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.util.concurrent.Futures; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.ReadQuery; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.Refs; + +public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<Long> +{ + private static final Logger logger = LoggerFactory.getLogger(ViewBuilderTask.class); + +
private static final int ROWS_BETWEEN_CHECKPOINTS = 1000; + + private final ColumnFamilyStore baseCfs; + private final View view; + private final Range<Token> range; + private final UUID compactionId; + private volatile Token prevToken; + private volatile long keysBuilt = 0; + private volatile boolean isStopped = false; + private volatile boolean isCompactionInterrupted = false; + + ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt) + { + this.baseCfs = baseCfs; + this.view = view; + this.range = range; + this.compactionId = UUIDGen.getTimeUUID(); + this.prevToken = lastToken; + this.keysBuilt = keysBuilt; + } + + private void buildKey(DecoratedKey key) + { + ReadQuery selectQuery = view.getReadQuery(); + + if (!selectQuery.selectsKey(key)) + { + logger.trace("Skipping {}, view query filters", key); + return; + } + + int nowInSec = FBUtilities.nowInSeconds(); + SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec); + + // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates + // and pretend that there is nothing pre-existing.
+ UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false); + + try (ReadExecutionController orderGroup = command.executionController(); + UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command)) + { + Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager + .forTable(baseCfs.metadata.id) + .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true); + + AtomicLong noBase = new AtomicLong(Long.MAX_VALUE); + mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime())); + } + } + + public Long call() + { + String ksName = baseCfs.metadata.keyspace; + + if (prevToken == null) + logger.debug("Starting new view build for range {}", range); + else + logger.debug("Resuming view build for range {} from token {} with {} covered keys", range, prevToken, keysBuilt); + + Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function; + function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> range.intersects(s.getBounds())); + + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(function); + Refs<SSTableReader> sstables = viewFragment.refs; + ReducingKeyIterator keyIter = new ReducingKeyIterator(sstables)) + { + PeekingIterator<DecoratedKey> iter = Iterators.peekingIterator(keyIter); + while (!isStopped && iter.hasNext()) + { + DecoratedKey key = iter.next(); + Token token = key.getToken(); + //skip tokens already built or not present in range + if (range.contains(token) && (prevToken == null || token.compareTo(prevToken) > 0)) + { + buildKey(key); + ++keysBuilt; + //build other keys sharing the same token + while (iter.hasNext() && iter.peek().getToken().equals(token)) + { + key = iter.next(); + buildKey(key); + ++keysBuilt; + } + if (keysBuilt % ROWS_BETWEEN_CHECKPOINTS == 1)
+ SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, token, keysBuilt); + prevToken = token; + } + } + } + + finish(); + + return keysBuilt; + } + + private void finish() + { + String ksName = baseCfs.keyspace.getName(); + if (!isStopped) + { + // Save the completed status using the end of the range as last token. This way future view build attempts + // won't even need to create a task for this range + SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, range.right, keysBuilt); + + logger.debug("Completed build of view({}.{}) for range {} after covering {} keys ", ksName, view.name, range, keysBuilt); + } + else + { + logger.debug("Stopped build for view({}.{}) for range {} after covering {} keys", ksName, view.name, range, keysBuilt); + + // If it's stopped due to a compaction interruption we should throw that exception. + // Otherwise we assume that the task has been stopped due to a schema update and we can finish successfully. + if (isCompactionInterrupted) + throw new StoppedException(ksName, view.name, getCompactionInfo()); + } + } + + @Override + public CompactionInfo getCompactionInfo() + { + // If there's a splitter, calculate progress based on the last token position + if (range.left.getPartitioner().splitter().isPresent()) + { + long progress = prevToken == null ?
0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000); + return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, "token range parts", compactionId); + } + + // When there is no splitter, estimate based on number of total keys but + // take the max with keysBuilt + 1 to avoid having more completed than total + long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range)); + return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, "keys", compactionId); + } + + @Override + public void stop() + { + stop(true); + } + + synchronized void stop(boolean isCompactionInterrupted) + { + this.isCompactionInterrupted = isCompactionInterrupted; + isStopped = true; // written last: finish() reads isCompactionInterrupted without locking after observing isStopped + } + + long keysBuilt() + { + return keysBuilt; + } + + /** + * {@link CompactionInterruptedException} with {@link Object#equals(Object)} and {@link Object#hashCode()} + * implementations that consider equal all the exceptions produced by the same view build, regardless of their + * token range. + * <p> + * This is used to avoid Guava's {@link Futures#allAsList(Iterable)} log spamming when multiple build tasks fail + * due to compaction interruption.
+ */ + static class StoppedException extends CompactionInterruptedException + { + private final String ksName, viewName; + + private StoppedException(String ksName, String viewName, CompactionInfo info) + { + super(info); + this.ksName = ksName; + this.viewName = viewName; + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof StoppedException)) + return false; + + StoppedException that = (StoppedException) o; + return Objects.equal(this.ksName, that.ksName) && Objects.equal(this.viewName, that.viewName); + } + + @Override + public int hashCode() + { + return 31 * ksName.hashCode() + viewName.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java index cf731dd..8506d82 100644 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@ -93,7 +93,7 @@ public class ViewManager return viewsByName.values(); } - public void reload() + public void reload(boolean buildAllViews) { Map<String, ViewMetadata> newViewsByName = new HashMap<>(); for (ViewMetadata definition : keyspace.getMetadata().views) @@ -113,6 +113,9 @@ public class ViewManager addView(entry.getValue()); } + if (!buildAllViews) + return; + // Building views involves updating view build status in the system_distributed // keyspace and therefore it requires ring information. This check prevents builds // being submitted when Keyspaces are initialized during CassandraDaemon::setup as @@ -163,6 +166,16 @@ public class ViewManager SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), view.name); } + /** + * Stops the building of the specified view, no-op if it isn't building.
+ * + * @param name the name of the view + */ + public void stopBuild(String name) + { + viewsByName.get(name).stopBuild(); + } + public View getByName(String name) { return viewsByName.get(name); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/dht/Splitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java index 4433f97..c63fe91 100644 --- a/src/java/org/apache/cassandra/dht/Splitter.java +++ b/src/java/org/apache/cassandra/dht/Splitter.java @@ -18,10 +18,19 @@ package org.apache.cassandra.dht; +import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; + +import static java.util.stream.Collectors.toSet; /** * Partition splitter.
@@ -35,10 +44,79 @@ public abstract class Splitter this.partitioner = partitioner; } + @VisibleForTesting protected abstract Token tokenForValue(BigInteger value); + @VisibleForTesting protected abstract BigInteger valueForToken(Token token); + @VisibleForTesting + protected BigInteger tokensInRange(Range<Token> range) + { + //full range case + if (range.left.equals(range.right)) + return tokensInRange(new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken())); + + BigInteger totalTokens = BigInteger.ZERO; + for (Range<Token> unwrapped : range.unwrap()) + { + totalTokens = totalTokens.add(valueForToken(token(unwrapped.right)).subtract(valueForToken(unwrapped.left))).abs(); + } + return totalTokens; + } + + /** + * Computes the number of elapsed tokens from the range start until this token + * @return the number of tokens from the range start to the token + */ + @VisibleForTesting + protected BigInteger elapsedTokens(Token token, Range<Token> range) + { + // No token elapsed since range does not contain token + if (!range.contains(token)) + return BigInteger.ZERO; + + BigInteger elapsedTokens = BigInteger.ZERO; + for (Range<Token> unwrapped : range.unwrap()) + { + if (unwrapped.contains(token)) + { + elapsedTokens = elapsedTokens.add(tokensInRange(new Range<>(unwrapped.left, token))); + } + else if (token.compareTo(unwrapped.left) < 0) + { + elapsedTokens = elapsedTokens.add(tokensInRange(unwrapped)); + } + } + return elapsedTokens; + } + + /** + * Computes the normalized position of this token relative to this range + * @return A number between 0.0 and 1.0 representing this token's position + * in this range or -1.0 if this range doesn't contain this token.
+ */ + public double positionInRange(Token token, Range<Token> range) + { + //full range case + if (range.left.equals(range.right)) + return positionInRange(token, new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken())); + + // leftmost token means we are on position 0.0 + if (token.equals(range.left)) + return 0.0; + + // rightmost token means we are on position 1.0 + if (token.equals(range.right)) + return 1.0; + + // Impossible to find position when token is not contained in range + if (!range.contains(token)) + return -1.0; + + return new BigDecimal(elapsedTokens(token, range)).divide(new BigDecimal(tokensInRange(range)), 3, BigDecimal.ROUND_HALF_EVEN).doubleValue(); + } + public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges) { if (localRanges.isEmpty() || parts == 1) @@ -127,4 +205,55 @@ public abstract class Splitter return t.equals(partitioner.getMinimumToken()) ? partitioner.getMaximumToken() : t; } + /** + * Splits the specified token ranges into at least {@code parts} subranges. + * <p> + * Each returned subrange will be contained in exactly one of the specified ranges. + * + * @param ranges a collection of token ranges to be split + * @param parts the minimum number of returned ranges + * @return at least {@code parts} token ranges covering {@code ranges}, possibly fewer if some ranges have too few tokens to be split + */ + public Set<Range<Token>> split(Collection<Range<Token>> ranges, int parts) + { + int numRanges = ranges.size(); + if (numRanges >= parts) + { + return Sets.newHashSet(ranges); + } + else + { + int partsPerRange = (int) Math.ceil((double) parts / numRanges); + return ranges.stream() + .map(range -> split(range, partsPerRange)) + .flatMap(Collection::stream) + .collect(toSet()); + } + } + + /** + * Splits the specified token range into {@code parts} subranges, unless the range doesn't have enough tokens,
+ * in which case the range will be returned without splitting. + * + * @param range a token range + * @param parts the number of subranges + * @return {@code parts} even subranges of {@code range}, or a singleton containing {@code range} itself if it can't be split + */ + private Set<Range<Token>> split(Range<Token> range, int parts) + { + // the range might not have enough tokens to split + BigInteger numTokens = tokensInRange(range); + if (BigInteger.valueOf(parts).compareTo(numTokens) > 0) + return Collections.singleton(range); + + Token left = range.left; + Set<Range<Token>> subranges = new HashSet<>(parts); + for (double i = 1; i <= parts; i++) + { + Token right = partitioner.split(range.left, range.right, i / parts); + subranges.add(new Range<>(left, right)); + left = right; + } + return subranges; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index 3018fc1..f4d3706 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -33,7 +33,9 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.DiskOptimizationStrategy; import org.apache.cassandra.io.util.FileUtils; @@ -343,4 +345,9 @@ public abstract class SSTable appendTOC(descriptor, componentsToAdd); components.addAll(componentsToAdd); } + + public AbstractBounds<Token> getBounds() + { + return AbstractBounds.bounds(first.getToken(), true, last.getToken(), true); + } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/schema/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java index e79e3bd..711724b 100644 --- a/src/java/org/apache/cassandra/schema/Schema.java +++ b/src/java/org/apache/cassandra/schema/Schema.java @@ -632,7 +632,7 @@ public final class Schema viewsDiff.entriesDiffering().values().forEach(diff -> alterView(diff.rightValue())); // deal with all removed, added, and altered views - Keyspace.open(before.name).viewManager.reload(); + Keyspace.open(before.name).viewManager.reload(true); // notify on everything dropped udasDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropAggregate); @@ -691,6 +691,7 @@ public final class Schema private void dropView(ViewMetadata metadata) { + Keyspace.open(metadata.keyspace).viewManager.stopBuild(metadata.name); dropTable(metadata.metadata); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index cb942b9..c1202be 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -96,7 +96,6 @@ import org.apache.cassandra.service.paxos.PrepareVerbHandler; import org.apache.cassandra.service.paxos.ProposeVerbHandler; import org.apache.cassandra.streaming.*; import org.apache.cassandra.tracing.TraceKeyspace; -import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.progress.ProgressEvent; @@ -1392,6 +1391,19 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE CompactionManager.instance.setConcurrentValidations(DatabaseDescriptor.getConcurrentValidations()); } + public int getConcurrentViewBuilders() + { + return DatabaseDescriptor.getConcurrentViewBuilders(); + } + + public void setConcurrentViewBuilders(int value) + { + if (value <= 0) + throw new IllegalArgumentException("Number of concurrent view builders should be greater than 0."); + DatabaseDescriptor.setConcurrentViewBuilders(value); + CompactionManager.instance.setConcurrentViewBuilders(DatabaseDescriptor.getConcurrentViewBuilders()); + } + public boolean isIncrementalBackupsEnabled() { return DatabaseDescriptor.isIncrementalBackupsEnabled(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index c4548ae..48e1b2f 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -520,6 +519,9 @@ public interface StorageServiceMBean extends NotificationEmitter public int getConcurrentValidators(); public void setConcurrentValidators(int value); + public int getConcurrentViewBuilders(); + public void setConcurrentViewBuilders(int value); + public boolean isIncrementalBackupsEnabled(); public void setIncrementalBackupsEnabled(boolean value); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/NodeProbe.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 0912534..0de00f7 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1053,6 +1053,16 @@ public class NodeProbe implements AutoCloseable return ssProxy.getConcurrentCompactors(); } + public void setConcurrentViewBuilders(int value) + { + ssProxy.setConcurrentViewBuilders(value); + } + + public int getConcurrentViewBuilders() + { + return ssProxy.getConcurrentViewBuilders(); + } + public void setMaxHintWindow(int value) { spProxy.setMaxHintWindow(value); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 8618d87..0db422e 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -110,6 +110,8 @@ public class NodeTool SetCompactionThroughput.class, GetConcurrentCompactors.class, SetConcurrentCompactors.class, + GetConcurrentViewBuilders.class, + SetConcurrentViewBuilders.class, SetTimeout.class, SetStreamThroughput.class, SetInterDCStreamThroughput.class, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java b/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java new file mode 100644 index 0000000..c189fb0 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java @@ -0,0 +1,33 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import io.airlift.airline.Command; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +@Command(name = "getconcurrentviewbuilders", description = "Get the number of concurrent view builders in the system") +public class GetConcurrentViewBuilders extends NodeToolCmd +{ + protected void execute(NodeProbe probe) + { + System.out.println("Current number of concurrent view builders in the system is: \n" + + probe.getConcurrentViewBuilders()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java b/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java new file mode 100644 index 0000000..96adf2c --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; + +import static com.google.common.base.Preconditions.checkArgument; + +@Command(name = "setconcurrentviewbuilders", description = "Set the number of concurrent view builders in the system") +public class SetConcurrentViewBuilders extends NodeTool.NodeToolCmd +{ + @Arguments(title = "concurrent_view_builders", usage = "<value>", description = "Number of concurrent view builders, greater than 0.", required = true) + private Integer concurrentViewBuilders = null; + + protected void execute(NodeProbe probe) + { + checkArgument(concurrentViewBuilders > 0, "concurrent_view_builders should be greater than 0."); + probe.setConcurrentViewBuilders(concurrentViewBuilders); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/test/unit/org/apache/cassandra/cql3/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java index 4fd4df6..2b95574 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewTest.java +++
b/test/unit/org/apache/cassandra/cql3/ViewTest.java @@ -1320,8 +1320,7 @@ public class ViewTest extends CQLTester } } - @Test - public void testViewBuilderResume() throws Throwable + private void testViewBuilderResume(int concurrentViewBuilders) throws Throwable { createTable("CREATE TABLE %s (" + "k int, " + @@ -1332,6 +1331,7 @@ public class ViewTest extends CQLTester execute("USE " + keyspace()); executeNet(protocolVersion, "USE " + keyspace()); + CompactionManager.instance.setConcurrentViewBuilders(concurrentViewBuilders); CompactionManager.instance.setCoreCompactorThreads(1); CompactionManager.instance.setMaximumCompactorThreads(1); ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); @@ -1357,21 +1357,32 @@ public class ViewTest extends CQLTester cfs.forceBlockingFlush(); - createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)"); + String viewName1 = "mv_test_" + concurrentViewBuilders; + createView(viewName1, "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)"); cfs.enableAutoCompaction(); List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs); + String viewName2 = viewName1 + "_2"; //Force a second MV on the same base table, which will restart the first MV builder... 
- createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)"); + createView(viewName2, "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)"); //Compact the base table FBUtilities.waitOnFutures(futures); - while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test")) + while (!SystemKeyspace.isViewBuilt(keyspace(), viewName1)) Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L)); + assertRows(execute("SELECT count(*) FROM " + viewName1), row(1024L)); + } + + @Test + public void testViewBuilderResume() throws Throwable + { + for (int i = 1; i <= 8; i *= 2) + { + testViewBuilderResume(i); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java b/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java new file mode 100644 index 0000000..2341c73 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.view; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.transport.ProtocolVersion; + +import static org.junit.Assert.assertEquals; + +public class ViewBuilderTaskTest extends CQLTester +{ + private static final ProtocolVersion protocolVersion = ProtocolVersion.CURRENT; + + @Test + public void testBuildRange() throws Throwable + { + requireNetwork(); + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + String tableName = createTable("CREATE TABLE %s (" + + "k int, " + + "c int, " + + "v text, " + + "PRIMARY KEY(k, c))"); + + String viewName = tableName + "_view"; + executeNet(protocolVersion, String.format("CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s " + + "WHERE v IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL " + + "PRIMARY KEY (v, k, c)", viewName)); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + View view = cfs.keyspace.viewManager.forTable(cfs.metadata().id).iterator().next(); + + // Insert the dataset + for (int k = 0; k < 100; k++) + for (int c = 0; c < 10; c++) + execute("INSERT 
INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, String.valueOf(k)); + + // Retrieve the sorted tokens of the inserted rows + IPartitioner partitioner = cfs.metadata().partitioner; + List<Token> tokens = IntStream.range(0, 100) + .mapToObj(Int32Type.instance::decompose) + .map(partitioner::getToken) + .sorted() + .collect(Collectors.toList()); + + class Tester + { + private void test(int indexOfStartToken, + int indexOfEndToken, + Integer indexOfLastToken, + long keysBuilt, + long expectedKeysBuilt, + int expectedRowsInView) throws Throwable + { + // Truncate the materialized view (not the base table) + cfs.viewManager.forceBlockingFlush(); + cfs.viewManager.truncateBlocking(cfs.forceBlockingFlush(), System.currentTimeMillis()); + assertRowCount(execute("SELECT * FROM " + viewName), 0); + + // Get the tokens from the referenced inserted rows + Token startToken = tokens.get(indexOfStartToken); + Token endToken = tokens.get(indexOfEndToken); + Token lastToken = indexOfLastToken == null ? null : tokens.get(indexOfLastToken); + Range<Token> range = new Range<>(startToken, endToken); + + // Run the view build task, verifying the returned number of built keys + long actualKeysBuilt = new ViewBuilderTask(cfs, view, range, lastToken, keysBuilt).call(); + assertEquals(expectedKeysBuilt, actualKeysBuilt); + + // Verify that the rows have been written to the MV + assertRowCount(execute("SELECT * FROM " + viewName), expectedRowsInView); + + // Verify that the last position and number of built keys have been stored + assertRows(execute(String.format("SELECT last_token, keys_built " + + "FROM %s.%s WHERE keyspace_name='%s' AND view_name='%s' " + + "AND start_token=?
AND end_token=?", + SchemaConstants.SYSTEM_KEYSPACE_NAME, + SystemKeyspace.VIEW_BUILDS_IN_PROGRESS, + keyspace(), + viewName), + startToken.toString(), endToken.toString()), + row(endToken.toString(), expectedKeysBuilt)); + } + } + Tester tester = new Tester(); + + // Build range from rows 0 to 100 without any recorded start position + tester.test(0, 10, null, 0, 10, 100); + + // Build range from rows 100 to 200 starting at row 150 + tester.test(10, 20, 15, 0, 5, 50); + + // Build range from rows 300 to 400 starting at row 350 with 10 built keys + tester.test(30, 40, 35, 10, 15, 50); + + // Build range from rows 400 to 500 starting at row 100 (out of range) with 10 built keys + tester.test(40, 50, 10, 10, 20, 100); + + // Build range from rows 900 to 100 (wrap around) without any recorded start position + tester.test(90, 10, null, 0, 20, 200); + + executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + view.name); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org