[ https://issues.apache.org/jira/browse/CASSANDRA-11670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290085#comment-15290085 ]
Paulo Motta commented on CASSANDRA-11670:
-----------------------------------------

Attaching initial patch for review.

The basic idea is to introduce a {{BatchlogBuilder}} that can be instantiated with a maximum size and that creates new batchlogs as mutations are added to it, whenever the current batchlog would exceed the specified maximum size. Based on this, I added a {{BatchlogManager.storeMultiBatch}} method that receives a list of mutations and stores them in multiple batches, keeping each batch within 90% of {{max_mutation_size_in_kb}}. This method is used when [storing views on a bootstrapping or moving node|https://github.com/pauloricardomg/cassandra/blob/63c5001e7cde2a6296fe3b14e96bb9225d893585/src/java/org/apache/cassandra/service/StorageProxy.java#L721], since we skip the normal MV path when there are pending endpoints.

When adding mutations via the normal MV path, a {{BatchlogBuilder}} is created and [mutations are appended to it|https://github.com/pauloricardomg/cassandra/blob/63c5001e7cde2a6296fe3b14e96bb9225d893585/src/java/org/apache/cassandra/service/StorageProxy.java#L738] as the view write response handlers are created. Each time a new batchlog is created, a new {{BatchlogCleanup}} callback object is [created|https://github.com/pauloricardomg/cassandra/blob/63c5001e7cde2a6296fe3b14e96bb9225d893585/src/java/org/apache/cassandra/service/StorageProxy.java#L742], so that each batchlog is removed independently once all of its mutations have been written. Since we don't know ahead of time how many mutations a batchlog will contain, we increment the mutation count in the {{BatchlogCleanup}} as mutations are added to the {{BatchlogBuilder}}.

I noticed that when we apply the view mutation locally we don't currently decrement {{mutationsWaitingFor}} on {{BatchlogCleanup}}. Does this mean we never clean up view batchlogs when there are local paired mutations?
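A minimal Java sketch of the splitting and per-batch cleanup counting described above, under stated assumptions. It is not the patch's code: {{BatchSplitSketch}}, {{Batch}}, {{Cleanup}}, {{add}} and {{ackOne}} are hypothetical names, mutations are represented only by their serialized size in bytes, removing a batchlog is modeled as setting a flag, and it assumes every mutation is added before any write acknowledgement arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch (not Cassandra's BatchlogBuilder): groups mutation sizes into size-bounded batches. */
final class BatchSplitSketch {

    /** Hypothetical per-batch counter, loosely modeled on the idea behind BatchlogCleanup. */
    static final class Cleanup {
        private final AtomicInteger waitingFor = new AtomicInteger();
        private volatile boolean removed = false;

        /** Called once per mutation added to the batch, since the final count is not known up front. */
        void expectOneMore() { waitingFor.incrementAndGet(); }

        /** Called when one mutation of this batch has been written; the last write "removes" the batch. */
        void ackOne() {
            if (waitingFor.decrementAndGet() == 0) {
                removed = true; // stands in for deleting this batch's batchlog entry
            }
        }

        boolean isRemoved() { return removed; }
    }

    static final class Batch {
        final List<Long> mutationSizes = new ArrayList<>();
        final Cleanup cleanup = new Cleanup(); // one cleanup object per batch
        long totalBytes = 0;
    }

    private final long maxBatchBytes;
    private final List<Batch> batches = new ArrayList<>();

    BatchSplitSketch(long maxMutationBytes) {
        // Budget of 90% of the max mutation size, leaving headroom for batchlog overhead.
        this.maxBatchBytes = maxMutationBytes * 9 / 10;
    }

    long maxBatchBytes() { return maxBatchBytes; }

    List<Batch> batches() { return batches; }

    /** Appends a mutation, starting a new batch if it would push the current one over budget. */
    Batch add(long mutationBytes) {
        Batch current = batches.isEmpty() ? null : batches.get(batches.size() - 1);
        if (current == null || current.totalBytes + mutationBytes > maxBatchBytes) {
            // Note: a single mutation larger than the budget still lands alone in its own batch;
            // this sketch does not split individual mutations.
            current = new Batch();
            batches.add(current);
        }
        current.mutationSizes.add(mutationBytes);
        current.totalBytes += mutationBytes;
        current.cleanup.expectOneMore(); // count grows as mutations are added
        return current;
    }

    public static void main(String[] args) {
        BatchSplitSketch builder = new BatchSplitSketch(33_554_432L); // limit reported in the stack trace
        for (int i = 0; i < 5; i++) {
            builder.add(10_000_000L); // five 10 MB view mutations
        }
        System.out.println("budget = " + builder.maxBatchBytes() + " bytes");
        for (Batch b : builder.batches()) {
            System.out.println(b.mutationSizes.size() + " mutations, " + b.totalBytes + " bytes");
        }
        // prints:
        // budget = 30198988 bytes
        // 3 mutations, 30000000 bytes
        // 2 mutations, 20000000 bytes
    }
}
```

With the 33554432-byte limit from the quoted stack trace, five 10 MB mutations end up in two batches, each tracked by its own counter, so each batch can be removed as soon as its own writes complete rather than waiting for the whole set.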
To avoid leaking view batchlogs in the new approach, I incremented the number of mutations in the {{BatchlogCleanup}} [only if the replica is not applied locally|https://github.com/pauloricardomg/cassandra/blob/63c5001e7cde2a6296fe3b14e96bb9225d893585/src/java/org/apache/cassandra/service/StorageProxy.java#L766]. Can we optimize this further and only add the replica to the batchlog if it's not local?

I added a [dtest|https://github.com/riptano/cassandra-dtest/compare/master...pauloricardomg:11670] that reproduces this issue, which this approach fixes. If this looks good, I will add more thorough testing and documentation to {{BatchlogBuilder}}.

Patch and tests available below:
||3.0||dtest||
|[branch|https://github.com/apache/cassandra/compare/cassandra-3.0...pauloricardomg:3.0-11670]|[branch|https://github.com/riptano/cassandra-dtest/compare/master...pauloricardomg:11670]|
|[testall|http://cassci.datastax.com/view/Dev/view/paulomotta/job/pauloricardomg-3.0-11670-testall/lastCompletedBuild/testReport/]|
|[dtest|http://cassci.datastax.com/view/Dev/view/paulomotta/job/pauloricardomg-3.0-11670-dtest/lastCompletedBuild/testReport/]|

> Error while waiting on bootstrap to complete. Bootstrap will have to be restarted. Stream failed
> ------------------------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-11670
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-11670
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Configuration, Streaming and Messaging
>            Reporter: Anastasia Osintseva
>            Assignee: Paulo Motta
>             Fix For: 3.0.5
>
>
> I have 2 DCs in the cluster, with 2 nodes in each DC. I wanted to add 1 node to each
> DC. One node was added successfully after I ran scrub.
> Now I'm trying to add a node to the other DC, but I get this error:
> org.apache.cassandra.streaming.StreamException: Stream failed.
> After scrub and repair I get the same error.
> {noformat}
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 Keyspace.java:492 - Unknown exception caught while attempting to update MaterializedView! messages_dump.messages
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large for the maxiumum size of 33554432
> at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169) [apache-cassandra-3.0.5.jar:3.0.5]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_11]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_11]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_11]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_11]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 StreamReceiveTask.java:214 - Error applying streamed data:
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large for the maxiumum size of 33554432
> at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_11]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_11]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_11]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_11]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 StreamSession.java:520 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] Streaming error occurred
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large for the maxiumum size of 33554432
> at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_11]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_11]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_11]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_11]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> DEBUG [StreamReceiveTask:5] 2016-04-27 00:33:21,082 ConnectionHandler.java:110 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] Closing stream connection handler on /88.9.99.92
> DEBUG [STREAM-OUT-/88.9.99.92] 2016-04-27 00:33:21,082 ConnectionHandler.java:341 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] Sending Session Failed
> INFO [StreamReceiveTask:5] 2016-04-27 00:33:21,082 StreamResultFuture.java:182 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] Session with /88.9.99.92 is complete
> WARN [StreamReceiveTask:5] 2016-04-27 00:33:21,182 StreamResultFuture.java:209 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] Stream failed
> ERROR [main] 2016-04-27 00:33:21,259 StorageService.java:1300 - Error while waiting on bootstrap to complete. Bootstrap will have to be restarted.
> java.util.concurrent.ExecutionException: org.apache.cassandra.streaming.StreamException: Stream failed
> at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[guava-18.0.jar:na]
> at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[guava-18.0.jar:na]
> at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[guava-18.0.jar:na]
> at org.apache.cassandra.service.StorageService.bootstrap(StorageService.java:1295) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:971) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.StorageService.initServer(StorageService.java:745) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.StorageService.initServer(StorageService.java:610) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:333) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:551) [apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:679) [apache-cassandra-3.0.5.jar:3.0.5]
> Caused by: org.apache.cassandra.streaming.StreamException: Stream failed
> at org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at com.google.common.util.concurrent.Futures$6.run(Futures.java:1310) ~[guava-18.0.jar:na]
> at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457) ~[guava-18.0.jar:na]
> at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156) ~[guava-18.0.jar:na]
> at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145) ~[guava-18.0.jar:na]
> at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202) ~[guava-18.0.jar:na]
> at org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:210) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:186) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:430) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.streaming.StreamSession.onError(StreamSession.java:525) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:216) ~[apache-cassandra-3.0.5.jar:3.0.5]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_11]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_11]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_11]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_11]
> at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_11]
> {noformat}
> I set commitlog_segment_size_in_mb: 128, but it didn't help.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)