FW: Spark error while running in spark mode
From: Ratika Prasad
Sent: Monday, October 05, 2015 2:39 PM
To: u...@spark.apache.org
Cc: Ameeta Jayarajan
Subject: Spark error while running in spark mode

Hi,

When we run our Spark component in cluster mode as below, we get the following error:

./bin/spark-submit --class com.coupons.stream.processing.SparkStreamEventProcessingEngine --master spark://172.28.161.138:7077 EventProcessingEngine-0.0.1-SNAPSHOT-jar-with-dependencies.jar

ERROR ErrorMonitor: dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://sparkMaster@172.28.161.138:7077/]] arriving at [akka.tcp://sparkMaster@172.28.161.138:7077] inbound addresses are [akka.tcp://sparkDriver@172.28.161.138:7077]
akka.event.Logging$Error$NoCause$

Kindly help.
RE: Spark-Kafka Connector issue
Thanks for your reply. I invoked my program with the broker IP and port and it started as expected, but I see the error below:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:9092 TestTopic

15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([TestTopic,0])
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I ran the command below to check the offsets, I got this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic TestTopic --group test-consumer-group --zookeeper localhost:2181

Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/TestTopic/0.

I also just added the configs below to my kafka/config/consumer.properties and restarted Kafka:

auto.offset.reset=smallest
offsets.storage=zookeeper
offsets.channel.backoff.ms=1000
offsets.channel.socket.timeout.ms=1
offsets.commit.max.retries=5
dual.commit.enabled=true

From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Monday, September 28, 2015 7:56 PM
To: Ratika Prasad
Cc: dev@spark.apache.org
Subject: Re: Spark-Kafka Connector issue

This is a user list question, not a dev list question.

Looks like your driver is having trouble communicating with the Kafka brokers. Make sure the broker host and port are reachable from the driver host (using nc or telnet); make sure that you're providing the _broker_ host and port to createDirectStream, not the ZooKeeper host; make sure the topics in question actually exist on Kafka and the names match what you're providing to createDirectStream.
On Sat, Sep 26, 2015 at 11:50 PM, Ratika Prasad <rpra...@couponsinc.com> wrote:

Hi All,

I am trying out Spark Streaming, reading messages from Kafka topics which will later be turned into streams as below. I have Kafka set up on a VM with the topics created; however, when I run the program below from my Spark VM, I get an error even though the Kafka server and ZooKeeper are up and running:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
    at org.apache.sp
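Cody's first check (is the broker host and port reachable from the driver host?) can also be run from Java when nc or telnet is not installed on the driver machine. The following is a minimal sketch using only java.net.Socket; the class name BrokerCheck, its helper methods and the 3-second timeout are hypothetical choices, not part of Spark or Kafka. A successful TCP connect only shows that something is listening on that port: ZooKeeper on 2181 would also "pass", which is why the sketch warns about that port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Spark or Kafka): a TCP check similar to `nc -z host port`.
public class BrokerCheck {

    // Splits a metadata.broker.list-style value such as "host1:9092,host2:9092"
    // into unresolved host/port pairs. Every entry must have the form host:port.
    static List<InetSocketAddress> parseBrokerList(String brokers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : brokers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(trimmed.substring(0, colon), port));
        }
        return result;
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // This only proves that something is listening, not that it is a Kafka broker.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the same broker argument given to spark-submit, e.g. 172.28.161.32:9092
        String brokers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress broker : parseBrokerList(brokers)) {
            String host = broker.getHostString();
            int port = broker.getPort();
            if (port == 2181) {
                System.out.println("Warning: 2181 is ZooKeeper's default port; "
                        + "createDirectStream expects broker addresses");
            }
            System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
        }
    }
}
```

Run it on the driver host with the same first argument passed to spark-submit; a "false" for the broker port points at networking or firewall settings rather than at the Spark program itself.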
Spark-Kafka Connector issue
Hi All,

I am trying out Spark Streaming, reading messages from Kafka topics which will later be turned into streams as below. I have Kafka set up on a VM with the topics created; however, when I run the program below from my Spark VM, I get an error even though the Kafka server and ZooKeeper are up and running:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Program:

package org.stream.processing;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaStreamEventProcessing {
  // Used to split each message into words on single spaces
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    // Must be Kafka broker host:port pairs, not the ZooKeeper address
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();
    // Note: this prints the DStream object once, when the job is defined, not the
    // counts; wordCounts.print() above prints the counts for every batch.
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
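The per-batch logic of the program above (map each record to its message value, flatMap into words, mapToPair to (word, 1), then reduceByKey with +) can be checked without Kafka or a cluster. This is a plain-Java, Spark-free sketch of one batch only; the class and method names are hypothetical, and Map.Entry stands in for scala.Tuple2 as the (key, message) record.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical, Spark-free model of one batch of the word-count pipeline.
public class BatchWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    static Map<String, Integer> countWords(List<Map.Entry<String, String>> records) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only to make output stable
        for (Map.Entry<String, String> record : records) {
            String line = record.getValue();          // like messages.map(... tuple2._2())
            for (String word : SPACE.split(line)) {    // like lines.flatMap(...)
                counts.merge(word, 1, Integer::sum);   // like mapToPair + reduceByKey(i1 + i2)
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = new ArrayList<>();
        // Keys are ignored by the pipeline, so null is used here.
        batch.add(new SimpleImmutableEntry<String, String>(null, "coupon redeemed"));
        batch.add(new SimpleImmutableEntry<String, String>(null, "coupon issued"));
        System.out.println(countWords(batch)); // {coupon=2, issued=1, redeemed=1}
    }
}
```

In the streaming job the same computation runs once per 2-second batch, and wordCounts.print() shows the first entries of each batch's result.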
Re: Creating RDD with key and Subkey
We need to create an RDD as below:

JavaPairRDD>>>

The idea is that we call lookup() on the key, which will return a list of HashMap-like structures, and then look up the subkey, which is the key in the returned HashMap.

From: Silas Davis <si...@silasdavis.net>
Sent: Wednesday, August 19, 2015 10:34 pm
Subject: Re: Creating RDD with key and Subkey
To: Ratika Prasad <rpra...@couponsinc.com>, <dev@spark.apache.org>

This should be sent to the user mailing list, I think.

It depends on what you want to do with the RDD, so yes, you could throw around (String, HashMap>) tuples, or perhaps you'd like to be able to groupByKey or reduceByKey on the key and sub-key as a composite, in which case JavaPairRDD, List> might be more appropriate. Not really clear what you are asking.

On Wed, 19 Aug 2015 at 17:15 Ratika Prasad <rpra...@couponsinc.com> wrote:

Hi, We need an RDD with the following format: JavaPairRDD>>, essentially an RDD with a key and subkey kind of structure. How is that doable in Spark? Thanks, R
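Silas's two options can be compared without a cluster. The sketch below is hypothetical and Spark-free: plain Java maps stand in for pair RDDs, the store/coupon names and Integer values are made up, and AbstractMap.SimpleImmutableEntry plays the role scala.Tuple2 would play as a composite key (it implements equals and hashCode). The nested layout supports the two-step lookup described above (key first, then subkey in the returned map); the composite layout aggregates and looks up on (key, subkey) in one step.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the two layouts; plain Java maps stand in for pair RDDs.
public class KeySubkeyLayouts {

    // Nested layout: key -> (subkey -> value), i.e. (key, HashMap) pairs.
    private final Map<String, Map<String, Integer>> nested = new HashMap<>();

    // Composite layout: (key, subkey) -> value. SimpleImmutableEntry implements
    // equals/hashCode, so it can serve as a composite key the way Tuple2 does in Spark.
    private final Map<Map.Entry<String, String>, Integer> composite = new HashMap<>();

    // Records a value in both layouts, summing values for a repeated (key, subkey),
    // which is what reduceByKey on the composite key would do.
    void add(String key, String subkey, int value) {
        nested.computeIfAbsent(key, k -> new HashMap<>()).merge(subkey, value, Integer::sum);
        composite.merge(new SimpleImmutableEntry<>(key, subkey), value, Integer::sum);
    }

    // Two-step lookup: first the key, then the subkey inside the returned map.
    Integer lookupNested(String key, String subkey) {
        Map<String, Integer> inner = nested.get(key);
        return inner == null ? null : inner.get(subkey);
    }

    // One-step lookup on the composite key.
    Integer lookupComposite(String key, String subkey) {
        return composite.get(new SimpleImmutableEntry<>(key, subkey));
    }

    public static void main(String[] args) {
        KeySubkeyLayouts layouts = new KeySubkeyLayouts();
        layouts.add("store-1", "coupon-A", 2);
        layouts.add("store-1", "coupon-A", 3);
        layouts.add("store-1", "coupon-B", 1);
        System.out.println(layouts.lookupNested("store-1", "coupon-A"));    // 5
        System.out.println(layouts.lookupComposite("store-1", "coupon-B")); // 1
    }
}
```

In Spark terms, the nested layout would be a JavaPairRDD whose values are maps, queried with lookup(key) (which returns a List of the matching values), and the composite layout a JavaPairRDD keyed by a Tuple2 of key and subkey, on which reduceByKey and lookup work directly on the pair.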
Creating RDD with key and Subkey
Hi,

We need an RDD with the following format: JavaPairRDD>>, essentially an RDD with a key and subkey kind of structure. How is that doable in Spark?

Thanks,
R
RE: Unable to run the spark application in standalone cluster mode
Should this be done on the master node, the slave node, or both?

From: Madhusudanan Kandasamy [mailto:madhusuda...@in.ibm.com]
Sent: Wednesday, August 19, 2015 9:31 PM
To: Ratika Prasad
Cc: dev@spark.apache.org
Subject: Re: Unable to run the spark application in standalone cluster mode

Try increasing the Spark worker memory in conf/spark-env.sh:

export SPARK_WORKER_MEMORY=2g

Thanks,
Madhu.

Ratika Prasad <rpra...@couponsinc.com> wrote on 08/19/2015 09:22 PM:
To: dev@spark.apache.org
Subject: Unable to run the spark application in standalone cluster mode

Hi,

We have a simple Spark application that runs through successfully when run locally on the master node as below:

./bin/spark-submit --class com.coupons.salestransactionprocessor.SalesTransactionDataPointCreation --master local sales-transaction-processor-0.0.1-SNAPSHOT-jar-with-dependencies.jar

However, when I try to run it in cluster mode (our Spark cluster has two nodes, one master and one slave, with executor memory of 512 MB), the application fails with the log below. Please provide some input as to why.
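Madhu's suggestion is tied to how a standalone cluster places executors. Roughly, the master only launches an executor on a worker whose free memory, capped by that worker's SPARK_WORKER_MEMORY, is at least the requested executor memory (spark.executor.memory; 512 MB in the cluster described here). The sketch below is a simplified, hypothetical model of that check, not Spark code: the class and method names are illustrative, cores and memory held by other applications are ignored, and the size parser only accepts a whole number with a k, m, g or t suffix.

```java
import java.util.Locale;

// Simplified, hypothetical model of the standalone master's memory check.
// It ignores cores and memory already used by other applications.
public class ExecutorFit {

    // Converts a size string such as "512m" or "2g" to megabytes.
    // Assumes a whole number followed by a k, m, g or t suffix.
    static long toMegabytes(String size) {
        String s = size.trim().toLowerCase(Locale.ROOT);
        if (s.length() < 2) {
            throw new IllegalArgumentException("Expected a number with a unit suffix: '" + size + "'");
        }
        long n = Long.parseLong(s.substring(0, s.length() - 1));
        switch (s.charAt(s.length() - 1)) {
            case 'k': return n / 1024;
            case 'm': return n;
            case 'g': return n * 1024;
            case 't': return n * 1024 * 1024;
            default: throw new IllegalArgumentException("Unknown size unit in '" + size + "'");
        }
    }

    // Number of executors of the given size that fit in a worker's memory.
    // Zero means the master would never launch an executor on that worker.
    static long executorsThatFit(String workerMemory, String executorMemory) {
        return toMegabytes(workerMemory) / toMegabytes(executorMemory);
    }

    public static void main(String[] args) {
        System.out.println(executorsThatFit("2g", "512m")); // 4
        System.out.println(executorsThatFit("512m", "1g")); // 0
    }
}
```

The log in this thread shows an executor being granted 512.0 MB and then exiting with code 1, so this placement check alone does not explain that failure.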
Unable to run the spark application in standalone cluster mode
Hi,

We have a simple Spark application that runs through successfully when run locally on the master node as below:

./bin/spark-submit --class com.coupons.salestransactionprocessor.SalesTransactionDataPointCreation --master local sales-transaction-processor-0.0.1-SNAPSHOT-jar-with-dependencies.jar

However, when I try to run it in cluster mode (our Spark cluster has two nodes, one master and one slave, with executor memory of 512 MB), the application fails with the log below. Please provide some input as to why.

15/08/19 15:37:52 INFO client.AppClient$ClientActor: Executor updated: app-20150819153234-0001/8 is now RUNNING
15/08/19 15:37:56 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/08/19 15:38:11 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/08/19 15:38:26 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor updated: app-20150819153234-0001/8 is now EXITED (Command exited with code 1)
15/08/19 15:38:32 INFO cluster.SparkDeploySchedulerBackend: Executor app-20150819153234-0001/8 removed: Command exited with code 1
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor added: app-20150819153234-0001/9 on worker-20150812111932-ip-172-28-161-173.us-west-2.compute.internal-50108 (ip-172-28-161-173.us-west-2.compute.internal:50108) with 1 cores
15/08/19 15:38:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150819153234-0001/9 on hostPort ip-172-28-161-173.us-west-2.compute.internal:50108 with 1 cores, 512.0 MB RAM
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor updated: app-20150819153234-0001/9 is now RUNNING
15/08/19 15:38:41 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/08/19 15:38:56 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/08/19 15:39:11 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/08/19 15:39:12 INFO client.AppClient$ClientActor: Executor updated: app-20150819153234-0001/9 is now EXITED (Command exited with code 1)
15/08/19 15:39:12 INFO cluster.SparkDeploySchedulerBackend: Executor app-20150819153234-0001/9 removed: Command exited with code 1
15/08/19 15:39:12 ERROR cluster.SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
15/08/19 15:39:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
15/08/19 15:39:12 INFO