Custom SparkListener

2018-09-20 Thread Priya Ch
Hello All,

I am trying to extend SparkListener so that, after a job ends, I can
retrieve the job name, check whether it succeeded or failed, and write the
result to a log file.

I couldn't find a way to fetch the job name in the onJobEnd method.
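
For reference, a minimal sketch of a custom listener, assuming the job
"name" is the description set via SparkContext.setJobDescription: onJobEnd
itself only exposes the job id and result, so the name has to be captured
in onJobStart.

import scala.collection.mutable
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Sketch only: capture the job description (if any) when the job starts,
// then report success/failure when it ends. Register it with
// sc.addSparkListener(new JobNameListener) or via spark.extraListeners.
class JobNameListener extends SparkListener {
  private val jobNames = mutable.Map[Int, String]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val name = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.job.description")))
      .getOrElse(s"job-${jobStart.jobId}")
    jobNames(jobStart.jobId) = name
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    val name = jobNames.remove(jobEnd.jobId).getOrElse(s"job-${jobEnd.jobId}")
    val status = if (jobEnd.jobResult == JobSucceeded) "SUCCESS" else "FAILURE"
    println(s"Job '$name' ended with status $status")  // replace with a real log appender
  }
}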

Thanks,
Padma CH


DirectFileOutputCommitter in Spark 2.3.1

2018-09-19 Thread Priya Ch
Hello Team,

I am trying to write a Dataset as a Parquet file in Append mode,
partitioned by a few columns. However, since the job is time consuming, I
would like to enable DirectFileOutputCommitter (i.e. bypass the writes to
the temporary folder).

The version of Spark I am using is 2.3.1.

Can someone please help with enabling the configuration that allows writing
directly to S3, for appending, writing new files, and overwriting existing
files?
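
For reference, a minimal sketch of a commonly used alternative, assuming
S3A and the built-in Hadoop FileOutputCommitter (Spark 2.x no longer ships
a direct output committer; the paths and partition columns below are
hypothetical). The "version 2" commit algorithm moves task output directly
into the destination directory, which removes most of the temporary-folder
rename cost:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("parquet-append")
  // Hadoop setting passed through the spark.hadoop. prefix; algorithm
  // version 2 skips the second (job-level) rename of committed task files.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()

spark.read.parquet("s3a://my-bucket/input")          // hypothetical input
  .write
  .mode(SaveMode.Append)
  .partitionBy("year", "month")                      // hypothetical partition columns
  .parquet("s3a://my-bucket/output")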

Thanks,
Padma CH


Video analytics on SPark

2016-09-09 Thread Priya Ch
Hi All,

I have video surveillance data, and this needs to be processed in Spark. I
am exploring Spark + OpenCV. How do I load .mp4 video frames into an RDD?
Can we do this directly, or does the video need to be converted to a
SequenceFile?
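
For reference, a minimal sketch of one way to do this, assuming the OpenCV
3.x Java bindings and their native library are installed on every executor,
and that the .mp4 files live on a path each executor can read (HDFS data
would first have to be copied to a local temp file); there is no built-in
video input format, so this just distributes the file paths:

import org.apache.spark.{SparkConf, SparkContext}
import org.opencv.core.{Core, Mat}
import org.opencv.videoio.VideoCapture

// Loads the OpenCV native library at most once per executor JVM.
object OpenCvLoader {
  lazy val loaded: Unit = System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
}

object VideoFrameCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("video-frames"))
    val videoPaths = Seq("/data/videos/cam1.mp4", "/data/videos/cam2.mp4")  // hypothetical

    val counts = sc.parallelize(videoPaths).map { path =>
      OpenCvLoader.loaded                       // ensure the native lib is loaded
      val capture = new VideoCapture(path)
      val frame = new Mat()
      var frames = 0L
      while (capture.read(frame)) frames += 1   // decode the video frame by frame
      capture.release()
      (path, frames)
    }.collect()

    counts.foreach { case (p, n) => println(s"$p: $n frames") }
    sc.stop()
  }
}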

Thanks,
Padma CH


Re: Send real-time alert using Spark

2016-07-12 Thread Priya Ch
I mean that the model training on incoming data is taken care of by Spark.
For detected anomalies, I need to send an alert. Could we do this with
Spark itself, or would another framework such as Akka or a REST API be
needed?

Thanks,
Padma CH

On Tue, Jul 12, 2016 at 7:30 PM, Marcin Tustin <mtus...@handybook.com>
wrote:

> Priya,
>
> You wouldn't necessarily "use spark" to send the alert. Spark is in an
> important sense one library among many. You can have your application use
> any other library available for your language to send the alert.
>
> Marcin
>
> On Tue, Jul 12, 2016 at 9:25 AM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> Hi All,
>>
>>  I am building Real-time Anomaly detection system where I am using
>> k-means to detect anomaly. Now in-order to send alert to mobile or an email
>> alert how do i send it using Spark itself ?
>>
>> Thanks,
>> Padma CH
>>
>
>


Send real-time alert using Spark

2016-07-12 Thread Priya Ch
Hi All,

 I am building a real-time anomaly detection system where I am using
k-means to detect anomalies. Now, in order to send an alert to a mobile
device or by email, how do I send it using Spark itself?
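
For reference, a minimal sketch of sending an alert from inside the
streaming job itself, assuming a DStream of already-scored records and a
hypothetical HTTP endpoint that fans out SMS/e-mail notifications; Spark
has no alerting API of its own, so the alert is sent by ordinary JVM code
inside foreachRDD/foreachPartition:

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import org.apache.spark.streaming.dstream.DStream

case class Scored(id: String, distanceToCentroid: Double)

def sendAlerts(scored: DStream[Scored], threshold: Double, alertUrl: String): Unit =
  scored.filter(_.distanceToCentroid > threshold).foreachRDD { rdd =>
    rdd.foreachPartition { anomalies =>
      anomalies.foreach { a =>
        // plain HTTP POST; any mail/SMS/Akka client could be used here instead
        val conn = new URL(alertUrl).openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("POST")
        conn.setDoOutput(true)
        val body = s"""{"id":"${a.id}","score":${a.distanceToCentroid}}"""
        conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))
        conn.getResponseCode   // force the request; response body is ignored
        conn.disconnect()
      }
    }
  }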

Thanks,
Padma CH


Re: Spark Task failure with File segment length as negative

2016-07-06 Thread Priya Ch
Is anyone resolved this ?


Thanks,
Padma CH

On Wed, Jun 22, 2016 at 4:39 PM, Priya Ch <learnings.chitt...@gmail.com>
wrote:

> Hi All,
>
> I am running a Spark application with 1.8 TB of data (which is stored in
> Hive table format). I am reading the data using HiveContext and processing
> it. The cluster has 5 nodes in total, 25 cores per machine and 250 GB per
> node. I am launching the application with 25 executors, with 5 cores each
> and 45 GB per executor. I also specified the property
> spark.yarn.executor.memoryOverhead=2024.
>
> During the execution, tasks are lost and ShuffleMapTasks are re-submitted.
> I am seeing that tasks are failing with the following message -
>
> java.lang.IllegalArgumentException: requirement failed: File segment
> length cannot be negative (got -27045427)
>     at scala.Predef$.require(Predef.scala:233)
>     at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
>     at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:220)
>     at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:184)
>     at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:398)
>     at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:206)
>     at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> I understand that this is because the shuffle block is > 2 GB, so the Int
> length value becomes negative and throws the above exception.
>
> Can someone throw light on this ? What is the fix for this ?
>
> Thanks,
> Padma CH
>


Spark Task failure with File segment length as negative

2016-06-22 Thread Priya Ch
Hi All,

I am running a Spark application with 1.8 TB of data (which is stored in
Hive table format). I am reading the data using HiveContext and processing
it. The cluster has 5 nodes in total, 25 cores per machine and 250 GB per
node. I am launching the application with 25 executors, with 5 cores each
and 45 GB per executor. I also specified the property
spark.yarn.executor.memoryOverhead=2024.

During the execution, tasks are lost and ShuffleMapTasks are re-submitted.
I am seeing that tasks are failing with the following message -

java.lang.IllegalArgumentException: requirement failed: File segment
length cannot be negative (got -27045427)
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
    at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:220)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:184)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:398)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:206)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

I understand that this is because the shuffle block is > 2 GB, so the Int
length value becomes negative and throws the above exception.
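
For reference, a minimal sketch of the usual mitigation, which is simply to
raise the number of shuffle partitions so that no single shuffle block
approaches the 2 GB limit; the table name and partition counts below are
illustrative, not tuned values:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("hive-processing")
  .set("spark.sql.shuffle.partitions", "2000")   // default is 200; more partitions => smaller shuffle blocks

val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

val df = hiveContext.table("my_db.my_table")     // hypothetical Hive table
  .repartition(2000)                             // also shrinks per-task shuffle output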

Can someone throw light on this ? What is the fix for this ?

Thanks,
Padma CH


Re: Spark Job Execution halts during shuffle...

2016-06-01 Thread Priya Ch
Hi
Can someone throw some light on this? The issue does not happen frequently;
sometimes the job halts with the above messages.

Regards,
Padma Ch

On Fri, May 27, 2016 at 8:47 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Priya:
> Have you checked the executor logs on hostname1 and hostname2 ?
>
> Cheers
>
> On Thu, May 26, 2016 at 8:00 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> If you get stuck with job failures, one of the best practices is to
>> increase the number of partitions.
>> Also, you'd be better off using DataFrame instead of RDD in terms of join
>> optimization.
>>
>> // maropu
>>
>>
>> On Thu, May 26, 2016 at 11:40 PM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> Hello Team,
>>>
>>>
>>>  I am trying to perform join 2 rdds where one is of size 800 MB and the
>>> other is 190 MB. During the join step, my job halts and I don't see
>>> progress in the execution.
>>>
>>> This is the message I see on console -
>>>
>>> INFO spark.MapOutputTrackerMasterEndPoint: Asked to send map output
>>> locations for shuffle 0 to :4
>>> INFO spark.MapOutputTrackerMasterEndPoint: Asked to send map output
>>> locations for shuffle 1 to :4
>>>
>>> After these messages, I dont see any progress. I am using Spark 1.6.0
>>> version and yarn scheduler (running in YARN client mode). My cluster
>>> configurations is - 3 node cluster (1 master and 2 slaves). Each slave has
>>> 1 TB hard disk space, 300GB memory and 32 cores.
>>>
>>> HDFS block size is 128 MB.
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


Spark Job Execution halts during shuffle...

2016-05-26 Thread Priya Ch
Hello Team,


 I am trying to join 2 RDDs, where one is of size 800 MB and the other is
190 MB. During the join step, my job halts and I don't see progress in the
execution.

This is the message I see on console -

INFO spark.MapOutputTrackerMasterEndPoint: Asked to send map output
locations for shuffle 0 to :4
INFO spark.MapOutputTrackerMasterEndPoint: Asked to send map output
locations for shuffle 1 to :4

After these messages, I don't see any progress. I am using Spark version
1.6.0 and the YARN scheduler (running in YARN client mode). My cluster
configuration is a 3-node cluster (1 master and 2 slaves). Each slave has
1 TB of hard disk space, 300 GB of memory, and 32 cores.

HDFS block size is 128 MB.
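
For reference, a minimal sketch of passing an explicit partition count to
the join, which spreads the shuffle over more and smaller tasks and is
usually the first thing to try when a shuffle stage appears to hang; the
paths, key layout and partition count are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("join-demo"))

// Hypothetical tab-separated "key<TAB>value" inputs keyed for the join.
val big   = sc.textFile("hdfs:///data/big").map(_.split("\t")).map(a => (a(0), a(1)))
val small = sc.textFile("hdfs:///data/small").map(_.split("\t")).map(a => (a(0), a(1)))

val joined = big.join(small, 200)                // explicit number of shuffle partitions
joined.saveAsTextFile("hdfs:///data/joined")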

Thanks,
Padma Ch


Re: Cartesian join on RDDs taking too much time

2016-05-25 Thread Priya Ch
Hi,
  RDD A is of size 30 MB and RDD B is of size 8 MB. Upon matching, we would
like to filter the strings that have a greater than 85% match and generate
a score for each, which is used in the subsequent calculations.

I tried generating a pair RDD from both RDDs A and B with the same key for
all the records. Now, performing A.join(B) also results in a huge execution
time.

How do I go about using map and reduce here? To generate pairs from the 2
RDDs, I don't think map can be used, because we cannot have an RDD inside
another RDD.

I would be glad if you could throw some light on this.
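
For reference, a minimal sketch of the broadcast alternative suggested
below, which avoids both the Cartesian shuffle and nesting one RDD inside
another; similarity() is a hypothetical fuzzy-match function returning a
value between 0.0 and 1.0:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def fuzzyMatches(sc: SparkContext,
                 rddA: RDD[String],
                 rddB: RDD[String],
                 similarity: (String, String) => Double,
                 threshold: Double = 0.85): RDD[(String, String, Double)] = {
  // B is only a few MB, so it is safe to collect and broadcast it.
  val smallSide = sc.broadcast(rddB.collect())
  rddA.flatMap { a =>
    smallSide.value.iterator
      .map(b => (a, b, similarity(a, b)))
      .filter(_._3 > threshold)                  // keep pairs with > 85% match
  }
}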

Thanks,
Padma Ch

On Wed, May 25, 2016 at 7:39 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Solr or Elastic search provide much more functionality and are faster in
> this context. The decision for or against them depends on your current and
> future use cases. Your current use case is still very abstract so in order
> to get a proper recommendation you need to provide more details,
> including the size of the dataset and what you do with the result of the
> matching: do you just need the match number, or also the pairs in the
> results, etc.
>
> Your concrete problem can also be solved in Spark (though it is not the
> best and most efficient tool for this, it has other strengths) using
> map/reduce steps. There are different ways to implement this: generate
> pairs from the input datasets in the map step, or (maybe less
> recommendable) broadcast the smaller dataset to all nodes and do the
> matching with the bigger dataset there.
> This highly depends on the data in your data sets and how they compare in
> size, etc.
>
>
>
> On 25 May 2016, at 13:27, Priya Ch <learnings.chitt...@gmail.com> wrote:
>
> Why do i need to deploy solr for text anaytics...i have files placed in
> HDFS. just need to look for matches against each string in both files and
> generate those records whose match is > 85%. We trying to Fuzzy match
> logic.
>
> How can use map/reduce operations across 2 rdds ?
>
> Thanks,
> Padma Ch
>
> On Wed, May 25, 2016 at 4:49 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
>>
>> Alternatively depending on the exact use case you may employ solr on
>> Hadoop for text analytics
>>
>> > On 25 May 2016, at 12:57, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>> >
>> > Lets say i have rdd A of strings as  {"hi","bye","ch"} and another RDD
>> B of
>> > strings as {"padma","hihi","chch","priya"}. For every string rdd A i
>> need
>> > to check the matches found in rdd B as such for string "hi" i have to
>> check
>> > the matches against all strings in RDD B which means I need generate
>> every
>> > possible combination r
>>
>
>


Re: Cartesian join on RDDs taking too much time

2016-05-25 Thread Priya Ch
Why do I need to deploy Solr for text analytics? I have files placed in
HDFS; I just need to look for matches against each string in both files and
generate those records whose match is > 85%. We are trying fuzzy-match
logic.

How can I use map/reduce operations across 2 RDDs?

Thanks,
Padma Ch

On Wed, May 25, 2016 at 4:49 PM, Jörn Franke <jornfra...@gmail.com> wrote:

>
> Alternatively depending on the exact use case you may employ solr on
> Hadoop for text analytics
>
> > On 25 May 2016, at 12:57, Priya Ch <learnings.chitt...@gmail.com> wrote:
> >
> > Lets say i have rdd A of strings as  {"hi","bye","ch"} and another RDD B
> of
> > strings as {"padma","hihi","chch","priya"}. For every string rdd A i need
> > to check the matches found in rdd B as such for string "hi" i have to
> check
> > the matches against all strings in RDD B which means I need generate
> every
> > possible combination r
>


Re: Cartesian join on RDDs taking too much time

2016-05-25 Thread Priya Ch
Let's say I have RDD A of strings {"hi","bye","ch"} and another RDD B of
strings {"padma","hihi","chch","priya"}. For every string in RDD A, I need
to check the matches found in RDD B; for example, for the string "hi" I
have to check the matches against all strings in RDD B, which means I need
to generate every possible combination. Hence I am generating the Cartesian
product and then using a map transformation on the Cartesian RDD to check
the matches found.

Is there any better way I could do this other than performing the
Cartesian? So far the application has taken 30 minutes, and on top of that
I see executor-lost issues.

Thanks,
Padma Ch

On Wed, May 25, 2016 at 4:22 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> What is the use case of this ? A Cartesian product is by definition slow
> in any system. Why do you need this? How long does your application take
> now?
>
> On 25 May 2016, at 12:42, Priya Ch <learnings.chitt...@gmail.com> wrote:
>
> I tried
> dataframe.write.format("com.databricks.spark.csv").save("/hdfs_path"). Even
> this is taking too much time.
>
> Thanks,
> Padma Ch
>
> On Wed, May 25, 2016 at 3:47 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Why did you use Rdd#saveAsTextFile instead of DataFrame#save writing as
>> parquet, orc, ...?
>>
>> // maropu
>>
>> On Wed, May 25, 2016 at 7:10 PM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> Hi , Yes I have joined using DataFrame join. Now to save this into hdfs
>>> .I am converting the joined dataframe to rdd (dataframe.rdd) and using
>>> saveAsTextFile, trying to save it. However, this is also taking too much
>>> time.
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>> On Wed, May 25, 2016 at 1:32 PM, Takeshi Yamamuro <linguin@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> Seems you'd be better off using DataFrame#join instead of  RDD
>>>> .cartesian
>>>> because it always needs shuffle operations which have alot of
>>>> overheads such as reflection, serialization, ...
>>>> In your case,  since the smaller table is 7mb, DataFrame#join uses a
>>>> broadcast strategy.
>>>> This is a little more efficient than  RDD.cartesian.
>>>>
>>>> // maropu
>>>>
>>>> On Wed, May 25, 2016 at 4:20 PM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> It is basically a Cartesian join like RDBMS
>>>>>
>>>>> Example:
>>>>>
>>>>> SELECT * FROM FinancialCodes,  FinancialData
>>>>>
>>>>> The results of this query matches every row in the FinancialCodes
>>>>> table with every row in the FinancialData table.  Each row consists
>>>>> of all columns from the FinancialCodes table followed by all columns from
>>>>> the FinancialData table.
>>>>>
>>>>>
>>>>> Not very useful
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * 
>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>>
>>>>> On 25 May 2016 at 08:05, Priya Ch <learnings.chitt...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>>   I have two RDDs A and B where in A is of size 30 MB and B is of
>>>>>> size 7 MB, A.cartesian(B) is taking too much time. Is there any 
>>>>>> bottleneck
>>>>>> in cartesian operation ?
>>>>>>
>>>>>> I am using spark 1.6.0 version
>>>>>>
>>>>>> Regards,
>>>>>> Padma Ch
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


Re: Cartesian join on RDDs taking too much time

2016-05-25 Thread Priya Ch
I tried
dataframe.write.format("com.databricks.spark.csv").save("/hdfs_path"). Even
this is taking too much time.

Thanks,
Padma Ch

On Wed, May 25, 2016 at 3:47 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Why did you use Rdd#saveAsTextFile instead of DataFrame#save writing as
> parquet, orc, ...?
>
> // maropu
>
> On Wed, May 25, 2016 at 7:10 PM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> Hi , Yes I have joined using DataFrame join. Now to save this into hdfs
>> .I am converting the joined dataframe to rdd (dataframe.rdd) and using
>> saveAsTextFile, trying to save it. However, this is also taking too much
>> time.
>>
>> Thanks,
>> Padma Ch
>>
>> On Wed, May 25, 2016 at 1:32 PM, Takeshi Yamamuro <linguin@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Seems you'd be better off using DataFrame#join instead of  RDD.cartesian
>>> because it always needs shuffle operations which have alot of overheads
>>> such as reflection, serialization, ...
>>> In your case,  since the smaller table is 7mb, DataFrame#join uses a
>>> broadcast strategy.
>>> This is a little more efficient than  RDD.cartesian.
>>>
>>> // maropu
>>>
>>> On Wed, May 25, 2016 at 4:20 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> It is basically a Cartesian join like RDBMS
>>>>
>>>> Example:
>>>>
>>>> SELECT * FROM FinancialCodes,  FinancialData
>>>>
>>>> The results of this query matches every row in the FinancialCodes table
>>>> with every row in the FinancialData table.  Each row consists of all
>>>> columns from the FinancialCodes table followed by all columns from the
>>>> FinancialData table.
>>>>
>>>>
>>>> Not very useful
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 25 May 2016 at 08:05, Priya Ch <learnings.chitt...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>>   I have two RDDs A and B where in A is of size 30 MB and B is of size
>>>>> 7 MB, A.cartesian(B) is taking too much time. Is there any bottleneck in
>>>>> cartesian operation ?
>>>>>
>>>>> I am using spark 1.6.0 version
>>>>>
>>>>> Regards,
>>>>> Padma Ch
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Cartesian join on RDDs taking too much time

2016-05-25 Thread Priya Ch
Hi, yes, I have joined using a DataFrame join. Now, to save this into HDFS,
I am converting the joined DataFrame to an RDD (dataframe.rdd) and using
saveAsTextFile to save it. However, this is also taking too much time.
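
For reference, a minimal sketch of writing the joined DataFrame out
directly as Parquet instead of converting it to an RDD and calling
saveAsTextFile; the output path is hypothetical:

import org.apache.spark.sql.DataFrame

def saveJoined(joined: DataFrame): Unit =
  joined.write.parquet("hdfs:///output/joined_parquet")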

Thanks,
Padma Ch

On Wed, May 25, 2016 at 1:32 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> It seems you'd be better off using DataFrame#join instead of RDD.cartesian,
> because cartesian always needs shuffle operations, which have a lot of
> overheads such as reflection, serialization, ...
> In your case, since the smaller table is 7 MB, DataFrame#join uses a
> broadcast strategy.
> This is a little more efficient than RDD.cartesian.
>
> // maropu
>
> On Wed, May 25, 2016 at 4:20 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> It is basically a Cartesian join like RDBMS
>>
>> Example:
>>
>> SELECT * FROM FinancialCodes,  FinancialData
>>
>> The results of this query matches every row in the FinancialCodes table
>> with every row in the FinancialData table.  Each row consists of all
>> columns from the FinancialCodes table followed by all columns from the
>> FinancialData table.
>>
>>
>> Not very useful
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 25 May 2016 at 08:05, Priya Ch <learnings.chitt...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>>   I have two RDDs A and B where in A is of size 30 MB and B is of size 7
>>> MB, A.cartesian(B) is taking too much time. Is there any bottleneck in
>>> cartesian operation ?
>>>
>>> I am using spark 1.6.0 version
>>>
>>> Regards,
>>> Padma Ch
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Cartesian join on RDDs taking too much time

2016-05-25 Thread Priya Ch
Hi All,

  I have two RDDs, A and B, where A is of size 30 MB and B is of size 7
MB. A.cartesian(B) is taking too much time. Is there any bottleneck in the
cartesian operation?

I am using Spark version 1.6.0.

Regards,
Padma Ch


Need Streaming output to single HDFS File

2016-04-12 Thread Priya Ch
Hi All,

  I am working with Kafka and Spark Streaming, and I want to write the
streaming output to a single file. dstream.saveAsTextFiles() is creating
files in different folders. Is there a way to write to a single folder? Or,
if the output is written to different folders, how do I merge them?

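For reference, a minimal sketch of coalescing each batch into one file,
assuming one output directory per batch is acceptable and each batch is
small enough to fit in a single partition; merging all batches into one
file would still need a separate step (e.g. hadoop fs -getmerge). Paths are
hypothetical:

import org.apache.spark.streaming.dstream.DStream

def saveAsSingleFilePerBatch(lines: DStream[String], outputRoot: String): Unit =
  lines.foreachRDD { (rdd, time) =>
    if (!rdd.isEmpty())
      rdd.coalesce(1)                           // one partition => one part file
         .saveAsTextFile(s"$outputRoot/batch-${time.milliseconds}")
  }
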
Thanks,
Padma Ch


Streaming k-means visualization

2016-04-08 Thread Priya Ch
Hi All,

 I am using streaming k-means to train my model on streaming data. Now I
want to visualize the clusters. What reporting tool would be used for this?
Could Zeppelin be used to visualize the clusters?

Regards,
Padma Ch


Re: Why KMeans with mllib is so slow ?

2016-03-14 Thread Priya Ch
Hi Xi Shen,

  Changing the initialization step from "k-means||" to "random" decreased
the execution time from 2 hrs to 6 min. However, by default the number of
runs is 1; if I set the number of runs to 10, I again see an increase in
job execution time.

How should I proceed with this?

By the way, how is the initialization mode "random" different from
"k-means||"?


Regards,
Padma Ch



On Sun, Mar 13, 2016 at 12:37 PM, Xi Shen  wrote:

> Hi Chitturi,
>
> Please checkout
> https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/mllib/clustering/KMeans.html#setInitializationSteps(int
> ).
>
> I think it is caused by the initialization step. The "kmeans||" method
> does not initialize the dataset in parallel; if your dataset is large, it
> takes a long time to initialize. Just change it to "random".
>
> Hope it helps.
>
>
> On Sun, Mar 13, 2016 at 2:58 PM Chitturi Padma <
> learnings.chitt...@gmail.com> wrote:
>
>> Hi All,
>>
>>   I am facing the same issue. Taking k values from 60 to 120,
>> incrementing by 10 each time (i.e. k takes the values 60, 70, 80, ..., 120),
>> the algorithm takes around 2.5 h on an 800 MB data set with 38 dimensions.
>> On Sun, Mar 29, 2015 at 9:34 AM, davidshen84 [via Apache Spark User List]
>> <[hidden email] >
>> wrote:
>>
>>> Hi Jao,
>>>
>>> Sorry to pop up this old thread. I have the same problem as you
>>> did. I want to know if you have figured out how to improve k-means on
>>> Spark.
>>>
>>> I am using Spark 1.2.0. My data set is about 270k vectors, each has
>>> about 350 dimensions. If I set k=500, the job takes about 3hrs on my
>>> cluster. The cluster has 7 executors, each has 8 cores...
>>>
>>> If I set k=5000 which is the required value for my task, the job goes on
>>> forever...
>>>
>>>
>>> Thanks,
>>> David
>>>
>>>
>>
>>
>>
> --
>
> Regards,
> David
>


BlockFetchFailed Exception

2016-03-11 Thread Priya Ch
Hi All,

  I am trying to run Spark k-means on a data set which is close to 1 GB.
Most often I see a BlockFetchFailed exception, which I suspect is because
of running out of memory.

Here are the configuration details:
Total cores:12
Total workers:3
Memory per node: 6GB

When running the job, I am giving the following parameters:
 --num-executors 3 --executor-cores 3 --executor-memory 5G

As the HDFS file has 9 blocks, there would be 9 partitions, i.e. 9 tasks
(each task needs 1 core), hence the total number of cores allocated across
all the executors is 9.

Here are the details of the exception:
16/03/11 14:13:27 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from pg-poc-02/10.10.10.92:44231 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)


Spark mllin k-means taking too much time

2016-03-03 Thread Priya Ch
Hi Team,

   I am running the k-means algorithm on the KDD 1999 data set (
http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html). I am running the
algorithm for different values of k, such as 5, 10, 15, 40. The data set
is 709 MB, and I have placed the file in HDFS with a block size of 128 MB
(6 blocks).
The cluster has 4 nodes (1 master and 3 workers). The total worker memory
and cores available are 20 GB and 12. When the k value is 2 (the default),
the job takes 4.2 min.

When k takes the values 5, 10, 15, 40, the total duration is 42 min, which
is very long. Are there any optimizations that need to be done?

I continuously see the following in the logs:
16/03/03 15:58:28 INFO scheduler.TaskSchedulerImpl: Adding task set 316.0
with 6 tasks
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
316.0 (TID 1898, pg-poc-04, partition 0,NODE_LOCAL, 1967 bytes)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Starting task 5.0 in stage
316.0 (TID 1899, pg-poc-02, partition 5,NODE_LOCAL, 1967 bytes)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
316.0 (TID 1900, pg-poc-04, partition 1,NODE_LOCAL, 1967 bytes)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
316.0 (TID 1901, pg-poc-04, partition 2,NODE_LOCAL, 1967 bytes)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
316.0 (TID 1902, pg-poc-04, partition 3,NODE_LOCAL, 1967 bytes)
16/03/03 15:58:28 INFO storage.BlockManagerInfo: Added broadcast_471_piece0
in memory on pg-poc-02:53774 (size: 1680.0 B, free: 177.1 MB)
16/03/03 15:58:28 INFO storage.BlockManagerInfo: Added broadcast_471_piece0
in memory on pg-poc-04:32800 (size: 1680.0 B, free: 333.9 MB)
16/03/03 15:58:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 106 to pg-poc-02:51863
16/03/03 15:58:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 106 to pg-poc-04:42511
16/03/03 15:58:28 INFO spark.MapOutputTrackerMaster: Size of output
statuses for shuffle 106 is 209 bytes
16/03/03 15:58:28 INFO spark.MapOutputTrackerMaster: Size of output
statuses for shuffle 106 is 209 bytes
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Finished task 5.0 in stage
316.0 (TID 1899) in 21 ms on pg-poc-02 (1/6)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Starting task 4.0 in stage
316.0 (TID 1903, pg-poc-04, partition 4,NODE_LOCAL, 1967 bytes)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Finished task 3.0 in stage
316.0 (TID 1902) in 22 ms on pg-poc-04 (2/6)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Finished task 2.0 in stage
316.0 (TID 1901) in 23 ms on pg-poc-04 (3/6)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
316.0 (TID 1900) in 23 ms on pg-poc-04 (4/6)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
316.0 (TID 1898) in 24 ms on pg-poc-04 (5/6)
16/03/03 15:58:28 INFO scheduler.TaskSetManager: Finished task 4.0 in stage
316.0 (TID 1903) in 7 ms on pg-poc-04 (6/6)
16/03/03 15:58:28 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 316.0,
whose tasks have all completed, from pool
16/03/03 15:58:28 INFO scheduler.DAGScheduler: ResultStage 316
(collectAsMap at KMeans.scala:302) finished in 0.030 s
16/03/03 15:58:28 INFO scheduler.DAGScheduler: Job 209 finished:
collectAsMap at KMeans.scala:302, took 11.753212 s
16/03/03 15:58:28 INFO clustering.KMeans: Iterations took 205.479 seconds.
16/03/03 15:58:28 INFO clustering.KMeans: KMeans reached the max number of
iterations: 20.
16/03/03 15:58:28 INFO clustering.KMeans: The cost for the best run is
2.340836927489414E13.
16/03/03 15:58:28 INFO rdd.MapPartitionsRDD: Removing RDD 365 from
persistence list
16/03/03 15:58:28 INFO storage.BlockManager: Removing RDD 365
16/03/03 15:58:28 INFO storage.MemoryStore: Block broadcast_472 stored as
values in memory (estimated size 13.3 KB, free 588.2 KB)
16/03/03 15:58:28 INFO storage.MemoryStore: Block broadcast_472_piece0
stored as bytes in memory (estimated size 5.6 KB, free 593.8 KB)
16/03/03 15:58:28 INFO storage.BlockManagerInfo: Added broadcast_472_piece0
in memory on 10.10.10.90:56180 (size: 5.6 KB, free: 511.4 MB)
16/03/03 15:58:28 INFO spark.SparkContext: Created broadcast 472 from
broadcast at AnomalyDetection.scala:34
16/03/03 15:58:28 INFO spark.SparkContext: Starting job: mean at
AnomalyDetection.scala:35
16/03/03 15:58:28 INFO scheduler.DAGScheduler: Got job 210 (mean at
AnomalyDetection.scala:35) with 6 output partitions
16/03/03 15:58:28 INFO scheduler.DAGScheduler: Final stage: ResultStage 317
(mean at AnomalyDetection.scala:35)
16/03/03 15:58:28 INFO scheduler.DAGScheduler: Parents of final stage:
List()
16/03/03 15:58:28 INFO scheduler.DAGScheduler: Missing parents: List()
16/03/03 15:58:28 INFO scheduler.DAGScheduler: Submitting ResultStage 317
(MapPartitionsRDD[433] at mean at AnomalyDetection.scala:35), which has no
missing parents
16/03/03 15:58:28 INFO 

Spark Mllib kmeans execution

2016-03-02 Thread Priya Ch
Hi All,

  I am running k-means clustering algorithm. Now, when I am running the
algorithm as -

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans

val conf = new SparkConf
val sc = new SparkContext(conf)
.
.
val kmeans = new KMeans()        // the KMeans configuration object, created on the driver
val model = kmeans.run(data)     // data: RDD[Vector], the training data prepared above
.
.
.
The 'kmeans' object gets created on the driver. Now, does kmeans.run() get
executed on each partition of the RDD in a distributed fashion, or is the
entire RDD brought to the driver and the algorithm executed at the driver
on the entire RDD?

Thanks,
Padma Ch


Real time anomaly system

2016-03-01 Thread Priya Ch
Hi,

  I am trying to build a real-time anomaly detection system using Spark,
Kafka, Cassandra and Akka. I have a network intrusion dataset (the KDD
1999 cup). How can I build the system using this? I am considering a
certain part of the data as historical data for my model training, and the
other part I would simulate as a stream of data coming through Kafka.

Should I use Spark Streaming for re-training the model on the incoming stream?

How can I use Akka in this for alerting purposes?
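
For reference, a minimal sketch of the streaming re-training part only,
using MLlib's StreamingKMeans; the Kafka ingestion, Cassandra writes and
Akka alerting are omitted, and the dimension/parameter values are
illustrative:

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.dstream.DStream

def buildModel(training: DStream[Vector], scoring: DStream[Vector]): Unit = {
  val model = new StreamingKMeans()
    .setK(5)
    .setDecayFactor(0.9)            // gradually forget old batches
    .setRandomCenters(41, 0.0)      // 41 = assumed number of numeric features

  model.trainOn(training)           // keeps re-training on every micro-batch
  model.predictOn(scoring).print()  // cluster assignments; alerting would hook in here
}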


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-06 Thread Priya Ch
Running 'lsof' will show the open files, but how do we get to the root
cause of why so many files are being opened?

Thanks,
Padma CH

On Wed, Jan 6, 2016 at 8:39 AM, Hamel Kothari <hamelkoth...@gmail.com>
wrote:

> The "Too Many Files" part of the exception is just indicative of the fact
> that when that call was made, too many files were already open. It doesn't
> necessarily mean that that line is the source of all of the open files,
> that's just the point at which it hit its limit.
>
> What I would recommend is to try to run this code again and use "lsof" on
> one of the spark executors (perhaps run it in a for loop, writing the
> output to separate files) until it fails and see which files are being
> opened, if there's anything that seems to be taking up a clear majority
> that might key you in on the culprit.
>
> On Tue, Jan 5, 2016 at 9:48 PM Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> Yes, the fileinputstream is closed. May be i didn't show in the screen
>> shot .
>>
>> As spark implements, sort-based shuffle, there is a parameter called
>> maximum merge factor which decides the number of files that can be merged
>> at once and this avoids too many open files. I am suspecting that it is
>> something related to this.
>>
>> Can someone confirm on this ?
>>
>> On Tue, Jan 5, 2016 at 11:19 PM, Annabel Melongo <
>> melongo_anna...@yahoo.com> wrote:
>>
>>> Vijay,
>>>
>>> Are you closing the fileinputstream at the end of each loop (
>>> in.close())? My guess is those streams aren't close and thus the "too many
>>> open files" exception.
>>>
>>>
>>> On Tuesday, January 5, 2016 8:03 AM, Priya Ch <
>>> learnings.chitt...@gmail.com> wrote:
>>>
>>>
>>> Can some one throw light on this ?
>>>
>>> Regards,
>>> Padma Ch
>>>
>>> On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch <learnings.chitt...@gmail.com>
>>> wrote:
>>>
>>> Chris, we are using spark 1.3.0 version. we have not set  
>>> spark.streaming.concurrentJobs
>>> this parameter. It takes the default value.
>>>
>>> Vijay,
>>>
>>>   From the tack trace it is evident that 
>>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
>>> is throwing the exception. I opened the spark source code and visited the
>>> line which is throwing this exception i.e
>>>
>>> [image: Inline image 1]
>>>
>>> The lie which is marked in red is throwing the exception. The file is
>>> ExternalSorter.scala in org.apache.spark.util.collection package.
>>>
>>> i went through the following blog
>>> http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
>>> and understood that there is merge factor which decide the number of
>>> on-disk files that could be merged. Is it some way related to this ?
>>>
>>> Regards,
>>> Padma CH
>>>
>>> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>
>>> and which version of Spark/Spark Streaming are you using?
>>>
>>> are you explicitly setting the spark.streaming.concurrentJobs to
>>> something larger than the default of 1?
>>>
>>> if so, please try setting that back to 1 and see if the problem still
>>> exists.
>>>
>>> this is a dangerous parameter to modify from the default - which is why
>>> it's not well-documented.
>>>
>>>
>>> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge <vijay.gha...@gmail.com>
>>> wrote:
>>>
>>> Few indicators -
>>>
>>> 1) during execution time - check total number of open files using lsof
>>> command. Need root permissions. If it is cluster not sure much !
>>> 2) which exact line in the code is triggering this error ? Can you paste
>>> that snippet ?
>>>
>>>
>>> On Wednesday 23 December 2015, Priya Ch <learnings.chitt...@gmail.com>
>>> wrote:
>>>
>>> ulimit -n 65000
>>>
>>> fs.file-max = 65000 ( in etc/sysctl.conf file)
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma <yash...@gmail.com> wrote:
>>>
>>> Could you share the ulimit for your setup please ?
>>> - Thanks, via mobile,  excuse brevity.
>>> On Dec 22, 2015 6:39 PM

Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-06 Thread Priya Ch
The line of code which I highlighted in the screenshot is within the Spark
source code. Spark implements a sort-based shuffle, and the spilled files
are merged using merge sort.

Here is the link
https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf
which would convey the same.

On Wed, Jan 6, 2016 at 8:19 PM, Annabel Melongo <melongo_anna...@yahoo.com>
wrote:

> Priya,
>
> It would be helpful if you put the entire trace log along with your code
> to help determine the root cause of the error.
>
> Thanks
>
>
> On Wednesday, January 6, 2016 4:00 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>
> Running 'lsof' will let us know the open files but how do we come to know
> the root cause behind opening too many files.
>
> Thanks,
> Padma CH
>
> On Wed, Jan 6, 2016 at 8:39 AM, Hamel Kothari <hamelkoth...@gmail.com>
> wrote:
>
> The "Too Many Files" part of the exception is just indicative of the fact
> that when that call was made, too many files were already open. It doesn't
> necessarily mean that that line is the source of all of the open files,
> that's just the point at which it hit its limit.
>
> What I would recommend is to try to run this code again and use "lsof" on
> one of the spark executors (perhaps run it in a for loop, writing the
> output to separate files) until it fails and see which files are being
> opened, if there's anything that seems to be taking up a clear majority
> that might key you in on the culprit.
>
> On Tue, Jan 5, 2016 at 9:48 PM Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
> Yes, the fileinputstream is closed. May be i didn't show in the screen
> shot .
>
> As spark implements, sort-based shuffle, there is a parameter called
> maximum merge factor which decides the number of files that can be merged
> at once and this avoids too many open files. I am suspecting that it is
> something related to this.
>
> Can someone confirm on this ?
>
> On Tue, Jan 5, 2016 at 11:19 PM, Annabel Melongo <
> melongo_anna...@yahoo.com> wrote:
>
> Vijay,
>
> Are you closing the fileinputstream at the end of each loop ( in.close())?
> My guess is those streams aren't close and thus the "too many open files"
> exception.
>
>
> On Tuesday, January 5, 2016 8:03 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>
> Can some one throw light on this ?
>
> Regards,
> Padma Ch
>
> On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
> Chris, we are using spark 1.3.0 version. we have not set  
> spark.streaming.concurrentJobs
> this parameter. It takes the default value.
>
> Vijay,
>
>   From the tack trace it is evident that 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
> is throwing the exception. I opened the spark source code and visited the
> line which is throwing this exception i.e
>
> [image: Inline image 1]
>
> The lie which is marked in red is throwing the exception. The file is
> ExternalSorter.scala in org.apache.spark.util.collection package.
>
> i went through the following blog
> http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
> and understood that there is merge factor which decide the number of
> on-disk files that could be merged. Is it some way related to this ?
>
> Regards,
> Padma CH
>
> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly <ch...@fregly.com> wrote:
>
> and which version of Spark/Spark Streaming are you using?
>
> are you explicitly setting the spark.streaming.concurrentJobs to
> something larger than the default of 1?
>
> if so, please try setting that back to 1 and see if the problem still
> exists.
>
> this is a dangerous parameter to modify from the default - which is why
> it's not well-documented.
>
>
> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge <vijay.gha...@gmail.com>
> wrote:
>
> Few indicators -
>
> 1) during execution time - check total number of open files using lsof
> command. Need root permissions. If it is cluster not sure much !
> 2) which exact line in the code is triggering this error ? Can you paste
> that snippet ?
>
>
> On Wednesday 23 December 2015, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
> ulimit -n 65000
>
> fs.file-max = 65000 ( in etc/sysctl.conf file)
>
> Thanks,
> Padma Ch
>
> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma <yash...@gmail.com> wrote:
>
> Could you share the ulimit for your setup please ?
> - Thanks, via mobile,  excuse bre

Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-05 Thread Priya Ch
Yes, the FileInputStream is closed; maybe I didn't show it in the
screenshot.

As Spark implements a sort-based shuffle, there is a parameter called the
maximum merge factor, which decides the number of files that can be merged
at once and so avoids too many open files. I am suspecting that the issue
is related to this.

Can someone confirm this?

On Tue, Jan 5, 2016 at 11:19 PM, Annabel Melongo <melongo_anna...@yahoo.com>
wrote:

> Vijay,
>
> Are you closing the fileinputstream at the end of each loop ( in.close())?
> My guess is those streams aren't close and thus the "too many open files"
> exception.
>
>
> On Tuesday, January 5, 2016 8:03 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>
> Can some one throw light on this ?
>
> Regards,
> Padma Ch
>
> On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
> Chris, we are using spark 1.3.0 version. we have not set  
> spark.streaming.concurrentJobs
> this parameter. It takes the default value.
>
> Vijay,
>
>   From the tack trace it is evident that 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
> is throwing the exception. I opened the spark source code and visited the
> line which is throwing this exception i.e
>
> [image: Inline image 1]
>
> The lie which is marked in red is throwing the exception. The file is
> ExternalSorter.scala in org.apache.spark.util.collection package.
>
> i went through the following blog
> http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
> and understood that there is merge factor which decide the number of
> on-disk files that could be merged. Is it some way related to this ?
>
> Regards,
> Padma CH
>
> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly <ch...@fregly.com> wrote:
>
> and which version of Spark/Spark Streaming are you using?
>
> are you explicitly setting the spark.streaming.concurrentJobs to
> something larger than the default of 1?
>
> if so, please try setting that back to 1 and see if the problem still
> exists.
>
> this is a dangerous parameter to modify from the default - which is why
> it's not well-documented.
>
>
> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge <vijay.gha...@gmail.com>
> wrote:
>
> Few indicators -
>
> 1) during execution time - check total number of open files using lsof
> command. Need root permissions. If it is cluster not sure much !
> 2) which exact line in the code is triggering this error ? Can you paste
> that snippet ?
>
>
> On Wednesday 23 December 2015, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
> ulimit -n 65000
>
> fs.file-max = 65000 ( in etc/sysctl.conf file)
>
> Thanks,
> Padma Ch
>
> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma <yash...@gmail.com> wrote:
>
> Could you share the ulimit for your setup please ?
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 6:39 PM, "Priya Ch" <learnings.chitt...@gmail.com> wrote:
>
> Jakob,
>
>Increased the settings like fs.file-max in /etc/sysctl.conf and also
> increased user limit in /etc/security/limits.conf. But still see the same
> issue.
>
> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky <joder...@gmail.com>
> wrote:
>
> It might be a good idea to see how many files are open and try increasing
> the open file limit (this is done on an os level). In some application
> use-cases it is actually a legitimate need.
>
> If that doesn't help, make sure you close any unused files and streams in
> your code. It will also be easier to help diagnose the issue if you send an
> error-reproducing snippet.
>
>
>
>
>
> --
> Regards,
> Vijay Gharge
>
>
>
>
>
>
> --
>
> *Chris Fregly*
> Principal Data Solutions Engineer
> IBM Spark Technology Center, San Francisco, CA
> http://spark.tc | http://advancedspark.com
>
>
>
>
>
>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2016-01-05 Thread Priya Ch
Can someone throw light on this?

Regards,
Padma Ch

On Mon, Dec 28, 2015 at 3:59 PM, Priya Ch <learnings.chitt...@gmail.com>
wrote:

> Chris, we are using spark 1.3.0 version. we have not set  
> spark.streaming.concurrentJobs
> this parameter. It takes the default value.
>
> Vijay,
>
>   From the tack trace it is evident that 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
> is throwing the exception. I opened the spark source code and visited the
> line which is throwing this exception i.e
>
> [image: Inline image 1]
>
> The lie which is marked in red is throwing the exception. The file is
> ExternalSorter.scala in org.apache.spark.util.collection package.
>
> i went through the following blog
> http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
> and understood that there is merge factor which decide the number of
> on-disk files that could be merged. Is it some way related to this ?
>
> Regards,
> Padma CH
>
> On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly <ch...@fregly.com> wrote:
>
>> and which version of Spark/Spark Streaming are you using?
>>
>> are you explicitly setting the spark.streaming.concurrentJobs to
>> something larger than the default of 1?
>>
>> if so, please try setting that back to 1 and see if the problem still
>> exists.
>>
>> this is a dangerous parameter to modify from the default - which is why
>> it's not well-documented.
>>
>>
>> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge <vijay.gha...@gmail.com>
>> wrote:
>>
>>> Few indicators -
>>>
>>> 1) during execution time - check total number of open files using lsof
>>> command. Need root permissions. If it is cluster not sure much !
>>> 2) which exact line in the code is triggering this error ? Can you paste
>>> that snippet ?
>>>
>>>
>>> On Wednesday 23 December 2015, Priya Ch <learnings.chitt...@gmail.com>
>>> wrote:
>>>
>>>> ulimit -n 65000
>>>>
>>>> fs.file-max = 65000 ( in etc/sysctl.conf file)
>>>>
>>>> Thanks,
>>>> Padma Ch
>>>>
>>>> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma <yash...@gmail.com> wrote:
>>>>
>>>>> Could you share the ulimit for your setup please ?
>>>>>
>>>>> - Thanks, via mobile,  excuse brevity.
>>>>> On Dec 22, 2015 6:39 PM, "Priya Ch" <learnings.chitt...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Jakob,
>>>>>>
>>>>>>Increased the settings like fs.file-max in /etc/sysctl.conf and
>>>>>> also increased user limit in /etc/security/limits.conf. But still
>>>>>> see the same issue.
>>>>>>
>>>>>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky <joder...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> It might be a good idea to see how many files are open and try
>>>>>>> increasing the open file limit (this is done on an os level). In some
>>>>>>> application use-cases it is actually a legitimate need.
>>>>>>>
>>>>>>> If that doesn't help, make sure you close any unused files and
>>>>>>> streams in your code. It will also be easier to help diagnose the issue 
>>>>>>> if
>>>>>>> you send an error-reproducing snippet.
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>> --
>>> Regards,
>>> Vijay Gharge
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> *Chris Fregly*
>> Principal Data Solutions Engineer
>> IBM Spark Technology Center, San Francisco, CA
>> http://spark.tc | http://advancedspark.com
>>
>
>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-28 Thread Priya Ch
Chris, we are using Spark version 1.3.0. We have not set the
spark.streaming.concurrentJobs parameter; it takes the default value.

Vijay,

  From the stack trace it is evident that
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
is throwing the exception. I opened the spark source code and visited the
line which is throwing this exception i.e

[image: Inline image 1]

The line which is marked in red is throwing the exception. The file is
ExternalSorter.scala in the org.apache.spark.util.collection package.

I went through the following blog
http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
and understood that there is a merge factor which decides the number of
on-disk files that can be merged. Is this issue somehow related to that?

Regards,
Padma CH

On Fri, Dec 25, 2015 at 7:51 PM, Chris Fregly <ch...@fregly.com> wrote:

> and which version of Spark/Spark Streaming are you using?
>
> are you explicitly setting the spark.streaming.concurrentJobs to
> something larger than the default of 1?
>
> if so, please try setting that back to 1 and see if the problem still
> exists.
>
> this is a dangerous parameter to modify from the default - which is why
> it's not well-documented.
>
>
> On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge <vijay.gha...@gmail.com>
> wrote:
>
>> Few indicators -
>>
>> 1) during execution time - check total number of open files using lsof
>> command. Need root permissions. If it is cluster not sure much !
>> 2) which exact line in the code is triggering this error ? Can you paste
>> that snippet ?
>>
>>
>> On Wednesday 23 December 2015, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> ulimit -n 65000
>>>
>>> fs.file-max = 65000 ( in etc/sysctl.conf file)
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma <yash...@gmail.com> wrote:
>>>
>>>> Could you share the ulimit for your setup please ?
>>>>
>>>> - Thanks, via mobile,  excuse brevity.
>>>> On Dec 22, 2015 6:39 PM, "Priya Ch" <learnings.chitt...@gmail.com>
>>>> wrote:
>>>>
>>>>> Jakob,
>>>>>
>>>>>Increased the settings like fs.file-max in /etc/sysctl.conf and
>>>>> also increased user limit in /etc/security/limits.conf. But still see
>>>>> the same issue.
>>>>>
>>>>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky <joder...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> It might be a good idea to see how many files are open and try
>>>>>> increasing the open file limit (this is done on an os level). In some
>>>>>> application use-cases it is actually a legitimate need.
>>>>>>
>>>>>> If that doesn't help, make sure you close any unused files and
>>>>>> streams in your code. It will also be easier to help diagnose the issue 
>>>>>> if
>>>>>> you send an error-reproducing snippet.
>>>>>>
>>>>>
>>>>>
>>>
>>
>> --
>> Regards,
>> Vijay Gharge
>>
>>
>>
>>
>
>
> --
>
> *Chris Fregly*
> Principal Data Solutions Engineer
> IBM Spark Technology Center, San Francisco, CA
> http://spark.tc | http://advancedspark.com
>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-23 Thread Priya Ch
ulimit -n 65000

fs.file-max = 65000 (in /etc/sysctl.conf)

Thanks,
Padma Ch

On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma <yash...@gmail.com> wrote:

> Could you share the ulimit for your setup please ?
>
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 6:39 PM, "Priya Ch" <learnings.chitt...@gmail.com> wrote:
>
>> Jakob,
>>
>>Increased the settings like fs.file-max in /etc/sysctl.conf and also
>> increased user limit in /etc/security/limits.conf. But still see the
>> same issue.
>>
>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky <joder...@gmail.com>
>> wrote:
>>
>>> It might be a good idea to see how many files are open and try
>>> increasing the open file limit (this is done on an os level). In some
>>> application use-cases it is actually a legitimate need.
>>>
>>> If that doesn't help, make sure you close any unused files and streams
>>> in your code. It will also be easier to help diagnose the issue if you send
>>> an error-reproducing snippet.
>>>
>>
>>


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-22 Thread Priya Ch
Jakob,

   I increased settings like fs.file-max in /etc/sysctl.conf and also
increased the user limit in /etc/security/limits.conf, but I still see the
same issue.

On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky  wrote:

> It might be a good idea to see how many files are open and try increasing
> the open file limit (this is done on an os level). In some application
> use-cases it is actually a legitimate need.
>
> If that doesn't help, make sure you close any unused files and streams in
> your code. It will also be easier to help diagnose the issue if you send an
> error-reproducing snippet.
>


java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-17 Thread Priya Ch
Hi All,


  When running the streaming application, I am seeing the error below:


java.io.FileNotFoundException:
/data1/yarn/nm/usercache/root/appcache/application_1450172646510_0004/blockmgr-a81f42cd-6b52-4704-83f3-2cfc12a11b86/02/temp_shuffle_589ddccf-d436-4d2c-9935-e5f8c137b54b
(Too many open files)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:146)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:729)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

It looks like the issue is that in a multi-threaded application there are
too many open file handles, and the process has reached the maximum number
of allowed file handles.

Regards,
Padma Ch


Inconsistent data in Cassandra

2015-12-06 Thread Priya Ch
Hi All,

  I have the following scenario in writing rows to Cassandra from Spark
Streaming -

In a 1 sec batch, I have 3 tickets with the same ticket number (the primary
key) but with different envelope numbers (i.e. envelope 1, envelope 2,
envelope 3). I am writing these messages to Cassandra using saveToCassandra.
Now, if I verify the C* DB, I see that some rows were updated by envelope 1
and other rows by envelope 3, which is nothing but inconsistent rows.
Ideally, all the rows should contain the data of envelope 3.

I have not set any of the parameters such as:
spark.cassandra.output.batch.size.rows
spark.cassandra.output.batch.buffer.size
spark.cassandra.output.concurrent.writes

What would be the default values for these ?

Can someone throw light on the issue  ?
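For reference, a minimal sketch of setting these properties explicitly on
the SparkConf (the property names are the DataStax spark-cassandra-connector
ones quoted above; the values here are only illustrative, not the defaults,
and the exact semantics depend on the connector version):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-write-tuning")
  .set("spark.cassandra.connection.host", "127.0.0.1")       // assumed host
  .set("spark.cassandra.output.batch.size.rows", "1")         // one row per batch
  .set("spark.cassandra.output.batch.buffer.size", "1000")    // see connector docs for default
  .set("spark.cassandra.output.concurrent.writes", "1")       // serialize writes per task
val sc = new SparkContext(conf)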

Regards,
Padma Ch


Spark Streaming Use Cases

2015-12-02 Thread Priya Ch
Hi All,

  I have the following use case for Spark Streaming -
There are 2 streams of data  say - FlightBookings and Ticket

For each ticket, I need to associate it with relevant Booking info. There
are distinct applications for Booking and Ticket. The Booking streaming
application processes the incoming bookings and writes the data to NoSQL
store (Cassandra).

The Ticket application processes the tickets, queries for Booking info,
associates it with the relevant data and writes the processed results back.

In this flow, for each ticket message there are at least 20 reads on the DB
store for Booking info.

Is there a better way of doing this ?? What is the probable approach to
address problems that need association between different streams of data
when these streams have altogether different network latency of arrival ?
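One possible pattern, sketched below under the assumption that the DataStax
spark-cassandra-connector (1.2+) is available and that the Booking table is
keyed by a bookingid column: join each micro-batch of tickets against the
Booking table in one pass, instead of issuing ~20 point reads per ticket.
The keyspace, table and column names are hypothetical.

import com.datastax.spark.connector._
import org.apache.spark.streaming.dstream.DStream

case class Ticket(bookingid: String, ticketno: String)   // assumed message shape

def enrichWithBooking(tickets: DStream[Ticket]): DStream[(Ticket, CassandraRow)] =
  tickets.transform { rdd =>
    // One distributed join per batch against the Booking table written by the
    // Booking application; only the matching rows are fetched.
    rdd.joinWithCassandraTable("travel_ks", "booking").on(SomeColumns("bookingid"))
  }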

--Padma CH


Subtract on rdd2 is throwing below exception

2015-11-05 Thread Priya Ch
Hi All,


 I am seeing an exception when trying to subtract 2 rdds.

 Lets say rdd1 has messages like -

*  pnr,  bookingId,  BookingObject*
 101,   1,   BookingObject1 // - event number is 0
 102,   1,   BookingObject2// - event number is 0
 103,   2,   BookingObject3//-event number is  1

rdd1 looks like RDD1[(String,Int,Booking)].

Booking table in Cassandra has primary key as pnr and bookingId.
Lets say Booking table has following rows-

*pnr,  bookingId, eventNumber*
Row1 -  101,   1,  1
Row2 -  103,   2,  0

RDD1.joinWithCassandraTable on columns pnr and bookingId with Booking table
is giving me the following CassandraJoinRDD -

(101, 1, BookingObject1), Row1
(103, 2, BookingObject3), Row2

Now on this rdd, I am comparing the event number of the BookingObject against
the eventNumber in the row and filtering the messages whose eventNumber is
greater than that in the row - which gives the following rdd

val RDD2: RDD[((String, Int, Booking), CassandraRow)] contains the below
record

(103, 2, BookingObject3), Row2.

But I also need pnr 102 from the original rdd as it does not exist in the DB.
Hence, to get such messages, I am subtracting the CassandraJoinRDD from the
original RDD i.e. RDD1 as

val mappedCRdd = cassandraJoinRDD.map { case (tuple, row) => tuple }
val subtractedRdd = RDD1.subtract(mappedCRdd)



val mappedRdd2 = RDD2.map { case (tuple, row) => tuple }

Now I am doing a union of this subtractedRdd with mappedRdd2 as
subtractedRdd.union(mappedRdd2)

But subtract on the rdd is throwing the below exception -


java.lang.NullPointerException
at org.joda.time.LocalDateTime.getValue(LocalDateTime.java:566)
at org.joda.time.base.AbstractPartial.hashCode(AbstractPartial.java:282)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
at com.amadeus.ti.models.tof.TOFModel$GCAS.hashCode(TOFModel.scala:14)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
at com.amadeus.ti.models.tof.TOFModel$TAOFRS.hashCode(TOFModel.scala:7)
at java.util.HashMap.hash(HashMap.java:362)
at java.util.HashMap.put(HashMap.java:492)
at org.apache.spark.rdd.SubtractedRDD.org
$apache$spark$rdd$SubtractedRDD$$getSeq$1(SubtractedRDD.scala:104)
at 
org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
at 
org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:116)
at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:119)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

If subtract is the problem, what other way could I achieve this, or is it
something I am doing wrong?
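One possible workaround, sketched under the assumption that (pnr, bookingId)
uniquely identifies a record: key both sides on that pair and use
subtractByKey, so only the small key tuples get hashed and the null-sensitive
hashCode of the Booking object's date fields is never invoked.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def notInCassandra(rdd1: RDD[(String, Int, Booking)],
                   joinedKeys: RDD[(String, Int)]): RDD[(String, Int, Booking)] = {
  val keyed   = rdd1.map { case (pnr, bid, booking) => ((pnr, bid), booking) }
  val matched = joinedKeys.map(k => (k, ()))          // keys that already exist in the DB
  keyed.subtractByKey(matched)
       .map { case ((pnr, bid), booking) => (pnr, bid, booking) }
}

// joinedKeys would come from the CassandraJoinRDD, e.g.
// cassandraJoinRDD.map { case ((pnr, bid, _), _) => (pnr, bid) }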

Thanks,
Padma Ch


Re: Allow multiple SparkContexts in Unit Testing

2015-11-04 Thread Priya Ch
Already tried setting spark.driver.allowMultipleContexts to true, but it was
not successful. I think the problem is that we have different test suites
which of course run in parallel. How do we stop the sparkContext after each
test suite and start it in the next test suite, or is there any way to share
the sparkContext across all test suites ???
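A minimal sketch of one common approach, assuming ScalaTest: give every
suite a local context through a shared trait with BeforeAndAfterAll (and run
the suites sequentially, e.g. parallelExecution in Test := false in sbt), so
each suite gets exactly one context that is stopped when the suite ends.

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Suite}

trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName(suiteName))
  }

  override def afterAll(): Unit = {
    if (sc != null) sc.stop()
    super.afterAll()
  }
}

class WordCountSuite extends FunSuite with SharedSparkContext {
  test("reduceByKey counts distinct words") {
    val counts = sc.parallelize(Seq("a", "b", "a")).map((_, 1)).reduceByKey(_ + _)
    assert(counts.count() == 2)
  }
}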

On Thu, Nov 5, 2015 at 12:36 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Priya,
>
> If you're trying to get unit tests running local spark contexts, you can
> just set up your spark context with 'spark.driver.allowMultipleContexts'
> set to true.
>
> Example:
>
> def create(seconds : Int, appName : String): StreamingContext = {
>   val master = "local[*]"
>   val conf = new SparkConf().set("spark.driver.allowMultipleContexts",
> "true").setAppName(appName).setMaster(master)
>   new StreamingContext(conf, Seconds(seconds))
> }
>
> Regards,
>
> Bryan Jeffrey
>
>
> On Wed, Nov 4, 2015 at 9:49 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Are you trying to speed up tests where each test suite uses single 
>> SparkContext
>> ?
>>
>> You may want to read:
>> https://issues.apache.org/jira/browse/SPARK-2243
>>
>> Cheers
>>
>> On Wed, Nov 4, 2015 at 4:59 AM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> Hello All,
>>>
>>>   How to use multiple Spark Context in executing multiple test suite of
>>> spark code ???
>>> Can some one throw light on this ?
>>>
>>
>>
>


Allow multiple SparkContexts in Unit Testing

2015-11-04 Thread Priya Ch
Hello All,

  How can we use multiple Spark Contexts when executing multiple test suites
of spark code ???
Can some one throw light on this ?


Re: Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Priya Ch
One more question: if I have a function which takes an RDD as a parameter,
how do we mock an RDD ??
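Usually an RDD is not mocked at all: the function under test is handed a
small real RDD built with sc.parallelize on a local SparkContext. A sketch,
with a hypothetical function under test:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical function whose signature takes an RDD.
def addOne(nums: RDD[Int]): RDD[Int] = nums.map(_ + 1)

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("rdd-test"))
try {
  val result = addOne(sc.parallelize(Seq(1, 2, 3))).collect().sorted
  assert(result.sameElements(Array(2, 3, 4)))
} finally {
  sc.stop()
}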

On Thu, Oct 29, 2015 at 5:20 PM, Priya Ch <learnings.chitt...@gmail.com>
wrote:

> How do we do it for Cassandra..can we use the same Mocking ?
> EmbeddedCassandra Server is available with CassandraUnit. Can this be used
> in Spark Code as well ? I mean with Scala code ?
>
> On Thu, Oct 29, 2015 at 5:03 PM, Василец Дмитрий <pronix.serv...@gmail.com
> > wrote:
>
>> there is example how i mock mysql
>> import org.scalamock.scalatest.MockFactory
>>  val connectionMock = mock[java.sql.Connection]
>>  val statementMock = mock[PreparedStatement]
>> (conMock.prepareStatement(_:
>> String)).expects(sql.toString).returning(statementMock)
>> (statementMock.executeUpdate _).expects()
>>
>>
>> On Thu, Oct 29, 2015 at 12:27 PM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>>   For my  Spark Streaming code, which writes the results to Cassandra
>>> DB, I need to write Unit test cases. what are the available test frameworks
>>> to mock the connection to Cassandra DB ?
>>>
>>
>>
>


Re: Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Priya Ch
How do we do it for Cassandra..can we use the same Mocking ?
EmbeddedCassandra Server is available with CassandraUnit. Can this be used
in Spark Code as well ? I mean with Scala code ?

On Thu, Oct 29, 2015 at 5:03 PM, Василец Дмитрий <pronix.serv...@gmail.com>
wrote:

> there is example how i mock mysql
> import org.scalamock.scalatest.MockFactory
>  val connectionMock = mock[java.sql.Connection]
>  val statementMock = mock[PreparedStatement]
> (conMock.prepareStatement(_:
> String)).expects(sql.toString).returning(statementMock)
> (statementMock.executeUpdate _).expects()
>
>
> On Thu, Oct 29, 2015 at 12:27 PM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> Hi All,
>>
>>   For my  Spark Streaming code, which writes the results to Cassandra DB,
>> I need to write Unit test cases. what are the available test frameworks to
>> mock the connection to Cassandra DB ?
>>
>
>


Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Priya Ch
Hi All,

  For my  Spark Streaming code, which writes the results to Cassandra DB, I
need to write Unit test cases. what are the available test frameworks to
mock the connection to Cassandra DB ?


Inner Joins on Cassandra RDDs

2015-10-21 Thread Priya Ch
Hello All,

   I have two Cassandra RDDs. I am using joinWithCassandraTable, which is
doing a cartesian join, because of which we are getting unwanted rows.


How to perform an inner join on Cassandra RDDs ? If I intend to use a normal
join, I have to read the entire table, which is costly.

Are there any specific transformations available that enable inner joins ??
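A sketch of the inner-join style lookup in the DataStax connector (1.2+),
assuming a booking table whose primary key columns are pnr and bookingid:
joinWithCassandraTable issues point reads for the keys present in the RDD
rather than scanning the whole table, and .on restricts the join to the
named columns so no cartesian-style blow-up occurs.

import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD

case class BookingKey(pnr: String, bookingid: Int)   // field names match assumed columns

def lookupBookings(keys: RDD[BookingKey]): RDD[(BookingKey, CassandraRow)] =
  keys
    .repartitionByCassandraReplica("travel_ks", "booking")   // optional: co-locate with replicas
    .joinWithCassandraTable("travel_ks", "booking")          // fetch only the matching rows
    .on(SomeColumns("pnr", "bookingid"))                     // inner join on these columns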

Regards,
Padma CH


Concurrency issue in Streams of data

2015-10-20 Thread Priya Ch
Hi All,

  When processing streams of data (with a batch interval of 1 sec), there is
a possible concurrency issue, i.e. two messages M1 and M2 (M2 being an
updated version of M1) with the same key are processed by 2 threads in
parallel.

To resolve this concurrency issue, I am applying a HashPartitioner on the
RDD (i.e. rdd.partitionBy(new HashPartitioner(n))). Using this, M1 and M2
get processed in a single thread (single partition).

I encounter a different issue now. The rdd has M1 and M2 in a single
partition. For every message, I do a lookup on the DB and apply the business
logic.
The expected result is that since M2 is an updated version of M1, only a few
fields need to be updated in the DB.

The code looks like-
val rdd1 = rdd.map { message =>
  val listOfRows = CassandraConnector.withSession( /* read from DB */ ).iterator.toList
  val filteredList = listOfRows.filter( /* business rules */ )
  val newList = filteredList.map(details => (details._1, details._2, true))
  newList
}

val rdd2 = rdd1.flatMap(details => details)
rdd2.saveToCassandra("keySpace", "table_name", SomeColumns(col1, col2, col3))

In the above case, since the write is performed in one shot, the map
transformation applies the business rules applicable for message M1 to M2 as
well (treating it as the initial version). This shouldn't happen.

The only way I could think of is to use rdd.foreach and issue individual
read and write statements instead of using saveToCassandra.

Any ideas to resolve the same ???
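One possible shape for that, sketched under the assumption of the DataStax
connector's CassandraConnector and a hypothetical key/column layout: process
each partition with a single session, so messages for the same key are
handled strictly one after another and the lookup for M2 sees what was
written for M1. (Prepared statements would be preferable to interpolated CQL
in real code.)

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

case class Msg(key: String, col1: String)   // hypothetical message shape

def applyInOrder(partitionedRdd: RDD[Msg], conf: SparkConf): Unit = {
  val connector = CassandraConnector(conf)   // serializable, safe to close over
  partitionedRdd.foreachPartition { messages =>
    connector.withSessionDo { session =>
      messages.foreach { m =>
        // read the current row for this key and apply the business rules...
        val existing = session.execute(
          s"SELECT col1 FROM ks.table_name WHERE key = '${m.key}'")
        // ...then write back only the fields that actually changed
        session.execute(
          s"UPDATE ks.table_name SET col1 = '${m.col1}' WHERE key = '${m.key}'")
      }
    }
  }
}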

Thanks in anticipation,
Padma Ch


Re: CassandraSQLContext throwing NullPointer Exception

2015-09-28 Thread Priya Ch
Ted,

I am using spark 1.3.0 and running the code in YARN mode.

Here is the code..

object streaming {
def main(args:Array[String])
{
  val conf = new SparkConf
  conf.setMaster("yarn-client")
  conf.setAppName("SimpleApp")
  conf.set("spark.cassandra.connection.host", "")

val sc = new SparkContext(conf)
val sqlContext = new CassandraSQLContext(sc)

val dstream = KafkaUtils.createStream()
val words = dstream.flatMap(line => line.split(","))
val wordPairs = words.map(word =>(word,1))
val reducedStream = wordPairs.reduceByKey((a,b) => a+b)

reducedStream.foreachRDD { rdd =>
  rdd.foreach { case (key, value) =>
    val df = getDataFrame(sqlContext, key)
    df.save("org.apache.spark.sql.cassandra", SaveMode.Overwrite,
      Map("c_table" -> "table1", "keyspace" -> "test"))
  }
}
  }

def getDataFrame(cqlContext:CassandraSQLContext, key:String):DataFrame =
cqlContext.sql("select word,count from wordcount where word = '"+key+"'")
}

In the above code, the method getDataFrame is throwing a
NullPointerException at the cqlContext.sql line.
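CassandraSQLContext, like SQLContext, only exists on the driver, and its
internal state does not survive serialization into rdd.foreach (which runs on
the executors), hence the NullPointerException there. A sketch of one way to
restructure the snippet above, reasonable only when the number of records per
batch is small: bring the keys back to the driver and run the query there.

reducedStream.foreachRDD { rdd =>
  // collect() returns the batch to the driver, where sqlContext is valid
  rdd.collect().foreach { case (key, count) =>
    val df = getDataFrame(sqlContext, key)
    df.save("org.apache.spark.sql.cassandra", SaveMode.Overwrite,
      Map("c_table" -> "table1", "keyspace" -> "test"))
  }
}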

On Mon, Sep 28, 2015 at 6:54 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Which Spark release are you using ?
>
> Can you show the snippet of your code around CassandraSQLContext#sql() ?
>
> Thanks
>
> On Mon, Sep 28, 2015 at 6:21 AM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> Hi All,
>>
>>  I am trying to use dataframes (which contain data from cassandra) in
>> rdd.foreach. This is throwing the following exception:
>>
>> Is CassandraSQLContext accessible within executor 
>>
>> 15/09/28 17:22:40 ERROR JobScheduler: Error running job streaming job
>> 144344116 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 6.0 (TID 83, blrrndnbudn05.rnd.amadeus.net):
>> java.lang.NullPointerException
>> at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
>> at
>> org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:74)
>> at
>> org.apache.spark.sql.cassandra.CassandraSQLContext.sql(CassandraSQLContext.scala:77)
>> at
>> com.amadeus.spark.example.SparkDemo$.withDataFrames(SparkDemo.scala:170)
>> at
>> com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
>> at
>> com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>>
>


CassandraSQLContext throwing NullPointer Exception

2015-09-28 Thread Priya Ch
Hi All,

 I am trying to use dataframes (which contain data from cassandra) in
rdd.foreach. This is throwing the following exception:

Is CassandraSQLContext accessible within executor 

15/09/28 17:22:40 ERROR JobScheduler: Error running job streaming job
144344116 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage
6.0 (TID 83, blrrndnbudn05.rnd.amadeus.net): java.lang.NullPointerException
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
at
org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:74)
at
org.apache.spark.sql.cassandra.CassandraSQLContext.sql(CassandraSQLContext.scala:77)
at
com.amadeus.spark.example.SparkDemo$.withDataFrames(SparkDemo.scala:170)
at
com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
at
com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Re: SparkContext declared as object variable

2015-09-24 Thread Priya Ch
object StreamJob {

  val conf = new SparkConf
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {
    val baseRDD =
      sc.parallelize(Array("hi", "hai", "hi", "bye", "bye", "hi", "hai", "hi", "bye", "bye"))
    val words = baseRDD.flatMap(line => line.split(","))
    val wordPairs = words.map(word => (word, 1))
    val reducedWords = wordPairs.reduceByKey((a, b) => a + b)
    reducedWords.collect().foreach(println)   // RDDs have no print method; collect and print on the driver
  }
}

Please try to run this code in cluster mode, possibly YARN mode; the spark
version is 1.3.0.

This has thrown
java.io.IOException: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5

Regards,
Padma Ch

On Thu, Sep 24, 2015 at 11:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> It should, i don't see any reason for it to not run in cluster mode.
>
> Thanks
> Best Regards
>
> On Wed, Sep 23, 2015 at 8:56 PM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> does it run in cluster mode ???
>>
>> On Wed, Sep 23, 2015 at 7:11 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Yes of course it works.
>>>
>>> [image: Inline image 1]
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Sep 22, 2015 at 4:53 PM, Priya Ch <learnings.chitt...@gmail.com>
>>> wrote:
>>>
>>>> Parallelzing some collection (array of strings). Infact in our product
>>>> we are reading data from kafka using KafkaUtils.createStream and applying
>>>> some transformations.
>>>>
>>>> Is creating sparContext at object level instead of creating in main
>>>> doesn't work 
>>>>
>>>> On Tue, Sep 22, 2015 at 2:59 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Its a "value" not a variable, and what are you parallelizing here?
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Fri, Sep 18, 2015 at 11:21 PM, Priya Ch <
>>>>> learnings.chitt...@gmail.com> wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>>   Instead of declaring sparkContext in main, declared as object
>>>>>> variable as -
>>>>>>
>>>>>>  object sparkDemo
>>>>>> {
>>>>>>
>>>>>>  val conf = new SparkConf
>>>>>>  val sc = new SparkContext(conf)
>>>>>>
>>>>>>   def main(args:Array[String])
>>>>>>   {
>>>>>>
>>>>>> val baseRdd = sc.parallelize()
>>>>>>.
>>>>>>.
>>>>>>.
>>>>>>   }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> But this piece of code is giving :
>>>>>> java.io.IOException: org.apache.spark.SparkException: Failed to get
>>>>>> broadcast_5_piece0 of broadcast_5
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
>>>>>> at
>>>>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>>>>>> at
>>>>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>>>>> at
>>>>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>>>>> at
>>>>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>>>>>> at
>>>>>> org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>>>>> at
>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: org.apache.spark.SparkException: Faile

Re: passing SparkContext as parameter

2015-09-22 Thread Priya Ch
I have a scenario like this -

 I read a dstream of messages from kafka. Now if my rdd contains 10 messages,
for each message I need to query the cassandra DB, do some modification and
update the records in the DB. If there is no option of passing the
sparkContext to workers to read/write into the DB, the only option is to use
CassandraConnector.withSession. If yes, for writing to the table, should I
construct the entire INSERT statement for the thousands of fields in the DB ?
Is this way of writing code an optimized way ???
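On the INSERT question - a full statement over thousands of fields should not
be needed, since Cassandra writes are upserts and only the bound columns are
touched. A sketch, with hypothetical keyspace/table/column names, assuming
ticketno is the primary key:

import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD

case class TicketUpdate(ticketno: String, status: String)   // only the fields to change

def writeChangedFields(updates: RDD[TicketUpdate]): Unit =
  // Writes just these two columns; the remaining columns of the row are untouched.
  updates.saveToCassandra("travel_ks", "ticket", SomeColumns("ticketno", "status"))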

On Tue, Sep 22, 2015 at 1:32 AM, Romi Kuntsman <r...@totango.com> wrote:

> Cody, that's a great reference!
> As shown there - the best way to connect to an external database from the
> workers is to create a connection pool on (each) worker.
> The driver mass pass, via broadcast, the connection string, but not the
> connect object itself and not the spark context.
>
> On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger <c...@koeninger.org> wrote:
>
>> That isn't accurate, I think you're confused about foreach.
>>
>> Look at
>>
>>
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>
>>
>> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman <r...@totango.com> wrote:
>>
>>> foreach is something that runs on the driver, not the workers.
>>>
>>> if you want to perform some function on each record from cassandra, you
>>> need to do cassandraRdd.map(func), which will run distributed on the spark
>>> workers
>>>
>>> *Romi Kuntsman*, *Big Data Engineer*
>>> http://www.totango.com
>>>
>>> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch <learnings.chitt...@gmail.com>
>>> wrote:
>>>
>>>> Yes, but i need to read from cassandra db within a spark
>>>> transformation..something like..
>>>>
>>>> dstream.forachRDD{
>>>>
>>>> rdd=> rdd.foreach {
>>>>  message =>
>>>>  sc.cassandraTable()
>>>>   .
>>>>   .
>>>>   .
>>>> }
>>>> }
>>>>
>>>> Since rdd.foreach gets executed on workers, how can i make sparkContext
>>>> available on workers ???
>>>>
>>>> Regards,
>>>> Padma Ch
>>>>
>>>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> You can use broadcast variable for passing connection information.
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Sep 21, 2015, at 4:27 AM, Priya Ch <learnings.chitt...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> can i use this sparkContext on executors ??
>>>>> In my application, i have scenario of reading from db for certain
>>>>> records in rdd. Hence I need sparkContext to read from DB (cassandra in 
>>>>> our
>>>>> case),
>>>>>
>>>>> If sparkContext couldn't be sent to executors , what is the workaround
>>>>> for this ??
>>>>>
>>>>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> add @transient?
>>>>>>
>>>>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
>>>>>> learnings.chitt...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello All,
>>>>>>>
>>>>>>> How can i pass sparkContext as a parameter to a method in an
>>>>>>> object. Because passing sparkContext is giving me TaskNotSerializable
>>>>>>> Exception.
>>>>>>>
>>>>>>> How can i achieve this ?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Padma Ch
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>


Re: passing SparkContext as parameter

2015-09-22 Thread Priya Ch
Suppose I use rdd.joinWithCassandraTable("keySpace", "table1") - does this do
a full table scan, which we want to avoid at any cost ?

On Tue, Sep 22, 2015 at 3:03 PM, Artem Aliev <artem.al...@gmail.com> wrote:

> All that code should looks like:
> stream.filter(...).map(x=>(key,
> )).joinWithCassandra().map(...).saveToCassandra()
>
> I'm not sure about exactly 10 messages, spark streaming focus on time not
> count..
>
>
> On Tue, Sep 22, 2015 at 2:14 AM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> I have scenario like this -
>>
>>  I read dstream of messages from kafka. Now if my rdd contains 10
>> messages, for each message I need to query the cassandraDB, do some
>> modification and update the records in DB. If there is no option of passing
>> sparkContext to workers to read.write into DB, the only option is to use
>> CassandraConnextor.withSession  If yes, for writing to table, should i
>> construct the entire INSERT statement for thousands of fields in the DB ?
>> Is this way of writing code is an optimized way ???
>>
>> On Tue, Sep 22, 2015 at 1:32 AM, Romi Kuntsman <r...@totango.com> wrote:
>>
>>> Cody, that's a great reference!
>>> As shown there - the best way to connect to an external database from
>>> the workers is to create a connection pool on (each) worker.
>>> The driver mass pass, via broadcast, the connection string, but not the
>>> connect object itself and not the spark context.
>>>
>>> On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> That isn't accurate, I think you're confused about foreach.
>>>>
>>>> Look at
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>>
>>>>
>>>> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman <r...@totango.com>
>>>> wrote:
>>>>
>>>>> foreach is something that runs on the driver, not the workers.
>>>>>
>>>>> if you want to perform some function on each record from cassandra,
>>>>> you need to do cassandraRdd.map(func), which will run distributed on the
>>>>> spark workers
>>>>>
>>>>> *Romi Kuntsman*, *Big Data Engineer*
>>>>> http://www.totango.com
>>>>>
>>>>> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch <
>>>>> learnings.chitt...@gmail.com> wrote:
>>>>>
>>>>>> Yes, but i need to read from cassandra db within a spark
>>>>>> transformation..something like..
>>>>>>
>>>>>> dstream.forachRDD{
>>>>>>
>>>>>> rdd=> rdd.foreach {
>>>>>>  message =>
>>>>>>  sc.cassandraTable()
>>>>>>   .
>>>>>>   .
>>>>>>   .
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> Since rdd.foreach gets executed on workers, how can i make
>>>>>> sparkContext available on workers ???
>>>>>>
>>>>>> Regards,
>>>>>> Padma Ch
>>>>>>
>>>>>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> You can use broadcast variable for passing connection information.
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Sep 21, 2015, at 4:27 AM, Priya Ch <learnings.chitt...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> can i use this sparkContext on executors ??
>>>>>>> In my application, i have scenario of reading from db for certain
>>>>>>> records in rdd. Hence I need sparkContext to read from DB (cassandra in 
>>>>>>> our
>>>>>>> case),
>>>>>>>
>>>>>>> If sparkContext couldn't be sent to executors , what is the
>>>>>>> workaround for this ??
>>>>>>>
>>>>>>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> add @transient?
>>>>>>>>
>>>>>>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
>>>>>>>> learnings.chitt...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello All,
>>>>>>>>>
>>>>>>>>> How can i pass sparkContext as a parameter to a method in an
>>>>>>>>> object. Because passing sparkContext is giving me TaskNotSerializable
>>>>>>>>> Exception.
>>>>>>>>>
>>>>>>>>> How can i achieve this ?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Padma Ch
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to spark-connector-user+unsubscr...@lists.datastax.com.
>>
>
> To unsubscribe from this group and stop receiving emails from it, send an
> email to spark-connector-user+unsubscr...@lists.datastax.com.
>


Re: passing SparkContext as parameter

2015-09-21 Thread Priya Ch
can i use this sparkContext on executors ??
In my application, i have scenario of reading from db for certain records
in rdd. Hence I need sparkContext to read from DB (cassandra in our case),

If sparkContext couldn't be sent to executors , what is the workaround for
this ??

On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com> wrote:

> add @transient?
>
> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> Hello All,
>>
>> How can i pass sparkContext as a parameter to a method in an object.
>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>
>> How can i achieve this ?
>>
>> Thanks,
>> Padma Ch
>>
>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Priya Ch
Yes, but I need to read from the cassandra db within a spark
transformation.. something like..

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    sc.cassandraTable()
      .
      .
      .
  }
}

Since rdd.foreach gets executed on workers, how can I make the sparkContext
available on workers ???

Regards,
Padma Ch

On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> You can use broadcast variable for passing connection information.
>
> Cheers
>
> On Sep 21, 2015, at 4:27 AM, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records
> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>
> If sparkContext couldn't be sent to executors , what is the workaround for
> this ??
>
> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> add @transient?
>>
>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> Hello All,
>>>
>>> How can i pass sparkContext as a parameter to a method in an object.
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>>
>>> How can i achieve this ?
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>
>>
>


passing SparkContext as parameter

2015-09-21 Thread Priya Ch
Hello All,

How can i pass sparkContext as a parameter to a method in an object.
Because passing sparkContext is giving me TaskNotSerializable Exception.

How can i achieve this ?

Thanks,
Padma Ch


SparkContext declared as object variable

2015-09-18 Thread Priya Ch
Hello All,

  Instead of declaring the sparkContext in main, I declared it as an object
variable as -

 object sparkDemo
{

 val conf = new SparkConf
 val sc = new SparkContext(conf)

  def main(args:Array[String])
  {

val baseRdd = sc.parallelize()
   .
   .
   .
  }

}

But this piece of code is giving :
java.io.IOException: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org

$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)

Why shouldn't we declare sc as an object variable ???

Regards,
Padma Ch


Re: Spark Streaming..Exception

2015-09-14 Thread Priya Ch
Hi All,

 I came across the related old conversation on the above issue (
https://issues.apache.org/jira/browse/SPARK-5594 ). Is the issue fixed? I
tried different values for spark.cleaner.ttl -> 0sec, -1sec,
2000sec, .. none of them worked. I also tried setting
spark.streaming.unpersist -> true. What is the possible solution for this ?
Is this a bug in Spark 1.3.0? Would changing the cluster manager to
Standalone or Mesos make it work fine ??

Could someone please share your views on this.

On Sat, Sep 12, 2015 at 11:04 PM, Priya Ch <learnings.chitt...@gmail.com>
wrote:

> Hello All,
>
>  When I push messages into kafka and read into streaming application, I
> see the following exception-
>  I am running the application on YARN and no where broadcasting the
> message within the application. Just simply reading message, parsing it and
> populating fields in a class and then printing the dstream (using
> DStream.print).
>
>  Have no clue if this is cluster issue or spark version issue or node
> issue. The strange part is, sometimes the message is processed but
> sometimes I see the below exception -
>
> java.io.IOException: org.apache.spark.SparkException: Failed to get
> broadcast_5_piece0 of broadcast_5
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_5_piece0 of broadcast_5
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.broadcast.TorrentBroadcast.org
> <http://org.apache.spark.broadcast.torrentbroadcast.org/>
> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
>
>
> I would be glad if someone can throw some light on this.
>
> Thanks,
> Padma Ch
>
>


Spark Streaming..Exception

2015-09-12 Thread Priya Ch
Hello All,

 When I push messages into kafka and read them into the streaming
application, I see the following exception -
 I am running the application on YARN and nowhere broadcasting the message
within the application; just simply reading the message, parsing it,
populating fields in a class and then printing the dstream (using
DStream.print).

 I have no clue if this is a cluster issue, a spark version issue or a node
issue. The strange part is, sometimes the message is processed but
sometimes I see the below exception -

java.io.IOException: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org

$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)


I would be glad if someone can throw some light on this.

Thanks,
Padma Ch


foreachRDD causing executor lost failure

2015-09-08 Thread Priya Ch
Hello All,

 I am using foreachRDD in my code as -

  dstream.foreachRDD { rdd =>
    rdd.foreach { record =>
      // look up with cassandra table
      // save updated rows to cassandra table.
    }
  }

 This foreachRDD is causing executor lost failure. What is the behavior of
this foreachRDD ???
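For what it's worth: the closure given to rdd.foreach runs on the executors,
once per record, so per-record lookups and writes can create enough
connection/GC pressure to lose executors. A common mitigation (a sketch,
assuming the DataStax connector) is to work per partition and reuse a single
session:

import com.datastax.spark.connector.cql.CassandraConnector

dstream.foreachRDD { rdd =>
  val connector = CassandraConnector(rdd.sparkContext.getConf)
  rdd.foreachPartition { records =>
    connector.withSessionDo { session =>    // one session per partition, not per record
      records.foreach { record =>
        // look up with cassandra table,
        // save updated rows to cassandra table.
      }
    }
  }
}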

Thanks,
Padma Ch


Spark - launching job for each action

2015-09-06 Thread Priya Ch
Hi All,

 In Spark, each action results in launching a job. Let's say my spark app
looks as -

val baseRdd = sc.parallelize(Array(1,2,3,4,5),2)
val rdd1 = baseRdd.map(x => x+2)
val rdd2 = rdd1.filter(x => x%2 ==0)
val count = rdd2.count
val firstElement = rdd2.first

println("Count is"+count)
println("First is"+firstElement)

Now, rdd2.count launches job0 with 1 task and rdd2.first launches job1
with 1 task. Here in job1, when calculating rdd2.first, is the entire
lineage computed again, or is rdd2 reused since job0 has already computed it
???

Thanks,
Padma Ch


Re: Spark - launching job for each action

2015-09-06 Thread Priya Ch
Hi All,

 Thanks for the info. I have one more doubt -
When writing a streaming application, I specify a batch interval. Let's say
the interval is 1 sec; for every 1 sec batch, an rdd is formed and a job is
launched. If there is more than 1 action specified on an rdd, how many jobs
would it launch???

I mean every 1 sec batch launches a job, and suppose there are two actions -
are 2 more jobs launched internally ?
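Each RDD action inside foreachRDD launches at least one job for that batch,
so two actions per batch mean (at least) two jobs per batch, and without
caching the second one recomputes the lineage. A minimal sketch, assuming a
DStream[Int] analogous to the batch example above:

dstream.foreachRDD { rdd =>
  val evens = rdd.map(_ + 2).filter(_ % 2 == 0)
  evens.cache()                      // avoid recomputing for the second action
  val count = evens.count()          // job 1 for this batch
  if (count > 0) {
    val first = evens.first()        // job 2 for this batch, served from the cache
    println("Count is " + count + ", first is " + first)
  }
  evens.unpersist()
}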

On Sun, Sep 6, 2015 at 1:15 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> "... Here in job2, when calculating rdd.first..."
>
> If you mean if rdd2.first, then it uses rdd2 already computed by
> rdd2.count, because it is already available. If some partitions are not
> available due to GC, then only those partitions are recomputed.
>
> On Sun, Sep 6, 2015 at 5:11 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> If you want to reuse the data, you need to call rdd2.cache
>>
>>
>>
>> On Sun, Sep 6, 2015 at 2:33 PM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>>  In Spark, each action results in launching a job. Lets say my spark app
>>> looks as-
>>>
>>> val baseRDD =sc.parallelize(Array(1,2,3,4,5),2)
>>> val rdd1 = baseRdd.map(x => x+2)
>>> val rdd2 = rdd1.filter(x => x%2 ==0)
>>> val count = rdd2.count
>>> val firstElement = rdd2.first
>>>
>>> println("Count is"+count)
>>> println("First is"+firstElement)
>>>
>>> Now, rdd2.count launches  job0 with 1 task and rdd2.first launches job1
>>> with 1 task. Here in job2, when calculating rdd.first, is the entire
>>> lineage computed again or else as job0 already computes rdd2, is it reused
>>> ???
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Executor lost failure

2015-09-01 Thread Priya Ch
Hi All,

 I have a spark streaming application which writes the processed results to
cassandra. In local mode, the code seems to work fine. The moment I start
running in distributed mode using yarn, I see executor lost failure. I
increased the executor memory to occupy the entire node's memory, which is
around 12gb, but I still see the same issue.

What could be the possible scenarios for executor lost failure ?


Spark RDD join with CassandraRDD

2015-08-25 Thread Priya Ch
Hi All,

 I have the following scenario:

  There exists a booking table in cassandra, which holds the fields like,
bookingid, passengeName, contact etc etc.

Now in my spark streaming application, there is one class Booking which
acts as a container and holds all the field details -

class Booking
{
   val bookingid =...
   val passengerName = ...
   val contact = ...
   .
   .
   .
   .
}

when a new booking message comes in, I populate the fields in the class,
which creates rdds of type RDD[Booking]. Now I save this rdd to the cassandra
table Booking as rdd.saveToCassandra.

Let's say if I query the booking table I would get a CassandraRDD[CassandraRow].
If I want to join RDD[Booking] with this cassandraRDD... how is it
possible... as these are two different kinds of rdds ?

Would converting CassandraRDD to RDD[CassandraRow] make things work ?
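One way to line the types up, sketched with hypothetical keyspace/column
names and using the Booking class from above: a CassandraRDD[CassandraRow]
is itself just an RDD[CassandraRow], so key both sides by bookingid and use
a plain RDD join (or, with connector 1.2+, joinWithCassandraTable to avoid
reading the whole table).

import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def associate(sc: SparkContext,
              bookings: RDD[Booking]): RDD[(String, (Booking, CassandraRow))] = {
  val fromCassandra = sc.cassandraTable("travel_ks", "booking")   // RDD[CassandraRow]
    .map(row => (row.getString("bookingid"), row))
  val incoming = bookings.map(b => (b.bookingid, b))
  incoming.join(fromCassandra)
}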

Thanks,
Padma Ch


rdd count is throwing null pointer exception

2015-08-17 Thread Priya Ch
Hi All,

 Thank you very much for the detailed explanation.

I have a scenario like this -
I have an rdd of ticket records and another rdd of booking records. For each
ticket record, I need to check whether any link exists in the booking table.

val ticketCachedRdd = ticketRdd.cache

ticketRdd.foreach{
ticket =>
val bookingRecords =  queryOnBookingTable (date, flightNumber,
flightCarrier)  // this function queries the booking table and retrieves
the booking rows
println(ticketCachedRdd.count) // this is throwing Null pointer exception

}

Is there something wrong in the count? I am trying to use the count of the
cached rdd when looping through the actual rdd. What's wrong in this ?

Thanks,
Padma Ch


Re: rdd count is throwing null pointer exception

2015-08-17 Thread Priya Ch
Looks like because of Spark-5063
RDD transformations and actions can only be invoked by the driver, not
inside of other transformations; for example, rdd1.map(x =
rdd2.values.count() * x) is invalid because the values transformation and
count action cannot be performed inside of the rdd1.map transformation. For
more information, see SPARK-5063.
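A sketch of one way around it in the snippet above: materialize what is
needed from the cached rdd on the driver (here the count) before the
foreach, and close only over the plain value.

val ticketCachedRdd = ticketRdd.cache()
val ticketCount = ticketCachedRdd.count()   // action runs on the driver, before the foreach

ticketRdd.foreach { ticket =>
  val bookingRecords = queryOnBookingTable(date, flightNumber, flightCarrier)
  println(ticketCount)                      // a plain Long, safe inside the closure
}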

On Mon, Aug 17, 2015 at 8:13 PM, Preetam preetam...@gmail.com wrote:

 The error could be because of the missing brackets after the word cache -
 .ticketRdd.cache()

  On Aug 17, 2015, at 7:26 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:
 
  Hi All,
 
   Thank you very much for the detailed explanation.
 
  I have scenario like this-
  I have rdd of ticket records and another rdd of booking records. for
 each ticket record, i need to check whether any link exists in booking
 table.
 
  val ticketCachedRdd = ticketRdd.cache
 
  ticketRdd.foreach{
  ticket =
  val bookingRecords =  queryOnBookingTable (date, flightNumber,
 flightCarrier)  // this function queries the booking table and retrieves
 the booking rows
  println(ticketCachedRdd.count) // this is throwing Null pointer exception
 
  }
 
  Is there somthing wrong in the count, i am trying to use the count of
 cached rdd when looping through the actual rdd. whats wrong in this ?
 
  Thanks,
  Padma Ch



Write to cassandra...each individual statement

2015-08-13 Thread Priya Ch
Hi All,

 I have a question on writing an rdd to cassandra. Instead of writing the
entire rdd to cassandra, I want to write individual statements into cassandra
because there is a need to perform ETL on each message (which requires
checking with the DB).
How could I insert statements individually? Using
CassandraConnector.session ??

If so, what is the performance impact of this ? How about using
sc.parallelize() for each message in the rdd and then inserting into cassandra ?

Thanks,
Padma Ch


Re: Write to cassandra...each individual statement

2015-08-13 Thread Priya Ch
Hi Philip,

 I have the following requirement -
I read the streams of data from various partitions of the kafka topic. Then
I union the dstreams and apply a hash partitioner so messages with the same
key would go into a single partition of an rdd, which is of course handled
by a single thread. This is the way we are trying to resolve the concurrency
issue.

Now one of the partitions of the rdd holds messages with the same key. Let's
say the 1st message in the partition may correspond to ticket issuance and
the 2nd message might correspond to an update on the ticket. While handling
the 1st message there is different logic, and the 2nd message's logic
depends on the 1st message.
Hence, using rdd.foreach I am handling different logic for individual
messages. So a bulk rdd.saveToCassandra will not work.

Hope you got what I am trying to say..

On Fri, Aug 14, 2015 at 12:07 AM, Philip Weaver philip.wea...@gmail.com
wrote:

 All you'd need to do is *transform* the rdd before writing it, e.g. using
 the .map function.


 On Thu, Aug 13, 2015 at 11:30 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:

 Hi All,

  I have a question in writing rdd to cassandra. Instead of writing entire
 rdd to cassandra, i want to write individual statement into cassandra
 beacuse there is a need to perform to ETL on each message ( which requires
 checking with the DB).
 How could i insert statements individually? Using
 CassandraConnector.session ??

 If so, what is the performance impact of this ? How about using
 sc.parallelize() for eah message in the rdd and then insert into cassandra ?

 Thanks,
 Padma Ch





Fwd: Writing streaming data to cassandra creates duplicates

2015-08-04 Thread Priya Ch
Yes... union would be one solution. I am not doing any aggregation, hence
reduceByKey would not be useful. If I use groupByKey, messages with the same
key would be obtained in a partition. But groupByKey is a very expensive
operation as it involves a shuffle. My ultimate goal is to write
the messages to cassandra. If the messages with the same key are handled by
different streams... there would be concurrency issues. To resolve this I
can union the dstreams and apply a hash partitioner so that it would bring
all the same keys to a single partition, or do a groupByKey which does the
same.

As groupByKey is expensive, is there any workaround for this ?
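One cheaper alternative, sketched under the assumption that every message
carries a monotonically increasing version (e.g. an event number): union the
dstreams and use reduceByKey to keep only the latest message per key. It is
still a shuffle, but with map-side combining only one record per key per
partition moves across the network, instead of the whole group.

import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.StreamingContext._

case class Msg(key: String, version: Long, payload: String)   // hypothetical message shape

val unioned = ssc.union(Seq(stream1, stream2, stream3))        // DStream[Msg], assumed streams

val latestPerKey = unioned
  .map(m => (m.key, m))
  .reduceByKey((a, b) => if (a.version >= b.version) a else b) // keep the latest version
  .map(_._2)

latestPerKey.saveToCassandra("ks", "table_name")               // one row per key per batch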

On Thu, Jul 30, 2015 at 2:33 PM, Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com wrote:

 Hi,

 Just my two cents. I understand your problem is that your problem is that
 you have messages with the same key in two different dstreams. What I would
 do would be making a union of all the dstreams with StreamingContext.union
 or several calls to DStream.union, and then I would create a pair dstream
 with the primary key as key, and then I'd use groupByKey or reduceByKey (or
 combineByKey etc) to combine the messages with the same primary key.

 Hope that helps.

 Greetings,

 Juan


 2015-07-30 10:50 GMT+02:00 Priya Ch learnings.chitt...@gmail.com:

 Hi All,

  Can someone throw insights on this ?

 On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:



 Hi TD,

  Thanks for the info. I have the scenario like this.

  I am reading the data from kafka topic. Let's say kafka has 3
 partitions for the topic. In my streaming application, I would configure 3
 receivers with 1 thread each such that they would receive 3 dstreams (from
 3 partitions of kafka topic) and also I implement partitioner. Now there is
 a possibility of receiving messages with same primary key twice or more,
 one is at the time message is created and other times if there is an update
 to any fields for same message.

 If two messages M1 and M2 with same primary key are read by 2 receivers
 then even the partitioner in spark would still end up in parallel
 processing as there are altogether in different dstreams. How do we address
 in this situation ?

 Thanks,
 Padma Ch

 On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com
 wrote:

 You have to partition that data on the Spark Streaming by the primary
 key, and then make sure insert data into Cassandra atomically per key, or
 per set of keys in the partition. You can use the combination of the (batch
 time, and partition Id) of the RDD inside foreachRDD as the unique id for
 the data you are inserting. This will guard against multiple attempts to
 run the task that inserts into Cassandra.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations

 TD

 On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch 
 learnings.chitt...@gmail.com wrote:

 Hi All,

  I have a problem when writing streaming data to cassandra. Or
 existing product is on Oracle DB in which while wrtiting data, locks are
 maintained such that duplicates in the DB are avoided.

 But as spark has parallel processing architecture, if more than 1
 thread is trying to write same data i.e with same primary key, is there as
 any scope to created duplicates? If yes, how to address this problem 
 either
 from spark or from cassandra side ?

 Thanks,
 Padma Ch









Re: Writing streaming data to cassandra creates duplicates

2015-07-30 Thread Priya Ch
Hi All,

 Can someone throw insights on this ?

On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch learnings.chitt...@gmail.com
wrote:



 Hi TD,

  Thanks for the info. I have the scenario like this.

  I am reading the data from kafka topic. Let's say kafka has 3 partitions
 for the topic. In my streaming application, I would configure 3 receivers
 with 1 thread each such that they would receive 3 dstreams (from 3
 partitions of kafka topic) and also I implement partitioner. Now there is a
 possibility of receiving messages with same primary key twice or more, one
 is at the time message is created and other times if there is an update to
 any fields for same message.

 If two messages M1 and M2 with same primary key are read by 2 receivers
 then even the partitioner in spark would still end up in parallel
 processing as there are altogether in different dstreams. How do we address
 in this situation ?

 Thanks,
 Padma Ch

 On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com
 wrote:

 You have to partition that data on the Spark Streaming by the primary
 key, and then make sure insert data into Cassandra atomically per key, or
 per set of keys in the partition. You can use the combination of the (batch
 time, and partition Id) of the RDD inside foreachRDD as the unique id for
 the data you are inserting. This will guard against multiple attempts to
 run the task that inserts into Cassandra.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations

 TD

 On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:

 Hi All,

  I have a problem when writing streaming data to cassandra. Or existing
 product is on Oracle DB in which while wrtiting data, locks are maintained
 such that duplicates in the DB are avoided.

 But as spark has parallel processing architecture, if more than 1 thread
 is trying to write same data i.e with same primary key, is there as any
 scope to created duplicates? If yes, how to address this problem either
 from spark or from cassandra side ?

 Thanks,
 Padma Ch







Fwd: Writing streaming data to cassandra creates duplicates

2015-07-28 Thread Priya Ch
Hi TD,

 Thanks for the info. I have the scenario like this.

 I am reading the data from kafka topic. Let's say kafka has 3 partitions
for the topic. In my streaming application, I would configure 3 receivers
with 1 thread each such that they would receive 3 dstreams (from 3
partitions of kafka topic) and also I implement partitioner. Now there is a
possibility of receiving messages with same primary key twice or more, one
is at the time message is created and other times if there is an update to
any fields for same message.

If two messages M1 and M2 with the same primary key are read by 2 receivers,
then even with the partitioner, spark would still end up processing them in
parallel as they are altogether in different dstreams. How do we address
this situation ?

Thanks,
Padma Ch

On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com wrote:

 You have to partition that data in Spark Streaming by the primary key,
 and then make sure you insert the data into Cassandra atomically per key,
 or per set of keys in the partition. You can use the combination of the
 (batch time, partition id) of the RDD inside foreachRDD as the unique id
 for the data you are inserting. This will guard against multiple attempts
 to run the task that inserts into Cassandra.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations

 TD

 On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:

 Hi All,

 I have a problem when writing streaming data to Cassandra. Our existing
 product is on Oracle DB, in which locks are maintained while writing data
 so that duplicates in the DB are avoided.

 But as Spark has a parallel processing architecture, if more than one thread
 tries to write the same data, i.e. with the same primary key, is there any
 scope for creating duplicates? If yes, how do we address this problem,
 either from the Spark side or the Cassandra side?

 Thanks,
 Padma Ch
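
For the specific worry in the message above -- the same primary key arriving on
two different receivers -- a rough sketch is to union the receiver streams and
collapse each key within the batch before writing, so at most one record per
key is written per batch. zkQuorum, the topic name, extractKey, pickLatest and
the final connector call are placeholders/assumptions:

    import com.datastax.spark.connector._
    import org.apache.spark.streaming.kafka.KafkaUtils

    val streams = (1 to 3).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, "consumer-group", Map("topic" -> 1))
    }
    val unioned = ssc.union(streams)

    val latestPerKey = unioned
      .map { case (_, msg) => (extractKey(msg), msg) }  // extractKey: stand-in for the key field
      .reduceByKey((a, b) => pickLatest(a, b))          // pickLatest: stand-in conflict resolution

    latestPerKey.foreachRDD { rdd =>
      // Assumes spark.cassandra.connection.host is set on the SparkConf and
      // the table's columns match the tuple being saved.
      rdd.saveToCassandra("ks", "messages")
    }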





Writing streaming data to cassandra creates duplicates

2015-07-26 Thread Priya Ch
Hi All,

 I have a problem when writing streaming data to Cassandra. Our existing
product is on Oracle DB, in which locks are maintained while writing data
so that duplicates in the DB are avoided.

But as Spark has a parallel processing architecture, if more than one thread
tries to write the same data, i.e. with the same primary key, is there any
scope for creating duplicates? If yes, how do we address this problem, either
from the Spark side or the Cassandra side?

Thanks,
Padma Ch


Spark streaming with Kafka- couldnt find KafkaUtils

2015-04-05 Thread Priya Ch
Hi All,

  I configured a Kafka cluster on a single node and I have a streaming
application which reads data from a Kafka topic using KafkaUtils. When I
execute the code in local mode from the IDE, the application runs fine.

But when I submit the same to the Spark cluster in standalone mode, I end up
with the following exception:
java.lang.ClassNotFoundException:
org/apache/spark/streaming/kafka/KafkaUtils

I am using the spark-1.2.1 version. When I checked the streaming source
files, the sources related to Kafka are missing. Are these not included in
the spark-1.3.0 and spark-1.2.1 distributions?

Do I have to include these manually?

Regards,
Padma Ch
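
The Kafka integration ships as a separate artifact (spark-streaming-kafka) and
is not bundled into the Spark assembly on the cluster, so the class has to be
shipped with the application. A build.sbt sketch, assuming a fat jar is built
with sbt-assembly (or the connector jar is passed via --jars to spark-submit):

    // Spark core/streaming are provided by the cluster; the Kafka connector is
    // not, so it must end up inside the application jar (or be passed via --jars).
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"            % "1.2.1" % "provided",
      "org.apache.spark" %% "spark-streaming"       % "1.2.1" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.2.1"
    )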


Re: TF-IDF from spark-1.1.0 not working on cluster mode

2015-01-09 Thread Priya Ch
Please find the attached worker log.
I could see a stream closed exception.

On Wed, Jan 7, 2015 at 10:51 AM, Xiangrui Meng men...@gmail.com wrote:

 Could you attach the executor log? That may help identify the root
 cause. -Xiangrui

 On Mon, Jan 5, 2015 at 11:12 PM, Priya Ch learnings.chitt...@gmail.com
 wrote:
  Hi All,
 
  Word2Vec and TF-IDF algorithms in Spark mllib-1.1.0 are working only in
  local mode and not in distributed mode. A null pointer exception is
  thrown. Is this a bug in spark-1.1.0?
 
  Following is the code:
  def main(args: Array[String]) {
    val conf = new SparkConf
    val sc = new SparkContext(conf)
    val documents = sc.textFile("hdfs://IMPETUS-DSRV02:9000/nlp/sampletext")
                      .map(_.split(" ").toSeq)
    val hashingTF = new HashingTF()
    val tf = hashingTF.transform(documents)
    tf.cache()
    val idf = new IDF().fit(tf)
    val tfidf = idf.transform(tf)
    val rdd = tfidf.map { vec => println("vector is " + vec); (10) }
    rdd.saveAsTextFile("/home/padma/usecase")
  }
 
 
 
 
  Exception thrown:
 
  15/01/06 12:36:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
 with
  2 tasks
  15/01/06 12:36:10 INFO cluster.SparkDeploySchedulerBackend: Registered
  executor:
  Actor[akka.tcp://
 sparkexecu...@impetus-dsrv05.impetus.co.in:33898/user/Executor#-1525890167
 ]
  with ID 0
  15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 0.0 in
 stage
  0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
  15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 1.0 in
 stage
  0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
  15/01/06 12:36:10 INFO storage.BlockManagerMasterActor: Registering block
  manager IMPETUS-DSRV05.impetus.co.in:35130 with 2.1 GB RAM
  15/01/06 12:36:12 INFO network.ConnectionManager: Accepted connection
 from
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:46888]
  15/01/06 12:36:12 INFO network.SendingConnection: Initiating connection
 to
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130]
  15/01/06 12:36:12 INFO network.SendingConnection: Connected to
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130], 1 messages pending
  15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added
 broadcast_1_piece0 in
  memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 2.1 KB, free: 2.1
 GB)
  15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added
 broadcast_0_piece0 in
  memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 10.1 KB, free: 2.1
 GB)
  15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_1 in memory
 on
  IMPETUS-DSRV05.impetus.co.in:35130 (size: 280.0 B, free: 2.1 GB)
  15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_0 in memory
 on
  IMPETUS-DSRV05.impetus.co.in:35130 (size: 416.0 B, free: 2.1 GB)
  15/01/06 12:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage
 0.0
  (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.NullPointerException:
  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
 
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
  org.apache.spark.scheduler.Task.run(Task.scala:54)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
  java.lang.Thread.run(Thread.java:722)
 
 
  Thanks,
  Padma Ch



spark-rtauser-org.apache.spark.deploy.worker.Worker-1-IMPETUS-DSRV02.out
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

TF-IDF from spark-1.1.0 not working on cluster mode

2015-01-05 Thread Priya Ch
Hi All,

Word2Vec and TF-IDF algorithms in Spark mllib-1.1.0 are working only in
local mode and not in distributed mode. A null pointer exception is thrown.
Is this a bug in spark-1.1.0?

*Following is the code:*
  def main(args: Array[String]) {
    val conf = new SparkConf
    val sc = new SparkContext(conf)
    val documents = sc.textFile("hdfs://IMPETUS-DSRV02:9000/nlp/sampletext")
                      .map(_.split(" ").toSeq)
    val hashingTF = new HashingTF()
    val tf = hashingTF.transform(documents)
    tf.cache()
    val idf = new IDF().fit(tf)
    val tfidf = idf.transform(tf)
    val rdd = tfidf.map { vec => println("vector is " + vec); (10) }
    rdd.saveAsTextFile("/home/padma/usecase")
  }




*Exception thrown:*

15/01/06 12:36:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
with 2 tasks
15/01/06 12:36:10 INFO cluster.SparkDeploySchedulerBackend: Registered
executor: Actor[akka.tcp://
sparkexecu...@impetus-dsrv05.impetus.co.in:33898/user/Executor#-1525890167]
with ID 0
15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
15/01/06 12:36:10 INFO storage.BlockManagerMasterActor: Registering block
manager IMPETUS-DSRV05.impetus.co.in:35130 with 2.1 GB RAM
15/01/06 12:36:12 INFO network.ConnectionManager: Accepted connection from [
IMPETUS-DSRV05.impetus.co.in/192.168.145.195:46888]
15/01/06 12:36:12 INFO network.SendingConnection: Initiating connection to [
IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130]
15/01/06 12:36:12 INFO network.SendingConnection: Connected to [
IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130], 1 messages pending
15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 2.1 KB, free: 2.1 GB)
15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 10.1 KB, free: 2.1
GB)
15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_1 in memory on
IMPETUS-DSRV05.impetus.co.in:35130 (size: 280.0 B, free: 2.1 GB)
15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_0 in memory on
IMPETUS-DSRV05.impetus.co.in:35130 (size: 416.0 B, free: 2.1 GB)
15/01/06 12:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0
(TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.NullPointerException:
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)

org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)


Thanks,
Padma Ch
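
One side note on the snippet, independent of the NullPointerException itself:
saveAsTextFile("/home/padma/usecase") uses a bare path, which resolves against
whatever fs.defaultFS the executors see, so the output may land on HDFS or be
scattered across each worker's local /home/padma, depending on configuration.
Writing with an explicit scheme removes the ambiguity; the path below is only
an example:

    rdd.saveAsTextFile("hdfs://IMPETUS-DSRV02:9000/nlp/tfidf-out")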


Spark exception when sending message to akka actor

2014-12-22 Thread Priya Ch
Hi All,

I have Akka remote actors running on 2 nodes. I submitted the Spark
application from node1. In the Spark code, in one of the RDDs, I am sending
a message to the actor running on node1. My Spark code is as follows:




class ActorClient extends Actor with Serializable
{
  import context._

  val currentActor: ActorSelection =
    context.system.actorSelection(
      "akka.tcp://ActorSystem@192.168.145.183:2551/user/MasterActor")
  implicit val timeout = Timeout(10 seconds)

  def receive =
  {
    case msg: String => {
      if (msg.contains("Spark")) {
        currentActor ! msg
        sender ! "Local"
      }
      else {
        println("Received.." + msg)
        val future = currentActor ? msg
        val result = Await.result(future,
          timeout.duration).asInstanceOf[String]
        if (result.contains("ACK"))
          sender ! "OK"
      }
    }
    case PoisonPill => context.stop(self)
  }
}

object SparkExec extends Serializable
{

  implicit val timeout = Timeout(10 seconds)
  val actorSystem = ActorSystem("ClientActorSystem")
  val actor =
    actorSystem.actorOf(Props(classOf[ActorClient]), name = "ClientActor")

  def main(args: Array[String]) =
  {

    val conf = new SparkConf().setAppName("DeepLearningSpark")

    val sc = new SparkContext(conf)

    val textrdd =
      sc.textFile("hdfs://IMPETUS-DSRV02:9000/deeplearning/sample24k.csv")
    val rdd1 = textrdd.map { line => println("In Map...")

      val future = actor ? "Hello..Spark"
      val result =
        Await.result(future, timeout.duration).asInstanceOf[String]
      if (result.contains("Local")) {
        println("Recieved in map" + result)
        //actorSystem.shutdown
      }
      (10)
    }

    val rdd2 = rdd1.map { x =>
      val future = actor ? "Done"
      val result = Await.result(future,
        timeout.duration).asInstanceOf[String]
      if (result.contains("OK"))
      {
        actorSystem.stop(remoteActor)
        actorSystem.shutdown
      }
      (2) }
    rdd2.saveAsTextFile("/home/padma/SparkAkkaOut")
  }

}

In my ActorClient actor, I identify the remote actor through actorSelection
and send it the message. Once the messages are sent, in *rdd2*, after
receiving an ack from the remote actor, I kill the ActorClient actor and
shut down the ActorSystem.

The above code throws the following exception:




14/12/22 19:04:36 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0
(TID 1, IMPETUS-DSRV05.impetus.co.in):
java.lang.ExceptionInInitializerError:
com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:166)
com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:159)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
14/12/22 19:04:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
(TID 0, IMPETUS-DSRV05.impetus.co.in): java.lang.NoClassDefFoundError:
Could not initialize class com.impetus.spark.SparkExec$
com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:166)
com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:159)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
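
The ExceptionInInitializerError followed by "Could not initialize class
com.impetus.spark.SparkExec$" is consistent with the object's fields -- the
ActorSystem and the actor ref -- being initialized again on each executor when
the map closures first touch SparkExec, and that initialization failing there.
A minimal sketch of one way around it, assuming the goal is simply to notify
the remote MasterActor from executor-side code, is to build the actor plumbing
inside mapPartitions so nothing actor-related is captured from the driver-side
object. The actor address and textrdd are taken from the thread; everything
else is an assumption:

    import akka.actor.ActorSystem
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    val rdd2 = textrdd.mapPartitions { iter =>
      implicit val timeout = Timeout(10.seconds)
      // One short-lived ActorSystem per partition, created on the executor itself.
      val system = ActorSystem("ExecutorSideClient")
      val master = system.actorSelection(
        "akka.tcp://ActorSystem@192.168.145.183:2551/user/MasterActor")
      val results = iter.map { line =>
        val reply = Await.result((master ? "Hello..Spark").mapTo[String], timeout.duration)
        (line, reply)
      }.toList          // force evaluation before the system is shut down
      system.shutdown() // release the executor-side resources
      results.iterator
    }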


Fwd: 1gb file processing...task doesn't launch on all the node...Unseen exception

2014-11-24 Thread Priya Ch
Hi,

I tried with try/catch blocks. In fact, inside mapPartitionsWithIndex a
method is invoked which does the operation. I put the operations inside that
function in a try...catch block, but that is of no use; the error still
persists. Even when I commented out all the operations, a simple print
statement inside the method is not executed. The data size is 542 MB, the
HDFS block size is 64 MB, and the file has 9 blocks. I used a 2-node cluster
with replication factor 2.

When I see the logs, it seems it tried to launch tasks on the other node,
but the TaskSetManager encountered a null pointer exception and the job was
aborted. Is this a problem with mapPartitionsWithIndex?

The same operations, when performed with a map transformation, executed with
no issues.


Please let me know if anyone has run into the same problem.

Thanks,
Padma Ch

On Fri, Nov 14, 2014 at 7:42 PM, Akhil [via Apache Spark User List] 
ml-node+s1001560n18936...@n3.nabble.com wrote:

 It shows a NullPointerException; could your data be corrupted? Try putting
 a try/catch inside the operation that you are doing. Are you running the
 worker process on the master node also? If not, then only 1 node will be
 doing the processing. If yes, then try setting the level of parallelism and
 the number of partitions while creating/transforming the RDD.

 Thanks
 Best Regards

 On Fri, Nov 14, 2014 at 5:17 PM, Priya Ch [hidden email] wrote:

 Hi All,

   We have set up a 2-node cluster (NODE-DSRV05 and NODE-DSRV02); each node
 has 32 GB RAM, 1 TB hard disk capacity and 8 CPU cores. We have set up HDFS
 with 2 TB capacity and a block size of 256 MB. When we try to process a 1 GB
 file on Spark, we see the following exception:

 14/11/14 17:01:42 INFO scheduler.TaskSetManager: Starting task 0.0 in
 stage 0.0 (TID 0, NODE-DSRV05.impetus.co.in, NODE_LOCAL, 1667 bytes)
 14/11/14 17:01:42 INFO scheduler.TaskSetManager: Starting task 1.0 in
 stage 0.0 (TID 1, NODE-DSRV05.impetus.co.in, NODE_LOCAL, 1667 bytes)
 14/11/14 17:01:42 INFO scheduler.TaskSetManager: Starting task 2.0 in
 stage 0.0 (TID 2, NODE-DSRV05.impetus.co.in, NODE_LOCAL, 1667 bytes)
 14/11/14 17:01:43 INFO cluster.SparkDeploySchedulerBackend: Registered
 executor: 
 Actor[akka.tcp://sparkExecutor@IMPETUS-DSRV02:41124/user/Executor#539551156]
 with ID 0
 14/11/14 17:01:43 INFO storage.BlockManagerMasterActor: Registering block
 manager NODE-DSRV05.impetus.co.in:60432 with 2.1 GB RAM
 14/11/14 17:01:43 INFO storage.BlockManagerMasterActor: Registering block
 manager NODE-DSRV02:47844 with 2.1 GB RAM
 14/11/14 17:01:43 INFO network.ConnectionManager: Accepted connection
 from [NODE-DSRV05.impetus.co.in/192.168.145.195:51447]
 14/11/14 17:01:43 INFO network.SendingConnection: Initiating connection
 to [NODE-DSRV05.impetus.co.in/192.168.145.195:60432]
 14/11/14 17:01:43 INFO network.SendingConnection: Connected to [
 NODE-DSRV05.impetus.co.in/192.168.145.195:60432], 1 messages pending
 14/11/14 17:01:43 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
 in memory on NODE-DSRV05.impetus.co.in:60432 (size: 17.1 KB, free: 2.1
 GB)
 14/11/14 17:01:43 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
 in memory on NODE-DSRV05.impetus.co.in:60432 (size: 14.1 KB, free: 2.1
 GB)
 14/11/14 17:01:44 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
 0.0 (TID 0, NODE-DSRV05.impetus.co.in): java.lang.NullPointerException:
 org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609)
 org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609)

 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 java.lang.Thread.run(Thread.java:722)
 14/11/14 17:01:44 INFO scheduler.TaskSetManager: Starting task 0.1 in
 stage 0.0 (TID 3, NODE-DSRV05.impetus.co.in, NODE_LOCAL, 1667 bytes)
 14/11/14 17:01:44 INFO scheduler.TaskSetManager: Lost task 1.0 in stage
 0.0 (TID 1) on executor NODE-DSRV05.impetus.co.in:
 java.lang.NullPointerException (null) [duplicate 1]
 14/11/14 17:01:44 INFO scheduler.TaskSetManager: Lost task 2.0 in stage
 0.0 (TID 2) on executor NODE-DSRV05.impetus.co.in:
 java.lang.NullPointerException (null) [duplicate 2]
 14/11/14 17:01:44 INFO scheduler.TaskSetManager: Starting task 2.1 in
 stage 0.0 (TID 4, NODE-DSRV05
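
A rough sketch of Akhil's suggestions above -- an explicit number of partitions
plus a try/catch around the per-record work -- so a corrupt record is reported
together with its partition index instead of killing the task. parse() and the
input path are placeholders, not taken from the thread:

    val lines = sc.textFile("hdfs://NODE-DSRV05:9000/input/data.txt", minPartitions = 16)
    val processed = lines.mapPartitionsWithIndex { (idx, iter) =>
      iter.flatMap { line =>
        try {
          Some(parse(line))            // parse(): stand-in for the real per-line work
        } catch {
          case e: Exception =>
            System.err.println(s"Bad record in partition $idx: ${e.getMessage}")
            None                       // drop the bad record instead of failing the task
        }
      }
    }
    processed.count()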

Default spark.deploy.recoveryMode

2014-10-14 Thread Priya Ch
Hi Spark users/experts,

In the Spark source code (Master.scala and Worker.scala), when registering
the worker with the master, I see the usage of *persistenceEngine*. When we
don't specify spark.deploy.recoveryMode explicitly, what is the default
value used? This recovery mode is used to persist and restore the
application and worker details.

I see that when the recovery mode is not specified explicitly,
*BlackHolePersistenceEngine* is being used. Am I right?


Thanks,
Padma Ch


Breeze Library usage in Spark

2014-10-03 Thread Priya Ch
Hi Team,

When I try to use DenseMatrix from the Breeze library in Spark, it throws
the following error:

java.lang.NoClassDefFoundError: breeze/storage/Zero


Can someone help me with this?

Thanks,
Padma Ch
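
One frequent cause of this NoClassDefFoundError is a Breeze version mismatch:
the application compiles against one Breeze release while the Spark runtime
ships another that lacks breeze.storage.Zero. A build.sbt sketch under that
assumption; the versions below are placeholders, so check the spark-mllib POM
of the Spark release in use for the exact Breeze version it depends on:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-mllib" % "1.1.0" % "provided",
      // Align Breeze with the version spark-mllib already brings in, rather
      // than pulling a different one onto the classpath.
      "org.scalanlp"     %% "breeze"      % "0.9"
    )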


spark.local.dir and spark.worker.dir not used

2014-09-23 Thread Priya Ch
Hi,

I am using Spark 1.0.0. In my Spark code I am trying to persist an RDD to
disk with rdd.persist(DISK_ONLY). But unfortunately I couldn't find the
location where the RDD has been written to disk. I pointed SPARK_LOCAL_DIRS
and SPARK_WORKER_DIR to some other location rather than using the default
/tmp directory, but still couldn't see anything in the worker directory or
the Spark local directory.

I also tried specifying the local dir and worker dir from the Spark code
while defining the SparkConf, as conf.set("spark.local.dir",
"/home/padma/sparkdir"), but the directories are not used.


In general, which directories does Spark use for map output files,
intermediate writes and persisting RDDs to disk?


Thanks,
Padma Ch
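
A minimal sketch of the pieces that have to line up, assuming a standalone
cluster: spark.local.dir must be on the SparkConf before the SparkContext is
created, and on each worker an exported SPARK_LOCAL_DIRS takes precedence over
it, so both should point at the intended location. Shuffle/map output and
DISK_ONLY blocks then land under that directory (in spark-local-* style
subdirectories). The paths below are only examples:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("DiskPersistCheck")
      .set("spark.local.dir", "/home/padma/sparkdir")   // must be set before the context exists
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("hdfs://host:9000/some/input").persist(StorageLevel.DISK_ONLY)
    rdd.count()   // forces materialization, so the blocks are actually written to disk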


Spark build error

2014-08-06 Thread Priya Ch
Hi,

I am trying to build the jars using the command:

mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package

Executing the above command throws the following error:

[INFO] Spark Project Core . FAILURE [  0.295 s]
[INFO] Spark Project Bagel  SKIPPED
[INFO] Spark Project GraphX ... SKIPPED
[INFO] Spark Project ML Library ... SKIPPED
[INFO] Spark Project Streaming  SKIPPED
[INFO] Spark Project Tools  SKIPPED
[INFO] Spark Project Catalyst . SKIPPED
[INFO] Spark Project SQL .. SKIPPED
[INFO] Spark Project Hive . SKIPPED
[INFO] Spark Project REPL . SKIPPED
[INFO] Spark Project YARN Parent POM .. SKIPPED
[INFO] Spark Project YARN Stable API .. SKIPPED
[INFO] Spark Project Assembly . SKIPPED
[INFO] Spark Project External Twitter . SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External ZeroMQ .. SKIPPED
[INFO] Spark Project External MQTT  SKIPPED
[INFO] Spark Project Examples . SKIPPED
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 3.748 s
[INFO] Finished at: 2014-08-07T01:00:48+05:30
[INFO] Final Memory: 24M/175M
[INFO] 
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
(default) on project spark-core_2.10: Execution default of goal
org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
failed: For artifact {null:null:null:jar}: The groupId cannot be
empty. - [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
execute goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
(default) on project spark-core_2.10: Execution default of goal
org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
failed: For artifact {null:null:null:jar}: The groupId cannot be
empty.
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
at 
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
at 
org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:347)
at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:154)
at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)
at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:213)
at org.apache.maven.cli.MavenCli.main(MavenCli.java:157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
default of goal
org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process
failed: For artifact {null:null:null:jar}: The groupId cannot be
empty.
at 
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:143)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
... 19 more
Caused by: org.apache.maven.artifact.InvalidArtifactRTException: For
artifact {null:null:null:jar}: The groupId cannot be empty.



Can someone help me with this?