[ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
    Description: 
h4. Application Overview:

We have a Spark Streaming application that consumes messages from RabbitMQ and processes them. When we generate hundreds of events on RabbitMQ and run the application on a Spark standalone cluster, we see {{java.lang.ClassNotFoundException}} exceptions in the log.

Our domain model is a simple POJO that represents the RabbitMQ events we want to consume and contains the custom properties we are interested in:

{code:title=com.xxx.Event.java|borderStyle=solid}
public class Event implements java.io.Externalizable {

    // custom properties

    // custom implementation of writeExternal(), readExternal() methods
}
{code}

We have implemented a custom Spark Streaming receiver that receives messages from a RabbitMQ queue via a custom consumer (see _"Receiving messages by subscription"_ at https://www.rabbitmq.com/api-guide.html), converts them to our domain event objects ({{com.xxx.Event}}) and stores them in Spark memory:

{code:title=RabbitMQReceiver.java|borderStyle=solid}
byte[] body = ... // data received from RabbitMQ using the custom consumer
Event event = new Event(body);
store(event); // store into Spark
{code}

The main program is simple. It sets up the Spark streaming context:

{code:title=Application.java|borderStyle=solid}
SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION_MS));
{code}

initializes the input stream:

{code:title=Application.java|borderStyle=solid}
ReceiverInputDStream<Event> stream = ... // create input stream from RabbitMQ
JavaReceiverInputDStream<Event> events = new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
{code}

and processes the events:

{code:title=Application.java|borderStyle=solid}
events.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        // process partition
    });
});

ssc.start();
ssc.awaitTermination();
{code}

h4. Application submission:

The application is packaged as a single fat jar using the maven shade plugin (http://maven.apache.org/plugins/maven-shade-plugin/). It is compiled against Spark _1.1.0_.

We run the application on a Spark _1.1.0_ standalone cluster that consists of a driver host, a master host and two worker hosts. We submit the application from the driver host.
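(For reference, below is a minimal sketch of what the custom receiver and the {{classTag}} helper used above could look like. It only illustrates the intent against the public Spark 1.1 receiver API; the {{RabbitMQReceiver}} name and the RabbitMQ wiring are placeholders, not our exact implementation.)

{code:title=RabbitMQReceiver.java (sketch)|borderStyle=solid}
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.receiver.Receiver;

import scala.reflect.ClassTag;

// Sketch only: the real receiver wires up a RabbitMQ connection/channel in onStart()
// and hands each message body to Spark via store().
public class RabbitMQReceiver extends Receiver<Event> {

    public RabbitMQReceiver() {
        super(StorageLevels.MEMORY_ONLY);
    }

    @Override
    public void onStart() {
        // start a thread that subscribes to the RabbitMQ queue and, for every delivery:
        //   byte[] body = delivery.getBody();
        //   store(new Event(body));
    }

    @Override
    public void onStop() {
        // close the RabbitMQ channel/connection
    }

    // Helper used as classTag(Event.class) when constructing the JavaReceiverInputDStream
    public static <T> ClassTag<T> classTag(Class<T> clazz) {
        return scala.reflect.ClassTag$.MODULE$.apply(clazz);
    }
}
{code}

The same stream could equally be obtained with {{ssc.receiverStream(new RabbitMQReceiver())}} instead of wrapping the {{ReceiverInputDStream}} manually.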
On one of the workers we see {{java.lang.ClassNotFoundException}} exceptions:

{panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
java.lang.ClassNotFoundException: com.xxx.Event
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:344)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
    at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
    at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
    at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
    at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
    at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
    at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
    at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
    at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
    at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
    at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
    at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{panel}

We see that the worker has downloaded application.jar and added it to the class loader:

{panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
{panel}

We also manually checked the jar file, and it does contain the {{com.xxx.Event}} class.
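(A check along these lines can be reproduced with the standard {{java.util.jar}} API; this is only a minimal sketch and the jar path is a placeholder:)

{code:title=JarCheck.java (sketch)|borderStyle=solid}
import java.util.jar.JarFile;

// Minimal sketch: verify that the fat jar actually contains the Event class.
public class JarCheck {
    public static void main(String[] args) throws Exception {
        try (JarFile jar = new JarFile("/path/to/application.jar")) {
            boolean present = jar.getEntry("com/xxx/Event.class") != null;
            System.out.println("com.xxx.Event present: " + present);
        }
    }
}
{code}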
> Spark Java Application : java.lang.ClassNotFoundException
> ---------------------------------------------------------
>
>                 Key: SPARK-4830
>                 URL: https://issues.apache.org/jira/browse/SPARK-4830
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Mykhaylo Telizhyn
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org