Re: Kafka streams vs Spark streaming

2017-10-11 Thread Sabarish Sasidharan
@Sachin
>>The partition key is very important if you need to run multiple instances
of the streams application, with certain instances processing only certain
partitions.

Again, depending on the partition key is optional. It's actually a feature
enabler: it lets us use local state stores to improve throughput. I don't
see this as a downside.
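
For illustration, a rough sketch (assuming the Kafka 1.0 Streams API, with a
hypothetical topic and application id) of how a co-partitioned key lets an
aggregation be served from each instance's local state store:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clicks-per-user")           // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

val builder = new StreamsBuilder()
builder.stream[String, String]("clicks")   // hypothetical topic, keyed by user id
  .groupByKey()                            // no re-partitioning needed: key == partition key
  .count()                                 // backed by a local (RocksDB) state store

new KafkaStreams(builder.build(), props).start()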

Regards
Sab

On 11 Oct 2017 1:44 pm, "Sachin Mittal"  wrote:

> Kafka Streams has a lower learning curve, and if your source data is
> already in Kafka topics it is pretty simple to integrate with.
> It can run as a library inside your main programs.
>
> So, compared to Spark Streaming, it:
> 1. Is much simpler to implement.
> 2. Is not as heavy on hardware as Spark.
>
>
> On the downside:
> 1. It is not elastic. You need to anticipate the volume of data you will
> have beforehand; it is very difficult to add or remove topic partitions later on.
> 2. The partition key is very important if you need to run multiple
> instances of the streams application, with certain instances processing
> only certain partitions.
>  In case you need aggregation on a different key, you may need to
> re-partition the data to a new topic and run a new streams app against that.
>
> So yes, if you have a good idea about your data, it comes from Kafka,
> and you want to build something quickly without much hardware, Kafka Streams
> is the way to go.
>
> We had first tried Spark Streaming, but given hardware limitations and the
> complexity of fetching data from MongoDB, we decided Kafka Streams was the
> way to go forward.
>
> Thanks
> Sachin
>
>
>
>
>
> On Wed, Oct 11, 2017 at 1:01 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> Has anyone had any experience of using Kafka Streams versus Spark?
>>
>> I am not familiar with the Kafka Streams concept, except that it is a set
>> of libraries.
>>
>> Any feedback will be appreciated.
>>
>> Regards,
>>
>> Mich
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>


Re: Kafka streams vs Spark streaming

2017-10-11 Thread Sabarish Sasidharan
@Sachin
>>It is not elastic. You need to anticipate the volume of data you will have
beforehand; it is very difficult to add or remove topic partitions later on.

Why do you say so, Sachin? Kafka Streams will readjust once we add more
partitions to the Kafka topic. And when we add more machines, rebalancing
automatically distributes the partitions among the new stream threads.

Regards
Sab

On 11 Oct 2017 1:44 pm, "Sachin Mittal"  wrote:

> Kafka Streams has a lower learning curve, and if your source data is
> already in Kafka topics it is pretty simple to integrate with.
> It can run as a library inside your main programs.
>
> So, compared to Spark Streaming, it:
> 1. Is much simpler to implement.
> 2. Is not as heavy on hardware as Spark.
>
>
> On the downside:
> 1. It is not elastic. You need to anticipate the volume of data you will
> have beforehand; it is very difficult to add or remove topic partitions later on.
> 2. The partition key is very important if you need to run multiple
> instances of the streams application, with certain instances processing
> only certain partitions.
>  In case you need aggregation on a different key, you may need to
> re-partition the data to a new topic and run a new streams app against that.
>
> So yes, if you have a good idea about your data, it comes from Kafka,
> and you want to build something quickly without much hardware, Kafka Streams
> is the way to go.
>
> We had first tried Spark Streaming, but given hardware limitations and the
> complexity of fetching data from MongoDB, we decided Kafka Streams was the
> way to go forward.
>
> Thanks
> Sachin
>
>
>
>
>
> On Wed, Oct 11, 2017 at 1:01 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> Has anyone had any experience of using Kafka Streams versus Spark?
>>
>> I am not familiar with the Kafka Streams concept, except that it is a set
>> of libraries.
>>
>> Any feedback will be appreciated.
>>
>> Regards,
>>
>> Mich
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>


Re: Optimized way to use spark as db to hdfs etl

2016-11-06 Thread Sabarish Sasidharan
Please be aware that accumulators involve communication back to the driver
and may not be efficient. I think the OP wants some way to extract the stats
from the SQL plan, if they are being stored in some internal data structure.
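
For reference, the accumulator approach being discussed would look roughly
like the sketch below (assuming Spark 2.x; the JDBC settings are placeholders
standing in for the ones in the snippet further down). Note that accumulators
updated inside transformations can over-count when tasks are retried.

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("etl").getOrCreate()
val rowsWritten = spark.sparkContext.longAccumulator("rowsWritten")

val jdbcUrl = "jdbc:oracle:thin:@//dbhost:1521/service"    // placeholders
val query = "(select * from source_table) t"
val properties = new Properties()
properties.put("fetchSize", "5000")

val dataset = spark.read.jdbc(jdbcUrl, query, properties)
dataset
  .filter { _ => rowsWritten.add(1); true }                // pass-through filter that counts rows
  .write.format("parquet").save("hdfs:///tmp/etl-output")  // hypothetical output path

println("rows written: " + rowsWritten.value)              // read on the driver after the save completes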

Regards
Sab

On 5 Nov 2016 9:42 p.m., "Deepak Sharma"  wrote:

> Hi Rohit
> You can use an accumulator and increment it on every record processed.
> At the end you can get the value of the accumulator on the driver, which
> will give you the count.
>
> HTH
> Deepak
>
> On Nov 5, 2016 20:09, "Rohit Verma"  wrote:
>
>> I am using spark to read from database and write in hdfs as parquet file.
>> Here is code snippet.
>>
>> private long etlFunction(SparkSession spark) {
>> spark.sqlContext().setConf("spark.sql.parquet.compression.codec",
>> "SNAPPY");
>> Properties properties = new Properties();
>> properties.put("driver", "oracle.jdbc.driver");
>> properties.put("fetchSize", "5000");
>> Dataset<Row> dataset = spark.read().jdbc(jdbcUrl, query, properties);
>> dataset.write().format("parquet").save("pdfs-path");
>> return dataset.count();
>> }
>>
>> When I look at the Spark UI, during the write I can see stats on the
>> records written, visible in the SQL tab under the query plan.
>>
>> The count itself is a heavy task, though.
>>
>> Can someone suggest the best way to get the count in the most optimized way?
>>
>> Thanks all..
>>
>


Re: How to insert data for 100 partitions at a time using Spark SQL

2016-05-22 Thread Sabarish Sasidharan
Can't you just reduce the amount of data you insert by applying a filter, so
that only a small set of idPartitions is selected? You could have multiple
such inserts to cover all idPartitions, as in the sketch below. Does that help?
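
Roughly along these lines (a sketch assuming the table and column names from
your query, and chunks of 100 idPartition values per insert):

val allIds = sqlContext.sql("SELECT DISTINCT idPartition FROM recordsTemp")
  .collect()
  .map(_.getString(0))

allIds.grouped(100).foreach { batch =>
  val inList = batch.map(id => "'" + id + "'").mkString(", ")
  sqlContext.sql(s"""
    FROM recordsTemp ps
    INSERT OVERWRITE TABLE users PARTITION (datePartition, idPartition)
    SELECT ps.id, ps.record, ps.datePartition, ps.idPartition
    WHERE ps.idPartition IN ($inList)
  """)
}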

Regards
Sab
On 22 May 2016 1:11 pm, "swetha kasireddy" 
wrote:

> I am looking at ORC. I insert the data using the following query.
>
> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
> record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
> stored as ORC LOCATION '/user/users' ")
>   sqlContext.sql("  orc.compress= SNAPPY")
>   sqlContext.sql(
> """ from recordsTemp ps   insert overwrite table users
> partition(datePartition , idPartition )  select ps.id, ps.record ,
> ps.datePartition, ps.idPartition  """.stripMargin)
>
> On Sun, May 22, 2016 at 12:37 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Where is your base table, and what format is it (Parquet, ORC, etc.)?
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 22 May 2016 at 08:34, SRK  wrote:
>>
>>> Hi,
>>>
>>> In my Spark SQL query to insert data, I have around 14,000 partitions of
>>> data which seems to be causing memory issues. How can I insert the data
>>> for
>>> 100 partitions at a time to avoid any memory issues?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-for-100-partitions-at-a-time-using-Spark-SQL-tp26997.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: reading file from S3

2016-03-15 Thread Sabarish Sasidharan
There are many solutions to a problem.

Also understand that sometimes your situation demands it. For example, what if
you are accessing S3 from a Spark job running on your continuous integration
server sitting in your data center, or maybe on a box under your desk? And
sometimes you are just trying something out.

Also understand that sometimes you want answers that solve the problem at
hand without being redirected to something else. What you suggested is an
appropriate way of doing it, and one I have proposed myself before, but it
doesn't solve the OP's problem at hand.

Regards
Sab
On 15-Mar-2016 8:27 pm, "Gourav Sengupta"  wrote:

> Oh!!! What the hell
>
> Please never use the URI
>
> *s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY*. That is a major cause of
> pain, security issues and code maintenance issues, and of course something
> that Amazon strongly suggests we do not use. Please use roles and you will
> not have to worry about security.
>
> Regards,
> Gourav Sengupta
>
> On Tue, Mar 15, 2016 at 2:38 PM, Sabarish Sasidharan <
> sabarish@gmail.com> wrote:
>
>> You have a slash before the bucket name. It should be @.
>>
>> Regards
>> Sab
>> On 15-Mar-2016 4:03 pm, "Yasemin Kaya"  wrote:
>>
>>> Hi,
>>>
>>> I am using Spark 1.6.0 standalone and I want to read a txt file from S3
>>> bucket named yasemindeneme and my file name is deneme.txt. But I am getting
>>> this error. Here is the simple code
>>> <https://gist.github.com/anonymous/6d174f8587f0f3fd2334>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Invalid
>>> hostname in URI s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@
>>> /yasemindeneme/deneme.txt
>>> at
>>> org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:45)
>>> at
>>> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:55)
>>>
>>>
>>> I try 2 options
>>> *sc.hadoopConfiguration() *and
>>> *sc.textFile("s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@/yasemindeneme/deneme.txt/");*
>>>
>>> Also I did export AWS_ACCESS_KEY_ID= .
>>>  export AWS_SECRET_ACCESS_KEY=
>>> But there is no change about error.
>>>
>>> Could you please help me about this issue?
>>>
>>>
>>> --
>>> hiç ender hiç
>>>
>>
>


Re: reading file from S3

2016-03-15 Thread Sabarish Sasidharan
You have a slash right before the bucket name. The bucket name should come
immediately after the @, with no slash in between (see the sketch below).
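
For example, a rough sketch (assuming the spark-shell sc and the bucket from
your mail; putting the keys on the Hadoop configuration avoids embedding them
in the URI altogether):

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

// note: bucket name directly after the scheme, no leading slash and no keys in the URI
val lines = sc.textFile("s3n://yasemindeneme/deneme.txt")
println(lines.count())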

Regards
Sab
On 15-Mar-2016 4:03 pm, "Yasemin Kaya"  wrote:

> Hi,
>
> I am using Spark 1.6.0 standalone and I want to read a txt file from S3
> bucket named yasemindeneme and my file name is deneme.txt. But I am getting
> this error. Here is the simple code
> 
> Exception in thread "main" java.lang.IllegalArgumentException: Invalid
> hostname in URI s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@
> /yasemindeneme/deneme.txt
> at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:45)
> at
> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:55)
>
>
> I try 2 options
> *sc.hadoopConfiguration() *and
> *sc.textFile("s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@/yasemindeneme/deneme.txt/");*
>
> Also I did export AWS_ACCESS_KEY_ID= .
>  export AWS_SECRET_ACCESS_KEY=
> But there is no change about error.
>
> Could you please help me about this issue?
>
>
> --
> hiç ender hiç
>


Re: Compress individual RDD

2016-03-15 Thread Sabarish Sasidharan
It will compress only RDDs persisted with a serialized storage level. So you
could skip the _SER modes for the RDDs you don't want compressed. Not perfect,
but something. A rough sketch is below.
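
A minimal sketch of what I mean (hypothetical application name and input paths):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf().setAppName("compress-demo").set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)

val big = sc.textFile("hdfs:///data/big")        // hypothetical inputs
val small = sc.textFile("hdfs:///data/small")

big.persist(StorageLevel.MEMORY_ONLY_SER)        // serialized, so this one is compressed
small.persist(StorageLevel.MEMORY_ONLY)          // plain objects, left uncompressed
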
On 15-Mar-2016 4:33 pm, "Nirav Patel"  wrote:

> Hi,
>
> I see that there's the following Spark config to compress an RDD. My guess is
> it will compress all RDDs of a given SparkContext, right? If so, is there
> a way to instruct the Spark context to only compress some RDDs and leave
> others uncompressed?
>
> Thanks
>
> spark.rdd.compress (default false): Whether to compress serialized RDD partitions
> (e.g. for StorageLevel.MEMORY_ONLY_SER). Can save substantial space at the
> cost of some extra CPU time.
>
>
>


Re: Hive Query on Spark fails with OOM

2016-03-15 Thread Sabarish Sasidharan
Yes, I suggested increasing the shuffle partitions to address this problem.
The other suggestion, to increase the shuffle fraction, was not for this, but
it makes sense given that you are reserving all that memory and doing nothing
with it. By diverting more of it to shuffles you can help improve your shuffle
performance.

Regards
Sab
On 14-Mar-2016 2:33 pm, "Prabhu Joseph"  wrote:

> The issue is that the query hits OOM on a stage when reading the shuffle
> output from the previous stage. How does increasing shuffle memory help to
> avoid the OOM?
>
> On Mon, Mar 14, 2016 at 2:28 PM, Sabarish Sasidharan <
> sabarish@gmail.com> wrote:
>
>> Thats a pretty old version of Spark SQL. It is devoid of all the
>> improvements introduced in the last few releases.
>>
>> You should try bumping your spark.sql.shuffle.partitions to a value
>> higher than default (5x or 10x). Also increase your shuffle memory fraction
>> as you really are not explicitly caching anything. You could simply swap
>> the fractions in your case.
>>
>> Regards
>> Sab
>>
>> On Mon, Mar 14, 2016 at 2:20 PM, Prabhu Joseph <
>> prabhujose.ga...@gmail.com> wrote:
>>
>>> It is a Spark-SQL and the version used is Spark-1.2.1.
>>>
>>> On Mon, Mar 14, 2016 at 2:16 PM, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
>>>> I believe the OP is using Spark SQL and not Hive on Spark.
>>>>
>>>> Regards
>>>> Sab
>>>>
>>>> On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> I think the only version of Spark that works OK with Hive (Hive on
>>>>> Spark engine) is version 1.3.1. I also get OOM from time to time and have
>>>>> to revert using MR
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * 
>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>>
>>>>> On 14 March 2016 at 08:06, Sabarish Sasidharan <
>>>>> sabarish.sasidha...@manthan.com> wrote:
>>>>>
>>>>>> Which version of Spark are you using? The configuration varies by
>>>>>> version.
>>>>>>
>>>>>> Regards
>>>>>> Sab
>>>>>>
>>>>>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <
>>>>>> prabhujose.ga...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> A Hive Join query which runs fine and faster in MapReduce takes lot
>>>>>>> of time with Spark and finally fails with OOM.
>>>>>>>
>>>>>>> *Query:  hivejoin.py*
>>>>>>>
>>>>>>> from pyspark import SparkContext, SparkConf
>>>>>>> from pyspark.sql import HiveContext
>>>>>>> conf = SparkConf().setAppName("Hive_Join")
>>>>>>> sc = SparkContext(conf=conf)
>>>>>>> hiveCtx = HiveContext(sc)
>>>>>>> hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a
>>>>>>> INNER JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
>>>>>>> a.instance_id");
>>>>>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>>>>>> print results
>>>>>>>
>>>>>>>
>>>>>>> *Data Study:*
>>>>>>>
>>>>>>> Number of Rows:
>>>>>>>
>>>>>>> A table has 1002093508
>>>>>>> B table has 5371668
>>>>>>> C table has  1000
>>>>>>>
>>>>>>> No Data Skewness:
>>>>>>>
>>>>>>> item_id in B is unique and A has multiple rows with same item_id, so
>>>>>>> after first INNER_JOIN the result set is same 1002093508 rows
>>>>>>>
>>>>>>> instance_id in C is unique and A has multiple rows with same
>>>>>>> instanc

Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Sabarish Sasidharan
That's a pretty old version of Spark SQL. It is devoid of all the
improvements introduced in the last few releases.

You should try bumping spark.sql.shuffle.partitions to a value higher than
the default (5x or 10x). Also increase your shuffle memory fraction, as you
really are not explicitly caching anything. You could simply swap the two
fractions in your case, along the lines of the sketch below.
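
Roughly like this (a sketch of the suggested settings; the exact values are
something you would tune):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Hive_Join")
  .set("spark.sql.shuffle.partitions", "2000")   // ~10x the default of 200
  .set("spark.storage.memoryFraction", "0.2")    // little is cached explicitly...
  .set("spark.shuffle.memoryFraction", "0.6")    // ...so hand that memory to the shuffles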

Regards
Sab

On Mon, Mar 14, 2016 at 2:20 PM, Prabhu Joseph 
wrote:

> It is a Spark-SQL and the version used is Spark-1.2.1.
>
> On Mon, Mar 14, 2016 at 2:16 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> I believe the OP is using Spark SQL and not Hive on Spark.
>>
>> Regards
>> Sab
>>
>> On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I think the only version of Spark that works OK with Hive (Hive on Spark
>>> engine) is version 1.3.1. I also get OOM from time to time and have to
>>> revert using MR
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 14 March 2016 at 08:06, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
>>>> Which version of Spark are you using? The configuration varies by
>>>> version.
>>>>
>>>> Regards
>>>> Sab
>>>>
>>>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <
>>>> prabhujose.ga...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> A Hive Join query which runs fine and faster in MapReduce takes lot of
>>>>> time with Spark and finally fails with OOM.
>>>>>
>>>>> *Query:  hivejoin.py*
>>>>>
>>>>> from pyspark import SparkContext, SparkConf
>>>>> from pyspark.sql import HiveContext
>>>>> conf = SparkConf().setAppName("Hive_Join")
>>>>> sc = SparkContext(conf=conf)
>>>>> hiveCtx = HiveContext(sc)
>>>>> hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a
>>>>> INNER JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
>>>>> a.instance_id");
>>>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>>>> print results
>>>>>
>>>>>
>>>>> *Data Study:*
>>>>>
>>>>> Number of Rows:
>>>>>
>>>>> A table has 1002093508
>>>>> B table has 5371668
>>>>> C table has  1000
>>>>>
>>>>> No Data Skewness:
>>>>>
>>>>> item_id in B is unique and A has multiple rows with same item_id, so
>>>>> after first INNER_JOIN the result set is same 1002093508 rows
>>>>>
>>>>> instance_id in C is unique and A has multiple rows with same
>>>>> instance_id (maximum count of number of rows with same instance_id is 250)
>>>>>
>>>>> Spark Job runs with 90 Executors each with 2cores and 6GB memory. YARN
>>>>> has allotted all the requested resource immediately and no other job is
>>>>> running on the
>>>>> cluster.
>>>>>
>>>>> spark.storage.memoryFraction 0.6
>>>>> spark.shuffle.memoryFraction 0.2
>>>>>
>>>>> Stage 2 - reads data from Hadoop, Tasks has NODE_LOCAL and shuffle
>>>>> write 500GB of intermediate data
>>>>>
>>>>> Stage 3 - does shuffle read of 500GB data, tasks has PROCESS_LOCAL and
>>>>> output of 400GB is shuffled
>>>>>
>>>>> Stage 4 - tasks fails with OOM on reading the shuffled output data
>>>>> when it reached 40GB data itself
>>>>>
>>>>> First of all, what kind of Hive queries when run on Spark gets a
>>>>> better performance than Mapreduce. And what are the hive queries that 
>>>>> won't
>>>>> perform
>>>>> well in Spark.
>>>>>
>>>>> How to calculate the optimal Heap for Executor Memory and the number
>>>>> of executors for given input data size. We don't specify Spark Executors 
>>>>> to
>>>>> cache any data. But how come Stage 3 tasks says PROCESS_LOCAL. Why Stage 4
>>>>> is failing immediately
>>>>> when it has just read 40GB data, is it caching data in Memory.
>>>>>
>>>>> And in a Spark job, some stage will need lot of memory for shuffle and
>>>>> some need lot of memory for cache. So, when a Spark Executor has lot of
>>>>> memory available
>>>>> for cache and does not use the cache but when there is a need to do
>>>>> lot of shuffle, will executors only use the shuffle fraction which is set
>>>>> for doing shuffle or will it use
>>>>> the free memory available for cache as well.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Prabhu Joseph
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Architect - Big Data
>>>> Ph: +91 99805 99458
>>>>
>>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>>> Sullivan India ICT)*
>>>> +++
>>>>
>>>
>>>
>>
>>
>> --
>>
>> Architect - Big Data
>> Ph: +91 99805 99458
>>
>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>> Sullivan India ICT)*
>> +++
>>
>
>


Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Sabarish Sasidharan
I believe the OP is using Spark SQL and not Hive on Spark.

Regards
Sab

On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh 
wrote:

> I think the only version of Spark that works OK with Hive (Hive on Spark
> engine) is version 1.3.1. I also get OOM from time to time and have to
> revert using MR
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 14 March 2016 at 08:06, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> Which version of Spark are you using? The configuration varies by version.
>>
>> Regards
>> Sab
>>
>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <
>> prabhujose.ga...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> A Hive Join query which runs fine and faster in MapReduce takes lot of
>>> time with Spark and finally fails with OOM.
>>>
>>> *Query:  hivejoin.py*
>>>
>>> from pyspark import SparkContext, SparkConf
>>> from pyspark.sql import HiveContext
>>> conf = SparkConf().setAppName("Hive_Join")
>>> sc = SparkContext(conf=conf)
>>> hiveCtx = HiveContext(sc)
>>> hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a INNER
>>> JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
>>> a.instance_id");
>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>> print results
>>>
>>>
>>> *Data Study:*
>>>
>>> Number of Rows:
>>>
>>> A table has 1002093508
>>> B table has 5371668
>>> C table has  1000
>>>
>>> No Data Skewness:
>>>
>>> item_id in B is unique and A has multiple rows with same item_id, so
>>> after first INNER_JOIN the result set is same 1002093508 rows
>>>
>>> instance_id in C is unique and A has multiple rows with same instance_id
>>> (maximum count of number of rows with same instance_id is 250)
>>>
>>> Spark Job runs with 90 Executors each with 2cores and 6GB memory. YARN
>>> has allotted all the requested resource immediately and no other job is
>>> running on the
>>> cluster.
>>>
>>> spark.storage.memoryFraction 0.6
>>> spark.shuffle.memoryFraction 0.2
>>>
>>> Stage 2 - reads data from Hadoop, Tasks has NODE_LOCAL and shuffle write
>>> 500GB of intermediate data
>>>
>>> Stage 3 - does shuffle read of 500GB data, tasks has PROCESS_LOCAL and
>>> output of 400GB is shuffled
>>>
>>> Stage 4 - tasks fails with OOM on reading the shuffled output data when
>>> it reached 40GB data itself
>>>
>>> First of all, what kind of Hive queries when run on Spark gets a better
>>> performance than Mapreduce. And what are the hive queries that won't perform
>>> well in Spark.
>>>
>>> How to calculate the optimal Heap for Executor Memory and the number of
>>> executors for given input data size. We don't specify Spark Executors to
>>> cache any data. But how come Stage 3 tasks says PROCESS_LOCAL. Why Stage 4
>>> is failing immediately
>>> when it has just read 40GB data, is it caching data in Memory.
>>>
>>> And in a Spark job, some stage will need lot of memory for shuffle and
>>> some need lot of memory for cache. So, when a Spark Executor has lot of
>>> memory available
>>> for cache and does not use the cache but when there is a need to do lot
>>> of shuffle, will executors only use the shuffle fraction which is set for
>>> doing shuffle or will it use
>>> the free memory available for cache as well.
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Architect - Big Data
>> Ph: +91 99805 99458
>>
>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>> Sullivan India ICT)*
>> +++
>>
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Sabarish Sasidharan
Which version of Spark are you using? The configuration varies by version.

Regards
Sab

On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph 
wrote:

> Hi All,
>
> A Hive Join query which runs fine and faster in MapReduce takes lot of
> time with Spark and finally fails with OOM.
>
> *Query:  hivejoin.py*
>
> from pyspark import SparkContext, SparkConf
> from pyspark.sql import HiveContext
> conf = SparkConf().setAppName("Hive_Join")
> sc = SparkContext(conf=conf)
> hiveCtx = HiveContext(sc)
> hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a INNER
> JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
> a.instance_id");
> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
> print results
>
>
> *Data Study:*
>
> Number of Rows:
>
> A table has 1002093508
> B table has 5371668
> C table has  1000
>
> No Data Skewness:
>
> item_id in B is unique and A has multiple rows with same item_id, so after
> first INNER_JOIN the result set is same 1002093508 rows
>
> instance_id in C is unique and A has multiple rows with same instance_id
> (maximum count of number of rows with same instance_id is 250)
>
> Spark Job runs with 90 Executors each with 2cores and 6GB memory. YARN has
> allotted all the requested resource immediately and no other job is running
> on the
> cluster.
>
> spark.storage.memoryFraction 0.6
> spark.shuffle.memoryFraction 0.2
>
> Stage 2 - reads data from Hadoop, Tasks has NODE_LOCAL and shuffle write
> 500GB of intermediate data
>
> Stage 3 - does shuffle read of 500GB data, tasks has PROCESS_LOCAL and
> output of 400GB is shuffled
>
> Stage 4 - tasks fails with OOM on reading the shuffled output data when it
> reached 40GB data itself
>
> First of all, what kind of Hive queries when run on Spark gets a better
> performance than Mapreduce. And what are the hive queries that won't perform
> well in Spark.
>
> How to calculate the optimal Heap for Executor Memory and the number of
> executors for given input data size. We don't specify Spark Executors to
> cache any data. But how come Stage 3 tasks says PROCESS_LOCAL. Why Stage 4
> is failing immediately
> when it has just read 40GB data, is it caching data in Memory.
>
> And in a Spark job, some stage will need lot of memory for shuffle and
> some need lot of memory for cache. So, when a Spark Executor has lot of
> memory available
> for cache and does not use the cache but when there is a need to do lot of
> shuffle, will executors only use the shuffle fraction which is set for
> doing shuffle or will it use
> the free memory available for cache as well.
>
>
> Thanks,
> Prabhu Joseph
>
>
>
>
>
>
>
>
>
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Zeppelin Integration

2016-03-10 Thread Sabarish Sasidharan
I believe you need to co-locate Zeppelin on the same node where Spark is
installed, and you need to specify SPARK_HOME. The master I used was YARN.

Zeppelin exposes a notebook interface. A notebook can have many paragraphs,
and you run the paragraphs. You can mix multiple contexts in the same
notebook: the first paragraph can be Scala, the second can be SQL that uses a
DataFrame from the first paragraph, and so on (a rough sketch follows below).
If you use a SELECT query, the output is automatically displayed as a chart.

As RDDs are bound to the context that creates them, I don't think Zeppelin
can use those RDDs from your earlier application.

I don't know if notebooks can be reused within other notebooks. It would be
a nice way of doing some common preparatory work (like building these RDDs).
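
For illustration, a rough sketch of two such paragraphs (assuming the default
%spark and %sql interpreters and a hypothetical input file):

// paragraph 1 (%spark): build a DataFrame in Scala and register it as a temp table
val events = sqlContext.read.json("/data/events.json")   // hypothetical input
events.registerTempTable("events")

// paragraph 2 (%sql): query the temp table; Zeppelin renders the result as a table or chart
// %sql
// SELECT eventType, count(*) AS cnt FROM events GROUP BY eventType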

Regards
Sab

On Thu, Mar 10, 2016 at 2:28 PM, ayan guha  wrote:

> Hi All
>
> I am writing this in order to get a fair understanding of how zeppelin can
> be integrated with Spark.
>
> Our use case is to load few tables from a DB to Spark, run some
> transformation. Once done, we want to expose data through Zeppelin for
> analytics. I have few question around that to sound off any gross
> architectural flaws.
>
> Questions:
>
> 1. How Zeppelin connects to Spark? Thriftserver? Thrift JDBC?
>
> 2. What is the scope of Spark application when it is used from Zeppelin?
> For example, if I have few subsequent actions in zeppelin like
> map,filter,reduceByKey, filter,collect. I assume this will translate to an
> application and get submitted to Spark. However, If I want to use reuse
> some part of the data (for example) after first map transformation in
> earlier application. Can I do it? Or will it be another application and
> another spark submit?
>
>  In our use case data will already be loaded in RDDs. So how Zeppelin can
> access it?
>
> 3. How can I control access on specific rdds to specific users in Zeppelin
> (assuming we have implemented some way of login mechanism in Zeppelin and
> we have a mapping between Zeppelin users and their LDAP accounts). Is it
> even possible?
>
> 4. If Zeppelin is not a good choice, yet, for the use case, what are the
> other alternatives?
>
> appreciate any help/pointers/guidance.
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: S3 Zip File Loading Advice

2016-03-09 Thread Sabarish Sasidharan
You can use S3's list-objects API and do a diff between consecutive listings
to identify what's new (a rough sketch is below).

Are there multiple files in each zip? Single-file archives are processed just
like text, as long as the compression format is one of the supported codecs.
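
A rough sketch of the listing diff (assuming the AWS Java SDK v1, default
credentials and a hypothetical bucket/prefix; paging is ignored for brevity,
so it assumes fewer than 1000 keys per prefix):

import com.amazonaws.services.s3.AmazonS3Client
import scala.collection.JavaConverters._

val s3 = new AmazonS3Client()
def keysUnder(bucket: String, prefix: String): Set[String] =
  s3.listObjects(bucket, prefix).getObjectSummaries.asScala.map(_.getKey).toSet

val before = keysUnder("events", "2016/03/01/00/")   // hypothetical bucket and prefix
// ... some time later ...
val after = keysUnder("events", "2016/03/01/00/")
val newKeys = after -- before                        // only the keys that appeared in between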

Regards
Sab

On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim  wrote:

> I am wondering if anyone can help.
>
> Our company stores zipped CSV files in S3, which has been a big headache
> from the start. I was wondering if anyone has created a way to iterate
> through several subdirectories (s3n://events/2016/03/01/00,
> s3n://2016/03/01/01, etc.) in S3 to find the newest files and load them. It
> would be a big bonus to include the unzipping of the file in the process so
> that the CSV can be loaded directly into a dataframe for further
> processing. I’m pretty sure that the S3 part of this request is not
> uncommon. I would think the file being zipped is uncommon. If anyone can
> help, I would truly be grateful for I am new to Scala and Spark. This would
> be a great help in learning.
>
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Get rid of FileAlreadyExistsError

2016-03-01 Thread Sabarish Sasidharan
Have you tried spark.hadoop.validateOutputSpecs?
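
For illustration, a minimal sketch of setting it (this only relaxes the
output-spec check on the Hadoop save paths, so treat it as something to try
rather than a guaranteed fix):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("writer")
  .set("spark.hadoop.validateOutputSpecs", "false")   // skip the "output already exists" check
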
On 01-Mar-2016 9:43 pm, "Peter Halliday"  wrote:

> http://pastebin.com/vbbFzyzb
>
> The problem seems to be twofold. First, the ParquetFileWriter in
> Hadoop allows for an overwrite flag that Spark doesn't allow to be set.
> The second is that the DirectParquetOutputCommitter has an abortTask that’s
> empty.  I see SPARK-8413 open on this too, but no plans on changing this.
> I’m surprised not to see this fixed yet.
>
> Peter Halliday
>
>
>
> On Mar 1, 2016, at 10:01 AM, Ted Yu  wrote:
>
> Do you mind pastebin'ning the stack trace with the error so that we know
> which part of the code is under discussion ?
>
> Thanks
>
> On Tue, Mar 1, 2016 at 7:48 AM, Peter Halliday  wrote:
>
>> I have a Spark application that has a Task seem to fail, but it actually
>> did write out some of the files that were assigned it.  And Spark assigns
>> another executor that task, and it gets a FileAlreadyExistsException.  The
>> Hadoop code seems to allow for files to be overwritten, but I see the 1.5.1
>> version of this code doesn’t allow for this to be passed in.  Is that
>> correct?
>>
>> Peter Halliday
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: Spark on Windows platform

2016-03-01 Thread Sabarish Sasidharan
If all you want is Spark standalone, then it's as simple as installing the
binaries and calling spark-submit with your main class. I would advise
against running Hadoop on Windows; it's a bit of trouble. But yes, you can
do it if you want to.

Regards
Sab

Regards
Sab
On 29-Feb-2016 6:58 pm, "gaurav pathak"  wrote:

> Can someone guide me the steps and information regarding, installation of
> SPARK on Windows 7/8.1/10 , as well as on Windows Server. Also, it will be
> great to read your experiences in using SPARK on Windows platform.
>
>
> Thanks & Regards,
> Gaurav Pathak
>


Re: DataSet Evidence

2016-03-01 Thread Sabarish Sasidharan
BeanInfo?
On 01-Mar-2016 6:25 am, "Steve Lewis"  wrote:

>  I have a relatively complex Java object that I would like to use in a
> dataset
>
> if I say
>
> Encoder evidence = Encoders.kryo(MyType.class);
>
> JavaRDD rddMyType= generateRDD(); // some code
>
>  Dataset datasetMyType= sqlCtx.createDataset( rddMyType.rdd(), 
> evidence);
>
>
> I get one column - the whole object
>
> The object is a bean with all fields having getters and setters but some of 
> the fields are other complex java objects -
>
> It would be fine to serialize the objects in these fields with Kryo or Java 
> serialization but the Bean serializer treats all referenced objects as beans 
> and some lack the required getter and setter fields
>
> How can I get my columns with bean serializer even if some of the values in 
> the columns are not bean types
>
>


Re: Spark for client

2016-02-29 Thread Sabarish Sasidharan
Zeppelin?

Regards
Sab
On 01-Mar-2016 12:27 am, "Mich Talebzadeh" 
wrote:

> Hi,
>
> Is there such a thing as a Spark client, much like the RDBMS clients that are
> cut-down versions of their big brothers, useful for client connectivity but
> not usable as a server?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: .cache() changes contents of RDD

2016-02-27 Thread Sabarish Sasidharan
This is because the Hadoop Writables are being reused. Just map them to some
custom (immutable) type and then do further operations, including cache(), on
that (a rough sketch is below).
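
A rough sketch of what I mean (using newAPIHadoopFile and hypothetical paths
for illustration):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val input = sc.newAPIHadoopFile(
  "hdfs:///data/in",                 // hypothetical input path
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])

val rdd = input.map { case (_, text) => text.toString }   // copies the reused Text into a String
rdd.cache()
rdd.saveAsTextFile("hdfs:///data/out")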

Regards
Sab
On 27-Feb-2016 9:11 am, "Yan Yang"  wrote:

> Hi
>
> I am pretty new to Spark, and after experimenting with our pipelines I
> ran into this weird issue.
>
> The Scala code is as below:
>
> val input = sc.newAPIHadoopRDD(...)
> val rdd = input.map(...)
> rdd.cache()
> rdd.saveAsTextFile(...)
>
> I found rdd to consist of 80+K identical rows. To be more precise, the
> number of rows is right, but all are identical.
>
> The truly weird part is if I remove rdd.cache(), everything works just
> fine. I have encountered this issue on a few occasions.
>
> Thanks
> Yan
>
>
>
>
>


Re: ALS trainImplicit performance

2016-02-25 Thread Sabarish Sasidharan
I have tested up to 3 billion. ALS scales; you just need to scale your
cluster accordingly. More than building the model, it's getting the final
recommendations that won't scale as nicely, especially when the number of
products is huge. This applies when you are generating recommendations in
batch mode and shouldn't matter if the model is used to recommend in a
real-time/online mode.

Regards
Sab
On 26-Feb-2016 6:53 am, "Roberto Pagliari" 
wrote:

> Does anyone know about the maximum number of ratings ALS was tested
> successfully?
>
> For example, is 1 billion ratings (nonzero entries) too much for it to
> work properly?
>
>
> Thank you,
>


Re: Multiple user operations in spark.

2016-02-25 Thread Sabarish Sasidharan
I don't have a proper answer to this. But as a workaround, if you have 2
independent Spark jobs, you could update one while the other is serving
reads. It's still not scalable for incessant updates, though.

Regards
Sab
On 25-Feb-2016 7:19 pm, "Udbhav Agarwal"  wrote:

> Hi,
>
> I am using graphx. I am adding a batch of vertices to a graph with around
> 100,000 vertices and few edges. Adding around 400 vertices is taking 7
> seconds with one machine of 8 core and 8g ram. My trouble is when this
> process of addition is happening with the graph(name is *inputGraph)* am
> not able to access it or perform query over it. Since it is s real time
> system I want it to be available to the user every time. Currently when I
> am querying the graph during this process of addition of vertices its
> giving result after the addition is over. I have also tried with creating
> and querying another variable tempInputGraph where am storing state of
> inputGraph, which is updated whenever the addition process is over. But
> querying this is also being delayed due to the background process.
>
> I have set the number of executors as 8 as per my 8 core system.
>
> Please provide any suggestion as to how I can keep this graph always
> available to user even if any background process is happening over it.
>
>
>
> *Thanks,*
>
> *Udbhav Agarwal*
>
>
>
>
>


Re: How could I do this algorithm in Spark?

2016-02-25 Thread Sabarish Sasidharan
Like Robin said, please explore Pregel. You could do it without Pregel, but
it might be laborious. I have a simple outline below, with a rough RDD sketch
after it. You will need more iterations if the number of levels is higher.

a-b
b-c
c-d
b-e
e-f
f-c

flatmaptopair

a -> (a-b)
b -> (a-b)
b -> (b-c)
c -> (b-c)
c -> (c-d)
d -> (c-d)
b -> (b-e)
e -> (b-e)
e -> (e-f)
f -> (e-f)
f -> (f-c)
c -> (f-c)

aggregatebykey

a -> (a-b)
b -> (a-b, b-c, b-e)
c -> (b-c, c-d, f-c)
d -> (c-d)
e -> (b-e, e-f)
f -> (e-f, f-c)

filter to remove keys with less than 2 values

b -> (a-b, b-c, b-e)
c -> (b-c, c-d, f-c)
e -> (b-e, e-f)
f -> (e-f, f-c)

flatmap

a-b-c
a-b-e
b-c-d
b-e-f
e-f-c

flatmaptopair followed by aggregatebykey

(a-b) -> (a-b-c, a-b-e)
(b-c) -> (a-b-c, b-c-d)
(c-d) -> (b-c-d)
(b-e) -> (b-e-f)
(e-f) -> (b-e-f, e-f-c)
(f-c) -> (e-f-c)

filter out keys with less than 2 values

(b-c) -> (a-b-c, b-c-d)
(e-f) -> (b-e-f, e-f-c)

mapvalues

a-b-c-d
b-e-f-c

flatmap

a,d
b,d
c,d
b,c
e,c
f,c
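
In plain RDD terms, the first round of the outline looks roughly like this (a
sketch that uses groupByKey for brevity where the outline says aggregateByKey):

val edges = sc.parallelize(Seq("a" -> "b", "b" -> "c", "c" -> "d", "b" -> "e", "e" -> "f", "f" -> "c"))

// index every edge by both of its endpoints
val byVertex = edges.flatMap { case (u, v) => Seq(u -> (u, v), v -> (u, v)) }

// keep only the vertices shared by at least two edges
val joinable = byVertex.groupByKey().filter(_._2.size >= 2)

// stitch the edges ending at the shared vertex to the edges starting from it
val paths = joinable.flatMap { case (mid, es) =>
  val heads = es.filter(_._2 == mid).map(_._1)   // edges arriving at mid
  val tails = es.filter(_._1 == mid).map(_._2)   // edges leaving mid
  for (h <- heads; t <- tails) yield (h, mid, t)
}

paths.collect().foreach(println)   // prints the 2-hop paths, e.g. (a,b,c), (a,b,e), (b,c,d), ...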


On Thu, Feb 25, 2016 at 6:19 PM, Guillermo Ortiz 
wrote:

> I'm taking a look at Pregel. It seems like a good way to do it. The only
> negative thing I see is that it's not a really complex graph with a lot of
> edges between the vertices... they are more like a lot of isolated small
> graphs.
>
> 2016-02-25 12:32 GMT+01:00 Robin East :
>
>> The structures you are describing look like edges of a graph and you want
>> to follow the graph to a terminal vertex and then propagate that value back
>> up the path. On this assumption it would be simple to create the structures
>> as graphs in GraphX and use Pregel for the algorithm implementation.
>>
>> ---
>> Robin East
>> *Spark GraphX in Action* Michael Malak and Robin East
>> Manning Publications Co.
>> http://www.manning.com/books/spark-graphx-in-action
>>
>>
>>
>>
>>
>> On 25 Feb 2016, at 10:52, Guillermo Ortiz  wrote:
>>
>> Oh, the letters were just an example, it could be:
>> a , t
>> b, o
>> t, k
>> k, c
>>
>> So.. a -> t -> k -> c and the result is: a,c; t,c; k,c and b,o
>> I don't know if you were thinking about sortBy because the another
>> example where letter were consecutive.
>>
>>
>> 2016-02-25 9:42 GMT+01:00 Guillermo Ortiz :
>>
>>> I don't see that sorting the data helps.
>>> The answer has to be all the associations. In this case the answer has
>>> to be:
>>> a , b --> it was a error in the question, sorry.
>>> b , d
>>> c , d
>>> x , y
>>> y , y
>>>
>>> I feel like all the data which is associate should be in the same
>>> executor.
>>> On this case if I order the inputs.
>>> a , b
>>> x , y
>>> b , c
>>> y , y
>>> c , d
>>> --> to
>>> a , b
>>> b , c
>>> c , d
>>> x , y
>>> y , y
>>>
>>> Now, a,b ; b,c; one partitions for example, "c,d" and "x,y" another one
>>> and so on.
>>> I could get the relation between "a,b,c", but not about "d" with
>>> "a,b,c", am I wrong? I hope to be wrong!.
>>>
>>> It seems that it could be done with GraphX, but as you said, it seems a
>>> little bit overhead.
>>>
>>>
>>> 2016-02-25 5:43 GMT+01:00 James Barney :
>>>
 Guillermo,
 I think you're after an associative algorithm where A is ultimately
 associated with D, correct? Jakob would correct if that is a typo--a sort
 would be all that is necessary in that case.

 I believe you're looking for something else though, if I understand
 correctly.

 This seems like a similar algorithm to PageRank, no?
 https://github.com/amplab/graphx/blob/master/python/examples/pagerank.py
 Except return the "neighbor" itself, not the necessarily the rank of the
 page.

 If you wanted to, use Scala and Graphx for this problem. Might be a bit
 of overhead though: Construct a node for each member of each tuple with an
 edge between. Then traverse the graph for all sets of nodes that are
 connected. That result set would quickly explode in size, but you could
 restrict results to a minimum N connections. I'm not super familiar with
 Graphx myself, however. My intuition is saying 'graph problem' though.

 Thoughts?


 On Wed, Feb 24, 2016 at 6:43 PM, Jakob Odersky 
 wrote:

> Hi Guillermo,
> assuming that the first "a,b" is a typo and you actually meant "a,d",
> this is a sorting problem.
>
> You could easily model your data as an RDD or tuples (or as a
> dataframe/set) and use the sortBy (or orderBy for dataframe/sets)
> methods.
>
> best,
> --Jakob
>
> On Wed, Feb 24, 2016 at 2:26 PM, Guillermo Ortiz 
> wrote:
> > I want to do some algorithm in Spark.. I know how to do it in a
> single
> > machine where all data are together, but I don't know a good way to
> do it in
> > Spark.
> >
> > If someone has an idea..
> > I have some data like this
> > a , b
> > x , y
> > b , c
> > y , y
> > c , d
> >
> > I want something like:
> > a , d
> > b , d
> > c , d

Re: which is a more appropriate form of ratings ?

2016-02-25 Thread Sabarish Sasidharan
I believe the ALS algorithm expects the ratings to be aggregated, i.e. form A
(see the sketch below). I don't see why you have to use decimals for the rating.
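
For illustration, a rough sketch of aggregating form B into form A before
training (hypothetical input format; the ALS parameters are placeholders):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val views = sc.textFile("hdfs:///data/views.csv").map { line =>   // hypothetical input: user,page,count
  val Array(user, page, count) = line.split(",")
  ((user.toInt, page.toInt), count.toDouble)
}

val ratings = views
  .reduceByKey(_ + _)                                      // one aggregated rating per (user, page)
  .map { case ((user, page), total) => Rating(user, page, total) }

val model = ALS.trainImplicit(ratings, 10, 10, 0.01, 40.0) // rank, iterations, lambda, alpha: placeholders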

Regards
Sab

On Thu, Feb 25, 2016 at 4:50 PM, Hiroyuki Yamada  wrote:

> Hello.
>
> I just started working on CF in MLlib.
> I am using trainImplicit because I only have implicit ratings like page
> views.
>
> I am wondering which is a more appropriate form of ratings.
> Let's assume that view count is regarded as a rating and
> user 1 sees page 1 3 times and sees page 2 twice and so on.
>
> In this case, I think ratings can be formatted like the following 2 cases.
> (of course it is a RDD actually)
>
> A:
> user_id,page_id,rating(page view)
> 1,1,0.3
> 1,2,0.2
> ...
>
> B:
> user_id,page_id,rating(page view)
> 1,1,0.1
> 1,1,0.1
> 1,1,0.1
> 1,2,0.1
> 1,2,0.1
> ...
>
> It is allowed to have like B ?
> If it is, which is better ? ( is there any difference between them ?)
>
> Best,
> Hiro
>
>
>
>


Re: Execution plan in spark

2016-02-24 Thread Sabarish Sasidharan
There is no execution plan for FP. An execution plan exists only for SQL.
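
For what it's worth, a quick sketch of the difference (assuming a hypothetical
table and input path):

// SQL / DataFrames have a plan you can inspect:
val df = sqlContext.sql("SELECT channel_desc, SUM(amount_sold) FROM sales GROUP BY channel_desc")
df.explain(true)               // prints the logical and physical plans

// for plain RDD code the closest thing is the lineage dump, not a plan:
val rdd = sc.textFile("hdfs:///data/in").map(_.length).filter(_ > 0)
println(rdd.toDebugString)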

Regards
Sab
On 24-Feb-2016 2:46 pm, "Ashok Kumar"  wrote:

> Gurus,
>
> Is there anything like explain in Spark to see the execution plan in
> functional programming?
>
> warm regards
>


Re: Using functional programming rather than SQL

2016-02-24 Thread Sabarish Sasidharan
I never said it needs one. All I said is that when calling context.sql(),
the SQL is executed in the source database (assuming the data source is Hive
or some RDBMS).

Regards
Sab

On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:

> That is incorrect HiveContext does not need a hive instance to run.
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
> sabarish.sasidha...@manthan.com> wrote:
>
>> Yes
>>
>> Regards
>> Sab
>> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>>
>>> are you saying that HiveContext.sql(...) runs on hive, and not on spark
>>> sql?
>>>
>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
>>>> When using SQL your full query, including the joins, were executed in
>>>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>>>> the FP case, the data for the 3 tables is first pulled into the Spark
>>>> cluster and then the join is executed.
>>>>
>>>> Thus the time difference.
>>>>
>>>> It's not immediately obvious why the results are different.
>>>>
>>>> Regards
>>>> Sab
>>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>>
>>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> First thanks everyone for their suggestions. Much appreciated.
>>>>>
>>>>> This was the original queries written in SQL and run against
>>>>> Spark-shell
>>>>>
>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>> HiveContext.sql("use oraclehadoop")
>>>>>
>>>>> val rs = HiveContext.sql(
>>>>> """
>>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>>>> TotalSales
>>>>> FROM smallsales s
>>>>> INNER JOIN times t
>>>>> ON s.time_id = t.time_id
>>>>> INNER JOIN channels c
>>>>> ON s.channel_id = c.channel_id
>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>> """)
>>>>> rs.registerTempTable("tmp")
>>>>> println ("\nfirst query")
>>>>> HiveContext.sql("""
>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>> TotalSales
>>>>> from tmp
>>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>>> """).collect.foreach(println)
>>>>> println ("\nsecond query")
>>>>> HiveContext.sql("""
>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>> FROM tmp
>>>>> GROUP BY channel_desc
>>>>> order by SALES DESC LIMIT 5
>>>>> """).collect.foreach(println)
>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>> sys.exit
>>>>>
>>>>> The second queries were written in FP as much as I could as below
>>>>>
>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>> HiveContext.sql("use oraclehadoop")
>>>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>>>> sales")
>>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>>>>> channels")
>>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>>>>> times")
>>>>> val rs =
>>>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

2016-02-24 Thread Sabarish Sasidharan
LD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 08:52:27.27]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
>
> creating data set at [24/02/2016 08:52:30.30]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query at [24/02/2016 08:52:31.31]
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query at [24/02/2016 08:56:17.17]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [24/02/2016 09:00:14.14]
>
>
>
> On 24/02/2016 06:27, Sabarish Sasidharan wrote:
>
> When using SQL your full query, including the joins, were executed in
> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
>
>
>


Re: Using Spark functional programming rather than SQL, Spark on Hive tables

2016-02-24 Thread Sabarish Sasidharan
hannels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 08:52:27.27]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
>
> creating data set at [24/02/2016 08:52:30.30]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query at [24/02/2016 08:52:31.31]
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query at [24/02/2016 08:56:17.17]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [24/02/2016 09:00:14.14]
>
>
>
> On 24/02/2016 06:27, Sabarish Sasidharan wrote:
>
> When using SQL your full query, including the joins, were executed in
> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
>
>
>


Re: Using functional programming rather than SQL

2016-02-24 Thread Sabarish Sasidharan
Yes

Regards
Sab
On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:

> are you saying that HiveContext.sql(...) runs on hive, and not on spark
> sql?
>
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> When using SQL your full query, including the joins, were executed in
>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>> the FP case, the data for the 3 tables is first pulled into the Spark
>> cluster and then the join is executed.
>>
>> Thus the time difference.
>>
>> It's not immediately obvious why the results are different.
>>
>> Regards
>> Sab
>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> First thanks everyone for their suggestions. Much appreciated.
>>>
>>> This was the original queries written in SQL and run against Spark-shell
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>>
>>> val rs = HiveContext.sql(
>>> """
>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>> TotalSales
>>> FROM smallsales s
>>> INNER JOIN times t
>>> ON s.time_id = t.time_id
>>> INNER JOIN channels c
>>> ON s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> rs.registerTempTable("tmp")
>>> println ("\nfirst query")
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nsecond query")
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> The second queries were written in FP as much as I could as below
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>> sales")
>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>> val rs =
>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>> println ("\nfirst query")
>>> val rs1 =
>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>> println ("\nsecond query")
>>> val rs2
>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>>
>>>
>>> However The first query results are slightly different in SQL and FP
>>> (may be the first query code in FP is not exactly correct?) and more
>>> importantly the FP takes order of magnitude longer compared to SQL (8
>>> minutes compared to less than a minute). I am not surprised as I expected
>>> Functional Programming has to flatten up all those method calls and convert
>>> them to SQL?
>>>
>&g

Re: streaming spark is writing results to S3 a good idea?

2016-02-23 Thread Sabarish Sasidharan
Writing to S3 is over the network, so it will obviously be slower than local
disk. That said, within AWS the network is pretty fast. Still, you might
want to write to S3 only after a certain threshold of data is reached, so
that the writes are efficient. You might also want to use the
DirectOutputCommitter, as it avoids one extra set of writes and is about twice
as fast.

Note that when using S3 your data moves through the public Internet, though
it's still HTTPS. If you don't like that, you should look at using VPC
endpoints.
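
As a minimal sketch in Scala (the DStream name "results" and the bucket
"my-bucket" are assumptions, not from your code): each micro-batch is coalesced
before the write so that fewer, larger objects land in S3.

results.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    // one object per interval; raise the coalesce target for bigger batches
    rdd.coalesce(1)
       .saveAsTextFile(s"s3n://my-bucket/results/batch-${time.milliseconds}")
  }
}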

Regards
Sab
On 24-Feb-2016 6:57 am, "Andy Davidson" 
wrote:

> Currently our stream apps write results to hdfs. We are running into
> problems with HDFS becoming corrupted and running out of space. It seems
> like a better solution might be to write directly to S3. Is this a good
> idea?
>
> We plan to continue to write our checkpoints to hdfs
>
> Are there any issues to be aware of? Maybe performance or something else
> to watch out for?
>
> This is our first S3 project. Does storage just grow on on demand?
>
> Kind regards
>
> Andy
>
>
> P.s. Turns out we are using an old version of hadoop (v 1.0.4)
>
>
>
>


Re: streaming spark is writing results to S3 a good idea?

2016-02-23 Thread Sabarish Sasidharan
And yes, storage grows on demand. No issues with that.

Regards
Sab
On 24-Feb-2016 6:57 am, "Andy Davidson" 
wrote:

> Currently our stream apps write results to hdfs. We are running into
> problems with HDFS becoming corrupted and running out of space. It seems
> like a better solution might be to write directly to S3. Is this a good
> idea?
>
> We plan to continue to write our checkpoints to hdfs
>
> Are there any issues to be aware of? Maybe performance or something else
> to watch out for?
>
> This is our first S3 project. Does storage just grow on on demand?
>
> Kind regards
>
> Andy
>
>
> P.s. Turns out we are using an old version of hadoop (v 1.0.4)
>
>
>
>


Re: Using functional programming rather than SQL

2016-02-23 Thread Sabarish Sasidharan
When using SQL, your full query, including the joins, was executed in
Hive (or the RDBMS) and only the results were brought into the Spark cluster. In
the FP case, the data for the 3 tables is first pulled into the Spark
cluster and then the join is executed.

Thus the time difference.

It's not immediately obvious why the results are different.

Regards
Sab
On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> This was the original queries written in SQL and run against Spark-shell
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second queries were written in FP as much as I could as below
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
>
>
> However, the first query results are slightly different in SQL and FP (maybe
> the first query code in FP is not exactly correct?) and, more importantly,
> the FP takes an order of magnitude longer compared to SQL (8 minutes compared
> to less than a minute). I am not surprised, as I expect that the FP version
> has to flatten all those method calls and convert them to SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at
> [23/02/2016 23:56:11.11]
>
> *The FP results*
>
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [23/02/2016 23:53:42.42]
>
>
>
> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>
> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the result set
> using SQL from Hive tables, registering as a temporary table in S

Re: [Please Help] Log redirection on EMR

2016-02-21 Thread Sabarish Sasidharan
Your logs are getting archived in your logs bucket in S3.

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-plan-debugging.html

Regards
Sab

On Mon, Feb 22, 2016 at 12:14 PM, HARSH TAKKAR 
wrote:

> Hi
>
> In am using an EMR cluster  for running my spark jobs, but after the job
> finishes logs disappear,
>
> I have added a log4j.properties in my jar, but all the logs still
> redirects to EMR resource manager which vanishes after jobs completes, is
> there a way i could redirect the logs to a location in file  syatem, I am
> working on price points and its very critical for me to maintain logs.
>
> Just to add i get following error when my application starts.
>
> java.io.FileNotFoundException: /etc/spark/conf/log4j.properties (No such file 
> or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.(FileInputStream.java:146)
>   at java.io.FileInputStream.(FileInputStream.java:101)
>   at 
> sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
>   at 
> sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
>   at 
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
>   at 
> org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
>   at org.apache.log4j.LogManager.(LogManager.java:127)
>   at org.apache.spark.Logging$class.initializeLogging(Logging.scala:122)
>   at 
> org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
>   at org.apache.spark.Logging$class.log(Logging.scala:51)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:607)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:621)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Is this likely to cause any problems?

2016-02-19 Thread Sabarish Sasidharan
EMR does cost more than vanilla EC2. Using spark-ec2 can result in savings
with large clusters, though that is not everybody's cup of tea.

Regards
Sab
On 19-Feb-2016 7:55 pm, "Daniel Siegmann" 
wrote:

> With EMR supporting Spark, I don't see much reason to use the spark-ec2
> script unless it is important for you to be able to launch clusters using
> the bleeding edge version of Spark. EMR does seem to do a pretty decent job
> of keeping up to date - the latest version (4.3.0) supports the latest
> Spark version (1.6.0).
>
> So I'd flip the question around and ask: is there any reason to continue
> using the spark-ec2 script rather than EMR?
>
> On Thu, Feb 18, 2016 at 11:39 AM, James Hammerton  wrote:
>
>> I have now... So far  I think the issues I've had are not related to
>> this, but I wanted to be sure in case it should be something that needs to
>> be patched. I've had some jobs run successfully but this warning appears in
>> the logs.
>>
>> Regards,
>>
>> James
>>
>> On 18 February 2016 at 12:23, Ted Yu  wrote:
>>
>>> Have you seen this ?
>>>
>>> HADOOP-10988
>>>
>>> Cheers
>>>
>>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>>
 HI,

 I am seeing warnings like this in the logs when I run Spark jobs:

 OpenJDK 64-Bit Server VM warning: You have loaded library 
 /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
 disabled stack guard. The VM will try to fix the stack guard now.
 It's highly recommended that you fix the library with 'execstack -c 
 ', or link it with '-z noexecstack'.


 I used spark-ec2 to launch the cluster with the default AMI, Spark
 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
 written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
 is m4.large.

 Could this contribute to any problems running the jobs?

 Regards,

 James

>>>
>>>
>>
>


Re: Running multiple foreach loops

2016-02-17 Thread Sabarish Sasidharan
I don't think that's a good idea, even outside of Spark. I am trying
to understand the benefits you gain by separating them.

I would rather use a Composite pattern approach, wherein adding to the
composite cascades the additive operation to the children. That way your
foreach code doesn't have to know the number or names of your individual
accumulators, so a single composite accumulator would suffice.
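
A rough Scala sketch of that composite idea, assuming a, b and c are existing
Accumulator[Thing] instances (an AccumulatorParam[Thing] is assumed to be in
scope) and r is the RDD[Thing] from your snippet:

import org.apache.spark.Accumulator

class CompositeAccumulator[T](children: Seq[Accumulator[T]]) extends Serializable {
  // adding to the composite cascades the add to every child accumulator
  def add(value: T): Unit = children.foreach(_ += value)
}

val composite = new CompositeAccumulator(Seq(a, b, c))
r.foreach { thing => composite.add(thing) }

The foreach stays a single pass over the RDD, and adding a fourth accumulator
only touches the Seq.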

Regards
Sab

On Thu, Feb 18, 2016 at 3:00 AM, Daniel Imberman 
wrote:

> Hi all,
>
> So I'm currently figuring out how to accumulate three separate
> accumulators:
>
> val a:Accumulator
> val b:Accumulator
> val c:Accumulator
>
> I have an r:RDD[thing] and the code currently reads
>
> r.foreach{
> thing =>
>  a += thing
>  b += thing
>  c += thing
> }
>
>
> Ideally, I would much prefer to split this up so that I can separate
> concerns. I'm considering creating something along the lines of:
>
> def handleA(a:Accumulator, r:RDD[Thing]){
> //a's logic
> r.foreach{ thing => a += thing}
> }
>
>
> def handleB(b:Accumulator, r:RDD[Thing]){
> //a's logic
> r.foreach{ thing => b += thing}
> }
>
> and so on. However Im worried that this would cause a performance hit. Does
> anyone have any thoughts as to whether this would be a bad idea?
>
> thank you!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-foreach-loops-tp26256.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Need help :Does anybody has HDP cluster on EC2?

2016-02-15 Thread Sabarish Sasidharan
You can setup SSH tunneling.

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-ssh-tunnel.html

Regards
Sab

On Mon, Feb 15, 2016 at 1:55 PM, Divya Gehlot 
wrote:

> Hi,
> I have hadoop cluster set up in EC2.
> I am unable to view application logs in Web UI as its taking internal IP
> Like below :
> http://ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal:8042
> 
>
> How can I change this to external one or redirecting to external ?
> Attached screenshots for better understanding of my issue.
>
> Would really appreciate help.
>
>
> Thanks,
> Divya
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: which master option to view current running job in Spark UI

2016-02-14 Thread Sabarish Sasidharan
When running in YARN, you can use the YARN Resource Manager UI to get to
the ApplicationMaster url, irrespective of client or cluster mode.

Regards
Sab
On 15-Feb-2016 10:10 am, "Divya Gehlot"  wrote:

> Hi,
> I have Hortonworks 2.3.4 cluster on EC2 and Have spark jobs as scala files
> .
> I am bit confused between using *master  *options
> I want to execute this spark job in YARN
>
> Curently running as
> spark-shell --properties-file  /TestDivya/Spark/Oracle.properties --jars
> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --driver-class-path
> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --packages
> com.databricks:spark-csv_2.10:1.1.0  *--master yarn-client *  -i
> /TestDivya/Spark/Test.scala
>
> with this option I cant see the currently running jobs in Spark WEB UI
> though it later appear in spark history server.
>
> My question with which --master option should I run my spark jobs so that
> I can view the currently running jobs in spark web UI .
>
> Thanks,
> Divya
>


Re: coalesce and executor memory

2016-02-14 Thread Sabarish Sasidharan
I believe you will gain more understanding if you look at or use
mapPartitions()
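
For illustration, a minimal Scala sketch (the RDD name and the transformation
are hypothetical): mapPartitions hands you one partition's iterator at a time,
which makes the per-partition memory footprint explicit.

val transformed = records.mapPartitions { iter =>
  // the partition is streamed through the iterator, not materialized at once
  iter.map(record => record.toUpperCase)
}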

Regards
Sab
On 15-Feb-2016 8:38 am, "Christopher Brady" 
wrote:

> I tried it without the cache, but it didn't change anything. The reason
> for the cache is that other actions will be performed on this RDD, even
> though it never gets that far.
>
> I can make it work by just increasing the number of partitions, but I was
> hoping to get a better understanding of how Spark works rather that just
> use trial and error every time I hit this issue.
>
>
> - Original Message -
> From: silvio.fior...@granturing.com
> To: christopher.br...@oracle.com, ko...@tresata.com
> Cc: user@spark.apache.org
> Sent: Sunday, February 14, 2016 8:27:09 AM GMT -05:00 US/Canada Eastern
> Subject: RE: coalesce and executor memory
>
> Actually, rereading your email I see you're caching. But ‘cache’ uses
> MEMORY_ONLY. Do you see errors about losing partitions as your job is
> running?
>
>
>
> Are you sure you need to cache if you're just saving to disk? Can you try
> the coalesce without cache?
>
>
>
>
>
> *From: *Christopher Brady 
> *Sent: *Friday, February 12, 2016 8:34 PM
> *To: *Koert Kuipers ; Silvio Fiorito
> 
> *Cc: *user 
> *Subject: *Re: coalesce and executor memory
>
>
> Thank you for the responses. The map function just changes the format of
> the record slightly, so I don't think that would be the cause of the memory
> problem.
>
> So if I have 3 cores per executor, I need to be able to fit 3 partitions
> per executor within whatever I specify for the executor memory? Is there a
> way I can programmatically find a number of partitions I can coalesce down
> to without running out of memory? Is there some documentation where this is
> explained?
>
>
> On 02/12/2016 05:10 PM, Koert Kuipers wrote:
>
> in spark, every partition needs to fit in the memory available to the core
> processing it.
>
> as you coalesce you reduce number of partitions, increasing partition
> size. at some point the partition no longer fits in memory.
>
> On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Coalesce essentially reduces parallelism, so fewer cores are getting more
>> records. Be aware that it could also lead to loss of data locality,
>> depending on how far you reduce. Depending on what you’re doing in the map
>> operation, it could lead to OOM errors. Can you give more details as to
>> what the code for the map looks like?
>>
>>
>>
>>
>> On 2/12/16, 1:13 PM, "Christopher Brady" < 
>> christopher.br...@oracle.com> wrote:
>>
>> >Can anyone help me understand why using coalesce causes my executors to
>> >crash with out of memory? What happens during coalesce that increases
>> >memory usage so much?
>> >
>> >If I do:
>> >hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>> >
>> >everything works fine, but if I do:
>> >hadoopFile -> sample -> coalesce -> cache -> map ->
>> saveAsNewAPIHadoopFile
>> >
>> >my executors crash with out of memory exceptions.
>> >
>> >Is there any documentation that explains what causes the increased
>> >memory requirements with coalesce? It seems to be less of a problem if I
>> >coalesce into a larger number of partitions, but I'm not sure why this
>> >is. How would I estimate how much additional memory the coalesce
>> requires?
>> >
>> >Thanks.
>> >
>> >-
>> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >For additional commands, e-mail: 
>> user-h...@spark.apache.org
>> >
>>
>
>
>


Re: Spark Application Master on Yarn client mode - Virtual memory limit

2016-02-14 Thread Sabarish Sasidharan
Looks like your executors are running out of heap memory; YARN is not kicking
them out. Just increase the executor memory. Also consider increasing
the parallelism, i.e. the number of partitions.
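
For example (Scala; the RDD name and the partition count are assumptions, and
executor memory itself is set with spark-submit's --executor-memory flag rather
than in code):

// spread the skewed data across more, smaller partitions before the heavy stage,
// so each task needs less heap
val repartitioned = input.repartition(400)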

Regards
Sab
On 11-Feb-2016 5:46 am, "Nirav Patel"  wrote:

> In Yarn we have following settings enabled so that job can use virtual
> memory to have a capacity beyond physical memory off course.
>
> 
> yarn.nodemanager.vmem-check-enabled
> false
> 
>
> 
> yarn.nodemanager.pmem-check-enabled
> false
> 
>
> vmem to pmem ration is 2:1. However spark doesn't seem to be able to
> utilize this vmem limits
> we are getting following heap space error which seemed to be contained
> within spark executor.
>
> 16/02/09 23:08:06 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
> SIGNAL 15: SIGTERM
> 16/02/09 23:08:06 ERROR executor.Executor: Exception in task 4.0 in stage
> 7.6 (TID 22363)
> java.lang.OutOfMemoryError: Java heap space
> at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
> at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
> at
> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
> at
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:203)
> at
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:202)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:202)
> at
> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
> at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
> at
> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
> at
> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
> at
> org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
>
> Yarn resource manager doesn't give any indication that whether container
> ran out of phycial or virtual memory limits.
>
> Also how to profile this container memory usage? We know our data is
> skewed so some of the executor will have large data (~2M RDD objects) to
> process. I used following as executorJavaOpts but it doesn't seem to work.
> -XX:-HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -3 %p'
> -XX:HeapDumpPath=/opt/cores/spark
>
>
>
>
>
>
> [image: What's New with Xactly] 
>
>   [image: LinkedIn]
>   [image: Twitter]
>   [image: Facebook]
>   [image: YouTube]
> 


Re: newbie unable to write to S3 403 forbidden error

2016-02-14 Thread Sabarish Sasidharan
Make sure you are using an S3 bucket in the same region. Also, I would access
the bucket as s3n://bucketname/foldername.

You can test the privileges using the S3 command line client.

Also, if you are using instance profiles you don't need to specify access
and secret keys. There is no harm in specifying them though.
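
For example, a minimal Scala sketch using the bucket from your mail (folder name
assumed): the path carries only the bucket and key prefix, not the S3 endpoint
host.

// inside foreachRDD { (rdd, time) => ... }
val dirPath = "s3n://com.pws.twitter/json-" + time.milliseconds
rdd.saveAsTextFile(dirPath)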

Regards
Sab
On 12-Feb-2016 2:46 am, "Andy Davidson" 
wrote:

> I am using spark 1.6.0 in a cluster created using the spark-ec2 script. I
> am using the standalone cluster manager
>
> My java streaming app is not able to write to s3. It appears to be some
> for of permission problem.
>
> Any idea what the problem might be?
>
> I tried use the IAM simulator to test the policy. Everything seems okay.
> Any idea how I can debug this problem?
>
> Thanks in advance
>
> Andy
>
> JavaSparkContext jsc = new JavaSparkContext(conf);
>
> // I did not include the full key in my email
>// the keys do not contain ‘\’
>// these are the keys used to create the cluster. They belong to
> the IAM user andy
>
> jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAJREX"
> );
>
> jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
> "uBh9v1hdUctI23uvq9qR");
>
>
>
>   private static void saveTweets(JavaDStream jsonTweets, String
> outputURI) {
>
> jsonTweets.foreachRDD(new VoidFunction2, Time>() {
>
> private static final long serialVersionUID = 1L;
>
>
> @Override
>
> public void call(JavaRDD rdd, Time time) throws
> Exception {
>
> if(!rdd.isEmpty()) {
>
> // bucket name is ‘com.pws.twitter’ it has a folder ‘json'
>
> String dirPath = "s3n://
> s3-us-west-1.amazonaws.com/com.pws.twitter/*json” *+ "-" + time
> .milliseconds();
>
> rdd.saveAsTextFile(dirPath);
>
> }
>
> }
>
> });
>
>
>
>
> Bucket name : com.pws.titter
> Bucket policy (I replaced the account id)
>
> {
> "Version": "2012-10-17",
> "Id": "Policy1455148808376",
> "Statement": [
> {
> "Sid": "Stmt1455148797805",
> "Effect": "Allow",
> "Principal": {
> "AWS": "arn:aws:iam::123456789012:user/andy"
> },
> "Action": "s3:*",
> "Resource": "arn:aws:s3:::com.pws.twitter/*"
> }
> ]
> }
>
>
>


Re: Best practises of share Spark cluster over few applications

2016-02-14 Thread Sabarish Sasidharan
Yes, you can look at using the capacity scheduler or the fair scheduler with
YARN. Both allow an application to use the full cluster when it is otherwise
idle. And both can take CPU as well as memory into account when allocating
resources, which is more or less necessary with Spark.

Regards
Sab
On 13-Feb-2016 10:11 pm, "Eugene Morozov" 
wrote:

> Hi,
>
> I have several instances of the same web-service that run some ML
> algos on Spark (both training and prediction) and do some Spark-unrelated
> work. Each web-service instance creates its own JavaSparkContext, thus
> they're seen as separate applications by Spark, thus they're configured
> with separate limits of resources such as cores (I'm not concerned about
> the memory as much as about cores).
>
> With this set up, say 3 web service instances, each of them has just 1/3
> of the cores. But it might happen that only one instance is going to use
> Spark, while the others are busy with Spark-unrelated work. I'd like in this
> case all Spark cores to be available for the one that's in need.
>
> Ideally I'd like Spark cores just be available in total and the first app
> who needs it, takes as much as required from the available at the moment.
> Is it possible? I believe Mesos is able to set resources free if they're
> not in use. Is it possible with YARN?
>
> I'd appreciate if you could share your thoughts or experience on the
> subject.
>
> Thanks.
> --
> Be well!
> Jean Morozov
>


RE: Trying to join a registered Hive table as temporary with two Oracle tables registered as temporary in Spark

2016-02-14 Thread Sabarish Sasidharan
The HiveContext can be used instead of the SQLContext even when you are
accessing data from non-Hive sources such as MySQL or Postgres. It has
better SQL support than the SQLContext as it uses the HiveQL parser.
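
For instance, a Scala sketch loading one of the Oracle tables through the
HiveContext instead (connection details copied from your mail, password
redacted); keeping all three temporary tables in the same context is what lets
them be joined.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val c = hiveContext.load("jdbc", Map(
  "url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
  "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM sh.channels)",
  "user" -> "sh",
  "password" -> "xxx"))
c.registerTempTable("t_c")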

Regards
Sab
On 15-Feb-2016 3:07 am, "Mich Talebzadeh"  wrote:

> Thanks.
>
>
>
> I tried to access Hive table via JDBC (it works) through sqlContext
>
>
>
>
>
> scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> sqlContext: org.apache.spark.sql.SQLContext =
> org.apache.spark.sql.SQLContext@4f60415b
>
>
>
> scala> val s = sqlContext.load("jdbc",
>
>  | Map("url" -> "jdbc:hive2://rhes564:10010/oraclehadoop",
>
>  | "dbtable" -> "SALES",
>
>  | "user" -> "hduser",
>
>  | "password" -> "xxx"))
>
> warning: there were 1 deprecation warning(s); re-run with -deprecation for
> details
>
> *java.sql.SQLException: Method not supported*
>
>
>
> In general one should expect this to work
>
>
>
> The attraction of Spark is to cache these tables in memory via registering
> them as temporary tables and do the queries there.
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>
> *From:* ayan guha [mailto:guha.a...@gmail.com]
> *Sent:* 14 February 2016 21:07
> *To:* Mich Talebzadeh 
> *Cc:* user 
> *Subject:* Re: Trying to join a registered Hive table as temporary with
> two Oracle tables registered as temporary in Spark
>
>
>
> Why can't you use the jdbc in hive context? I don't think sharing data
> across contexts are allowed.
>
> On 15 Feb 2016 07:22, "Mich Talebzadeh"  wrote:
>
> I am intending to get a table from Hive and register it as temporary table
> in Spark.
>
>
>
> I have created contexts for both Hive and Spark as below
>
>
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> //
>
>
>
> I get the Hive table as below using HiveContext
>
>
>
> //Get the FACT table from Hive
>
> //
>
> var s = hiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> oraclehadoop.sales")
>
>
>
> s.registerTempTable("t_s")
>
>
>
> This works fine using HiveContext
>
>
>
> scala> hiveContext.sql("select count(1) from
> t_s").collect.foreach(println)
>
> [4991761]
>
>
>
> Now I use JDBC to get data from two Oracle tables and registar them as
> temporary tables using sqlContext
>
>
>
> val c = sqlContext.load("jdbc",
>
> Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
>
> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM
> sh.channels)",
>
> "user" -> "sh",
>
> "password" -> "xxx"))
>
>
>
> val t = sqlContext.load("jdbc",
>
> Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
>
> "dbtable" -> "(SELECT to_char(TIME_ID) AS TIME_ID, CALENDAR_MONTH_DESC
> FROM sh.times)",
>
> "user" -> "sh",
>
> "password" -> "sxxx"))
>
>
>
> And register them as temporary tables
>
>
>
> c.registerTempTable("t_c")
>
> t.registerTempTable("t_t")
>
> //
>
>
>
> Now trying to do SQL on three tables using sqlContext. However it cannot
> see the hive table
>
>
>
> var sqltext : String = ""
>
> sqltext = """
>
> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
>
> FROM
>
> (
>
> SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS SalesChannel,
> SUM(t_s.AMOUNT_SOLD) AS TotalSales
>
> FROM t_s, t_t, t_c
>
> WHERE t_s.TIME_ID = t_t.TIME_ID
>
> AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>
> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ) rs
>
> LIMIT 10
>
>
>
>
>
> sqlContext.sql(sqltext).collect.foreach(println)
>
>
>
> *org.apache.spark.sql.AnalysisException: no such table t_s; line 5 pos 10*
>
>
>
> I guess this is due to two  different Data Frame used. Is there any
> solution? For example can I transorm from HiveContext to sqlContext?
>
>
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not t

Re: recommendProductsForUser for a subset of user

2016-02-03 Thread Sabarish Sasidharan
You could always construct a new MatrixFactorizationModel with your
filtered set of user features and product features. I believe it's just a
stateless wrapper around the actual RDDs.
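
A rough Scala sketch of that (assuming "model" is the trained
MatrixFactorizationModel and "userSubset" is an RDD[Int] of the user ids of
interest; recommendProductsForUsers needs Spark 1.4+):

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

val keep = userSubset.map(id => (id, ()))
// keep only the user factors for the subset; product factors stay as-is
val filteredUserFeatures = model.userFeatures.join(keep).mapValues { case (f, _) => f }
val subModel = new MatrixFactorizationModel(model.rank, filteredUserFeatures,
  model.productFeatures)
val recommendations = subModel.recommendProductsForUsers(10)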

Regards
Sab

On Wed, Feb 3, 2016 at 5:28 AM, Roberto Pagliari 
wrote:

> When using ALS, is it possible to use recommendProductsForUser for a
> subset of users?
>
> Currently, productFeatures and userFeatures are val. Is there a
> workaround for it? Using recommendForUser repeatedly would not work in my
> case, since it would be too slow with many users.
>
>
> Thank you,
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Split columns in RDD

2016-01-19 Thread Sabarish Sasidharan
The most efficient way to determine the number of columns would be to do a
take(1) and split in the driver.
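
For example (Scala; the RDD name and the comma delimiter are assumptions):

// pull a single record to the driver and split it there
val numColumns = rdd.take(1).headOption.map(_.split(",", -1).length).getOrElse(0)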

Regards
Sab
On 19-Jan-2016 8:48 pm, "Richard Siebeling"  wrote:

> Hi,
>
> what is the most efficient way to split columns and know how many columns
> are created.
>
> Here is the current RDD
> -
> ID   STATE
> -
> 1   TX, NY, FL
> 2   CA, OH
> -
>
> This is the preferred output:
> -
> IDSTATE_1 STATE_2  STATE_3
> -
> 1 TX  NY  FL
> 2 CA  OH
> -
>
> With a separated with the new columns STATE_1, STATE_2, STATE_3
>
>
> It looks like the following output is feasible using a ReduceBy operator
> -
> IDSTATE_1 STATE_2  STATE_3   NEW_COLUMNS
> -
> 1 TXNY   FLSTATE_1, STATE_2,
> STATE_3
> 2 CAOH STATE_1, STATE_2
> -
>
> Then in the reduce step, the distinct new columns can be calculated.
> Is it possible to get the second output where next to the RDD the
> new_columns are saved somewhere?
> Or is the required to use the second approach?
>
> thanks in advance,
> Richard
>
>


Re:

2016-01-12 Thread Sabarish Sasidharan
You could generate as many duplicates as you need, each with a tag/sequence.
Then use a custom partitioner that uses that tag/sequence in addition to the
key to do the partitioning.
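
A rough Scala sketch of that idea, assuming the records are keyed as
(key, replicaTag) after the duplication step (the layout and names are
hypothetical):

import org.apache.spark.Partitioner

class ReplicaPartitioner(val numPartitions: Int) extends Partitioner {
  // the replica tag participates in placement, so copies of the same key
  // can land on different partitions
  override def getPartition(key: Any): Int = key match {
    case (k: Any, tag: Int) => nonNegativeMod(k.hashCode + tag, numPartitions)
    case other              => nonNegativeMod(other.hashCode, numPartitions)
  }
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }
}

// usage: duplicated.partitionBy(new ReplicaPartitioner(3))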

Regards
Sab
On 12-Jan-2016 12:21 am, "Daniel Imberman" 
wrote:

> Hi all,
>
> I'm looking for a way to efficiently partition an RDD, but allow the same
> data to exists on multiple partitions.
>
>
> Lets say I have a key-value RDD with keys {1,2,3,4}
>
> I want to be able to to repartition the RDD so that so the partitions look
> like
>
> p1 = {1,2}
> p2 = {2,3}
> p3 = {3,4}
>
> Locality is important in this situation as I would be doing internal
> comparison values.
>
> Does anyone have any thoughts as to how I could go about doing this?
>
> Thank you
>


Re: FPGrowth does not handle large result sets

2016-01-12 Thread Sabarish Sasidharan
How much RAM are you giving to the driver? 17000 items being collected
shouldn't fail unless your driver memory is too low.

Regards
Sab
On 13-Jan-2016 6:14 am, "Ritu Raj Tiwari" 
wrote:

> Folks:
> We are running into a problem where FPGrowth seems to choke on data sets
> that we think are not too large. We have about 200,000 transactions. Each
> transaction is composed of on an average 50 items. There are about 17,000
> unique item (SKUs) that might show up in any transaction.
>
> When running locally with 12G ram given to the PySpark process, the
> FPGrowth code fails with out of memory error for minSupport of 0.001. The
> failure occurs when we try to enumerate and save the frequent itemsets.
> Looking at the FPGrowth code (
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala),
> it seems this is because the genFreqItems() method tries to collect() all
> items. Is there a way the code could be rewritten so it does not try to
> collect and therefore store all frequent item sets in memory?
>
> Thanks for any insights.
>
> -Raj
>


Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-11 Thread Sabarish Sasidharan
One option could be to store them as blobs in a cache like Redis and then
read + broadcast them from the driver. Or you could store them in HDFS and
read + broadcast from the driver.
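
For the HDFS variant, a minimal Scala sketch (the path and the tab-separated
format are assumptions): the resource is collected to the driver once and
broadcast, so each executor holds a single copy.

val dictionary: Map[String, String] =
  sc.textFile("hdfs:///shared/lookup-dictionary.tsv")
    .map { line =>
      val Array(k, v) = line.split("\t", 2)   // assumes one key<TAB>value per line
      (k, v)
    }
    .collect()
    .toMap
val dictBc = sc.broadcast(dictionary)
// inside transformations on the executors: dictBc.value.get(someKey)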

Regards
Sab

On Tue, Jan 12, 2016 at 1:44 AM, Dmitry Goldenberg  wrote:

> We have a bunch of Spark jobs deployed and a few large resource files such
> as e.g. a dictionary for lookups or a statistical model.
>
> Right now, these are deployed as part of the Spark jobs which will
> eventually make the mongo-jars too bloated for deployments.
>
> What are some of the best practices to consider for maintaining and
> sharing large resource files like these?
>
> Thanks.
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Read from AWS s3 with out having to hard-code sensitive keys

2016-01-11 Thread Sabarish Sasidharan
If you are on EMR, these can go into your hdfs site config. And will work
with Spark on YARN by default.

Regards
Sab
On 11-Jan-2016 5:16 pm, "Krishna Rao"  wrote:

> Hi all,
>
> Is there a method for reading from s3 without having to hard-code keys?
> The only 2 ways I've found both require this:
>
> 1. Set conf in code e.g.:
> sc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "")
> sc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",
> "")
>
> 2. Set keys in URL, e.g.:
> sc.textFile("s3n://@/bucket/test/testdata")
>
>
> Both if which I'm reluctant to do within production code!
>
>
> Cheers
>


Re: How to concat few rows into a new column in dataframe

2016-01-06 Thread Sabarish Sasidharan
You can just repartition by the id, if the final objective is to have all
data for the same key in the same partition.
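
For example, in Spark 1.6 (Scala; the DataFrame name and output path are
hypothetical):

// co-locate all rows of the same id in one partition before writing
df.repartition(df("id")).write.parquet("hdfs:///tmp/by_id")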

Regards
Sab

On Wed, Jan 6, 2016 at 11:02 AM, Gavin Yue  wrote:

> I found that in 1.6 dataframe could do repartition.
>
> Should I still need to do orderby first or I just have to repartition?
>
>
>
>
> On Tue, Jan 5, 2016 at 9:25 PM, Gavin Yue  wrote:
>
>> I tried the Ted's solution and it works.   But I keep hitting the JVM out
>> of memory problem.
>> And grouping the key causes a lot of  data shuffling.
>>
>> So I am trying to order the data based on ID first and save as Parquet.
>> Is there way to make sure that the data is partitioned that each ID's data
>> is in one partition, so there would be no shuffling in the future?
>>
>> Thanks.
>>
>>
>> On Tue, Jan 5, 2016 at 3:19 PM, Michael Armbrust 
>> wrote:
>>
>>> This would also be possible with an Aggregator in Spark 1.6:
>>>
>>> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
>>>
>>> On Tue, Jan 5, 2016 at 2:59 PM, Ted Yu  wrote:
>>>
 Something like the following:

 val zeroValue = collection.mutable.Set[String]()

 val aggredated = data.aggregateByKey (zeroValue)((set, v) => set += v,
 (setOne, setTwo) => setOne ++= setTwo)

 On Tue, Jan 5, 2016 at 2:46 PM, Gavin Yue 
 wrote:

> Hey,
>
> For example, a table df with two columns
> id  name
> 1   abc
> 1   bdf
> 2   ab
> 2   cd
>
> I want to group by the id and concat the string into array of string.
> like this
>
> id
> 1 [abc,bdf]
> 2 [ab, cd]
>
> How could I achieve this in dataframe?  I stuck on df.groupBy("id").
> ???
>
> Thanks
>
>

>>>
>>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: [Beg for help] spark job with very low efficiency

2015-12-21 Thread Sabarish Sasidharan
collect() will bring everything to the driver and is costly. Instead of using
collect() + parallelize, you could use rdd1.checkpoint() along with a more
efficient action such as rdd1.count(). You can do this within the for loop.

Hopefully you are using the Kryo serializer already.
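
A rough Scala sketch of the loop with that change (the checkpoint directory and
the two map functions stand in for the MapName1/MapName2 steps in your code; all
names are hypothetical):

sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // required before checkpoint()
var current = rdd1
for (i <- 0 until n) {
  val next = current.map(countFitness).map(breedAndVariate)
  next.cache()
  next.checkpoint()   // truncates the lineage once the RDD is materialized
  next.count()        // cheap action: materializes and writes the checkpoint,
                      // nothing is shipped back to the driver
  current = next
}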

Regards
Sab

On Mon, Dec 21, 2015 at 5:51 PM, Zhiliang Zhu 
wrote:

> Dear All.
>
> I have some kind of iteration job, that is, the next stage's input would
> be the previous stage's output, and it must do quite a lot of iterations.
>
> JavaRDD rdd1 =  //rdd1 may be with one or more
> partitions
> for (int i=0, JavaRDD rdd2 = rdd1; i < N; ++i) {
>JavaRDD rdd3 = rdd2.map(new MapName1(...));// 1
>rdd4 = rdd3.map(new MapName2()); //  2
>
>List list = rdd4.collect(); //*however, N is very big,
> then this line will be VERY MUCH COST *
>rdd2 = jsc.parallelize(list, M).cache();
> }
>
> Is there way to properly improve the run speed?
>
> In fact, I would like to apply spark to mathematica optimization by
> genetic algorithm , in the above codes, rdd would be the Vector lines
> storing  ,
> 1 is to count  fitness number, and 2 is to breed and  variate .
> To get good solution, the iteration number will be big (for example more
> than 1000 )  ...
>
> Thanks in advance!
> Zhiliang
>
>
>
>
>
> On Monday, December 21, 2015 7:44 PM, Zhiliang Zhu
>  wrote:
>
>
> Dear All,
>
> I need to iterator some job / rdd quite a lot of times, but just lost in
> the problem of
> spark only accept to call around 350 number of map before it meets one
> action Function ,
> besides, dozens of action will obviously increase the run time.
> Is there any proper way ...
>
> As tested, there is piece of codes as follows:
>
> ..
>  83 int count = 0;
>  84 JavaRDD dataSet = jsc.parallelize(list, 1).cache();
> //with only 1 partition
>  85 int m = 350;
>  86 JavaRDD r = dataSet.cache();
>  87 JavaRDD t = null;
>  88
>  89 for(int j=0; j < m; ++j) { //outer loop to temporarily convert the
> rdd r to t
>  90   if(null != t) {
>  91 r = t;
>  92   }
> //inner loop to call map 350 times , if m is much more than
> 350 (for instance, around 400), then the job will throw exception message
>   "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class
> threw exception: java.lang.StackOverflowError java.lang.StackOverflowError
> ")
>  93   for(int i=0; i < m; ++i) {
>  94  *   r = r.map(new Function() {*
>  95   @Override
>  96   public Integer call(Integer integer) {
>  97 double x = Math.random() * 2 - 1;
>  98 double y = Math.random() * 2 - 1;
>  99 return (x * x + y * y < 1) ? 1 : 0;
> 100   }
> 101 });
>
> 104   }
> 105
> 106   List lt = r.collect(); //then collect this rdd to get
> another rdd, however, dozens of action Function as collect is VERY MUCH COST
> 107   t = jsc.parallelize(lt, 1).cache();
> 108
> 109 }
> 110
> ..
>
> Thanks very much in advance!
> Zhiliang
>
>
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Pros and cons -Saving spark data in hive

2015-12-15 Thread Sabarish Sasidharan
If all you want to do is to load data into Hive, you don't need to use
Spark.

For subsequent query performance you would want to convert to ORC or
Parquet when loading into Hive.
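
For example (Scala, assuming a HiveContext-backed DataFrame "df" built from the
CSV input; the table name is hypothetical):

// write through the HiveContext as ORC so later queries read a columnar format
df.write.format("orc").mode("overwrite").saveAsTable("csv_data_orc")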

Regards
Sab
On 16-Dec-2015 7:34 am, "Divya Gehlot"  wrote:

> Hi,
> I am new bee to Spark and  I am exploring option and pros and cons which
> one will work best in spark and hive context.My  dataset  inputs are CSV
> files, using spark to process the my data and saving it in hive using
> hivecontext
>
> 1) Process the CSV file using the spark-csv package, create a temp table, and
> store the data in Hive using HiveContext.
> 2) Process the file as a normal text file in SQLContext, register it as a
> temp table in SQLContext, store it as an ORC file, then read that ORC file in
> HiveContext and store it in Hive.
>
> Is there any other best options apart from mentioned above.
> Would really appreciate the inputs.
> Thanks in advance.
>
> Thanks,
> Regards,
> Divya
>


Re: newbie best practices: is spark-ec2 intended to be used to manage long-lasting infrastructure ?

2015-12-04 Thread Sabarish Sasidharan
#2: If using HDFS, the data is on the instance disks. You can use the HDFS
command line to browse your data, and then use s3distcp or plain distcp to
copy data from HDFS to S3. You can even use the HDFS get commands to copy to
local disk and then the S3 CLI to copy to S3.

#3: Accessing data in S3 from EC2 nodes, though not as fast as local disks,
is still fast enough. You can use HDFS for intermediate steps and S3 for
final storage. Make sure your S3 bucket is in the same region as your EC2
cluster.

Regards
Sab
On 04-Dec-2015 3:35 am, "Andy Davidson" 
wrote:

> About 2 months ago I used spark-ec2 to set up a small cluster. The cluster
> runs a spark streaming app 7x24 and stores the data to hdfs. I also need to
> run some batch analytics on the data.
>
> Now that I have a little more experience I wonder if this was a good way
> to set up the cluster the following issues
>
>1. I have not been able to find explicit directions for upgrading the
>spark version
>   1.
>   
> http://search-hadoop.com/m/q3RTt7E0f92v0tKh2&subj=Re+Upgrading+Spark+in+EC2+clusters
>2. I am not sure where the data is physically be stored. I think I may
>accidentally loose all my data
>3. spark-ec2 makes it easy to launch a cluster with as many machines
>as you like how ever Its not clear how I would add slaves to an existing
>installation
>
>
> Our Java streaming app we call rdd.saveAsTextFile(“hdfs://path”);
>
> ephemeral-hdfs/conf/hdfs-site.xml:
>
>   
>
> dfs.data.dir
>
> /mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data
>
>   
>
>
> persistent-hdfs/conf/hdfs-site.xml
>
>
> $ mount
>
> /dev/xvdb on /mnt type ext3 (rw,nodiratime)
>
> /dev/xvdf on /mnt2 type ext3 (rw,nodiratime)
>
>
> http://spark.apache.org/docs/latest/ec2-scripts.html
>
> *"*The spark-ec2 script also supports pausing a cluster. In this case,
> the VMs are stopped but not terminated, so they *lose all data on
> ephemeral disks* but keep the data in their root partitions and their
> persistent-pdfs.”
>
>
> Initially I though using HDFS was a good idea. spark-ec2 makes HDFS easy
> to use. I incorrectly thought spark some how knew how HDFS partitioned my
> data.
>
> I think many people are using amazon s3. I do not have an direct
> experience with S3. My concern would be that the data is not physically
> stored closed to my slaves. I.e. High communication costs.
>
> Any suggestions would be greatly appreciated
>
> Andy
>


Re: question about combining small parquet files

2015-11-30 Thread Sabarish Sasidharan
You could use the number of input files to determine the number of output
partitions. This assumes your input file sizes are deterministic.

Otherwise, you could persist the RDD and then determine its size using the
APIs.
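
A rough Scala sketch of the second approach (the storage level, the 128 MB
target and the names are assumptions; getRDDStorageInfo is a developer API):

import org.apache.spark.storage.StorageLevel

input.persist(StorageLevel.MEMORY_AND_DISK)
input.count()   // materialize so the size is reported
val sizeBytes = sc.getRDDStorageInfo
  .find(_.id == input.id)
  .map(info => info.memSize + info.diskSize)
  .getOrElse(0L)
val targetSplitBytes = 128L * 1024 * 1024
val numPartitions = math.max(1, (sizeBytes / targetSplitBytes).toInt)
val combined = input.coalesce(numPartitions)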

Regards
Sab
On 26-Nov-2015 11:13 pm, "Nezih Yigitbasi" 
wrote:

> Hi Spark people,
> I have a Hive table that has a lot of small parquet files and I am
> creating a data frame out of it to do some processing, but since I have a
> large number of splits/files my job creates a lot of tasks, which I don't
> want. Basically what I want is the same functionality that Hive provides,
> that is, to combine these small input splits into larger ones by specifying
> a max split size setting. Is this currently possible with Spark?
>
> I look at coalesce() but with coalesce I can only control the number
> of output files not their sizes. And since the total input dataset size
> can vary significantly in my case, I cannot just use a fixed partition
> count as the size of each output file can get very large. I then looked for
> getting the total input size from an rdd to come up with some heuristic to
> set the partition count, but I couldn't find any ways to do it (without
> modifying the spark source).
>
> Any help is appreciated.
>
> Thanks,
> Nezih
>
> PS: this email is the same as my previous email as I learned that my
> previous email ended up as spam for many people since I sent it through
> nabble, sorry for the double post.
>


Re: Conversely, Hive is performing better than Spark-Sql

2015-11-24 Thread Sabarish Sasidharan
First of all, select * is not a useful SQL statement to benchmark. Very rarely
would a user require all 3.6M records for visual analysis.

Second, collect() forces movement of all the data from the executors to the
driver. Instead, write it out to some other table or to HDFS.

Also, Spark is more beneficial when you have subsequent
queries/transformations on the same dataset. You can cache the table, and
then subsequent operations will be faster.
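
For instance (Scala; the table name, filter and output path are hypothetical):

hiveContext.cacheTable("mytable")   // only worthwhile if the table is queried repeatedly
val df = hiveContext.sql("SELECT col_a, col_b FROM mytable WHERE col_c = 'x'")
df.write.parquet("hdfs:///tmp/query_output")   // result stays on the cluster, no collect()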

Regards
Sab

On Wed, Nov 25, 2015 at 12:30 PM, UMESH CHAUDHARY 
wrote:

> Hi,
> I am using Hive 1.1.0 and Spark 1.5.1 and creating hive context in
> spark-shell.
>
> Now, I am experiencing reversed performance by Spark-Sql over Hive.
> By default Hive gives result back in 27 seconds for plain select * query
> on 1 GB dataset containing 3623203 records, while spark-sql gives back in 2
> mins on collect operation.
>
> Cluster Config:
> Hive : 6 Node : 16 GB Memory, 4 cores each
> Spark : 4 Nodes : 16 GB Memory, 4 cores each
>
> My dataset has 45 partitions and spark-sql creates 82 jobs.
>
> I have tried all memory and garbage collection optimizations suggested on
> official website but failed to get better performance and its worth to
> mention that sometimes I get OOM error when I allocate executor memory less
> than 10G.
>
> Can somebody tell whats actually going on ?
>
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction

2015-11-24 Thread Sabarish Sasidharan
If YARN has only 50 cores, then it can support at most 49 executors plus 1
application master for the driver.

Regards
Sab
On 24-Nov-2015 1:58 pm, "谢廷稳"  wrote:

> OK, yarn.scheduler.maximum-allocation-mb is 16384.
>
> I have ran it again, the command to run it is:
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-cluster -
> -driver-memory 4g  --executor-memory 8g lib/spark-examples*.jar 200
>
>
>
>>
>>
>> 15/11/24 16:15:56 INFO yarn.ApplicationMaster: Registered signal handlers 
>> for [TERM, HUP, INT]
>>
>> 15/11/24 16:15:57 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
>> appattempt_1447834709734_0120_01
>>
>> 15/11/24 16:15:58 INFO spark.SecurityManager: Changing view acls to: 
>> hdfs-test
>>
>> 15/11/24 16:15:58 INFO spark.SecurityManager: Changing modify acls to: 
>> hdfs-test
>>
>> 15/11/24 16:15:58 INFO spark.SecurityManager: SecurityManager: 
>> authentication disabled; ui acls disabled; users with view permissions: 
>> Set(hdfs-test); users with modify permissions: Set(hdfs-test)
>>
>> 15/11/24 16:15:58 INFO yarn.ApplicationMaster: Starting the user application 
>> in a separate Thread
>>
>> 15/11/24 16:15:58 INFO yarn.ApplicationMaster: Waiting for spark context 
>> initialization
>>
>> 15/11/24 16:15:58 INFO yarn.ApplicationMaster: Waiting for spark context 
>> initialization ...
>> 15/11/24 16:15:58 INFO spark.SparkContext: Running Spark version 1.5.0
>>
>> 15/11/24 16:15:58 INFO spark.SecurityManager: Changing view acls to: 
>> hdfs-test
>>
>> 15/11/24 16:15:58 INFO spark.SecurityManager: Changing modify acls to: 
>> hdfs-test
>>
>> 15/11/24 16:15:58 INFO spark.SecurityManager: SecurityManager: 
>> authentication disabled; ui acls disabled; users with view permissions: 
>> Set(hdfs-test); users with modify permissions: Set(hdfs-test)
>> 15/11/24 16:15:58 INFO slf4j.Slf4jLogger: Slf4jLogger started
>> 15/11/24 16:15:59 INFO Remoting: Starting remoting
>>
>> 15/11/24 16:15:59 INFO Remoting: Remoting started; listening on addresses 
>> :[akka.tcp://sparkDriver@X.X.X.X
>> ]
>>
>> 15/11/24 16:15:59 INFO util.Utils: Successfully started service 
>> 'sparkDriver' on port 61904.
>> 15/11/24 16:15:59 INFO spark.SparkEnv: Registering MapOutputTracker
>> 15/11/24 16:15:59 INFO spark.SparkEnv: Registering BlockManagerMaster
>>
>> 15/11/24 16:15:59 INFO storage.DiskBlockManager: Created local directory at 
>> /data1/hadoop/nm-local-dir/usercache/hdfs-test/appcache/application_1447834709734_0120/blockmgr-33fbe6c4-5138-4eff-83b4-fb0c886667b7
>>
>> 15/11/24 16:15:59 INFO storage.MemoryStore: MemoryStore started with 
>> capacity 1966.1 MB
>>
>> 15/11/24 16:15:59 INFO spark.HttpFileServer: HTTP File server directory is 
>> /data1/hadoop/nm-local-dir/usercache/hdfs-test/appcache/application_1447834709734_0120/spark-fbbfa2bd-6d30-421e-a634-4546134b3b5f/httpd-e31d7b8e-ca8f-400e-8b4b-d2993fb6f1d1
>> 15/11/24 16:15:59 INFO spark.HttpServer: Starting HTTP Server
>> 15/11/24 16:15:59 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/11/24 16:15:59 INFO server.AbstractConnector: Started
>> SocketConnector@0.0.0.0:14692
>>
>> 15/11/24 16:15:59 INFO util.Utils: Successfully started service 'HTTP file 
>> server' on port 14692.
>> 15/11/24 16:15:59 INFO spark.SparkEnv: Registering OutputCommitCoordinator
>>
>> 15/11/24 16:15:59 INFO ui.JettyUtils: Adding filter: 
>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>> 15/11/24 16:15:59 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/11/24 16:15:59 INFO server.AbstractConnector: Started
>> SelectChannelConnector@0.0.0.0:15948
>> 15/11/24 16:15:59 INFO util.Utils: Successfully started service 'SparkUI' on 
>> port 15948.
>>
>> 15/11/24 16:15:59 INFO ui.SparkUI: Started SparkUI at X.X.X.X
>>
>> 15/11/24 16:15:59 INFO cluster.YarnClusterScheduler: Created 
>> YarnClusterScheduler
>>
>> 15/11/24 16:15:59 WARN metrics.MetricsSystem: Using default name 
>> DAGScheduler for source because
>> spark.app.id is not set.
>>
>> 15/11/24 16:15:59 INFO util.Utils: Successfully started service 
>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41830.
>> 15/11/24 16:15:59 INFO netty.NettyBlockTransferService: Server created on 
>> 41830
>>
>> 15/11/24 16:15:59 INFO storage.BlockManagerMaster: Trying to register 
>> BlockManager
>>
>> 15/11/24 16:15:59 INFO storage.BlockManagerMasterEndpoint: Registering block 
>> manager X.X.X.X:41830 with 1966.1 MB RAM, BlockManagerId(driver, 10.12.30.2, 
>> 41830)
>>
>> 15/11/24 16:15:59 INFO storage.BlockManagerMaster: Registered BlockManager
>> 15/11/24 16:16:00 INFO scheduler.EventLoggingListener: Logging events to 
>> hdfs:///tmp/latest-spark-events/application_1447834709734_0120_1
>>
>> 15/11/24 16:16:00 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
>> ApplicationMaster registered as 
>> AkkaRpcEndpointRef(Actor[akka://sparkDriver/user/YarnAM#293602859])
>>
>> 15/11/24 16:16:00 INFO client.RMProxy: Connecting to ResourceManager at 
>> X.X.X.X
>>
>>
>> 15/11/24 16:16:00 INFO yarn.Yarn

Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Sabarish Sasidharan
Hi Andy

You can try sc.wholeTextFiles() instead of sc.textFile()

Regards
Sab
On 24-Nov-2015 4:01 am, "Andy Davidson" 
wrote:

> Hi Xiao and Sabarish
>
> Using the Stage tab on the UI, it turns out you can see how many
> partitions there are. If I did nothing I would have 228155 partitions.
> (This confirms what Sabarish said). I tried coalesce(3). RDD.count()
> fails. I thought that given I have 3 workers and 1/3 of the data would easily
> fit into memory, this would be a good choice.
>
> If I use coalesce(30), count works. However it still seems slow. It took
> 2.42 min to read 4720 records. My total data set size is 34M.
>
> Any suggestions how to choose the number of partitions.?
>
>  ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
>
>
> The data was originally collected using Spark Streaming. I noticed that the
> number of default partitions == the number of files created on HDFS. I bet
> each file is one Spark Streaming mini-batch. I suspect if I concatenate
> these into a small number of files things will run much faster. I suspect
> I would not need to call coalesce() and that coalesce() is taking a lot of
> time. Any suggestions on how to choose the number of files?
>
> Kind regards
>
> Andy
>
>
> From:  Xiao Li 
> Date:  Monday, November 23, 2015 at 12:21 PM
> To:  Andrew Davidson 
> Cc:  Sabarish Sasidharan , "user @spark"
> 
> Subject:  Re: newbie : why are thousands of empty files being created on
> HDFS?
>
>
> >In your case, maybe you can try to call the function coalesce?
> >Good luck,
> >
> >Xiao Li
> >
> >2015-11-23 12:15 GMT-08:00 Andy Davidson :
> >
> >Hi Sabarish
> >
> >I am but a simple padawan :-) I do not understand your answer. Why would
> >Spark be creating so many empty partitions? My real problem is my
> >application is very slow. I happened to notice thousands of empty files
> >being created. I thought this is a hint to why my app is slow.
> >
> >My program calls sample( 0.01).filter(not null).saveAsTextFile(). This
> >takes about 35 min, to scan 500,000 JSON strings and write 5000 to disk.
> >The total data writing in 38M.
> >
> >The data is read from HDFS. My understanding is Spark can not know in
> >advance how HDFS partitioned the data. Spark knows I have a master and 3
> >slaves machines. It knows how many works/executors are assigned to my
> >Job. I would expect spark would be smart enough not create more
> >partitions than I have worker machines?
> >
> >Also given I am not using any key/value operations like Join() or doing
> >multiple scans I would assume my app would not benefit from partitioning.
> >
> >
> >Kind regards
> >
> >Andy
> >
> >
> >From:  Sabarish Sasidharan 
> >Date:  Saturday, November 21, 2015 at 7:20 PM
> >To:  Andrew Davidson 
> >Cc:  "user @spark" 
> >Subject:  Re: newbie : why are thousands of empty files being created on
> >HDFS?
> >
> >
> >
> >Those are empty partitions. I don't see the number of partitions
> >specified in code. That then implies the default parallelism config is
> >being used and is set to a very high number, the sum of empty + non empty
> >files.
> >Regards
> >Sab
> >On 21-Nov-2015 11:59 pm, "Andy Davidson" 
> >wrote:
> >
> >I start working on a very simple ETL pipeline for a POC. It reads a in a
> >data set of tweets stored as JSON strings on in HDFS and randomly selects
> >1% of the observations and writes them to HDFS. It seems to run very
> >slowly. E.G. To write 4720 observations takes 1:06:46.577795. I
> >Also noticed that RDD saveAsTextFile is creating thousands of empty
> >files.
> >
> >I assume creating all these empty files must be slowing down the system.
> >Any idea why this is happening? Do I have write a script to periodical
> >remove empty files?
> >
> >
> >Kind regards
> >
> >Andy
> >
> >tweetStrings = sc.textFile(inputDataURL)
> >
> >
> >def removeEmptyLines(line) :
> >if line:
> >return True
> >else :
> >emptyLineCount.add(1);
> >return False
> >
> >emptyLineCount = sc.accumulator(0)
> >sample = (tweetStrings.filter(removeEmptyLines)
> >  .sample(withReplacement=False, fraction=0.01, seed=345678))
> >
> >
> >startTime = datetime.datetime.now()
> >sample.saveAsTextFile(saveDataURL)
> >
> >endTime = datetime.datetime.now()
> >print("elapsed time:%s" % (datetime.datetime.now() - startTime))
> >
> >
> >elapsed time:1:06:46.577795
> >
> >Total number of empty files
> >$ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
> >223515
> >
> >Total number of files with data
> >$ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
> >4642
> >
> >I randomly pick a part file. Its size is 9251.


Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-21 Thread Sabarish Sasidharan
Those are empty partitions. I don't see the number of partitions specified
in code. That then implies the default parallelism config is being used and
is set to a very high number, the sum of empty + non empty files.

Regards
Sab
On 21-Nov-2015 11:59 pm, "Andy Davidson" 
wrote:

> I started working on a very simple ETL pipeline for a POC. It reads in a
> data set of tweets stored as JSON strings in HDFS and randomly selects
> 1% of the observations and writes them to HDFS. It seems to run very
> slowly. E.G. To write 4720 observations takes 1:06:46.577795. I
> Also noticed that RDD saveAsTextFile is creating thousands of empty files.
>
> I assume creating all these empty files must be slowing down the system. Any
> idea why this is happening? Do I have to write a script to periodically remove
> empty files?
>
>
> Kind regards
>
> Andy
>
> tweetStrings = sc.textFile(inputDataURL)
>
> def removeEmptyLines(line) :
> if line:
> return True
> else :
> emptyLineCount.add(1);
> return False
>
> emptyLineCount = sc.accumulator(0)
> sample = (tweetStrings.filter(removeEmptyLines)
>   .sample(withReplacement=False, fraction=0.01, seed=345678))
>
> startTime = datetime.datetime.now()
> sample.saveAsTextFile(saveDataURL)
>
> endTime = datetime.datetime.now()
> print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>
> elapsed time:1:06:46.577795
>
>
>
> *Total number of empty files*
>
> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>
> 223515
>
> *Total number of files with data*
>
> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>
> 4642
>
>
> I randomly pick a part file. Its size is 9251.
>
>


Re: Reading from RabbitMq via Apache Spark Streaming

2015-11-19 Thread Sabarish Sasidharan
The stack trace is clear enough:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=406,
reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue
'hello1' in vhost '/': received 'true' but current is 'false', class-id=50,
method-id=10)
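
In other words, the queue 'hello1' already exists with durable=false while the
other client is re-declaring it with durable=true, and RabbitMQ requires the
declaration arguments to match. A rough, untested sketch of aligning the
producer side (assuming the consuming side expects a durable queue; you could
also just delete the existing queue and let it be re-created consistently):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DeclareDurableQueue {
  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // declare with durable=true so it matches the declaration made on the
    // consuming side; every client must use the same arguments for 'hello1'
    boolean durable = true;
    channel.queueDeclare("hello1", durable, false, false, null);

    channel.close();
    connection.close();
  }
}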

Regards
Sab
On 19-Nov-2015 2:32 pm, "D"  wrote:

> I am trying to write a simple "Hello World" kind of application using
> spark streaming and RabbitMq, in which Apache Spark Streaming will read
> message from RabbitMq via the RabbitMqReceiver
>  and print it in the
> console. But somehow I am not able to print the string read from RabbitMq
> into console. The spark streaming code is printing the message below:-
>
> Value Received BlockRDD[1] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
> Value Received BlockRDD[2] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
>
>
> The message is sent to the rabbitmq via the simple code below:-
>
> package helloWorld;
>
> import com.rabbitmq.client.Channel;
> import com.rabbitmq.client.Connection;
> import com.rabbitmq.client.ConnectionFactory;
>
> public class Send {
>
>   private final static String QUEUE_NAME = "hello1";
>
>   public static void main(String[] argv) throws Exception {
> ConnectionFactory factory = new ConnectionFactory();
> factory.setHost("localhost");
> Connection connection = factory.newConnection();
> Channel channel = connection.createChannel();
>
> channel.queueDeclare(QUEUE_NAME, false, false, false, null);
> String message = "Hello World! is a code. Hi Hello World!";
> channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
> System.out.println(" [x] Sent '" + message + "'");
>
> channel.close();
> connection.close();
>   }
> }
>
>
> I am trying to read messages via Apache Streaming as shown below:-
>
> package rabbitmq.example;
>
> import java.util.*;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import com.stratio.receiver.RabbitMQUtils;
>
> public class RabbitMqEx {
>
> public static void main(String[] args) {
> System.out.println("CreatingSpark   Configuration");
> SparkConf conf = new SparkConf();
> conf.setAppName("RabbitMq Receiver Example");
> conf.setMaster("local[2]");
>
> System.out.println("Retreiving  Streaming   Context fromSpark   
> Conf");
> JavaStreamingContext streamCtx = new JavaStreamingContext(conf,
> Durations.seconds(2));
>
> MaprabbitMqConParams = new HashMap();
> rabbitMqConParams.put("host", "localhost");
> rabbitMqConParams.put("queueName", "hello1");
> System.out.println("Trying to connect to RabbitMq");
> JavaReceiverInputDStream receiverStream = 
> RabbitMQUtils.createJavaStream(streamCtx, rabbitMqConParams);
> receiverStream.foreachRDD(new Function, Void>() {
> @Override
> public Void call(JavaRDD arg0) throws Exception {
> System.out.println("Value Received " + arg0.toString());
> return null;
> }
> } );
> streamCtx.start();
> streamCtx.awaitTermination();
> }
> }
>
> The output console only has message like the following:-
>
> Creating Spark Configuration
> Retreiving Streaming Context from Spark Conf
> Trying to connect to RabbitMq
> Value Received BlockRDD[1] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
> Value Received BlockRDD[2] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
>
>
> In the logs I see the following:-
>
> 15/11/18 13:20:45 INFO SparkContext: Running Spark version 1.5.2
> 15/11/18 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 15/11/18 13:20:45 WARN Utils: Your hostname, jabong1143 resolves to a 
> loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlan0)
> 15/11/18 13:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
> another address
> 15/11/18 13:20:45 INFO SecurityManager: Changing view acls to: jabong
> 15/11/18 13:20:45 INFO SecurityManager: Changing modify acls to: jabong
> 15/11/18 13:20:45 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(jabong); users 
> with modify permissions: Set(jabong)
> 15/11/18 13:20:46 INFO Slf4jLogger: Slf4jLogger started
> 15/11/18 13:20:46 INFO Remoting: Starting remoting
> 15/11/18 13:20:46 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkDriver@192.168.1.3:42978]
> 15/11/18 13:20:4

Re: Working with RDD from Java

2015-11-17 Thread Sabarish Sasidharan
You can also do rdd.toJavaRDD(). Pls check the API docs
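
For the LDA case here, a rough (untested) sketch would look something like the
below. Note that on the Java side the tuple's first element shows up as Object
because of the Scala Long:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.linalg.Vector;
import scala.Tuple2;

// 'model' is the DistributedLDAModel you already have
JavaRDD<Tuple2<Object, Vector>> dists = model.topicDistributions().toJavaRDD();
for (Tuple2<Object, Vector> t : dists.collect()) {
    Object docId = t._1();      // the document id (a boxed Long at runtime)
    Vector topicDist = t._2();  // per-topic distribution for that document
    System.out.println(docId + " -> " + topicDist);
}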

Regards
Sab
On 18-Nov-2015 3:12 am, "Bryan Cutler"  wrote:

> Hi Ivan,
>
> Since Spark 1.4.1 there is a Java-friendly function in LDAModel to get the
> topic distributions called javaTopicDistributions() that returns a
> JavaPairRDD.  If you aren't able to upgrade, you can check out the
> conversion used here
> https://github.com/apache/spark/blob/v1.4.1/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala#L350
>
> -bryan
>
> On Tue, Nov 17, 2015 at 3:06 AM, frula00 
> wrote:
>
>> Hi,
>> I'm working in Java, with Spark 1.3.1 - I am trying to extract data from
>> the
>> RDD returned by
>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicDistributions()
>> (the return type is RDD<Tuple2<Long, Vector>>). How do I work with it from
>> within Java? I can't seem to cast it to JavaPairRDD nor JavaRDD, and if I
>> try to collect it, it simply returns an Object.
>>
>> Thank you for your help in advance!
>>
>> Ivan
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Working-with-RDD-from-Java-tp25399.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: how can evenly distribute my records in all partition

2015-11-16 Thread Sabarish Sasidharan
You can write your own custom partitioner to achieve this
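
Something along these lines (an untested sketch assuming your keys are the
integers 0..29, so that each key can map to its own partition):

import org.apache.spark.Partitioner;

// With 30 distinct integer keys and 30 partitions, every partition
// receives exactly one record.
public class ExactPartitioner extends Partitioner {
  private final int numPartitions;

  public ExactPartitioner(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public int getPartition(Object key) {
    // assumes keys are Integers in the range [0, numPartitions)
    return ((Integer) key) % numPartitions;
  }
}

// usage: pairRdd.partitionBy(new ExactPartitioner(30))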

Regards
Sab
On 17-Nov-2015 1:11 am, "prateek arora"  wrote:

> Hi
>
> I have an RDD with 30 records (key/value pairs) and am running 30 executors. I
> want to repartition this RDD into 30 partitions so that every partition gets one
> record and is assigned to one executor.
>
> When I use rdd.repartition(30) it repartitions my RDD into 30 partitions, but
> some partitions get 2 records, some get 1 record and some don't get any
> record.
>
> Is there any way in Spark so I can evenly distribute my records across all
> partitions?
>
> Regards
> Prateek
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-can-evenly-distribute-my-records-in-all-partition-tp25394.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Expand Cluster

2015-11-16 Thread Sabarish Sasidharan
Spark will use the number of executors you specify in spark-submit. Are you
saying that Spark is not able to use more executors after you modify it in
spark-submit? Are you using dynamic allocation?
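
For reference, dynamic allocation is switched on with configuration roughly
like the below (an illustrative sketch; it also needs the external shuffle
service set up on the YARN node managers):

SparkConf conf = new SparkConf()
    .setAppName("MyApp")
    // let YARN grow/shrink this app's executor count with load
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "100")
    // required so shuffle data survives executor removal
    .set("spark.shuffle.service.enabled", "true");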

Regards
Sab

On Mon, Nov 16, 2015 at 5:54 PM, dineshranganathan <
dineshranganat...@gmail.com> wrote:

> I have my Spark application deployed on AWS EMR in yarn cluster mode.
> When I
> increase the capacity of my cluster by adding more Core instances on AWS, I
> don't see Spark picking up the new instances dynamically. Is there anything
> I can do to tell Spark to pick up the newly added boxes??
>
> Dan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Expand-Cluster-tp25393.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Size exceeds Integer.MAX_VALUE on EMR 4.0.0 Spark 1.4.1

2015-11-16 Thread Sabarish Sasidharan
You can try increasing the number of partitions before writing it out.
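
For example, building on the code below (an untested sketch; 200 is just an
illustrative partition count - more, smaller partitions keep each block under
the 2GB limit):

df.repartition(200)
  .write()
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("s3://newcars.csv");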

Regards
Sab

On Mon, Nov 16, 2015 at 3:46 PM, Zhang, Jingyu 
wrote:

> I am using spark-csv to save files in S3, and it throws "Size exceeds Integer.MAX_VALUE". Please let
> me know how to fix it. Thanks.
>
> df.write()
> .format("com.databricks.spark.csv")
> .option("header", "true")
> .save("s3://newcars.csv");
>
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>   at 
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>   at 
> org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
>   at 
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.




-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Spark and Spring Integrations

2015-11-15 Thread Sabarish Sasidharan
I took a look at the spring-spark project. I still don't see how the bean
factory would be available at the executor when the deserializer needs it.
Unless I am horribly wrong, you need to modify the spring-spark project in
the SpringBuilder class and initialize the spring context if the static
BEAN_FACTORY is null.

Alternatively you could use a simpler approach as shown below

rdd.mapPartitions(... {
    // initialize the spring context if not initialized already and store it in a static variable
    ApplicationContext context = SingletonApplicationContext.getSpringContext();
    // note that all dependencies will be autowired
    SpringBean springBean = context.getBean("springBean");
    springBean.doSomething();
})

You need to come up with the SingletonApplicationContext class which safely
creates a singleton spring context.
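
A rough sketch of what I have in mind (assuming an XML-configured context on
the classpath; the file name is just a placeholder):

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

// Lazily creates one Spring context per executor JVM and reuses it across tasks.
public class SingletonApplicationContext {
  private static volatile ApplicationContext context;

  private SingletonApplicationContext() {}

  public static ApplicationContext getSpringContext() {
    if (context == null) {
      synchronized (SingletonApplicationContext.class) {
        if (context == null) {
          // loaded from the executor's classpath, so ship the XML and beans in your jar
          context = new ClassPathXmlApplicationContext("spring-context.xml");
        }
      }
    }
    return context;
  }
}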

Because executors are long-lived vms, your spring context will only be
created once per jvm irrespective of how many transformations/actions you
call on your rdds. So unlike what I said before, you could use
mapPartitions() or map() or really any transformations/actions. But all
your functions would have to the first 2 lines repeatedly (ie initialize
context if not present and acquire the spring bean). You could think of
encapsulating somehow so that it is easier to do.

Regards
Sab

On Sun, Nov 15, 2015 at 11:50 AM, Netai Biswas 
wrote:

> Hi,
>
> Thanks for your response. I would like to inform you what exactly we are
> trying to achieve. I am not sure if we will can use mapPartitions() here
> or not.
>
> Sample Code:
>
> @Autowired
> private SpringBean springBean;
>
> public void test() throws Exception {
>     SparkConf conf = new SparkConf().setAppName("APP").setMaster(masterURL);
>     conf.set("spark.serializer", "de.paraplu.springspark.serialization.SpringAwareSerializer");
>     sc = new JavaSparkContext(conf);
>
>     sc.parallelize(list).foreach(new VoidFunction<String>() {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void call(String t) throws Exception {
>             springBean.someAPI(t); // here we will have a db transaction as well.
>         }
>     });
> }
>
> Thanks,
> Netai
>
> On Sat, Nov 14, 2015 at 9:49 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> You are probably trying to access the spring context from the executors
>> after initializing it at the driver. And running into serialization issues.
>>
>> You could instead use mapPartitions() and initialize the spring context
>> from within that.
>>
>> That said I don't think that will solve all of your issues because you
>> won't be able to use the other rich transformations in Spark.
>>
>> I am afraid these two don't gel that well, unless and otherwise all your
>> context lookups for beans happen in the driver.
>>
>> Regards
>> Sab
>> On 13-Nov-2015 4:17 pm, "Netai Biswas"  wrote:
>>
>>> Hi,
>>>
>>> I am facing issue while integrating spark with spring.
>>>
>>> I am getting "java.lang.IllegalStateException: Cannot deserialize
>>> BeanFactory with id" errors for all beans. I have tried few solutions
>>> available in web. Please help me out to solve this issue.
>>>
>>> Few details:
>>>
>>> Java : 8
>>> Spark : 1.5.1
>>> Spring : 3.2.9.RELEASE
>>>
>>> Please let me know if you need more information or any sample code.
>>>
>>> Thanks,
>>> Netai
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Spark and Spring Integrations

2015-11-14 Thread Sabarish Sasidharan
You are probably trying to access the spring context from the executors
after initializing it at the driver. And running into serialization issues.

You could instead use mapPartitions() and initialize the spring context
from within that.

That said I don't think that will solve all of your issues because you
won't be able to use the other rich transformations in Spark.

I am afraid these two don't gel that well, unless and otherwise all your
context lookups for beans happen in the driver.

Regards
Sab
On 13-Nov-2015 4:17 pm, "Netai Biswas"  wrote:

> Hi,
>
> I am facing issue while integrating spark with spring.
>
> I am getting "java.lang.IllegalStateException: Cannot deserialize
> BeanFactory with id" errors for all beans. I have tried few solutions
> available in web. Please help me out to solve this issue.
>
> Few details:
>
> Java : 8
> Spark : 1.5.1
> Spring : 3.2.9.RELEASE
>
> Please let me know if you need more information or any sample code.
>
> Thanks,
> Netai
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: very slow parquet file write

2015-11-14 Thread Sabarish Sasidharan
How are you writing it out? Can you post some code?

Regards
Sab
On 14-Nov-2015 5:21 am, "Rok Roskar"  wrote:

> I'm not sure what you mean? I didn't do anything specifically to partition
> the columns
> On Nov 14, 2015 00:38, "Davies Liu"  wrote:
>
>> Do you have partitioned columns?
>>
>> On Thu, Nov 5, 2015 at 2:08 AM, Rok Roskar  wrote:
>> > I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions
>> into a
>> > parquet file on HDFS. I've got a few hundred nodes in the cluster, so
>> for
>> > the size of file this is way over-provisioned (I've tried it with fewer
>> > partitions and fewer nodes, no obvious effect). I was expecting the
>> dump to
>> > disk to be very fast -- the DataFrame is cached in memory and contains
>> just
>> > 14 columns (13 are floats and one is a string). When I write it out in
>> json
>> > format, this is indeed reasonably fast (though it still takes a few
>> minutes,
>> > which is longer than I would expect).
>> >
>> > However, when I try to write a parquet file it takes way longer -- the
>> first
>> > set of tasks finishes in a few minutes, but the subsequent tasks take
>> more
>> > than twice as long or longer. In the end it takes over half an hour to
>> write
>> > the file. I've looked at the disk I/O and cpu usage on the compute
>> nodes and
>> > it looks like the processors are fully loaded while the disk I/O is
>> > essentially zero for long periods of time. I don't see any obvious
>> garbage
>> > collection issues and there are no problems with memory.
>> >
>> > Any ideas on how to debug/fix this?
>> >
>> > Thanks!
>> >
>> >
>>
>


Re: send transformed RDD to s3 from slaves

2015-11-14 Thread Sabarish Sasidharan
It would be easier if you could show some code.

Regards
Sab
On 14-Nov-2015 6:26 am, "Walrus theCat"  wrote:

> Hi,
>
> I have an RDD which crashes the driver when being collected.  I want to
> send the data on its partitions out to S3 without bringing it back to the
> driver. I try calling rdd.foreachPartition, but the data that gets sent has
> not gone through the chain of transformations that I need.  It's the data
> as it was ingested initially.  After specifying my chain of
> transformations, but before calling foreachPartition, I call rdd.count in
> order to force the RDD to transform.  The data it sends out is still not
> transformed.  How do I get the RDD to send out transformed data when
> calling foreachPartition?
>
> Thanks
>


Re: Joining HDFS and JDBC data sources - benchmarks

2015-11-13 Thread Sabarish Sasidharan
If it is metadata why would we not cache it before we perform the join?
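
For example, something along these lines (an untested sketch with made-up
table and column names) pulls the reference table once and keeps it around for
subsequent joins:

import java.util.Properties;
import org.apache.spark.sql.DataFrame;

// sqlContext is your SQLContext / HiveContext
Properties props = new Properties();
props.setProperty("user", "dbuser");
props.setProperty("password", "dbpass");

// small reference/metadata table from the RDBMS, fetched once and cached
DataFrame metadata = sqlContext.read()
    .jdbc("jdbc:mysql://dbhost:3306/mydb", "dim_products", props)
    .cache();

DataFrame facts = sqlContext.read().parquet("hdfs:///data/facts");

// later joins reuse the cached copy instead of hitting the database again
DataFrame joined = facts.join(metadata,
    facts.col("product_id").equalTo(metadata.col("id")));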

Regards
Sab
On 13-Nov-2015 10:27 pm, "Eran Medan"  wrote:

> Hi
> I'm looking for some benchmarks on joining data frames where most of the
> data is in HDFS (e.g. in parquet) and some "reference" or "metadata" is
> still in RDBMS. I am only looking at the very first join before any caching
> happens, and I assume there will be loss of parallelization because JDBCRDD
> is probably bottlenecked on the max amount of parallel connection the
> database server can hold.
>
> Are there any measurements / benchmarks that anyone did?
>
>
> ᐧ
>


Re: large, dense matrix multiplication

2015-11-13 Thread Sabarish Sasidharan
Hi Eilidh

Because you are multiplying with the transpose you don't have  to
necessarily build the right side of the matrix. I hope you see that. You
can broadcast blocks of the indexed row matrix to itself and achieve the
multiplication.

But for similarity computation you might want to use some approach like
locality sensitive hashing first to identify a bunch of similar customers
and then apply cosine similarity on that narrowed down list. That would
scale much better than matrix multiplication. You could try the following
options for the same.

https://github.com/soundcloud/cosine-lsh-join-spark
http://spark-packages.org/package/tdebatty/spark-knn-graphs
https://github.com/marufaytekin/lsh-spark

Regards
Sab
Hi Sab,

Thanks for your response. We’re thinking of trying a bigger cluster,
because we just started with 2 nodes. What we really want to know is
whether the code will scale up with larger matrices and more nodes. I’d be
interested to hear how large a matrix multiplication you managed to do?

Is there an alternative you’d recommend for calculating similarity over a
large dataset?

Thanks,
Eilidh

On 13 Nov 2015, at 09:55, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

We have done this by blocking but without using BlockMatrix. We used our
own blocking mechanism because BlockMatrix didn't exist in Spark 1.2. What
is the size of your block? How much memory are you giving to the executors?
I assume you are running on YARN, if so you would want to make sure your
yarn executor memory overhead is set to a higher value than default.

Just curious, could you also explain why you need matrix multiplication
with transpose? Smells like similarity computation.

Regards
Sab

On Thu, Nov 12, 2015 at 7:27 PM, Eilidh Troup  wrote:

> Hi,
>
> I’m trying to multiply a large squarish matrix with its transpose.
> Eventually I’d like to work with matrices of size 200,000 by 500,000, but
> I’ve started off first with 100 by 100 which was fine, and then with 10,000
> by 10,000 which failed with an out of memory exception.
>
> I used MLlib and BlockMatrix and tried various block sizes, and also tried
> switching disk serialisation on.
>
> We are running on a small cluster, using a CSV file in HDFS as the input
> data.
>
> Would anyone with experience of multiplying large, dense matrices in spark
> be able to comment on what to try to make this work?
>
> Thanks,
> Eilidh
>
>
> --
> The University of Edinburgh is a charitable body, registered in
> Scotland, with registration number SC005336.
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++



The University of Edinburgh is a charitable body, registered in
Scotland, with registration number SC005336.


Re: Stack Overflow Question

2015-11-13 Thread Sabarish Sasidharan
The reserved cores are to prevent starvation so that user B can run jobs
when user A's job is already running and using almost all of the cluster.
You can change your scheduler configuration to use more cores.

Regards
Sab
On 13-Nov-2015 6:56 pm, "Parin Choganwala"  wrote:

> EMR 4.1.0 + Spark 1.5.0 + YARN Resource Allocation
>
> http://stackoverflow.com/q/33488869/1366507?sem=2
>


Re: large, dense matrix multiplication

2015-11-13 Thread Sabarish Sasidharan
We have done this by blocking but without using BlockMatrix. We used our
own blocking mechanism because BlockMatrix didn't exist in Spark 1.2. What
is the size of your block? How much memory are you giving to the executors?
I assume you are running on YARN, if so you would want to make sure your
yarn executor memory overhead is set to a higher value than default.

Just curious, could you also explain why you need matrix multiplication
with transpose? Smells like similarity computation.

Regards
Sab

On Thu, Nov 12, 2015 at 7:27 PM, Eilidh Troup  wrote:

> Hi,
>
> I’m trying to multiply a large squarish matrix with its transpose.
> Eventually I’d like to work with matrices of size 200,000 by 500,000, but
> I’ve started off first with 100 by 100 which was fine, and then with 10,000
> by 10,000 which failed with an out of memory exception.
>
> I used MLlib and BlockMatrix and tried various block sizes, and also tried
> switching disk serialisation on.
>
> We are running on a small cluster, using a CSV file in HDFS as the input
> data.
>
> Would anyone with experience of multiplying large, dense matrices in spark
> be able to comment on what to try to make this work?
>
> Thanks,
> Eilidh
>
>
> --
> The University of Edinburgh is a charitable body, registered in
> Scotland, with registration number SC005336.
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: thought experiment: use spark ML to real time prediction

2015-11-13 Thread Sabarish Sasidharan
That may not be an issue if the app using the models runs by itself (not
bundled into an existing app), which may actually be the right way to
design it considering separation of concerns.

Regards
Sab

On Fri, Nov 13, 2015 at 9:59 AM, DB Tsai  wrote:

> This will bring the whole dependencies of spark will may break the web app.
>
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
> On Thu, Nov 12, 2015 at 8:15 PM, Nirmal Fernando  wrote:
>
>>
>>
>> On Fri, Nov 13, 2015 at 2:04 AM, darren  wrote:
>>
>>> I agree 100%. Making the model requires large data and many cpus.
>>>
>>> Using it does not.
>>>
>>> This is a very useful side effect of ML models.
>>>
>>> If mlib can't use models outside spark that's a real shame.
>>>
>>
>> Well you can as mentioned earlier. You don't need Spark runtime for
>> predictions, save the serialized model and deserialize to use. (you need
>> the Spark Jars in the classpath though)
>>
>>>
>>>
>>> Sent from my Verizon Wireless 4G LTE smartphone
>>>
>>>
>>>  Original message 
>>> From: "Kothuvatiparambil, Viju" <
>>> viju.kothuvatiparam...@bankofamerica.com>
>>> Date: 11/12/2015 3:09 PM (GMT-05:00)
>>> To: DB Tsai , Sean Owen 
>>> Cc: Felix Cheung , Nirmal Fernando <
>>> nir...@wso2.com>, Andy Davidson , Adrian
>>> Tanase , "user @spark" ,
>>> Xiangrui Meng , hol...@pigscanfly.ca
>>> Subject: RE: thought experiment: use spark ML to real time prediction
>>>
>>> I am glad to see DB’s comments, make me feel I am not the only one
>>> facing these issues. If we are able to use MLLib to load the model in web
>>> applications (outside the spark cluster), that would have solved the
>>> issue.  I understand Spark is mainly for processing big data in a
>>> distributed mode. But there is no purpose in training a model using MLLib,
>>> if we are not able to use it in applications that need to access the
>>> model.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Viju
>>>
>>>
>>>
>>> *From:* DB Tsai [mailto:dbt...@dbtsai.com]
>>> *Sent:* Thursday, November 12, 2015 11:04 AM
>>> *To:* Sean Owen
>>> *Cc:* Felix Cheung; Nirmal Fernando; Andy Davidson; Adrian Tanase; user
>>> @spark; Xiangrui Meng; hol...@pigscanfly.ca
>>> *Subject:* Re: thought experiment: use spark ML to real time prediction
>>>
>>>
>>>
>>> I think the use-case can be quite different from PMML.
>>>
>>>
>>>
>>> By having a Spark platform independent ML jar, this can empower users to
>>> do the following,
>>>
>>>
>>>
>>> 1) PMML doesn't contain all the models we have in mllib. Also, for a ML
>>> pipeline trained by Spark, most of time, PMML is not expressive enough to
>>> do all the transformation we have in Spark ML. As a result, if we are able
>>> to serialize the entire Spark ML pipeline after training, and then load
>>> them back in an app without any Spark platform for production scoring, this
>>> will be very useful for production deployment of Spark ML models. The only
>>> issue will be if the transformer involves with shuffle, we need to figure
>>> out a way to handle it. When I chatted with Xiangrui about this, he
>>> suggested that we may tag if a transformer is shuffle ready. Currently, at
>>> Netflix, we are not able to use ML pipeline because of those issues, and we
>>> have to write our own scorers in our production which is quite a duplicated
>>> work.
>>>
>>>
>>>
>>> 2) If users can use Spark's linear algebra like vector or matrix code in
>>> their application, this will be very useful. This can help to share code in
>>> Spark training pipeline and production deployment. Also, lots of good stuff
>>> at Spark's mllib doesn't depend on Spark platform, and people can use them
>>> in their application without pulling lots of dependencies. In fact, in my
>>> project, I have to copy & paste code from mllib into my project to use
>>> those goodies in apps.
>>>
>>>
>>>
>>> 3) Currently, mllib depends on graphx which means in graphx, there is no
>>> way to use mllib's vector or matrix. And
>>>
>>
>>
>>
>> --
>>
>> Thanks & regards,
>> Nirmal
>>
>> Team Lead - WSO2 Machine Learner
>> Associate Technical Lead - Data Technologies Team, WSO2 Inc.
>> Mobile: +94715779733
>> Blog: http://nirmalfdo.blogspot.com/
>>
>>
>>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Why is Kryo not the default serializer?

2015-11-09 Thread Sabarish Sasidharan
I have seen some failures in our workloads with Kryo, one I remember is a
scenario with very large arrays. We could not get Kryo to work despite
using the different configuration properties. Switching to java serde was
what worked.
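
For context, these are the kind of Kryo properties one typically tunes in that
situation (an illustrative sketch; the values depend on the workload):

SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // raise the per-object buffer ceiling, which very large arrays can exceed
    .set("spark.kryoserializer.buffer.max", "512m");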

Regards
Sab

On Tue, Nov 10, 2015 at 11:43 AM, Hitoshi Ozawa 
wrote:

> If Kryo usage is recommended, why is Java serialization the default
> serializer instead of Kryo? Is there some limitation to using Kryo? I've
> read through the documentation but it just seems Kryo is a better choice
> and should be made the default.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-Kryo-not-the-default-serializer-tp25338.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Spark EC2 script on Large clusters

2015-11-05 Thread Sabarish Sasidharan
Qubole uses yarn.

Regards
Sab
On 06-Nov-2015 8:31 am, "Jerry Lam"  wrote:

> Does Qubole use Yarn or Mesos for resource management?
>
> Sent from my iPhone
>
> > On 5 Nov, 2015, at 9:02 pm, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
> >
> > Qubole
>


Re: Spark EC2 script on Large clusters

2015-11-05 Thread Sabarish Sasidharan
Qubole is one option where you can use spots and get a couple other
benefits. We use Qubole at Manthan for our Spark workloads.

For ensuring all the nodes are ready, you could use
yarn.minregisteredresourcesratio config property to ensure the execution
doesn't start till the requisite containers have been allocated.

Regards
Sab
On 06-Nov-2015 12:22 am, "Christian"  wrote:

> Let me rephrase. Emr cost is about twice as much as the spot price, making
> it almost 2/3 of the overall cost.
> On Thu, Nov 5, 2015 at 11:50 AM Christian  wrote:
>
>> Hi Johnathan,
>>
>> We are using EMR now and it's costing way too much. We do spot pricing
>> and the emr addon cost is about 2/3 the price of the actual spot instance.
>> On Thu, Nov 5, 2015 at 11:31 AM Jonathan Kelly 
>> wrote:
>>
>>> Christian,
>>>
>>> Is there anything preventing you from using EMR, which will manage your
>>> cluster for you? Creating large clusters would take mins on EMR instead of
>>> hours. Also, EMR supports growing your cluster easily and recently added
>>> support for shrinking your cluster gracefully (even while jobs are running).
>>>
>>> ~ Jonathan
>>>
>>> On Thu, Nov 5, 2015 at 9:48 AM, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 Yeah, as Shivaram mentioned, this issue is well-known. It's documented
 in SPARK-5189  and a
 bunch of related issues. Unfortunately, it's hard to resolve this issue in
 spark-ec2 without rewriting large parts of the project. But if you take a
 crack at it and succeed I'm sure a lot of people will be happy.

 I've started a separate project 
  -- which Shivaram also mentioned -- which aims to solve the problem
 of long launch times and other issues
  with spark-ec2.
 It's still very young and lacks several critical features, but we are
 making steady progress.

 Nick

 On Thu, Nov 5, 2015 at 12:30 PM Shivaram Venkataraman <
 shiva...@eecs.berkeley.edu> wrote:

> It is a known limitation that spark-ec2 is very slow for large
> clusters and as you mention most of this is due to the use of rsync to
> transfer things from the master to all the slaves.
>
> Nick cc'd has been working on an alternative approach at
> https://github.com/nchammas/flintrock that is more scalable.
>
> Thanks
> Shivaram
>
> On Thu, Nov 5, 2015 at 8:12 AM, Christian  wrote:
> > For starters, thanks for the awesome product!
> >
> > When creating ec2-clusters of 20-40 nodes, things work great. When
> we create
> > a cluster with the provided spark-ec2 script, it takes hours. When
> creating
> > a 200 node cluster, it takes 2 1/2 hours and for a 500 node cluster
> it takes
> > over 5 hours. One other problem we are having is that some nodes
> don't come
> > up when the other ones do, the process seems to just move on,
> skipping the
> > rsync and any installs on those ones.
> >
> > My guess as to why it takes so long to set up a large cluster is
> because of
> > the use of rsync. What if instead of using rsync, you synched to s3
> and then
> > did a pdsh to pull it down on all of the machines. This is a big
> deal for us
> > and if we can come up with a good plan, we might be able help out
> with the
> > required changes.
> >
> > Are there any suggestions on how to deal with some of the nodes not
> being
> > ready when the process starts?
> >
> > Thanks for your time,
> > Christian
> >
>

>>>


Re: How to use data from Database and reload every hour

2015-11-05 Thread Sabarish Sasidharan
Theoretically the executor is a long lived container. So you could use some
simple caching library or a simple Singleton to cache the data in your
executors, once they load it from mysql. But note that with lots of
executors you might choke your mysql.
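
A rough sketch of what that singleton could look like (untested; the class,
table and connection details are made up, using plain JDBC):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

// One copy per executor JVM; reloads the validation rules at most once per hour.
public class ValidationRulesCache {
  private static final long TTL_MS = 60 * 60 * 1000L;
  private static Map<String, String> rules = new HashMap<String, String>();
  private static long loadedAt = 0L;

  public static synchronized Map<String, String> get() throws SQLException {
    long now = System.currentTimeMillis();
    if (now - loadedAt > TTL_MS) {
      Map<String, String> fresh = new HashMap<String, String>();
      try (Connection c = DriverManager.getConnection(
               "jdbc:mysql://dbhost:3306/mydb", "user", "pass");
           Statement s = c.createStatement();
           ResultSet rs = s.executeQuery(
               "SELECT rule_key, rule_value FROM validation_rules")) {
        while (rs.next()) {
          fresh.put(rs.getString(1), rs.getString(2));
        }
      }
      rules = fresh;
      loadedAt = now;
    }
    return rules;
  }
}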

Regards
Sab
On 05-Nov-2015 7:03 pm, "Kay-Uwe Moosheimer"  wrote:

> I have the following problem.
> We have MySQL and an Spark cluster.
> We need to load 5 different validation-instructions (several thousand of
> entries each) and use this information on the executors to decide if
> content from Kafka-Streaming is for process a or b.
> The streaming data from kafka are json messages and the validation-info
> from MySQL says „if field a is that and field b ist that then process a“
> and so on.
>
> The tables on MySQL are changing over time and we have to reload the data
> every hour.
> I tried to use broadcasting where I load the data and store it on HashSets
> and HashMaps (java code), but It’s not possible to redistribute the data.
>
> What would be the best way to resolve my problem?
> Use native jdbc in the executor task and load the data – can the executor store
> this data in HashSets etc. for the next call so that I only load the data every
> hour?
> Use other possibilities?
>
>


Re: Running FPGrowth over a JavaPairRDD?

2015-10-29 Thread Sabarish Sasidharan
Hi

You cannot use PairRDD but you can use JavaRDD. So in your case, to
make it work with least change, you would call run(transactions.values()).
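
Concretely, something like this (an untested sketch; the item type then becomes
Double because each basket is a List<Double>):

// transactions is the JavaPairRDD<Long, List<Double>> from the question;
// FPGrowth.run() wants a JavaRDD of baskets, so drop the keys.
FPGrowth fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(10);
FPGrowthModel<Double> model = fpg.run(transactions.values());

for (FPGrowth.FreqItemset<Double> itemset : model.freqItemsets().toJavaRDD().collect()) {
    System.out.println(itemset.javaItems() + ", " + itemset.freq());
}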

Each MLLib implementation has its own data structure typically and you
would have to convert from your data structure before you invoke. For ex if
you were doing regression on transactions you would instead convert that to
an RDD of LabeledPoint using a transactions.map(). If you wanted clustering
you would convert that to an RDD of Vector.

And taking a step back, without knowing what you want to accomplish, What
your fp growth snippet will tell you is as to which sensor values occur
together most frequently. That may or may not be what you are looking for.

Regards
Sab
On 30-Oct-2015 3:00 am, "Fernando Paladini"  wrote:

> Hello guys!
>
> First of all, if you want to take a look in a more readable question, take
> a look in my StackOverflow question
> 
> (I've made the same question there).
>
> I want to test Spark machine learning algorithms and I have some questions
> on how to run these algorithms with non-native data types. I'm going to run
> FPGrowth algorithm over the input because I want to get the most frequent
> itemsets for this input.
>
> *My data is disposed as the following:*
>
> [timestamp, sensor1value, sensor2value] # id: 0
> [timestamp, sensor1value, sensor2value] # id: 1
> [timestamp, sensor1value, sensor2value] # id: 2
> [timestamp, sensor1value, sensor2value] # id: 3
> ...
>
> As I need to use Java (because Python doesn't have a lot of machine
> learning algorithms from Spark), this data structure isn't very easy to
> handle / create.
>
> *To achieve this data structure in Java I can visualize two approaches:*
>
>1. Use existing Java classes and data types to structure the input (I
>think some problems can occur in Spark depending on how complex is my 
> data).
>2. Create my own class (don't know if it works with Spark algorithms)
>
> 1. Existing Java classes and data types
>
> In order to do that I've created a *List<Tuple2<Long, List<Double>>>*, so
> I can keep my data structured and also can create a RDD:
>
> List<Tuple2<Long, List<Double>>> algorithm_data = new ArrayList<Tuple2<Long, List<Double>>>();
> populate(algorithm_data);
> JavaPairRDD<Long, List<Double>> transactions = sc.parallelizePairs(algorithm_data);
>
> I don't feel okay with JavaPairRDD because FPGrowth algorithm seems to be not 
> available for this data structure, as I will show you later in this post.
>
> 2. Create my own class
>
> I could also create a new class to store the input properly:
>
> public class PointValue {
>
> private long timestamp;
> private double sensorMeasure1;
> private double sensorMeasure2;
>
> // Constructor, getters and setters omitted...
> }
>
> However, I don't know if I can do that and still use it with Spark
> algorithms without any problems (in other words, running Spark algorithms
> without headaches). I'll focus in the first approach, but if you see that
> the second one is easier to achieve, please tell me.
> The solution (based on approach #1):
>
> // Initializing SparkSparkConf conf = new SparkConf().setAppName("FP-growth 
> Example");JavaSparkContext sc = new JavaSparkContext(conf);
> // Getting data for ML algorithmList>> 
> algorithm_data = new ArrayList>>();
> populate(algorithm_data);JavaPairRDD> transactions = 
> sc.parallelizePairs(algorithm_data);
> // Running FPGrowthFPGrowth fpg = new 
> FPGrowth().setMinSupport(0.2).setNumPartitions(10);FPGrowthModel List>> model = fpg.run(transactions);
> // Printing everythingfor (FPGrowth.FreqItemset>> 
> itemset: model.freqItemsets().toJavaRDD().collect()) {
> System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());}
>
> But then I got:
>
> *The method run(JavaRDD<List<Item>>) in the type FPGrowth is not applicable for
> the arguments (JavaPairRDD<Long, List<Double>>)*
>
> *What can I do in order to solve my problem (run FPGrowth over
> JavaPairRDD)?*
>
> I'm available to give you more information, just tell me exactly what you
> need.
> Thank you!
> Fernando Paladini
>


RE: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Sabarish Sasidharan
If you are writing to S3, also make sure that you are using the direct
output committer. I don't have streaming jobs but it helps in my machine
learning jobs. Also, though more partitions help in processing faster, they
do slow down writes to S3. So you might want to coalesce before writing to
S3.

Regards
Sab
On 29-Oct-2015 6:21 pm, "Afshartous, Nick"  wrote:

> < Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> Thanks for you input.  The 3 minute window was chosen because we write the
> output of each batch into S3.  And with smaller batch time intervals there
> were many small files being written to S3, something to avoid.  That was
> the explanation of the developer who made this decision (who's no longer on
> the team).   We're in the process of re-evaluating.
> --
>  Nick
>
> -Original Message-
> From: Adrian Tanase [mailto:atan...@adobe.com]
> Sent: Wednesday, October 28, 2015 4:53 PM
> To: Afshartous, Nick 
> Cc: user@spark.apache.org
> Subject: Re: Spark/Kafka Streaming Job Gets Stuck
>
> Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> You could also try increasing the parallelism via repartition to ensure
> smaller tasks that can safely fit in working memory.
>
> Sent from my iPhone
>
> > On 28 Oct 2015, at 17:45, Afshartous, Nick 
> wrote:
> >
> >
> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
> job and seeing a problem.  This is running in AWS/Yarn and the streaming
> batch interval is set to 3 minutes and this is a ten node cluster.
> >
> > Testing at 30,000 events per second we are seeing the streaming job get
> stuck (stack trace below) for over an hour.
> >
> > Thanks on any insights or suggestions.
> > --
> >  Nick
> >
> > org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartiti
> > onsToPair(JavaDStreamLike.scala:43)
> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
> > erDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
> > erDriver.main(StreamingKafkaConsumerDriver.java:71)
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j
> > ava:57)
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccess
> > orImpl.java:43)
> > java.lang.reflect.Method.invoke(Method.java:606)
> > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(Application
> > Master.scala:480)
> >
> > Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
> > additional commands, e-mail: user-h...@spark.apache.org
> >
>
> Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Using Hadoop Custom Input format in Spark

2015-10-27 Thread Sabarish Sasidharan
Did you try the sc.binaryFiles() which gives you an RDD of
PortableDataStream that wraps around the underlying bytes.

On Tue, Oct 27, 2015 at 10:23 PM, Balachandar R.A.  wrote:

> Hello,
>
>
> I have developed a hadoop based solution that processes a binary file. This
> uses classic hadoop MR technique. The binary file is about 10GB and divided
> into 73 HDFS blocks, and the business logic written as map process operates
> on each of these 73 blocks. We have developed a customInputFormat and
> CustomRecordReader in Hadoop that returns key (intWritable) and value
> (BytesWritable) to the map function. The value is nothing but the contents
> of a HDFS block(bianry data). The business logic knows how to read this
> data.
>
> Now, I would like to port this code in spark. I am a starter in spark and
> could run simple examples (wordcount, pi example) in spark. However, could
> not straightforward example to process binaryFiles in spark. I see there
> are two solutions for this use case. In the first, avoid using custom input
> format and record reader. Find a method (approach) in spark that creates a
> RDD for those HDFS blocks, use a map like method that feeds HDFS block
> content to the business logic. If this is not possible, I would like to
> re-use the custom input format and custom reader using some methods such as
> HadoopAPI, HadoopRDD etc. My problem:- I do not know whether the first
> approach is possible or not. If possible, can anyone please provide some
> pointers that contains examples? I was trying second approach but highly
> unsuccessful. Here is the code snippet I used
>
> object Driver {
> def myFunc(key : IntWritable, content : BytesWritable) = {
>   println("my business logic")
>   // printing key and content value/size is 0
> }
>
>
> def main(args: Array[String]) {
>   // create a spark context
>   val conf = new  
> SparkConf().setAppName("Dummy").setMaster("spark://:7077")
>   val sc = new SparkContext(conf)
>   val rd = sc.newAPIHadoopFile("hdfs:///user/name/MyDataFile.dat", 
> classOf[RandomAccessInputFormat], classOf[IntWritable], 
> classOf[BytesWritable])
>   val count = rd.map(x => myFunc(x._1, x._2)).collect()
>}
> }
>
> Can someone tell where I am doing wrong here? I think I am not using API
> the right way but failed to find some documentation/usage examples.
>
>
> Thanks in advancea
>
> - bala
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Huge shuffle data size

2015-10-23 Thread Sabarish Sasidharan
How many rows are you joining? How many rows in the output?

Regards
Sab
On 24-Oct-2015 2:32 am, "pratik khadloya"  wrote:

> Actually the groupBy is not taking a lot of time.
> The join that i do later takes the most (95 %) amount of time.
> Also, the grouping i am doing is based on the DataFrame api, which does
> not contain any function for reduceBy... i guess the DF automatically uses
> reduce by when we do a group by.
>
> ~Pratik
>
> On Fri, Oct 23, 2015 at 1:38 PM Kartik Mathur  wrote:
>
>> Don't use groupBy , use reduceByKey instead , groupBy should always be
>> avoided as it leads to lot of shuffle reads/writes.
>>
>> On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya 
>> wrote:
>>
>>> Sorry i sent the wrong join code snippet, the actual snippet is
>>>
>>> aggImpsDf.join(
>>>aggRevenueDf,
>>>aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>>>  && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>>>  && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>>>  && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>>>"inner")
>>>.select(
>>>  aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>>>  aggImpsDf("day_hour_2"), aggImpsDf("metric1"),
>>> aggRevenueDf("metric2"))
>>>.coalesce(200)
>>>
>>>
>>> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya 
>>> wrote:
>>>
 Hello,

 Data about my spark job is below. My source data is only 916MB (stage
 0) and 231MB (stage 1), but when i join the two data sets (stage 2) it
 takes a very long time and as i see the shuffled data is 614GB. Is this
 something expected? Both the data sets produce 200 partitions.

 Stage IdDescriptionSubmittedDurationTasks: 
 Succeeded/TotalInputOutputShuffle
 ReadShuffle Write2saveAsTable at Driver.scala:269
 
 +details

 2015/10/22 18:48:122.3 h
 200/200
 614.6 GB1saveAsTable at Driver.scala:269
 
 +details

 2015/10/22 18:46:022.1 min
 8/8
 916.2 MB3.9 MB0saveAsTable at Driver.scala:269
 
 +details

 2015/10/22 18:46:0235 s
 3/3
 231.2 MB4.8 MBAm running Spark 1.4.1 and my code snippet which joins
 the two data sets is:

 hc.sql(query).
 mapPartitions(iter => {
   iter.map {
 case Row(
  ...
  ...
  ...
 )
   }
 }
 ).toDF()
 .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
 .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
   sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))


 Please advise on how to reduce the shuffle and speed this up.


 ~Pratik


>>


Re: Calrification on Spark-Hadoop Configuration

2015-10-01 Thread Sabarish Sasidharan
You can point to your custom HADOOP_CONF_DIR in your spark-env.sh

Regards
Sab
On 01-Oct-2015 5:22 pm, "Vinoth Sankar"  wrote:

> Hi,
>
> I'm new to Spark. For my application I need to overwrite Hadoop
> configurations (Can't change Configurations in Hadoop as it might affect my
> regular HDFS), so that Namenode IPs gets automatically resolved.What are
> the ways to do so. I tried giving "spark.hadoop.dfs.ha.namenodes.nn",
> "spark.hadoop.dfs.namenode.rpc-address.nn",
> "spark.hadoop.dfs.namenode.http-address.nn" and other core-site & hdfs-site
> conf properties in SparkConf Object. But still i get UnknownHostException.
>
> Regards
> Vinoth Sankar
>


Re: GroupBy Java objects in Java Spark

2015-09-24 Thread Sabarish Sasidharan
By java class objects if you mean your custom Java objects, yes of course.
That will work.
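
Just make sure the key class is Serializable and has consistent equals() and
hashCode(), since groupByKey() hashes and compares keys across partitions. A
small sketch (the class name and fields are made up):

import java.io.Serializable;
import java.util.Objects;

public class SensorKey implements Serializable {
  private final String deviceId;
  private final int channel;

  public SensorKey(String deviceId, int channel) {
    this.deviceId = deviceId;
    this.channel = channel;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof SensorKey)) return false;
    SensorKey other = (SensorKey) o;
    return channel == other.channel && Objects.equals(deviceId, other.deviceId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(deviceId, channel);
  }
}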

Regards
Sab
On 24-Sep-2015 3:36 pm, "Ramkumar V"  wrote:

> Hi,
>
> I want to know whether grouping by java class objects is possible or not
> in java Spark.
>
> I have Tuple2< JavaObject, JavaObject>. i want to groupbyKey and then i'll
> do some operations in values after grouping.
>
>
> *Thanks*,
> 
>
>


Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task and Yarn containers

2015-09-24 Thread Sabarish Sasidharan
A little caution is needed as one executor per node may not always be ideal
esp when your nodes have lots of RAM. But yes, using lesser number of
executors has benefits like more efficient broadcasts.

Regards
Sab
On 24-Sep-2015 2:57 pm, "Adrian Tanase"  wrote:

> RE: # because I already have a bunch of InputSplits, do I still need to
> specify the number of executors to get processing parallelized?
>
> I would say it’s best practice to have as many executors as data nodes and
> as many cores as you can get from the cluster – if YARN has enough
>  resources it will deploy the executors distributed across the cluster,
> then each of them will try to process the data locally (check the spark ui
> for NODE_LOCAL), with as many splits in parallel as you defined in
> spark.executor.cores
>
> -adrian
>
> From: Sandy Ryza
> Date: Thursday, September 24, 2015 at 2:43 AM
> To: Anfernee Xu
> Cc: "user@spark.apache.org"
> Subject: Re: Custom Hadoop InputSplit, Spark partitions, spark
> executors/task and Yarn containers
>
> Hi Anfernee,
>
> That's correct that each InputSplit will map to exactly a Spark partition.
>
> On YARN, each Spark executor maps to a single YARN container.  Each
> executor can run multiple tasks over its lifetime, both parallel and
> sequentially.
>
> If you enable dynamic allocation, after the stage including the
> InputSplits gets submitted, Spark will try to request an appropriate number
> of executors.
>
> The memory in the YARN resource requests is --executor-memory + what's set
> for spark.yarn.executor.memoryOverhead, which defaults to 10% of
> --executor-memory.
>
> -Sandy
>
> On Wed, Sep 23, 2015 at 2:38 PM, Anfernee Xu 
> wrote:
>
>> Hi Spark experts,
>>
>> I'm coming across these terminologies and having some confusions, could
>> you please help me understand them better?
>>
>> For instance I have implemented a Hadoop InputFormat to load my external
>> data in Spark, in turn my custom InputFormat will create a bunch of
>> InputSplit's, my questions is about
>>
>> # Each InputSplit will exactly map to a Spark partition, is that correct?
>>
>> # If I run on Yarn, how does Spark executor/task map to Yarn container?
>>
>> # because I already have a bunch of InputSplits, do I still need to
>> specify the number of executors to get processing parallelized?
>>
>> # How does -executor-memory map to the memory requirement in Yarn's
>> resource request?
>>
>> --
>> --Anfernee
>>
>
>


Re: Creating BlockMatrix with java API

2015-09-23 Thread Sabarish Sasidharan
What I meant is that something like this would work. Yes, it's less than
elegant but it works.

List<Tuple2<Tuple2<Object, Object>, Matrix>> blocks =
    new ArrayList<Tuple2<Tuple2<Object, Object>, Matrix>>();
blocks.add(
    new Tuple2<Tuple2<Object, Object>, Matrix>(
        new Tuple2<Object, Object>(0, 0),
        Matrices.dense(2, 2, new double[] {0.0D, 1.1D, 2.0D, 3.1D})));
blocks.add(
    new Tuple2<Tuple2<Object, Object>, Matrix>(
        new Tuple2<Object, Object>(0, 1),
        Matrices.dense(2, 2, new double[] {0.0D, 1.1D, 2.0D, 3.1D})));
blocks.add(
    new Tuple2<Tuple2<Object, Object>, Matrix>(
        new Tuple2<Object, Object>(1, 0),
        Matrices.dense(2, 2, new double[] {0.0D, 1.1D, 2.0D, 3.1D})));
blocks.add(
    new Tuple2<Tuple2<Object, Object>, Matrix>(
        new Tuple2<Object, Object>(1, 1),
        Matrices.dense(2, 2, new double[] {0.0D, 1.1D, 2.0D, 3.1D})));
BlockMatrix bm = new BlockMatrix(
    CurrentContext.getCurrentContext().parallelize(blocks).rdd(), 2, 2);


Regards
Sab

On Thu, Sep 24, 2015 at 2:35 AM, Pulasthi Supun Wickramasinghe <
pulasthi...@gmail.com> wrote:

> Hi YiZhi,
>
> Actually i was not able to try it out to see if it was working. I sent the
> previous reply assuming that Sabarish's solution would work :). Sorry if
> there was any confusion.
>
> Best Regards,
> Pulasthi
>
> On Wed, Sep 23, 2015 at 6:47 AM, YiZhi Liu  wrote:
>
>> Hi Pulasthi,
>>
>> Are you sure this worked? When I applied rdd.rdd() to the constructor
>> of BlockMatrix, the compiler complained
>>
>> [error]
>> spark/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVD.java:38:
>> error: incompatible types: RDD<Tuple2<Tuple2<Integer,Integer>,Matrix>>
>> cannot be converted to RDD<Tuple2<Tuple2<Object,Object>,Matrix>>
>> [error] BlockMatrix blockMatrix = new BlockMatrix(rdd.rdd(), 2, 2);
>>
>> It must be caused by type erasure going from Scala to Java. To make it
>> work, we have to define 'rdd' as JavaRDD<Tuple2<Tuple2<Object, Object>, Matrix>>
>>
>> As Yanbo has mentioned, I think a Java-friendly constructor is still in
>> demand.
>>
>> 2015-09-23 13:14 GMT+08:00 Pulasthi Supun Wickramasinghe
>> :
>> > Hi Sabarish
>> >
>> > Thanks, that would indeed solve my problem
>> >
>> > Best Regards,
>> > Pulasthi
>> >
>> > On Wed, Sep 23, 2015 at 12:55 AM, Sabarish Sasidharan
>> >  wrote:
>> >>
>> >> Hi Pulasthi
>> >>
>> >> You can always use JavaRDD.rdd() to get the scala rdd. So in your case,
>> >>
>> >> new BlockMatrix(rdd.rdd(), 2, 2)
>> >>
>> >> should work.
>> >>
>> >> Regards
>> >> Sab
>> >>
>> >> On Tue, Sep 22, 2015 at 10:50 PM, Pulasthi Supun Wickramasinghe
>> >>  wrote:
>> >>>
>> >>> Hi Yanbo,
>> >>>
>> >>> Thanks for the reply. I thought i might be missing something. Anyway i
>> >>> moved to using scala since it is the complete API.
>> >>>
>> >>> Best Regards,
>> >>> Pulasthi
>> >>>
>> >>> On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang 
>> wrote:
>> >>>>
>> >>>> This is because the distributed matrices like
>> >>>> BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do not
>> >>>> provide Java-friendly constructors. I have filed SPARK-10757 to
>> >>>> track this issue.
>> >>>>
>> >>>> 2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe
>> >>>> :
>> >>>>>
>> >>>>> Hi All,
>> >>>>>
>> >>>>> I am new to Spark and I am trying to do some BlockMatrix operations
>> >>>>> with the MLlib APIs. But I can't seem to create a BlockMatrix with
>> >>>>> the Java API. I tried the following
>> >>>>>
>> >>>>> Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
>> >>>>> List<Tuple2<Tuple2<Integer, Integer>, Matrix>> list = new
>> >>>>> ArrayList<Tuple2<Tuple2<Integer, Integer>, Matrix>>();
>> >>>>> Tuple2<Integer, Integer> intTuple = new Tuple2<Integer, Integer>(0, 0);
>> >>>>> Tuple2<Tuple2<Integer, Integer>, Matrix> tuple2MatrixTuple2 = new
>> >>>>> Tuple2<Tuple2<Integer, Integer>, Matrix>(intTuple, matrixa);
>> >>>>> list.add(tuple2MatrixTuple2);
>> >>>>> JavaRDD<Tuple2<Tuple2<Integer, Integer>, Matrix>> rdd =
>> >>>>> sc.parallelize(list);
>> >>>>>
>> >>>>> BlockMatrix blockMatrix = new BlockMatrix(rdd, 2, 2);
>> >>>>>
>> >>>>>
>> >>>>> but since BlockMatrix only takes
>> >>>>> "RDD<Tuple2<Tuple2<Object, Object>, Matrix>>"
>> >>>>> this code does not work. sc.parallelize() returns a JavaRDD so the two
>> >>>>> are not compatible.

Re: K Means Explanation

2015-09-23 Thread Sabarish Sasidharan
You can't obtain that from the model itself. But you can always ask the model
to predict the closest cluster for each of your vectors by calling predict().
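
For example, a short Scala sketch (assuming an existing RDD[Vector] of points
and the trained model) that uses predict() to recover the points contained in
each cluster:

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// predict() assigns each vector to its closest center, so grouping by the
// predicted index yields the members of every cluster.
def pointsByCluster(model: KMeansModel, points: RDD[Vector]): RDD[(Int, Iterable[Vector])] =
  points.map(p => (model.predict(p), p)).groupByKey()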

Regards
Sab

On Wed, Sep 23, 2015 at 7:24 PM, Tapan Sharma 
wrote:

> Hi All,
>
> In the KMeans example provided under mllib, it traverse the outcome of
> KMeansModel to know the cluster centers like this:
>
> KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs,
> KMeans.K_MEANS_PARALLEL());
>
> System.out.println("Cluster centers:");
> for (Vector center : model.clusterCenters()) {
>   System.out.println(" " + center);
> }
> https://spark.apache.org/docs/1.3.0/mllib-clustering.html#k-means
> 
> *How can I know the points contained in the particular cluster?*
>
> Regards
> Tapan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/K-Means-Explanation-tp24787.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Creating BlockMatrix with java API

2015-09-22 Thread Sabarish Sasidharan
Hi Pulasthi

You can always use JavaRDD.rdd() to get the scala rdd. So in your case,

new BlockMatrix(rdd.rdd(), 2, 2)

should work.

Regards
Sab

On Tue, Sep 22, 2015 at 10:50 PM, Pulasthi Supun Wickramasinghe <
pulasthi...@gmail.com> wrote:

> Hi Yanbo,
>
> Thanks for the reply. I thought i might be missing something. Anyway i
> moved to using scala since it is the complete API.
>
> Best Regards,
> Pulasthi
>
> On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang  wrote:
>
>> This is because the distributed matrices like
>> BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do
>> not provide Java-friendly constructors. I have filed SPARK-10757
>> to track this issue.
>>
>> 2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe <
>> pulasthi...@gmail.com>:
>>
>>> Hi All,
>>>
>>> I am new to Spark and I am trying to do some BlockMatrix operations with
>>> the MLlib APIs. But I can't seem to create a BlockMatrix with the Java
>>> API. I tried the following
>>>
>>> Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
>>> List<Tuple2<Tuple2<Integer, Integer>, Matrix>> list = new
>>> ArrayList<Tuple2<Tuple2<Integer, Integer>, Matrix>>();
>>> Tuple2<Integer, Integer> intTuple = new Tuple2<Integer, Integer>(0, 0);
>>> Tuple2<Tuple2<Integer, Integer>, Matrix> tuple2MatrixTuple2 = new
>>> Tuple2<Tuple2<Integer, Integer>, Matrix>(intTuple, matrixa);
>>> list.add(tuple2MatrixTuple2);
>>> JavaRDD<Tuple2<Tuple2<Integer, Integer>, Matrix>> rdd =
>>> sc.parallelize(list);
>>>
>>> BlockMatrix blockMatrix = new BlockMatrix(rdd, 2, 2);
>>>
>>>
>>> but since BlockMatrix only takes
>>> "RDD<Tuple2<Tuple2<Object, Object>, Matrix>>"
>>> this code does not work. sc.parallelize() returns a JavaRDD so the two
>>> are not compatible. I also couldn't find any code samples for this. Any
>>> help on this would be highly appreciated.
>>>
>>> Best Regards,
>>> Pulasthi
>>> --
>>> Pulasthi S. Wickramasinghe
>>> Graduate Student  | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> cell: 224-386-9035
>>>
>>
>>
>
>
> --
> Pulasthi S. Wickramasinghe
> Graduate Student  | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> cell: 224-386-9035
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: How to increase the Json parsing speed

2015-08-27 Thread Sabarish Sasidharan
How many executors are you using when using Spark SQL?

On Fri, Aug 28, 2015 at 12:12 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> I see that you are not reusing the same mapper instance in the Scala
> snippet.
>
> Regards
> Sab
>
> On Fri, Aug 28, 2015 at 9:38 AM, Gavin Yue  wrote:
>
>> Just did some tests.
>>
>> I have 6000 files, each has 14K records with 900Mb file size.  In spark
>> sql, it would take one task roughly 1 min to parse.
>>
>> On the local machine, using the same Jackson lib inside Spark lib. Just
>> parse it.
>>
>> FileInputStream fstream = new FileInputStream("testfile");
>> BufferedReader br = new BufferedReader(new
>> InputStreamReader(fstream));
>> String strLine;
>> Long begin = System.currentTimeMillis();
>>  while ((strLine = br.readLine()) != null)   {
>> JsonNode s = mapper.readTree(strLine);
>>  }
>> System.out.println(System.currentTimeMillis() - begin);
>>
>> In JDK8, it took *6270ms. *
>>
>> Same code in Scala, it would take *7486ms*
>>val begin =  java.lang.System.currentTimeMillis()
>> for(line <- Source.fromFile("testfile").getLines())
>> {
>>   val mapper = new ObjectMapper()
>>   mapper.registerModule(DefaultScalaModule)
>>   val s = mapper.readTree(line)
>> }
>> println(java.lang.System.currentTimeMillis() - begin)
>>
>>
>> One JSON record contains two fields: ID and List[Event].
>>
>> I am guessing that putting all the events into the List takes the remaining time.
>>
>> Any solution to speed this up?
>>
>> Thanks a lot!
>>
>>
>> On Thu, Aug 27, 2015 at 7:45 PM, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> For your jsons, can you tell us what is your benchmark when running on a
>>> single machine using just plain Java (without Spark and Spark sql)?
>>>
>>> Regards
>>> Sab
>>> On 28-Aug-2015 7:29 am, "Gavin Yue"  wrote:
>>>
>>>> Hey
>>>>
>>>> I am using the Json4s-Jackson parser coming with spark and parsing
>>>> roughly 80m records with totally size 900mb.
>>>>
>>>> But the speed is slow. It took my 50 nodes (16 cores CPU, 100 GB mem)
>>>> roughly 30 mins to parse the JSON for use in Spark SQL.
>>>>
>>>> Jackson has the benchmark saying parsing should be ms level.
>>>>
>>>> Any way to increase speed?
>>>>
>>>> I am using spark 1.4 on Hadoop 2.7 with Java 8.
>>>>
>>>> Thanks a lot !
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>
>
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: How to increase the Json parsing speed

2015-08-27 Thread Sabarish Sasidharan
I see that you are not reusing the same mapper instance in the Scala
snippet.
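
For comparison, a sketch of the same Scala micro-benchmark with the
ObjectMapper built once outside the loop (file name and modules as in the
original snippet; timings will of course vary):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import scala.io.Source

// Build and configure the mapper once; constructing it per line is the costly part.
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)

val begin = System.currentTimeMillis()
for (line <- Source.fromFile("testfile").getLines()) {
  val s = mapper.readTree(line) // reuse the same instance for every record
}
println(System.currentTimeMillis() - begin)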

Regards
Sab

On Fri, Aug 28, 2015 at 9:38 AM, Gavin Yue  wrote:

> Just did some tests.
>
> I have 6000 files, each has 14K records with 900Mb file size.  In spark
> sql, it would take one task roughly 1 min to parse.
>
> On the local machine, using the same Jackson lib inside Spark lib. Just
> parse it.
>
> FileInputStream fstream = new FileInputStream("testfile");
> BufferedReader br = new BufferedReader(new
> InputStreamReader(fstream));
> String strLine;
> Long begin = System.currentTimeMillis();
>  while ((strLine = br.readLine()) != null)   {
> JsonNode s = mapper.readTree(strLine);
>  }
> System.out.println(System.currentTimeMillis() - begin);
>
> In JDK8, it took *6270ms. *
>
> Same code in Scala, it would take *7486ms*
>val begin =  java.lang.System.currentTimeMillis()
> for(line <- Source.fromFile("testfile").getLines())
> {
>   val mapper = new ObjectMapper()
>   mapper.registerModule(DefaultScalaModule)
>   val s = mapper.readTree(line)
> }
> println(java.lang.System.currentTimeMillis() - begin)
>
>
> One JSON record contains two fields: ID and List[Event].
>
> I am guessing that putting all the events into the List takes the remaining time.
>
> Any solution to speed this up?
>
> Thanks a lot!
>
>
> On Thu, Aug 27, 2015 at 7:45 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> For your jsons, can you tell us what is your benchmark when running on a
>> single machine using just plain Java (without Spark and Spark sql)?
>>
>> Regards
>> Sab
>> On 28-Aug-2015 7:29 am, "Gavin Yue"  wrote:
>>
>>> Hey
>>>
>>> I am using the Json4s-Jackson parser coming with spark and parsing
>>> roughly 80m records with totally size 900mb.
>>>
>>> But the speed is slow. It took my 50 nodes (16 cores CPU, 100 GB mem)
>>> roughly 30 mins to parse the JSON for use in Spark SQL.
>>>
>>> Jackson has the benchmark saying parsing should be ms level.
>>>
>>> Any way to increase speed?
>>>
>>> I am using spark 1.4 on Hadoop 2.7 with Java 8.
>>>
>>> Thanks a lot !
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: How to increase the Json parsing speed

2015-08-27 Thread Sabarish Sasidharan
For your jsons, can you tell us what is your benchmark when running on a
single machine using just plain Java (without Spark and Spark sql)?

Regards
Sab
On 28-Aug-2015 7:29 am, "Gavin Yue"  wrote:

> Hey
>
> I am using the Json4s-Jackson parser coming with spark and parsing roughly
> 80m records with totally size 900mb.
>
> But the speed is slow. It took my 50 nodes (16 cores CPU, 100 GB mem) roughly
> 30 mins to parse the JSON for use in Spark SQL.
>
> Jackson has the benchmark saying parsing should be ms level.
>
> Any way to increase speed?
>
> I am using spark 1.4 on Hadoop 2.7 with Java 8.
>
> Thanks a lot !
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Difference btw MEMORY_ONLY and MEMORY_AND_DISK

2015-08-18 Thread Sabarish Sasidharan
With MEMORY_ONLY, partitions that don't fit in memory are simply not cached and
are recomputed from the lineage each time they are needed, whereas
MEMORY_AND_DISK spills the partitions that don't fit to disk and reads them
back from there.
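
A quick spark-shell sketch (sc is already available there) of persisting with
the two levels; the difference only shows up for partitions that do not fit in
memory:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000000)
rdd.persist(StorageLevel.MEMORY_ONLY)        // non-fitting partitions are recomputed from lineage
// rdd.persist(StorageLevel.MEMORY_AND_DISK) // alternative: non-fitting partitions spill to local disk
rdd.count()

Note that a persisted RDD's storage level cannot be changed afterwards, so in
practice you pick one of the two persist() calls above.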

Regards
Sab

On Tue, Aug 18, 2015 at 12:45 PM, Harsha HN <99harsha.h@gmail.com>
wrote:

> Hello Sparkers,
>
> I would like to understand the difference between these storage levels for an
> RDD portion that doesn't fit in memory.
> It seems like in both storage levels, whatever portion doesn't fit in
> memory will be spilled to disk. Any difference as such?
>
> Thanks,
> Harsha
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Regarding rdd.collect()

2015-08-18 Thread Sabarish Sasidharan
It is still in memory for future RDD transformations and actions. What you
get in the driver is a copy of the data.
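
A small spark-shell illustration (sc available) of that distinction:

val rdd = sc.parallelize(1 to 100).cache()
val localCopy: Array[Int] = rdd.collect() // a copy of the data arrives at the driver
val doubledCount = rdd.map(_ * 2).count() // the cached blocks on the executors are still used here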

Regards
Sab

On Tue, Aug 18, 2015 at 12:02 PM, praveen S  wrote:

> When I do an rdd.collect(), does the data move back to the driver, or is it
> still held in memory across the executors?
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Sabarish Sasidharan
Try setting spark.yarn.executor.memoryOverhead to a higher value like 1g or
1.5g or more.

Regards
Sab
On 28-Jun-2015 9:22 am, "Ayman Farahat"  wrote:

> That's correct this is Yarn
> And spark 1.4
> Also using the Anaconda tar for Numpy and other Libs
>
>
> Sent from my iPhone
>
> On Jun 27, 2015, at 8:50 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
> Are you running on top of YARN? Plus pls provide your infrastructure
> details.
>
> Regards
> Sab
> On 28-Jun-2015 8:47 am, "Ayman Farahat" 
> wrote:
>
>> Hello;
>> I tried to adjust the number of blocks by repartitioning the input.
>> Here is How I do it;  (I am partitioning by users )
>>
>> tot = newrdd.map(lambda l:
>> (l[1],Rating(int(l[1]),int(l[2]),l[4]))).partitionBy(50).cache()
>> ratings = tot.values()
>> numIterations =8
>> rank = 80
>> model = ALS.trainImplicit(ratings, rank, numIterations)
>>
>>
>> I have 20 executors
>> with 5GB memory per executor.
>> When I use 80 factors I keep getting the following problem:
>>
>> Traceback (most recent call last):
>>   File "/homes/afarahat/myspark/mm/df4test.py", line 85, in 
>> model = ALS.trainImplicit(ratings, rank, numIterations)
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/recommendation.py",
>> line 201, in trainImplicit
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py",
>> line 128, in callMLlibFunc
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py",
>> line 121, in callJavaFunc
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> line 538, in __call__
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> line 300, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o113.trainImplicitALSModel.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 7 in stage 36.1 failed 4 times, most recent failure: Lost task 7.3 in stage
>> 36.1 (TID 1841, gsbl52746.blue.ygrid.yahoo.com):
>> java.io.FileNotFoundException:
>> /grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_1027774/blockmgr-0e518470-57d8-472f-8fba-3b593e4dda42/27/rdd_56_24
>> (No such file or directory)
>> at java.io.RandomAccessFile.open(Native Method)
>> at java.io.RandomAccessFile.(RandomAccessFile.java:233)
>> at
>> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
>> at
>> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>> at
>> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>> at
>> org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
>> at
>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
>> at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:722)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>> at
>> org.apache

Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Sabarish Sasidharan
Are you running on top of YARN? Plus pls provide your infrastructure
details.

Regards
Sab
On 28-Jun-2015 9:20 am, "Sabarish Sasidharan" <
sabarish.sasidha...@manthan.com> wrote:

> Are you running on top of YARN? Plus pls provide your infrastructure
> details.
>
> Regards
> Sab
> On 28-Jun-2015 8:47 am, "Ayman Farahat" 
> wrote:
>
>> Hello;
>> I tried to adjust the number of blocks by repartitioning the input.
>> Here is How I do it;  (I am partitioning by users )
>>
>> tot = newrdd.map(lambda l:
>> (l[1],Rating(int(l[1]),int(l[2]),l[4]))).partitionBy(50).cache()
>> ratings = tot.values()
>> numIterations =8
>> rank = 80
>> model = ALS.trainImplicit(ratings, rank, numIterations)
>>
>>
>> I have 20 executors
>> with 5GB memory per executor.
>> When I use 80 factors I keep getting the following problem:
>>
>> Traceback (most recent call last):
>>   File "/homes/afarahat/myspark/mm/df4test.py", line 85, in 
>> model = ALS.trainImplicit(ratings, rank, numIterations)
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/recommendation.py",
>> line 201, in trainImplicit
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py",
>> line 128, in callMLlibFunc
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py",
>> line 121, in callJavaFunc
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> line 538, in __call__
>>   File
>> "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> line 300, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o113.trainImplicitALSModel.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 7 in stage 36.1 failed 4 times, most recent failure: Lost task 7.3 in stage
>> 36.1 (TID 1841, gsbl52746.blue.ygrid.yahoo.com):
>> java.io.FileNotFoundException:
>> /grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_1027774/blockmgr-0e518470-57d8-472f-8fba-3b593e4dda42/27/rdd_56_24
>> (No such file or directory)
>> at java.io.RandomAccessFile.open(Native Method)
>> at java.io.RandomAccessFile.(RandomAccessFile.java:233)
>> at
>> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
>> at
>> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>> at
>> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>> at
>> org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
>> at
>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
>> at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:722)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>   

Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Sabarish Sasidharan
Are you running on top of YARN? Plus pls provide your infrastructure
details.

Regards
Sab
On 28-Jun-2015 8:47 am, "Ayman Farahat" 
wrote:

> Hello;
> I tried to adjust the number of blocks by repartitioning the input.
> Here is How I do it;  (I am partitioning by users )
>
> tot = newrdd.map(lambda l:
> (l[1],Rating(int(l[1]),int(l[2]),l[4]))).partitionBy(50).cache()
> ratings = tot.values()
> numIterations =8
> rank = 80
> model = ALS.trainImplicit(ratings, rank, numIterations)
>
>
> I have 20 executors
> with 5GB memory per executor.
> When I use 80 factors I keep getting the following problem:
>
> Traceback (most recent call last):
>   File "/homes/afarahat/myspark/mm/df4test.py", line 85, in 
> model = ALS.trainImplicit(ratings, rank, numIterations)
>   File
> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/recommendation.py",
> line 201, in trainImplicit
>   File
> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py",
> line 128, in callMLlibFunc
>   File
> "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py",
> line 121, in callJavaFunc
>   File
> "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File
> "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o113.trainImplicitALSModel.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 7 in stage 36.1 failed 4 times, most recent failure: Lost task 7.3 in stage
> 36.1 (TID 1841, gsbl52746.blue.ygrid.yahoo.com):
> java.io.FileNotFoundException:
> /grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_1027774/blockmgr-0e518470-57d8-472f-8fba-3b593e4dda42/27/rdd_56_24
> (No such file or directory)
> at java.io.RandomAccessFile.open(Native Method)
> at java.io.RandomAccessFile.(RandomAccessFile.java:233)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
> at
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
> at
> org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
> at
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Jun 28, 2015 2:10:37 AM INFO: parquet.hadoop.ParquetFileReader: Initiating
> action with parallelism: 5
> ~
>
> On Jun 26, 2015, at 12:33 PM, Xiangrui Meng  wrote:
>
> So you have 100 partitions (blocks). This might be too many for your
> dataset. Try setting a smaller number of blocks, e.g., 32 or 64. When ALS
> starts iterations, you can see the shuffle read/write size from the
> "stages" tab of Spark WebUI. Vary number of blocks and check the numbers
> there. Kyro serializer doesn't hel

Re: Parquet problems

2015-06-24 Thread Sabarish Sasidharan
Did you try increasing the PermGen size for the driver?

Regards
Sab
On 24-Jun-2015 4:40 pm, "Anders Arpteg"  wrote:

> When reading large (and many) datasets with the Spark 1.4.0 DataFrames
> parquet reader (the org.apache.spark.sql.parquet format), the following
> exceptions are thrown:
>
> Exception in thread "task-result-getter-0"
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "task-result-getter-0"
> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError:
> PermGen space
> Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError:
> PermGen space
> Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError:
> PermGen space
>
> and many more like these from different threads. I've tried increasing the
> PermGen space using the -XX:MaxPermSize VM setting, but even after tripling
> the space, the same errors occur. I've also tried storing intermediate
> results, and am able to get the full job completed by running it multiple
> times and starting for the last successful intermediate result. There seems
> to be some memory leak in the parquet format. Any hints on how to fix this
> problem?
>
> Thanks,
> Anders
>


Re: Help optimising Spark SQL query

2015-06-23 Thread Sabarish Sasidharan
64GB in Parquet could be many billions of rows because of the columnar
compression. And count distinct by itself is an expensive operation; that is
not specific to Spark, even on Presto/Impala you would see performance dip
with count distincts. And the cluster is not that powerful either.

The one issue here is that Spark has to sift through all the data to get to
just a week's worth. To achieve better performance you might want to
partition the data by date/week and then Spark wouldn't have to sift
through all the billions of rows to get to the millions it needs to
aggregate.
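
A rough sketch (Spark 1.4+ DataFrame API; the paths and the partition column
are assumptions based on the query) of writing the events partitioned by a
derived date column, so that a one-week query only reads that week's
directories:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc) // assumes an existing SparkContext sc
val events = sqlContext.read.parquet("s3n://bucket/usage_events")

events
  .selectExpr("*",
    "to_date(from_unixtime(cast(timestamp_millis/1000 as bigint))) as event_date")
  .write
  .partitionBy("event_date")
  .parquet("s3n://bucket/usage_events_by_date")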

Regards
Sab

On Tue, Jun 23, 2015 at 4:35 PM, James Aley  wrote:

> Thanks for the suggestions everyone, appreciate the advice.
>
> I tried replacing DISTINCT for the nested GROUP BY, running on 1.4 instead
> of 1.3, replacing the date casts with a "between" operation on the
> corresponding long constants instead and changing COUNT(*) to COUNT(1).
> None of these seem to have made any remarkable difference in running time
> for the query.
>
> I'll hook up YourKit and see if we can figure out where the CPU time is
> going, then post back.
>
> On 22 June 2015 at 16:01, Yin Huai  wrote:
>
>> Hi James,
>>
>> Maybe it's the DISTINCT causing the issue.
>>
>> I rewrote the query as follows. Maybe this one can finish faster.
>>
>> select
>>   sum(cnt) as uses,
>>   count(id) as users
>> from (
>>   select
>> count(*) cnt,
>> cast(id as string) as id
>>   from usage_events
>>   where
>> from_unixtime(cast(timestamp_millis/1000 as bigint)) between
>> '2015-06-09' and '2015-06-16'
>>   group by cast(id as string)
>> ) tmp
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke 
>> wrote:
>>
>>> Generally (not only spark sql specific) you should not cast in the where
>>> part of a sql query. It is also not necessary in your case. Getting rid of
>>> casts in the whole query will be also beneficial.
>>>
>>> Le lun. 22 juin 2015 à 17:29, James Aley  a
>>> écrit :
>>>
 Hello,

 A colleague of mine ran the following Spark SQL query:

 select
   count(*) as uses,
   count (distinct cast(id as string)) as users
 from usage_events
 where
   from_unixtime(cast(timestamp_millis/1000 as bigint))
 between '2015-06-09' and '2015-06-16'

 The table contains billions of rows, but totals only 64GB of data
 across ~30 separate files, which are stored as Parquet with LZO compression
 in S3.

 From the referenced columns:

 * id is Binary, which we cast to a String so that we can DISTINCT by
 it. (I was already told this will improve in a later release, in a separate
 thread.)
 * timestamp_millis is a long, containing a unix timestamp with
 millisecond resolution

 This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
 instances, using 20 executors, each with 4GB memory. I can see from
 monitoring tools that the CPU usage is at 100% on all nodes, but incoming
 network seems a bit low at 2.5MB/s, suggesting to me that this is 
 CPU-bound.

 Does that seem slow? Can anyone offer any ideas by glancing at the
 query as to why this might be slow? We'll profile it meanwhile and post
 back if we find anything ourselves.

 A side issue - I've found that this query, and others, sometimes
 completes but doesn't return any results. There appears to be no error that
 I can see in the logs, and Spark reports the job as successful, but the
 connected JDBC client (SQLWorkbenchJ in this case), just sits there forever
 waiting. I did a quick Google and couldn't find anyone else having similar
 issues.


 Many thanks,

 James.

>>>
>>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Matrix Multiplication and mllib.recommendation

2015-06-17 Thread Sabarish Sasidharan
Nick is right. I too have implemented it this way and it works just fine. In
my case, there can be even more products. You simply broadcast blocks of
products to userFeatures.mapPartitions() and BLAS-multiply in there to get
recommendations. In my case, 10K products form one block. Note that you
would then have to union your recommendations. And if there are lots of
product blocks, you might also want to checkpoint once every few blocks.
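
A rough Scala sketch (names and the Breeze-based local multiply are
assumptions, not code from this thread) of that block-by-block scoring:
broadcast one block of product factors, multiply it against the user factors
inside mapPartitions(), and union the per-block outputs afterwards:

import breeze.linalg.{DenseMatrix, DenseVector}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def scoreBlock(
    sc: SparkContext,
    userFeatures: RDD[(Int, Array[Double])],
    productBlock: Array[(Int, Array[Double])]): RDD[(Int, Array[(Int, Double)])] = {
  val bcBlock = sc.broadcast(productBlock)
  userFeatures.mapPartitions { users =>
    val block = bcBlock.value
    val prodIds = block.map(_._1)
    // One (numProductsInBlock x rank) matrix per partition, built once.
    val prodMat = DenseMatrix(block.map(_._2): _*)
    users.map { case (userId, uf) =>
      val scores = prodMat * DenseVector(uf) // matrix-vector multiply, one call per user
      (userId, prodIds.zip(scores.toArray))
    }
  }
}
// The results of repeated scoreBlock() calls can then be combined with
// RDD.union, checkpointing occasionally if there are many blocks.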

Regards
Sab

On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
wrote:

> One issue is that you broadcast the product vectors and then do a dot
> product one-by-one with the user vector.
>
> You should try forming a matrix of the item vectors and doing the dot
> product as a matrix-vector multiply which will make things a lot faster.
>
> Another optimisation that is available in 1.4 is a recommendProducts
> method that blockifies the factors to make use of level 3 BLAS (i.e.
> matrix-matrix multiply). I am not sure if this is available in the Python
> API yet.
>
> But you can do a version yourself by using mapPartitions over user
> factors, blocking the factors into sub-matrices and doing matrix multiply
> with item factor matrix to get scores on a block-by-block basis.
>
> Also as Ilya says more parallelism can help. I don't think it's so
> necessary to do LSH with 30,000 items.
>
> —
> Sent from Mailbox 
>
>
> On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya <
> ilya.gane...@capitalone.com> wrote:
>
>> Actually talk about this exact thing in a blog post here
>> http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
>> Keep in mind, you're actually doing a ton of math. Even with proper caching
>> and use of broadcast variables this will take a while depending on the size
>> of your cluster. To get real results you may want to look into locality
>> sensitive hashing to limit your search space and definitely look into
>> spinning up multiple threads to process your product features in parallel
>> to increase resource utilization on the cluster.
>>
>>
>>
>> Thank you,
>> Ilya Ganelin
>>
>>
>>
>> -Original Message-
>> *From: *afarahat [ayman.fara...@yahoo.com]
>> *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
>> *To: *user@spark.apache.org
>> *Subject: *Matrix Multiplication and mllib.recommendation
>>
>> Hello;
>> I am trying to get predictions after running the ALS model.
>> The model works fine. In the prediction/recommendation , I have about 30
>> ,000 products and 90 Millions users.
>> When i try the predict all it fails.
>> I have been trying to formulate the problem as a Matrix multiplication
>> where
>> I first get the product features, broadcast them and then do a dot
>> product.
>> Its still very slow. Any reason why
>> here is a sample code
>>
>> def doMultiply(x):
>> a = []
>> #multiply by
>> mylen = len(pf.value)
>> for i in range(mylen) :
>>   myprod = numpy.dot(x,pf.value[i][1])
>>   a.append(myprod)
>> return a
>>
>>
>> myModel = MatrixFactorizationModel.load(sc, "FlurryModelPath")
>> #I need to select which products to broadcast but lets try all
>> m1 = myModel.productFeatures().sample(False, 0.001)
>> pf = sc.broadcast(m1.collect())
>> uf = myModel.userFeatures()
>> f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>> --
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Spark or Storm

2015-06-16 Thread Sabarish Sasidharan
Whatever you write in bolts would be the logic you want to apply on your
events. In Spark, that logic would be coded in map() or similar such
transformations and/or actions. Spark doesn't enforce a structure for
capturing your processing logic like Storm does.
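
As a sketch of how that maps onto Spark Streaming (Scala; the source, the
enrichment logic and the database call are placeholders, not anything from
this thread), each "bolt" becomes a transformation or an output operation on
the DStream:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("bolt-like-pipeline")
val ssc = new StreamingContext(conf, Seconds(60))

val events = ssc.socketTextStream("localhost", 9999) // stand-in for a Kafka source
val enriched = events.map(e => e.toUpperCase)        // "bolt" #1: enrichment logic

enriched.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // saveToDatabase(partition) // hypothetical sink; one connection per partition
    partition.foreach(println)   // stand-in side effect
  }
}

val aggregated = enriched.countByValue() // "bolt" #2: aggregation stage
aggregated.print()

ssc.start()
ssc.awaitTermination()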

Regards
Sab
Probably overloading the question a bit.

In Storm, Bolts have the functionality of getting triggered on events. Is
that kind of functionality possible with Spark streaming? During each phase
of the data processing, the transformed data is stored to the database and
this transformed data should then be sent to a new pipeline for further
processing

How can this be achieved using Spark?



On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast  wrote:

> I have a use-case where a stream of Incoming events have to be aggregated
> and joined to create Complex events. The aggregation will have to happen at
> an interval of 1 minute (or less).
>
> The pipeline is :
>                    (send events)              (enrich event)
> Upstream services ---> KAFKA ---> Event Stream Processor ---> Complex Event Processor ---> Elastic Search.
>
> From what I understand, Storm will make a very good ESP and Spark
> Streaming will make a good CEP.
>
> But, we are also evaluating Storm with Trident.
>
> How does Spark Streaming compare with Storm with Trident?
>
> Sridhar Chellappa
>
>
>
>
>
>
>
>   On Wednesday, 17 June 2015 10:02 AM, ayan guha 
> wrote:
>
>
> I have a similar scenario where we need to bring data from kinesis to
> hbase. Data velocity is 20k per 10 mins. Little manipulation of data will
> be required but that's regardless of the tool so we will be writing that
> piece in Java pojo.
> All env is on aws. Hbase is on a long running EMR and kinesis on a
> separate cluster.
> TIA.
> Best
> Ayan
> On 17 Jun 2015 12:13, "Will Briggs"  wrote:
>
> The programming models for the two frameworks are conceptually rather
> different; I haven't worked with Storm for quite some time, but based on my
> old experience with it, I would equate Spark Streaming more with Storm's
> Trident API, rather than with the raw Bolt API. Even then, there are
> significant differences, but it's a bit closer.
>
> If you can share your use case, we might be able to provide better
> guidance.
>
> Regards,
> Will
>
> On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:
>
> Hi All,
>
> I am evaluating spark VS storm ( spark streaming  ) and i am not able to
> see what is equivalent of Bolt in storm inside spark.
>
> Any help will be appreciated on this ?
>
> Thanks ,
> Ashish
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>


Re: ArrayIndexOutOfBoundsException in ALS.trainImplicit

2015-03-22 Thread Sabarish Sasidharan
My bad. This was an out-of-memory error disguised as something else.

Regards
Sab

On Sun, Mar 22, 2015 at 1:53 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> I am consistently running into this ArrayIndexOutOfBoundsException issue
> when using trainImplicit. I have tried changing the partitions and
> switching to JavaSerializer. But they don't seem to help. I see that this
> is the same as https://issues.apache.org/jira/browse/SPARK-3080. My
> lambda is 0.01, rank is 5,  iterations is 10 and alpha is 0.01. I am using
> 41 executors, each with 8GB on a 48 million dataset.
>
> 15/03/21 13:07:29 ERROR executor.Executor: Exception in task 12.0 in stage
> 2808.0 (TID 40575)
> java.lang.ArrayIndexOutOfBoundsException: 692
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.scala:548)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at org.apache.spark.mllib.recommendation.ALS.org
> $apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:542)
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:510)
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:509)
> at
> org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
> at
> org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
>
> How can I get around this issue?
>
> Regards
> Sab
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


ArrayIndexOutOfBoundsException in ALS.trainImplicit

2015-03-21 Thread Sabarish Sasidharan
I am consistently running into this ArrayIndexOutOfBoundsException issue
when using trainImplicit. I have tried changing the partitions and
switching to JavaSerializer. But they don't seem to help. I see that this
is the same as https://issues.apache.org/jira/browse/SPARK-3080. My lambda
is 0.01, rank is 5,  iterations is 10 and alpha is 0.01. I am using 41
executors, each with 8GB on a 48 million dataset.

15/03/21 13:07:29 ERROR executor.Executor: Exception in task 12.0 in stage
2808.0 (TID 40575)
java.lang.ArrayIndexOutOfBoundsException: 692
at
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.scala:548)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.mllib.recommendation.ALS.org
$apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:542)
at
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:510)
at
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:509)
at
org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
at
org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)

How can I get around this issue?

Regards
Sab

-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: mapPartitions - How Does it Works

2015-03-18 Thread Sabarish Sasidharan
Unlike map(), where your task acts on one row at a time, with mapPartitions()
the task is passed the entire content of the partition as an iterator. You can
then return another iterator as the output. I don't do Scala, but from what I
understand from your code snippet, the iterator x can return all the rows in
the partition, yet you are returning after consuming only the first row. Hence
you see only 1, 4, 7 in your output. These are the first rows of each of your 3
partitions.
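
For instance, in spark-shell:

val parallel = sc.parallelize(1 to 10, 3)

// Only the first element of each of the 3 partitions:
parallel.mapPartitions(x => List(x.next).iterator).collect() // Array(1, 4, 7)

// Returning a transformed iterator instead consumes every row in the partition:
parallel.mapPartitions(x => x.map(_ * 2)).collect()          // Array(2, 4, 6, ..., 20)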

Regards
Sab
On 18-Mar-2015 10:50 pm, "ashish.usoni"  wrote:

> I am trying to understand about mapPartitions but i am still not sure how
> it
> works
>
> in the below example it create three partition
> val parallel = sc.parallelize(1 to 10, 3)
>
> and when we do below
> parallel.mapPartitions( x => List(x.next).iterator).collect
>
> it prints value
> Array[Int] = Array(1, 4, 7)
>
> Can some one please explain why it prints 1,4,7 only
>
> Thanks,
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitions-How-Does-it-Works-tp22123.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-02 Thread Sabarish Sasidharan
Thanks Debasish, Reza and Pat. In my case, I am doing an SVD and then doing
the similarities computation, so a rowSimilarities() would be a good fit;
looking forward to it.

In the meanwhile, I will try to see if I can further limit the number of
similarities computed in some other fashion, or use k-means instead, or a
combination of both. I have also been looking at Mahout's similarity
recommenders based on Spark, but I am not sure the row similarity would apply
in my case as my matrix is pretty dense.
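
For reference, a small spark-shell sketch (toy data) of the columnSimilarities
threshold, which is one way to cap how many ((i, j), value) pairs get
generated:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 3.0),
  Vectors.dense(0.0, 2.0, 1.0)))
val mat = new RowMatrix(rows)

// Pairs whose estimated cosine similarity falls below the threshold are dropped.
val sims = mat.columnSimilarities(0.5)
sims.entries.collect().foreach(println)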

Regards
Sab



On Tue, Mar 3, 2015 at 7:11 AM, Pat Ferrel  wrote:

> Sab, not sure what you require for the similarity metric or your use case
> but you can also look at spark-rowsimilarity or spark-itemsimilarity
> (column-wise) here
> http://mahout.apache.org/users/recommender/intro-cooccurrence-spark.html.
> These are optimized for LLR based “similarity” which is very simple to
> calculate since you don’t use either the item weight or the entire row or
> column vector values. Downsampling is done by number of values per column
> (or row) and by LLR strength. This keeps it to O(n)
>
> They run pretty fast and only use memory if you use the version that
> attaches application IDs to the rows and columns. Using
> SimilarityAnalysis.cooccurrence may help. It’s in the Spark/Scala part of
> Mahout.
>
> On Mar 2, 2015, at 12:56 PM, Reza Zadeh  wrote:
>
> Hi Sab,
> The current method is optimized for having many rows and few columns. In
> your case it is exactly the opposite. We are working on your case, tracked
> by this JIRA: https://issues.apache.org/jira/browse/SPARK-4823
> Your case is very common, so I will put some time into building it.
>
> In the meantime, if you're looking for groups of similar points, consider
> using K-means - it will get you clusters of similar rows with euclidean
> distance.
>
> Best,
> Reza
>
>
> On Sun, Mar 1, 2015 at 9:36 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>>
>> Hi Reza
>>
>> I see that ((int, int), double) pairs are generated for any combination
>> that meets the criteria controlled by the threshold. But assuming a simple
>> 1x10K matrix, that means I would need at least 12GB memory per executor for
>> the flat map just for these pairs, excluding any other overhead. Is that
>> correct? How can we make this scale for even larger n (when m stays small),
>> like 100 x 5 million? One way is by using higher thresholds. The other is that
>> I use a SparseVector to begin with. Are there any other optimizations I can
>> take advantage of?
>>
>>
>>
>>
>> Thanks
>> Sab
>>
>>
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-01 Thread Sabarish Sasidharan
Hi Reza

I see that ((int, int), double) pairs are generated for any combination
that meets the criteria controlled by the threshold. But assuming a simple
1x10K matrix, that means I would need at least 12GB memory per executor for
the flat map just for these pairs, excluding any other overhead. Is that
correct? How can we make this scale for even larger n (when m stays small),
like 100 x 5 million? One way is by using higher thresholds. The other is that
I use a SparseVector to begin with. Are there any other optimizations I can
take advantage of?

Thanks
Sab

