Caleb Rackliffe created CASSANDRA-17466: -------------------------------------------
Summary: Shut repair task executor down without interruption to avoid compromising shared channel proxies
Key: CASSANDRA-17466
URL: https://issues.apache.org/jira/browse/CASSANDRA-17466
Project: Cassandra
Issue Type: Bug
Components: Consistency/Repair
Reporter: Caleb Rackliffe
Assignee: Caleb Rackliffe

If a {{RepairJob}} gets past validation, it builds a list of {{SyncTask}} items and fires them off. If any one of those fails, we grab the relevant exception and throw it up from {{RepairJob}} to {{RepairSession}}.

{noformat}
ERROR 2022-03-09T23:53:36,721 [Stream-Deserializer-/10.246.3.102:7000-d97958c4] org.apache.cassandra.streaming.StreamSession:1110 - [Stream #07c55da0-a047-11ec-8122-ab911c7a993f] Remote peer /10.246.3.102:7000 failed stream session.
{noformat}

{{RepairSession}} then marks itself as being terminated and clears its internal maps of active validations and sync tasks, but immediately before it does that, it calls {{shutdownNow()}} on the executor that executes those tasks. In the case of our failing stream session, we may still have other running stream tasks whose threads' interrupt flag has been set, and this can have some unintended negative consequences, because any {{ChannelProxy}} interrupted in the middle of a blocking operation will both be closed and throw a {{ClosedByInterruptException}}. (Keep in mind that we share {{ChannelProxy}} instances outside a few specific cases, like those introduced in CASSANDRA-15666.)

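As a rough illustration of the executor behavior described above, here is a minimal sketch using only {{java.util.concurrent}}. It is not Cassandra code: {{ShutdownDemo}}, {{taskWasInterrupted}}, and the sleeping task are hypothetical stand-ins for a running {{SyncTask}}. It only shows that {{shutdownNow()}} interrupts a task that is already running, while {{shutdown()}} lets it finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownDemo {
    // Runs one task, shuts the executor down in the requested way, and
    // reports whether the running task observed an interrupt.
    static boolean taskWasInterrupted(boolean useShutdownNow) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(200); // hypothetical stand-in for a sync task's blocking work
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });

        started.await(); // make sure the task is running before we shut down
        if (useShutdownNow) {
            executor.shutdownNow(); // interrupts the worker thread
        } else {
            executor.shutdown(); // stops accepting work, leaves running tasks alone
        }
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return interrupted.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("shutdownNow() interrupted the task: " + taskWasInterrupted(true));
        System.out.println("shutdown() interrupted the task: " + taskWasInterrupted(false));
    }
}
```

In this sketch the first call reports an interrupt and the second does not. Whether a plain {{shutdown()}} is the right replacement in {{RepairSession}} is a separate question that the sketch does not answer.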
We've seen this manifest in production in a couple of ways, both of them while trying to read from the {{peers_v2}} system table:

{noformat}
Exception in thread Thread[RepairJobTask:23,5,main]"^M exception="FSReadError in .../data/system/peers_v2-c4325fbb8e5e3bafbd070f9250ed818e/system-peers_v2-nb-99-big-Data.db
    at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:143)
    at org.apache.cassandra.io.util.CompressedChunkReader$Standard.readChunk(CompressedChunkReader.java:115)
    at org.apache.cassandra.io.util.BufferManagingRebufferer.rebuffer(BufferManagingRebufferer.java:79)
    at org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:68)
    at org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:210)
    at org.apache.cassandra.io.util.FileHandle.createReader(FileHandle.java:151)
    at org.apache.cassandra.io.sstable.format.SSTableReader.getFileDataInput(SSTableReader.java:1628)
    at org.apache.cassandra.db.columniterator.AbstractSSTableIterator.<init>(AbstractSSTableIterator.java:96)
    at org.apache.cassandra.db.columniterator.SSTableIterator.<init>(SSTableIterator.java:48)
    at org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:75)
    at org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:67)
    at org.apache.cassandra.db.StorageHook$1.makeRowIterator(StorageHook.java:87)
    at org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndSSTablesInTimestampOrder(SinglePartitionReadCommand.java:897)
    at org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDiskInternal(SinglePartitionReadCommand.java:605)
    at org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDisk(SinglePartitionReadCommand.java:578)
    at org.apache.cassandra.db.SinglePartitionReadCommand.queryStorage(SinglePartitionReadCommand.java:412)
    at org.apache.cassandra.db.ReadCommand.executeLocally(ReadCommand.java:414)
    at org.apache.cassandra.db.SinglePartitionReadQuery$Group.executeLocally(SinglePartitionReadQuery.java:242)
    at org.apache.cassandra.db.SinglePartitionReadQuery$Group.executeInternal(SinglePartitionReadQuery.java:216)
    at org.apache.cassandra.cql3.statements.SelectStatement.executeInternal(SelectStatement.java:458)
    at org.apache.cassandra.cql3.statements.SelectStatement.executeLocally(SelectStatement.java:442)
    at org.apache.cassandra.cql3.statements.SelectStatement.executeLocally(SelectStatement.java:96)
    at org.apache.cassandra.cql3.QueryProcessor.executeInternal(QueryProcessor.java:334)
    at org.apache.cassandra.db.SystemKeyspace.getPreferredIP(SystemKeyspace.java:973)
    at org.apache.cassandra.net.OutboundConnectionSettings.connectTo(OutboundConnectionSettings.java:455)
    at org.apache.cassandra.net.OutboundConnectionSettings.withDefaults(OutboundConnectionSettings.java:484)
    at org.apache.cassandra.streaming.DefaultConnectionFactory.createConnection(DefaultConnectionFactory.java:49)
    at org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createChannel(NettyStreamingMessageSender.java:199)
    at org.apache.cassandra.streaming.async.NettyStreamingMessageSender.setupControlMessageChannel(NettyStreamingMessageSender.java:180)
    at org.apache.cassandra.streaming.async.NettyStreamingMessageSender.sendMessage(NettyStreamingMessageSender.java:245)
    at org.apache.cassandra.streaming.async.NettyStreamingMessageSender.initialize(NettyStreamingMessageSender.java:149)
    at org.apache.cassandra.streaming.StreamSession.start(StreamSession.java:372)
    at org.apache.cassandra.streaming.StreamCoordinator.startSession(StreamCoordinator.java:262)
    at org.apache.cassandra.streaming.StreamCoordinator.access$700(StreamCoordinator.java:36)
    at org.apache.cassandra.streaming.StreamCoordinator$HostStreamingData.connectAllStreamSessions(StreamCoordinator.java:308)
    at org.apache.cassandra.streaming.StreamCoordinator.connectAllStreamSessions(StreamCoordinator.java:107)
    at org.apache.cassandra.streaming.StreamCoordinator.connect(StreamCoordinator.java:101)
    at org.apache.cassandra.streaming.StreamResultFuture.createInitiator(StreamResultFuture.java:98)
    at org.apache.cassandra.streaming.StreamPlan.execute(StreamPlan.java:179)
    at org.apache.cassandra.repair.LocalSyncTask.startSync(LocalSyncTask.java:113)
    at org.apache.cassandra.repair.SyncTask.run(SyncTask.java:89)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.channels.ClosedByInterruptException
    at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:199)
    at java.base/sun.nio.ch.FileChannelImpl.endBlocking(FileChannelImpl.java:162)
    at java.base/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:816)
    at java.base/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796)
    at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:139)
    ... 48 more
{noformat}

In this case, the {{SyncTask}} is trying to read the preferred remote IP for the node it wants to communicate with, but when it reads the {{peers_v2}} table, the pending interrupt closes the underlying channel, and {{ChannelProxy#read()}} wraps the resulting {{ClosedByInterruptException}} in an {{FSReadError}}, which triggers the disk failure policy and kills the node.

{noformat}
message="Exception in thread Thread[CompactionExecutor:1690,1,main]"^M exception="FSReadError in …/data/system/peers_v2-c4325fbb8e5e3bafbd070f9250ed818e/system-peers_v2-nb-99-big-Data.db
    at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:143)
    at org.apache.cassandra.io.util.CompressedChunkReader$Standard.readChunk(CompressedChunkReader.java:115)
    at org.apache.cassandra.io.util.BufferManagingRebufferer.rebuffer(BufferManagingRebufferer.java:79)
    at org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:68)
    at org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:210)
    at org.apache.cassandra.io.sstable.format.big.BigTableScanner.seekToCurrentRangeStart(BigTableScanner.java:196)
    at org.apache.cassandra.io.sstable.format.big.BigTableScanner.access$400(BigTableScanner.java:52)
    at org.apache.cassandra.io.sstable.format.big.BigTableScanner$KeyScanningIterator.computeNext(BigTableScanner.java:305)
    at org.apache.cassandra.io.sstable.format.big.BigTableScanner$KeyScanningIterator.computeNext(BigTableScanner.java:282)
    at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:46)
    at org.apache.cassandra.io.sstable.format.big.BigTableScanner.hasNext(BigTableScanner.java:261)
    at org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:376)
    at org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:188)
    at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:157)
    at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:46)
    at org.apache.cassandra.db.partitions.UnfilteredPartitionIterators$2.hasNext(UnfilteredPartitionIterators.java:169)
    at org.apache.cassandra.db.transform.BasePartitions.hasNext(BasePartitions.java:93)
    at org.apache.cassandra.db.compaction.CompactionIterator.hasNext(CompactionIterator.java:254)
    at org.apache.cassandra.db.compaction.CompactionTask.runMayThrow(CompactionTask.java:202)
    at org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:28)
    at org.apache.cassandra.db.compaction.CompactionTask.executeInternal(CompactionTask.java:78)
    at org.apache.cassandra.db.compaction.AbstractCompactionTask.execute(AbstractCompactionTask.java:100)
    at org.apache.cassandra.db.compaction.CompactionManager$BackgroundCompactionCandidate.run(CompactionManager.java:363)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at java.base/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:790)
    at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:139)
    ... 28 more
{noformat}

In this case, we've been lucky enough to terminate the repair itself without any issues, but when we try to read from the {{ChannelProxy}} from another thread, we get a {{ClosedChannelException}}, which is also wrapped in an {{FSReadError}}, which triggers the disk failure policy and kills the node. A lot of violence here.
Note that, in this case, while the channel is closed, we don't see a {{ClosedByInterruptException}}, because the repair job task thread, not the compaction thread, is interrupted.
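
Both failure modes can be reproduced outside Cassandra with a plain {{FileChannel}}, which is what the stack traces show underneath {{ChannelProxy}}. The following is a minimal sketch under stated assumptions: {{SharedChannelDemo}}, {{readOutcome}}, and the temporary file are hypothetical, and the "repair" thread simply sets its own interrupt flag to simulate an interrupt delivered by {{shutdownNow()}}. The interrupted thread gets a {{ClosedByInterruptException}}, and a later reader of the same shared channel gets a plain {{ClosedChannelException}}.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class SharedChannelDemo {
    // Attempts a positional read and returns the name of the outcome.
    static String readOutcome(FileChannel channel) {
        try {
            channel.read(ByteBuffer.allocate(4), 0);
            return "ok";
        } catch (ClosedByInterruptException e) {
            return "ClosedByInterruptException";
        } catch (ClosedChannelException e) {
            return "ClosedChannelException";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Returns { outcome on the interrupted thread, outcome on another thread }.
    static String[] run() throws Exception {
        Path file = Files.createTempFile("shared-channel", ".db");
        Files.write(file, new byte[] {1, 2, 3, 4});
        String[] outcomes = new String[2];
        try (FileChannel shared = FileChannel.open(file, StandardOpenOption.READ)) {
            // Stand-in for a repair task thread whose interrupt flag is already set.
            Thread repairLike = new Thread(() -> {
                Thread.currentThread().interrupt();
                outcomes[0] = readOutcome(shared);
            });
            repairLike.start();
            repairLike.join();

            // Stand-in for an unrelated thread (e.g. compaction) sharing the channel.
            outcomes[1] = readOutcome(shared);
        } finally {
            Files.deleteIfExists(file);
        }
        return outcomes;
    }

    public static void main(String[] args) throws Exception {
        String[] outcomes = run();
        System.out.println("interrupted thread: " + outcomes[0]);
        System.out.println("other thread: " + outcomes[1]);
    }
}
```

The sketch never interrupts the second reader, which is why it sees only the closed channel, matching the compaction trace above.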