Debugging which RDD has which replication level

2016-01-23 Thread gaurav sharma
Hi All,

I have enabled replication for my RDDs.

On the Storage tab of the Spark UI I can see the replication level, 2x or
1x, but the names shown are generic ones like MappedRDD and ShuffledRDD, so
I am not able to work out which of my RDDs is replicated 2x and which is 1x.
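
For context, the replication is enabled roughly like this (a sketch; the input path and storage level are illustrative), and naming the RDD is one way to make the Storage tab entries recognisable:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

// jsc is an existing JavaSparkContext; the path is illustrative.
JavaRDD<String> events = jsc.textFile("hdfs:///data/events");

// The _2 storage levels keep two copies of each block (replication 2x).
// Naming the underlying RDD makes the Storage tab show "events" instead of MappedRDD.
events.rdd().setName("events");
events.persist(StorageLevel.MEMORY_AND_DISK_2());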

Please help.

Regards,
Gaurav


Understanding Spark RDD DAG behaviour with checkpointing

2016-01-23 Thread gaurav sharma
Hi Tathagata/Cody,

I am facing a challenge in production with the DAG behaviour during
checkpointing in Spark Streaming:

Step 1 : Read data from Kafka every 15 minutes - call this KafkaStreamRDD,
~100 GB of data.

Step 2 : Repartition KafkaStreamRDD from 5 to 100 partitions to parallelise
processing - call this RepartitionedKafkaStreamRDD.

Step 3 : On RepartitionedKafkaStreamRDD, run map and reduceByKeyAndWindow
over a window of 2 hours - call this RDD1, ~100 MB of data.

Checkpointing is enabled.
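
In code, the pipeline above looks roughly like this (a sketch only; the broker list, topic, checkpoint directory and the toy aggregation are assumptions, not the production code):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

SparkConf conf = new SparkConf().setAppName("HourlyAggregation");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(15));
jssc.checkpoint("hdfs:///checkpoints/hourly-aggregation");   // illustrative checkpoint dir

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "broker1:9092");     // illustrative broker list
Set<String> topics = Collections.singleton("events");        // illustrative topic

// Step 1: the direct stream gives one RDD partition per Kafka partition (5 here)
JavaPairInputDStream<String, String> kafkaStreamRdd = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topics);

// Step 2: spread each 15-minute batch over 100 partitions
JavaPairDStream<String, String> repartitionedKafkaStreamRdd = kafkaStreamRdd.repartition(100);

// Step 3: map + reduceByKeyAndWindow over a 2-hour window sliding every 15 minutes
JavaPairDStream<String, Long> rdd1 = repartitionedKafkaStreamRdd
    .mapToPair(new PairFunction<Tuple2<String, String>, String, Long>() {
        public Tuple2<String, Long> call(Tuple2<String, String> record) {
            return new Tuple2<String, Long>(record._1(), 1L);   // toy key/value
        }
    })
    .reduceByKeyAndWindow(new Function2<Long, Long, Long>() {
        public Long call(Long a, Long b) { return a + b; }
    }, Durations.minutes(120), Durations.minutes(15));
// On restart, the context is recreated via JavaStreamingContext.getOrCreate so it
// picks up this checkpoint.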

If I restart my streaming context, it picks up from the last checkpointed
state:

it RE-READS the data for all 8 SUCCESSFULLY FINISHED 15-minute batches from
Kafka and re-performs the repartition on all the data of those 8 15-minute
batches.

Only then does it read the data for the current 15-minute batch and run map
and reduceByKeyAndWindow over the 2-hour window.

Challenge -
1> I can't checkpoint KafkaStreamRDD or RepartitionedKafkaStreamRDD, as this
is huge data, around 800 GB for 2 hours; reading and writing (checkpointing)
it every 15 minutes would be very slow.

2> I checkpoint RDD1 every 15 minutes, map and reduceByKeyAndWindow are run
over RDD1 only, and I have a snapshot of each of the last 8 15-minute
batches of RDD1. So why does Spark read all the data for the last 8
successfully completed batches from Kafka again (Step 1), perform the
repartitioning again (Step 2), and then run map and reduceByKeyAndWindow
again over this freshly fetched KafkaStreamRDD data for those 8 batches?

Because of the above challenges, I am not able to take advantage of
checkpointing when the streaming context is restarted under high load.

Please help me understand whether there is something I am missing.

Regards,
Gaurav


Re: Multiple Spark Streaming Jobs on Single Master

2015-10-23 Thread gaurav sharma
I specified spark.cores.max=4, but it started 2 executors with 2 cores each,
one on each of the 2 workers.

In standalone cluster mode, although we can specify the cores a worker
offers, there is no way to specify the number of cores an executor must take
on a particular worker machine.
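
For reference, this is roughly how the cap was specified (a sketch; the app name is illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
    .setAppName("StreamingJobA")
    .set("spark.cores.max", "4");   // same effect as spark-submit --total-executor-cores 4
JavaSparkContext jsc = new JavaSparkContext(conf);

// Note: the standalone master spreads an application's cores across workers by
// default (spark.deploy.spreadOut=true), which would explain the 2 executors x 2
// cores split observed above rather than 4 cores on a single worker.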



On Sat, Oct 24, 2015 at 1:41 AM, Augustus Hong 
wrote:

> How did you specify number of cores each executor can use?
>
> Be sure to use this when submitting jobs with spark-submit: 
> *--total-executor-cores
> 100.*
>
> Other options won't work from my experience.
>
> On Fri, Oct 23, 2015 at 8:36 AM, gaurav sharma 
> wrote:
>
>> Hi,
>>
>> I created 2 workers on same machine each with 4 cores and 6GB ram
>>
>> I submitted first job, and it allocated 2 cores on each of the worker
>> processes, and utilized full 4 GB ram for each executor process
>>
>> When i submit my second job it always say in WAITING state.
>>
>>
>> Cheers!!
>>
>>
>>
>> On Tue, Oct 20, 2015 at 10:46 AM, Tathagata Das 
>> wrote:
>>
>>> You can set the max cores for the first submitted job such that it does
>>> not take all the resources from the master. See
>>> http://spark.apache.org/docs/latest/submitting-applications.html
>>>
>>> # Run on a Spark standalone cluster in client deploy mode
>>> ./bin/spark-submit \
>>>   --class org.apache.spark.examples.SparkPi \
>>>   --master spark://207.184.161.138:7077 \
>>>   --executor-memory 20G \
>>>   *--total-executor-cores 100 \*
>>>   /path/to/examples.jar \
>>>   1000
>>>
>>>
>>> On Mon, Oct 19, 2015 at 4:26 PM, Augustus Hong <
>>> augus...@branchmetrics.io> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Would it be possible to run multiple spark streaming jobs on a single
>>>> master at the same time?
>>>>
>>>> I currently have one master node and several worker nodes in the
>>>> standalone mode, and I used spark-submit to submit multiple spark streaming
>>>> jobs.
>>>>
>>>> From what I observed, it seems like only the first submitted job would
>>>> get resources and run.  Jobs submitted afterwards will have the status
>>>> "Waiting", and will only run after the first one is finished or killed.
>>>>
>>>> I tried limiting each executor to only 1 core(each worker machine has 8
>>>> cores), but the same things happens that only one job will be run, even
>>>> though there are a lot of idle cores.
>>>>
>>>> Best,
>>>> Augustus
>>>>
>>>>
>>>>
>>>> --
>>>> [image: Branch Metrics mobile deep linking] <http://branch.io/>* Augustus
>>>> Hong*
>>>>  Data Analytics | Branch Metrics
>>>>  m 650-391-3369 | e augus...@branch.io
>>>>
>>>
>>>
>>
>
>
> --
> [image: Branch Metrics mobile deep linking] <http://branch.io/>* Augustus
> Hong*
>  Data Analytics | Branch Metrics
>  m 650-391-3369 | e augus...@branch.io
>


Re: Multiple Spark Streaming Jobs on Single Master

2015-10-23 Thread gaurav sharma
Hi,

I created 2 workers on the same machine, each with 4 cores and 6 GB RAM.

I submitted the first job, and it allocated 2 cores on each of the worker
processes and utilized the full 4 GB of RAM for each executor process.

When I submit my second job, it always stays in the WAITING state.


Cheers!!



On Tue, Oct 20, 2015 at 10:46 AM, Tathagata Das  wrote:

> You can set the max cores for the first submitted job such that it does
> not take all the resources from the master. See
> http://spark.apache.org/docs/latest/submitting-applications.html
>
> # Run on a Spark standalone cluster in client deploy mode
> ./bin/spark-submit \
>   --class org.apache.spark.examples.SparkPi \
>   --master spark://207.184.161.138:7077 \
>   --executor-memory 20G \
>   *--total-executor-cores 100 \*
>   /path/to/examples.jar \
>   1000
>
>
> On Mon, Oct 19, 2015 at 4:26 PM, Augustus Hong 
> wrote:
>
>> Hi All,
>>
>> Would it be possible to run multiple spark streaming jobs on a single
>> master at the same time?
>>
>> I currently have one master node and several worker nodes in the
>> standalone mode, and I used spark-submit to submit multiple spark streaming
>> jobs.
>>
>> From what I observed, it seems like only the first submitted job would
>> get resources and run.  Jobs submitted afterwards will have the status
>> "Waiting", and will only run after the first one is finished or killed.
>>
>> I tried limiting each executor to only 1 core(each worker machine has 8
>> cores), but the same things happens that only one job will be run, even
>> though there are a lot of idle cores.
>>
>> Best,
>> Augustus
>>
>>
>>
>> --
>> [image: Branch Metrics mobile deep linking] * Augustus
>> Hong*
>>  Data Analytics | Branch Metrics
>>  m 650-391-3369 | e augus...@branch.io
>>
>
>


Re: Worker Machine running out of disk for Long running Streaming process

2015-09-15 Thread gaurav sharma
Hi TD,

Sorry for late reply,


I implemented your suggestion, but unfortunately it didn't help; I can still
see very old shuffle files, because of which my long-running Spark job
ultimately gets terminated.


Below is what I did.


// HourlyAggregatorV2.java - the spark-submit main class (only the relevant part shown)
import org.apache.log4j.Logger;

public class HourlyAggregatorV2 {

    private static Logger logger = Logger.getLogger(HourlyAggregatorV2.class);

    public static void main(String[] args) throws Exception {
        // Fix for preventing the disk-full issue in long running jobs caused by
        // shuffle files not getting cleaned up from disk: periodically trigger a
        // GC on the driver, since shuffle cleanup is tied to garbage collection.
        new Thread(new GCThread()).start();
    }
}


// GCThread.java - triggers System.gc() once every 10 minutes
import java.util.Calendar;

public class GCThread implements Runnable {

    @Override
    public void run() {
        boolean isGCedOnce = false;
        while (true) {
            if (Calendar.getInstance().get(Calendar.MINUTE) % 10 == 0) {
                if (!isGCedOnce) {
                    System.out.println("Triggered System GC");
                    System.gc();
                    isGCedOnce = true;
                }
            } else {
                isGCedOnce = false;
            }
            try {
                Thread.sleep(1000);   // check once a second instead of busy-spinning a core
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}


On Sat, Aug 22, 2015 at 9:16 PM, Ashish Rangole  wrote:

> Interesting. TD, can you please throw some light on why this is and point
> to  the relevant code in Spark repo. It will help in a better understanding
> of things that can affect a long running streaming job.
> On Aug 21, 2015 1:44 PM, "Tathagata Das"  wrote:
>
>> Could you periodically (say every 10 mins) run System.gc() on the driver.
>> The cleaning up shuffles is tied to the garbage collection.
>>
>>
>> On Fri, Aug 21, 2015 at 2:59 AM, gaurav sharma 
>> wrote:
>>
>>> Hi All,
>>>
>>>
>>> I have a 24x7 running Streaming Process, which runs on 2 hour windowed
>>> data
>>>
>>> The issue i am facing is my worker machines are running OUT OF DISK space
>>>
>>> I checked that the SHUFFLE FILES are not getting cleaned up.
>>>
>>>
>>> /log/spark-2b875d98-1101-4e61-86b4-67c9e71954cc/executor-5bbb53c1-cee9-4438-87a2-b0f2becfac6f/blockmgr-c905b93b-c817-4124-a774-be1e706768c1//00/shuffle_2739_5_0.data
>>>
>>> Ultimately the machines runs out of Disk Spac
>>>
>>>
>>> i read about *spark.cleaner.ttl *config param which what i can
>>> understand from the documentation, says cleans up all the metadata beyond
>>> the time limit.
>>>
>>> I went through https://issues.apache.org/jira/browse/SPARK-5836
>>> it says resolved, but there is no code commit
>>>
>>> Can anyone please throw some light on the issue.
>>>
>>>
>>>
>>


Worker Machine running out of disk for Long running Streaming process

2015-08-21 Thread gaurav sharma
Hi All,


I have a 24x7 running streaming process, which runs on 2-hour windowed data.

The issue I am facing is that my worker machines are running OUT OF DISK space.

I checked that the SHUFFLE FILES are not getting cleaned up:

/log/spark-2b875d98-1101-4e61-86b4-67c9e71954cc/executor-5bbb53c1-cee9-4438-87a2-b0f2becfac6f/blockmgr-c905b93b-c817-4124-a774-be1e706768c1//00/shuffle_2739_5_0.data

Ultimately the machine runs out of disk space.


I read about the spark.cleaner.ttl config param which, from what I can
understand of the documentation, cleans up all metadata older than the given
time limit.
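
For reference, it is set like any other Spark property (a sketch; the app name and value are illustrative):

import org.apache.spark.SparkConf;

// Illustrative only: ask Spark to clean metadata older than 2 hours (value is in seconds).
SparkConf conf = new SparkConf()
    .setAppName("WindowedStreamingJob")
    .set("spark.cleaner.ttl", "7200");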

I went through https://issues.apache.org/jira/browse/SPARK-5836;
it says resolved, but there is no code commit.

Can anyone please throw some light on the issue.


Re: Writing streaming data to cassandra creates duplicates

2015-08-04 Thread gaurav sharma
Ideally, the 2 messages read from Kafka must differ on at least some
parameter; otherwise they are logically the same.

As a solution to your problem, if the message content is the same, you could
add a new UUID field, which can play the role of the partition key when
inserting the 2 messages into Cassandra:

Msg1 - UUID1, GAURAV, 100
Msg2 - UUID2, PRIYA, 200
Msg3 - UUID1, GAURAV, 100

Now, when inserting into Cassandra, 3 different rows would be created.
Please note that even though Msg1 and Msg3 have the same content, they are
inserted as 2 separate rows, since they differ on the UUID, which is the
partition key in my column family.
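
A minimal sketch of the idea in the Spark Java API (the input RDD and payload format are assumptions for illustration; the actual Cassandra write is left out):

import java.util.UUID;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// messages: the payloads read from Kafka, e.g. "GAURAV,100" (assumed JavaRDD<String>).
// Keying every record by a fresh UUID makes identical payloads land as distinct rows
// once the UUID column is the partition key of the Cassandra column family.
JavaPairRDD<String, String> keyedByUuid = messages.mapToPair(
    new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String payload) {
            return new Tuple2<String, String>(UUID.randomUUID().toString(), payload);
        }
    });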

Regards,
Gaurav
(please excuse spelling mistakes)
Sent from phone
On Aug 4, 2015 4:54 PM, "Gerard Maas"  wrote:

> (removing dev from the to: as not relevant)
>
> it would be good to see some sample data and the cassandra schema to have
> a more concrete idea of the problem space.
>
> Some thoughts: reduceByKey could still be used to 'pick' one element.
> example of arbitrarily choosing the first one: reduceByKey{case (e1,e2) =>
> e1}
>
> The question to be answered is: what should happen to the multiple values
> that arrive for 1 key?
>
> And why are they creating duplicates in cassandra? if they have the same
> key, they will result in an overwrite (that's not desirable due to
> tombstones anyway)
>
> -kr, Gerard.
>
>
>
> On Tue, Aug 4, 2015 at 1:03 PM, Priya Ch 
> wrote:
>
>>
>>
>>
>> Yes...union would be one solution. I am not doing any aggregation hence
>> reduceByKey would not be useful. If I use groupByKey, messages with same
>> key would be obtained in a partition. But groupByKey is very expensive
>> operation as it involves shuffle operation. My ultimate goal is to write
>> the messages to cassandra. if the messages with same key are handled by
>> different streams...there would be concurrency issues. To resolve this i
>> can union dstreams and apply hash parttioner so that it would bring all the
>> same keys to a single partition or do a groupByKey which does the same.
>>
>> As groupByKey is expensive, is there any work around for this ?
>>
>> On Thu, Jul 30, 2015 at 2:33 PM, Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Just my two cents. I understand your problem is that your problem is
>>> that you have messages with the same key in two different dstreams. What I
>>> would do would be making a union of all the dstreams with
>>> StreamingContext.union or several calls to DStream.union, and then I would
>>> create a pair dstream with the primary key as key, and then I'd use
>>> groupByKey or reduceByKey (or combineByKey etc) to combine the messages
>>> with the same primary key.
>>>
>>> Hope that helps.
>>>
>>> Greetings,
>>>
>>> Juan
>>>
>>>
>>> 2015-07-30 10:50 GMT+02:00 Priya Ch :
>>>
 Hi All,

  Can someone throw insights on this ?

 On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch >>> > wrote:

>
>
> Hi TD,
>
>  Thanks for the info. I have the scenario like this.
>
>  I am reading the data from kafka topic. Let's say kafka has 3
> partitions for the topic. In my streaming application, I would configure 3
> receivers with 1 thread each such that they would receive 3 dstreams (from
> 3 partitions of kafka topic) and also I implement partitioner. Now there 
> is
> a possibility of receiving messages with same primary key twice or more,
> one is at the time message is created and other times if there is an 
> update
> to any fields for same message.
>
> If two messages M1 and M2 with same primary key are read by 2
> receivers then even the partitioner in spark would still end up in 
> parallel
> processing as there are altogether in different dstreams. How do we 
> address
> in this situation ?
>
> Thanks,
> Padma Ch
>
> On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das 
> wrote:
>
>> You have to partition that data on the Spark Streaming by the primary
>> key, and then make sure insert data into Cassandra atomically per key, or
>> per set of keys in the partition. You can use the combination of the 
>> (batch
>> time, and partition Id) of the RDD inside foreachRDD as the unique id for
>> the data you are inserting. This will guard against multiple attempts to
>> run the task that inserts into Cassandra.
>>
>> See
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations
>>
>> TD
>>
>> On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch <
>> learnings.chitt...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>>  I have a problem when writing streaming data to cassandra. Or
>>> existing product is on Oracle DB in which while wrtiting data, locks are
>>> maintained such that duplicates in the DB are avoided.
>>>
>>> But as spark has parallel processing architecture, if more than 1
>>> thread is trying to

Re: Spark Streaming Kafka could not find leader offset for Set()

2015-07-30 Thread gaurav sharma
I have run into similar exceptions:

ERROR DirectKafkaInputDStream: ArrayBuffer(java.net.SocketTimeoutException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([AdServe,1]))


In my case the issue happened on the Kafka side, where the broker offsets go
out of sync or no leader is returned for that particular partition.

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic AdServe
--broker-list BROKER_IP --time -1

This will return the valid offsets for all your Kafka partitions.


On Thu, Jul 30, 2015 at 7:58 PM, Umesh Kacha  wrote:

> Hi Cody sorry my bad you were right there was a typo in topicSet. When I
> corrected typo in topicSet it started working. Thanks a lot.
>
> Regards
>
> On Thu, Jul 30, 2015 at 7:43 PM, Cody Koeninger 
> wrote:
>
>> Can you post the code including the values of kafkaParams and topicSet,
>> ideally the relevant output of kafka-topics.sh --describe as well
>>
>> On Wed, Jul 29, 2015 at 11:39 PM, Umesh Kacha 
>> wrote:
>>
>>> Hi thanks for the response. Like I already mentioned in the question
>>> kafka topic is valid and it has data I can see data in it using another
>>> kafka consumer.
>>> On Jul 30, 2015 7:31 AM, "Cody Koeninger"  wrote:
>>>
 The last time someone brought this up on the mailing list, the issue
 actually was that the topic(s) didn't exist in Kafka at the time the spark
 job was running.





 On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das 
 wrote:

> There is a known issue that Kafka cannot return leader if there is not
> data in the topic. I think it was raised in another thread in this forum.
> Is that the issue?
>
> On Wed, Jul 29, 2015 at 10:38 AM, unk1102 
> wrote:
>
>> Hi I have Spark Streaming code which streams from Kafka topic it used
>> to work
>> fine but suddenly it started throwing the following exception
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set()
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
>> at scala.util.Either.fold(Either.scala:97)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>> My Spark Streaming client code is very simple I just create one
>> receiver
>> using the following code and trying to print messages it consumed
>>
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream(jssc,
>> String.class,
>> String.class,
>> StringDecoder.class,
>> StringDecoder.class,
>> kafkaParams,
>> topicSet);
>>
>> Kafka param is only one I specify kafka.ofset.reset=largest. Kafka
>> topic has
>> data I can see data using other Kafka consumers but above Spark
>> Streaming
>> code throws exception saying leader offset not found. I tried both
>> smallest
>> and largest offset. I wonder what happened this code used to work
>> earlier. I
>> am using Spark-Streaming 1.3.1 as it was working in this version I
>> tried in
>> 1.4.1 and same exception. Please guide. I am new to Spark thanks in
>> advance.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

>>
>


Re: createDirectStream and Stats

2015-07-12 Thread gaurav sharma
Hi guys,

I too am facing a similar challenge with the direct stream.

I have 3 Kafka partitions, and I am running Spark on 18 cores with the
parallelism level set to 48. I am running a simple map-reduce job on the
incoming stream.

Though the reduce stage takes milliseconds to seconds for around 15 million
packets, the map stage takes around 4-5 minutes, since only 3 tasks are
created for it. (I believe it is 3 tasks because I have 3 Kafka partitions,
and the stream created by

JavaPairDStream kafkaStream = KafkaConnector.getKafkaStream(jsc);

is the parent RDD for the map job, so it would create only 3 tasks.)


I have 10 such jobs, similar to the one above, all working on the same
kafkaStream I create.

Could you please advise whether repartitioning the kafkaStream (taking into
account the reshuffle at the repartition stage) would improve my overall
batch processing time.
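
What I am considering is roughly this (a sketch; the <String, String> payload types are an assumption):

import org.apache.spark.streaming.api.java.JavaPairDStream;

// kafkaStream is the 3-partition direct stream from KafkaConnector.getKafkaStream(jsc) above.
// Repartitioning shuffles each batch into 48 partitions so the map stage can run 48 tasks
// across the 18 cores, at the cost of one extra shuffle per batch.
JavaPairDStream<String, String> repartitioned = kafkaStream.repartition(48);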


On Sat, Jun 20, 2015 at 7:24 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>  Are you sure you were using all 100 executors even with the receiver
> model? Because in receiver mode, the number of partitions is dependent on
> the batch duration and block interval. It may not necessarily map directly
> to the number of executors in your app unless you've adjusted the block
> interval and batch duration.
>
>  *From:* Tim Smith 
> *Sent:* ‎Friday‎, ‎June‎ ‎19‎, ‎2015 ‎10‎:‎36‎ ‎PM
> *To:* user@spark.apache.org
>
>  I did try without repartition, initially, but that was even more
> horrible because instead of the allocated 100 executors, only 30 (which is
> the number of kafka partitions) would have to do the work. The "MyFunc" is
> a CPU bound task so adding more memory per executor wouldn't help and I saw
> that each of the 30 executors was only using one thread/core on each Spark
> box. I could go and play with threading in MyFunc but I don't want to mess
> with threading with all the parallelism already involved and I don't think
> in-app threading outside of what the framework does is really desirable.
>
>  With repartition, there is shuffle involved, but at least the
> computation load spreads across all 100 executors instead of just 30.
>
>
>
>
> On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger 
> wrote:
>
>> If that's the case, you're still only using as many read executors as
>> there are kafka partitions.
>>
>>  I'd remove the repartition. If you weren't doing any shuffles in the
>> old job, and are doing a shuffle in the new job, it's not really comparable.
>>
>> On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith  wrote:
>>
>>>  On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das 
>>> wrote:
>>>
 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?

>>>
>>>  Sure, will try to debug/troubleshoot. Are there enhancements to this
>>> specific API between 1.3 and 1.4 that can substantially change it's
>>> behaviour?
>>>
>>>
   On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger 
 wrote:

> when you say your old version was
>
>  k = createStream .
>
>  were you manually creating multiple receivers?  Because otherwise
> you're only using one receiver on one executor...
>

>>>  Yes, sorry, the earlier/stable version was more like:
>>>  kInStreams = (1 to n).map{_ => KafkaUtils.createStream  //
>>> n being the number of kafka partitions, 1 receiver per partition
>>> val k = ssc.union(kInStreams)
>>>  val dataout = k.map(x=>myFunc(x._2,someParams))
>>>  dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>>> myOutputFunc.write(rec) })
>>>
>>>  Thanks,
>>>
>>>  Tim
>>>
>>>
>>>
>>>
>>>

>  If that's the case I'd try direct stream without the repartitioning.
>
>
> On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith  wrote:
>
>>  Essentially, I went from:
>>  k = createStream .
>> val dataout = k.map(x=>myFunc(x._2,someParams))
>>  dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>> myOutputFunc.write(rec) })
>>
>>  To:
>> kIn = createDirectStream .
>> k = kIn.repartition(numberOfExecutors) //since #kafka partitions <
>> #spark-executors
>> val dataout = k.map(x=>myFunc(x._2,someParams))
>>  dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>> myOutputFunc.write(rec) })
>>
>>  With the new API, the app starts up and works fine for a while but
>> I guess starts to deteriorate after a while. With the existing API
>> "createStream", the app does deteriorate but over a much longer period,
>> hours vs days.
>>
>>
>>
>>
>>
>>
>> On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das 
>> wrote:
>>
>>> Yes, please tell us what operation are you using.
>>>
>>>  TD
>>>
>>> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger >> > wrote:
>>>
 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim S

Re: Worker dies with java.io.IOException: Stream closed

2015-07-12 Thread gaurav sharma
The logs I pasted are from the worker logs only.

Spark does have permission to write into /opt; it is not that the worker is
unable to start. It runs perfectly for days, but then abruptly dies.

And it is not always this machine; sometimes it is some other machine. It
happens once in a while, but when it happens the problem persists for at
least a day: no matter how many times I restart the worker, it dies again
with the same exception.


On Sun, Jul 12, 2015 at 12:42 PM, Akhil Das 
wrote:

> Can you dig a bit more in the worker logs? Also make sure that spark has
> permission to write to /opt/ on that machine as its one machine always
> throwing up.
>
> Thanks
> Best Regards
>
> On Sat, Jul 11, 2015 at 11:18 PM, gaurav sharma 
> wrote:
>
>> Hi All,
>>
>> I am facing this issue in my production environment.
>>
>> My worker dies by throwing this exception.
>> But i see the space is available on all the partitions on my disk
>> I did NOT see any abrupt increase in DIsk IO, which might have choked the
>> executor to write on to the stderr file.
>>
>> But still my worker dies, this is not happening on all my workers, it's
>> one machine that is performing this way.
>> Could you please help me debug if it is happening because i am doing
>> something wrong, or some issue from hardware/OS perspective, that i can
>> debug and fix.
>>
>>
>> 15/07/11 18:05:45 ERROR Worker: RECEIVED SIGNAL 1: SIGHUP
>> 15/07/11 18:05:45 INFO ExecutorRunner: Killing process!
>> 15/07/11 18:05:45 ERROR FileAppender: Error writing stream to file
>> /opt/spark-1.4.0-bin-hadoop2.6/work/app-20150710162005-0001/16517/stderr
>> java.io.IOException: Stream closed
>> at
>> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
>> at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
>> at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>> at java.io.FilterInputStream.read(FilterInputStream.java:107)
>> at
>> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>> at
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
>> 15/07/11 18:05:46 INFO Utils: Shutdown hook called
>> 15/07/11 18:05:46 INFO Utils: Deleting directory
>> /tmp/spark-f269acd9-3ab0-4b3c-843c-bcf2e8c2669f
>> 15/07/11 18:05:46 INFO Worker: Executor app-20150710162005-0001/16517
>> finished with state EXITED message Command exited with code 129 exitStatus
>> 129
>>
>>
>>
>>
>>
>


Worker dies with java.io.IOException: Stream closed

2015-07-11 Thread gaurav sharma
Hi All,

I am facing this issue in my production environment.

My worker dies by throwing the exception below,
but I see that space is available on all the partitions of my disk,
and I did NOT see any abrupt increase in disk I/O that might have prevented
the executor from writing to the stderr file.

Still, my worker dies. This is not happening on all my workers; it is one
machine that keeps behaving this way.
Could you please help me debug whether this is happening because I am doing
something wrong, or whether it is some hardware/OS issue that I can track
down and fix?


15/07/11 18:05:45 ERROR Worker: RECEIVED SIGNAL 1: SIGHUP
15/07/11 18:05:45 INFO ExecutorRunner: Killing process!
15/07/11 18:05:45 ERROR FileAppender: Error writing stream to file
/opt/spark-1.4.0-bin-hadoop2.6/work/app-20150710162005-0001/16517/stderr
java.io.IOException: Stream closed
at
java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at
org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
at
org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
15/07/11 18:05:46 INFO Utils: Shutdown hook called
15/07/11 18:05:46 INFO Utils: Deleting directory
/tmp/spark-f269acd9-3ab0-4b3c-843c-bcf2e8c2669f
15/07/11 18:05:46 INFO Worker: Executor app-20150710162005-0001/16517
finished with state EXITED message Command exited with code 129 exitStatus
129


Re: How does one decide no of executors/cores/memory allocation?

2015-06-15 Thread gaurav sharma
When you submit a job, Spark breaks it down into stages, as per the DAG. The
stages run transformations or actions on the RDDs. Each RDD consists of N
partitions, and the number of tasks Spark creates to execute a stage equals
the number of partitions. Every task runs on a core used by one of the
executors in your cluster.

--conf spark.cores.max=24 defines the maximum number of cores you want to
utilize; Spark itself distributes those cores among the workers.

The more partitions and the more cores available, the higher the level of
parallelism, and the better the performance.
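
A small sketch of that relationship (the numbers are arbitrary):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
    .setAppName("parallelism-demo")
    .set("spark.cores.max", "24");   // upper bound on cores this app takes from the cluster
JavaSparkContext sc = new JavaSparkContext(conf);

List<Integer> data = new ArrayList<Integer>();
for (int i = 0; i < 1000000; i++) {
    data.add(i);
}

// 48 partitions -> every stage over this RDD runs as 48 tasks, which the
// scheduler spreads over whichever of the (at most 24) cores are free.
JavaRDD<Integer> rdd = sc.parallelize(data, 48);
long count = rdd.count();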

On Tue, Jun 16, 2015 at 9:27 AM, shreesh  wrote:

> How do I decide in how many partitions I break up my data into, how many
> executors should I have? I guess memory and cores will be allocated based
> on
> the number of executors I have.
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-one-decide-no-of-executors-cores-memory-allocation-tp23326.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark Streaming - Can I BIND a Spark Executor to the Kafka Partition Leader

2015-06-12 Thread gaurav sharma
Hi,

I am using a Kafka + Spark cluster for a real-time aggregation analytics use
case in production.

Cluster details

6 nodes, each node running 1 Spark process and 1 Kafka process.
Node 1         -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
Node 2,3,4,5,6 -> 1 Worker process each, 1 Kafka process each

Spark version 1.3.0
Kafka version 0.8.1

I am using the Kafka direct stream for the Kafka-Spark integration.
The analytics code is written using the Spark Java API.

Problem statement:

We are dealing with about 10 M records per hour.
My Spark Streaming batch runs at a 1-hour interval (at 11:30, 12:30, 1:30
and so on).

Since I am using the direct stream, it reads all the data for the past hour
at 11:30, 12:30, 1:30 and so on.
As of now it takes about 3 minutes to read the data, with network bandwidth
utilization of 100-200 MBps per node (out of the 6-node Spark cluster).

Since I am running both Spark and Kafka on the same machines,
I WANT TO BIND MY SPARK EXECUTOR TO THE KAFKA PARTITION LEADER, so as to
eliminate Spark's network bandwidth consumption.

I understand that the number of partitions created in Spark for a direct
stream is equal to the number of partitions on Kafka, which is why I am
curious whether there might be such a provision in Spark.



Regards,
Gaurav


Re: How to pass arguments dynamically that need to be used in executors

2015-06-12 Thread gaurav sharma
Thanks Todd, that solved my problem.
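
For anyone hitting the same issue, the working version looks roughly like this (a sketch; sc is the JavaSparkContext, and the argument index for the impression threshold is an assumption):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

// Broadcast the command-line values once from the driver...
final Broadcast<Integer> imprThreshold = sc.broadcast(Integer.parseInt(args[11]));   // index assumed
final Broadcast<Integer> clickThreshold = sc.broadcast(Integer.parseInt(args[12]));

// ...and read them via value() inside the closure that runs on the executors,
// instead of a static field that is only ever set in the driver JVM.
JavaRDD<AggregatedAdeStats> filtered = adeAudGeoAggDataRdd.filter(
    new Function<AggregatedAdeStats, Boolean>() {
        @Override
        public Boolean call(AggregatedAdeStats rec) {
            return rec.getImpr() > imprThreshold.value()
                || rec.getClick() > clickThreshold.value();
        }
    });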

Regards,
Gaurav
(please excuse spelling mistakes)
Sent from phone
On Jun 11, 2015 6:42 PM, "Todd Nist"  wrote:

> Hi Gaurav,
>
> Seems like you could use a broadcast variable for this if I understand
> your use case.  Create it in the driver based on the CommandLineArguments
> and then use it in the workers.
>
>
> https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>
> So something like:
>
> Broadcast cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));
>
> Then just reference the broadcast variable in you workers.  It will get
> shipped once to all nodes in the cluster and can be referenced by them.
>
> HTH.
>
> -Todd
>
> On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma 
> wrote:
>
>> Hi,
>>
>> I am using Kafka Spark cluster for real time aggregation analytics use
>> case in production.
>>
>> Cluster details
>> 6 nodes, each node running 1 Spark and kafka processes each.
>> Node1  -> 1 Master , 1 Worker, 1 Driver,
>>  1 Kafka process
>> Node 2,3,4,5,6 -> 1 Worker prcocess each
>>  1 Kafka process each
>>
>> Spark version 1.3.0
>> Kafka Veriosn 0.8.1
>>
>> I am using Kafka Directstream for Kafka Spark Integration.
>> Analytics code is written in using Spark Java API.
>>
>> Problem Statement :
>>
>>   I want to accept a paramter as command line argument, and pass on
>> to the executors.
>>   (want to use the paramter in rdd.foreach method which is executed
>> on executor)
>>
>>   I understand that when driver is started, only the jar is
>> transported to all the Workers.
>>   But i need to use the dynamically passed command line argument in
>> the reduce operation executed on executors.
>>
>>
>> Code Snippets for better understanding my problem :
>>
>> public class KafkaReconcilationJob {
>>
>> private static Logger logger =
>> Logger.getLogger(KafkaReconcilationJob.class);
>>  public static void main(String[] args) throws Exception {
>>   CommandLineArguments.CLICK_THRESHOLD =
>> Integer.parseInt(args[12]);
>> ---> I want to use this
>> command line argument
>> }
>>
>> }
>>
>>
>>
>> JavaRDD adeAggregatedFilteredData =
>> adeAudGeoAggDataRdd.filter(new Function() {
>> @Override
>> public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception
>> {
>> if(adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD ||
>> adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD){
>> return true;
>> }else {
>> return false;
>> }
>> }
>> });
>>
>>
>>
>> The above mentioned Filter operation gets executed on executor which has
>> 0 as the value of the static field CommandLineArguments.CLICK_THRESHOLD
>>
>>
>> Regards,
>> Gaurav
>>
>
>


How to pass arguments dynamically that need to be used in executors

2015-06-11 Thread gaurav sharma
Hi,

I am using a Kafka + Spark cluster for a real-time aggregation analytics use
case in production.

Cluster details
6 nodes, each node running 1 Spark process and 1 Kafka process.
Node 1         -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
Node 2,3,4,5,6 -> 1 Worker process each, 1 Kafka process each

Spark version 1.3.0
Kafka version 0.8.1

I am using the Kafka direct stream for the Kafka-Spark integration.
The analytics code is written using the Spark Java API.

Problem statement:

I want to accept a parameter as a command line argument and pass it on to
the executors (I want to use the parameter in an rdd.foreach method, which
is executed on the executors).

I understand that when the driver is started, only the jar is transported
to all the workers. But I need to use the dynamically passed command line
argument in the reduce operation executed on the executors.


Code snippets for a better understanding of my problem:

public class KafkaReconcilationJob {

    private static Logger logger = Logger.getLogger(KafkaReconcilationJob.class);

    public static void main(String[] args) throws Exception {
        // ---> This is the command line argument I want to use on the executors
        CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]);
    }
}


JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
    adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
        @Override
        public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
            if (adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
                    || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD) {
                return true;
            } else {
                return false;
            }
        }
    });



The above-mentioned filter operation gets executed on an executor, where the
value of the static field CommandLineArguments.CLICK_THRESHOLD is 0.


Regards,
Gaurav