[ 
https://issues.apache.org/jira/browse/CASSANDRA-11670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290085#comment-15290085
 ] 

Paulo Motta commented on CASSANDRA-11670:
-----------------------------------------

Attaching initial patch for review.

The basic idea is to introduce a {{BatchlogBuilder}} that can be instantiated 
with a maximum size and that creates a new batchlog whenever the mutations 
added to it exceed the specified maximum size. Based on this, I added a 
{{BatchlogManager.storeMultiBatch}} method that receives a list of mutations 
and stores them in multiple batches, respecting a maximum size of 
(90% * max_mutation_size_in_kb) per batch. This method is used when [storing 
views on a bootstrapping or moving 
node|https://github.com/pauloricardomg/cassandra/blob/63c5001e7cde2a6296fe3b14e96bb9225d893585/src/java/org/apache/cassandra/service/StorageProxy.java#L721],
 since we skip the normal MV path if there are pending endpoints.
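
To illustrate the splitting idea, here is a minimal sketch and not the code 
from the patch. The class name {{SizeBoundedBatcher}} and its methods are 
hypothetical, and mutations are represented only by their serialized size in 
bytes instead of real {{Mutation}} objects. The 33554432 value used in the 
comment is the maximum size reported in the log quoted below.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: groups mutation sizes into batches capped at a byte limit. */
final class SizeBoundedBatcher {
    private final long maxBatchBytes;
    private final List<List<Long>> batches = new ArrayList<>();
    private List<Long> current = new ArrayList<>();
    private long currentBytes = 0;

    SizeBoundedBatcher(long maxBatchBytes) {
        if (maxBatchBytes <= 0) {
            throw new IllegalArgumentException("maxBatchBytes must be positive");
        }
        this.maxBatchBytes = maxBatchBytes;
    }

    /** 90% of the maximum mutation size, e.g. 33554432 bytes gives 30198988 bytes. */
    static long limitFor(long maxMutationBytes) {
        return maxMutationBytes * 9 / 10;
    }

    /**
     * Adds one mutation. If it would push the current batch over the limit,
     * the current batch is closed first and a new one is started.
     * A single mutation larger than the limit still gets a batch of its own,
     * because this sketch cannot split an individual mutation.
     */
    void add(long mutationBytes) {
        if (!current.isEmpty() && currentBytes + mutationBytes > maxBatchBytes) {
            closeCurrent();
        }
        current.add(mutationBytes);
        currentBytes += mutationBytes;
    }

    /** Closes any open batch and returns all batches built so far. */
    List<List<Long>> build() {
        if (!current.isEmpty()) {
            closeCurrent();
        }
        return batches;
    }

    private void closeCurrent() {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
    }
}
```

With a limit of 100 bytes, adding mutations of 40, 40, 40 and 150 bytes 
produces three batches in this sketch: the first two mutations together, then 
the third alone, then the oversized one alone.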

When adding mutations via the normal MV path, a {{BatchlogBuilder}} is created 
and [mutations are appended to 
it|https://github.com/pauloricardomg/cassandra/blob/63c5001e7cde2a6296fe3b14e96bb9225d893585/src/java/org/apache/cassandra/service/StorageProxy.java#L738]
 as the view write response handlers are created. Each time a new batchlog is 
created, a new {{BatchlogCleanup}} callback object is 
[created|https://github.com/pauloricardomg/cassandra/blob/63c5001e7cde2a6296fe3b14e96bb9225d893585/src/java/org/apache/cassandra/service/StorageProxy.java#L742],
 so that each batchlog is removed independently after all of its mutations are 
written. Since we don't know ahead of time how many mutations a batchlog will 
contain, we increment the mutation count in the {{BatchlogCleanup}} as 
mutations are added to the {{BatchlogBuilder}}.
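
The counting scheme can be sketched roughly as follows. This is an 
illustration only: {{CleanupCounter}}, {{expectOneMore}} and {{ackMutation}} 
are hypothetical names, not Cassandra's {{BatchlogCleanup}} API, and the 
sketch assumes every mutation is registered before the first acknowledgement 
arrives.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: runs a cleanup action once every registered mutation is acknowledged. */
final class CleanupCounter {
    private final AtomicInteger waitingFor = new AtomicInteger();
    private final Runnable cleanup;

    CleanupCounter(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    /** Called as each mutation is added to the batch, since the total is not known up front. */
    void expectOneMore() {
        waitingFor.incrementAndGet();
    }

    /**
     * Called when one mutation has been written. The cleanup action runs when
     * the count reaches zero. Assumption: all expectOneMore() calls happen
     * before the first ackMutation() call; otherwise the count could reach
     * zero too early.
     */
    void ackMutation() {
        if (waitingFor.decrementAndGet() == 0) {
            cleanup.run();
        }
    }
}
```

In this sketch, a mutation that is never acknowledged leaves the count above 
zero, so the cleanup action never runs. Registering only the mutations that 
will actually be acknowledged avoids that.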

I noticed that when we apply the view mutation locally we don't currently 
decrease the {{mutationsWaitingFor}} on {{BatchlogCleanup}}, so does this mean 
we never clean up view batchlogs when there are local paired mutations? In 
order to avoid this in the new approach I incremented the number of mutations 
in the {{BatchlogCleanup}} [only if the replica is not applied 
locally|https://github.com/pauloricardomg/cassandra/blob/63c5001e7cde2a6296fe3b14e96bb9225d893585/src/java/org/apache/cassandra/service/StorageProxy.java#L766].
 Can we further optimize this and only add the replica to the batchlog if it's 
not local?

I added a 
[dtest|https://github.com/riptano/cassandra-dtest/compare/master...pauloricardomg:11670]
 that reproduces this issue, which this approach fixes. If the approach looks 
good, I will add more thorough testing and documentation to {{BatchlogBuilder}}.

Patch and tests available below:

||3.0||dtest||
|[branch|https://github.com/apache/cassandra/compare/cassandra-3.0...pauloricardomg:3.0-11670]|[branch|https://github.com/riptano/cassandra-dtest/compare/master...pauloricardomg:11670]|
|[testall|http://cassci.datastax.com/view/Dev/view/paulomotta/job/pauloricardomg-3.0-11670-testall/lastCompletedBuild/testReport/]|
|[dtest|http://cassci.datastax.com/view/Dev/view/paulomotta/job/pauloricardomg-3.0-11670-dtest/lastCompletedBuild/testReport/]|

> Error while waiting on bootstrap to complete. Bootstrap will have to be 
> restarted. Stream failed
> ------------------------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-11670
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-11670
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Configuration, Streaming and Messaging
>            Reporter: Anastasia Osintseva
>            Assignee: Paulo Motta
>             Fix For: 3.0.5
>
>
> I have a cluster with 2 DCs and 2 nodes in each DC. I wanted to add 1 node to 
> each DC. One node was added successfully after I ran a scrub. 
> Now I'm trying to add a node to the other DC, but I get the error: 
> org.apache.cassandra.streaming.StreamException: Stream failed. 
> After scrubbing and repairing I get the same error.  
> {noformat}
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 Keyspace.java:492 - 
> Unknown exception caught while attempting to update MaterializedView! 
> messages_dump.messages
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large 
> for the maxiumum size of 33554432
>       at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_11]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_11]
>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 
> StreamReceiveTask.java:214 - Error applying streamed data: 
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large 
> for the maxiumum size of 33554432
>       at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_11]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_11]
>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 StreamSession.java:520 - 
> [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] Streaming error occurred
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large 
> for the maxiumum size of 33554432
>       at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_11]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_11]
>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> DEBUG [StreamReceiveTask:5] 2016-04-27 00:33:21,082 
> ConnectionHandler.java:110 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] 
> Closing stream connection handler on /88.9.99.92
> DEBUG [STREAM-OUT-/88.9.99.92] 2016-04-27 00:33:21,082 
> ConnectionHandler.java:341 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] 
> Sending Session Failed
> INFO  [StreamReceiveTask:5] 2016-04-27 00:33:21,082 
> StreamResultFuture.java:182 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] 
> Session with /88.9.99.92 is complete
> WARN  [StreamReceiveTask:5] 2016-04-27 00:33:21,182 
> StreamResultFuture.java:209 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] 
> Stream failed
> ERROR [main] 2016-04-27 00:33:21,259 StorageService.java:1300 - Error while 
> waiting on bootstrap to complete. Bootstrap will have to be restarted.
> java.util.concurrent.ExecutionException: 
> org.apache.cassandra.streaming.StreamException: Stream failed
>       at 
> com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) 
> ~[guava-18.0.jar:na]
>       at 
> org.apache.cassandra.service.StorageService.bootstrap(StorageService.java:1295)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:971)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageService.initServer(StorageService.java:745)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageService.initServer(StorageService.java:610)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:333) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:551)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:679) 
> [apache-cassandra-3.0.5.jar:3.0.5]
> Caused by: org.apache.cassandra.streaming.StreamException: Stream failed
>       at 
> org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at com.google.common.util.concurrent.Futures$6.run(Futures.java:1310) 
> ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
>  ~[guava-18.0.jar:na]
>       at 
> org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:210)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:186)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:430)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamSession.onError(StreamSession.java:525) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:216)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_11]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  ~[na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  ~[na:1.8.0_11]
>       at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_11]
> {noformat}
> I set commitlog_segment_size_in_mb: 128, but it didn't help. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
