[ https://issues.apache.org/jira/browse/CASSANDRA-17466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh McKenzie updated CASSANDRA-17466: -------------------------------------- Reviewers: Josh McKenzie > Shut repair task executor down without interruption to avoid compromising > shared channel proxies > ------------------------------------------------------------------------------------------------ > > Key: CASSANDRA-17466 > URL: https://issues.apache.org/jira/browse/CASSANDRA-17466 > Project: Cassandra > Issue Type: Bug > Components: Consistency/Repair > Reporter: Caleb Rackliffe > Assignee: Caleb Rackliffe > Priority: Normal > Fix For: 4.1, 4.0.x > > Time Spent: 20m > Remaining Estimate: 0h > > If a {{RepairJob}} gets past validation, it builds a list of {{SyncTask}} > items and fires them off. If any one of those fails, we grab the relevant > exception and throw it up from {{RepairJob}} to {{RepairSession}}. > {noformat} > ERROR 2022-03-09T23:53:36,721 > [Stream-Deserializer-/10.246.3.102:7000-d97958c4] > org.apache.cassandra.streaming.StreamSession:1110 - [Stream > #07c55da0-a047-11ec-8122-ab911c7a993f] Remote peer /10.246.3.102:7000 failed > stream session. > {noformat} > {{RepairSession}} then marks itself as being terminated and clears its > internal maps of active validations and sync tasks, but immediately before it > does that, it calls {{shutdownNow()}} on the executor that executes those > tasks. In the case of our failing stream session, we may still have other > running stream tasks whose threads' interrupt flag has been set, and this can > have some unintended negative consequences, because any {{ChannelProxy}} > interrupted in the middle of a blocking operation will both be closed and > throw a {{ClosedByInterruptException}}. (Keep in mind that we share > {{ChannelProxy}} instances outside a few specific cases, like those > introduced in CASSANDRA-15666.) > We've seen this manifest in production in a couple ways, both of them while > trying to read from the {{peers_v2}} system table: > {noformat} > Exception in thread Thread[RepairJobTask:23,5,main]"^M > exception="FSReadError in > .../data/system/peers_v2-c4325fbb8e5e3bafbd070f9250ed818e/system-peers_v2-nb-99-big-Data.db > at > org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:143) > at > org.apache.cassandra.io.util.CompressedChunkReader$Standard.readChunk(CompressedChunkReader.java:115) > at > org.apache.cassandra.io.util.BufferManagingRebufferer.rebuffer(BufferManagingRebufferer.java:79) > at > org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:68) > at > org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:210) > at > org.apache.cassandra.io.util.FileHandle.createReader(FileHandle.java:151) > at > org.apache.cassandra.io.sstable.format.SSTableReader.getFileDataInput(SSTableReader.java:1628) > at > org.apache.cassandra.db.columniterator.AbstractSSTableIterator.<init>(AbstractSSTableIterator.java:96) > at > org.apache.cassandra.db.columniterator.SSTableIterator.<init>(SSTableIterator.java:48) > at > org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:75) > at > org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:67) > at > org.apache.cassandra.db.StorageHook$1.makeRowIterator(StorageHook.java:87) > at > org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndSSTablesInTimestampOrder(SinglePartitionReadCommand.java:897) > at > org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDiskInternal(SinglePartitionReadCommand.java:605) > at > org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDisk(SinglePartitionReadCommand.java:578) > at > org.apache.cassandra.db.SinglePartitionReadCommand.queryStorage(SinglePartitionReadCommand.java:412) > at > org.apache.cassandra.db.ReadCommand.executeLocally(ReadCommand.java:414) > at > org.apache.cassandra.db.SinglePartitionReadQuery$Group.executeLocally(SinglePartitionReadQuery.java:242) > at > org.apache.cassandra.db.SinglePartitionReadQuery$Group.executeInternal(SinglePartitionReadQuery.java:216) > at > org.apache.cassandra.cql3.statements.SelectStatement.executeInternal(SelectStatement.java:458) > at > org.apache.cassandra.cql3.statements.SelectStatement.executeLocally(SelectStatement.java:442) > at > org.apache.cassandra.cql3.statements.SelectStatement.executeLocally(SelectStatement.java:96) > at > org.apache.cassandra.cql3.QueryProcessor.executeInternal(QueryProcessor.java:334) > at > org.apache.cassandra.db.SystemKeyspace.getPreferredIP(SystemKeyspace.java:973) > at > org.apache.cassandra.net.OutboundConnectionSettings.connectTo(OutboundConnectionSettings.java:455) > at > org.apache.cassandra.net.OutboundConnectionSettings.withDefaults(OutboundConnectionSettings.java:484) > at > org.apache.cassandra.streaming.DefaultConnectionFactory.createConnection(DefaultConnectionFactory.java:49) > at > org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createChannel(NettyStreamingMessageSender.java:199) > at > org.apache.cassandra.streaming.async.NettyStreamingMessageSender.setupControlMessageChannel(NettyStreamingMessageSender.java:180) > at > org.apache.cassandra.streaming.async.NettyStreamingMessageSender.sendMessage(NettyStreamingMessageSender.java:245) > at > org.apache.cassandra.streaming.async.NettyStreamingMessageSender.initialize(NettyStreamingMessageSender.java:149) > at > org.apache.cassandra.streaming.StreamSession.start(StreamSession.java:372) > at > org.apache.cassandra.streaming.StreamCoordinator.startSession(StreamCoordinator.java:262) > at > org.apache.cassandra.streaming.StreamCoordinator.access$700(StreamCoordinator.java:36) > at > org.apache.cassandra.streaming.StreamCoordinator$HostStreamingData.connectAllStreamSessions(StreamCoordinator.java:308) > at > org.apache.cassandra.streaming.StreamCoordinator.connectAllStreamSessions(StreamCoordinator.java:107) > at > org.apache.cassandra.streaming.StreamCoordinator.connect(StreamCoordinator.java:101) > at > org.apache.cassandra.streaming.StreamResultFuture.createInitiator(StreamResultFuture.java:98) > at > org.apache.cassandra.streaming.StreamPlan.execute(StreamPlan.java:179) > at > org.apache.cassandra.repair.LocalSyncTask.startSync(LocalSyncTask.java:113) > at org.apache.cassandra.repair.SyncTask.run(SyncTask.java:89) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at > com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) > at > com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57) > at > com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at > io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.nio.channels.ClosedByInterruptException > at > java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:199) > at > java.base/sun.nio.ch.FileChannelImpl.endBlocking(FileChannelImpl.java:162) > at > java.base/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:816) > at java.base/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) > at > org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:139) > ... 48 more > {noformat} > In this case, the {{SyncTask}} is trying to read the preferred remote IP for > the node it wants to communicate with, but when it reads the {{peers_v2}} > table, it finds that it's already been closed by an interrupt, and > {{StorageProxy#read()}} wraps the {{ClosedByInterruptException}} in a > {{FSReadError}}, which triggers the disk failure policy, and kills the node. > {noformat} > message="Exception in thread Thread[CompactionExecutor:1690,1,main]"^M > exception="FSReadError in > …/data/system/peers_v2-c4325fbb8e5e3bafbd070f9250ed818e/system-peers_v2-nb-99-big-Data.db > at > org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:143) > at > org.apache.cassandra.io.util.CompressedChunkReader$Standard.readChunk(CompressedChunkReader.java:115) > at > org.apache.cassandra.io.util.BufferManagingRebufferer.rebuffer(BufferManagingRebufferer.java:79) > at > org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:68) > at > org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:210) > at > org.apache.cassandra.io.sstable.format.big.BigTableScanner.seekToCurrentRangeStart(BigTableScanner.java:196) > at > org.apache.cassandra.io.sstable.format.big.BigTableScanner.access$400(BigTableScanner.java:52) > at > org.apache.cassandra.io.sstable.format.big.BigTableScanner$KeyScanningIterator.computeNext(BigTableScanner.java:305) > at > org.apache.cassandra.io.sstable.format.big.BigTableScanner$KeyScanningIterator.computeNext(BigTableScanner.java:282) > at > org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:46) > at > org.apache.cassandra.io.sstable.format.big.BigTableScanner.hasNext(BigTableScanner.java:261) > at > org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:376) > at > org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:188) > at > org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:157) > at > org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:46) > at > org.apache.cassandra.db.partitions.UnfilteredPartitionIterators$2.hasNext(UnfilteredPartitionIterators.java:169) > at > org.apache.cassandra.db.transform.BasePartitions.hasNext(BasePartitions.java:93) > at > org.apache.cassandra.db.compaction.CompactionIterator.hasNext(CompactionIterator.java:254) > at > org.apache.cassandra.db.compaction.CompactionTask.runMayThrow(CompactionTask.java:202) > at > org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:28) > at > org.apache.cassandra.db.compaction.CompactionTask.executeInternal(CompactionTask.java:78) > at > org.apache.cassandra.db.compaction.AbstractCompactionTask.execute(AbstractCompactionTask.java:100) > at > org.apache.cassandra.db.compaction.CompactionManager$BackgroundCompactionCandidate.run(CompactionManager.java:363) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at > io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.nio.channels.ClosedChannelException > at > java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) > at java.base/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:790) > at > org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:139) > ... 28 more > {noformat} > In this case, we've been lucky enough to terminate the repair itself without > any issues, but when we try to read from the {{ChannelProxy}} from another > thread, we get a {{ClosedChannelException}}, which is also wrapped in a > {{FSReadError}}, which triggers the disk failure policy and kills the node. A > lot of violence here. Note that, in this case, while the channel is closed, > we don't see a {{ClosedByInterruptException}}, because the repair job task > thread, not the compaction thread, is interrupted. > (Note: This reproduces easiest w/ {{disk_access_mode: mmap_index_only}}.) -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org