This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 858a2b232178427a7f7696b4f086c1b8c1e2282d
Merge: e471a57dc2 9157d98e4c
Author: Jon Meredith <jonmered...@apache.org>
AuthorDate: Thu Apr 25 14:00:02 2024 -0600

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/lifecycle/LogFile.java |  36 ++--
 .../cassandra/db/lifecycle/LogReplicaSet.java      |   6 +-
 .../TransactionAlreadyCompletedException.java      |  36 ++++
 .../apache/cassandra/streaming/StreamSession.java  |  22 ++-
 .../streaming/StreamFailedWhileReceivingTest.java  | 208 +++++++++++++++++++++
 .../cassandra/db/lifecycle/LogTransactionTest.java |  32 ++++
 7 files changed, 326 insertions(+), 15 deletions(-)

diff --cc CHANGES.txt
index 188ce274b2,64c63912ba..23dbee1941
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 -4.0.13
 +4.1.5
 + * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495)
 + * Do not go to disk for reading hints file sizes (CASSANDRA-19477)
 + * Fix system_views.settings to handle array types (CASSANDRA-19475)
 +Merged from 4.0:
+  * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
   * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
   * Fix few types issues and implement types compatibility tests (CASSANDRA-19479)
   * Optionally avoid hint transfer during decommission (CASSANDRA-19525)
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 11e3ffbde1,d67019008f..9db3cb477c
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@@ -277,12 -289,14 +291,14 @@@ final class LogFile implements AutoClos
  
      void commit()
      {
 -        addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
 +        addRecord(LogRecord.makeCommit(currentTimeMillis()));
+         completed = true;
      }
  
      void abort()
      {
 -        addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
 +        addRecord(LogRecord.makeAbort(currentTimeMillis()));
+         completed = true;
      }
  
      private boolean isLastRecordValidWithType(Type type)
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index 316e4b628d,3fa4d95579..c505926db7
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@@ -17,8 -17,8 +17,7 @@@
   */
  package org.apache.cassandra.db.lifecycle;
  
 -import java.io.File;
  import java.util.Collection;
- import java.util.Collections;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;
@@@ -27,8 -27,9 +26,10 @@@ import java.util.Set
  import java.util.function.Function;
  import java.util.stream.Collectors;
  
+ import javax.annotation.concurrent.NotThreadSafe;
+ 
  import com.google.common.annotations.VisibleForTesting;
 +import org.apache.cassandra.io.util.File;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
diff --cc src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
index 0000000000,0ee3c3e5cb..ee13b54ccd
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
@@@ -1,0 -1,36 +1,36 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.lifecycle;
+ 
 -import java.io.File;
+ import java.util.List;
+ 
++import org.apache.cassandra.io.util.File;
+ 
+ public class TransactionAlreadyCompletedException extends IllegalStateException
+ {
+     private TransactionAlreadyCompletedException(List<File> files)
+     {
+         super("Transaction already completed. " + files);
+     }
+ 
+     static TransactionAlreadyCompletedException create(List<File> files)
+     {
+         return new TransactionAlreadyCompletedException(files);
+     }
+ }
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 7b07a0d179,21d5afe480..17ceca137c
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -49,8 -32,11 +49,9 @@@ import org.apache.cassandra.utils.concu
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import io.netty.channel.Channel;
 -import io.netty.channel.ChannelId;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.lifecycle.TransactionAlreadyCompletedException;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.gms.*;
@@@ -680,10 -673,10 +700,10 @@@ public class StreamSession implements I
  
          logError(e);
  
 -        if (messageSender.connected())
 +        if (channel.connected())
          {
-             state(State.FAILED); // make sure subsequent error handling sees 
the session in a final state 
+             state(State.FAILED); // make sure subsequent error handling sees 
the session in a final state
 -            messageSender.sendMessage(new SessionFailedMessage());
 +            channel.sendControlMessage(new SessionFailedMessage()).awaitUninterruptibly();
          }
  
          return closeSession(State.FAILED);
diff --cc test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
index 0000000000,7cd346cd9a..99dfd440b7
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
@@@ -1,0 -1,208 +1,208 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.distributed.test.streaming;
+ 
 -import java.io.File;
+ import java.io.IOException;
+ import java.util.concurrent.Callable;
 -import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.junit.Test;
+ 
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.MethodDelegation;
+ import net.bytebuddy.implementation.bind.annotation.SuperCall;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
+ import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+ import org.apache.cassandra.db.streaming.CassandraStreamManager;
+ import org.apache.cassandra.db.streaming.CassandraStreamReceiver;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.distributed.test.TestBaseImpl;
+ import org.apache.cassandra.exceptions.StartupException;
+ import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+ import org.apache.cassandra.io.sstable.format.SSTableFormat;
+ import org.apache.cassandra.io.sstable.format.Version;
++import org.apache.cassandra.io.util.File;
+ import org.apache.cassandra.schema.Schema;
+ import org.apache.cassandra.streaming.IncomingStream;
+ import org.apache.cassandra.streaming.StreamSession;
+ import org.apache.cassandra.streaming.messages.StreamMessageHeader;
++import org.apache.cassandra.utils.concurrent.CountDownLatch;
+ 
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+ 
+ /** This is a somewhat brittle test to demonstrate transaction log corruption
+     when streaming is aborted as streamed sstables are added to the
+     transaction log concurrently.
+ 
+     The transaction log should not be modified after streaming
+     has aborted or completed it.
+ */
+ public class StreamFailedWhileReceivingTest extends TestBaseImpl
+ {
+     @Test
+     public void zeroCopy() throws IOException
+     {
+         streamClose(true);
+     }
+ 
+     @Test
+     public void notZeroCopy() throws IOException
+     {
+         streamClose(false);
+     }
+ 
+     private void streamClose(boolean zeroCopyStreaming) throws IOException
+     {
+         try (Cluster cluster = Cluster.build(2)
+                                       .withInstanceInitializer(BBHelper::install)
+                                       .withConfig(c -> c.with(Feature.values())
+                                                         .set("stream_entire_sstables", zeroCopyStreaming)
+                                                         .set("autocompaction_on_startup_enabled", false))
+                                       .start())
+         {
+             init(cluster);
+ 
+             cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));
+             IInvokableInstance node1 = cluster.get(1);
+             IInvokableInstance node2 = cluster.get(2);
+             for (int i = 1; i <= 100; i++)
+                 node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES (?)"), i);
+             node1.flush(KEYSPACE);
+ 
+             // trigger streaming; expected to fail as streaming socket closed in the middle (currently this is an unrecoverable event)
+             node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl");
+ 
+             node2.runOnInstance(() -> {
+                 try
+                 {
+                     // use the startup logic to check for corrupt txn logfiles from the streaming failure
+                     // quicker than restarting the instance to check
+                     ColumnFamilyStore.scrubDataDirectories(Schema.instance.getTableMetadata(KEYSPACE, "tbl"));
+                 }
+                 catch (StartupException ex)
+                 {
+                     throw new RuntimeException(ex);
+                 }
+             });
+         }
+     }
+ 
+ 
+     public static class BBHelper
+     {
+         static volatile StreamSession firstSession;
 -        static CountDownLatch firstStreamAbort = new CountDownLatch(1); // per-instance
++        static CountDownLatch firstStreamAbort = CountDownLatch.newCountDownLatch(1); // per-instance
+ 
+         // CassandraStreamManager.prepareIncomingStream
+         @SuppressWarnings("unused")
+         public static IncomingStream prepareIncomingStream(StreamSession 
session, StreamMessageHeader header, @SuperCall Callable<IncomingStream> zuper) 
throws Exception
+         {
+             if (firstSession == null)
+                 firstSession = session;
+             return zuper.call();
+         }
+ 
+         // CassandraStreamReceiver.abort
+         @SuppressWarnings("unused")
+         public static void abort(@SuperCall Callable<Integer> zuper) throws 
Exception
+         {
 -            firstStreamAbort.countDown();
++            firstStreamAbort.decrement();
+             zuper.call();
+         }
+ 
+         // RangeAwareSSTableWriter.append
+         @SuppressWarnings("unused")
+         public static boolean append(UnfilteredRowIterator partition, 
@SuperCall Callable<Boolean> zuper) throws Exception
+         {
+             // handles compressed and non-compressed
+             if (isCaller(CassandraIncomingFile.class.getName(), "read"))
+              {
+                 if (firstSession != null)
+                 {
+                     firstSession.abort();
+                     // delay here until CassandraStreamReceiver abort is called on NonPeriodic tasks
 -                    firstStreamAbort.await(1, TimeUnit.MINUTES);
++                    firstStreamAbort.awaitUninterruptibly(1, TimeUnit.MINUTES);
+                 }
+              }
+             return zuper.call();
+         }
+ 
+         // ColumnFamilyStore.createWriter - for entire sstable streaming, before adding to LogTransaction
+         @SuppressWarnings("unused")
+         public static Descriptor newSSTableDescriptor(File directory, Version 
version, SSTableFormat.Type format, @SuperCall Callable<Descriptor> zuper) 
throws Exception
+         {
+             if (isCaller(CassandraEntireSSTableStreamReader.class.getName(), 
"read"))
+             // handles compressed and non-compressed
+             {
+                 if (firstSession != null)
+                 {
+                     firstSession.abort();
+                     // delay here until CassandraStreamReceiver abort is called on NonPeriodic tasks
 -                    firstStreamAbort.await(1, TimeUnit.MINUTES);
++                    firstStreamAbort.awaitUninterruptibly(1, TimeUnit.MINUTES);
+                 }
+             }
+             return zuper.call();
+         }
+ 
+         private static boolean isCaller(String klass, String method)
+         {
+             StackTraceElement[] stack = 
Thread.currentThread().getStackTrace();
+             for (int i = 0; i < stack.length; i++)
+             {
+                 StackTraceElement e = stack[i];
+                 if (klass.equals(e.getClassName()) && 
method.equals(e.getMethodName()))
+                     return true;
+             }
+             return false;
+         }
+ 
+         public static void install(ClassLoader classLoader, Integer num)
+         {
+             if (num != 2) // only target the second instance
+                 return;
+ 
+             new ByteBuddy().rebase(CassandraStreamManager.class)
+                            
.method(named("prepareIncomingStream").and(takesArguments(2)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+                            .method(named("append").and(takesArguments(1)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(ColumnFamilyStore.class)
+                            
.method(named("newSSTableDescriptor").and(takesArguments(3)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(CassandraStreamReceiver.class)
+                            .method(named("abort").and(takesArguments(0)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+         }
+     }
+ }
diff --cc test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 2812353f36,7d1cb39ae3..88164555cb
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@@ -31,6 -33,6 +31,7 @@@ import java.util.Map
  import java.util.Set;
  import java.util.function.BiConsumer;
  import java.util.function.Consumer;
++import java.util.function.Supplier;
  import java.util.stream.Collectors;
  import java.util.stream.Stream;
  
@@@ -47,16 -48,19 +48,22 @@@ import org.apache.cassandra.db.Serializ
  import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.SSTable;
++import org.apache.cassandra.io.sstable.SSTableId;
 +import org.apache.cassandra.io.sstable.SequenceBasedSSTableId;
  import org.apache.cassandra.io.sstable.format.SSTableFormat;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.sstable.metadata.MetadataType;
  import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+ import org.apache.cassandra.io.util.DiskOptimizationStrategy;
 +import org.apache.cassandra.io.util.File;
  import org.apache.cassandra.io.util.FileHandle;
  import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
  import org.apache.cassandra.schema.MockSchema;
+ import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.schema.TableMetadataRef;
  import org.apache.cassandra.utils.AlwaysPresentFilter;
  import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
@@@ -1393,4 -1400,30 +1400,29 @@@ public class LogTransactionTest extend
                         
.flatMap(LogTransactionTest::toCanonicalIgnoringNotFound)
                         .collect(Collectors.toSet());
      }
+ 
+     final String DUMMY_KS = "ks";
+     final String DUMMY_TBL = "tbl";
+     final File dir = new File(".");
 -    int nextGeneration = 1;
++    Supplier<SequenceBasedSSTableId> idSupplier = SequenceBasedSSTableId.Builder.instance.generator(Stream.of());
+     final Set<Component> dummyComponents = Collections.singleton(Component.DATA);
+     final TableMetadataRef dummyMetadata = TableMetadataRef.forOfflineTools(TableMetadata.minimal(DUMMY_KS, DUMMY_TBL));
+     final DiskOptimizationStrategy dummyOptimizationStrategy = new SpinningDiskOptimizationStrategy();
+ 
+     SSTable dummySSTable()
+     {
 -        int id = nextGeneration;
 -        nextGeneration++;
++        SSTableId id = idSupplier.get();
+         Descriptor descriptor = new Descriptor(dir, DUMMY_KS, DUMMY_TBL, id);
+         return new SSTable(descriptor, dummyComponents, dummyMetadata, dummyOptimizationStrategy)
+         {
+         };
+     }
+ 
+     @Test(expected = TransactionAlreadyCompletedException.class)
+     public void useAfterCompletedTest()
+     {
+         LogTransaction txnFile = new LogTransaction(OperationType.STREAM);
+         txnFile.abort(); // this should complete the txn
+         txnFile.trackNew(dummySSTable()); // expect an IllegalStateException here
+     }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to