Caleb Rackliffe created CASSANDRA-17466:
-------------------------------------------

             Summary: Shut repair task executor down without interruption to 
avoid compromising shared channel proxies
                 Key: CASSANDRA-17466
                 URL: https://issues.apache.org/jira/browse/CASSANDRA-17466
             Project: Cassandra
          Issue Type: Bug
          Components: Consistency/Repair
            Reporter: Caleb Rackliffe
            Assignee: Caleb Rackliffe


If a {{RepairJob}} gets past validation, it builds a list of {{SyncTask}} items 
and fires them off. If any one of those fails, we grab the relevant exception 
and throw it up from {{RepairJob}} to {{RepairSession}}.

{noformat}
ERROR 2022-03-09T23:53:36,721 [Stream-Deserializer-/10.246.3.102:7000-d97958c4] 
org.apache.cassandra.streaming.StreamSession:1110 - [Stream 
#07c55da0-a047-11ec-8122-ab911c7a993f] Remote peer /10.246.3.102:7000 failed 
stream session.
{noformat}

{{RepairSession}} then marks itself as being terminated and clears its internal 
maps of active validations and sync tasks, but immediately before it does that, 
it calls {{shutdownNow()}} on the executor that executes those tasks. In the 
case of our failing stream session, we may still have other running stream 
tasks whose threads' interrupt flag has been set, and this can have some 
unintended negative consequences, because any {{ChannelProxy}} interrupted in 
the middle of a blocking operation will both be closed and throw a 
{{ClosedByInterruptException}}. (Keep in mind that we share {{ChannelProxy}} 
instances outside a few specific cases, like those introduced in 
CASSANDRA-15666.)
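
To make the difference between the two shutdown modes concrete, here is a 
minimal sketch using only {{java.util.concurrent}}. The class and method names 
({{ExecutorShutdownSketch}}, {{taskSawInterrupt}}) are hypothetical and do not 
come from the Cassandra codebase; the sketch only illustrates that 
{{shutdownNow()}} interrupts a task that is already running, while 
{{shutdown()}} lets it finish undisturbed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Cassandra.
class ExecutorShutdownSketch
{
    /**
     * Starts one task, shuts the executor down while the task is still running,
     * and reports whether the task's thread was interrupted.
     */
    static boolean taskSawInterrupt(boolean interruptOnShutdown) throws InterruptedException
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch shutdownCalled = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        executor.execute(() -> {
            started.countDown();
            boolean interrupted = false;
            // Keep the task alive until the shutdown call has returned,
            // remembering any interrupt delivered in the meantime.
            while (true)
            {
                try
                {
                    shutdownCalled.await();
                    break;
                }
                catch (InterruptedException e)
                {
                    interrupted = true;
                }
            }
            sawInterrupt.set(interrupted || Thread.currentThread().isInterrupted());
        });

        started.await();
        if (interruptOnShutdown)
            executor.shutdownNow(); // interrupts the running worker thread
        else
            executor.shutdown();    // only stops accepting new tasks
        shutdownCalled.countDown();

        executor.awaitTermination(10, TimeUnit.SECONDS);
        return sawInterrupt.get();
    }
}
```

Under these assumptions, {{taskSawInterrupt(true)}} reports an interrupt and 
{{taskSawInterrupt(false)}} does not, which is the property the summary of this 
issue relies on.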

We've seen this manifest in production in a couple of ways, both of them while 
trying to read from the {{peers_v2}} system table:

{noformat}
Exception in thread Thread[RepairJobTask:23,5,main]"
exception="FSReadError in 
.../data/system/peers_v2-c4325fbb8e5e3bafbd070f9250ed818e/system-peers_v2-nb-99-big-Data.db
        at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:143)
        at 
org.apache.cassandra.io.util.CompressedChunkReader$Standard.readChunk(CompressedChunkReader.java:115)
        at 
org.apache.cassandra.io.util.BufferManagingRebufferer.rebuffer(BufferManagingRebufferer.java:79)
        at 
org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:68)
        at 
org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:210)
        at 
org.apache.cassandra.io.util.FileHandle.createReader(FileHandle.java:151)
        at 
org.apache.cassandra.io.sstable.format.SSTableReader.getFileDataInput(SSTableReader.java:1628)
        at 
org.apache.cassandra.db.columniterator.AbstractSSTableIterator.<init>(AbstractSSTableIterator.java:96)
        at 
org.apache.cassandra.db.columniterator.SSTableIterator.<init>(SSTableIterator.java:48)
        at 
org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:75)
        at 
org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:67)
        at 
org.apache.cassandra.db.StorageHook$1.makeRowIterator(StorageHook.java:87)
        at 
org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndSSTablesInTimestampOrder(SinglePartitionReadCommand.java:897)
        at 
org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDiskInternal(SinglePartitionReadCommand.java:605)
        at 
org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDisk(SinglePartitionReadCommand.java:578)
        at 
org.apache.cassandra.db.SinglePartitionReadCommand.queryStorage(SinglePartitionReadCommand.java:412)
        at 
org.apache.cassandra.db.ReadCommand.executeLocally(ReadCommand.java:414)
        at 
org.apache.cassandra.db.SinglePartitionReadQuery$Group.executeLocally(SinglePartitionReadQuery.java:242)
        at 
org.apache.cassandra.db.SinglePartitionReadQuery$Group.executeInternal(SinglePartitionReadQuery.java:216)
        at 
org.apache.cassandra.cql3.statements.SelectStatement.executeInternal(SelectStatement.java:458)
        at 
org.apache.cassandra.cql3.statements.SelectStatement.executeLocally(SelectStatement.java:442)
        at 
org.apache.cassandra.cql3.statements.SelectStatement.executeLocally(SelectStatement.java:96)
        at 
org.apache.cassandra.cql3.QueryProcessor.executeInternal(QueryProcessor.java:334)
        at 
org.apache.cassandra.db.SystemKeyspace.getPreferredIP(SystemKeyspace.java:973)
        at 
org.apache.cassandra.net.OutboundConnectionSettings.connectTo(OutboundConnectionSettings.java:455)
        at 
org.apache.cassandra.net.OutboundConnectionSettings.withDefaults(OutboundConnectionSettings.java:484)
        at 
org.apache.cassandra.streaming.DefaultConnectionFactory.createConnection(DefaultConnectionFactory.java:49)
        at 
org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createChannel(NettyStreamingMessageSender.java:199)
        at 
org.apache.cassandra.streaming.async.NettyStreamingMessageSender.setupControlMessageChannel(NettyStreamingMessageSender.java:180)
        at 
org.apache.cassandra.streaming.async.NettyStreamingMessageSender.sendMessage(NettyStreamingMessageSender.java:245)
        at 
org.apache.cassandra.streaming.async.NettyStreamingMessageSender.initialize(NettyStreamingMessageSender.java:149)
        at 
org.apache.cassandra.streaming.StreamSession.start(StreamSession.java:372)
        at 
org.apache.cassandra.streaming.StreamCoordinator.startSession(StreamCoordinator.java:262)
        at 
org.apache.cassandra.streaming.StreamCoordinator.access$700(StreamCoordinator.java:36)
        at 
org.apache.cassandra.streaming.StreamCoordinator$HostStreamingData.connectAllStreamSessions(StreamCoordinator.java:308)
        at 
org.apache.cassandra.streaming.StreamCoordinator.connectAllStreamSessions(StreamCoordinator.java:107)
        at 
org.apache.cassandra.streaming.StreamCoordinator.connect(StreamCoordinator.java:101)
        at 
org.apache.cassandra.streaming.StreamResultFuture.createInitiator(StreamResultFuture.java:98)
        at 
org.apache.cassandra.streaming.StreamPlan.execute(StreamPlan.java:179)
        at 
org.apache.cassandra.repair.LocalSyncTask.startSync(LocalSyncTask.java:113)
        at org.apache.cassandra.repair.SyncTask.run(SyncTask.java:89)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at 
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at 
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
        at 
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.channels.ClosedByInterruptException
        at 
java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:199)
        at 
java.base/sun.nio.ch.FileChannelImpl.endBlocking(FileChannelImpl.java:162)
        at 
java.base/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:816)
        at java.base/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796)
        at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:139)
        ... 48 more
{noformat}

In this case, the {{SyncTask}} is trying to read the preferred remote IP for 
the node it wants to communicate with, but when it reads the {{peers_v2}} 
table, the interrupt on its thread closes the underlying channel, and 
{{ChannelProxy#read()}} wraps the resulting {{ClosedByInterruptException}} in 
an {{FSReadError}}, which triggers the disk failure policy and kills the node.

{noformat}
message="Exception in thread Thread[CompactionExecutor:1690,1,main]"
exception="FSReadError in 
…/data/system/peers_v2-c4325fbb8e5e3bafbd070f9250ed818e/system-peers_v2-nb-99-big-Data.db
        at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:143)
        at 
org.apache.cassandra.io.util.CompressedChunkReader$Standard.readChunk(CompressedChunkReader.java:115)
        at 
org.apache.cassandra.io.util.BufferManagingRebufferer.rebuffer(BufferManagingRebufferer.java:79)
        at 
org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:68)
        at 
org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:210)
        at 
org.apache.cassandra.io.sstable.format.big.BigTableScanner.seekToCurrentRangeStart(BigTableScanner.java:196)
        at 
org.apache.cassandra.io.sstable.format.big.BigTableScanner.access$400(BigTableScanner.java:52)
        at 
org.apache.cassandra.io.sstable.format.big.BigTableScanner$KeyScanningIterator.computeNext(BigTableScanner.java:305)
        at 
org.apache.cassandra.io.sstable.format.big.BigTableScanner$KeyScanningIterator.computeNext(BigTableScanner.java:282)
        at 
org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:46)
        at 
org.apache.cassandra.io.sstable.format.big.BigTableScanner.hasNext(BigTableScanner.java:261)
        at 
org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:376)
        at 
org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:188)
        at 
org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:157)
        at 
org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:46)
        at 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators$2.hasNext(UnfilteredPartitionIterators.java:169)
        at 
org.apache.cassandra.db.transform.BasePartitions.hasNext(BasePartitions.java:93)
        at 
org.apache.cassandra.db.compaction.CompactionIterator.hasNext(CompactionIterator.java:254)
        at 
org.apache.cassandra.db.compaction.CompactionTask.runMayThrow(CompactionTask.java:202)
        at 
org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:28)
        at 
org.apache.cassandra.db.compaction.CompactionTask.executeInternal(CompactionTask.java:78)
        at 
org.apache.cassandra.db.compaction.AbstractCompactionTask.execute(AbstractCompactionTask.java:100)
        at 
org.apache.cassandra.db.compaction.CompactionManager$BackgroundCompactionCandidate.run(CompactionManager.java:363)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.channels.ClosedChannelException
        at 
java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
        at java.base/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:790)
        at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:139)
        ... 28 more
{noformat}

In this case, we've been lucky enough to terminate the repair itself without 
any issues, but when we try to read from the {{ChannelProxy}} from another 
thread, we get a {{ClosedChannelException}}, which is also wrapped in a 
{{FSReadError}}, which triggers the disk failure policy and kills the node. A 
lot of violence here. Note that, in this case, while the channel is closed, we 
don't see a {{ClosedByInterruptException}}, because the repair job task thread, 
not the compaction thread, is interrupted.
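
Both failure modes can be reproduced outside Cassandra with a plain 
{{FileChannel}}. The sketch below is an illustration under stated assumptions, 
not the code path from the traces: {{SharedChannelInterruptSketch}} and 
{{readAndDescribe}} are hypothetical names, a temporary file stands in for the 
{{peers_v2}} data file, and a raw {{FileChannel}} stands in for 
{{ChannelProxy}}. The first reader has its interrupt flag set before a 
positional read, as a repair task thread would after {{shutdownNow()}}; the 
second reader is an uninterrupted bystander sharing the same channel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; uses a plain FileChannel rather than Cassandra's ChannelProxy.
class SharedChannelInterruptSketch
{
    /** Returns the exception class names seen by the interrupted reader [0] and the bystander [1]. */
    static String[] run() throws IOException, InterruptedException
    {
        Path file = Files.createTempFile("shared-channel", ".db");
        Files.write(file, new byte[] { 1, 2, 3, 4 });
        String[] seen = new String[2];

        try (FileChannel shared = FileChannel.open(file, StandardOpenOption.READ))
        {
            // Reader 1: interrupt flag is already set when the read begins,
            // so the JDK closes the channel and fails the read.
            Thread interruptedReader = new Thread(() -> {
                Thread.currentThread().interrupt();
                seen[0] = readAndDescribe(shared);
            });
            interruptedReader.start();
            interruptedReader.join();

            // Reader 2: never interrupted, but the shared channel is now closed.
            Thread bystander = new Thread(() -> seen[1] = readAndDescribe(shared));
            bystander.start();
            bystander.join();
        }
        finally
        {
            Files.deleteIfExists(file);
        }
        return seen;
    }

    private static String readAndDescribe(FileChannel channel)
    {
        try
        {
            channel.read(ByteBuffer.allocate(4), 0);
            return "no exception";
        }
        catch (IOException e)
        {
            // Report the concrete exception type rather than rethrowing.
            return e.getClass().getSimpleName();
        }
    }
}
```

The interrupted reader sees a {{ClosedByInterruptException}}, while the 
bystander sees only a {{ClosedChannelException}}, matching the "Caused by" 
lines in the two traces above.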
