FW: Spark error while running in spark mode

2015-10-05 Thread Ratika Prasad


From: Ratika Prasad
Sent: Monday, October 05, 2015 2:39 PM
To: u...@spark.apache.org
Cc: Ameeta Jayarajan 
Subject: Spark error while running in spark mode

Hi,

When we run our Spark component in cluster mode as below, we get the following
error:

./bin/spark-submit --class 
com.coupons.stream.processing.SparkStreamEventProcessingEngine --master 
spark://172.28.161.138:7077 
EventProcessingEngine-0.0.1-SNAPSHOT-jar-with-dependencies.jar

ERROR ErrorMonitor: dropping message [class akka.actor.ActorSelectionMessage] 
for non-local recipient [Actor[akka.tcp://sparkMaster@172.28.161.138:7077/]] 
arriving at [akka.tcp://sparkMaster@172.28.161.138:7077] inbound addresses are 
[akka.tcp://sparkDriver@172.28.161.138:7077]
akka.event.Logging$Error$NoCause$


Kindly help
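
The inbound address in the error (sparkDriver@172.28.161.138:7077) hints that the URL
being dialed is not the address the standalone master actually registered under. A hedged
check, assuming the master web UI runs on its default port 8080: copy the spark:// URL
shown at the top of http://172.28.161.138:8080 and pass it to spark-submit verbatim
(a hostname vs. IP mismatch, or a driver/master port clash, commonly produces this
ErrorMonitor message).

# <master-url-from-web-ui> is a placeholder for the exact spark:// URL shown on the master web UI
./bin/spark-submit \
  --class com.coupons.stream.processing.SparkStreamEventProcessingEngine \
  --master <master-url-from-web-ui> \
  EventProcessingEngine-0.0.1-SNAPSHOT-jar-with-dependencies.jar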







RE: Spark-Kafka Connector issue

2015-09-28 Thread Ratika Prasad
Thanks for your reply.

I invoked my program with the broker IP and port, and it started as expected,
but I see the error below:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing 
--master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
172.28.161.32:9092 TestTopic
15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as 
local[n], n > 1 in local mode if you have receivers to get data, otherwise 
Spark jobs will not get resources to process the received data.
Exception in thread "main" org.apache.spark.SparkException: 
java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for 
Set([TestTopic,0])
at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at 
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
at 
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at 
org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I ran the below to check the offsets, I get this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic TestTopic 
--group test-consumer-group --zookeeper localhost:2181
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: 
KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/TestTopic 
/0.

Also, I just added the configs below to my kafka/config/consumer.properties
and restarted Kafka:

auto.offset.reset=smallest
offsets.storage=zookeeper
offsets.channel.backoff.ms=1000
offsets.channel.socket.timeout.ms=1
offsets.commit.max.retries=5
dual.commit.enabled=true
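
As a hedged note: the NoNode result from ConsumerOffsetChecker is expected for a consumer
group that has never committed offsets to ZooKeeper, and the consumer.properties settings
above are not read by the Spark direct stream, which takes its configuration from the
kafkaParams map in the code. A sketch of a more direct check that partition 0 of TestTopic
has a live leader (the "Couldn't find leader offsets" error), assuming the Kafka 0.8 CLI
tools and ZooKeeper on localhost:2181:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TestTopic
# the output should show a valid "Leader:" broker id for partition 0; -1 or an empty Isr
# would explain "Couldn't find leader offsets for Set([TestTopic,0])"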

From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Monday, September 28, 2015 7:56 PM
To: Ratika Prasad 
Cc: dev@spark.apache.org
Subject: Re: Spark-Kafka Connector issue

This is a user list question, not a dev list question.

Looks like your driver is having trouble communicating with the Kafka brokers.
Make sure the broker host and port are reachable from the driver host (using nc
or telnet); make sure that you're providing the _broker_ host and port to
createDirectStream, not the ZooKeeper host; make sure the topics in question
actually exist on Kafka and the names match what you're providing to
createDirectStream.
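
A hedged sketch of those three checks from the Spark driver host, assuming the broker from
the command earlier in this thread (172.28.161.32) on the default broker port 9092 and the
Kafka 0.8 CLI tools:

# 1. broker host/port reachable from the driver host
nc -vz 172.28.161.32 9092

# 2. pass the broker list (not the ZooKeeper address) as metadata.broker.list / the first
#    program argument, e.g. 172.28.161.32:9092 rather than 172.28.161.32:2181

# 3. topic exists and its name matches what is passed to createDirectStream
bin/kafka-topics.sh --list --zookeeper 172.28.161.32:2181 | grep TestTopic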





On Sat, Sep 26, 2015 at 11:50 PM, Ratika Prasad <rpra...@couponsinc.com> wrote:
Hi All,

I am trying out Spark Streaming, reading messages from Kafka topics that are later
turned into streams as below. I have Kafka set up on a VM and the topics created;
however, when I run the program below from my Spark VM, I get an error even though
the Kafka server and ZooKeeper are up and running.

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing 
--master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: 
java.io.EOFException: Received -1 when reading from channel, socket has likely 
been closed.
at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at 
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
at 
org.apache.sp

Spark-Kafka Connector issue

2015-09-26 Thread Ratika Prasad
Hi All,

I am trying out Spark Streaming, reading messages from Kafka topics that are later
turned into streams as below. I have Kafka set up on a VM and the topics created;
however, when I run the program below from my Spark VM, I get an error even though
the Kafka server and ZooKeeper are up and running.

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing 
--master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: 
java.io.EOFException: Received -1 when reading from channel, socket has likely 
been closed.
at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at 
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
at 
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at 
org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Program

public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    wordCounts.print();
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
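
A possible invocation of the program above, as a hedged sketch: the first argument must be
the broker list (here the broker port 9092 used in the follow-up thread above is assumed),
not the ZooKeeper address, and although the direct stream has no receiver, "local[2]" keeps
the earlier StreamingContext warning quiet and leaves a core free for processing.

./bin/spark-submit \
  --class org.stream.processing.JavaKafkaStreamEventProcessing \
  --master "local[2]" \
  spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  172.28.161.32:9092 redemption_inbound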


Re: Creating RDD with key and Subkey

2015-08-19 Thread Ratika Prasad

We need to create the RDD as below:

JavaPairRDD<Key, List<HashMap<SubKey, Value>>>

The idea is that we need to do lookup() on the Key, which will return a list of
HashMap-like structures, and then do a lookup on the SubKey, which is the key in the
returned HashMap.
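
A minimal sketch of that shape, assuming a JavaSparkContext named jsc and String key,
sub-key and value types; all of these names and types are placeholders, not taken from the
original application.

// assumes: import java.util.*; import scala.Tuple2; import org.apache.spark.api.java.*;
// build one (key -> list of sub-key maps) pair and parallelize it
HashMap<String, List<String>> subMap = new HashMap<String, List<String>>();
subMap.put("subKey1", Arrays.asList("v1", "v2"));

JavaPairRDD<String, List<HashMap<String, List<String>>>> rdd =
    jsc.parallelizePairs(Arrays.asList(
        new Tuple2<String, List<HashMap<String, List<String>>>>("key1", Arrays.asList(subMap))));

// lookup() on the outer key returns every value stored under that key...
List<List<HashMap<String, List<String>>>> forKey = rdd.lookup("key1");
// ...and the sub-key is then an ordinary get() on each returned HashMap
List<String> leaves = forKey.get(0).get(0).get("subKey1");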



_
From: Silas Davis <si...@silasdavis.net>
Sent: Wednesday, August 19, 2015 10:34 pm
Subject: Re: Creating RDD with key and Subkey
To: Ratika Prasad <rpra...@couponsinc.com>, <dev@spark.apache.org>


This should be sent to the user mailing list, I think.

It depends what you want to do with the RDD, so yes, you could throw around
(String, HashMap<SubKey, Value>) tuples, or perhaps you'd like to be able to
groupByKey or reduceByKey on the key and sub-key as a composite, in which case
JavaPairRDD<Tuple2<Key, SubKey>, List<Value>> might be more appropriate. Not
really clear what you are asking.
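
A minimal sketch of the composite-key variant, again with a placeholder JavaSparkContext
jsc and placeholder String/Integer types that are not from the original thread.

// assumes: import java.util.*; import scala.Tuple2;
// assumes: import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function2;
// (key, sub-key) becomes a single Tuple2 key, so reduceByKey/groupByKey treat it as one unit
JavaPairRDD<Tuple2<String, String>, Integer> composite =
    jsc.parallelizePairs(Arrays.asList(
        new Tuple2<Tuple2<String, String>, Integer>(new Tuple2<String, String>("key1", "subKey1"), 1),
        new Tuple2<Tuple2<String, String>, Integer>(new Tuple2<String, String>("key1", "subKey1"), 2)));

JavaPairRDD<Tuple2<String, String>, Integer> summed =
    composite.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });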


On Wed, 19 Aug 2015 at 17:15 Ratika Prasad <rpra...@couponsinc.com> wrote:
Hi,

We have a need for an RDD with the following format:
JavaPairRDD<Key, HashMap<SubKey, Value>>, essentially an RDD with a Key and
SubKey kind of structure. How is that doable in Spark?

Thanks
R




Creating RDD with key and Subkey

2015-08-19 Thread Ratika Prasad
Hi,

We have a need for an RDD with the following format:
JavaPairRDD<Key, HashMap<SubKey, Value>>, essentially an RDD with a Key and
SubKey kind of structure. How is that doable in Spark?

Thanks
R


RE: Unable to run the spark application in standalone cluster mode

2015-08-19 Thread Ratika Prasad
Should this be done on the master node, the slave node, or both?

From: Madhusudanan Kandasamy [mailto:madhusuda...@in.ibm.com]
Sent: Wednesday, August 19, 2015 9:31 PM
To: Ratika Prasad 
Cc: dev@spark.apache.org
Subject: Re: Unable to run the spark application in standalone cluster mode


Try increasing the Spark worker memory in conf/spark-env.sh:

export SPARK_WORKER_MEMORY=2g

Thanks,
Madhu.
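
A hedged sketch of where that setting usually goes, assuming the standard standalone
scripts: SPARK_WORKER_MEMORY is read by the Worker daemons, so it is typically added to
conf/spark-env.sh on each worker (slave) node (keeping spark-env.sh identical on the
master does no harm), and the workers then need a restart.

# on each worker node (or in a spark-env.sh synced across the cluster)
echo "export SPARK_WORKER_MEMORY=2g" >> conf/spark-env.sh

# restart the workers from the master, assuming conf/slaves lists them
./sbin/stop-slaves.sh
./sbin/start-slaves.sh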

From: Ratika Prasad <rpra...@couponsinc.com>
Date: 08/19/2015 09:22 PM
To: "dev@spark.apache.org" <dev@spark.apache.org>
Subject: Unable to run the spark application in standalone cluster mode

Hi,

We have a simple Spark application which runs fine locally on the master node,
as below:

./bin/spark-submit --class 
com.coupons.salestransactionprocessor.SalesTransactionDataPointCreation 
--master local 
sales-transaction-processor-0.0.1-SNAPSHOT-jar-with-dependencies.jar

However, when I try to run it in cluster mode [our Spark cluster has two nodes,
one master and one slave, with executor memory of 512 MB], the application fails
with the output below. Please provide some inputs as to why.

15/08/19 15:37:52 INFO client.AppClient$ClientActor: Executor updated: 
app-20150819153234-0001/8 is now RUNNING
15/08/19 15:37:56 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:38:11 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:38:26 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor updated: 
app-20150819153234-0001/8 is now EXITED (Command exited with code 1)
15/08/19 15:38:32 INFO cluster.SparkDeploySchedulerBackend: Executor 
app-20150819153234-0001/8 removed: Command exited with code 1
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor added: 
app-20150819153234-0001/9 on 
worker-20150812111932-ip-172-28-161-173.us-west-2.compute.internal-50108 
(ip-172-28-161-173.us-west-2.compute.internal:50108) with 1 cores
15/08/19 15:38:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID 
app-20150819153234-0001/9 on hostPort 
ip-172-28-161-173.us-west-2.compute.internal:50108 with 1 cores, 512.0 MB RAM
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor updated: 
app-20150819153234-0001/9 is now RUNNING
15/08/19 15:38:41 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:38:56 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:39:11 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:39:12 INFO client.AppClient$ClientActor: Executor updated: 
app-20150819153234-0001/9 is now EXITED (Command exited with code 1)
15/08/19 15:39:12 INFO cluster.SparkDeploySchedulerBackend: Executor 
app-20150819153234-0001/9 removed: Command exited with code 1
15/08/19 15:39:12 ERROR cluster.SparkDeploySchedulerBackend: Application has 
been killed. Reason: Master removed our application: FAILED
15/08/19 15:39:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose 
tasks have all completed, from pool
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/metrics/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/static,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/executors/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/executors,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/environment/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/environment,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/storage/rdd,null}
15/08/19 15:39:12 IN

Unable to run the spark application in standalone cluster mode

2015-08-19 Thread Ratika Prasad
Hi,

We have a simple Spark application which runs fine locally on the master node,
as below:

./bin/spark-submit --class 
com.coupons.salestransactionprocessor.SalesTransactionDataPointCreation 
--master local 
sales-transaction-processor-0.0.1-SNAPSHOT-jar-with-dependencies.jar

However, when I try to run it in cluster mode [our Spark cluster has two nodes,
one master and one slave, with executor memory of 512 MB], the application fails
with the output below. Please provide some inputs as to why.

15/08/19 15:37:52 INFO client.AppClient$ClientActor: Executor updated: 
app-20150819153234-0001/8 is now RUNNING
15/08/19 15:37:56 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:38:11 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:38:26 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor updated: 
app-20150819153234-0001/8 is now EXITED (Command exited with code 1)
15/08/19 15:38:32 INFO cluster.SparkDeploySchedulerBackend: Executor 
app-20150819153234-0001/8 removed: Command exited with code 1
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor added: 
app-20150819153234-0001/9 on 
worker-20150812111932-ip-172-28-161-173.us-west-2.compute.internal-50108 
(ip-172-28-161-173.us-west-2.compute.internal:50108) with 1 cores
15/08/19 15:38:32 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID 
app-20150819153234-0001/9 on hostPort 
ip-172-28-161-173.us-west-2.compute.internal:50108 with 1 cores, 512.0 MB RAM
15/08/19 15:38:32 INFO client.AppClient$ClientActor: Executor updated: 
app-20150819153234-0001/9 is now RUNNING
15/08/19 15:38:41 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:38:56 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:39:11 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
15/08/19 15:39:12 INFO client.AppClient$ClientActor: Executor updated: 
app-20150819153234-0001/9 is now EXITED (Command exited with code 1)
15/08/19 15:39:12 INFO cluster.SparkDeploySchedulerBackend: Executor 
app-20150819153234-0001/9 removed: Command exited with code 1
15/08/19 15:39:12 ERROR cluster.SparkDeploySchedulerBackend: Application has 
been killed. Reason: Master removed our application: FAILED
15/08/19 15:39:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose 
tasks have all completed, from pool
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/metrics/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/static,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/executors/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/executors,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/environment/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/environment,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/storage/rdd,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/storage/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/storage,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages/pool/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages/pool,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages/stage/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages/stage,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages/json,null}
15/08/19 15:39:12 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages,null}
15/08/19 15:39:12 INFO