This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 858a2b232178427a7f7696b4f086c1b8c1e2282d
Merge: e471a57dc2 9157d98e4c
Author: Jon Meredith <jonmered...@apache.org>
AuthorDate: Thu Apr 25 14:00:02 2024 -0600

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/lifecycle/LogFile.java |  36 ++--
 .../cassandra/db/lifecycle/LogReplicaSet.java      |   6 +-
 .../TransactionAlreadyCompletedException.java      |  36 ++++
 .../apache/cassandra/streaming/StreamSession.java  |  22 ++-
 .../streaming/StreamFailedWhileReceivingTest.java  | 208 +++++++++++++++++++++
 .../cassandra/db/lifecycle/LogTransactionTest.java |  32 ++++
 7 files changed, 326 insertions(+), 15 deletions(-)

diff --cc CHANGES.txt
index 188ce274b2,64c63912ba..23dbee1941
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
-4.0.13
+4.1.5
+ * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495)
+ * Do not go to disk for reading hints file sizes (CASSANDRA-19477)
+ * Fix system_views.settings to handle array types (CASSANDRA-19475)
+Merged from 4.0:
+ * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
 * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
 * Fix few types issues and implement types compatibility tests (CASSANDRA-19479)
 * Optionally avoid hint transfer during decommission (CASSANDRA-19525)
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 11e3ffbde1,d67019008f..9db3cb477c
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@@ -277,12 -289,14 +291,14 @@@ final class LogFile implements AutoClos
      void commit()
      {
-         addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
+         addRecord(LogRecord.makeCommit(currentTimeMillis()));
+         completed = true;
      }

      void abort()
      {
-         addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
+         addRecord(LogRecord.makeAbort(currentTimeMillis()));
+         completed = true;
      }

      private boolean isLastRecordValidWithType(Type type)
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index 316e4b628d,3fa4d95579..c505926db7
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@@ -17,8 -17,8 +17,7 @@@
  */
 package org.apache.cassandra.db.lifecycle;

-import java.io.File;
 import java.util.Collection;
- import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@@ -27,8 -27,9 +26,10 @@@ import java.util.Set
 import java.util.function.Function;
 import java.util.stream.Collectors;
+ import javax.annotation.concurrent.NotThreadSafe;
+
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.io.util.File;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --cc src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
index 0000000000,0ee3c3e5cb..ee13b54ccd
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
@@@ -1,0 -1,36 +1,36 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.lifecycle;
+
 -import java.io.File;
+ import java.util.List;
+
++import org.apache.cassandra.io.util.File;
+
+ public class TransactionAlreadyCompletedException extends IllegalStateException
+ {
+     private TransactionAlreadyCompletedException(List<File> files)
+     {
+         super("Transaction already completed. " + files);
+     }
+
+     static TransactionAlreadyCompletedException create(List<File> files)
+     {
+         return new TransactionAlreadyCompletedException(files);
+     }
+ }
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 7b07a0d179,21d5afe480..17ceca137c
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -49,8 -32,11 +49,9 @@@ import org.apache.cassandra.utils.concu
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelId;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.lifecycle.TransactionAlreadyCompletedException;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
@@@ -680,10 -673,10 +700,10 @@@ public class StreamSession implements I
         logError(e);

-        if (messageSender.connected())
+        if (channel.connected())
         {
-            state(State.FAILED); // make sure subsequent error handling sees the session in a final state
+            state(State.FAILED); // make sure subsequent error handling sees the session in a final state
-            messageSender.sendMessage(new SessionFailedMessage());
+            channel.sendControlMessage(new SessionFailedMessage()).awaitUninterruptibly();
         }

         return closeSession(State.FAILED);
diff --cc test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
index 0000000000,7cd346cd9a..99dfd440b7
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
@@@ -1,0 -1,208 +1,208 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.distributed.test.streaming;
+
 -import java.io.File;
+ import java.io.IOException;
+ import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+
+ import org.junit.Test;
+
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.MethodDelegation;
+ import net.bytebuddy.implementation.bind.annotation.SuperCall;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
+ import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+ import org.apache.cassandra.db.streaming.CassandraStreamManager;
+ import org.apache.cassandra.db.streaming.CassandraStreamReceiver;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.distributed.test.TestBaseImpl;
+ import org.apache.cassandra.exceptions.StartupException;
+ import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+ import org.apache.cassandra.io.sstable.format.SSTableFormat;
+ import org.apache.cassandra.io.sstable.format.Version;
++import org.apache.cassandra.io.util.File;
+ import org.apache.cassandra.schema.Schema;
+ import org.apache.cassandra.streaming.IncomingStream;
+ import org.apache.cassandra.streaming.StreamSession;
+ import org.apache.cassandra.streaming.messages.StreamMessageHeader;
++import org.apache.cassandra.utils.concurrent.CountDownLatch;
+
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+ /** This is a somewhat brittle test to demonstrate transaction log corruption
+     when streaming is aborted as streamed sstables are added to the
+     transaction log concurrently.
+
+     The transaction log should not be modified after streaming
+     has aborted or completed it.
+ */
+ public class StreamFailedWhileReceivingTest extends TestBaseImpl
+ {
+     @Test
+     public void zeroCopy() throws IOException
+     {
+         streamClose(true);
+     }
+
+     @Test
+     public void notZeroCopy() throws IOException
+     {
+         streamClose(false);
+     }
+
+     private void streamClose(boolean zeroCopyStreaming) throws IOException
+     {
+         try (Cluster cluster = Cluster.build(2)
+                                       .withInstanceInitializer(BBHelper::install)
+                                       .withConfig(c -> c.with(Feature.values())
+                                                         .set("stream_entire_sstables", zeroCopyStreaming)
+                                                         .set("autocompaction_on_startup_enabled", false))
+                                       .start())
+         {
+             init(cluster);
+
+             cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));
+             IInvokableInstance node1 = cluster.get(1);
+             IInvokableInstance node2 = cluster.get(2);
+             for (int i = 1; i <= 100; i++)
+                 node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES (?)"), i);
+             node1.flush(KEYSPACE);
+
+             // trigger streaming; expected to fail as streaming socket closed in the middle (currently this is an unrecoverable event)
+             node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl");
+
+             node2.runOnInstance(() -> {
+                 try
+                 {
+                     // use the startup logic to check for corrupt txn logfiles from the streaming failure
+                     // quicker than restarting the instance to check
+                     ColumnFamilyStore.scrubDataDirectories(Schema.instance.getTableMetadata(KEYSPACE, "tbl"));
+                 }
+                 catch (StartupException ex)
+                 {
+                     throw new RuntimeException(ex);
+                 }
+             });
+         }
+     }
+
+
+     public static class BBHelper
+     {
+         static volatile StreamSession firstSession;
-         static CountDownLatch firstStreamAbort = new CountDownLatch(1); // per-instance
++        static CountDownLatch firstStreamAbort = CountDownLatch.newCountDownLatch(1); // per-instance
+
+         // CassandraStreamManager.prepareIncomingStream
+         @SuppressWarnings("unused")
+         public static IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header, @SuperCall Callable<IncomingStream> zuper) throws Exception
+         {
+             if (firstSession == null)
+                 firstSession = session;
+             return zuper.call();
+         }
+
+         // CassandraStreamReceiver.abort
+         @SuppressWarnings("unused")
+         public static void abort(@SuperCall Callable<Integer> zuper) throws Exception
+         {
-             firstStreamAbort.countDown();
++            firstStreamAbort.decrement();
+             zuper.call();
+         }
+
+         // RangeAwareSSTableWriter.append
+         @SuppressWarnings("unused")
+         public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception
+         {
+             // handles compressed and non-compressed
+             if (isCaller(CassandraIncomingFile.class.getName(), "read"))
+             {
+                 if (firstSession != null)
+                 {
+                     firstSession.abort();
+                     // delay here until CassandraStreamReceiver abort is called on NonPeriodic tasks
-                     firstStreamAbort.await(1, TimeUnit.MINUTES);
++                    firstStreamAbort.awaitUninterruptibly(1, TimeUnit.MINUTES);
+                 }
+             }
+             return zuper.call();
+         }
+
+         // ColumnFamilyStore.createWriter - for entire sstable streaming, before adding to LogTransaction
+         @SuppressWarnings("unused")
+         public static Descriptor newSSTableDescriptor(File directory, Version version, SSTableFormat.Type format, @SuperCall Callable<Descriptor> zuper) throws Exception
+         {
+             if (isCaller(CassandraEntireSSTableStreamReader.class.getName(), "read"))
+             // handles compressed and non-compressed
+             {
+                 if (firstSession != null)
+                 {
+                     firstSession.abort();
+                     // delay here until CassandraStreamReceiver abort is called on NonPeriodic tasks
-                     firstStreamAbort.await(1, TimeUnit.MINUTES);
++                    firstStreamAbort.awaitUninterruptibly(1, TimeUnit.MINUTES);
+                 }
+             }
+             return zuper.call();
+         }
+
+         private static boolean isCaller(String klass, String method)
+         {
+             StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+             for (int i = 0; i < stack.length; i++)
+             {
+                 StackTraceElement e = stack[i];
+                 if (klass.equals(e.getClassName()) && method.equals(e.getMethodName()))
+                     return true;
+             }
+             return false;
+         }
+
+         public static void install(ClassLoader classLoader, Integer num)
+         {
+             if (num != 2) // only target the second instance
+                 return;
+
+             new ByteBuddy().rebase(CassandraStreamManager.class)
+                            .method(named("prepareIncomingStream").and(takesArguments(2)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+                            .method(named("append").and(takesArguments(1)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(ColumnFamilyStore.class)
+                            .method(named("newSSTableDescriptor").and(takesArguments(3)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(CassandraStreamReceiver.class)
+                            .method(named("abort").and(takesArguments(0)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+         }
+     }
+ }
diff --cc test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 2812353f36,7d1cb39ae3..88164555cb
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@@ -31,6 -33,6 +31,7 @@@ import java.util.Map
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
++import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@@ -47,16 -48,19 +48,22 @@@ import org.apache.cassandra.db.Serializ
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.SSTable;
++import org.apache.cassandra.io.sstable.SSTableId;
+import org.apache.cassandra.io.sstable.SequenceBasedSSTableId;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+ import org.apache.cassandra.io.util.DiskOptimizationStrategy;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
 import org.apache.cassandra.schema.MockSchema;
+ import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
@@@ -1393,4 -1400,30 +1400,29 @@@ public class LogTransactionTest extend
                    .flatMap(LogTransactionTest::toCanonicalIgnoringNotFound)
                    .collect(Collectors.toSet());
     }
+
+     final String DUMMY_KS = "ks";
+     final String DUMMY_TBL = "tbl";
+     final File dir = new File(".");
-     int nextGeneration = 1;
++    Supplier<SequenceBasedSSTableId> idSupplier = SequenceBasedSSTableId.Builder.instance.generator(Stream.of());
+     final Set<Component> dummyComponents = Collections.singleton(Component.DATA);
+     final TableMetadataRef dummyMetadata = TableMetadataRef.forOfflineTools(TableMetadata.minimal(DUMMY_KS, DUMMY_TBL));
+     final DiskOptimizationStrategy dummyOptimizationStrategy = new SpinningDiskOptimizationStrategy();
+
+     SSTable dummySSTable()
+     {
-         int id = nextGeneration;
-         nextGeneration++;
++        SSTableId id = idSupplier.get();
+         Descriptor descriptor = new Descriptor(dir, DUMMY_KS, DUMMY_TBL, id);
+         return new SSTable(descriptor, dummyComponents, dummyMetadata, dummyOptimizationStrategy)
+         {
+         };
+     }
+
+     @Test(expected = TransactionAlreadyCompletedException.class)
+     public void useAfterCompletedTest()
+     {
+         LogTransaction txnFile = new LogTransaction(OperationType.STREAM);
+         txnFile.abort(); // this should complete the txn
+         txnFile.trackNew(dummySSTable()); // expect an IllegalStateException here
+     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
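
The essence of the change merged above (CASSANDRA-18736) is that a transaction log must refuse further mutation once commit() or abort() has completed it, so a racing stream receiver fails fast with TransactionAlreadyCompletedException instead of appending to an already-closed log and corrupting it on disk. The standalone sketch below only illustrates that guard pattern under hypothetical names (ToyTxnLog and its string records); it is not the Cassandra implementation:

    import java.util.ArrayList;
    import java.util.List;

    // Simplified illustration of the "refuse use after completion" guard:
    // once commit() or abort() has run, further mutation fails fast rather
    // than silently extending the log. Class and record format are made up.
    final class ToyTxnLog
    {
        private final List<String> records = new ArrayList<>();
        private volatile boolean completed = false;

        void trackNew(String sstable)
        {
            if (completed)
                throw new IllegalStateException("Transaction already completed. " + records);
            records.add("ADD:" + sstable);
        }

        void commit()
        {
            records.add("COMMIT");
            completed = true;
        }

        void abort()
        {
            records.add("ABORT");
            completed = true;
        }

        public static void main(String[] args)
        {
            ToyTxnLog log = new ToyTxnLog();
            log.trackNew("na-1-big");     // allowed while the transaction is open
            log.abort();                  // completes the transaction
            try
            {
                log.trackNew("na-2-big"); // a late streaming callback would land here
            }
            catch (IllegalStateException expected)
            {
                System.out.println("Rejected as expected: " + expected.getMessage());
            }
        }
    }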