Fix endless loop/compaction of schema_* CFs due to broken timestamps patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-4880
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dd1633ba Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dd1633ba Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dd1633ba Branch: refs/heads/cassandra-1.2.0 Commit: dd1633ba0bf0e15547f5b7048271ab7334e862f0 Parents: caeee7c Author: Pavel Yaskevich <pyaskev...@twitter.com> Authored: Fri Nov 16 14:23:24 2012 -0800 Committer: Pavel Yaskevich <pyaskev...@twitter.com> Committed: Fri Nov 16 14:31:18 2012 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/DefsTable.java | 29 +++++++++++---- src/java/org/apache/cassandra/db/RowMutation.java | 3 ++ .../org/apache/cassandra/net/MessagingService.java | 12 +++--- .../apache/cassandra/service/MigrationManager.java | 9 +++-- 5 files changed, 37 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b80c60f..2ed9666 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,7 @@ * Fix DynamicCompositeType same type comparison (CASSANDRA-4711) * Fix duplicate SSTable reference when stream session failed (CASSANDRA-3306) * Allow static CF definition with compact storage (CASSANDRA-4910) + * Fix endless loop/compaction of schema_* CFs due to broken timestamps (CASSANDRA-4880) 1.1.6 http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java index 2e4e5d3..4d6b574 100644 --- a/src/java/org/apache/cassandra/db/DefsTable.java +++ b/src/java/org/apache/cassandra/db/DefsTable.java @@ -174,7 +174,7 @@ public class DefsTable ColumnFamilyStore cfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(columnFamily); boolean needsCleanup = false; - long timestamp = FBUtilities.timestampMicros(); + Date now = new Date(); List<Row> rows = SystemTable.serializedSchema(columnFamily); @@ -186,11 +186,24 @@ public class DefsTable for (IColumn column : row.cf.columns) { - if (column.timestamp() > timestamp) + Date columnDate = new Date(column.timestamp()); + + if (columnDate.after(now)) + { + Date micros = new Date(column.timestamp() / 1000); // assume that it was in micros + + Calendar calendar = Calendar.getInstance(); + calendar.setTime(micros); + + if ((micros.before(now) && calendar.get(Calendar.YEAR) == 1970) || micros.after(now)) + { + needsCleanup = true; + break row_check_loop; + } + } + else // millis and we have to fix it to micros { needsCleanup = true; - // exit the loop on first found timestamp mismatch as we know that it - // wouldn't be only one column/row that we would have to fix anyway break row_check_loop; } } @@ -214,6 +227,8 @@ public class DefsTable throw new AssertionError(e); } + long microTimestamp = now.getTime() * 1000; + for (Row row : rows) { if (invalidSchemaRow(row)) @@ -224,7 +239,7 @@ public class DefsTable for (IColumn column : row.cf.columns) { if (column.isLive()) - mutation.add(new QueryPath(columnFamily, null, column.name()), column.value(), timestamp); + mutation.add(new QueryPath(columnFamily, null, column.name()), column.value(), microTimestamp); } mutation.apply(); @@ -315,9 +330,9 @@ public class DefsTable */ public static void mergeRemoteSchema(byte[] data, int version) throws ConfigurationException, IOException { - if (version < MessagingService.VERSION_11) + if (version < MessagingService.VERSION_117) { - logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first."); + logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1.6, please update first."); return; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/db/RowMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java index 3a05df9..16fa4be 100644 --- a/src/java/org/apache/cassandra/db/RowMutation.java +++ b/src/java/org/apache/cassandra/db/RowMutation.java @@ -483,6 +483,9 @@ public class RowMutation implements IMutation, MessageProducer cf.addColumn(new Column(column.name(), column.value(), now)); } + if (cf.isMarkedForDelete() && cf.isEmpty()) + continue; + fixedModifications.put(modification.getKey(), cf); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index b2649f9..7974e6c 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -28,7 +28,6 @@ import java.nio.channels.ServerSocketChannel; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -37,7 +36,6 @@ import javax.management.ObjectName; import com.google.common.base.Function; import com.google.common.collect.Lists; -import org.apache.cassandra.concurrent.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,11 +65,13 @@ public final class MessagingService implements MessagingServiceMBean public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; // 8 bits version, so don't waste versions - public static final int VERSION_07 = 1; + public static final int VERSION_07 = 1; public static final int VERSION_080 = 2; - public static final int VERSION_10 = 3; - public static final int VERSION_11 = 4; - public static final int version_ = VERSION_11; + public static final int VERSION_10 = 3; + public static final int VERSION_11 = 4; + public static final int VERSION_117 = 5; + + public static final int version_ = VERSION_117; static SerializerType serializerType_ = SerializerType.BINARY; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd1633ba/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 446bb5c..973b190 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -94,8 +94,8 @@ public class MigrationManager implements IEndpointStateChangeSubscriber private static void rectifySchema(UUID theirVersion, final InetAddress endpoint) { - // Can't request migrations from nodes with versions younger than 1.1 - if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11) + // Can't request migrations from nodes with versions younger than 1.1.7 + if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_117) return; if (Schema.instance.getVersion().equals(theirVersion)) @@ -341,11 +341,12 @@ public class MigrationManager implements IEndpointStateChangeSubscriber liveEndpoints.remove(FBUtilities.getBroadcastAddress()); // force migration is there are nodes around, first of all - // check if there are nodes with versions >= 1.1 to request migrations from, + // check if there are nodes with versions >= 1.1.7 to request migrations from, // because migration format of the nodes with versions < 1.1 is incompatible with older versions + // and due to broken timestamps in versions prior to 1.1.7 for (InetAddress node : liveEndpoints) { - if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_11) + if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_117) { if (logger.isDebugEnabled()) logger.debug("Requesting schema from " + node);