Re: spark-shell failing but pyspark works

2016-04-02 Thread Cyril Scetbon
Nobody has any idea?

> On Mar 31, 2016, at 23:22, Cyril Scetbon  wrote:
> 
> Hi,
> 
> I'm having issues creating a StreamingContext with Scala using spark-shell.
> It tries to access the localhost interface, but the Application Master is not
> running on this interface:
> 
> ERROR ApplicationMaster: Failed to connect to driver at localhost:47257, 
> retrying ...
> 
> I don't have the issue with Python and pyspark, which works fine (you can see
> it uses the IP address):
> 
> ApplicationMaster: Driver now available: 192.168.10.100:43290
> 
> I use similar code in both cases, though:
> 
> test.scala :
> --
> 
> import org.apache.spark._
> import org.apache.spark.streaming._
> val app = "test-scala"
> val conf = new SparkConf().setAppName(app).setMaster("yarn-client")
> val ssc = new StreamingContext(conf, Seconds(3))
> 
> command used: spark-shell -i test.scala
> 
> test.py :
> ---
> 
> from pyspark import SparkConf, SparkContext
> from pyspark.streaming import StreamingContext
> app = "test-python"
> conf = SparkConf().setAppName(app).setMaster("yarn-client")
> sc = SparkContext(conf=conf)
> ssc = StreamingContext(sc, 3)
> 
> command used: pyspark test.py
> 
> Any idea why Scala can't instantiate it? I thought Python was basically using
> Scala under the hood, but it seems there are differences. Are there any
> parameters set when using Scala but not Python?
> 
> Thanks
> -- 
> Cyril SCETBON
> 
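One thing that may be worth checking (an assumption on my side, not something established in this thread): in yarn-client mode the ApplicationMaster connects back to the driver at spark.driver.host, so pinning that property explicitly in the Scala shell's SparkConf might avoid the localhost binding. A minimal sketch, reusing the imports from test.scala above and the address the pyspark run advertised:

// sketch only; 192.168.10.100 is taken from the pyspark log above
val conf = new SparkConf()
  .setAppName(app)
  .setMaster("yarn-client")
  .set("spark.driver.host", "192.168.10.100")
val ssc = new StreamingContext(conf, Seconds(3))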





[jira] Vijay Parmar shared "PIG-4824: FOREACH throwing error" with you

2016-04-02 Thread Vijay Parmar (JIRA)
Vijay Parmar shared an issue with you
---

Is anyone else also facing a similar issue with FOREACH in Pig?

> FOREACH throwing error
> --
>
> Key: PIG-4824
> URL: https://issues.apache.org/jira/browse/PIG-4824
> Project: Pig
>  Issue Type: Bug
>Reporter: vivek singh
>Assignee: Vijay Parmar
>








Re: Working out SQRT on a list

2016-04-02 Thread Mich Talebzadeh
Try this, specifying sqrt as a Math function:

scala> val l = List(2,9,90,66)
l: List[Int] = List(2, 9, 90, 66)
scala> l.map(x => Math.sqrt(x*x))
res0: List[Double] = List(2.0, 9.0, 90.0, 66.0)

HTH



Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 2 April 2016 at 21:40, Ashok Kumar  wrote:

> Hi
>
> I like a simple sqrt operation on a list but I don't get the result
>
> scala val l = List (1,5,786,25)
> l: List[Int] = List(1, 5, 786, 25)
>
> scala> l.map(x => x * x)
> res42: List[Int] = List(1, 25, 617796, 625)
>
> scala> l.map(x => x * x).sqrt
> <console>:28: error: value sqrt is not a member of List[Int]
>   l.map(x => x * x).sqrt
>
> Any ideas
>
> Thanks
>
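If the intent was to take the square root of each squared element (my reading of the question, not something stated in the thread), sqrt has to be mapped over the elements, since it is not a method on List[Int]. A small sketch:

// element-wise square roots of the squared values
val l = List(1, 5, 786, 25)
val roots = l.map(x => x * x).map(x => Math.sqrt(x))   // List(1.0, 5.0, 786.0, 25.0)

// if a single value (root of the sum of squares) was wanted instead:
// Math.sqrt(l.map(x => x * x).sum)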


RE: Spark vs Redshift

2016-04-02 Thread rajesh.prabhu
Hi Eris,

I also found this rather old discussion about Spark vs Redshift:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-v-Redshift-td18112.html

Regards,
Rajesh

Basel, Switzerland
Ph: +41 77 941 0562
rajesh.pra...@wipro.com

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Saturday, April 02, 2016 11:16 PM
To: Eris Lawrence
Cc: user @spark
Subject: Re: Spark vs Redshift


Hi,

Like anything else, your mileage will vary with any tool.

To start, what is your use case here (fit for your needs)? You stated that you 
want to perform OLAP on large datasets. OLAP is normally performed on large 
data sets anyway, so I assume you already have some form of Data Warehouse, 
commercial or otherwise. Do you also need to do Big Data analytics covering a 
variety of data formats, including unstructured data?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 2 April 2016 at 21:34, Eris Lawrence wrote:
Hi Spark devs,

I recently attended a tech session about data processing with Spark vs Redshift 
which concluded, from its metrics and data points, that for 2 billion records, 
select queries filtering on attributes were faster and cheaper on AWS Redshift 
than on an AWS Spark cluster.

I have researched around a bit, and both Redshift and Spark seem to be processing 
engines for running OLAP queries on a large dataset. I was wondering in which use 
cases Spark has an edge over Redshift. Are there certain kinds of complex queries 
where Spark can outperform Redshift? Or does Redshift only work well with 
schema-defined data?

Please share your experience with either of the technologies. Thanks.

Cheers,
Eris.



Re: Spark vs Redshift

2016-04-02 Thread Mich Talebzadeh
Hi,

Like anything else, your mileage will vary with any tool.

To start, what is your use case here (fit for your needs)? You stated that
you want to perform OLAP on large datasets. OLAP is normally performed on
large data sets anyway, so I assume you already have some form of Data
Warehouse, commercial or otherwise. Do you also need to do Big Data
analytics covering a variety of data formats, including unstructured
data?

HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 2 April 2016 at 21:34, Eris Lawrence  wrote:

> Hi Spark devs,
>
> I recently attended a tech session about data processing with Spark vs
> Redshift which concluded, from its metrics and data points, that for 2 billion
> records, select queries filtering on attributes were faster and cheaper on
> AWS Redshift than on an AWS Spark cluster.
>
> I have researched around a bit, and both Redshift and Spark seem to be
> processing engines for running OLAP queries on a large dataset. I was
> wondering in which use cases Spark has an edge over Redshift. Are there
> certain kinds of complex queries where Spark can outperform Redshift?
> Or does Redshift only work well with schema-defined data?
>
> Please share your experience with either of the technologies. Thanks.
>
> Cheers,
> Eris.
>


Working out SQRT on a list

2016-04-02 Thread Ashok Kumar
Hi,

I'd like a simple sqrt operation on a list but I don't get the result:

scala> val l = List (1,5,786,25)
l: List[Int] = List(1, 5, 786, 25)

scala> l.map(x => x * x)
res42: List[Int] = List(1, 25, 617796, 625)

scala> l.map(x => x * x).sqrt
<console>:28: error: value sqrt is not a member of List[Int]
              l.map(x => x * x).sqrt

Any ideas?

Thanks

Spark vs Redshift

2016-04-02 Thread Eris Lawrence
Hi Spark devs,

I recently attended a tech session about data processing with Spark vs
Redshift which concluded, from its metrics and data points, that for 2 billion
records, select queries filtering on attributes were faster and cheaper on
AWS Redshift than on an AWS Spark cluster.

I have researched around a bit, and both Redshift and Spark seem to be
processing engines for running OLAP queries on a large dataset. I was wondering
in which use cases Spark has an edge over Redshift. Are there certain kinds of
complex queries where Spark can outperform Redshift? Or does Redshift only work
well with schema-defined data?

Please share your experience with either of the technologies. Thanks.

Cheers,
Eris.


Re: --packages configuration equivalent item name?

2016-04-02 Thread Russell Jurney
Thanks, Andy!

On Mon, Mar 28, 2016 at 8:44 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Russell
>
> I use Jupyter python notebooks a lot. Here is how I start the server
>
> set -x # turn debugging on
>
> #set +x # turn debugging off
>
>
> # https://github.com/databricks/spark-csv
>
> # http://spark-packages.org/package/datastax/spark-cassandra-connector
>
> #
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md
>
> #
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md#pyspark-with-data-frames
>
>
> # packages are ',' separated with no white space
>
> extraPkgs="--packages
> com.databricks:spark-csv_2.11:1.3.0,datastax:spark-cassandra-connector:1.6.0-M1-s_2.10"
>
>
> export PYSPARK_PYTHON=python3
>
> export PYSPARK_DRIVER_PYTHON=python3
>
> IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs --conf
> spark.cassandra.connection.host=ec2-54-153-102-232.us-west-1.compute.amazonaws.com $*
>
>
>
> From: Russell Jurney 
> Date: Sunday, March 27, 2016 at 7:22 PM
> To: "user @spark" 
> Subject: --packages configuration equivalent item name?
>
> I run PySpark with CSV support like so: IPYTHON=1 pyspark --packages
> com.databricks:spark-csv_2.10:1.4.0
>
> I don't want to type this --packages argument each time. Is there a config
> item for --packages? I can't find one in the reference at
> http://spark.apache.org/docs/latest/configuration.html
>
> If there is no way to do this, please let me know so I can make a JIRA for
> this feature.
>
> Thanks!
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>
>


-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
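For what it's worth, one possible config-file equivalent is the spark.jars.packages property in conf/spark-defaults.conf. I have not verified it against the 1.x configuration reference (which may be why it is hard to find there), so treat this as a sketch rather than a confirmed answer:

# conf/spark-defaults.conf (sketch)
spark.jars.packages  com.databricks:spark-csv_2.10:1.4.0

If the property is honoured by your Spark version, a plain pyspark invocation should then behave like passing --packages each time.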


What is the most efficient way to do a sorted reduce in PySpark?

2016-04-02 Thread Russell Jurney
Dear Spark Users,

I need assistance in understanding which way I should do a sorted reduce in
PySpark. Yes, I know all reduces are sorted because sorting is grouping,
but what I mean is that I need to create a tuple where the first field is a
key, and the second field is a sorted list of all items with that key. It
should look like this:

(id, [(id,A),(id,B),(id,C),...])

And I'm wondering what the 'right' way to do this is, because this is a
slow operation.

Option 1: sort in the reduce

# origin/dest
flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
  .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4])))

Option 2: sort in a subsequent map step

flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
  .reduceByKey(lambda a, b: a + b)\
  .map(lambda tuple:
(
  tuple[0], sorted(tuple[1], key=lambda x: (x[1],x[2],x[3],x[4])))
)

The second option would seem to be more efficient, unless PySpark is smart
and uses the reduce sort for 'free.' Or maybe there is some other
optimization PySpark does. In any case, both are 'slow', but which one is
faster?
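A third option worth measuring (a sketch, not from the original post) is to let groupByKey gather the values per key and sort once per key afterwards, which avoids re-sorting the growing list on every reduce step:

# Option 3 (sketch): group, then one sort per key
flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], nameTuple))\
  .groupByKey()\
  .mapValues(lambda tuples: sorted(tuples, key=lambda x: (x[1], x[2], x[3], x[4])))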

Thanks!

Stack Overflow:
http://stackoverflow.com/questions/36376369/what-is-the-most-efficient-way-to-do-a-sorted-reduce-in-pyspark
Gist: https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade
-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


Re: spark-shell with different username

2016-04-02 Thread Matt Tenenbaum
Hi Mich. I certainly should have included that info in my original message
(sorry!): it's a Mac, running OS X (10.11.3).

Cheers
-mt

On Fri, Apr 1, 2016 at 11:16 PM, Mich Talebzadeh 
wrote:

> Matt,
>
> What OS are you using on your laptop? Sounds like Ubuntu or something?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 2 April 2016 at 01:17, Matt Tenenbaum 
> wrote:
>
>> Hello all —
>>
>> tl;dr: I’m having an issue running spark-shell from my laptop (or other
>> non-cluster-affiliated machine), and I think the issue boils down to
>> usernames. Can I convince spark/scala that I’m someone other than $USER?
>>
>> A bit of background: our cluster is CDH 5.4.8, installed with Cloudera
>> Manager 5.5. We use LDAP, and my login on all hadoop-affiliated machines
>> (including the gateway boxes we use for running scheduled work) is
>> ‘matt.tenenbaum’. When I run spark-shell on one of those machines,
>> everything is fine:
>>
>> [matt.tenenbaum@remote-machine ~]$ HADOOP_CONF_DIR=/etc/hadoop/conf 
>> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
>> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>>
>> Everything starts up correctly, I get a scala prompt, the SparkContext
>> and SQL context are correctly initialized, and I’m off to the races:
>>
>> 16/04/01 23:27:00 INFO session.SessionState: Created local directory: 
>> /tmp/35b58974-dad5-43c6-9864-43815d101ca0_resources
>> 16/04/01 23:27:00 INFO session.SessionState: Created HDFS directory: 
>> /tmp/hive/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0
>> 16/04/01 23:27:00 INFO session.SessionState: Created local directory: 
>> /tmp/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0
>> 16/04/01 23:27:00 INFO session.SessionState: Created HDFS directory: 
>> /tmp/hive/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0/_tmp_space.db
>> 16/04/01 23:27:00 INFO repl.SparkILoop: Created sql context (with Hive 
>> support)..
>> SQL context available as sqlContext.
>>
>> scala> 1 + 41
>> res0: Int = 42
>>
>> scala> sc
>> res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4e9bd2c8
>>
>> I am running 1.6 from a downloaded tgz file, rather than the spark-shell
>> made available to the cluster from CDH. I can copy that tgz to my laptop,
>> and grab a copy of the cluster configurations, and in a perfect world I
>> would then be able to run everything in the same way
>>
>> [matt@laptop ~]$ HADOOP_CONF_DIR=path/to/hadoop/conf 
>> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
>> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>>
>> Notice there are two things that are different:
>>
>>1. My local username on my laptop is ‘matt’, which does not match my
>>name on the remote machine.
>>2. The Hadoop configs live somewhere other than /etc/hadoop/conf
>>
>> Alas, #1 proves fatal because of cluster permissions (there is no
>> /user/matt/ in HDFS, and ‘matt’ is not a valid LDAP user). In the
>> initialization logging output, I can see that fail in an expected way:
>>
>> 16/04/01 16:37:19 INFO yarn.Client: Setting up container launch context for 
>> our AM
>> 16/04/01 16:37:19 INFO yarn.Client: Setting up the launch environment for 
>> our AM container
>> 16/04/01 16:37:19 INFO yarn.Client: Preparing resources for our AM container
>> 16/04/01 16:37:20 WARN util.NativeCodeLoader: Unable to load native-hadoop 
>> library for your platform... using builtin-java classes where applicable
>> 16/04/01 16:37:21 ERROR spark.SparkContext: Error initializing SparkContext.
>> org.apache.hadoop.security.AccessControlException: Permission denied: 
>> user=matt, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
>> at 
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>> at 
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>> at (... etc ...)
>>
>> Fine. In other circumstances I’ve told Hadoop explicitly who I am by
>> setting HADOOP_USER_NAME. Maybe that works here?
>>
>> [matt@laptop ~]$ HADOOP_USER_NAME=matt.tenenbaum HADOOP_CONF_DIR=soma-conf 
>> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
>> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>>
>> Eventually that fails too, but not for the same reason. Setting
>> HADOOP_USER_NAME is sufficient to allow initialization to get past the
>> access-control problems, and I can see it request a new application from
>> the cluster
>>
>> 16/04/01 16:43:08 INFO yarn.Client: Will allocate AM container, with 896 MB 
>> memory including 384 MB overhead
>> 16/04/01 16:43:08 INFO yarn.Client: Setting up container launch context 

RE: Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-02 Thread Silvio Fiorito
In the meantime you can simply define your custom metric source in the 
org.apache.spark package.
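To make that concrete, a minimal sketch of a custom sink compiled into a Spark-owned package; the constructor signature is an assumption based on the built-in sinks (Properties, MetricRegistry, SecurityManager), so verify it against your Spark version:

package org.apache.spark.metrics.sink

import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.SecurityManager

// Lives in org.apache.spark.metrics.sink so the private[spark] Sink trait is visible.
class MyCustomSink(val property: Properties, val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {
  override def start(): Unit = { /* start your reporter here */ }
  override def stop(): Unit = { /* stop the reporter */ }
  override def report(): Unit = { /* flush metrics on demand */ }
}

It can then be wired up through metrics.properties like any built-in sink, e.g. *.sink.mycustom.class=org.apache.spark.metrics.sink.MyCustomSink.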


From: Walid Lezzar
Sent: Saturday, April 2, 2016 4:23 AM
To: Saisai Shao
Cc: spark users
Subject: Re: Spark Metrics : Why is the Sink class declared private[spark] ?

This is great! Hopefully this JIRA will be resolved in the next version of Spark.

Thanks.

On 2 Apr 2016, at 01:07, Saisai Shao wrote:

There's a JIRA (https://issues.apache.org/jira/browse/SPARK-14151) about it, 
please take a look.

Thanks
Saisai

On Sat, Apr 2, 2016 at 6:48 AM, Walid Lezzar wrote:
Hi,

I looked into the Spark code at how Spark reports metrics using the 
MetricsSystem class. I've seen that the Spark MetricsSystem class, when 
instantiated, parses the metrics.properties file, tries to find the sink class 
names and loads them dynamically. It would be great to implement my own sink by 
inheriting from the org.apache.spark.metrics.sink.Sink class but 
unfortunately, this class has been declared private[spark]! So it is not 
possible to inherit from it! Why is that? Is this going to change in future 
Spark versions?




Re: Multiple lookups; consolidate result and run further aggregations

2016-04-02 Thread Ted Yu
Looking at the implementation for lookup in PairRDDFunctions, I think your
understanding is correct.


On Sat, Apr 2, 2016 at 3:16 AM, Nirav Patel  wrote:

> I will start with a question: is Spark's lookup function on a pair RDD a driver
> action, i.e. is the result returned to the driver?
>
> I have a list of keys on the driver side and I want to perform multiple parallel
> lookups on a pair RDD, each returning Seq[V]; consolidate the results; and perform
> further aggregation/transformation over the cluster.
>
> val seqVal = lookupKeys.flatMap(key => {
>
> dataRdd.lookup(key)
>
>   })
>
>
> Here's what I think will happen internally:
>
> The lookup for each Seq[V] returns its result to the driver.
>
> Consolidation of each Seq[V] has to happen on the driver due to the flatMap
> function.
>
> All subsequent operations will happen on the driver side unless I do
> sparkContext.parallelize(seqVal).
>
> Is this correct?
>
> Also, what I am trying to do is an efficient multiple lookup. Another option
> is to broadcast the lookup keys and perform a join.
>
> Please advise.
>
> Thanks
> Nirav
>
>
>


[no subject]

2016-04-02 Thread Hemalatha A
Hello,

As per the Spark programming guide, "we should have 2-4 partitions for
each CPU in your cluster." In this case, how does 1 CPU core process 2-4
partitions at the same time?
Link - http://spark.apache.org/docs/latest/programming-guide.html (under
the RDD section)

Does it do context switching between tasks, or run them in parallel? If it
does context switching, how is it efficient compared to a 1:1 partition-to-core
ratio?

PS: If we are using the Kafka direct API, in which Kafka partitions = RDD
partitions, does that mean we should use 40 Kafka partitions for 10 CPU
cores?

-- 


Regards
Hemalatha


Re: Scala: Perform Unit Testing in spark

2016-04-02 Thread Ted Yu
I think you should specify dependencies in this way:

*"org.apache.spark" % "spark-core_2.10" % "1.6.0"* % "tests"

Please refer to http://www.scalatest.org/user_guide/using_scalatest_with_sbt
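For reference, a hedged build.sbt sketch of how that usually looks; the versions and the "tests" classifier are my assumptions, not something confirmed in this thread:

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10"  % "1.6.0",
  "org.apache.spark" % "spark-mllib_2.10" % "1.6.0",
  // test jars carry SparkFunSuite, MLlibTestSparkContext, etc.
  "org.apache.spark" % "spark-core_2.10"  % "1.6.0" % "test" classifier "tests",
  "org.apache.spark" % "spark-mllib_2.10" % "1.6.0" % "test" classifier "tests",
  "org.scalatest"    %% "scalatest"       % "2.2.6" % "test"
)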

On Fri, Apr 1, 2016 at 3:33 PM, Shishir Anshuman 
wrote:

> When I added "org.apache.spark" % "spark-core_2.10" % "1.6.0", it
> should include spark-core_2.10-1.6.1-tests.jar.
> Why do I need to use the jar file explicitly?
>
> And how do I use the jars for compiling with sbt and running the tests
> on Spark?
>
>
> On Sat, Apr 2, 2016 at 3:46 AM, Ted Yu  wrote:
>
>> You need to include the following jars:
>>
>> jar tvf ./core/target/spark-core_2.10-1.6.1-tests.jar | grep SparkFunSuite
>>   1787 Thu Mar 03 09:06:14 PST 2016
>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>>   1780 Thu Mar 03 09:06:14 PST 2016
>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>>   3982 Thu Mar 03 09:06:14 PST 2016 org/apache/spark/SparkFunSuite.class
>>
>> jar tvf ./mllib/target/spark-mllib_2.10-1.6.1-tests.jar | grep
>> MLlibTestSparkContext
>>   1447 Thu Mar 03 09:53:54 PST 2016
>> org/apache/spark/mllib/util/MLlibTestSparkContext.class
>>   1704 Thu Mar 03 09:53:54 PST 2016
>> org/apache/spark/mllib/util/MLlibTestSparkContext$class.class
>>
>> On Fri, Apr 1, 2016 at 3:07 PM, Shishir Anshuman <
>> shishiranshu...@gmail.com> wrote:
>>
>>> I got the file ALSSuite.scala and am trying to run it. I have copied the
>>> file under src/test/scala in my project folder. When I run sbt test,
>>> I get errors. I have attached a screenshot of the errors. Before sbt
>>> test, I am building the package with sbt package.
>>>
>>> Dependencies of simple.sbt:
>>>




 libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.10" %
 "1.6.0", "org.apache.spark" % "spark-mllib_2.10" % "1.6.0" )
>>>
>>>
>>>
>>>
>>> On Sat, Apr 2, 2016 at 2:21 AM, Ted Yu  wrote:
>>>
 Assuming your code is written in Scala, I would suggest using
 ScalaTest.

 Please take a look at the XXSuite.scala files under mllib/

 On Fri, Apr 1, 2016 at 1:31 PM, Shishir Anshuman <
 shishiranshu...@gmail.com> wrote:

> Hello,
>
> I have code written in Scala using MLlib. I want to perform unit
> testing on it. I can't decide between JUnit 4 and ScalaTest.
> I am new to Spark. Please guide me how to proceed with the testing.
>
> Thank you.
>


>>>
>>
>


Re: spark 1.5.2 - value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey, myData)]

2016-04-02 Thread Nirav Patel
In the second class I re-declared the following and the compile error went
away. Your solution worked too.

 implicit val rowKeyOrdering = rowKeyOrd

Thanks
Nirav
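For anyone hitting the same error, a minimal sketch of why re-declaring the implicit helps; the key and RDD names are from the thread, and exposing rowKeyOrd from RDDInitializer is my own assumption:

// filterByRange comes from OrderedRDDFunctions via an implicit conversion that
// needs an Ordering[K] at the call site, so the second class must have one in scope.
import org.apache.spark.SparkContext._

implicit val rowKeyOrdering: Ordering[myKey] = RDDInitializer.rowKeyOrd
val slice = RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)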



On Wed, Mar 30, 2016 at 7:36 PM, Ted Yu  wrote:

> Have you tried the following construct ?
>
> new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey()
>
> See core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
>
> On Wed, Mar 30, 2016 at 5:20 AM, Nirav Patel 
> wrote:
>
>> Hi, I am trying to use the filterByRange feature of Spark's OrderedRDDFunctions
>> in the hope that it will speed up filtering by scanning only the required
>> partitions.
>> I have created a paired RDD with a RangePartitioner in one Scala class, and
>> in another class I am trying to access this RDD and do the following:
>>
>> In first scala class called RDDInitializer  I do:
>>
>>  implicit val rowKeyOrdering = rowKeyOrd
>>
>> val repartitionRdd = rowdataRdd.partitionBy(new RangePartitioner(
>> minPartitions.toInt, dataRdd, true))
>>
>> dataRdd  = repartitionRdd.sortByKey()
>>
>>
>> In second scala class I do:
>>
>> import org.apache.spark.SparkContext._
>>
>> RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)
>> But I am getting following compile error:
>>
>> "value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey,
>> myData)]"
>>
>>
>> It looks like I can use all methods of OrderedRDDFunctions inside the first
>> Scala class, where the implicit rowKeyOrdering is defined, but not in the
>> second class.
>>
>>
>> Please help me resolve this compile error.
>>
>>
>> Thanks
>>
>> Nirav
>>
>>
>>
>>
>>
>
>
>




Multiple lookups; consolidate result and run further aggregations

2016-04-02 Thread Nirav Patel
I will start with a question: is Spark's lookup function on a pair RDD a driver
action, i.e. is the result returned to the driver?

I have a list of keys on the driver side and I want to perform multiple parallel
lookups on a pair RDD, each returning Seq[V]; consolidate the results; and perform
further aggregation/transformation over the cluster.

val seqVal = lookupKeys.flatMap(key => {

dataRdd.lookup(key)

  })


Here's what I think will happen internally:

The lookup for each Seq[V] returns its result to the driver.

Consolidation of each Seq[V] has to happen on the driver due to the flatMap function.

All subsequent operations will happen on the driver side unless I do
sparkContext.parallelize(seqVal).

Is this correct?

Also, what I am trying to do is an efficient multiple lookup. Another option
is to broadcast the lookup keys and perform a join.

Please advise.

Thanks
Nirav
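For the broadcast alternative mentioned above, a rough sketch (sc is assumed to be the SparkContext); this keeps the filtering and any follow-up aggregation on the executors instead of pulling each Seq[V] back to the driver:

val keySet = sc.broadcast(lookupKeys.toSet)
val matched = dataRdd.filter { case (k, _) => keySet.value.contains(k) }
// subsequent work stays distributed, e.g. regroup the values per key:
val grouped = matched.groupByKey()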




Re: spark-shell with different username

2016-04-02 Thread Sebastian YEPES FERNANDEZ
Matt, have you tried using the parameter --proxy-user matt?
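For reference, a hedged example of what that could look like from the laptop; it is untested, reuses the paths and the cluster username from the original message, and assumes HDFS/YARN allow the local user to impersonate that account:

HADOOP_CONF_DIR=path/to/hadoop/conf \
SPARK_HOME=spark-1.6.0-bin-hadoop2.6 \
spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client \
  --proxy-user matt.tenenbaum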
On Apr 2, 2016 8:17 AM, "Mich Talebzadeh"  wrote:

> Matt,
>
> What OS are you using on your laptop? Sounds like Ubuntu or something?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 2 April 2016 at 01:17, Matt Tenenbaum 
> wrote:
>
>> Hello all —
>>
>> tl;dr: I’m having an issue running spark-shell from my laptop (or other
>> non-cluster-affiliated machine), and I think the issue boils down to
>> usernames. Can I convince spark/scala that I’m someone other than $USER?
>>
>> A bit of background: our cluster is CDH 5.4.8, installed with Cloudera
>> Manager 5.5. We use LDAP, and my login on all hadoop-affiliated machines
>> (including the gateway boxes we use for running scheduled work) is
>> ‘matt.tenenbaum’. When I run spark-shell on one of those machines,
>> everything is fine:
>>
>> [matt.tenenbaum@remote-machine ~]$ HADOOP_CONF_DIR=/etc/hadoop/conf 
>> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
>> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>>
>> Everything starts up correctly, I get a scala prompt, the SparkContext
>> and SQL context are correctly initialized, and I’m off to the races:
>>
>> 16/04/01 23:27:00 INFO session.SessionState: Created local directory: 
>> /tmp/35b58974-dad5-43c6-9864-43815d101ca0_resources
>> 16/04/01 23:27:00 INFO session.SessionState: Created HDFS directory: 
>> /tmp/hive/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0
>> 16/04/01 23:27:00 INFO session.SessionState: Created local directory: 
>> /tmp/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0
>> 16/04/01 23:27:00 INFO session.SessionState: Created HDFS directory: 
>> /tmp/hive/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0/_tmp_space.db
>> 16/04/01 23:27:00 INFO repl.SparkILoop: Created sql context (with Hive 
>> support)..
>> SQL context available as sqlContext.
>>
>> scala> 1 + 41
>> res0: Int = 42
>>
>> scala> sc
>> res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4e9bd2c8
>>
>> I am running 1.6 from a downloaded tgz file, rather than the spark-shell
>> made available to the cluster from CDH. I can copy that tgz to my laptop,
>> and grab a copy of the cluster configurations, and in a perfect world I
>> would then be able to run everything in the same way
>>
>> [matt@laptop ~]$ HADOOP_CONF_DIR=path/to/hadoop/conf 
>> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
>> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>>
>> Notice there are two things that are different:
>>
>>1. My local username on my laptop is ‘matt’, which does not match my
>>name on the remote machine.
>>2. The Hadoop configs live somewhere other than /etc/hadoop/conf
>>
>> Alas, #1 proves fatal because of cluster permissions (there is no
>> /user/matt/ in HDFS, and ‘matt’ is not a valid LDAP user). In the
>> initialization logging output, I can see that fail in an expected way:
>>
>> 16/04/01 16:37:19 INFO yarn.Client: Setting up container launch context for 
>> our AM
>> 16/04/01 16:37:19 INFO yarn.Client: Setting up the launch environment for 
>> our AM container
>> 16/04/01 16:37:19 INFO yarn.Client: Preparing resources for our AM container
>> 16/04/01 16:37:20 WARN util.NativeCodeLoader: Unable to load native-hadoop 
>> library for your platform... using builtin-java classes where applicable
>> 16/04/01 16:37:21 ERROR spark.SparkContext: Error initializing SparkContext.
>> org.apache.hadoop.security.AccessControlException: Permission denied: 
>> user=matt, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
>> at 
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>> at 
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>> at (... etc ...)
>>
>> Fine. In other circumstances I’ve told Hadoop explicitly who I am by
>> setting HADOOP_USER_NAME. Maybe that works here?
>>
>> [matt@laptop ~]$ HADOOP_USER_NAME=matt.tenenbaum HADOOP_CONF_DIR=soma-conf 
>> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
>> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>>
>> Eventually that fails too, but not for the same reason. Setting
>> HADOOP_USER_NAME is sufficient to allow initialization to get past the
>> access-control problems, and I can see it request a new application from
>> the cluster
>>
>> 16/04/01 16:43:08 INFO yarn.Client: Will allocate AM container, with 896 MB 
>> memory including 384 MB overhead
>> 16/04/01 16:43:08 INFO yarn.Client: Setting up container launch context for 
>> our AM
>> 16/04/01 16:43:08 INFO yarn.Client: Setting up the launch 

Re: How to efficiently Scan (not filter nor lookup) part of Paird RDD or Ordered RDD

2016-04-02 Thread Nirav Patel
@Ilya Ganelin, I'm not sure how zipWithIndex() would do less than an O(n) scan;
the Spark docs don't mention anything about it.

I found a solution with Spark 1.5.2's OrderedRDDFunctions: it has a
filterByRange API.

Thanks

On Sun, Jan 24, 2016 at 10:27 PM, Sonal Goyal  wrote:

> One thing you can also look at is to save your data in a way that can be
> accessed through file patterns. Eg by hour, zone etc so that you only load
> what you need.
> On Jan 24, 2016 10:00 PM, "Ilya Ganelin"  wrote:
>
>> The solution I normally use is to zipWithIndex() and then use the filter
>> operation. Filter is an O(m) operation where m is the size of your
>> partition, not an O(N) operation.
>>
>> -Ilya Ganelin
>>
>> On Sat, Jan 23, 2016 at 5:48 AM, Nirav Patel 
>> wrote:
>>
>>> The problem is I have an RDD of about 10M rows and it keeps growing. Every time
>>> we want to perform a query and compute on a subset of the data we have to use
>>> filter and then some aggregation. Here I know filter goes through each
>>> partition and every row of the RDD, which may not be efficient at all.
>>>
>>> Spark having ordered RDD functions, I don't see why it's so difficult to
>>> implement such a function. Cassandra/HBase have had it for years, where they can
>>> fetch data only from certain partitions based on your row key. Scala's TreeMap
>>> has a range function to do the same.
>>>
>>> I think people have been looking for this for a while. I see several posts
>>> asking about this.
>>>
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-td20170.html#a26048
>>>
>>> By the way, I assume there
>>> Thanks
>>> Nirav
>>>
>>>
>>>
>>>
>>
>>
>>




Re: Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-02 Thread Walid Lezzar
This is great! Hopefully this JIRA will be resolved in the next version of Spark.

Thanks.

> On 2 Apr 2016, at 01:07, Saisai Shao wrote:
> 
> There's a JIRA (https://issues.apache.org/jira/browse/SPARK-14151) about it, 
> please take a look.
> 
> Thanks
> Saisai
> 
>> On Sat, Apr 2, 2016 at 6:48 AM, Walid Lezzar  wrote:
>> Hi,
>> 
>> I looked into the Spark code at how Spark reports metrics using the 
>> MetricsSystem class. I've seen that the Spark MetricsSystem class, when 
>> instantiated, parses the metrics.properties file, tries to find the sink 
>> class names and loads them dynamically. It would be great to implement my own 
>> sink by inheriting from the org.apache.spark.metrics.sink.Sink class but 
>> unfortunately, this class has been declared private[spark]! So it is not 
>> possible to inherit from it! Why is that? Is this going to change in future 
>> Spark versions?
> 


Re: spark-shell with different username

2016-04-02 Thread Mich Talebzadeh
Matt,

What OS are you using on your laptop? Sounds like Ubuntu or something?

Thanks

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 2 April 2016 at 01:17, Matt Tenenbaum  wrote:

> Hello all —
>
> tl;dr: I’m having an issue running spark-shell from my laptop (or other
> non-cluster-affiliated machine), and I think the issue boils down to
> usernames. Can I convince spark/scala that I’m someone other than $USER?
>
> A bit of background: our cluster is CDH 5.4.8, installed with Cloudera
> Manager 5.5. We use LDAP, and my login on all hadoop-affiliated machines
> (including the gateway boxes we use for running scheduled work) is
> ‘matt.tenenbaum’. When I run spark-shell on one of those machines,
> everything is fine:
>
> [matt.tenenbaum@remote-machine ~]$ HADOOP_CONF_DIR=/etc/hadoop/conf 
> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>
> Everything starts up correctly, I get a scala prompt, the SparkContext and
> SQL context are correctly initialized, and I’m off to the races:
>
> 16/04/01 23:27:00 INFO session.SessionState: Created local directory: 
> /tmp/35b58974-dad5-43c6-9864-43815d101ca0_resources
> 16/04/01 23:27:00 INFO session.SessionState: Created HDFS directory: 
> /tmp/hive/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0
> 16/04/01 23:27:00 INFO session.SessionState: Created local directory: 
> /tmp/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0
> 16/04/01 23:27:00 INFO session.SessionState: Created HDFS directory: 
> /tmp/hive/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0/_tmp_space.db
> 16/04/01 23:27:00 INFO repl.SparkILoop: Created sql context (with Hive 
> support)..
> SQL context available as sqlContext.
>
> scala> 1 + 41
> res0: Int = 42
>
> scala> sc
> res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4e9bd2c8
>
> I am running 1.6 from a downloaded tgz file, rather than the spark-shell
> made available to the cluster from CDH. I can copy that tgz to my laptop,
> and grab a copy of the cluster configurations, and in a perfect world I
> would then be able to run everything in the same way
>
> [matt@laptop ~]$ HADOOP_CONF_DIR=path/to/hadoop/conf 
> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>
> Notice there are two things that are different:
>
>1. My local username on my laptop is ‘matt’, which does not match my
>name on the remote machine.
>2. The Hadoop configs live somewhere other than /etc/hadoop/conf
>
> Alas, #1 proves fatal because of cluster permissions (there is no
> /user/matt/ in HDFS, and ‘matt’ is not a valid LDAP user). In the
> initialization logging output, I can see that fail in an expected way:
>
> 16/04/01 16:37:19 INFO yarn.Client: Setting up container launch context for 
> our AM
> 16/04/01 16:37:19 INFO yarn.Client: Setting up the launch environment for our 
> AM container
> 16/04/01 16:37:19 INFO yarn.Client: Preparing resources for our AM container
> 16/04/01 16:37:20 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> 16/04/01 16:37:21 ERROR spark.SparkContext: Error initializing SparkContext.
> org.apache.hadoop.security.AccessControlException: Permission denied: 
> user=matt, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
> at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
> at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
> at (... etc ...)
>
> Fine. In other circumstances I’ve told Hadoop explicitly who I am by
> setting HADOOP_USER_NAME. Maybe that works here?
>
> [matt@laptop ~]$ HADOOP_USER_NAME=matt.tenenbaum HADOOP_CONF_DIR=soma-conf 
> SPARK_HOME=spark-1.6.0-bin-hadoop2.6 
> spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
>
> Eventually that fails too, but not for the same reason. Setting
> HADOOP_USER_NAME is sufficient to allow initialization to get past the
> access-control problems, and I can see it request a new application from
> the cluster
>
> 16/04/01 16:43:08 INFO yarn.Client: Will allocate AM container, with 896 MB 
> memory including 384 MB overhead
> 16/04/01 16:43:08 INFO yarn.Client: Setting up container launch context for 
> our AM
> 16/04/01 16:43:08 INFO yarn.Client: Setting up the launch environment for our 
> AM container
> 16/04/01 16:43:08 INFO yarn.Client: Preparing resources for our AM container
> ... [resource uploads happen here] ...
> 16/04/01 16:46:16 INFO spark.SecurityManager: Changing view acls to: 
> matt,matt.tenenbaum
> 16/04/01 

Spark streaming rawSocketStream with protobuf

2016-04-02 Thread lokeshkumar
I am trying out Spark Streaming and listening to a socket. I am using the
rawSocketStream method to create a receiver and a DStream, but when I print
the DStream I get the exception below.

Code to create a DStream:

JavaSparkContext jsc = new JavaSparkContext("Master", "app");
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Seconds(3));
JavaReceiverInputDStream rawStream = jssc.rawSocketStream("localhost", );
log.info(tracePrefix + "Created the stream ...");
rawStream.print();
jssc.start();
jssc.awaitTermination();

Code to send a protobuf object over the TCP connection:

FileInputStream input = new FileInputStream("address_book");
AddressBook book = AddressBookProtos.AddressBook.parseFrom(input);
log.info(tracePrefix + "Size of contacts: " + book.getPersonList().size());
ServerSocket serverSocket = new ServerSocket();
log.info(tracePrefix + "Waiting for connections ...");
Socket s1 = serverSocket.accept();
log.info(tracePrefix + "Accepted a connection ...");
while (true) {
    Thread.sleep(3000);
    ObjectOutputStream out = new ObjectOutputStream(s1.getOutputStream());
    out.writeByte(book.getSerializedSize());
    out.write(book.toByteArray());
    out.flush();
    log.info(tracePrefix + "Written to new socket");
}

Stack trace is shown below:

java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2016-04-02 07:45:47,607 ERROR [Executor task launch worker-0] org.apache.spark.streaming.receiver.ReceiverSupervisorImpl
Stopped receiver with error: java.lang.IllegalArgumentException

2016-04-02 07:45:47,613 ERROR [Executor task launch worker-0] org.apache.spark.executor.Executor
Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2016-04-02 07:45:47,646 ERROR [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager
Task 0 in stage 0.0 failed 1 times; aborting job

2016-04-02 07:45:47,656 ERROR [submit-job-thread-pool-0] org.apache.spark.streaming.scheduler.ReceiverTracker
Receiver has been stopped. Try to restart it.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at
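One observation, based on my reading of RawInputDStream rather than anything confirmed in this thread: rawSocketStream expects each block on the wire to be framed as a 4-byte length followed by exactly that many bytes (and the payload to already be in Spark's serialized-block format), so writing through an ObjectOutputStream (which adds Java-serialization headers) and using writeByte for the size can easily produce a bogus length and the IllegalArgumentException in ByteBuffer.allocate. A hedged sketch of a sender loop that at least matches the length framing (needs java.io.DataOutputStream; the socket and book variables are from the code above):

DataOutputStream out = new DataOutputStream(s1.getOutputStream());
byte[] payload = book.toByteArray();
while (true) {
    Thread.sleep(3000);
    out.writeInt(payload.length);  // 4-byte length prefix
    out.write(payload);            // raw protobuf bytes, no ObjectOutputStream header
    out.flush();
}

For arbitrary protobuf payloads, socketStream with a custom converter function (or socketTextStream plus your own decoding) may be a better fit, since rawSocketStream is really intended for data pre-serialized by Spark itself.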