I work with Karthik, and just realized this was posted to the old user group. Forwarding to the new Apache user group. Any input is appreciated!
Thanks On Wednesday, April 16, 2014 11:13:44 AM UTC-6, nandanavanam karthik wrote: > > Hi , > > We are building a storm application that reads records from Kafka Queue > and writes it to Phoenix database. It works perfectly fine if the number of > threads for PhoenixBolt are 1. If we increase the number, We are getting > the following error: > > 21993 [Thread-35-requestsLogBolt] ERROR backtype.storm.util - Async loop > died! > java.lang.RuntimeException: java.lang.IllegalArgumentException: > java.lang.IllegalArgumentException: > java.util.ConcurrentModificationException > at > backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) > > ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] > at > backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) > > ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] > at > backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) > > ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] > at > backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745) > > ~[na:na] > at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433) ~[na:na] > at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na] > at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51] > Caused by: java.lang.IllegalArgumentException: > java.lang.IllegalArgumentException: > java.util.ConcurrentModificationException > at > com.overstock.storm.app.RequestsLogBolt.upsertRecords(RequestsLogBolt.java:51) > > ~[classes/:na] > at > com.overstock.storm.app.RequestsLogBolt.execute(RequestsLogBolt.java:33) > ~[classes/:na] > at > backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) > ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] > at > backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630) > > ~[na:na] > at > backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398) > > ~[na:na] > at > backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58) > > ~[na:na] > at > backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104) > > ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] > ... 6 common frames omitted > Caused by: java.lang.IllegalArgumentException: > java.util.ConcurrentModificationException > at > com.overstock.hadoop.phoenix.utils.PhoenixConnection.rollback(PhoenixConnection.java:115) > > ~[phoenix-utils-0.2-SNAPSHOT.jar:na] > at > com.overstock.hadoop.phoenix.utils.PhoenixConnection.access$100(PhoenixConnection.java:22) > > ~[phoenix-utils-0.2-SNAPSHOT.jar:na] > at > com.overstock.hadoop.phoenix.utils.PhoenixConnection$5.run(PhoenixConnection.java:214) > > ~[phoenix-utils-0.2-SNAPSHOT.jar:na] > at > com.overstock.hadoop.phoenix.utils.PhoenixConnection$5.run(PhoenixConnection.java:205) > > ~[phoenix-utils-0.2-SNAPSHOT.jar:na] > at java.security.AccessController.doPrivileged(Native Method) > ~[na:1.7.0_51] > at javax.security.auth.Subject.doAs(Subject.java:356) ~[na:1.7.0_51] > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1388) > > ~[hadoop-common-2.0.0-cdh4.3.0.jar:na] > at > com.overstock.hadoop.phoenix.utils.PhoenixConnection.executeBatch(PhoenixConnection.java:205) > > ~[phoenix-utils-0.2-SNAPSHOT.jar:na] > at > com.overstock.storm.app.RequestsLogBolt.upsertRecords(RequestsLogBolt.java:46) > > ~[classes/:na] > ... 12 common frames omitted > Caused by: java.util.ConcurrentModificationException: null > at java.util.HashMap$HashIterator.remove(HashMap.java:944) > ~[na:1.7.0_51] > at > org.apache.phoenix.execute.MutationState.commit(MutationState.java:438) > ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating] > at > org.apache.phoenix.jdbc.PhoenixConnection.commit(PhoenixConnection.java:351) > ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating] > at > org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:229) > > ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating] > at > org.apache.phoenix.jdbc.PhoenixStatement.execute(PhoenixStatement.java:185) > ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating] > at > org.apache.phoenix.jdbc.PhoenixPreparedStatement.execute(PhoenixPreparedStatement.java:146) > > ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating] > at > org.apache.phoenix.jdbc.PhoenixPreparedStatement.execute(PhoenixPreparedStatement.java:151) > > ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating] > at > com.jolbox.bonecp.PreparedStatementHandle.execute(PreparedStatementHandle.java:140) > > ~[bonecp-0.8.0.RELEASE.jar:na] > at > com.overstock.hadoop.phoenix.utils.PhoenixConnection$5.run(PhoenixConnection.java:210) > > ~[phoenix-utils-0.2-SNAPSHOT.jar:na] > ... 18 common frames omitted > > We are using BoneCP for connection pool. Initially, We thought it is the > problem of BoneCP/PhoenixConnection. To prove it, I have written an > executor service that writes records to Phoenix table with 10 threads. It > works without any error. So, we are seeing this error only when we use > BoneCP Phoenix Connection pool in Storm Cluster. This is much closer to the > this email thread. We have the same use case as mentioned. The versions we > are using: > > phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating > bonecp-0.8.0.RELEASE.jar:na > storm-core-0.9.1-incubating.jar:0.9.1-incubating > > The solution James Taylor mentioned is to remove that iterator.remove() > from the loop. I don't think we should re-build entire Phoenix jar for that > change. Is there any other solution to run past this issue? > > -Nandanavanam Karthik > > On Thursday, August 1, 2013 7:43:31 PM UTC-6, James Taylor wrote: >> >> Try removing the iterator.remove call at com.salesforce.phoenix. >> execute.MutationState.commit(MutationState.java:232). Instead just clear >> the mutations Map at the end of the loop. It's not particularly important >> that an entry is remove as you're submitting the batch to the HBase server >> - it just is meant to provide a way for the caller to know (based on the >> state in the exception) what was successfully committed and what wasn't. >> >> Also, if you can create a standalone test that demonstrates this issue, >> we can get to the root cause and provide you a fix. >> >> Thanks, >> James >> >> >> On Thu, Aug 1, 2013 at 6:05 PM, 张晓燕 <[email protected]> wrote: >> >>> hi, We use phoenix in the bolts of storm. Get a lot data from kafka, >>> then handle it, then update to hbase with phoenix. New a connnection in a >>> bolt, select and upsert one table in evey 10seconds.Run this in storm >>> cluster, the problem "com.salesforce.phoenix.execute.CommitException: >>> java.util.ConcurrentModificationException" comes up. >>> >>> 在 2013年7月29日星期一UTC+8下午11时47分29秒,James Taylor写道: >>>> >>>> Looks like it might be a bug - can you put together a small unit test >>>> that consistently repros? >>>> >>>> Thanks, >>>> James >>>> >>>> >>>> On Jul 29, 2013, at 5:02 AM, panfei <[email protected]> wrote: >>>> >>>> com.salesforce.phoenix.execute.CommitException: java.util. >>>> ConcurrentModificationException >>>> at com.salesforce.phoenix.execute.MutationState.commit( >>>> MutationState.java:236) >>>> at com.salesforce.phoenix.jdbc.PhoenixConnection.commit( >>>> PhoenixConnection.java:244) >>>> at com.salesforce.phoenix.jdbc.PhoenixStatement. >>>> executeMutation(PhoenixStatement.java:180) >>>> at com.salesforce.phoenix.jdbc.PhoenixStatement.access$600( >>>> PhoenixStatement.java:73) >>>> at com.salesforce.phoenix.jdbc.PhoenixStatement$ >>>> ExecutableUpsertStatement.executeUpdate(PhoenixStatement.java:210) >>>> at com.salesforce.phoenix.jdbc.PhoenixStatement.executeUpdate( >>>> PhoenixStatement.java:741) >>>> at com.cy.kafka.hbase.PhoenixHBaseHelper.ExcuteNonQuery( >>>> PhoenixHBaseHelper.java:39) >>>> at com.cy.storm.bolt.dau.task.DauTask.saveData(DauTask.java: >>>> 138) >>>> at com.cy.storm.bolt.dau.task.DauTask.run(DauTask.java:34) >>>> at java.util.TimerThread.mainLoop(Timer.java:512) >>>> at java.util.TimerThread.run(Timer.java:462) >>>> Caused by: java.util.ConcurrentModificationException >>>> at java.util.HashMap$HashIterator.remove(HashMap.java:811) >>>> at com.salesforce.phoenix.execute.MutationState.commit( >>>> MutationState.java:232) >>>> ... 10 more >>>> com.salesforce.phoenix.execute.CommitException: java.util. >>>> ConcurrentModificationException >>>> at com.salesforce.phoenix.execute.MutationState.commit( >>>> MutationState.java:236) >>>> at com.salesforce.phoenix.jdbc.PhoenixConnection.commit( >>>> PhoenixConnection.java:244) >>>> at com.salesforce.phoenix.jdbc.PhoenixStatement. >>>> executeMutation(PhoenixStatement.java:180) >>>> at com.salesforce.phoenix.jdbc.PhoenixStatement.access$600( >>>> PhoenixStatement.java:73) >>>> at com.salesforce.phoenix.jdbc.PhoenixStatement$ >>>> ExecutableUpsertStatement.executeUpdate(PhoenixStatement.java:210) >>>> at com.salesforce.phoenix.jdbc.PhoenixStatement.executeUpdate( >>>> PhoenixStatement.java:741) >>>> at com.cy.kafka.hbase.PhoenixHBaseHelper.ExcuteNonQuery( >>>> PhoenixHBaseHelper.java:39) >>>> at com.cy.storm.bolt.dau.DauFilterBolt.filterDAUTuple( >>>> DauFilterBolt.java:105) >>>> at com.cy.storm.bolt.dau.DauFilterBolt.execute( >>>> DauFilterBolt.java:59) >>>> at backtype.storm.topology.BasicBoltExecutor.execute( >>>> BasicBoltExecutor.java:32) >>>> at backtype.storm.daemon.executor$fn__4050$tuple_ >>>> action_fn__4052.invoke(executor.clj:566) >>>> at backtype.storm.daemon.executor$mk_task_receiver$fn__ >>>> 3976.invoke(executor.clj:345) >>>> at backtype.storm.disruptor$clojure_handler$reify__1606. >>>> onEvent(disruptor.clj:43) >>>> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor( >>>> DisruptorQueue.java:84) >>>> at backtype.storm.utils.DisruptorQueue. >>>> consumeBatchWhenAvailable(DisruptorQueue.java:58) >>>> at backtype.storm.disruptor$consume_batch_when_available. >>>> invoke(disruptor.clj:62) >>>> at backtype.storm.daemon.executor$fn__4050$fn__4059$fn_ >>>> _4106.invoke(executor.clj:658) >>>> at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377) >>>> at clojure.lang.AFn.run(AFn.java:24) >>>> at java.lang.Thread.run(Thread.java:662) >>>> Caused by: java.util.ConcurrentModificationException >>>> at java.util.HashMap$HashIterator.remove(HashMap.java:811) >>>> at com.salesforce.phoenix.execute.MutationState.commit( >>>> MutationState.java: >>>> >>>> >>>> -- >>>> 不学习,不知道 >>>> >>>> -- >>>> You received this message because you are subscribed to the Google >>>> Groups "Phoenix HBase User" group. >>>> To unsubscribe from this group and stop receiving emails from it, send >>>> an email to [email protected]. >>>> For more options, visit https://groups.google.com/groups/opt_out. >>>> >>>> >>>> >>>> -- >>> You received this message because you are subscribed to the Google >>> Groups "Phoenix HBase User" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected]. >>> For more options, visit https://groups.google.com/groups/opt_out. >>> >>> >>> >> >>
