Hi
The Spark version I am using is spark-0.9.1-bin-hadoop2, and I built
spark-assembly_2.10-0.9.1-hadoop2.2.0.jar myself.
I moved JavaKafkaWordCount.java from the examples into a new directory to
play with it.
My compile commands:
javac -cp
libs/spark-streaming_2.10-0.9.1.jar:libs/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:libs/spark-streaming-kafka_2.10-0.9.1.jar:libs/kafka_2.9.1-0.8.1.1.jar:libs/zkclient-0.4.jar
JavaKafkaWordCount.java
jar -cvf JavaKafkaWordCount.jar JavaKafkaWordCount*
And my run command:
java -cp
libs/spark-streaming_2.10-0.9.1.jar:libs/zkclient-0.4.jar:libs/kafka_2.9.1-0.8.1.1.jar:libs/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:libs/spark-streaming-kafka_2.10-0.9.1.jar:./JavaKafkaWordCount.jar
JavaKafkaWordCount spark://dlvm1:7077 vm37.dbweb.ee demogroup kafkademo1 1
The job is visible in the web UI, but I am getting:
log4j:WARN No appenders could be found for logger
(akka.event.slf4j.Slf4jLogger).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
14/05/30 11:53:42 INFO SparkEnv: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/05/30 11:53:42 INFO SparkEnv: Registering BlockManagerMaster
14/05/30 11:53:42 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140530115342-0b66
14/05/30 11:53:42 INFO MemoryStore: MemoryStore started with capacity
386.3 MB.
14/05/30 11:53:42 INFO ConnectionManager: Bound socket to port 49250
with id = ConnectionManagerId(dlvm1,49250)
14/05/30 11:53:42 INFO BlockManagerMaster: Trying to register BlockManager
14/05/30 11:53:42 INFO BlockManagerMasterActor$BlockManagerInfo:
Registering block manager dlvm1:49250 with 386.3 MB RAM
14/05/30 11:53:42 INFO BlockManagerMaster: Registered BlockManager
14/05/30 11:53:42 INFO HttpServer: Starting HTTP Server
14/05/30 11:53:42 INFO HttpBroadcast: Broadcast server started at
http://90.190.106.47:42861
14/05/30 11:53:42 INFO SparkEnv: Registering MapOutputTracker
14/05/30 11:53:42 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-76fd126e-7fcd-4df1-a967-bf4b8d356973
14/05/30 11:53:42 INFO HttpServer: Starting HTTP Server
14/05/30 11:53:43 INFO SparkUI: Started Spark Web UI at http://dlvm1:4040
14/05/30 11:53:43 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/05/30 11:53:44 INFO SparkContext: Added JAR
/opt/spark-0.9.1-bin-hadoop2/margusja_kafka/JavaKafkaWordCount.jar at
http://90.190.106.47:57550/jars/JavaKafkaWordCount.jar with timestamp
1401440024153
14/05/30 11:53:44 INFO AppClient$ClientActor: Connecting to master
spark://dlvm1:7077...
...
...
...
14/05/30 11:53:56 INFO SparkContext: Job finished: collect at
NetworkInputTracker.scala:178, took 10.617582853 s
14/05/30 11:53:56 INFO TaskSetManager: Finished TID 70 in 41 ms on dlvm1
(progress: 20/20)
14/05/30 11:53:56 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
tasks have all completed, from pool
14/05/30 11:53:56 INFO MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@dlvm1:48363
14/05/30 11:53:56 INFO TaskSetManager: Finished TID 71 in 58 ms on dlvm1
(progress: 1/1)
14/05/30 11:53:56 INFO DAGScheduler: Completed ResultTask(4, 1)
14/05/30 11:53:56 INFO DAGScheduler: Stage 4 (take at DStream.scala:586)
finished in 0.815 s
14/05/30 11:53:56 INFO SparkContext: Job finished: take at
DStream.scala:586, took 0.844135774 s
-------------------------------------------
Time: 1401440028000 ms
-------------------------------------------
14/05/30 11:53:56 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose
tasks have all completed, from pool
14/05/30 11:53:56 INFO JobScheduler: Finished job streaming job
1401440028000 ms.0 from job set of time 1401440028000 ms
14/05/30 11:53:56 INFO JobScheduler: Total delay: 8.189 s for time
1401440028000 ms (execution: 7.847 s)
14/05/30 11:53:56 INFO SparkContext: Starting job: take at DStream.scala:586
14/05/30 11:53:56 INFO DAGScheduler: Registering RDD 17 (combineByKey at
ShuffledDStream.scala:42)
14/05/30 11:53:56 INFO DAGScheduler: Got job 3 (take at
DStream.scala:586) with 1 output partitions (allowLocal=true)
14/05/30 11:53:56 INFO DAGScheduler: Final stage: Stage 6 (take at
DStream.scala:586)
14/05/30 11:53:56 INFO DAGScheduler: Parents of final stage: List(Stage 7)
14/05/30 11:53:56 INFO DAGScheduler: Missing parents: List()
14/05/30 11:53:56 INFO DAGScheduler: Submitting Stage 6
(MapPartitionsRDD[19] at combineByKey at ShuffledDStream.scala:42),
which has no missing parents
14/05/30 11:53:56 INFO JobScheduler: Starting job streaming job
1401440030000 ms.0 from job set of time 1401440030000 ms
14/05/30 11:53:56 INFO SparkContext: Starting job: runJob at
NetworkInputTracker.scala:182
14/05/30 11:53:56 INFO DAGScheduler: Submitting 1 missing tasks from
Stage 6 (MapPartitionsRDD[19] at combineByKey at ShuffledDStream.scala:42)
14/05/30 11:53:56 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
14/05/30 11:53:56 INFO TaskSetManager: Starting task 6.0:0 as TID 72 on
executor 0: dlvm1 (PROCESS_LOCAL)
14/05/30 11:53:56 INFO TaskSetManager: Serialized task 6.0:0 as 1958
bytes in 0 ms
14/05/30 11:53:56 INFO DAGScheduler: Got job 4 (runJob at
NetworkInputTracker.scala:182) with 1 output partitions (allowLocal=false)
14/05/30 11:53:56 INFO DAGScheduler: Final stage: Stage 8 (runJob at
NetworkInputTracker.scala:182)
14/05/30 11:53:56 INFO DAGScheduler: Parents of final stage: List()
14/05/30 11:53:56 INFO DAGScheduler: Missing parents: List()
14/05/30 11:53:56 INFO DAGScheduler: Submitting Stage 8
(ParallelCollectionRDD[0] at makeRDD at NetworkInputTracker.scala:165),
which has no missing parents
14/05/30 11:53:56 INFO MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 2 to spark@dlvm1:48363
14/05/30 11:53:56 INFO MapOutputTrackerMaster: Size of output statuses
for shuffle 2 is 82 bytes
14/05/30 11:53:56 INFO TaskSetManager: Finished TID 72 in 37 ms on dlvm1
(progress: 1/1)
14/05/30 11:53:56 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose
tasks have all completed, from pool
14/05/30 11:53:56 INFO DAGScheduler: Submitting 1 missing tasks from
Stage 8 (ParallelCollectionRDD[0] at makeRDD at
NetworkInputTracker.scala:165)
14/05/30 11:53:56 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks
14/05/30 11:53:56 INFO TaskSetManager: Starting task 8.0:0 as TID 73 on
executor 0: dlvm1 (PROCESS_LOCAL)
14/05/30 11:53:56 INFO TaskSetManager: Serialized task 8.0:0 as 2975
bytes in 1 ms
14/05/30 11:53:56 INFO DAGScheduler: Completed ResultTask(6, 0)
14/05/30 11:53:56 INFO DAGScheduler: Stage 6 (take at DStream.scala:586)
finished in 0.051 s
14/05/30 11:53:56 INFO SparkContext: Job finished: take at
DStream.scala:586, took 0.087153883 s
14/05/30 11:53:56 INFO SparkContext: Starting job: take at DStream.scala:586
14/05/30 11:53:56 INFO DAGScheduler: Got job 5 (take at
DStream.scala:586) with 1 output partitions (allowLocal=true)
14/05/30 11:53:56 INFO DAGScheduler: Final stage: Stage 9 (take at
DStream.scala:586)
14/05/30 11:53:56 INFO DAGScheduler: Parents of final stage: List(Stage 10)
14/05/30 11:53:56 INFO DAGScheduler: Missing parents: List()
14/05/30 11:53:56 INFO DAGScheduler: Submitting Stage 9
(MapPartitionsRDD[19] at combineByKey at ShuffledDStream.scala:42),
which has no missing parents
14/05/30 11:53:56 INFO DAGScheduler: Submitting 1 missing tasks from
Stage 9 (MapPartitionsRDD[19] at combineByKey at ShuffledDStream.scala:42)
14/05/30 11:53:56 INFO TaskSchedulerImpl: Adding task set 9.0 with 1 tasks
14/05/30 11:53:56 INFO TaskSetManager: Starting task 9.0:0 as TID 74 on
executor 0: dlvm1 (PROCESS_LOCAL)
14/05/30 11:53:56 INFO TaskSetManager: Serialized task 9.0:0 as 1958
bytes in 0 ms
14/05/30 11:53:56 WARN TaskSetManager: Lost TID 73 (task 8.0:0)
14/05/30 11:53:56 WARN TaskSetManager: Loss was due to
java.lang.ClassNotFoundException
java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka.KafkaReceiver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
at
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/05/30 11:53:56 INFO TaskSetManager: Starting task 8.0:0 as TID 75 on
executor 0: dlvm1 (PROCESS_LOCAL)
14/05/30 11:53:56 INFO TaskSetManager: Serialized task 8.0:0 as 2975
bytes in 1 ms
14/05/30 11:53:56 INFO TaskSetManager: Finished TID 74 in 62 ms on dlvm1
(progress: 1/1)
14/05/30 11:53:56 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose
tasks have all completed, from pool
14/05/30 11:53:56 INFO DAGScheduler: Completed ResultTask(9, 1)
14/05/30 11:53:56 INFO DAGScheduler: Stage 9 (take at DStream.scala:586)
finished in 0.083 s
14/05/30 11:53:56 INFO SparkContext: Job finished: take at
DStream.scala:586, took 0.101449564 s
-------------------------------------------
Time: 1401440030000 ms
-------------------------------------------
...
...
...
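(Aside, on the mechanism: the stack trace shows the failure happening while a task is deserialized — JavaDeserializationStream.resolveClass calls Class.forName — which suggests the class name arrives with the serialized task but the jar containing it is only on the JVM that submitted the job, not on the JVM running the task; note the log shows only JavaKafkaWordCount.jar being added via "SparkContext: Added JAR". A minimal, Spark-free sketch of that resolution step; whether it succeeds depends entirely on the current JVM's classpath:)

```java
public class CnfeSketch {
    // Mirrors what the deserializer does: resolve a class by name against
    // the current JVM's classpath. Returns false instead of throwing.
    static boolean canResolve(String name) {
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A JDK class is always resolvable; the Kafka receiver is resolvable
        // only in a JVM whose classpath includes its jar.
        System.out.println("ArrayList resolvable: "
                + canResolve("java.util.ArrayList"));
        System.out.println("KafkaReceiver resolvable: "
                + canResolve("org.apache.spark.streaming.kafka.KafkaReceiver"));
    }
}
```

In the driver JVM started with the java -cp line above, the second lookup would succeed; in a worker JVM that was never sent spark-streaming-kafka_2.10-0.9.1.jar, it fails exactly as in the log.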
I know that KafkaReceiver is in one of my jars:
[root@dlvm1 margusja_kafka]# cd libs/
[root@dlvm1 libs]# jar xvf spark-
spark-assembly_2.10-0.9.1-hadoop2.2.0.jar spark-streaming_2.10-0.9.1.jar
spark-streaming-kafka_2.10-0.9.1.jar
[root@dlvm1 libs]# jar xvf spark-streaming-kafka_2.10-0.9.1.jar | grep
KafkaReceiver
inflated: org/apache/spark/streaming/kafka/KafkaReceiver$$anonfun$1.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$MessageHandler$$anonfun$run$2.class
inflated: org/apache/spark/streaming/kafka/KafkaReceiver.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$$anonfun$onStart$3.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$$anonfun$onStart$1.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$MessageHandler.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$$anonfun$tryZookeeperConsumerGroupCleanup$1.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$$anonfun$onStart$5.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$$anonfun$onStart$2.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$MessageHandler$$anonfun$run$1.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$$anonfun$onStart$4.class
inflated:
org/apache/spark/streaming/kafka/KafkaReceiver$$anonfun$onStart$5$$anonfun$apply$1.class
[root@dlvm1 libs]#
Any ideas?
--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"