Sorting the RDD

2016-03-02 Thread Angel Angel
Hello Sir/Madam,

I am trying to sort an RDD using the *sortByKey* function, but I am getting
the following error.


My code does the following:
1) converts the RDD of arrays into key-value pairs
2) sorts the pairs by key

but I get the error *No implicit Ordering defined for Any*

 [image: Inline image 1]
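
The screenshot is not preserved in the archive, so the exact code is unknown. A likely cause, assuming the key ended up typed as Any (for example because the pairs were built from an Array[Any]), is that the compiler cannot find an implicit Ordering for the key type. The sketch below reproduces the same rule with plain Scala collections; the sample rows and value names are hypothetical. On an RDD the analogous fix would be to give the key a concrete type (String, Int, ...) before calling sortByKey.

```scala
// Hypothetical sample rows; every element is typed as Any, as with an Array[Any].
val rows: Seq[Array[Any]] = Seq(
  Array[Any]("b", 2),
  Array[Any]("a", 1),
  Array[Any]("c", 3)
)

// With an Any key the compiler has no Ordering to use, so this line would not compile:
// rows.map(r => (r(0), r(1))).sortBy(_._1)

// Converting the key to a concrete type lets the compiler pick up Ordering[String].
val pairs: Seq[(String, Any)] = rows.map(r => (r(0).toString, r(1)))
val sorted: Seq[(String, Any)] = pairs.sortBy(_._1)

sorted.foreach(println)
```

If the keys are really numbers, converting with something like toString.toInt instead keeps numeric order rather than lexicographic order.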



thanks


Re: select count(*) return wrong row counts

2016-03-02 Thread Mich Talebzadeh
This works fine

scala> sql("use oraclehadoop")
res1: org.apache.spark.sql.DataFrame = [result: string]
scala> sql("select count(1) from sales").show
+-------+
|    _c0|
+-------+
|4991761|
+-------+

You can do "select count(*) from tablename" as it is not dynamic SQL. Does
it actually work?

Since count(*) returns one value you don't really need to map it. A simple
show() will work.

Alternatively

scala> var sqltext : String = ""
sqltext: String = ""
scala> sqltext = "select count(*) from sales"
sqltext: String = select count(*) from sales
scala> sql(sqltext)
res3: org.apache.spark.sql.DataFrame = [_c0: bigint]
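
To read the single count as a number rather than display it, the first row of the result can be taken directly. This is a minimal sketch, assuming Spark 1.6 on the classpath; the local context and the three-row stand-in for the sales table are made up for illustration (in spark-shell, sc and sqlContext already exist and should be used instead).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Local context for illustration only; in spark-shell, sc and sqlContext already exist.
val sc = new SparkContext(new SparkConf().setAppName("count-demo").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical three-row stand-in for the sales table.
val sales = sc.parallelize(Seq((1, 10.0), (2, 20.0), (3, 30.0))).toDF("id", "amount")
sales.registerTempTable("sales")

// count(*) yields a single bigint column, so the value can be read with getLong(0).
val rowCount: Long = sqlContext.sql("select count(*) from sales").first().getLong(0)
println(s"row count table sales: $rowCount")

sc.stop()
```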


HTH


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 3 March 2016 at 02:38, Jesse F Chen  wrote:

> I am finding a strange issue with Spark SQL where "select count(*) "
> returns wrong row counts for certain tables.
>
> I am using TPCDS tables, so here are the actual counts:
>
>
> Row counts: raw generated files vs. Spark SQL tables count(*) on Parquet and on Text
>
> Table                     Raw files    Parquet       Text
> call_center                       6          0          0
> catalog_page                  11718      11515      11515
> catalog_returns              144067     138352     138352
> catalog_sales               1441548    1427257    1427257
> customer                         10      93063      93063
> customer_address                  5      48444      48444
> customer_demographics       1920800    1920800    1920800
> date_dim                      73049      73049      73049
> household_demographics         7200       7200       7200
> income_band                      20         20         20
> inventory                  11745000   11158087   11158087
> item                          18000      17917      17917
> promotion                       300        289        289
> reason                           35         35         35
> ship_mode                        20         20         20
> store                            12          3          3
> store_returns                287514     267471     267471
> store_sales                 2880404    2620573    2620573
> time_dim                      86400      86400      86400
> warehouse                         5          4          4
> web_page                         60         21         21
> web_returns                   71763      65384      65384
> web_sales                    719384     719025     719025
> web_site                         30         25         25
>
> call_center returned 0 count :(
>
> The code used to do the count is fairly simple:
>
> df.registerTempTable(tablename)
> println("registered tempTable")
> val rc=sqlContext.sql("select count(*) from "+tablename)
> rc.map(t => "row count table "+tablename+": "+
> t(0)).collect().foreach(println)
>
> This made many tpcds-derived queries return WRONG results.
>
> Wanted to know if anything key is missing here.
>
> Jesse
>
>
> *JESSE CHEN*
> Big Data Performance | IBM Analytics
>
> Office: 408 463 2296
> Mobile: 408 828 9068
> Email: jfc...@us.ibm.com
>
>


Re: Spark on Yarn with Dynamic Resource Allocation. Container always marked as failed

2016-03-02 Thread Xiaoye Sun
Hi Jeff and Prabhu,

Thanks for your help.

I looked deeper into the NodeManager log and found an error message
like this:
2016-03-02 03:13:59,692 ERROR
org.apache.spark.network.shuffle.ExternalShuffleBlockResolver: error
opening leveldb file
file:/data/yarn/cache/yarn/nm-local-dir/registeredExecutors.ldb
.
Creating new file, will not be able to recover state for existing
applications

This error message is also reported in the following jira ticket.
https://issues.apache.org/jira/browse/SPARK-13622

The reason for this problem is that in core-site.xml I set hadoop.tmp.dir as
follows:

 <property>
   <name>hadoop.tmp.dir</name>
   <value>file:/home/xs6/hadoop-2.7.1/tmp</value>
 </property>


I solved the problem by removing "file:" from the value field.

Thanks!

Xiaoye


On Wed, Mar 2, 2016 at 10:02 PM, Prabhu Joseph 
wrote:

> Were all NodeManager services restarted after the change in yarn-site.xml?
>
> On Thu, Mar 3, 2016 at 6:00 AM, Jeff Zhang  wrote:
>
>> The executor may fail to start. You need to check the executor logs, if
>> there's no executor log then you need to check node manager log.
>>
>> On Wed, Mar 2, 2016 at 4:26 PM, Xiaoye Sun  wrote:
>>
>>> Hi all,
>>>
>>> I am very new to spark and yarn.
>>>
>>> I am running a BroadcastTest example application using spark 1.6.0 and
>>> Hadoop/Yarn 2.7.1. in a 5 nodes cluster.
>>>
>>> I configured my configuration files according to
>>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>>
>>> 1. copy
>>> ./spark-1.6.0/network/yarn/target/scala-2.10/spark-1.6.0-yarn-shuffle.jar
>>> to /hadoop-2.7.1/share/hadoop/yarn/lib/
>>> 2. yarn-site.xml is like this
>>> http://www.owlnet.rice.edu/~xs6/yarn-site.xml
>>> 3. spark-defaults.conf is like this
>>> http://www.owlnet.rice.edu/~xs6/spark-defaults.conf
>>> 4. spark-env.sh is like this
>>> http://www.owlnet.rice.edu/~xs6/spark-env.sh
>>> 5. the command I use to submit spark application is: ./bin/spark-submit
>>> --class org.apache.spark.examples.BroadcastTest --master yarn --deploy-mode
>>> cluster ./examples/target/spark-examples_2.10-1.6.0.jar 1 1000 Http
>>>
>>> However, the job is stuck at RUNNING status, and by looking at the log,
>>> I found that the executor is failed/cancelled frequently...
>>> Here is the log output http://www.owlnet.rice.edu/~xs6/stderr
>>> It shows something like
>>>
>>> 16/03/02 02:07:35 WARN yarn.YarnAllocator: Container marked as failed: 
>>> container_1456905762620_0002_01_02 on host: bold-x.rice.edu. Exit 
>>> status: 1. Diagnostics: Exception from container-launch.
>>>
>>>
>>> Does anybody know what the problem is here?
>>> Best,
>>> Xiaoye
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


Re: Spark sql query taking long time

2016-03-02 Thread Ted Yu
Have you seen the thread 'Filter on a column having multiple values' where
Michael gave this example ?

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/107522969592/2840265927289860/2388bac36e.html

FYI
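
The linked notebook is not reproduced in this archive. One way to filter a column on many values in a single pass, offered here as an assumption about what that thread suggests, is Column.isin (available since Spark 1.5). The sketch below assumes Spark 1.6 on the classpath; the sample rows, the column names other than Address, and the two-element address list are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("isin-demo").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical stand-in for dfCustomers1; only the Address column name comes from the thread.
val dfCustomers1 = sc.parallelize(Seq(
  ("Ann", 30, 1, "Seoul"),
  ("Bob", 41, 2, "Busan"),
  ("Cho", 25, 3, "Daegu")
)).toDF("Name", "Age", "Id", "Address")

// Hypothetical list of addresses to search for (the thread uses 1500 of them).
val lines: Seq[String] = Seq("Seoul", "Daegu")

// One filter over the whole list instead of one filter plus unionAll per address.
val k = dfCustomers1.filter(dfCustomers1("Address").isin(lines: _*))
k.show()

val matched = k.count()
sc.stop()
```

Note one behavioral difference: the unionAll loop returns a matching row once per occurrence of its address in the list, while isin returns each matching row once. For very long lists, joining against a small DataFrame of addresses is another option worth measuring.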

On Wed, Mar 2, 2016 at 9:33 PM, Angel Angel  wrote:

> Hello Sir/Madam,
>
> I am writing an application using Spark SQL.
>
> I made a very big table using the following command:
>
> val dfCustomers1 =
>   sc.textFile("/root/Desktop/database.txt").map(_.split(",")).map(p =>
>   Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3))).toDF()
>
>
> Now I want to search the Address field of the table for many addresses and
> then build a new table from the matches.
>
> var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))
>
> for (a <- 1 until 1500) {
>   val temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
>   k = temp.unionAll(k)
> }
>
> k.show
>
>
> But this takes a very long time. Can you suggest an optimized way to
> reduce the execution time?
>
>
> My cluster has 3 slaves and 1 master.
>
>
> Thanks.
>


Spark sql query taking long time

2016-03-02 Thread Angel Angel
Hello Sir/Madam,

I am writing an application using Spark SQL.

I made a very big table using the following command:

val dfCustomers1 =
  sc.textFile("/root/Desktop/database.txt").map(_.split(",")).map(p =>
  Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3))).toDF()


Now I want to search the Address field of the table for many addresses and
then build a new table from the matches.

var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))

for (a <- 1 until 1500) {
  val temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
  k = temp.unionAll(k)
}

k.show


But this takes a very long time. Can you suggest an optimized way to
reduce the execution time?


My cluster has 3 slaves and 1 master.


Thanks.


Re: Fair scheduler pool details

2016-03-02 Thread Mark Hamstra
If I'm understanding you correctly, then you are correct that the fair
scheduler doesn't currently do everything that you want to achieve.  Fair
scheduler pools currently can be configured with a minimum number of cores
that they will need before accepting Tasks, but there isn't a way to
restrict a pool to use no more than a certain number of cores.  That means
that a lower-priority pool can grab all of the cores as long as there is no
demand on high-priority pools, and then the higher-priority pools will have
to wait for the lower-priority pool to complete Tasks before the
higher-priority pools will be able to run Tasks.  That means that fair
scheduling pools really aren't a sufficient means to satisfy multi-tenancy
requirements or other scenarios where you want a guarantee that there will
always be some cores available to run a high-priority job.  There is a JIRA
issue and a PR out there to address some of this issue, and I've been
starting to come around to the notion that we should support a max cores
configuration for fair scheduler pools, but there is nothing like that
available right now.  Neither is there a way at the application level in a
standalone-mode cluster for one application to pre-empt another in order to
acquire its cores or other resources.  YARN does provide some support for
that, and Mesos may as well, so that is the closest option that I think
currently exists to satisfy your requirement.
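
For reference, this is roughly how a job is routed to a fair scheduler pool from application code. It is a minimal sketch, assuming Spark is on the classpath; the pool name "high" and the app name are hypothetical, and no allocation file is supplied, so the pool is created with default settings. As described above, this only influences the order in which waiting TaskSets are offered cores; it does not cap what another pool can take.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// FAIR mode enables pools; minShare and weight would normally be set per pool in the XML
// file named by spark.scheduler.allocation.file (not set here, so pools get defaults).
val conf = new SparkConf()
  .setAppName("pool-demo")
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

// Jobs submitted from this thread go to the hypothetical pool "high".
sc.setLocalProperty("spark.scheduler.pool", "high")
val total = sc.parallelize(1 to 100).map(_ * 2).reduce(_ + _)

val pool = sc.getLocalProperty("spark.scheduler.pool")
// Clearing the property sends later jobs from this thread back to the default pool.
sc.setLocalProperty("spark.scheduler.pool", null)
sc.stop()
```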

On Wed, Mar 2, 2016 at 6:20 PM, Eugene Morozov 
wrote:

> Mark,
>
> I'm trying to configure spark cluster to share resources between two pools.
>
> I can do that by assigning minimum shares (it works fine), but that means
> a specific number of cores is going to be wasted just by being held ready to
> run anything. While that's better than nothing, I'd like to specify a percentage
> of cores instead of a specific number of cores, as the cluster might change in
> size either up or down. Is there such an option?
>
> Also, I haven't found anything about any sort of preemptive scheduler for
> standalone deployment (it is briefly mentioned in SPARK-9882, but that seems
> to be abandoned). Do you know of any such activity?
>
> --
> Be well!
> Jean Morozov
>
> On Sun, Feb 21, 2016 at 4:32 AM, Mark Hamstra 
> wrote:
>
>> It's 2 -- and it's pretty hard to point to a line of code, a method, or
>> even a class since the scheduling of Tasks involves a pretty complex
>> interaction of several Spark components -- mostly the DAGScheduler,
>> TaskScheduler/TaskSchedulerImpl, TaskSetManager, Schedulable and Pool, as
>> well as the SchedulerBackend (CoarseGrainedSchedulerBackend in this case.)
>>  The key thing to understand, though, is the comment at the top of
>> SchedulerBackend.scala: "A backend interface for scheduling systems that
>> allows plugging in different ones under TaskSchedulerImpl. We assume a
>> Mesos-like model where the application gets resource offers as machines
>> become available and can launch tasks on them."  In other words, the whole
>> scheduling system is built on a model that starts with offers made by
>> workers when resources are available to run Tasks.  Other than the big
>> hammer of canceling a Job while interruptOnCancel is true, there isn't
>> really any facility for stopping or rescheduling Tasks that are already
>> started, so that rules out your option 1.  Similarly, option 3 is out
>> because the scheduler doesn't know when Tasks will complete; it just knows
>> when a new offer comes in and it is time to send more Tasks to be run on
>> the machine making the offer.
>>
>> What actually happens is that the Pool with which a Job is associated
>> maintains a queue of TaskSets needing to be scheduled.  When in
>> resourceOffers the TaskSchedulerImpl needs sortedTaskSets, the Pool
>> supplies those from its scheduling queue after first sorting it according
>> to the Pool's taskSetSchedulingAlgorithm.  In other words, what Spark's
>> fair scheduling does in essence is, in response to worker resource offers,
>> to send new Tasks to be run; those Tasks are taken in sets from the queue
>> of waiting TaskSets, sorted according to a scheduling algorithm.  There is
>> no pre-emption or rescheduling of Tasks that the scheduler has already sent
>> to the workers, nor is there any attempt to anticipate when already running
>> Tasks will complete.
>>
>>
>> On Sat, Feb 20, 2016 at 4:14 PM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to understand how this thing works underneath. Let's say I
>>> have two types of jobs - high important, that might use small amount of
>>> cores and has to be run pretty fast. And less important, but greedy - uses
>>> as many cores as available. So, the idea is to use two corresponding pools.
>>>
>>> Then thing I'm trying to understand is the following.
>>> I use standalone spark deployment (no YARN, no Mesos).
>>> Let's say that less important took all the cores and then someone 

Re: Using netlib-java in Spark 1.6 on linux

2016-03-02 Thread Sean Owen
This is really more a netlib question, but I'd guess strongly that you
haven't installed libgfortran on your machines. OS X doesn't need it;
netlib can't provide it though.
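
A quick way to see which implementation was actually picked up is to ask netlib-java directly. This is a minimal sketch, assuming the netlib-java classes are on the classpath of the JVM where it runs (for example the driver in spark-shell); it reports on that JVM only, not on the executors.

```scala
import com.github.fommil.netlib.BLAS

// Prints the BLAS implementation netlib-java actually selected on this JVM.
val blasImpl: String = BLAS.getInstance().getClass.getName
println(s"BLAS implementation in use: $blasImpl")
// com.github.fommil.netlib.F2jBLAS means the pure-Java fallback was chosen,
// i.e. neither NativeSystemBLAS nor NativeRefBLAS could be loaded.
```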

On Thu, Mar 3, 2016 at 1:06 AM, cindymc  wrote:
> I want to take advantage of the Breeze linear algebra libraries, built on
> netlib-java, used heavily by SparkML. I've found this amazingly
> time-consuming to figure out, and have only been able to do so on MacOS.  I
> want to do same on Linux:
>
> $ uname -a
> Linux slc10whv 3.8.13-68.3.4.el6uek.x86_64 #2 SMP Tue Jul 14 15:03:36 PDT
> 2015 x86_64 x86_64 x86_64 GNU/Linux
>
> This is for Spark 1.6.
>
> For MacOS, I was able to find the *.jars in the .ivy2 cache and add them to
> a combination of system and application classpaths.
>
> For Linux, I've downloaded the Spark 1.6 source and compiled with sbt like
> this:
> sbt/sbt -Pyarn -DskipTests=true -Phadoop-2.3 -Dhadoop.version=2.6.0
> -Pnetlib-lgpl clean update assembly package
>
> This gives me 'spark-assembly-1.6.0-hadoop2.6.0.jar' that appears to contain
> the *.so libs I need.  As an example:  netlib-native_ref-linux-x86_64.so
>
> Now I want to compile and package my application so it picks these
> netlib-java classes up at runtime.  Here's the command I'm using:
>
> spark-submit --properties-file project-defaults.conf --class
> "main.scala.SparkLDADemo" --jars
> lib/stanford-corenlp-3.6.0.jar,lib/stanford-corenlp-3.6.0-models.jar,/scratch/cmcmulle/programs/spark/spark-1.6.0/assembly/target/scala-2.10/spark-assembly-1.6.0-hadoop2.6.0.jar
> target/scala-2.10/sparksql-demo_2.10-1.0.jar
>
> Still, I get the dreaded:
> "16/03/02 16:49:21 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
> 16/03/02 16:49:21 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS"
>
> Can someone please tell me how to build/configure/run a standalone SparkML
> application using spark-submit such that it is able to load/use the
> netlib-java classes?
>
> Thanks --
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-netlib-java-in-Spark-1-6-on-linux-tp26386.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>




Re: Spark 1.5 on Mesos

2016-03-02 Thread Tim Chen
You shouldn't need to specify --jars at all since you only have one jar.

The error is pretty odd as it suggests it's trying to load
/opt/spark/Example but that doesn't really seem to be anywhere in your
image or command.

Can you paste your stdout from the driver task launched by the cluster
dispatcher, that shows you the spark-submit command it eventually ran?


Tim



On Wed, Mar 2, 2016 at 5:42 PM, Ashish Soni  wrote:

> See below; I have attached the Dockerfile used to build the Spark image (by the
> way, I just upgraded to 1.6).
>
> I am running below setup -
>
> Mesos Master - Docker Container
> Mesos Slave 1 - Docker Container
> Mesos Slave 2 - Docker Container
> Marathon - Docker Container
> Spark MESOS Dispatcher - Docker Container
>
> When I submit the Spark Pi example job using the command below:
>
> docker run -it --rm -m 2g -e SPARK_MASTER="mesos://10.0.2.15:7077"
> -e SPARK_IMAGE="spark_driver:latest"
> spark_driver:latest ./bin/spark-submit --deploy-mode cluster --name "PI
> Example" --class org.apache.spark.examples.SparkPi
> http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
> --jars
> /opt/spark/lib/spark-examples-1.6.0-hadoop2.6.0.jar --verbose
>
> Below is the ERROR
> Error: Cannot load main class from JAR file:/opt/spark/Example
> Run with --help for usage help or --verbose for debug output
>
>
> When I run docker inspect on the stopped/dead container I see the output below.
> Interestingly, someone (or the executor) has replaced my original command with
> the one highlighted below, and I do not see the executor downloading the JAR.
> Is this a bug that I am hitting, or is it supposed to work this way and I am
> missing some configuration?
>
> "Env": [
> "SPARK_IMAGE=spark_driver:latest",
> "SPARK_SCALA_VERSION=2.10",
> "SPARK_VERSION=1.6.0",
> "SPARK_EXECUTOR_URI=http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz",
> "MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos-0.25.0.so",
> "SPARK_MASTER=mesos://10.0.2.15:7077",
> "SPARK_EXECUTOR_OPTS=-Dspark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos-0.25.0.so
> -Dspark.jars=http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
> -Dspark.mesos.mesosExecutor.cores=0.1 -Dspark.driver.supervise=false
> -Dspark.app.name=PI Example
> -Dspark.mesos.uris=http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
> -Dspark.mesos.executor.docker.image=spark_driver:latest
> -Dspark.submit.deployMode=cluster -Dspark.master=mesos://10.0.2.15:7077
> -Dspark.driver.extraClassPath=/opt/spark/custom/lib/*
> -Dspark.executor.extraClassPath=/opt/spark/custom/lib/*
> -Dspark.executor.uri=http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz
> -Dspark.mesos.executor.home=/opt/spark",
> "MESOS_SANDBOX=/mnt/mesos/sandbox",
> "MESOS_CONTAINER_NAME=mesos-e47f8d4c-5ee1-4d01-ad07-0d9a03ced62d-S1.43c08f82-e508-4d57-8c0b-fa05bee77fd6",
> "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
> "HADOOP_VERSION=2.6",
> "SPARK_HOME=/opt/spark"
> ],
> "Cmd": [
> "-c",
> *"./bin/spark-submit --name PI Example --master
> mesos://10.0.2.15:5050 --driver-cores 1.0
> --driver-memory 1024M --class org.apache.spark.examples.SparkPi
> $MESOS_SANDBOX/spark-examples-1.6.0-hadoop2.6.0.jar --jars
> /opt/spark/lib/spark-examples-1.6.0-hadoop2.6.0.jar --verbose"*
> ],
> "Image": "spark_driver:latest",
>
>
>
>
>
>
>
>
>
>
>
>
> On Wed, Mar 2, 2016 at 5:49 PM, Charles Allen <
> charles.al...@metamarkets.com> wrote:
>
>> @Tim yes, this is asking about 1.5 though
>>
>> On Wed, Mar 2, 2016 at 2:35 PM Tim Chen  wrote:
>>
>>> Hi Charles,
>>>
>>> I thought that's fixed with your patch in latest master now right?
>>>
>>> Ashish, yes please give me your docker image name (if it's in the public
>>> registry) and what you've tried and I can see what's wrong. I think it's
>>> most likely just the configuration of where the Spark home folder is in the
>>> image.
>>>
>>> Tim
>>>
>>> On Wed, Mar 2, 2016 at 2:28 PM, Charles Allen <
>>> charles.al...@metamarkets.com> wrote:
>>>
>>>> Re: Spark on Mesos Warning regarding disk space:
>>>> https://issues.apache.org/jira/browse/SPARK-12330
>>>>
>>>> That's a spark flaw I encountered on a very regular basis on mesos.
>>>> That and a few other annoyances are fixed in
>>>> https://github.com/metamx/spark/tree/v1.5.2-mmx
>>>>
>>>> Here's another mild annoyance I've encountered:
>>>> https://issues.apache.org/jira/browse/SPARK-11714
>>>>
>>>> On Wed, Mar 2, 2016 at 1:31 PM Ashish Soni
>>>> wrote:
>>>>
>>>>> I have no luck and i would to ask the question to spark committers
>>>>> will this be ever designed to run on mesos ?
>>>>>
>>>>> spark app as a docker 

Re: Building a REST Service with Spark back-end

2016-03-02 Thread Benjamin Kim
I want to ask about something related to this.

Does anyone know if there is or will be a command-line equivalent of the
spark-shell client for Livy Spark Server or any other Spark Job Server? The
reason I am asking is that spark-shell does not handle multiple users on the same
server well. Since a Spark Job Server can generate "sessions" for each user, it
would be great if this were possible.

Another person in the Livy users group pointed out some advantages.

I think the use case makes complete sense for a number of reasons:
1. You wouldn't need an installation of spark and configs on the gateway machine
2. Since Livy is over HTTP, it'd be easier to run spark-shell in front of a 
firewall
3. Can "connect/disconnect" to sessions similar to screen in linux

Thanks,
Ben

> On Mar 2, 2016, at 1:11 PM, Guru Medasani  wrote:
> 
> Hi Yanlin,
> 
> This is a fairly new effort and is not officially released/supported by 
> Cloudera yet. I believe those numbers will be out once it is released.
> 
> Guru Medasani
> gdm...@gmail.com 
> 
> 
> 
>> On Mar 2, 2016, at 10:40 AM, yanlin wang wrote:
>> 
>> Has anyone used Livy in a real-world, high-concurrency web app? I think it uses
>> the spark-submit command line to create jobs... How do the job server or a notebook
>> compare with Livy?
>> 
>> Thx,
>> Yanlin
>> 
>> Sent from my iPhone
>> 
>> On Mar 2, 2016, at 6:24 AM, Guru Medasani wrote:
>> 
>>> Hi Don,
>>> 
>>> Here is another REST interface for interacting with Spark from anywhere. 
>>> 
>>> https://github.com/cloudera/livy 
>>> 
>>> Here is an example to estimate PI using Spark from Python using requests 
>>> library. 
>>> 
>>> >>> data = {
>>> ...   'code': textwrap.dedent("""\
>>> ...  val NUM_SAMPLES = 10;
>>> ...  val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
>>> ...val x = Math.random();
>>> ...val y = Math.random();
>>> ...if (x*x + y*y < 1) 1 else 0
>>> ...  }.reduce(_ + _);
>>> ...  println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
>>> ...  """)
>>> ... }
>>> >>> r = requests.post(statements_url, data=json.dumps(data), 
>>> >>> headers=headers)
>>> >>> pprint.pprint(r.json())
>>> {u'id': 1,
>>>  u'output': {u'data': {u'text/plain': u'Pi is roughly 3.14004\nNUM_SAMPLES: 
>>> Int = 10\ncount: Int = 78501'},
>>>  u'execution_count': 1,
>>>  u'status': u'ok'},
>>>  u'state': u'available'}
>>> 
>>> 
>>> Guru Medasani
>>> gdm...@gmail.com 
>>> 
>>> 
>>> 
>>>> On Mar 2, 2016, at 7:47 AM, Todd Nist wrote:
>>>>
>>>> Have you looked at Apache Toree, http://toree.apache.org/ ? This was
>>>> formerly the Spark-Kernel from IBM but contributed to apache.
>>>>
>>>> https://github.com/apache/incubator-toree
>>>>
>>>> You can find a good overview on the spark-kernel here:
>>>> http://www.spark.tc/how-to-enable-interactive-applications-against-apache-spark/
>>>>
>>>> Not sure if that is of value to you or not.
>>>>
>>>> HTH.
>>>>
>>>> -Todd
>>>>
>>>> On Tue, Mar 1, 2016 at 7:30 PM, Don Drake wrote:
>>>> I'm interested in building a REST service that utilizes a Spark SQL
>>>> Context to return records from a DataFrame (or IndexedRDD?) and even
>>>> add/update records.
>>>>
>>>> This will be a simple REST API, with only a few end-points.  I found this
>>>> example:
>>>>
>>>> https://github.com/alexmasselot/spark-play-activator
>>>>
>>>> which looks close to what I am interested in doing.
>>>>
>>>> Are there any other ideas or options if I want to run this in a YARN
>>>> cluster?
>>>>
>>>> Thanks.
>>>>
>>>> -Don
>>>>
>>>> --
>>>> Donald Drake
>>>> Drake Consulting
>>>> http://www.drakeconsulting.com/
>>>> https://twitter.com/dondrake
>>>> 800-733-2143
>>> 
> 



Stage contains task of large size

2016-03-02 Thread Bijuna
Spark users,

We are running a Spark application in standalone mode. We see warning messages in
the logs that say:

Stage 46 contains a task of very large size (983 KB). The maximum recommended
task size is 100 KB.

What is the recommended approach to fix this warning? Please let me know.
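
No reply to this question is included in the digest. A common cause of this warning, offered here as an assumption about the job rather than a diagnosis, is a large local object captured in a task closure, which is then serialized with every task. Broadcasting the object is one way to shrink the tasks. The sketch below assumes Spark is on the classpath; the lookup table and all names are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("broadcast-demo").setMaster("local[2]"))

// Hypothetical large lookup table that would otherwise be captured by the closure.
val lookup: Map[Int, String] = (1 to 100000).map(i => i -> s"value-$i").toMap

// Ship the table once per executor instead of once per task.
val lookupBc = sc.broadcast(lookup)

val resolved = sc.parallelize(Seq(1, 2, 3))
  .map(id => lookupBc.value.getOrElse(id, "unknown"))
  .collect()

sc.stop()
```

Another possible source is a large local collection passed to parallelize, in which case loading the data from storage on the executors may help instead.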

Thank you
Bijuna

Sent from my iPad



Re: Renaming sc variable in sparkcontext throws task not serializable

2016-03-02 Thread Prashant Sharma
*This is a known issue. *
https://issues.apache.org/jira/browse/SPARK-3200


Prashant Sharma



On Thu, Mar 3, 2016 at 9:01 AM, Rahul Palamuttam 
wrote:

> Thank you Jeff.
>
> I have filed a JIRA under the following link :
>
> https://issues.apache.org/jira/browse/SPARK-13634
>
> For some reason the spark context is being pulled into the referencing
> environment of the closure.
> I also had no problems with batch jobs.
>
> On Wed, Mar 2, 2016 at 7:18 PM, Jeff Zhang  wrote:
>
>> I can reproduce it in spark-shell. But it works for batch job. Looks like
>> spark repl issue.
>>
>> On Thu, Mar 3, 2016 at 10:43 AM, Rahul Palamuttam wrote:
>>
>>> Hi All,
>>>
>>> We recently came across this issue when using the spark-shell and
>>> zeppelin.
>>> If we assign the sparkcontext variable (sc) to a new variable and
>>> reference
>>> another variable in an RDD lambda expression we get a task not
>>> serializable exception.
>>>
>>> The following three lines of code illustrate this :
>>>
>>> val temp = 10
>>> val newSC = sc
>>> val newRDD = newSC.parallelize(0 to 100).map(p => p + temp)
>>>
>>> I am not sure if this is a known issue, or we should file a JIRA for it.
>>> We originally came across this bug in the SciSpark project.
>>>
>>> Best,
>>>
>>> Rahul P
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


Re: Spark on Yarn with Dynamic Resource Allocation. Container always marked as failed

2016-03-02 Thread Prabhu Joseph
Were all NodeManager services restarted after the change in yarn-site.xml?

On Thu, Mar 3, 2016 at 6:00 AM, Jeff Zhang  wrote:

> The executor may fail to start. You need to check the executor logs, if
> there's no executor log then you need to check node manager log.
>
> On Wed, Mar 2, 2016 at 4:26 PM, Xiaoye Sun  wrote:
>
>> Hi all,
>>
>> I am very new to spark and yarn.
>>
>> I am running a BroadcastTest example application using spark 1.6.0 and
>> Hadoop/Yarn 2.7.1. in a 5 nodes cluster.
>>
>> I configured my configuration files according to
>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>
>> 1. copy
>> ./spark-1.6.0/network/yarn/target/scala-2.10/spark-1.6.0-yarn-shuffle.jar
>> to /hadoop-2.7.1/share/hadoop/yarn/lib/
>> 2. yarn-site.xml is like this
>> http://www.owlnet.rice.edu/~xs6/yarn-site.xml
>> 3. spark-defaults.conf is like this
>> http://www.owlnet.rice.edu/~xs6/spark-defaults.conf
>> 4. spark-env.sh is like this http://www.owlnet.rice.edu/~xs6/spark-env.sh
>> 5. the command I use to submit spark application is: ./bin/spark-submit
>> --class org.apache.spark.examples.BroadcastTest --master yarn --deploy-mode
>> cluster ./examples/target/spark-examples_2.10-1.6.0.jar 1 1000 Http
>>
>> However, the job is stuck at RUNNING status, and by looking at the log, I
>> found that the executor is failed/cancelled frequently...
>> Here is the log output http://www.owlnet.rice.edu/~xs6/stderr
>> It shows something like
>>
>> 16/03/02 02:07:35 WARN yarn.YarnAllocator: Container marked as failed: 
>> container_1456905762620_0002_01_02 on host: bold-x.rice.edu. Exit 
>> status: 1. Diagnostics: Exception from container-launch.
>>
>>
>> Is there anybody know what is the problem here?
>> Best,
>> Xiaoye
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Renaming sc variable in sparkcontext throws task not serializable

2016-03-02 Thread Jeff Zhang
I can reproduce it in spark-shell. But it works for batch job. Looks like
spark repl issue.

On Thu, Mar 3, 2016 at 10:43 AM, Rahul Palamuttam 
wrote:

> Hi All,
>
> We recently came across this issue when using the spark-shell and zeppelin.
> If we assign the sparkcontext variable (sc) to a new variable and reference
> another variable in an RDD lambda expression we get a task not
> serializable exception.
>
> The following three lines of code illustrate this :
>
> val temp = 10
> val newSC = sc
> val newRDD = newSC.parallelize(0 to 100).map(p => p + temp)
>
> I am not sure if this is a known issue, or we should file a JIRA for it.
> We originally came across this bug in the SciSpark project.
>
> Best,
>
> Rahul P
>



-- 
Best Regards

Jeff Zhang


Re: rdd cache name

2016-03-02 Thread charles li
thanks a lot, Xinh, that's very helpful for me.

On Thu, Mar 3, 2016 at 12:54 AM, Xinh Huynh  wrote:

> Hi Charles,
>
> You can set the RDD name before using it. Just do before caching:
> (Scala) myRdd.setName("Charles RDD")
> (Python) myRdd.setName('Charles RDD')
> Reference: PySpark doc:
> http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
>
> Fraction cached is the percentage of partitions of an RDD that are cached.
> From the code:
> (rdd.numCachedPartitions * 100.0 / rdd.numPartitions)
> Code is here:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
> Fraction cached will be less than 100% if there isn't enough room for all
> cached RDDs to fit in the cache. If it's a problem, you may want to
> increase your in-memory cache size or cache off-heap or to disk.
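
Both points above can be tried together in a small local job. This is a minimal sketch, assuming Spark is on the classpath; the RDD contents and app name are made up, and getRDDStorageInfo is a developer API, so its shape may differ between versions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("cache-demo").setMaster("local[2]"))

// Name the RDD before caching so the Storage page shows the custom name.
val myRdd = sc.parallelize(1 to 1000, 4).setName("Charles RDD").cache()
myRdd.count() // an action is needed before anything is actually cached

// Same arithmetic as the Storage page: cached partitions as a percentage of all partitions.
val fractionCached: Option[Double] = sc.getRDDStorageInfo
  .find(_.id == myRdd.id)
  .map(info => info.numCachedPartitions * 100.0 / info.numPartitions)
println(s"${myRdd.name}: ${fractionCached.getOrElse(0.0)}% cached")

val rddName = myRdd.name
sc.stop()
```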
>
> Xinh
>
> On Wed, Mar 2, 2016 at 1:48 AM, charles li 
> wrote:
>
>> hi, there, I feel a little confused about the *cache* in spark.
>>
>> First, is there any way to *customize the cached RDD name*? It's not
>> convenient for me when looking at the Storage page, where the RDD Name
>> column shows only the kind of RDD. I would like to set my own names, such
>> as 'rdd 1', 'rdd of map', 'rdd of groupby' and so on.
>>
>> Second, can someone tell me what exactly '*Fraction Cached*' means
>> under the hood?
>>
>> great thanks
>>
>>
>>
>>
>> --
>> *--*
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Renaming sc variable in sparkcontext throws task not serializable

2016-03-02 Thread Rahul Palamuttam
Hi All,

We recently came across this issue when using the spark-shell and zeppelin.
If we assign the sparkcontext variable (sc) to a new variable and reference
another variable in an RDD lambda expression we get a task not serializable
exception.

The following three lines of code illustrate this :

val temp = 10
val newSC = sc
val newRDD = newSC.parallelize(0 to 100).map(p => p + temp)

I am not sure if this is a known issue, or we should file a JIRA for it.
We originally came across this bug in the SciSpark project.

Best,

Rahul P


select count(*) return wrong row counts

2016-03-02 Thread Jesse F Chen


I am finding a strange issue with Spark SQL where "select count(*) "
returns wrong row counts for certain tables.

I am using TPCDS tables, so here are the actual counts:


  
Row counts: raw generated files vs. Spark SQL tables count(*) on Parquet and on Text

Table                     Raw files    Parquet       Text
call_center                       6          0          0
catalog_page                  11718      11515      11515
catalog_returns              144067     138352     138352
catalog_sales               1441548    1427257    1427257
customer                         10      93063      93063
customer_address                  5      48444      48444
customer_demographics       1920800    1920800    1920800
date_dim                      73049      73049      73049
household_demographics         7200       7200       7200
income_band                      20         20         20
inventory                  11745000   11158087   11158087
item                          18000      17917      17917
promotion                       300        289        289
reason                           35         35         35
ship_mode                        20         20         20
store                            12          3          3
store_returns                287514     267471     267471
store_sales                 2880404    2620573    2620573
time_dim                      86400      86400      86400
warehouse                         5          4          4
web_page                         60         21         21
web_returns                   71763      65384      65384
web_sales                    719384     719025     719025
web_site                         30         25         25



call_center returned 0 count :(

The code used to do the count is fairly simple:

  df.registerTempTable(tablename)
  println("registered tempTable")

Re: Fair scheduler pool details

2016-03-02 Thread Eugene Morozov
Mark,

I'm trying to configure a Spark cluster to share resources between two pools.

I can do that by assigning minimum shares (it works fine), but that means a
specific number of cores is going to be wasted just by being kept ready to
run anything. While that's better than nothing, I'd like to specify a
percentage of cores instead of a specific number, as the cluster might change
in size, either up or down. Is there such an option?

Also, I haven't found anything about any sort of preemptive scheduler for
standalone deployment (it is briefly mentioned in SPARK-9882, but that seems
to be abandoned). Do you know if there is any such activity?

--
Be well!
Jean Morozov

On Sun, Feb 21, 2016 at 4:32 AM, Mark Hamstra 
wrote:

> It's 2 -- and it's pretty hard to point to a line of code, a method, or
> even a class since the scheduling of Tasks involves a pretty complex
> interaction of several Spark components -- mostly the DAGScheduler,
> TaskScheduler/TaskSchedulerImpl, TaskSetManager, Schedulable and Pool, as
> well as the SchedulerBackend (CoarseGrainedSchedulerBackend in this case.)
>  The key thing to understand, though, is the comment at the top of
> SchedulerBackend.scala: "A backend interface for scheduling systems that
> allows plugging in different ones under TaskSchedulerImpl. We assume a
> Mesos-like model where the application gets resource offers as machines
> become available and can launch tasks on them."  In other words, the whole
> scheduling system is built on a model that starts with offers made by
> workers when resources are available to run Tasks.  Other than the big
> hammer of canceling a Job while interruptOnCancel is true, there isn't
> really any facility for stopping or rescheduling Tasks that are already
> started, so that rules out your option 1.  Similarly, option 3 is out
> because the scheduler doesn't know when Tasks will complete; it just knows
> when a new offer comes in and it is time to send more Tasks to be run on
> the machine making the offer.
>
> What actually happens is that the Pool with which a Job is associated
> maintains a queue of TaskSets needing to be scheduled.  When in
> resourceOffers the TaskSchedulerImpl needs sortedTaskSets, the Pool
> supplies those from its scheduling queue after first sorting it according
> to the Pool's taskSetSchedulingAlgorithm.  In other words, what Spark's
> fair scheduling does in essence is, in response to worker resource offers,
> to send new Tasks to be run; those Tasks are taken in sets from the queue
> of waiting TaskSets, sorted according to a scheduling algorithm.  There is
> no pre-emption or rescheduling of Tasks that the scheduler has already sent
> to the workers, nor is there any attempt to anticipate when already running
> Tasks will complete.
>
>
> On Sat, Feb 20, 2016 at 4:14 PM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm trying to understand how this thing works underneath. Let's say I
>> have two types of jobs - high important, that might use small amount of
>> cores and has to be run pretty fast. And less important, but greedy - uses
>> as many cores as available. So, the idea is to use two corresponding pools.
>>
>> Then thing I'm trying to understand is the following.
>> I use standalone spark deployment (no YARN, no Mesos).
>> Let's say that less important took all the cores and then someone runs
>> high important job. Then I see three possibilities:
>> 1. Spark kill some executors that currently runs less important
>> partitions to assign them to a high performant job.
>> 2. Spark will wait until some partitions of less important job will be
>> completely processed and then first executors that become free will be
>> assigned to process high important job.
>> 3. Spark will figure out specific time, when particular stages of
>> partitions of less important jobs is done, and instead of continue with
>> this job, these executors will be reassigned to high important one.
>>
>> Which one it is? Could you please point me to a class / method / line of
>> code?
>> --
>> Be well!
>> Jean Morozov
>>
>
>
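
For readers following the thread, the sorting step Mark describes can be sketched in plain Scala. This is a simplified model, loosely based on how the fair scheduling comparison is generally understood to work (entities below their minimum share go first, then the ratio of running tasks to weight decides). The `Sched` case class and all names in it are hypothetical and are not Spark's internal classes.

```scala
// Simplified model of fair ordering between schedulable entities
// (pools or TaskSets). Names are illustrative, not Spark internals.
final case class Sched(name: String, runningTasks: Int, minShare: Int, weight: Int)

object FairOrderSketch {
  // Returns true when s1 should receive the next resource offer before s2.
  def before(s1: Sched, s2: Sched): Boolean = {
    // An entity is "needy" while it runs fewer tasks than its minimum share.
    val needy1 = s1.runningTasks < s1.minShare
    val needy2 = s2.runningTasks < s2.minShare
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1)
    val weightRatio1 = s1.runningTasks.toDouble / s1.weight
    val weightRatio2 = s2.runningTasks.toDouble / s2.weight

    val cmp =
      if (needy1 && !needy2) -1
      else if (!needy1 && needy2) 1
      else if (needy1 && needy2) java.lang.Double.compare(minShareRatio1, minShareRatio2)
      else java.lang.Double.compare(weightRatio1, weightRatio2)

    // Break ties by name so the ordering is deterministic.
    if (cmp != 0) cmp < 0 else s1.name < s2.name
  }

  // Order in which waiting entities would be considered on the next offer.
  def sorted(queue: Seq[Sched]): Seq[Sched] = queue.sortWith(before)
}
```

With a greedy "batch" entity already running 8 tasks (minShare 0) and an "interactive" entity running none (minShare 2), `FairOrderSketch.sorted` puts "interactive" first. Consistent with the explanation above, this only affects which tasks are sent out on the next offer; nothing already running is stopped.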


Re: Spark 1.5 on Mesos

2016-03-02 Thread Ashish Soni
See below. I have also attached the Dockerfile used to build the Spark image
(by the way, I just upgraded to 1.6).

I am running below setup -

Mesos Master - Docker Container
Mesos Slave 1 - Docker Container
Mesos Slave 2 - Docker Container
Marathon - Docker Container
Spark MESOS Dispatcher - Docker Container

When I submit the SparkPi example job using the command below:

docker run -it --rm -m 2g -e SPARK_MASTER="mesos://10.0.2.15:7077" \
  -e SPARK_IMAGE="spark_driver:latest" spark_driver:latest \
  ./bin/spark-submit --deploy-mode cluster --name "PI Example" \
  --class org.apache.spark.examples.SparkPi \
  http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar \
  --jars /opt/spark/lib/spark-examples-1.6.0-hadoop2.6.0.jar --verbose

Below is the ERROR
Error: Cannot load main class from JAR file:/opt/spark/Example
Run with --help for usage help or --verbose for debug output


When I run docker inspect on the stopped/dead container I see the output
below. What is interesting is that something (or the executor) has replaced
the original command with the one highlighted below, and I do not see the
executor downloading the JAR. Is this a bug I am hitting, or is it supposed
to work this way and I am missing some configuration?

"Env": [
"SPARK_IMAGE=spark_driver:latest",
"SPARK_SCALA_VERSION=2.10",
"SPARK_VERSION=1.6.0",
"SPARK_EXECUTOR_URI=
http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz;,
"MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos-0.25.0.so",
"SPARK_MASTER=mesos://10.0.2.15:7077",

"SPARK_EXECUTOR_OPTS=-Dspark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/
libmesos-0.25.0.so -Dspark.jars=
http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
-Dspark.mesos.mesosExecutor.cores=0.1 -Dspark.driver.supervise=false -
Dspark.app.name=PI Example -Dspark.mesos.uris=
http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
-Dspark.mesos.executor.docker.image=spark_driver:latest
-Dspark.submit.deployMode=cluster -Dspark.master=mesos://10.0.2.15:7077
-Dspark.driver.extraClassPath=/opt/spark/custom/lib/*
-Dspark.executor.extraClassPath=/opt/spark/custom/lib/*
-Dspark.executor.uri=
http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz
-Dspark.mesos.executor.home=/opt/spark",
"MESOS_SANDBOX=/mnt/mesos/sandbox",

"MESOS_CONTAINER_NAME=mesos-e47f8d4c-5ee1-4d01-ad07-0d9a03ced62d-S1.43c08f82-e508-4d57-8c0b-fa05bee77fd6",

"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
"HADOOP_VERSION=2.6",
"SPARK_HOME=/opt/spark"
],
"Cmd": [
"-c",
   * "./bin/spark-submit --name PI Example --master
mesos://10.0.2.15:5050  --driver-cores 1.0
--driver-memory 1024M --class org.apache.spark.examples.SparkPi
$MESOS_SANDBOX/spark-examples-1.6.0-hadoop2.6.0.jar --jars
/opt/spark/lib/spark-examples-1.6.0-hadoop2.6.0.jar --verbose"*
],
"Image": "spark_driver:latest",












On Wed, Mar 2, 2016 at 5:49 PM, Charles Allen  wrote:

> @Tim yes, this is asking about 1.5 though
>
> On Wed, Mar 2, 2016 at 2:35 PM Tim Chen  wrote:
>
>> Hi Charles,
>>
>> I thought that's fixed with your patch in latest master now right?
>>
>> Ashish, yes please give me your docker image name (if it's in the public
>> registry) and what you've tried and I can see what's wrong. I think it's
>> most likely just the configuration of where the Spark home folder is in the
>> image.
>>
>> Tim
>>
>> On Wed, Mar 2, 2016 at 2:28 PM, Charles Allen <
>> charles.al...@metamarkets.com> wrote:
>>
>>> Re: Spark on Mesos Warning regarding disk space:
>>> https://issues.apache.org/jira/browse/SPARK-12330
>>>
>>> That's a spark flaw I encountered on a very regular basis on mesos. That
>>> and a few other annoyances are fixed in
>>> https://github.com/metamx/spark/tree/v1.5.2-mmx
>>>
>>> Here's another mild annoyance I've encountered:
>>> https://issues.apache.org/jira/browse/SPARK-11714
>>>
>>> On Wed, Mar 2, 2016 at 1:31 PM Ashish Soni 
>>> wrote:
>>>
>>>> I have no luck and i would to ask the question to spark committers will
>>>> this be ever designed to run on mesos ?
>>>>
>>>> spark app as a docker container not working at all on mesos  ,if any
>>>> one would like the code i can send it over to have a look.
>>>>
>>>> Ashish
>>>>
>>>> On Wed, Mar 2, 2016 at 12:23 PM, Sathish Kumaran Vairavelu <
>>>> vsathishkuma...@gmail.com> wrote:
>>>>
>>>>> Try passing jar using --jars option
>>>>>
>>>>> On Wed, Mar 2, 2016 at 10:17 AM Ashish Soni 
>>>>> wrote:
>>>>>
>>>>>> I made some progress but now i am stuck at this point , Please help
>>>>>> as looks like i am close to get it working
>>>>>>
>>>>>> I have everything running in docker container including mesos slave
>>>>>> and master
>>>>>>
>>>>>> When i try to 

Re: Mapper side join with DataFrames API

2016-03-02 Thread Deepak Gopalakrishnan
Hello,

I'm using 1.6.0 on EMR

On Thu, Mar 3, 2016 at 12:34 AM, Yong Zhang  wrote:

> What version of Spark you are using?
>
> I am also trying to figure out how to do the map side join in Spark.
>
> In 1.5.x, there is a broadcast function in the Dataframe, and it caused
> OOM for me simple test case, even one side of join is very small.
>
> I am still trying to find out the root cause yet.
>
> Yong
>
> --
> Date: Wed, 2 Mar 2016 15:38:29 +0530
> Subject: Re: Mapper side join with DataFrames API
> From: dgk...@gmail.com
> To: mich...@databricks.com
> CC: user@spark.apache.org
>
>
> Thanks for the help guys.
>
> Just to ask a part of my question in a little different way.
>
> I have attached my screenshots here. There is so much of memory that is
> unused and yet there is a spill ( as in screenshots). Any idea why ?
>
> Thanks
> Deepak
>
> On Wed, Mar 2, 2016 at 5:14 AM, Michael Armbrust 
> wrote:
>
> Its helpful to always include the output of df.explain(true) when you are
> asking about performance.
>
> On Mon, Feb 29, 2016 at 6:14 PM, Deepak Gopalakrishnan 
> wrote:
>
> Hello All,
>
> I'm trying to join 2 dataframes A and B with a
>
> sqlContext.sql("SELECT * FROM A INNER JOIN B ON A.a=B.a");
>
> Now what I have done is that I have registeredTempTables for A and B after
> loading these DataFrames from different sources. I need the join to be
> really fast and I was wondering if there is a way to use the SQL statement
> and then being able to do a mapper side join ( say my table B is small) ?
>
> I read some articles on using broadcast to do mapper side joins. Could I
> do something like this and then execute my sql statement to achieve mapper
> side join ?
>
> DataFrame B = sparkContext.broadcast(B);
> B.registerTempTable("B");
>
>
> I have a join as stated above and I see in my executor logs the below :
>
> 16/02/29 17:02:35 INFO TaskSetManager: Finished task 198.0 in stage 7.0
> (TID 1114) in 20354 ms on localhost (196/200)
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 200 non-empty
> blocks out of 200 blocks
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 0 ms
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty
> blocks out of 128 blocks
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 0 ms
> 16/02/29 17:03:03 INFO Executor: Finished task 199.0 in stage 7.0 (TID
> 1115). 2511 bytes result sent to driver
> 16/02/29 17:03:03 INFO TaskSetManager: Finished task 199.0 in stage 7.0
> (TID 1115) in 27621 ms on localhost (197/200)
>
> *16/02/29 17:07:06 INFO UnsafeExternalSorter: Thread 124 spilling sort
> data of 256.0 KB to disk (0  time so far)*
>
>
> Now, I have around 10G of executor memory and my memory faction should be
> the default ( 0.75 as per the documentation) and my memory usage is < 1.5G(
> obtained from the Storage tab on Spark dashboard), but still it says
> spilling sort data. I'm a little surprised why this happens even when I
> have enough memory free.
> Any inputs will be greatly appreciated!
> Thanks
> --
> Regards,
> *Deepak Gopalakrishnan*
> *Mobile*:+918891509774
> *Skype* : deepakgk87
> http://myexps.blogspot.com
>
>
>
>
>
> --
> Regards,
> *Deepak Gopalakrishnan*
> *Mobile*:+918891509774
> *Skype* : deepakgk87
> http://myexps.blogspot.com
>
>
> - To
> unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>



-- 
Regards,
*Deepak Gopalakrishnan*
*Mobile*:+918891509774
*Skype* : deepakgk87
http://myexps.blogspot.com
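
For reference, a map-side (broadcast) join in the DataFrame API is usually expressed with the `broadcast` hint from `org.apache.spark.sql.functions` (available since 1.5), rather than with `sparkContext.broadcast`, which returns a `Broadcast[...]` wrapper and not a DataFrame. The sketch below is a minimal illustration; `large` and `small` are hypothetical names standing in for the DataFrames registered as A and B, and the join column `a` is taken from the SQL in the quoted question.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {
  // `large` and `small` stand in for the DataFrames registered as A and B.
  // The broadcast(...) hint asks the planner to ship `small` to every
  // executor so that `large` does not need to be shuffled for the join.
  def joinOnA(large: DataFrame, small: DataFrame): DataFrame =
    large.join(broadcast(small), large("a") === small("a"), "inner")
}
```

Calling `explain(true)` on the result should show whether a broadcast join was actually planned. For plain SQL statements, the planner's own choice is governed by `spark.sql.autoBroadcastJoinThreshold`, which depends on Spark having a size estimate for the smaller table.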


Using netlib-java in Spark 1.6 on linux

2016-03-02 Thread cindymc
I want to take advantage of the Breeze linear algebra libraries, built on
netlib-java, used heavily by SparkML. I've found this amazingly
time-consuming to figure out, and have only been able to do so on MacOS.  I
want to do the same on Linux:

$ uname -a
Linux slc10whv 3.8.13-68.3.4.el6uek.x86_64 #2 SMP Tue Jul 14 15:03:36 PDT
2015 x86_64 x86_64 x86_64 GNU/Linux

This is for Spark 1.6.

For MacOS, I was able to find the *.jars in the .ivy2 cache and add them to
a combination of system and application classpaths.

For Linux, I've downloaded the Spark 1.6 source and compiled with sbt like
this:
sbt/sbt -Pyarn -DskipTests=true -Phadoop-2.3 -Dhadoop.version=2.6.0
-Pnetlib-lgpl clean update assembly package

This gives me 'spark-assembly-1.6.0-hadoop2.6.0.jar' that appears to contain
the *.so libs I need.  As an example:  netlib-native_ref-linux-x86_64.so

Now I want to compile and package my application so it picks these
netlib-java classes up at runtime.  Here's the command I'm using:

spark-submit --properties-file project-defaults.conf --class
"main.scala.SparkLDADemo" --jars
lib/stanford-corenlp-3.6.0.jar,lib/stanford-corenlp-3.6.0-models.jar,/scratch/cmcmulle/programs/spark/spark-1.6.0/assembly/target/scala-2.10/spark-assembly-1.6.0-hadoop2.6.0.jar
target/scala-2.10/sparksql-demo_2.10-1.0.jar

Still, I get the dreaded:
"16/03/02 16:49:21 WARN BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemBLAS
16/03/02 16:49:21 WARN BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeRefBLAS"

Can someone please tell me how to build/configure/run a standalone SparkML
application using spark-submit such that it is able to load/use the
netlib-java classes?
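
As a diagnostic aid (not a fix), a small check like the following can show which BLAS implementation netlib-java actually selected in a given JVM. It assumes netlib-java is on the classpath; `BlasCheck` is a hypothetical name.

```scala
import com.github.fommil.netlib.BLAS

object BlasCheck {
  def main(args: Array[String]): Unit = {
    // Prints the implementation class netlib-java selected at load time,
    // e.g. a native implementation or the pure-Java F2J fallback.
    println(BLAS.getInstance().getClass.getName)
  }
}
```

Note that this reports on the JVM it runs in, so running it on the driver says nothing about the executors; the same call would have to be made inside a task to check those.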

Thanks --



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-netlib-java-in-Spark-1-6-on-linux-tp26386.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does DataFrame.collect() maintain the underlying schema?

2016-03-02 Thread Mohammad Tariq
I think this could be the reason :

DataFrame sorts the columns of each record lexicographically if we do a
"select *". So, if we wish to maintain a specific column ordering while
processing, we should use "select col1, col2..." instead of "select *".

However, this is just what I feel. Let's wait for comments from the gurus.
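
Whatever the cause of the reordering, reading fields by name instead of by position sidesteps it. A minimal sketch, assuming string columns with the hypothetical names "name", "country" and "region":

```scala
import org.apache.spark.sql.DataFrame

object ColumnOrderSketch {
  // Column names here are placeholders for the real Avro field names.
  def tagRecords(df: DataFrame): Array[String] = {
    // An explicit select fixes the positional order of the columns.
    val ordered = df.select("name", "country", "region")
    ordered.collect().map { row =>
      // getAs with a field name looks the value up via the Row's schema,
      // so it does not depend on where the column sits in the Row.
      val name = row.getAs[String]("name")
      val region = row.getAs[String]("region")
      s"$name~$region"
    }
  }
}
```

Rows returned by `collect()` carry the DataFrame's schema, which is what makes the lookup by name possible; `df.printSchema()` shows the order Spark is actually using.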



Tariq, Mohammad
about.me/mti



On Thu, Mar 3, 2016 at 5:35 AM, Mohammad Tariq  wrote:

> Cool. Here is it how it goes...
>
> I am reading Avro objects from a Kafka topic as a DStream, converting it
> into a DataFrame so that I can filter out records based on some conditions
> and finally do some aggregations on these filtered records. During the
> process I also need to tag each record based on the value of a particular
> column, and for this I am iterating over Array[Row] returned by
> DataFrame.collect().
>
> I am good as far as these things are concerned. The only thing which I am
> not getting is the reason behind changed column ordering within each Row.
> Say my actual record is [Tariq, IN, APAC]. When I
> do println(row.mkString("~")) it shows [IN~APAC~Tariq].
>
> I hope I was able to explain my use case to you!
>
>
>
> [image: http://]
>
> Tariq, Mohammad
> about.me/mti
> [image: http://]
> 
>
>
> On Thu, Mar 3, 2016 at 5:21 AM, Sainath Palla 
> wrote:
>
>> Hi Tariq,
>>
>> Can you tell in brief what kind of operation you have to do? I can try
>> helping you out with that.
>> In general, if you are trying to use any group operations you can use
>> window operations.
>>
>> On Wed, Mar 2, 2016 at 6:40 PM, Mohammad Tariq 
>> wrote:
>>
>>> Hi Sainath,
>>>
>>> Thank you for the prompt response!
>>>
>>> Could you please elaborate your answer a bit? I'm sorry I didn't quite
>>> get this. What kind of operation I can perform using SQLContext? It just
>>> helps us during things like DF creation, schema application etc, IMHO.
>>>
>>>
>>>
>>> [image: http://]
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>> [image: http://]
>>> 
>>>
>>>
>>> On Thu, Mar 3, 2016 at 4:59 AM, Sainath Palla 
>>> wrote:
>>>
>>>> Instead of collecting the data frame, you can try using a sqlContext on
>>>> the data frame. But it depends on what kind of operations are you trying to
>>>> perform.
>>>>
>>>> On Wed, Mar 2, 2016 at 6:21 PM, Mohammad Tariq 
>>>> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> *Scenario :*
>>>>> I am creating a DStream by reading an Avro object from a Kafka topic
>>>>> and then converting it into a DataFrame to perform some operations on the
>>>>> data. I call DataFrame.collect() and perform the intended operation on each
>>>>> Row of Array[Row] returned by DataFrame.collect().
>>>>>
>>>>> *Problem : *
>>>>> Calling DataFrame.collect() changes the schema of the underlying
>>>>> record, thus making it impossible to get the columns by index(as the order
>>>>> gets changed).
>>>>>
>>>>> *Query :*
>>>>> Is it the way DataFrame.collect() behaves or am I doing something
>>>>> wrong here? In former case is there any way I can maintain the schema while
>>>>> getting each Row?
>>>>>
>>>>> Any pointers/suggestions would be really helpful. Many thanks!
>>>>>
>>>>>
>>>>> Tariq, Mohammad
>>>>> about.me/mti
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>


Spark job on YARN ApplicationMaster DEBUG log

2016-03-02 Thread Prabhu Joseph
Hi All,

I am trying to enable DEBUG logging for the Spark ApplicationMaster, but it
is not working.

When running the Spark job, I passed

-Dlog4j.configuration=file:/opt/mapr/spark/spark-1.4.1/conf/log4j.properties

The log4j.properties has log4j.rootCategory=DEBUG, console

The Spark executor containers have DEBUG logs, but the ApplicationMaster
container does not.


Re: spark 1.6 new memory management - some issues with tasks not using all executors

2016-03-02 Thread Koert Kuipers
with the locality issue resolved, i am still struggling with the new memory
management.

i am seeing tasks on tiny amounts of data take 15 seconds, of which 14 are
spent in GC. with the legacy memory management (spark.memory.useLegacyMode
= true) they complete in 1 - 2 seconds.

since we are permanently caching a very large number of RDDs, my suspicion
is that with the new memory management these cached RDDs happily gobble up
all the memory, and need to be evicted to run my small job, leading to the
slowness.

i can revert to legacy memory management mode, so this is not an issue, but
i am worried that at some point the legacy memory management will be
deprecated and then i am stuck with this performance issue.
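
for anyone comparing the two modes, the relevant settings can be sketched as below. this only illustrates the configuration keys; the 0.3 value is arbitrary, and whether lowering spark.memory.storageFraction actually helps with this GC behaviour is an untested assumption. MemoryConfSketch is a hypothetical name.

```scala
import org.apache.spark.SparkConf

object MemoryConfSketch {
  // Legacy (pre-1.6) memory manager, as used for the fast runs described above.
  def legacy(): SparkConf =
    new SparkConf().set("spark.memory.useLegacyMode", "true")

  // Unified memory manager with a smaller region protected for cached blocks.
  // The 0.3 value is an arbitrary illustration, not a recommendation.
  def unifiedWithLessStorage(): SparkConf =
    new SparkConf()
      .set("spark.memory.useLegacyMode", "false")
      .set("spark.memory.storageFraction", "0.3")
}
```

spark.memory.storageFraction sets the share of the unified region in which cached blocks are immune to eviction by execution, so a lower value leaves more room for execution before cached data has to be evicted.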

On Mon, Feb 29, 2016 at 12:47 PM, Koert Kuipers  wrote:

> setting spark.shuffle.reduceLocality.enabled=false worked for me, thanks
>
>
> is there any reference to the benefits of setting reduceLocality to true?
> i am tempted to disable it across the board.
>
> On Mon, Feb 29, 2016 at 9:51 AM, Yin Yang  wrote:
>
>> The default value for spark.shuffle.reduceLocality.enabled is true.
>>
>> To reduce surprise to users of 1.5 and earlier releases, should the
>> default value be set to false ?
>>
>> On Mon, Feb 29, 2016 at 5:38 AM, Lior Chaga  wrote:
>>
>>> Hi Koret,
>>> Try spark.shuffle.reduceLocality.enabled=false
>>> This is an undocumented configuration.
>>> See:
>>> https://github.com/apache/spark/pull/8280
>>> https://issues.apache.org/jira/browse/SPARK-10567
>>>
>>> It solved the problem for me (both with and without memory legacy mode)
>>>
>>>
>>> On Sun, Feb 28, 2016 at 11:16 PM, Koert Kuipers 
>>> wrote:
>>>
 i find it particularly confusing that a new memory management module
 would change the locations. its not like the hash partitioner got replaced.
 i can switch back and forth between legacy and "new" memory management and
 see the distribution change... fully reproducible

 On Sun, Feb 28, 2016 at 11:24 AM, Lior Chaga 
 wrote:

> Hi,
> I've experienced a similar problem upgrading from spark 1.4 to spark
> 1.6.
> The data is not evenly distributed across executors, but in my case it
> also reproduced with legacy mode.
> Also tried 1.6.1 rc-1, with same results.
>
> Still looking for resolution.
>
> Lior
>
> On Fri, Feb 19, 2016 at 2:01 AM, Koert Kuipers 
> wrote:
>
>> looking at the cached rdd i see a similar story:
>> with useLegacyMode = true the cached rdd is spread out across 10
>> executors, but with useLegacyMode = false the data for the cached rdd 
>> sits
>> on only 3 executors (the rest all show 0s). my cached RDD is a key-value
>> RDD that got partitioned (hash partitioner, 50 partitions) before being
>> cached.
>>
>> On Thu, Feb 18, 2016 at 6:51 PM, Koert Kuipers 
>> wrote:
>>
>>> hello all,
>>> we are just testing a semi-realtime application (it should return
>>> results in less than 20 seconds from cached RDDs) on spark 1.6.0. before
>>> this it used to run on spark 1.5.1
>>>
>>> in spark 1.6.0 the performance is similar to 1.5.1 if i set
>>> spark.memory.useLegacyMode = true, however if i switch to
>>> spark.memory.useLegacyMode = false the queries take about 50% to 100% 
>>> more
>>> time.
>>>
>>> the issue becomes clear when i focus on a single stage: the
>>> individual tasks are not slower at all, but they run on less executors.
>>> in my test query i have 50 tasks and 10 executors. both with
>>> useLegacyMode = true and useLegacyMode = false the tasks finish in 
>>> about 3
>>> seconds and show as running PROCESS_LOCAL. however when  useLegacyMode =
>>> false the tasks run on just 3 executors out of 10, while with 
>>> useLegacyMode
>>> = true they spread out across 10 executors. all the tasks running on 
>>> just a
>>> few executors leads to the slower results.
>>>
>>> any idea why this would happen?
>>> thanks! koert
>>>
>>>
>>>
>>
>

>>>
>>
>


Re: Spark on Yarn with Dynamic Resource Allocation. Container always marked as failed

2016-03-02 Thread Jeff Zhang
The executor may be failing to start. Check the executor logs; if there is
no executor log, check the NodeManager log.

On Wed, Mar 2, 2016 at 4:26 PM, Xiaoye Sun  wrote:

> Hi all,
>
> I am very new to spark and yarn.
>
> I am running a BroadcastTest example application using spark 1.6.0 and
> Hadoop/Yarn 2.7.1. in a 5 nodes cluster.
>
> I configured my configuration files according to
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> 1. copy
> ./spark-1.6.0/network/yarn/target/scala-2.10/spark-1.6.0-yarn-shuffle.jar
> to /hadoop-2.7.1/share/hadoop/yarn/lib/
> 2. yarn-site.xml is like this
> http://www.owlnet.rice.edu/~xs6/yarn-site.xml
> 3. spark-defaults.conf is like this
> http://www.owlnet.rice.edu/~xs6/spark-defaults.conf
> 4. spark-env.sh is like this http://www.owlnet.rice.edu/~xs6/spark-env.sh
> 5. the command I use to submit spark application is: ./bin/spark-submit
> --class org.apache.spark.examples.BroadcastTest --master yarn --deploy-mode
> cluster ./examples/target/spark-examples_2.10-1.6.0.jar 1 1000 Http
>
> However, the job is stuck at RUNNING status, and by looking at the log, I
> found that the executor is failed/cancelled frequently...
> Here is the log output http://www.owlnet.rice.edu/~xs6/stderr
> It shows something like
>
> 16/03/02 02:07:35 WARN yarn.YarnAllocator: Container marked as failed: 
> container_1456905762620_0002_01_02 on host: bold-x.rice.edu. Exit status: 
> 1. Diagnostics: Exception from container-launch.
>
>
> Is there anybody know what is the problem here?
> Best,
> Xiaoye
>



-- 
Best Regards

Jeff Zhang


Re: getPreferredLocations race condition in spark 1.6.0?

2016-03-02 Thread Andy Sloane
Done, thanks.
https://issues.apache.org/jira/browse/SPARK-13631

Will continue discussion there.


On Wed, Mar 2, 2016 at 4:09 PM Shixiong(Ryan) Zhu 
wrote:

> I think it's a bug. Could you open a ticket here:
> https://issues.apache.org/jira/browse/SPARK
>
> On Wed, Mar 2, 2016 at 3:46 PM, Andy Sloane  wrote:
>
>> We are seeing something that looks a lot like a regression from spark
>> 1.2. When we run jobs with multiple threads, we have a crash somewhere
>> inside getPreferredLocations, as was fixed in SPARK-4454. Except now it's
>> inside
>> org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs
>> instead of DAGScheduler directly.
>>
>> I tried Spark 1.2 post-SPARK-4454 (before this patch it's only slightly
>> flaky), 1.4.1, and 1.5.2 and all are fine. 1.6.0 immediately crashes on our
>> threaded test case, though once in a while it passes.
>>
>> The stack trace is huge, but starts like this:
>>
>> Caused by: java.lang.NullPointerException: null
>> at
>> org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs(MapOutputTracker.scala:406)
>> at
>> org.apache.spark.MapOutputTrackerMaster.getPreferredLocationsForShuffle(MapOutputTracker.scala:366)
>> at
>> org.apache.spark.rdd.ShuffledRDD.getPreferredLocations(ShuffledRDD.scala:92)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545)
>>
>> The full trace is available here:
>> https://gist.github.com/andy256/97611f19924bbf65cf49
>>
>> Does this ring any bells? I will attempt to nail down the commit with git
>> bisect next.
>>
>> Thanks
>> -Andy
>>
>>
>


Re: Does DataFrame.collect() maintain the underlying schema?

2016-03-02 Thread Mohammad Tariq
Cool. Here is how it goes...

I am reading Avro objects from a Kafka topic as a DStream, converting it
into a DataFrame so that I can filter out records based on some conditions
and finally do some aggregations on these filtered records. During the
process I also need to tag each record based on the value of a particular
column, and for this I am iterating over Array[Row] returned by
DataFrame.collect().

I am good as far as these things are concerned. The only thing which I am
not getting is the reason behind changed column ordering within each Row.
Say my actual record is [Tariq, IN, APAC]. When I
do println(row.mkString("~")) it shows [IN~APAC~Tariq].

I hope I was able to explain my use case to you!



Tariq, Mohammad
about.me/mti



On Thu, Mar 3, 2016 at 5:21 AM, Sainath Palla 
wrote:

> Hi Tariq,
>
> Can you tell in brief what kind of operation you have to do? I can try
> helping you out with that.
> In general, if you are trying to use any group operations you can use
> window operations.
>
> On Wed, Mar 2, 2016 at 6:40 PM, Mohammad Tariq  wrote:
>
>> Hi Sainath,
>>
>> Thank you for the prompt response!
>>
>> Could you please elaborate your answer a bit? I'm sorry I didn't quite
>> get this. What kind of operation I can perform using SQLContext? It just
>> helps us during things like DF creation, schema application etc, IMHO.
>>
>>
>>
>> [image: http://]
>>
>> Tariq, Mohammad
>> about.me/mti
>> [image: http://]
>> 
>>
>>
>> On Thu, Mar 3, 2016 at 4:59 AM, Sainath Palla 
>> wrote:
>>
>>> Instead of collecting the data frame, you can try using a sqlContext on
>>> the data frame. But it depends on what kind of operations are you trying to
>>> perform.
>>>
>>> On Wed, Mar 2, 2016 at 6:21 PM, Mohammad Tariq 
>>> wrote:
>>>
>>>> Hi list,
>>>>
>>>> *Scenario :*
>>>> I am creating a DStream by reading an Avro object from a Kafka topic
>>>> and then converting it into a DataFrame to perform some operations on the
>>>> data. I call DataFrame.collect() and perform the intended operation on each
>>>> Row of Array[Row] returned by DataFrame.collect().
>>>>
>>>> *Problem : *
>>>> Calling DataFrame.collect() changes the schema of the underlying
>>>> record, thus making it impossible to get the columns by index(as the order
>>>> gets changed).
>>>>
>>>> *Query :*
>>>> Is it the way DataFrame.collect() behaves or am I doing something wrong
>>>> here? In former case is there any way I can maintain the schema while
>>>> getting each Row?
>>>>
>>>> Any pointers/suggestions would be really helpful. Many thanks!
>>>>
>>>>
>>>> Tariq, Mohammad
>>>> about.me/mti
>>>>
>>>>
>>>
>>>
>>
>


Re: Does DataFrame.collect() maintain the underlying schema?

2016-03-02 Thread Sainath Palla
Hi Tariq,

Can you tell in brief what kind of operation you have to do? I can try
helping you out with that.
In general, if you are trying to use any group operations you can use
window operations.

On Wed, Mar 2, 2016 at 6:40 PM, Mohammad Tariq  wrote:

> Hi Sainath,
>
> Thank you for the prompt response!
>
> Could you please elaborate your answer a bit? I'm sorry I didn't quite get
> this. What kind of operation I can perform using SQLContext? It just helps
> us during things like DF creation, schema application etc, IMHO.
>
>
>
> [image: http://]
>
> Tariq, Mohammad
> about.me/mti
> [image: http://]
> 
>
>
> On Thu, Mar 3, 2016 at 4:59 AM, Sainath Palla 
> wrote:
>
>> Instead of collecting the data frame, you can try using a sqlContext on
>> the data frame. But it depends on what kind of operations are you trying to
>> perform.
>>
>> On Wed, Mar 2, 2016 at 6:21 PM, Mohammad Tariq 
>> wrote:
>>
>>> Hi list,
>>>
>>> *Scenario :*
>>> I am creating a DStream by reading an Avro object from a Kafka topic and
>>> then converting it into a DataFrame to perform some operations on the data.
>>> I call DataFrame.collect() and perform the intended operation on each Row
>>> of Array[Row] returned by DataFrame.collect().
>>>
>>> *Problem : *
>>> Calling DataFrame.collect() changes the schema of the underlying record,
>>> thus making it impossible to get the columns by index(as the order gets
>>> changed).
>>>
>>> *Query :*
>>> Is it the way DataFrame.collect() behaves or am I doing something wrong
>>> here? In former case is there any way I can maintain the schema while
>>> getting each Row?
>>>
>>> Any pointers/suggestions would be really helpful. Many thanks!
>>>
>>>
>>> [image: http://]
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>> [image: http://]
>>> 
>>>
>>>
>>
>>
>


getPreferredLocations race condition in spark 1.6.0?

2016-03-02 Thread Andy Sloane
We are seeing something that looks a lot like a regression from spark 1.2.
When we run jobs with multiple threads, we have a crash somewhere inside
getPreferredLocations, as was fixed in SPARK-4454. Except now it's inside
org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs
instead of DAGScheduler directly.

I tried Spark 1.2 post-SPARK-4454 (before this patch it's only slightly
flaky), 1.4.1, and 1.5.2 and all are fine. 1.6.0 immediately crashes on our
threaded test case, though once in a while it passes.

The stack trace is huge, but starts like this:

Caused by: java.lang.NullPointerException: null
at
org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs(MapOutputTracker.scala:406)
at
org.apache.spark.MapOutputTrackerMaster.getPreferredLocationsForShuffle(MapOutputTracker.scala:366)
at
org.apache.spark.rdd.ShuffledRDD.getPreferredLocations(ShuffledRDD.scala:92)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545)

The full trace is available here:
https://gist.github.com/andy256/97611f19924bbf65cf49

Does this ring any bells? I will attempt to nail down the commit with git
bisect next.

Thanks
-Andy
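
For anyone trying to reproduce a crash like this while bisecting, a small harness that runs the same job from several threads many times can help tell "once in a while it passes" apart from "fixed". The sketch below is plain Python and is not the threaded test case mentioned above; `run_job` is a hypothetical stand-in for whatever submits a Spark job against a shared context.

```python
from concurrent.futures import ThreadPoolExecutor


def failure_count(run_job, threads=8, rounds=20):
    """Run `run_job` concurrently and count how many calls raised.

    `run_job` is a hypothetical zero-argument callable; in a real test it
    would submit a job against a shared SparkContext.
    """
    def attempt(_):
        try:
            run_job()
            return 0
        except Exception:
            return 1

    # threads * rounds calls in total, at most `threads` running at once.
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return sum(pool.map(attempt, range(threads * rounds)))
```

During a git bisect, a commit could be marked bad as soon as the count is non-zero, and good only after enough clean rounds to be reasonably confident.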


Re: Does pyspark still lag far behind the Scala API in terms of features

2016-03-02 Thread Nicholas Chammas
We’re veering off from the original question of this thread, but to
clarify, my comment earlier was this:

So in short, DataFrames are the “new RDD”—i.e. the new base structure you
should be using in your Spark programs wherever possible.

RDDs are not going away, and clearly in your case DataFrames are not that
helpful, so sure, continue to use RDDs. There’s nothing wrong with that.
No-one is saying you *must* use DataFrames, and Spark will continue to
offer its RDD API.

However, my original comment to Jules still stands: If you can, use
DataFrames. In most cases they will offer you a better development
experience and better performance across languages, and future Spark
optimizations will mostly be enabled by the structure that DataFrames
provide.

DataFrames are the “new RDD” in the sense that they are the new foundation
for much of the new work that has been done in recent versions and that is
coming in Spark 2.0 and beyond.

Many people work with semi-structured data and have a relatively easy path
to DataFrames, as I explained in my previous email. If, however, you’re
working with data that has very little structure, like in Darren’s case,
then yes, DataFrames are probably not going to help that much. Stick with
RDDs and you’ll be fine.

On Wed, Mar 2, 2016 at 6:28 PM Darren Govoni  wrote:

> Our data is made up of single text documents scraped off the web. We store
> these in a  RDD. A Dataframe or similar structure makes no sense at that
> point. And the RDD is transient.
>
> So my point is. Dataframes should not replace plain old rdd since rdds
> allow for more flexibility and sql etc is not even usable on our data while
> in rdd. So all those nice dataframe apis aren't usable until it's
> structured. Which is the core problem anyway.
>
>
>
> Sent from my Verizon Wireless 4G LTE smartphone
>
>
>  Original message 
> From: Nicholas Chammas 
> Date: 03/02/2016 5:43 PM (GMT-05:00)
> To: Darren Govoni , Jules Damji ,
> Joshua Sorrell 
> Cc: user@spark.apache.org
> Subject: Re: Does pyspark still lag far behind the Scala API in terms of
> features
>
> Plenty of people get their data in Parquet, Avro, or ORC files; or from a
> database; or do their initial loading of un- or semi-structured data using
> one of the various data source libraries which help with
> type-/schema-inference.
>
> All of these paths help you get to a DataFrame very quickly.
>
> Nick
>
> On Wed, Mar 2, 2016 at 5:22 PM Darren Govoni  wrote:
>
>> Dataframes are essentially structured tables with schemas. So where does
>> the non typed data sit before it becomes structured if not in a traditional
>> RDD?
>>
>> For us almost all the processing comes before there is structure to it.
>>
>>
>>
>>
>>
>> Sent from my Verizon Wireless 4G LTE smartphone
>>
>>
>>  Original message 
>> From: Nicholas Chammas 
>> Date: 03/02/2016 5:13 PM (GMT-05:00)
>> To: Jules Damji , Joshua Sorrell 
>>
>> Cc: user@spark.apache.org
>> Subject: Re: Does pyspark still lag far behind the Scala API in terms of
>> features
>>
>> > However, I believe, investing (or having some members of your group)
>> learn and invest in Scala is worthwhile for few reasons. One, you will get
>> the performance gain, especially now with Tungsten (not sure how it relates
>> to Python, but some other knowledgeable people on the list, please chime
>> in).
>>
>> The more your workload uses DataFrames, the less of a difference there
>> will be between the languages (Scala, Java, Python, or R) in terms of
>> performance.
>>
>> One of the main benefits of Catalyst (which DFs enable) is that it
>> automatically optimizes DataFrame operations, letting you focus on _what_
>> you want while Spark will take care of figuring out _how_.
>>
>> Tungsten takes things further by tightly managing memory using the type
>> information made available to it via DataFrames. This benefit comes into
>> play regardless of the language used.
>>
>> So in short, DataFrames are the "new RDD"--i.e. the new base structure
>> you should be using in your Spark programs wherever possible. And with
>> DataFrames, what language you use matters much less in terms of performance.
>>
>> Nick
>>
>> On Tue, Mar 1, 2016 at 12:07 PM Jules Damji  wrote:
>>
>>> Hello Joshua,
>>>
>>> comments are inline...
>>>
>>> On Mar 1, 2016, at 5:03 AM, Joshua Sorrell  wrote:
>>>
>>> I haven't used Spark in the last year and a half. I am about to start a
>>> project with a new team, and we need to decide whether to use pyspark or
>>> Scala.
>>>
>>>
>>> Indeed, good questions, and they do come up a lot in trainings that I have
>>> attended, where this inevitable question is raised.
>>> I believe, it depends on your 

Re: Does DataFrame.collect() maintain the underlying schema?

2016-03-02 Thread Mohammad Tariq
Hi Sainath,

Thank you for the prompt response!

Could you please elaborate on your answer a bit? I'm sorry, I didn't quite get
it. What kind of operations can I perform using SQLContext? IMHO it just helps
us with things like DF creation, schema application, etc.



Tariq, Mohammad
about.me/mti


On Thu, Mar 3, 2016 at 4:59 AM, Sainath Palla 
wrote:

> Instead of collecting the data frame, you can try using a sqlContext on
> the data frame. But it depends on what kind of operations are you trying to
> perform.
>
> On Wed, Mar 2, 2016 at 6:21 PM, Mohammad Tariq  wrote:
>
>> Hi list,
>>
>> *Scenario :*
>> I am creating a DStream by reading an Avro object from a Kafka topic and
>> then converting it into a DataFrame to perform some operations on the data.
>> I call DataFrame.collect() and perform the intended operation on each Row
>> of Array[Row] returned by DataFrame.collect().
>>
>> *Problem : *
>> Calling DataFrame.collect() changes the schema of the underlying record,
>> thus making it impossible to get the columns by index(as the order gets
>> changed).
>>
>> *Query :*
>> Is it the way DataFrame.collect() behaves or am I doing something wrong
>> here? In former case is there any way I can maintain the schema while
>> getting each Row?
>>
>> Any pointers/suggestions would be really helpful. Many thanks!
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>>
>
>


Re: Does DataFrame.collect() maintain the underlying schema?

2016-03-02 Thread Sainath Palla
Instead of collecting the data frame, you can try using a sqlContext on the
data frame, but it depends on what kind of operations you are trying to
perform.

On Wed, Mar 2, 2016 at 6:21 PM, Mohammad Tariq  wrote:

> Hi list,
>
> *Scenario :*
> I am creating a DStream by reading an Avro object from a Kafka topic and
> then converting it into a DataFrame to perform some operations on the data.
> I call DataFrame.collect() and perform the intended operation on each Row
> of Array[Row] returned by DataFrame.collect().
>
> *Problem : *
> Calling DataFrame.collect() changes the schema of the underlying record,
> thus making it impossible to get the columns by index(as the order gets
> changed).
>
> *Query :*
> Is it the way DataFrame.collect() behaves or am I doing something wrong
> here? In former case is there any way I can maintain the schema while
> getting each Row?
>
> Any pointers/suggestions would be really helpful. Many thanks!
>
>
> Tariq, Mohammad
> about.me/mti
>
>


Re: Does pyspark still lag far behind the Scala API in terms of features

2016-03-02 Thread Darren Govoni


Our data is made up of single text documents scraped off the web. We store
these in an RDD. A DataFrame or similar structure makes no sense at that point,
and the RDD is transient.

So my point is: DataFrames should not replace plain old RDDs, since RDDs allow
for more flexibility, and SQL etc. is not even usable on our data while it is
in an RDD. So all those nice DataFrame APIs aren't usable until the data is
structured, which is the core problem anyway.


Sent from my Verizon Wireless 4G LTE smartphone

 Original message 
From: Nicholas Chammas  
Date: 03/02/2016  5:43 PM  (GMT-05:00) 
To: Darren Govoni , Jules Damji , 
Joshua Sorrell  
Cc: user@spark.apache.org 
Subject: Re: Does pyspark still lag far behind the Scala API in terms of 
features 

Plenty of people get their data in Parquet, Avro, or ORC files; or from a
database; or do their initial loading of un- or semi-structured data using one
of the various data source libraries which help with type-/schema-inference.

All of these paths help you get to a DataFrame very quickly.

Nick

On Wed, Mar 2, 2016 at 5:22 PM Darren Govoni  wrote:

Dataframes are essentially structured tables with schemas. So where does the
non typed data sit before it becomes structured if not in a traditional RDD?

For us almost all the processing comes before there is structure to it.




Sent from my Verizon Wireless 4G LTE smartphone

 Original message 
From: Nicholas Chammas  
Date: 03/02/2016  5:13 PM  (GMT-05:00) 
To: Jules Damji , Joshua Sorrell  
Cc: user@spark.apache.org 
Subject: Re: Does pyspark still lag far behind the Scala API in terms of 
features 

> However, I believe, investing (or having some members of your group) learn
> and invest in Scala is worthwhile for a few reasons. One, you will get the
> performance gain, especially now with Tungsten (not sure how it relates to
> Python, but some other knowledgeable people on the list, please chime in).

The more your workload uses DataFrames, the less of a difference there will be
between the languages (Scala, Java, Python, or R) in terms of performance.

One of the main benefits of Catalyst (which DFs enable) is that it
automatically optimizes DataFrame operations, letting you focus on _what_ you
want while Spark will take care of figuring out _how_.

Tungsten takes things further by tightly managing memory using the type
information made available to it via DataFrames. This benefit comes into play
regardless of the language used.

So in short, DataFrames are the "new RDD"--i.e. the new base structure you
should be using in your Spark programs wherever possible. And with DataFrames,
what language you use matters much less in terms of performance.

Nick

On Tue, Mar 1, 2016 at 12:07 PM Jules Damji  wrote:

Hello Joshua,

comments are inline...

On Mar 1, 2016, at 5:03 AM, Joshua Sorrell  wrote:

I haven't used Spark in the last year and a half. I am about to start a project
with a new team, and we need to decide whether to use pyspark or Scala.

Indeed, good questions, and they do come up a lot in trainings that I have
attended, where this inevitable question is raised. I believe it depends on
your level of comfort zone or adventure into newer things.

True, for the most part Apache Spark committers have been committed to
keeping the APIs at parity across all the language offerings, even though in
some cases, in particular Python, they have lagged by a minor release. The
extent to which they’re committed to parity is a good sign. It might be the
case with some experimental APIs that they lag behind, but for the most
part, they have been admirably consistent.

With Python there’s a minor performance hit, since there’s an extra level of
indirection in the architecture and an additional Python PID that the executors
launch to execute your pickled Python lambdas. Other than that it boils down to
your comfort zone. I recommend looking at Sameer’s slides on (Advanced Spark
for DevOps Training) where he walks through the pySpark and Python
architecture.

We are NOT a java shop. So some of the build tools/procedures will require some
learning overhead if we go the Scala route. What I want to know is: is the
Scala version of Spark still far enough ahead of pyspark to be well worth any
initial training overhead?

If you are a very advanced Python shop, and if you have in-house libraries
written in Python that don’t exist in Scala, or some ML libs that don’t
exist in the Scala version and would require a fair amount of porting, and the
gap is too large, then perhaps it makes sense to stay put with Python.

However, I believe, investing (or having some members of your group) learn and
invest in Scala is worthwhile for a few reasons. One, you will get

Does DataFrame.collect() maintain the underlying schema?

2016-03-02 Thread Mohammad Tariq
Hi list,

*Scenario :*
I am creating a DStream by reading an Avro object from a Kafka topic and
then converting it into a DataFrame to perform some operations on the data.
I call DataFrame.collect() and perform the intended operation on each Row
of Array[Row] returned by DataFrame.collect().

*Problem : *
Calling DataFrame.collect() changes the schema of the underlying record,
thus making it impossible to get the columns by index (as the order gets
changed).

*Query :*
Is this the way DataFrame.collect() behaves, or am I doing something wrong
here? In the former case, is there any way I can maintain the schema while
getting each Row?

Any pointers/suggestions would be really helpful. Many thanks!
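
One way to avoid depending on column position is to pair each collected row with the DataFrame's column names and read fields by name. The sketch below is plain Python with made-up column names and values (`id`, `name`); it assumes rows behave like tuples, as PySpark `Row` objects do, and that the names would come from `df.columns`. In Scala, `Row.getAs[T](fieldName)` serves the same purpose. It does not explain why the order changed in the scenario described above.

```python
def rows_by_name(columns, rows):
    """Turn positional rows into dicts keyed by column name."""
    return [dict(zip(columns, row)) for row in rows]


# Hypothetical data standing in for df.columns and df.collect().
columns = ["id", "name"]
rows = [(1, "alpha"), (2, "beta")]

records = rows_by_name(columns, rows)
for record in records:
    # Field access no longer depends on where the column sits in the row.
    print(record["name"], record["id"])
```

Reading by name keeps the per-row logic stable even if the column order differs from what was expected.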


Tariq, Mohammad
about.me/mti


Re: Does pyspark still lag far behind the Scala API in terms of features

2016-03-02 Thread ayan guha
+1 on all the pointers.

@Darren - it would probably be a good idea to explain your scenario a little
more in terms of structured vs unstructured datasets. Then people here can
give you better input on how you can use DataFrames.


On Thu, Mar 3, 2016 at 9:43 AM, Nicholas Chammas  wrote:

> Plenty of people get their data in Parquet, Avro, or ORC files; or from a
> database; or do their initial loading of un- or semi-structured data using
> one of the various data source libraries which help with
> type-/schema-inference.
>
> All of these paths help you get to a DataFrame very quickly.
>
> Nick
>
> On Wed, Mar 2, 2016 at 5:22 PM Darren Govoni  wrote:
>
>> Dataframes are essentially structured tables with schemas. So where does
>> the non typed data sit before it becomes structured if not in a traditional
>> RDD?
>>
>> For us almost all the processing comes before there is structure to it.
>>
>>
>>
>>
>>
>> Sent from my Verizon Wireless 4G LTE smartphone
>>
>>
>>  Original message 
>> From: Nicholas Chammas 
>> Date: 03/02/2016 5:13 PM (GMT-05:00)
>> To: Jules Damji , Joshua Sorrell 
>>
>> Cc: user@spark.apache.org
>> Subject: Re: Does pyspark still lag far behind the Scala API in terms of
>> features
>>
>> > However, I believe, investing (or having some members of your group)
>> learn and invest in Scala is worthwhile for few reasons. One, you will get
>> the performance gain, especially now with Tungsten (not sure how it relates
>> to Python, but some other knowledgeable people on the list, please chime
>> in).
>>
>> The more your workload uses DataFrames, the less of a difference there
>> will be between the languages (Scala, Java, Python, or R) in terms of
>> performance.
>>
>> One of the main benefits of Catalyst (which DFs enable) is that it
>> automatically optimizes DataFrame operations, letting you focus on _what_
>> you want while Spark will take care of figuring out _how_.
>>
>> Tungsten takes things further by tightly managing memory using the type
>> information made available to it via DataFrames. This benefit comes into
>> play regardless of the language used.
>>
>> So in short, DataFrames are the "new RDD"--i.e. the new base structure
>> you should be using in your Spark programs wherever possible. And with
>> DataFrames, what language you use matters much less in terms of performance.
>>
>> Nick
>>
>> On Tue, Mar 1, 2016 at 12:07 PM Jules Damji  wrote:
>>
>>> Hello Joshua,
>>>
>>> comments are inline...
>>>
>>> On Mar 1, 2016, at 5:03 AM, Joshua Sorrell  wrote:
>>>
>>> I haven't used Spark in the last year and a half. I am about to start a
>>> project with a new team, and we need to decide whether to use pyspark or
>>> Scala.
>>>
>>>
>>> Indeed, good questions, and they do come up a lot in trainings that I have
>>> attended, where this inevitable question is raised.
>>> I believe it depends on your level of comfort zone or adventure into
>>> newer things.
>>>
>>> True, for the most part Apache Spark committers have been committed
>>> to keeping the APIs at parity across all the language offerings, even though
>>> in some cases, in particular Python, they have lagged by a minor release.
>>> The extent to which they’re committed to parity is a good sign. It
>>> might be the case with some experimental APIs that they lag behind,
>>> but for the most part, they have been admirably consistent.
>>>
>>> With Python there’s a minor performance hit, since there’s an extra
>>> level of indirection in the architecture and an additional Python PID that
>>> the executors launch to execute your pickled Python lambdas. Other than
>>> that it boils down to your comfort zone. I recommend looking at Sameer’s
>>> slides on (Advanced Spark for DevOps Training) where he walks through the
>>> pySpark and Python architecture.
>>>
>>>
>>> We are NOT a java shop. So some of the build tools/procedures will
>>> require some learning overhead if we go the Scala route. What I want to
>>> know is: is the Scala version of Spark still far enough ahead of pyspark to
>>> be well worth any initial training overhead?
>>>
>>>
>>> If you are a very advanced Python shop, and if you have in-house libraries
>>> written in Python that don’t exist in Scala, or some ML libs
>>> that don’t exist in the Scala version and would require a fair amount of
>>> porting, and the gap is too large, then perhaps it makes sense to stay put
>>> with Python.
>>>
>>> However, I believe, investing (or having some members of your group)
>>> learn and invest in Scala is worthwhile for a few reasons. One, you will get
>>> the performance gain, especially now with Tungsten (not sure how it relates
>>> to Python, but some other knowledgeable people on the list, please chime
>>> in). Two, since Spark is written in Scala, 

Re: Does pyspark still lag far behind the Scala API in terms of features

2016-03-02 Thread Nicholas Chammas
Plenty of people get their data in Parquet, Avro, or ORC files; or from a
database; or do their initial loading of un- or semi-structured data using
one of the various data source libraries which help with
type-/schema-inference.

All of these paths help you get to a DataFrame very quickly.

Nick
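
As a rough illustration of the step between raw text and a DataFrame, the sketch below parses scraped documents into fixed-shape records. It is plain Python; the field names (`url`, `n_words`, `first_word`) and the sample documents are invented for the example. In PySpark the same function could be applied with `rdd.map(...)` and the result passed to `sqlContext.createDataFrame(...)` together with the column names, though that part is not shown or tested here.

```python
FIELDS = ["url", "n_words", "first_word"]


def to_record(url, text):
    """Extract a small, fixed set of typed fields from one raw document."""
    words = text.split()
    first = words[0].lower() if words else None
    return (url, len(words), first)


# Invented sample input standing in for scraped web pages.
docs = [
    ("http://example.com/a", "Spark makes big data simple"),
    ("http://example.com/b", ""),
]

records = [to_record(url, text) for url, text in docs]
for record in records:
    print(dict(zip(FIELDS, record)))
```

Whatever happens before this point (cleaning, tokenizing, filtering) can stay in RDD-style code; the records only need a consistent shape once they are handed over.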

On Wed, Mar 2, 2016 at 5:22 PM Darren Govoni  wrote:

> Dataframes are essentially structured tables with schemas. So where does
> the non typed data sit before it becomes structured if not in a traditional
> RDD?
>
> For us almost all the processing comes before there is structure to it.
>
>
>
>
>
> Sent from my Verizon Wireless 4G LTE smartphone
>
>
>  Original message 
> From: Nicholas Chammas 
> Date: 03/02/2016 5:13 PM (GMT-05:00)
> To: Jules Damji , Joshua Sorrell 
> Cc: user@spark.apache.org
> Subject: Re: Does pyspark still lag far behind the Scala API in terms of
> features
>
> > However, I believe, investing (or having some members of your group)
> learn and invest in Scala is worthwhile for few reasons. One, you will get
> the performance gain, especially now with Tungsten (not sure how it relates
> to Python, but some other knowledgeable people on the list, please chime
> in).
>
> The more your workload uses DataFrames, the less of a difference there
> will be between the languages (Scala, Java, Python, or R) in terms of
> performance.
>
> One of the main benefits of Catalyst (which DFs enable) is that it
> automatically optimizes DataFrame operations, letting you focus on _what_
> you want while Spark will take care of figuring out _how_.
>
> Tungsten takes things further by tightly managing memory using the type
> information made available to it via DataFrames. This benefit comes into
> play regardless of the language used.
>
> So in short, DataFrames are the "new RDD"--i.e. the new base structure you
> should be using in your Spark programs wherever possible. And with
> DataFrames, what language you use matters much less in terms of performance.
>
> Nick
>
> On Tue, Mar 1, 2016 at 12:07 PM Jules Damji  wrote:
>
>> Hello Joshua,
>>
>> comments are inline...
>>
>> On Mar 1, 2016, at 5:03 AM, Joshua Sorrell  wrote:
>>
>> I haven't used Spark in the last year and a half. I am about to start a
>> project with a new team, and we need to decide whether to use pyspark or
>> Scala.
>>
>>
>> Indeed, good questions, and they do come up a lot in trainings that I have
>> attended, where this inevitable question is raised.
>> I believe it depends on your level of comfort zone or adventure into
>> newer things.
>>
>> True, for the most part Apache Spark committers have been committed
>> to keeping the APIs at parity across all the language offerings, even though
>> in some cases, in particular Python, they have lagged by a minor release.
>> The extent to which they’re committed to parity is a good sign. It
>> might be the case with some experimental APIs that they lag behind,
>> but for the most part, they have been admirably consistent.
>>
>> With Python there’s a minor performance hit, since there’s an extra level
>> of indirection in the architecture and an additional Python PID that the
>> executors launch to execute your pickled Python lambdas. Other than that it
>> boils down to your comfort zone. I recommend looking at Sameer’s slides on
>> (Advanced Spark for DevOps Training) where he walks through the pySpark and
>> Python architecture.
>>
>>
>> We are NOT a java shop. So some of the build tools/procedures will
>> require some learning overhead if we go the Scala route. What I want to
>> know is: is the Scala version of Spark still far enough ahead of pyspark to
>> be well worth any initial training overhead?
>>
>>
>> If you are a very advanced Python shop, and if you have in-house libraries
>> written in Python that don’t exist in Scala, or some ML libs
>> that don’t exist in the Scala version and would require a fair amount of
>> porting, and the gap is too large, then perhaps it makes sense to stay put
>> with Python.
>>
>> However, I believe, investing (or having some members of your group)
>> learn and invest in Scala is worthwhile for a few reasons. One, you will get
>> the performance gain, especially now with Tungsten (not sure how it relates
>> to Python, but some other knowledgeable people on the list, please chime
>> in). Two, since Spark is written in Scala, it gives you an enormous
>> advantage to read sources (which are well documented and highly readable)
>> should you have to consult or learn nuances of a certain API method or action
>> not covered comprehensively in the docs. And finally, there’s a long-term
>> benefit in learning Scala for reasons other than Spark, for example,
>> writing other scalable and distributed applications.
>>
>>
>> Particularly, we will be using 

Re: Spark 1.5 on Mesos

2016-03-02 Thread Tim Chen
Hi Charles,

I thought that was fixed by your patch in the latest master, right?

Ashish, yes please give me your docker image name (if it's in the public
registry) and what you've tried and I can see what's wrong. I think it's
most likely just the configuration of where the Spark home folder is in the
image.

Tim
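
If the problem is indeed where Spark lives inside the image, the two settings involved would be `spark.mesos.executor.docker.image` and `spark.mesos.executor.home`. The sketch below only builds a `spark-submit` argument list in Python. The image name, master URL and JAR URL are taken from elsewhere in this thread, while `/opt/spark` and the application name `SparkPi` are assumptions for illustration, not a verified fix.

```python
def submit_args(master, image, spark_home, jar_url):
    """Build a spark-submit argument list for Mesos with a Docker image."""
    return [
        "./bin/spark-submit",
        "--master", master,
        "--deploy-mode", "cluster",
        "--name", "SparkPi",  # assumed name, kept free of spaces
        "--class", "org.apache.spark.examples.SparkPi",
        "--conf", "spark.mesos.executor.docker.image=" + image,
        "--conf", "spark.mesos.executor.home=" + spark_home,
        jar_url,
    ]


args = submit_args(
    master="mesos://10.0.2.15:7077",
    image="mesosphere/spark:1.6",
    spark_home="/opt/spark",  # assumption: where Spark sits in the image
    jar_url="http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar",
)
print(" ".join(args))
```

Passing the list to `subprocess.run(args)` rather than a shell string avoids quoting problems with arguments that contain spaces. The name is kept free of spaces because the reported error mentions `/opt/spark/Example`, which might be the second word of `"PI Example"` being read as the JAR path; that is a guess, not something confirmed in the thread.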

On Wed, Mar 2, 2016 at 2:28 PM, Charles Allen  wrote:

> Re: Spark on Mesos Warning regarding disk space:
> https://issues.apache.org/jira/browse/SPARK-12330
>
> That's a spark flaw I encountered on a very regular basis on mesos. That
> and a few other annoyances are fixed in
> https://github.com/metamx/spark/tree/v1.5.2-mmx
>
> Here's another mild annoyance I've encountered:
> https://issues.apache.org/jira/browse/SPARK-11714
>
> On Wed, Mar 2, 2016 at 1:31 PM Ashish Soni  wrote:
>
>> I have had no luck, and I would like to ask the Spark committers: will
>> this ever be designed to run on Mesos?
>>
>> A Spark app as a Docker container is not working at all on Mesos. If anyone
>> would like the code, I can send it over to have a look.
>>
>> Ashish
>>
>> On Wed, Mar 2, 2016 at 12:23 PM, Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>>> Try passing jar using --jars option
>>>
>>> On Wed, Mar 2, 2016 at 10:17 AM Ashish Soni 
>>> wrote:
>>>
>>>> I made some progress but now i am stuck at this point , Please help as
>>>> looks like i am close to get it working
>>>>
>>>> I have everything running in docker container including mesos slave and
>>>> master
>>>>
>>>> When i try to submit the pi example i get below error
>>>> *Error: Cannot load main class from JAR file:/opt/spark/Example*
>>>>
>>>> Below is the command i use to submit as a docker container
>>>>
>>>> docker run -it --rm -e SPARK_MASTER="mesos://10.0.2.15:7077"  -e
>>>> SPARK_IMAGE="spark_driver:latest" spark_driver:latest ./bin/spark-submit
>>>> --deploy-mode cluster --name "PI Example" --class
>>>> org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory
>>>> 512m --executor-cores 1
>>>> http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
>>>>
>>>>
>>>> On Tue, Mar 1, 2016 at 2:59 PM, Timothy Chen  wrote:
>>>>
>>>>> Can you go through the Mesos UI and look at the driver/executor log
>>>>> from the stderr file and see what the problem is?
>>>>>
>>>>> Tim
>>>>>
>>>>> On Mar 1, 2016, at 8:05 AM, Ashish Soni  wrote:
>>>>>
>>>>> Not sure what is the issue but i am getting below error  when i try to
>>>>> run spark PI example
>>>>>
>>>>> Blacklisting Mesos slave value: "5345asdasdasdkas234234asdasdasdasd" due to too many failures; is Spark installed on it?
>>>>> WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>>>
>>>>>
>>>>> On Mon, Feb 29, 2016 at 1:39 PM, Sathish Kumaran Vairavelu <
>>>>> vsathishkuma...@gmail.com> wrote:
>>>>>
>>>>>> May be the Mesos executor couldn't find spark image or the
>>>>>> constraints are not satisfied. Check your Mesos UI if you see Spark
>>>>>> application in the Frameworks tab
>>>>>>
>>>>>> On Mon, Feb 29, 2016 at 12:23 PM Ashish Soni 
>>>>>> wrote:
>>>>>>
>>>>>>> What is the Best practice , I have everything running as docker
>>>>>>> container in single host ( mesos and marathon also as docker container )
>>>>>>>  and everything comes up fine but when i try to launch the spark shell i
>>>>>>> get below error
>>>>>>>
>>>>>>>
>>>>>>> SQL context available as sqlContext.
>>>>>>>
>>>>>>> scala> val data = sc.parallelize(1 to 100)
>>>>>>> data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>>>>>>> parallelize at <console>:27
>>>>>>>
>>>>>>> scala> data.count
>>>>>>> [Stage 0:> (0 + 0) / 2]
>>>>>>> 16/02/29 18:21:12 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>>>>> 16/02/29 18:21:27 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Feb 29, 2016 at 12:04 PM, Tim Chen 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> No you don't have to run Mesos in docker containers to run Spark in
>>>>>>>> docker containers.
>>>>>>>>
>>>>>>>> Once you have Mesos cluster running you can then specify the Spark
>>>>>>>> configurations in your Spark job (i.e: 
>>>>>>>> spark.mesos.executor.docker.image=mesosphere/spark:1.6)
>>>>>>>> and Mesos will automatically launch docker containers for you.
>>>>>>>>
>>>>>>>> Tim
>>>>>>>>
>>>>>>>> On Mon, Feb 29, 2016 at 7:36 AM, Ashish Soni  wrote:
>>>>>>>>
>>>>>>>>> Yes i read that and not much details here.
>>>>>>>>>
>>>>>>>>> Is it true that we 

Re: Spark 1.5 on Mesos

2016-03-02 Thread Charles Allen
Re: Spark on Mesos Warning regarding disk space:
https://issues.apache.org/jira/browse/SPARK-12330

That's a spark flaw I encountered on a very regular basis on mesos. That
and a few other annoyances are fixed in
https://github.com/metamx/spark/tree/v1.5.2-mmx

Here's another mild annoyance I've encountered:
https://issues.apache.org/jira/browse/SPARK-11714

On Wed, Mar 2, 2016 at 1:31 PM Ashish Soni  wrote:

> I have had no luck, and I would like to ask the Spark committers: will
> this ever be designed to run on Mesos?
>
> A Spark app as a Docker container is not working at all on Mesos. If anyone
> would like the code, I can send it over to have a look.
>
> Ashish
>
> On Wed, Mar 2, 2016 at 12:23 PM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Try passing jar using --jars option
>>
>> On Wed, Mar 2, 2016 at 10:17 AM Ashish Soni 
>> wrote:
>>
>>> I made some progress but now i am stuck at this point , Please help as
>>> looks like i am close to get it working
>>>
>>> I have everything running in docker container including mesos slave and
>>> master
>>>
>>> When i try to submit the pi example i get below error
>>> *Error: Cannot load main class from JAR file:/opt/spark/Example*
>>>
>>> Below is the command i use to submit as a docker container
>>>
>>> docker run -it --rm -e SPARK_MASTER="mesos://10.0.2.15:7077"  -e
>>> SPARK_IMAGE="spark_driver:latest" spark_driver:latest ./bin/spark-submit
>>> --deploy-mode cluster --name "PI Example" --class
>>> org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory
>>> 512m --executor-cores 1
>>> http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
>>>
>>>
>>> On Tue, Mar 1, 2016 at 2:59 PM, Timothy Chen  wrote:
>>>
>>>> Can you go through the Mesos UI and look at the driver/executor log
>>>> from the stderr file and see what the problem is?
>>>>
>>>> Tim
>>>>
>>>> On Mar 1, 2016, at 8:05 AM, Ashish Soni  wrote:
>>>>
>>>> Not sure what is the issue but i am getting below error  when i try to
>>>> run spark PI example
>>>>
>>>> Blacklisting Mesos slave value: "5345asdasdasdkas234234asdasdasdasd" due to too many failures; is Spark installed on it?
>>>> WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>>
>>>>
>>>> On Mon, Feb 29, 2016 at 1:39 PM, Sathish Kumaran Vairavelu <
>>>> vsathishkuma...@gmail.com> wrote:
>>>>
>>>>> May be the Mesos executor couldn't find spark image or the constraints
>>>>> are not satisfied. Check your Mesos UI if you see Spark application in the
>>>>> Frameworks tab
>>>>>
>>>>> On Mon, Feb 29, 2016 at 12:23 PM Ashish Soni 
>>>>> wrote:
>>>>>
>>>>>> What is the Best practice , I have everything running as docker
>>>>>> container in single host ( mesos and marathon also as docker container )
>>>>>>  and everything comes up fine but when i try to launch the spark shell i
>>>>>> get below error
>>>>>>
>>>>>>
>>>>>> SQL context available as sqlContext.
>>>>>>
>>>>>> scala> val data = sc.parallelize(1 to 100)
>>>>>> data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>>>>>> parallelize at <console>:27
>>>>>>
>>>>>> scala> data.count
>>>>>> [Stage 0:> (0 + 0) / 2]
>>>>>> 16/02/29 18:21:12 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>>>> 16/02/29 18:21:27 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 29, 2016 at 12:04 PM, Tim Chen  wrote:
>>>>>>
>>>>>>> No you don't have to run Mesos in docker containers to run Spark in
>>>>>>> docker containers.
>>>>>>>
>>>>>>> Once you have Mesos cluster running you can then specify the Spark
>>>>>>> configurations in your Spark job (i.e: 
>>>>>>> spark.mesos.executor.docker.image=mesosphere/spark:1.6)
>>>>>>> and Mesos will automatically launch docker containers for you.
>>>>>>>
>>>>>>> Tim
>>>>>>>
>>>>>>> On Mon, Feb 29, 2016 at 7:36 AM, Ashish Soni 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes i read that and not much details here.
>>>>>>>>
>>>>>>>> Is it true that we need to have spark installed on each mesos
>>>>>>>> docker container ( master and slave ) ...
>>>>>>>>
>>>>>>>> Ashish
>>>>>>>>
>>>>>>>> On Fri, Feb 26, 2016 at 2:14 PM, Tim Chen 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> https://spark.apache.org/docs/latest/running-on-mesos.html should
>>>>>>>>> be the best source, what problems were you running into?
>>>>>>>>>
>>>>>>>>> Tim
>>>>>>>>>
>>>>>>>>> On Fri, Feb 26, 2016 at 11:06 AM, Yin Yang 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Have you read this ?

Re: Does pyspark still lag far behind the Scala API in terms of features

2016-03-02 Thread Darren Govoni


DataFrames are essentially structured tables with schemas. So where does the
untyped data sit before it becomes structured, if not in a traditional RDD?
For us almost all the processing comes before there is structure to it.





 Original message 
From: Nicholas Chammas  
Date: 03/02/2016  5:13 PM  (GMT-05:00) 
To: Jules Damji , Joshua Sorrell  
Cc: user@spark.apache.org 
Subject: Re: Does pyspark still lag far behind the Scala API in terms of 
features 

> However, I believe, investing (or having some members of your group) learn 
>and invest in Scala is worthwhile for few reasons. One, you will get the 
>performance gain, especially now with Tungsten (not sure how it relates to 
>Python, but some other knowledgeable people on the list, please chime in).
The more your workload uses DataFrames, the less of a difference there will be 
between the languages (Scala, Java, Python, or R) in terms of performance.
One of the main benefits of Catalyst (which DFs enable) is that it 
automatically optimizes DataFrame operations, letting you focus on _what_ you 
want while Spark will take care of figuring out _how_.
Tungsten takes things further by tightly managing memory using the type 
information made available to it via DataFrames. This benefit comes into play 
regardless of the language used.
So in short, DataFrames are the "new RDD"--i.e. the new base structure you 
should be using in your Spark programs wherever possible. And with DataFrames, 
what language you use matters much less in terms of performance.
Nick
On Tue, Mar 1, 2016 at 12:07 PM Jules Damji  wrote:
Hello Joshua,
comments are inline...

On Mar 1, 2016, at 5:03 AM, Joshua Sorrell  wrote:
I haven't used Spark in the last year and a half. I am about to start a project 
with a new team, and we need to decide whether to use pyspark or Scala.
Indeed, good questions, and they do come up a lot in trainings that I have 
attended, where this inevitable question is raised. I believe it depends on 
your comfort zone or your appetite for venturing into newer things.
True, for the most part the Apache Spark committers have been committed to 
keeping the APIs at parity across all the language offerings, even though in 
some cases, in particular Python, they have lagged by a minor release. The 
extent to which they’re committed to parity is a good sign. Some experimental 
APIs may lag behind, but for the most part they have been admirably 
consistent. 
With Python there’s a minor performance hit, since there’s an extra level of 
indirection in the architecture and an additional Python PID that the executors 
launch to execute your pickled Python lambdas. Other than that it boils down to 
your comfort zone. I recommend looking at Sameer’s slides on (Advanced Spark 
for DevOps Training) where he walks through the pySpark and Python 
architecture. 

We are NOT a java shop. So some of the build tools/procedures will require some 
learning overhead if we go the Scala route. What I want to know is: is the 
Scala version of Spark still far enough ahead of pyspark to be well worth any 
initial training overhead?  
If you are a very advanced Python shop with in-house Python libraries, or ML 
libs, that don’t exist in the Scala version and would require a fair amount of 
porting, and the gap is too large, then perhaps it makes sense to stay put 
with Python.
However, I believe, investing (or having some members of your group) learn and 
invest in Scala is worthwhile for few reasons. One, you will get the 
performance gain, especially now with Tungsten (not sure how it relates to 
Python, but some other knowledgeable people on the list, please chime in). Two, 
since Spark is written in Scala, it gives you an enormous advantage to read 
sources (which are well documented and highly readable) should you have to 
consult or learn nuances of certain API method or action not covered 
comprehensively in the docs. And finally, there’s a long term benefit in 
learning Scala for reasons other than Spark. For example, writing other 
scalable and distributed applications.

Particularly, we will be using Spark Streaming. I know a couple of years ago 
that practically forced the decision to use Scala.  Is this still the case?
You’ll notice that certain API calls are not available, at least for now, in 
Python. http://spark.apache.org/docs/latest/streaming-programming-guide.html

Cheers
Jules
--
The Best Ideas Are Simple
Jules S. Damji
e-mail:dmat...@comcast.net
e-mail:jules.da...@gmail.com




Re: Does pyspark still lag far behind the Scala API in terms of features

2016-03-02 Thread Nicholas Chammas
> However, I believe, investing (or having some members of your group)
learn and invest in Scala is worthwhile for few reasons. One, you will get
the performance gain, especially now with Tungsten (not sure how it relates
to Python, but some other knowledgeable people on the list, please chime
in).

The more your workload uses DataFrames, the less of a difference there will
be between the languages (Scala, Java, Python, or R) in terms of
performance.

One of the main benefits of Catalyst (which DFs enable) is that it
automatically optimizes DataFrame operations, letting you focus on _what_
you want while Spark will take care of figuring out _how_.

Tungsten takes things further by tightly managing memory using the type
information made available to it via DataFrames. This benefit comes into
play regardless of the language used.

So in short, DataFrames are the "new RDD"--i.e. the new base structure you
should be using in your Spark programs wherever possible. And with
DataFrames, what language you use matters much less in terms of performance.

Nick

On Tue, Mar 1, 2016 at 12:07 PM Jules Damji  wrote:

> Hello Joshua,
>
> comments are inline...
>
> On Mar 1, 2016, at 5:03 AM, Joshua Sorrell  wrote:
>
> I haven't used Spark in the last year and a half. I am about to start a
> project with a new team, and we need to decide whether to use pyspark or
> Scala.
>
>
> Indeed, good questions, and they do come up a lot in trainings that I have
> attended, where this inevitable question is raised.
> I believe it depends on your comfort zone or your appetite for venturing
> into newer things.
>
> True, for the most part the Apache Spark committers have been committed
> to keeping the APIs at parity across all the language offerings, even though
> in some cases, in particular Python, they have lagged by a minor release.
> The extent to which they’re committed to parity is a good sign. Some
> experimental APIs may lag behind, but for the most part they have been
> admirably consistent.
>
> With Python there’s a minor performance hit, since there’s an extra level
> of indirection in the architecture and an additional Python PID that the
> executors launch to execute your pickled Python lambdas. Other than that it
> boils down to your comfort zone. I recommend looking at Sameer’s slides on
> (Advanced Spark for DevOps Training) where he walks through the pySpark and
> Python architecture.
>
>
> We are NOT a java shop. So some of the build tools/procedures will require
> some learning overhead if we go the Scala route. What I want to know is: is
> the Scala version of Spark still far enough ahead of pyspark to be well
> worth any initial training overhead?
>
>
> If you are a very advanced Python shop with in-house Python libraries, or
> ML libs, that don’t exist in the Scala version and would require a fair
> amount of porting, and the gap is too large, then perhaps it makes sense to
> stay put with Python.
>
> However, I believe, investing (or having some members of your group) learn
> and invest in Scala is worthwhile for few reasons. One, you will get the
> performance gain, especially now with Tungsten (not sure how it relates to
> Python, but some other knowledgeable people on the list, please chime in).
> Two, since Spark is written in Scala, it gives you an enormous advantage to
> read sources (which are well documented and highly readable) should you
> have to consult or learn nuances of certain API method or action not
> covered comprehensively in the docs. And finally, there’s a long term
> benefit in learning Scala for reasons other than Spark. For example,
> writing other scalable and distributed applications.
>
>
> Particularly, we will be using Spark Streaming. I know a couple of years
> ago that practically forced the decision to use Scala.  Is this still the
> case?
>
>
> You’ll notice that certain API calls are not available, at least for now,
> in Python.
> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>
>
> Cheers
> Jules
>
> --
> The Best Ideas Are Simple
> Jules S. Damji
> e-mail:dmat...@comcast.net
> e-mail:jules.da...@gmail.com
>
>


Re: Spark Streaming 1.6 mapWithState not working well with Kryo Serialization

2016-03-02 Thread Shixiong(Ryan) Zhu
See https://issues.apache.org/jira/browse/SPARK-12591

After applying the patch, it should work. However, if you want to enable
"registrationRequired", you still need to register
"org.apache.spark.streaming.util.OpenHashMapBasedStateMap",
"org.apache.spark.streaming.util.EmptyStateMap" and
"org.apache.spark.streaming.rdd.MapWithStateRDDRecord" by yourself because
these classes are defined in the Streaming project and we don't want to use
them in Spark core.
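
A minimal sketch of the registration described above, assuming the settings
are passed as spark-submit --conf flags rather than set in code. It relies on
the spark.kryo.classesToRegister property, which takes a comma-separated list
of class names. Because the classes are looked up by name, this should
sidestep the private[streaming] visibility problem, but that is an untested
assumption for 1.6. The helper names (kryo_conf, as_submit_args) and
com.example.MyState are made up for illustration.

```python
# Streaming-internal classes named in the reply above.
STREAMING_STATE_CLASSES = [
    "org.apache.spark.streaming.util.OpenHashMapBasedStateMap",
    "org.apache.spark.streaming.util.EmptyStateMap",
    "org.apache.spark.streaming.rdd.MapWithStateRDDRecord",
]


def kryo_conf(own_classes):
    """Return Spark settings that enforce Kryo registration.

    own_classes: fully qualified names of the application's own classes
    (hypothetical input, supplied by the caller).
    """
    names = list(own_classes) + STREAMING_STATE_CLASSES
    return {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryo.registrationRequired": "true",
        "spark.kryo.classesToRegister": ",".join(names),
    }


def as_submit_args(conf):
    """Turn a settings dict into a flat list of spark-submit arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", "{}={}".format(key, conf[key])]
    return args


if __name__ == "__main__":
    for arg in as_submit_args(kryo_conf(["com.example.MyState"])):
        print(arg)
```

The same three settings could equally go into spark-defaults.conf; the point
is only that the Streaming classes are listed by name next to your own.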


On Wed, Mar 2, 2016 at 1:41 PM, Aris  wrote:

> Hello Spark folks and especially TD,
>
> I am using the Spark Streaming 1.6 mapWithState API, and I am trying to
> enforce Kryo Serialization with
>
> SparkConf.set("spark.kryo.registrationRequired", "true")
>
> However, this appears to be impossible! I registered all the classes that
> are my own, but I have a problem with the
> class org.apache.spark.streaming.rdd.MapWithStateRDDRecord, which is declared
> private[streaming].
>
>
> The error:
>
> java.lang.IllegalArgumentException: Class is not registered:
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord
> Note: To register this class use:
> kryo.register(org.apache.spark.streaming.rdd.MapWithStateRDDRecord.class);
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
>
> Since this class is private within Spark Streaming itself, I cannot actually
> register it with Kryo, and I cannot use registrationRequired to
> make sure *everything* has been serialized with Kryo.
>
> Is this a bug? Can I somehow solve this?
>
> Aris
>


AVRO vs Parquet

2016-03-02 Thread Timothy Spann
Which format is the best format for SparkSQL adhoc queries and general data 
storage?

There are lots of specialized cases, but generally accessing some but not all 
the available columns with a reasonable subset of the data.

I am leaning towards Parquet as it has great support in Spark.

I also have to consider any file on HDFS may be accessed from other tools like 
Hive, Impala, HAWQ.

Suggestions?
—
airis.DATA
Timothy Spann, Senior Solutions Architect
C: 609-250-5894
http://airisdata.com/
http://meetup.com/nj-datascience




Spark Streaming 1.6 mapWithState not working well with Kryo Serialization

2016-03-02 Thread Aris
Hello Spark folks and especially TD,

I am using the Spark Streaming 1.6 mapWithState API, and I am trying to
enforce Kryo Serialization with

SparkConf.set("spark.kryo.registrationRequired", "true")

However, this appears to be impossible! I registered all the classes that
are my own, but I have a problem with the
class org.apache.spark.streaming.rdd.MapWithStateRDDRecord, which is declared
private[streaming].


The error:

java.lang.IllegalArgumentException: Class is not registered:
org.apache.spark.streaming.rdd.MapWithStateRDDRecord
Note: To register this class use:
kryo.register(org.apache.spark.streaming.rdd.MapWithStateRDDRecord.class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)

Since this class is private within Spark Streaming itself, I cannot actually
register it with Kryo, and I cannot use registrationRequired to
make sure *everything* has been serialized with Kryo.

Is this a bug? Can I somehow solve this?

Aris


Re: Spark 1.5 on Mesos

2016-03-02 Thread Ashish Soni
I have had no luck, and I would like to ask the Spark committers: will
this ever be designed to run on Mesos?

A Spark app as a Docker container is not working at all on Mesos. If anyone
would like the code, I can send it over to have a look.

Ashish

On Wed, Mar 2, 2016 at 12:23 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Try passing jar using --jars option
>
> On Wed, Mar 2, 2016 at 10:17 AM Ashish Soni  wrote:
>
>> I made some progress but now i am stuck at this point , Please help as
>> looks like i am close to get it working
>>
>> I have everything running in docker container including mesos slave and
>> master
>>
>> When i try to submit the pi example i get below error
>> *Error: Cannot load main class from JAR file:/opt/spark/Example*
>>
>> Below is the command i use to submit as a docker container
>>
>> docker run -it --rm -e SPARK_MASTER="mesos://10.0.2.15:7077"  -e
>> SPARK_IMAGE="spark_driver:latest" spark_driver:latest ./bin/spark-submit
>> --deploy-mode cluster --name "PI Example" --class
>> org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory
>> 512m --executor-cores 1
>> http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
>>
>>
>> On Tue, Mar 1, 2016 at 2:59 PM, Timothy Chen  wrote:
>>
>>> Can you go through the Mesos UI and look at the driver/executor log from
>>> the stderr file and see what the problem is?
>>>
>>> Tim
>>>
>>> On Mar 1, 2016, at 8:05 AM, Ashish Soni  wrote:
>>>
>>> Not sure what is the issue but i am getting below error  when i try to
>>> run spark PI example
>>>
>>> Blacklisting Mesos slave value: "5345asdasdasdkas234234asdasdasdasd"
>>>due to too many failures; is Spark installed on it?
>>> WARN TaskSchedulerImpl: Initial job has not accepted any resources; 
>>> check your cluster UI to ensure that workers are registered and have 
>>> sufficient resources
>>>
>>>
>>> On Mon, Feb 29, 2016 at 1:39 PM, Sathish Kumaran Vairavelu <
>>> vsathishkuma...@gmail.com> wrote:
>>>
>>>> May be the Mesos executor couldn't find spark image or the constraints
>>>> are not satisfied. Check your Mesos UI if you see Spark application in the
>>>> Frameworks tab
>>>>
>>>> On Mon, Feb 29, 2016 at 12:23 PM Ashish Soni
>>>> wrote:
>>>>
>>>>> What is the Best practice , I have everything running as docker
>>>>> container in single host ( mesos and marathon also as docker container )
>>>>>  and everything comes up fine but when i try to launch the spark shell i
>>>>> get below error
>>>>>
>>>>>
>>>>> SQL context available as sqlContext.
>>>>>
>>>>> scala> val data = sc.parallelize(1 to 100)
>>>>> data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>>>>> parallelize at <console>:27
>>>>>
>>>>> scala> data.count
>>>>> [Stage 0:>                                                  (0 + 0) / 2]
>>>>> 16/02/29 18:21:12 WARN TaskSchedulerImpl: Initial job has not
>>>>> accepted any resources; check your cluster UI to ensure that workers are
>>>>> registered and have sufficient resources
>>>>> 16/02/29 18:21:27 WARN TaskSchedulerImpl: Initial job has not accepted
>>>>> any resources; check your cluster UI to ensure that workers are registered
>>>>> and have sufficient resources
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 29, 2016 at 12:04 PM, Tim Chen  wrote:
>>>>>
>>>>>> No you don't have to run Mesos in docker containers to run Spark in
>>>>>> docker containers.
>>>>>>
>>>>>> Once you have a Mesos cluster running you can then specify the Spark
>>>>>> configurations in your Spark job (e.g.
>>>>>> spark.mesos.executor.docker.image=mesosphere/spark:1.6)
>>>>>> and Mesos will automatically launch docker containers for you.
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>> On Mon, Feb 29, 2016 at 7:36 AM, Ashish Soni
>>>>>> wrote:
>>>>>>
>>>>>>> Yes i read that and not much details here.
>>>>>>>
>>>>>>> Is it true that we need to have spark installed on each mesos docker
>>>>>>> container ( master and slave ) ...
>>>>>>>
>>>>>>> Ashish
>>>>>>>
>>>>>>> On Fri, Feb 26, 2016 at 2:14 PM, Tim Chen  wrote:
>>>>>>>
>>>>>>>> https://spark.apache.org/docs/latest/running-on-mesos.html should
>>>>>>>> be the best source, what problems were you running into?
>>>>>>>>
>>>>>>>> Tim
>>>>>>>>
>>>>>>>> On Fri, Feb 26, 2016 at 11:06 AM, Yin Yang
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Have you read this ?
>>>>>>>>> https://spark.apache.org/docs/latest/running-on-mesos.html
>>>>>>>>>
>>>>>>>>> On Fri, Feb 26, 2016 at 11:03 AM, Ashish Soni <
>>>>>>>>> asoni.le...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All ,
>>>>>>>>>>
>>>>>>>>>> Is there any proper documentation as how to run spark on mesos ,
>>>>>>>>>> I am trying from the last few days and not able to make it work.
>>>>>>>>>>
>>>>>>>>>> Please help
>>>>>>>>>>
>>>>>>>>>> Ashish
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>


Re: Building a REST Service with Spark back-end

2016-03-02 Thread Guru Medasani
Hi Yanlin,

This is a fairly new effort and is not officially released/supported by 
Cloudera yet. I believe those numbers will be out once it is released.

Guru Medasani
gdm...@gmail.com



> On Mar 2, 2016, at 10:40 AM, yanlin wang  wrote:
> 
> Did any one use Livy in real world high concurrency web app? I think it uses 
> spark submit command line to create job... How about  job server or notebook 
> comparing with Livy?
> 
> Thx,
> Yanlin
> 
> Sent from my iPhone
> 
> On Mar 2, 2016, at 6:24 AM, Guru Medasani wrote:
> 
>> Hi Don,
>> 
>> Here is another REST interface for interacting with Spark from anywhere. 
>> 
>> https://github.com/cloudera/livy 
>> 
>> Here is an example to estimate PI using Spark from Python using requests 
>> library. 
>> 
>> >>> data = {
>> ...   'code': textwrap.dedent("""\
>> ...      val NUM_SAMPLES = 100000;
>> ...      val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
>> ...        val x = Math.random();
>> ...        val y = Math.random();
>> ...        if (x*x + y*y < 1) 1 else 0
>> ...      }.reduce(_ + _);
>> ...      println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
>> ...      """)
>> ... }
>> >>> r = requests.post(statements_url, data=json.dumps(data), headers=headers)
>> >>> pprint.pprint(r.json())
>> {u'id': 1,
>>  u'output': {u'data': {u'text/plain': u'Pi is roughly 3.14004\nNUM_SAMPLES: Int = 100000\ncount: Int = 78501'},
>>              u'execution_count': 1,
>>              u'status': u'ok'},
>>  u'state': u'available'}
>> 
>> 
>> Guru Medasani
>> gdm...@gmail.com 
>> 
>> 
>> 
>>> On Mar 2, 2016, at 7:47 AM, Todd Nist wrote:
>>> 
>>> Have you looked at Apache Toree, http://toree.apache.org/ ? This was
>>> formerly the Spark-Kernel from IBM but was contributed to Apache.
>>> 
>>> https://github.com/apache/incubator-toree 
>>> 
>>> You can find a good overview on the spark-kernel here:
>>> http://www.spark.tc/how-to-enable-interactive-applications-against-apache-spark/
>>>  
>>> 
>>> 
>>> Not sure if that is of value to you or not.
>>> 
>>> HTH.
>>> 
>>> -Todd
>>> 
>>> On Tue, Mar 1, 2016 at 7:30 PM, Don Drake wrote:
>>> I'm interested in building a REST service that utilizes a Spark SQL Context 
>>> to return records from a DataFrame (or IndexedRDD?) and even add/update 
>>> records.
>>> 
>>> This will be a simple REST API, with only a few end-points.  I found this 
>>> example:
>>> 
>>> https://github.com/alexmasselot/spark-play-activator 
>>> 
>>> 
>>> which looks close to what I am interested in doing.  
>>> 
>>> Are there any other ideas or options if I want to run this in a YARN 
>>> cluster?
>>> 
>>> Thanks.
>>> 
>>> -Don
>>> 
>>> -- 
>>> Donald Drake
>>> Drake Consulting
>>> http://www.drakeconsulting.com/ 
>>> https://twitter.com/dondrake 
>>> 800-733-2143 
>> 
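
A rough plain-Python sketch of the Monte Carlo estimate that the Scala snippet
in the Livy example quoted above sends to the cluster. It is meant only to make
the arithmetic easy to follow without Spark or Livy: sample points in the unit
square, count those inside the quarter circle, and scale by 4. The function
name estimate_pi and the optional seed argument are made up for illustration.

```python
import random


def estimate_pi(num_samples, seed=None):
    """Estimate pi by sampling points uniformly in the unit square."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_samples):
        x = rng.random()
        y = rng.random()
        # A point lands inside the quarter circle when x^2 + y^2 < 1.
        if x * x + y * y < 1:
            inside += 1
    # The quarter circle covers pi/4 of the unit square.
    return 4.0 * inside / num_samples


if __name__ == "__main__":
    print("Pi is roughly", estimate_pi(100000, seed=42))
```

In the Livy version the loop body runs as a map over a parallelized range and
the count is a reduce, but the estimate is computed the same way.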



Spark on Yarn with Dynamic Resource Allocation. Container always marked as failed

2016-03-02 Thread Xiaoye Sun
Hi all,

I am very new to spark and yarn.

I am running the BroadcastTest example application using Spark 1.6.0 and
Hadoop/YARN 2.7.1 on a 5-node cluster.

I configured my configuration files according to
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

1. copy
./spark-1.6.0/network/yarn/target/scala-2.10/spark-1.6.0-yarn-shuffle.jar
to /hadoop-2.7.1/share/hadoop/yarn/lib/
2. yarn-site.xml is like this http://www.owlnet.rice.edu/~xs6/yarn-site.xml
3. spark-defaults.conf is like this
http://www.owlnet.rice.edu/~xs6/spark-defaults.conf
4. spark-env.sh is like this http://www.owlnet.rice.edu/~xs6/spark-env.sh
5. the command I use to submit spark application is: ./bin/spark-submit
--class org.apache.spark.examples.BroadcastTest --master yarn --deploy-mode
cluster ./examples/target/spark-examples_2.10-1.6.0.jar 1 1000 Http
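
A small sanity check for step 3, as a hedged sketch. The job-scheduling page
linked above asks for spark.dynamicAllocation.enabled and
spark.shuffle.service.enabled to both be true. The snippet below parses the
text of a spark-defaults.conf and reports which of the two is missing or not
set to true. The function names and the sample text are made up for
illustration; the actual files at the URLs above are not reproduced here.

```python
import re

# The two Spark-side settings required for dynamic allocation.
REQUIRED = (
    "spark.dynamicAllocation.enabled",
    "spark.shuffle.service.enabled",
)


def parse_spark_defaults(text):
    """Parse 'key value' or 'key=value' lines, skipping comments and blanks."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = re.split(r"[=\s]+", line, maxsplit=1)
        if len(parts) == 2:
            conf[parts[0]] = parts[1].strip()
    return conf


def missing_settings(text):
    """Return the required keys that are absent or not set to true."""
    conf = parse_spark_defaults(text)
    return [key for key in REQUIRED if conf.get(key, "").lower() != "true"]


if __name__ == "__main__":
    sample = """
    # hypothetical spark-defaults.conf contents
    spark.dynamicAllocation.enabled   true
    spark.shuffle.service.enabled     false
    """
    print(missing_settings(sample))
```

On the YARN side, the same page also expects spark_shuffle to be listed in
yarn.nodemanager.aux-services and
yarn.nodemanager.aux-services.spark_shuffle.class to be set to
org.apache.spark.network.yarn.YarnShuffleService in yarn-site.xml on every
node.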

However, the job is stuck at RUNNING status, and by looking at the log, I
found that executors fail or are cancelled frequently.
Here is the log output: http://www.owlnet.rice.edu/~xs6/stderr
It shows something like

16/03/02 02:07:35 WARN yarn.YarnAllocator: Container marked as failed:
container_1456905762620_0002_01_02 on host: bold-x.rice.edu. Exit
status: 1. Diagnostics: Exception from container-launch.


Does anybody know what the problem is here?
Best,
Xiaoye


Avro SerDe Issue w/ Manual Partitions?

2016-03-02 Thread Chris Miller
Hi,

I have a strange issue occurring when I use manual partitions.

If I create a table as follows, I am able to query the data with no problem:


CREATE TABLE test1
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 's3://analytics-bucket/prod/logs/avro/2016/03/02/'
TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');


If I create the table like this, however, and then add a partition with a
LOCATION specified, I am unable to query:


CREATE TABLE test2
PARTITIONED BY (ds STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');

ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
's3://analytics-bucket/prod/logs/avro/2016/03/02/';


This is what happens


SELECT * FROM test2 LIMIT 1;

org.apache.avro.AvroTypeException: Found ActionEnum, expecting union
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
at org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


The data is exactly the same, and I can still go back and query the test1
table without issue. I don't have control over the directory structure, so
I need to add the partitions manually so that I can specify a location.

For what it's worth, "ActionEnum" is the first field in my schema. This
same table and query structure works fine with Hive. When I try to run this
with SparkSQL, however, I get the above error.

Anyone have any idea what the problem is here? Thanks!

--
Chris Miller


Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Reynold Xin
Thanks. Once you create the jira just reply to this email with the link.

On Wednesday, March 2, 2016, Ewan Leith  wrote:

> Thanks, I'll create the JIRA for it. Happy to help contribute to a patch if 
> we can, not sure if my own scala skills will be up to it but perhaps one of 
> my colleagues' will :)
>
> Ewan
>
> I don't think that exists right now, but it's definitely a good option to
> have. I myself have run into this issue a few times.
>
> Can you create a JIRA ticket so we can track it? Would be even better if
> you are interested in working on a patch! Thanks.
>
>
> On Wed, Mar 2, 2016 at 11:51 AM, Ewan Leith wrote:
>
>> Hi Reynold, yes that would be perfect for our use case.
>>
>> I assume it doesn't exist though, otherwise I really need to go re-read the 
>> docs!
>>
>> Thanks to both of you for replying by the way, I know you must be hugely 
>> busy.
>>
>> Ewan
>>
>> Are you looking for a "relaxed" mode that simply returns nulls for fields
>> that don't exist or have an incompatible schema?
>>
>>
>> On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith wrote:
>>
>>> Thanks Michael, it's not a great example really, as the data I'm working 
>>> with has some source files that do fit the schema, and some that don't (out 
>>> of millions that do work, perhaps 10 might not).
>>>
>>> In an ideal world for us the select would probably return the valid records 
>>> only.
>>>
>>> We're trying out the new dataset APIs to see if we can do some 
>>> pre-filtering that way.
>>>
>>> Thanks,
>>> Ewan
>>>
>>> -dev +user
>>>
>>> StructType(StructField(data,ArrayType(StructType(StructField(
 *stuff,ArrayType(*StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), StructField(othertype,
 ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>>
>>>
>>> It's not a great error message, but as the schema above shows, stuff is
>>> an array, not a struct.  So, you need to pick a particular element (using
>>> []) before you can pull out a specific field.  It would be easier to see
>>> this if you ran sqlContext.read.json(s1Rdd).printSchema(), which gives
>>> you a tree view.  Try the following.
>>>
>>>
>>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")
>>>
>>> On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith wrote:
>>>
>>>> When you create a dataframe using the *sqlContext.read.schema()* API,
>>>> if you pass in a schema that’s compatible with some of the records, but
>>>> incompatible with others, it seems you can’t do a .select on the
>>>> problematic columns, instead you get an AnalysisException error.
>>>>
>>>> I know loading the wrong data isn’t good behaviour, but if you’re
>>>> reading data from (for example) JSON files, there’s going to be malformed
>>>> files along the way. I think it would be nice to handle this error in a
>>>> nicer way, though I don’t know the best way to approach it.
>>>>
>>>> Before I raise a JIRA ticket about it, would people consider this to be
>>>> a bug or expected behaviour?
>>>>
>>>> I’ve attached a couple of sample JSON files and the steps below to
>>>> reproduce it, by taking the inferred schema from the simple1.json file, and
>>>> applying it to a union of simple1.json and simple2.json. You can visually
>>>> see the data has been parsed as I think you’d want if you do a .select on
>>>> the parent column and print out the output, but when you do a select on the
>>>> problem column you instead get an exception.
>>>>
>>>> scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)
>>>> s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map
>>>> at <console>:27
>>>>
>>>> scala> val s1schema = sqlContext.read.json(s1Rdd).schema
>>>> s1schema: org.apache.spark.sql.types.StructType =
>>>> StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>>>> StructField(name,StringType,true)),true),true),
>>>> StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
>>>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>>>
>>>> scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)
>>>> [WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don
>>>> Joeh]),null], [null,WrappedArray([ACME,2])]))]
>>>> [WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])],
>>>> [WrappedArray([2,null]),null]))]


Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
Thanks, I'll create the JIRA for it. Happy to help contribute to a patch if we 
can, not sure if my own scala skills will be up to it but perhaps one of my 
colleagues' will :)

Ewan

I don't think that exists right now, but it's definitely a good option to have. 
I myself have run into this issue a few times.

Can you create a JIRA ticket so we can track it? Would be even better if you 
are interested in working on a patch! Thanks.


On Wed, Mar 2, 2016 at 11:51 AM, Ewan Leith wrote:

Hi Reynold, yes that would be perfect for our use case.

I assume it doesn't exist though, otherwise I really need to go re-read the 
docs!

Thanks to both of you for replying by the way, I know you must be hugely busy.

Ewan

Are you looking for a "relaxed" mode that simply returns nulls for fields that 
don't exist or have an incompatible schema?


On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith wrote:

Thanks Michael, it's not a great example really, as the data I'm working with 
has some source files that do fit the schema, and some that don't (out of 
millions that do work, perhaps 10 might not).

In an ideal world for us the select would probably return the valid records 
only.

We're trying out the new dataset APIs to see if we can do some pre-filtering 
that way.

Thanks,
Ewan
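
A rough sketch of the kind of pre-filtering mentioned above, in plain Python
rather than the Dataset API. Each JSON document is checked against a
hand-written description of the expected shape, and only documents that match
are kept. The SCHEMA literal is transcribed from the schema printed in this
thread, with every field treated as nullable. The rule that unknown keys make a
document invalid, and the names conforms and keep_valid, are assumptions for
illustration, not Spark behaviour.

```python
import json

# Expected shape: dicts describe structs, one-element lists describe arrays,
# and Python types describe leaf values.
SCHEMA = {
    "data": [{
        "stuff": [{
            "onetype": [{"id": int, "name": str}],
            "othertype": [{"company": str, "id": int}],
        }],
    }],
}


def conforms(value, schema):
    """Return True when value fits schema; None is allowed everywhere."""
    if value is None:
        return True
    if isinstance(schema, dict):
        return (
            isinstance(value, dict)
            and set(value) <= set(schema)  # assumption: no unknown keys
            and all(conforms(value[key], schema[key]) for key in value)
        )
    if isinstance(schema, list):
        return isinstance(value, list) and all(
            conforms(item, schema[0]) for item in value
        )
    # Leaf: bool is a subclass of int in Python, so reject it explicitly.
    return isinstance(value, schema) and not isinstance(value, bool)


def keep_valid(documents):
    """Parse each JSON string and keep only the documents that conform."""
    valid = []
    for text in documents:
        try:
            doc = json.loads(text)
        except ValueError:  # malformed JSON
            continue
        if conforms(doc, SCHEMA):
            valid.append(doc)
    return valid
```

Run over the raw file contents before they are handed to
sqlContext.read.schema(...).json(...), a filter like this would drop the
handful of files that do not fit, at the cost of parsing each document twice.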

-dev +user

StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

It's not a great error message, but as the schema above shows, stuff is an 
array, not a struct.  So, you need to pick a particular element (using []) 
before you can pull out a specific field.  It would be easier to see this if 
you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a tree view. 
Try the following.

sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")
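
The point about arrays versus structs can be sketched with plain Scala collections. This is only an analogy and not Spark API: the case classes below are hypothetical names that mirror the printed schema, and the sample values are taken from the first row of output shown in the thread, with nulls modelled as empty sequences.

```scala
// Plain-Scala model of the schema printed in the thread (illustrative, not Spark API).
case class OneType(id: Long, name: String)
case class OtherType(company: String, id: Long)
case class Stuff(onetype: Seq[OneType], othertype: Seq[OtherType])
case class Data(stuff: Seq[Stuff])
case class Record(data: Seq[Data])

object NestedArrays {
  // Mirrors the first row printed by select("data.stuff") in the thread.
  val record: Record = Record(Seq(Data(Seq(
    Stuff(Seq(OneType(1L, "John Doe"), OneType(2L, "Don Joeh")), Nil),
    Stuff(Nil, Seq(OtherType("ACME", 2L)))
  ))))

  // `record.data.stuff` would not compile: data is a sequence, not a single Data.
  // Option 1: pick one element at each array level, similar in spirit to [0] in a column path.
  val firstOnly: Seq[OneType] = record.data(0).stuff(0).onetype

  // Option 2: flatten every array level and keep all elements.
  val all: Seq[OneType] = record.data.flatMap(_.stuff).flatMap(_.onetype)

  def main(args: Array[String]): Unit = {
    println(firstOnly.map(_.name))
    println(all.map(_.id))
  }
}
```

On a DataFrame, the usual counterpart of the flattening step is the explode function in org.apache.spark.sql.functions, applied once per array level.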

On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith wrote:
When you create a dataframe using the sqlContext.read.schema() API, if you pass 
in a schema that’s compatible with some of the records, but incompatible with 
others, it seems you can’t do a .select on the problematic columns, instead you 
get an AnalysisException error.

I know loading the wrong data isn’t good behaviour, but if you’re reading data 
from (for example) JSON files, there’s going to be malformed files along the 
way. I think it would be nice to handle this error in a nicer way, though I 
don’t know the best way to approach it.

Before I raise a JIRA ticket about it, would people consider this to be a bug 
or expected behaviour?

I’ve attached a couple of sample JSON files and the steps below to reproduce 
it, by taking the inferred schema from the simple1.json file, and applying it 
to a union of simple1.json and simple2.json. You can visually see the data has 
been parsed as I think you’d want if you do a .select on the parent column and 
print out the output, but when you do a select on the problem column you 
instead get an exception.

scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)
s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at 
:27

scala> val s1schema = sqlContext.read.json(s1Rdd).schema
s1schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

scala> 
sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)
[WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don Joeh]),null], 
[null,WrappedArray([ACME,2])]))]
[WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])], 
[WrappedArray([2,null]),null]))]

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")
org.apache.spark.sql.AnalysisException: cannot resolve 'data.stuff[onetype]' 
due to data type mismatch: argument 2 requires integral type, however, 
'onetype' is of string type.;
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
  

Re: SFTP Compressed CSV into Dataframe

2016-03-02 Thread Sumedh Wale

On Thursday 03 March 2016 12:47 AM, Benjamin Kim wrote:

I wonder if anyone has opened a SFTP connection to open a remote GZIP CSV file? I am able 
to download the file first locally using the SFTP Client in the spark-sftp package. Then, 
I load the file into a dataframe using the spark-csv package, which automatically 
decompresses the file. I just want to remove the "downloading file to local" 
step and directly have the remote file decompressed, read, and loaded. Can someone give 
me any hints?


One easy way on Linux, of course, is to use sshfs 
(https://github.com/libfuse/sshfs) and mount the remote directory 
locally. Since this uses FUSE, it works fine with normal user privileges.



Thanks,
Ben


Thanks

--
Sumedh Wale
SnappyData (http://www.snappydata.io)


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Reynold Xin
I don't think that exists right now, but it's definitely a good option to
have. I myself have run into this issue a few times.

Can you create a JIRA ticket so we can track it? Would be even better if
you are interested in working on a patch! Thanks.


On Wed, Mar 2, 2016 at 11:51 AM, Ewan Leith 
wrote:

> Hi Reynold, yes that would be perfect for our use case.
>
> I assume it doesn't exist though, otherwise I really need to go re-read the 
> docs!
>
> Thanks to both of you for replying by the way, I know you must be hugely busy.
>
> Ewan
>
> Are you looking for "relaxed" mode that simply return nulls for fields
> that doesn't exist or have incompatible schema?
>
>
> On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith 
> wrote:
>
>> Thanks Michael, it's not a great example really, as the data I'm working 
>> with has some source files that do fit the schema, and some that don't (out 
>> of millions that do work, perhaps 10 might not).
>>
>> In an ideal world for us the select would probably return the valid records 
>> only.
>>
>> We're trying out the new dataset APIs to see if we can do some pre-filtering 
>> that way.
>>
>> Thanks,
>> Ewan
>>
>> -dev +user
>>
>> StructType(StructField(data,ArrayType(StructType(StructField(
>>> *stuff,ArrayType(*StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>>> StructField(name,StringType,true)),true),true), StructField(othertype,
>>> ArrayType(StructType(StructField(company,StringType,true),
>>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>
>>
>> Its not a great error message, but as the schema above shows, stuff is
>> an array, not a struct.  So, you need to pick a particular element (using
>> []) before you can pull out a specific field.  It would be easier to see
>> this if you ran sqlContext.read.json(s1Rdd).printSchema(), which gives
>> you a tree view.  Try the following.
>>
>>
>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")
>>
>> On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith 
>> wrote:
>>
>>> When you create a dataframe using the *sqlContext.read.schema()* API,
>>> if you pass in a schema that’s compatible with some of the records, but
>>> incompatible with others, it seems you can’t do a .select on the
>>> problematic columns, instead you get an AnalysisException error.
>>>
>>>
>>>
>>> I know loading the wrong data isn’t good behaviour, but if you’re
>>> reading data from (for example) JSON files, there’s going to be malformed
>>> files along the way. I think it would be nice to handle this error in a
>>> nicer way, though I don’t know the best way to approach it.
>>>
>>>
>>>
>>> Before I raise a JIRA ticket about it, would people consider this to be
>>> a bug or expected behaviour?
>>>
>>>
>>>
>>> I’ve attached a couple of sample JSON files and the steps below to
>>> reproduce it, by taking the inferred schema from the simple1.json file, and
>>> applying it to a union of simple1.json and simple2.json. You can visually
>>> see the data has been parsed as I think you’d want if you do a .select on
>>> the parent column and print out the output, but when you do a select on the
>>> problem column you instead get an exception.
>>>
>>>
>>>
>>> *scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x =>
>>> x._2)*
>>>
>>> s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map
>>> at :27
>>>
>>>
>>>
>>> *scala> val s1schema = sqlContext.read.json(s1Rdd).schema*
>>>
>>> s1schema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>>> StructField(name,StringType,true)),true),true),
>>> StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
>>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>>
>>>
>>>
>>> *scala>
>>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)*
>>>
>>> [WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don
>>> Joeh]),null], [null,WrappedArray([ACME,2])]))]
>>>
>>> [WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])],
>>> [WrappedArray([2,null]),null]))]
>>>
>>>
>>>
>>> *scala>
>>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")*
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve
>>> 'data.stuff[onetype]' due to data type mismatch: argument 2 requires
>>> integral type, however, 'onetype' is of string type.;
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
>>>
>>> at
>>> 

Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
Hi Reynold, yes that would be perfect for our use case.

I assume it doesn't exist though, otherwise I really need to go re-read the 
docs!

Thanks to both of you for replying by the way, I know you must be hugely busy.

Ewan

Are you looking for "relaxed" mode that simply return nulls for fields that 
doesn't exist or have incompatible schema?


On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith wrote:

Thanks Michael, it's not a great example really, as the data I'm working with 
has some source files that do fit the schema, and some that don't (out of 
millions that do work, perhaps 10 might not).

In an ideal world for us the select would probably return the valid records 
only.

We're trying out the new dataset APIs to see if we can do some pre-filtering 
that way.

Thanks,
Ewan

-dev +user

StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

Its not a great error message, but as the schema above shows, stuff is an 
array, not a struct.  So, you need to pick a particular element (using []) 
before you can pull out a specific field.  It would be easier to see this if 
you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a tree view. 
 Try the following.

sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")

On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith wrote:
When you create a dataframe using the sqlContext.read.schema() API, if you pass 
in a schema that’s compatible with some of the records, but incompatible with 
others, it seems you can’t do a .select on the problematic columns, instead you 
get an AnalysisException error.

I know loading the wrong data isn’t good behaviour, but if you’re reading data 
from (for example) JSON files, there’s going to be malformed files along the 
way. I think it would be nice to handle this error in a nicer way, though I 
don’t know the best way to approach it.

Before I raise a JIRA ticket about it, would people consider this to be a bug 
or expected behaviour?

I’ve attached a couple of sample JSON files and the steps below to reproduce 
it, by taking the inferred schema from the simple1.json file, and applying it 
to a union of simple1.json and simple2.json. You can visually see the data has 
been parsed as I think you’d want if you do a .select on the parent column and 
print out the output, but when you do a select on the problem column you 
instead get an exception.

scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)
s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at 
:27

scala> val s1schema = sqlContext.read.json(s1Rdd).schema
s1schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

scala> 
sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)
[WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don Joeh]),null], 
[null,WrappedArray([ACME,2])]))]
[WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])], 
[WrappedArray([2,null]),null]))]

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")
org.apache.spark.sql.AnalysisException: cannot resolve 'data.stuff[onetype]' 
due to data type mismatch: argument 2 requires integral type, however, 
'onetype' is of string type.;
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)

(The full exception is attached too).

What do people think, is this a bug?

Thanks,
Ewan



Re: SFTP Compressed CSV into Dataframe

2016-03-02 Thread Ewan Leith
The Apache Commons VFS library lets you access files on an SFTP server from 
Java, with no local file handling involved:

https://commons.apache.org/proper/commons-vfs/filesystems.html

Hope this helps,
Ewan
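
As a rough illustration of reading a gzip-compressed CSV straight from a stream, here is a minimal sketch that uses only the Java standard library. The names GzipCsvStream, readGzipLines and gzip are made up for this example, and the Commons VFS call mentioned in the comment is an assumption about how the remote stream might be obtained (it would need commons-vfs2 and JSch on the classpath); the demo itself feeds in an in-memory stream.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, ByteArrayOutputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipCsvStream {
  // Decompress a gzip stream and return its lines without writing a local file.
  def readGzipLines(in: InputStream): List[String] = {
    val reader = new BufferedReader(
      new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally reader.close()
  }

  // Helper used only to build sample input in memory.
  def gzip(text: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(bytes)
    out.write(text.getBytes(StandardCharsets.UTF_8))
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Assumption: for the SFTP case the InputStream would come from the remote file,
    // for example via Commons VFS:
    //   VFS.getManager.resolveFile("sftp://user@host/path/file.csv.gz").getContent.getInputStream
    val sample = gzip("id,name\n1,a\n2,b\n")
    readGzipLines(new ByteArrayInputStream(sample)).foreach(println)
  }
}
```

Because a single gzip stream has to be read sequentially, this runs in one place (for instance on the driver); the resulting lines could then be handed to sc.parallelize if the file is small enough.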

I wonder if anyone has opened a SFTP connection to open a remote GZIP CSV file? 
I am able to download the file first locally using the SFTP Client in the 
spark-sftp package. Then, I load the file into a dataframe using the spark-csv 
package, which automatically decompresses the file. I just want to remove the 
"downloading file to local" step and directly have the remote file 
decompressed, read, and loaded. Can someone give me any hints?

Thanks,
Ben






Re: SFTP Compressed CSV into Dataframe

2016-03-02 Thread Holden Karau
From a quick look through the README and code for spark-sftp, it seems
that this connector works by downloading the file locally on the
driver program, and this is not configurable. So you would probably need to
find a different connector (and you probably shouldn't use spark-sftp for
large files). It also seems that it might not work in a cluster environment
(which the project's README also warns about). You might have better luck
using FUSE + sftp, although you will still want your remote gzip CSV file
to be split into multiple files, since gzip isn't a splittable compression
format.

On Wed, Mar 2, 2016 at 11:17 AM, Benjamin Kim  wrote:

> I wonder if anyone has opened a SFTP connection to open a remote GZIP CSV
> file? I am able to download the file first locally using the SFTP Client in
> the spark-sftp package. Then, I load the file into a dataframe using the
> spark-csv package, which automatically decompresses the file. I just want
> to remove the "downloading file to local" step and directly have the remote
> file decompressed, read, and loaded. Can someone give me any hints?
>
> Thanks,
> Ben
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Reynold Xin
Are you looking for a "relaxed" mode that simply returns nulls for fields that
don't exist or have an incompatible schema?


On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith 
wrote:

> Thanks Michael, it's not a great example really, as the data I'm working with 
> has some source files that do fit the schema, and some that don't (out of 
> millions that do work, perhaps 10 might not).
>
> In an ideal world for us the select would probably return the valid records 
> only.
>
> We're trying out the new dataset APIs to see if we can do some pre-filtering 
> that way.
>
> Thanks,
> Ewan
>
> -dev +user
>
> StructType(StructField(data,ArrayType(StructType(StructField(
>> *stuff,ArrayType(*StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>> StructField(name,StringType,true)),true),true), StructField(othertype,
>> ArrayType(StructType(StructField(company,StringType,true),
>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>
>
> Its not a great error message, but as the schema above shows, stuff is an
> array, not a struct.  So, you need to pick a particular element (using [])
> before you can pull out a specific field.  It would be easier to see this
> if you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a
> tree view.  Try the following.
>
>
> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")
>
> On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith 
> wrote:
>
>> When you create a dataframe using the *sqlContext.read.schema()* API, if
>> you pass in a schema that’s compatible with some of the records, but
>> incompatible with others, it seems you can’t do a .select on the
>> problematic columns, instead you get an AnalysisException error.
>>
>>
>>
>> I know loading the wrong data isn’t good behaviour, but if you’re reading
>> data from (for example) JSON files, there’s going to be malformed files
>> along the way. I think it would be nice to handle this error in a nicer
>> way, though I don’t know the best way to approach it.
>>
>>
>>
>> Before I raise a JIRA ticket about it, would people consider this to be a
>> bug or expected behaviour?
>>
>>
>>
>> I’ve attached a couple of sample JSON files and the steps below to
>> reproduce it, by taking the inferred schema from the simple1.json file, and
>> applying it to a union of simple1.json and simple2.json. You can visually
>> see the data has been parsed as I think you’d want if you do a .select on
>> the parent column and print out the output, but when you do a select on the
>> problem column you instead get an exception.
>>
>>
>>
>> *scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)*
>>
>> s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at
>> :27
>>
>>
>>
>> *scala> val s1schema = sqlContext.read.json(s1Rdd).schema*
>>
>> s1schema: org.apache.spark.sql.types.StructType =
>> StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>> StructField(name,StringType,true)),true),true),
>> StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>
>>
>>
>> *scala>
>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)*
>>
>> [WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don
>> Joeh]),null], [null,WrappedArray([ACME,2])]))]
>>
>> [WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])],
>> [WrappedArray([2,null]),null]))]
>>
>>
>>
>> *scala>
>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")*
>>
>> org.apache.spark.sql.AnalysisException: cannot resolve
>> 'data.stuff[onetype]' due to data type mismatch: argument 2 requires
>> integral type, however, 'onetype' is of string type.;
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
>>
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>>
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>>
>>
>>
>> (The full exception is attached too).
>>
>>
>>
>> What do people think, is this a bug?
>>
>>
>>
>> Thanks,
>>
>> Ewan
>>
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>
>


SFTP Compressed CSV into Dataframe

2016-03-02 Thread Benjamin Kim
I wonder if anyone has opened an SFTP connection to read a remote GZIP CSV file? 
I am able to download the file first locally using the SFTP Client in the 
spark-sftp package. Then, I load the file into a dataframe using the spark-csv 
package, which automatically decompresses the file. I just want to remove the 
"downloading file to local" step and directly have the remote file 
decompressed, read, and loaded. Can someone give me any hints?

Thanks,
Ben






Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
Thanks Michael, it's not a great example really, as the data I'm working with 
has some source files that do fit the schema, and some that don't (out of 
millions that do work, perhaps 10 might not).

In an ideal world for us the select would probably return the valid records 
only.

We're trying out the new dataset APIs to see if we can do some pre-filtering 
that way.

Thanks,
Ewan

-dev +user

StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

Its not a great error message, but as the schema above shows, stuff is an 
array, not a struct.  So, you need to pick a particular element (using []) 
before you can pull out a specific field.  It would be easier to see this if 
you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a tree view. 
 Try the following.

sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")

On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith wrote:
When you create a dataframe using the sqlContext.read.schema() API, if you pass 
in a schema that's compatible with some of the records, but incompatible with 
others, it seems you can't do a .select on the problematic columns, instead you 
get an AnalysisException error.

I know loading the wrong data isn't good behaviour, but if you're reading data 
from (for example) JSON files, there's going to be malformed files along the 
way. I think it would be nice to handle this error in a nicer way, though I 
don't know the best way to approach it.

Before I raise a JIRA ticket about it, would people consider this to be a bug 
or expected behaviour?

I've attached a couple of sample JSON files and the steps below to reproduce 
it, by taking the inferred schema from the simple1.json file, and applying it 
to a union of simple1.json and simple2.json. You can visually see the data has 
been parsed as I think you'd want if you do a .select on the parent column and 
print out the output, but when you do a select on the problem column you 
instead get an exception.

scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)
s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at 
:27

scala> val s1schema = sqlContext.read.json(s1Rdd).schema
s1schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

scala> 
sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)
[WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don Joeh]),null], 
[null,WrappedArray([ACME,2])]))]
[WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])], 
[WrappedArray([2,null]),null]))]

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")
org.apache.spark.sql.AnalysisException: cannot resolve 'data.stuff[onetype]' 
due to data type mismatch: argument 2 requires integral type, however, 
'onetype' is of string type.;
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)

(The full exception is attached too).

What do people think, is this a bug?

Thanks,
Ewan


-
To unsubscribe, e-mail: 
dev-unsubscr...@spark.apache.org
For additional commands, e-mail: 
dev-h...@spark.apache.org



Re: Unit testing framework for Spark Jobs?

2016-03-02 Thread radoburansky
I am sure you have googled this:
https://github.com/holdenk/spark-testing-base

On Wed, Mar 2, 2016 at 6:54 PM, SRK [via Apache Spark User List] <
ml-node+s1001560n2638...@n3.nabble.com> wrote:

> Hi,
>
> What is a good unit testing framework for Spark batch/streaming jobs? I
> have core spark, spark sql with dataframes and streaming api getting used.
> Any good framework to cover unit tests for these APIs?
>
> Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380p26384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: spark streaming

2016-03-02 Thread Vinti Maheshwari
Thanks Shixiong. Sure. Please find the details:

Spark version: 1.5.2
I am doing data aggregation using checkpointing; I am not sure if this is
causing the issue.
Also, I am using a perl_kafka producer to push data to Kafka, and my
Spark program then reads it from Kafka. I am not sure whether I need to use
the createStream function instead of createDirectStream.

My program:

  def main(args: Array[String]): Unit = {

    val checkpointDirectory = "hdfs://:8020/user/spark/" + args(2)

    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all streaming functionality
      val ssc = new StreamingContext(sc, Seconds(1))

      val brokers = args(0)
      val topics = args(1)
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String](
        "metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")

      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

      // parse the lines of data into coverage objects
      val inputStream = messages.map(_._2)
      ssc.checkpoint(checkpointDirectory)
      inputStream.print(1)
      val parsedStream = inputStream
        .map(line => {
          val splitLines = line.split(",")
          (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
        })
      import breeze.linalg.{DenseVector => BDV}
      import scala.util.Try

      val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
        (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
          prev.map(_ +: current).orElse(Some(current))
            .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
        })

      state.checkpoint(Duration(1))
      state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
      ssc
    }
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    context.start()
    context.awaitTermination()

  }
}

Thanks & Regards,

Vinti
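
The update function passed to updateStateByKey in the program above adds the new arrays for a key to the previous state element by element. A minimal sketch of that same idea in plain Scala, without Breeze or Spark, might look like the following; StateUpdate and sumArrays are hypothetical names, and the explicit length check stands in for the Try used in the original.

```scala
object StateUpdate {
  // Element-wise sum of the previous state (if any) and all new arrays for a key.
  // Returns None when there is nothing to sum or the array lengths differ.
  def sumArrays(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] = {
    val all = prev.toList ++ current
    val sameLength = all.map(_.length).distinct.size == 1
    if (!sameLength) None
    else Some(all.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))
  }

  def main(args: Array[String]): Unit = {
    val updated = sumArrays(Seq(Array(1L, 2L), Array(3L, 4L)), Some(Array(10L, 20L)))
    println(updated.map(_.mkString(",")))
  }
}
```

Keeping the update logic in a plain function like this means it can be unit tested without starting a StreamingContext. Note that when an update function returns None, updateStateByKey drops the state for that key.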






On Wed, Mar 2, 2016 at 10:28 AM, Shixiong(Ryan) Zhu  wrote:

> Hey,
>
> KafkaUtils.createDirectStream doesn't need a StorageLevel as it doesn't
> store blocks to BlockManager. However, the error is not related
> to StorageLevel. It may be a bug. Could you provide more info about it?
> E.g., Spark version, your codes, logs.
>
> On Wed, Mar 2, 2016 at 3:02 AM, Vinti Maheshwari 
> wrote:
>
>> Hi All,
>>
>> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
>> program as currently i am getting
>> MetadataFetchFailedException*. *I am not sure where i should pass
>> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream
>> doesn't allow to pass that parameter.
>>
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
>> StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>>
>> Full Error:
>>
>> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
>> location for shuffle 0*
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> at
>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
>> at
>> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
>> at
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> 

Re: Unit testing framework for Spark Jobs?

2016-03-02 Thread Ricardo Paiva
I use plain old JUnit.

Spark batch example:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.junit.AfterClass
import org.junit.Assert.assertEquals
import org.junit.BeforeClass
import org.junit.Test

object TestMyCode {

  // Shared by all tests in the class; created once in setup().
  var sc: SparkContext = null

  @BeforeClass
  def setup(): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("Test Spark")
      .setMaster("local[*]")
    sc = new SparkContext(sparkConf)
  }

  @AfterClass
  def cleanup(): Unit = {
    sc.stop()
  }
}

class TestMyCode {

  @Test
  def testSaveNumbersToExtractor(): Unit = {
    // Reuse the SparkContext held by the companion object.
    val sql = new SQLContext(TestMyCode.sc)
    import sql.implicits._

    val numList = List(1, 2, 3, 4, 5)
    val df = TestMyCode.sc.parallelize(numList).toDF
    val numDf = df.select(df("_1").alias("num"))
    assertEquals(5, numDf.count)
  }

}

On Wed, Mar 2, 2016 at 2:54 PM, SRK [via Apache Spark User List] <
ml-node+s1001560n26380...@n3.nabble.com> wrote:

> Hi,
>
> What is a good unit testing framework for Spark batch/streaming jobs? I
> have core spark, spark sql with dataframes and streaming api getting used.
> Any good framework to cover unit tests for these APIs?
>
> Thanks!
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380.html
>



-- 
Ricardo Paiva
Big Data / Semântica
2483-6432
*globo.com* 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380p26383.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: spark streaming

2016-03-02 Thread Shixiong(Ryan) Zhu
Hey,

KafkaUtils.createDirectStream doesn't need a StorageLevel as it doesn't
store blocks to BlockManager. However, the error is not related
to StorageLevel. It may be a bug. Could you provide more info about it,
e.g. the Spark version, your code, and logs?

On Wed, Mar 2, 2016 at 3:02 AM, Vinti Maheshwari 
wrote:

> Hi All,
>
> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
> program as currently i am getting
> MetadataFetchFailedException*. *I am not sure where i should pass
> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream doesn't
> allow to pass that parameter.
>
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](
>   ssc, kafkaParams, topicsSet)
>
>
> Full Error:
>
> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0*
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
> at
> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
> at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> )
>
> Thanks,
> ~Vinti
>


Re: Unit testing framework for Spark Jobs?

2016-03-02 Thread Silvio Fiorito
Please check out the following for some good resources:

https://github.com/holdenk/spark-testing-base


https://spark-summit.org/east-2016/events/beyond-collect-and-parallelize-for-tests/





On 3/2/16, 12:54 PM, "SRK"  wrote:

>Hi,
>
>What is a good unit testing framework for Spark batch/streaming jobs? I have
>core spark, spark sql with dataframes and streaming api getting used. Any
>good framework to cover unit tests for these APIs?
>
>Thanks!
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>




Re: Unit testing framework for Spark Jobs?

2016-03-02 Thread Yin Yang
Cycling prior bits:

http://search-hadoop.com/m/q3RTto4sby1Cd2rt=Re+Unit+test+with+sqlContext

On Wed, Mar 2, 2016 at 9:54 AM, SRK  wrote:

> Hi,
>
> What is a good unit testing framework for Spark batch/streaming jobs? I
> have
> core spark, spark sql with dataframes and streaming api getting used. Any
> good framework to cover unit tests for these APIs?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Michael Armbrust
-dev +user

StructType(StructField(data,ArrayType(StructType(StructField(
> *stuff,ArrayType(*StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
> StructField(name,StringType,true)),true),true), StructField(othertype,
> ArrayType(StructType(StructField(company,StringType,true),
> StructField(id,LongType,true)),true),true)),true),true)),true),true))


It's not a great error message, but as the schema above shows, stuff is an
array, not a struct.  So, you need to pick a particular element (using [])
before you can pull out a specific field.  It would be easier to see this
if you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a
tree view.  Try the following.

sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")

On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith 
wrote:

> When you create a dataframe using the *sqlContext.read.schema()* API, if
> you pass in a schema that’s compatible with some of the records, but
> incompatible with others, it seems you can’t do a .select on the
> problematic columns, instead you get an AnalysisException error.
>
>
>
> I know loading the wrong data isn’t good behaviour, but if you’re reading
> data from (for example) JSON files, there’s going to be malformed files
> along the way. I think it would be nice to handle this error in a nicer
> way, though I don’t know the best way to approach it.
>
>
>
> Before I raise a JIRA ticket about it, would people consider this to be a
> bug or expected behaviour?
>
>
>
> I’ve attached a couple of sample JSON files and the steps below to
> reproduce it, by taking the inferred schema from the simple1.json file, and
> applying it to a union of simple1.json and simple2.json. You can visually
> see the data has been parsed as I think you’d want if you do a .select on
> the parent column and print out the output, but when you do a select on the
> problem column you instead get an exception.
>
>
>
> *scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)*
>
> s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at
> :27
>
>
>
> *scala> val s1schema = sqlContext.read.json(s1Rdd).schema*
>
> s1schema: org.apache.spark.sql.types.StructType =
> StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
> StructField(name,StringType,true)),true),true),
> StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>
>
>
> *scala>
> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)*
>
> [WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don
> Joeh]),null], [null,WrappedArray([ACME,2])]))]
>
> [WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])],
> [WrappedArray([2,null]),null]))]
>
>
>
> *scala>
> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")*
>
> org.apache.spark.sql.AnalysisException: cannot resolve
> 'data.stuff[onetype]' due to data type mismatch: argument 2 requires
> integral type, however, 'onetype' is of string type.;
>
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
>
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
>
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>
>
>
> (The full exception is attached too).
>
>
>
> What do people think, is this a bug?
>
>
>
> Thanks,
>
> Ewan
>
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>


Unit testing framework for Spark Jobs?

2016-03-02 Thread SRK
Hi,

What is a good unit testing framework for Spark batch/streaming jobs? I have
core spark, spark sql with dataframes and streaming api getting used. Any
good framework to cover unit tests for these APIs?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark on Yarn with Dynamic Resource Allocation. Container always marked as failed

2016-03-02 Thread Xiaoye Sun
Hi all,

I am very new to spark and yarn.

I am running the BroadcastTest example application using Spark 1.6.0 and
Hadoop/YARN 2.7.1 on a 5-node cluster.

I configured my configuration files according to
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

1. copy
./spark-1.6.0/network/yarn/target/scala-2.10/spark-1.6.0-yarn-shuffle.jar
to /hadoop-2.7.1/share/hadoop/yarn/lib/
2. yarn-site.xml is like this http://www.owlnet.rice.edu/~xs6/yarn-site.xml
3. spark-defaults.conf is like this
http://www.owlnet.rice.edu/~xs6/spark-defaults.conf
4. spark-env.sh is like this http://www.owlnet.rice.edu/~xs6/spark-env.sh
5. the command I use to submit spark application is: ./bin/spark-submit
--class org.apache.spark.examples.BroadcastTest --master yarn --deploy-mode
cluster ./examples/target/spark-examples_2.10-1.6.0.jar 1 1000 Http

However, the job is stuck in RUNNING status, and the log shows that
executors fail or are cancelled frequently.
Here is the log output http://www.owlnet.rice.edu/~xs6/stderr
It shows something like

16/03/02 02:07:35 WARN yarn.YarnAllocator: Container marked as failed:
container_1456905762620_0002_01_02 on host: bold-x.rice.edu. Exit
status: 1. Diagnostics: Exception from container-launch.


Does anybody know what the problem is here?
Best,
Xiaoye


Re: How to control the number of parquet files getting created under a partition ?

2016-03-02 Thread swetha kasireddy
Thanks. I tried this yesterday and it seems to be working.

On Wed, Mar 2, 2016 at 1:49 AM, James Hammerton  wrote:

> Hi,
>
> Based on the behaviour I've seen using parquet, the number of partitions
> in the DataFrame will determine the number of files in each parquet
> partition.
>
> I.e. when you use "PARTITION BY" you're actually partitioning twice, once
> via the partitions spark has created internally and then again with the
> partitions you specify in the "PARTITION BY" clause.
>
> So if you have 10 partitions in your DataFrame, and save that as a parquet
> file or table partitioned on a column with 3 values, you'll get 30
> partitions, 10 per parquet partition.
>
> You can reduce the number of partitions in the DataFrame by using
> coalesce() before saving the data.
>
> Regards,
>
> James
>
>
> On 1 March 2016 at 21:01, SRK  wrote:
>
>> Hi,
>>
>> How can I control the number of parquet files getting created under a
>> partition? I have my sqlContext queries to create a table and insert the
>> records as follows. It seems to create around 250 parquet files under each
>> partition though I was expecting that to create around 2 or 3 files. Due
>> to
>> the large number of files, it takes a lot of time to scan the records. Any
>> suggestions as to how to control the number of parquet files under each
>> partition would be of great help.
>>
>>  sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS testUserDts
>> (userId STRING, savedDate STRING) PARTITIONED BY (partitioner STRING)
>> stored as PARQUET LOCATION '/user/testId/testUserDts' ")
>>
>>   sqlContext.sql(
>> """from testUserDtsTemp ps   insert overwrite table testUserDts
>> partition(partitioner)  select ps.userId, ps.savedDate ,  ps.partitioner
>> """.stripMargin)
>>
>>
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-the-number-of-parquet-files-getting-created-under-a-partition-tp26374.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>
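
The arithmetic in James's reply can be sketched in a few lines of plain Scala. This is only an illustration of the upper bound he describes (every DataFrame partition writing one file per partition-column value); the object and method names are made up for the example, and the real file count will be lower when a DataFrame partition holds no rows for some value.

```scala
object ParquetFileCount {
  // Upper bound on files written: each DataFrame partition can write
  // one file for every value of the partition column it contains rows for.
  def maxFiles(dataFramePartitions: Int, partitionValues: Int): Int =
    dataFramePartitions * partitionValues

  def main(args: Array[String]): Unit = {
    // James's example: 10 DataFrame partitions, partition column with 3 values.
    println(maxFiles(10, 3))
    // The same table after reducing the DataFrame to 2 partitions,
    // e.g. by calling coalesce(2) before saving.
    println(maxFiles(2, 3))
  }
}
```

With roughly 250 files per partition reported in the original question, the same reasoning suggests the DataFrame had on the order of 250 partitions at write time.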


Re: Add Jars to Master/Worker classpath

2016-03-02 Thread Sumedh Wale

  
  
On Wednesday 02 March 2016 09:39 PM, Matthias Niehoff wrote:

> no, not to driver and executor but to the master and worker instances of
> the spark standalone cluster


Why exactly does adding jars to driver/executor extraClassPath not
work?

The classpath of the master/worker is set up by AbstractCommandBuilder, which
explicitly adds the following:

jars named "datanucleus-*", environment variables: _SPARK_ASSEMBLY
(for assembly jar), SPARK_DIST_CLASSPATH, HADOOP_CONF_DIR,
YARN_CONF_DIR

So you can set SPARK_DIST_CLASSPATH in conf/spark-env.sh to add the
required jars (separated by platform's File.pathSeparator).
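
As a rough sketch of the "separated by platform's File.pathSeparator" part, the value for SPARK_DIST_CLASSPATH could be assembled as below. The jar paths and the object name are hypothetical placeholders, not paths from this thread; only java.io.File.pathSeparator is a real API.

```scala
import java.io.File

object DistClasspath {
  // Hypothetical jar locations; substitute the real paths of the logging jars.
  val extraJars: Seq[String] = Seq(
    "/opt/spark/extra/redis-appender.jar",
    "/opt/spark/extra/jedis.jar"
  )

  // Join the entries with the platform separator
  // (":" on Unix-like systems, ";" on Windows).
  def classpath(jars: Seq[String]): String =
    jars.mkString(File.pathSeparator)

  def main(args: Array[String]): Unit = {
    // Prints a line that could be placed in conf/spark-env.sh.
    println("export SPARK_DIST_CLASSPATH=\"" + classpath(extraJars) + "\"")
  }
}
```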


thanks
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)


  
On 2 March 2016 at 17:05, Igor Berman wrote:

> spark.driver.extraClassPath
> spark.executor.extraClassPath
>
> 2016-03-02 18:01 GMT+02:00 Matthias Niehoff :
>
>> Hi,
>>
>> we want to add jars to the Master and Worker class path mainly for
>> logging reason (we have a redis appender to send logs to redis ->
>> logstash -> elasticsearch).
>>
>> While it is working with setting SPARK_CLASSPATH, this solution is
>> afaik deprecated and should not be used. Furthermore we are also using
>> --driver-java-options and spark.executor.extraClassPath which leads to
>> exceptions when running our apps in standalone cluster mode.
>>
>> So what is the best way to add jars to the master and worker classpath?
>>
>> Thank you
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>> 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
>> Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Supervisory board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen
>> Schütz
>>
>> This e-mail, including any attached files, contains confidential and/or
>> legally protected information. If you are not the intended recipient or
>> have received this e-mail in error, please inform the sender immediately
>> and delete this e-mail and any attached files at once. Unauthorized
>> copying, use

Re: please add Christchurch Apache Spark Meetup Group

2016-03-02 Thread Sean Owen
(I have the site's svn repo handy, so I just added it.)


On Wed, Mar 2, 2016 at 5:16 PM, Raazesh Sainudiin
 wrote:
> Hi,
>
> Please add Christchurch Apache Spark Meetup Group to the community list
> here:
> http://spark.apache.org/community.html
>
> Our Meetup URI is:
> http://www.meetup.com/Christchurch-Apache-Spark-Meetup/
>
> Thanks,
> Raaz




How to achieve nested for loop in Spark

2016-03-02 Thread Vikash Kumar
Can we implement nested for/while loops in Spark? I have to convert some SQL
procedure code into Spark. It has multiple loops and processing steps, and I
want to know how to implement them in Spark.

   1. open cursor and fetch for personType
   2. open cursor and fetch for personGroup
   3. open cursor and fetch for personname
   4.  some processing
   5. end cursor personname
   6. end cursor personGroup
   7. end cursor personType

something like this I want to implement in spark. Please suggest
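
One possible way to think about the cursor nesting above is as a for-comprehension over grouped data. The sketch below uses plain Scala collections rather than Spark, and the Person record, the process function and the object name are hypothetical stand-ins for the real rows and the "some processing" step. On an RDD the same shape would more likely be written with flatMap, groupBy or joins than with explicit loops.

```scala
object NestedCursors {
  // Hypothetical record standing in for the rows the cursors iterate over.
  case class Person(personType: String, personGroup: String, name: String)

  // Placeholder for the "some processing" step.
  def process(p: Person): String =
    s"${p.personType}/${p.personGroup}/${p.name}"

  def run(people: Seq[Person]): Seq[String] =
    for {
      // Outer cursor: one iteration per personType, in sorted order.
      byType  <- people.groupBy(_.personType).toSeq.sortBy(_._1).map(_._2)
      // Middle cursor: one iteration per personGroup within that type.
      byGroup <- byType.groupBy(_.personGroup).toSeq.sortBy(_._1).map(_._2)
      // Inner cursor: one iteration per person within that group.
      person  <- byGroup.sortBy(_.name)
    } yield process(person)

  def main(args: Array[String]): Unit = {
    val people = Seq(
      Person("B", "g1", "z"),
      Person("A", "g2", "y"),
      Person("A", "g1", "x")
    )
    run(people).foreach(println)
  }
}
```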


Re: Spark 1.5 on Mesos

2016-03-02 Thread Sathish Kumaran Vairavelu
Try passing jar using --jars option
On Wed, Mar 2, 2016 at 10:17 AM Ashish Soni  wrote:

> I made some progress but now i am stuck at this point , Please help as
> looks like i am close to get it working
>
> I have everything running in docker container including mesos slave and
> master
>
> When i try to submit the pi example i get below error
> *Error: Cannot load main class from JAR file:/opt/spark/Example*
>
> Below is the command i use to submit as a docker container
>
> docker run -it --rm -e SPARK_MASTER="mesos://10.0.2.15:7077"  -e
> SPARK_IMAGE="spark_driver:latest" spark_driver:latest ./bin/spark-submit
> --deploy-mode cluster --name "PI Example" --class
> org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory
> 512m --executor-cores 1
> http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar
>
>
> On Tue, Mar 1, 2016 at 2:59 PM, Timothy Chen  wrote:
>
>> Can you go through the Mesos UI and look at the driver/executor log from
>> stderr file and see what the problem is?
>>
>> Tim
>>
>> On Mar 1, 2016, at 8:05 AM, Ashish Soni  wrote:
>>
>> Not sure what is the issue but i am getting below error  when i try to
>> run spark PI example
>>
>> Blacklisting Mesos slave value: "5345asdasdasdkas234234asdasdasdasd"
>>due to too many failures; is Spark installed on it?
>> WARN TaskSchedulerImpl: Initial job has not accepted any resources; 
>> check your cluster UI to ensure that workers are registered and have 
>> sufficient resources
>>
>>
>> On Mon, Feb 29, 2016 at 1:39 PM, Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>>> May be the Mesos executor couldn't find spark image or the constraints
>>> are not satisfied. Check your Mesos UI if you see Spark application in the
>>> Frameworks tab
>>>
>>> On Mon, Feb 29, 2016 at 12:23 PM Ashish Soni 
>>> wrote:
>>>
 What is the Best practice , I have everything running as docker
 container in single host ( mesos and marathon also as docker container )
  and everything comes up fine but when i try to launch the spark shell i
 get below error


 SQL context available as sqlContext.

 scala> val data = sc.parallelize(1 to 100)
 data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
 parallelize at :27

 scala> data.count
 [Stage 0:>  (0
 + 0) / 2]16/02/29 18:21:12 WARN TaskSchedulerImpl: Initial job has not
 accepted any resources; check your cluster UI to ensure that workers are
 registered and have sufficient resources
 16/02/29 18:21:27 WARN TaskSchedulerImpl: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient resources



 On Mon, Feb 29, 2016 at 12:04 PM, Tim Chen  wrote:

> No you don't have to run Mesos in docker containers to run Spark in
> docker containers.
>
> Once you have Mesos cluster running you can then specfiy the Spark
> configurations in your Spark job (i.e: 
> spark.mesos.executor.docker.image=mesosphere/spark:1.6)
> and Mesos will automatically launch docker containers for you.
>
> Tim
>
> On Mon, Feb 29, 2016 at 7:36 AM, Ashish Soni 
> wrote:
>
>> Yes i read that and not much details here.
>>
>> Is it true that we need to have spark installed on each mesos docker
>> container ( master and slave ) ...
>>
>> Ashish
>>
>> On Fri, Feb 26, 2016 at 2:14 PM, Tim Chen  wrote:
>>
>>> https://spark.apache.org/docs/latest/running-on-mesos.html should
>>> be the best source, what problems were you running into?
>>>
>>> Tim
>>>
>>> On Fri, Feb 26, 2016 at 11:06 AM, Yin Yang 
>>> wrote:
>>>
 Have you read this ?
 https://spark.apache.org/docs/latest/running-on-mesos.html

 On Fri, Feb 26, 2016 at 11:03 AM, Ashish Soni <
 asoni.le...@gmail.com> wrote:

> Hi All ,
>
> Is there any proper documentation as how to run spark on mesos , I
> am trying from the last few days and not able to make it work.
>
> Please help
>
> Ashish
>


>>>
>>
>

>>
>


please add Christchurch Apache Spark Meetup Group

2016-03-02 Thread Raazesh Sainudiin
Hi,

Please add Christchurch Apache Spark Meetup Group to the community list
here:
http://spark.apache.org/community.html

Our Meetup URI is:
http://www.meetup.com/Christchurch-Apache-Spark-Meetup/

Thanks,
Raaz


Re: rdd cache name

2016-03-02 Thread Xinh Huynh
Hi Charles,

You can set the RDD name before using it. Just call setName before caching:
(Scala) myRdd.setName("Charles RDD")
(Python) myRdd.setName('Charles RDD')
Reference: PySpark doc:
http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

Fraction cached is the percentage of partitions of an RDD that are cached.
From the code:
(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)
Code is here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
Fraction cached will be less than 100% if there isn't enough room for all
cached RDDs to fit in the cache. If it's a problem, you may want to
increase your in-memory cache size or cache off-heap or to disk.
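
The expression quoted from StoragePage can be tried out on its own. This is a minimal stand-alone sketch of the same arithmetic; the object and method names are invented for the example and are not part of Spark.

```scala
object FractionCached {
  // Same arithmetic as the quoted expression: cached partitions
  // as a percentage of all partitions of the RDD.
  def percent(numCachedPartitions: Int, numPartitions: Int): Double =
    numCachedPartitions * 100.0 / numPartitions

  def main(args: Array[String]): Unit = {
    // An RDD with 4 partitions of which 3 fit in the cache.
    println(percent(3, 4))
  }
}
```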

Xinh

On Wed, Mar 2, 2016 at 1:48 AM, charles li  wrote:

> hi, there, I feel a little confused about the *cache* in spark.
>
> first, is there any way to *customize the cached RDD name*, it's not
> convenient for me when looking at the storage page, there are the kind of
> RDD in the RDD Name column, I hope to make it as my customized name, kinds
> of 'rdd 1', 'rrd of map', 'rdd of groupby' and so on.
>
> second, can some one tell me what exactly the '*Fraction Cached*' mean
> under the hood?
>
> great thanks
>
>
>
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


Re: Building a REST Service with Spark back-end

2016-03-02 Thread yanlin wang
Has anyone used Livy in a real-world, high-concurrency web app? I think it uses
the spark-submit command line to create jobs... How do job server or a notebook
compare with Livy?

Thx,
Yanlin

Sent from my iPhone

> On Mar 2, 2016, at 6:24 AM, Guru Medasani  wrote:
> 
> Hi Don,
> 
> Here is another REST interface for interacting with Spark from anywhere. 
> 
> https://github.com/cloudera/livy
> 
> Here is an example to estimate PI using Spark from Python using requests 
> library. 
> 
> >>> data = {
> ...   'code': textwrap.dedent("""\
> ...      val NUM_SAMPLES = 100000;
> ...      val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
> ...        val x = Math.random();
> ...        val y = Math.random();
> ...        if (x*x + y*y < 1) 1 else 0
> ...      }.reduce(_ + _);
> ...      println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
> ...      """)
> ... }
> >>> r = requests.post(statements_url, data=json.dumps(data), headers=headers)
> >>> pprint.pprint(r.json())
> {u'id': 1,
>  u'output': {u'data': {u'text/plain': u'Pi is roughly 3.14004\nNUM_SAMPLES: Int = 100000\ncount: Int = 78501'},
>              u'execution_count': 1,
>              u'status': u'ok'},
>  u'state': u'available'}
> 
> 
> Guru Medasani
> gdm...@gmail.com
> 
> 
> 
>> On Mar 2, 2016, at 7:47 AM, Todd Nist  wrote:
>> 
>> Have you looked at Apache Toree, http://toree.apache.org/.  This was 
>> formerly the Spark-Kernel from IBM but contributed to apache.
>> 
>> https://github.com/apache/incubator-toree
>> 
>> You can find a good overview on the spark-kernel here:
>> http://www.spark.tc/how-to-enable-interactive-applications-against-apache-spark/
>> 
>> Not sure if that is of value to you or not.
>> 
>> HTH.
>> 
>> -Todd
>> 
>>> On Tue, Mar 1, 2016 at 7:30 PM, Don Drake  wrote:
>>> I'm interested in building a REST service that utilizes a Spark SQL Context 
>>> to return records from a DataFrame (or IndexedRDD?) and even add/update 
>>> records.
>>> 
>>> This will be a simple REST API, with only a few end-points.  I found this 
>>> example:
>>> 
>>> https://github.com/alexmasselot/spark-play-activator
>>> 
>>> which looks close to what I am interested in doing.  
>>> 
>>> Are there any other ideas or options if I want to run this in a YARN 
>>> cluster?
>>> 
>>> Thanks.
>>> 
>>> -Don
>>> 
>>> -- 
>>> Donald Drake
>>> Drake Consulting
>>> http://www.drakeconsulting.com/
>>> https://twitter.com/dondrake
>>> 800-733-2143
> 


Re: Add Jars to Master/Worker classpath

2016-03-02 Thread Prabhu Joseph
Matthias,

 Can you try appending the jars to LAUNCH_CLASSPATH in
spark-1.4.1/bin/spark-class?

2016-03-02 21:39 GMT+05:30 Matthias Niehoff :

> no, not to driver and executor but to the master and worker instances of
> the spark standalone cluster
>
> On 2 March 2016 at 17:05, Igor Berman wrote:
>
>> spark.driver.extraClassPath
>> spark.executor.extraClassPath
>>
>> 2016-03-02 18:01 GMT+02:00 Matthias Niehoff <
>> matthias.nieh...@codecentric.de>:
>>
>>> Hi,
>>>
>>> we want to add jars to the Master and Worker class path mainly for
>>> logging reason (we have a redis appender to send logs to redis -> logstash
>>> -> elasticsearch).
>>>
>>> While it is working with setting SPARK_CLASSPATH, this solution is
>>> afaik deprecated and should not be used. Furthermore we are also using 
>>> --driver-java-options
>>> and spark.executor.extraClassPath which leads to exceptions when
>>> running our apps in standalone cluster mode.
>>>
>>> So what is the best way to add jars to the master and worker classpath?
>>>
>>> Thank you
>>>
>>> --
>>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>>> 172.1702676
>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>>> www.more4fi.de
>>>
>>> Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
>>> Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>> Supervisory board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen
>>> Schütz
>>>
>>> This e-mail, including any attached files, contains confidential and/or
>>> legally protected information. If you are not the intended recipient or
>>> have received this e-mail in error, please inform the sender immediately
>>> and delete this e-mail and any attached files at once. Unauthorized
>>> copying, use or opening of any attached files, as well as unauthorized
>>> forwarding of this e-mail, is not permitted.
>>>
>>
>>
>
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
> Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Supervisory board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen Schütz
>
> This e-mail, including any attached files, contains confidential and/or
> legally protected information. If you are not the intended recipient or
> have received this e-mail in error, please inform the sender immediately
> and delete this e-mail and any attached files at once. Unauthorized
> copying, use or opening of any attached files, as well as unauthorized
> forwarding of this e-mail, is not permitted.
>


Re: Spark 1.5 on Mesos

2016-03-02 Thread Ashish Soni
I made some progress, but now I am stuck at this point. Please help, as it
looks like I am close to getting it working.

I have everything running in Docker containers, including the Mesos slave and
master.

When I try to submit the Pi example I get the error below:
*Error: Cannot load main class from JAR file:/opt/spark/Example*

Below is the command I use to submit as a Docker container:

docker run -it --rm -e SPARK_MASTER="mesos://10.0.2.15:7077"  -e
SPARK_IMAGE="spark_driver:latest" spark_driver:latest ./bin/spark-submit
--deploy-mode cluster --name "PI Example" --class
org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory
512m --executor-cores 1
http://10.0.2.15/spark-examples-1.6.0-hadoop2.6.0.jar


On Tue, Mar 1, 2016 at 2:59 PM, Timothy Chen  wrote:

> Can you go through the Mesos UI and look at the driver/executor log from
> stderr file and see what the problem is?
>
> Tim
>
> On Mar 1, 2016, at 8:05 AM, Ashish Soni  wrote:
>
> Not sure what is the issue but i am getting below error  when i try to run
> spark PI example
>
> Blacklisting Mesos slave value: "5345asdasdasdkas234234asdasdasdasd"
>due to too many failures; is Spark installed on it?
> WARN TaskSchedulerImpl: Initial job has not accepted any resources; check 
> your cluster UI to ensure that workers are registered and have sufficient 
> resources
>
>
> On Mon, Feb 29, 2016 at 1:39 PM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> May be the Mesos executor couldn't find spark image or the constraints
>> are not satisfied. Check your Mesos UI if you see Spark application in the
>> Frameworks tab
>>
>> On Mon, Feb 29, 2016 at 12:23 PM Ashish Soni 
>> wrote:
>>
>>> What is the Best practice , I have everything running as docker
>>> container in single host ( mesos and marathon also as docker container )
>>>  and everything comes up fine but when i try to launch the spark shell i
>>> get below error
>>>
>>>
>>> SQL context available as sqlContext.
>>>
>>> scala> val data = sc.parallelize(1 to 100)
>>> data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>>> parallelize at :27
>>>
>>> scala> data.count
>>> [Stage 0:>  (0 + 0) / 2]
>>> 16/02/29 18:21:12 WARN TaskSchedulerImpl: Initial job has not accepted
>>> any resources; check your cluster UI to ensure that workers are registered
>>> and have sufficient resources
>>> 16/02/29 18:21:27 WARN TaskSchedulerImpl: Initial job has not accepted
>>> any resources; check your cluster UI to ensure that workers are registered
>>> and have sufficient resources
>>>
>>>
>>>
>>> On Mon, Feb 29, 2016 at 12:04 PM, Tim Chen  wrote:
>>>
>>>> No, you don't have to run Mesos in Docker containers to run Spark in
>>>> Docker containers.
>>>>
>>>> Once you have a Mesos cluster running, you can specify the Spark
>>>> configuration in your Spark job (e.g.
>>>> spark.mesos.executor.docker.image=mesosphere/spark:1.6)
>>>> and Mesos will automatically launch Docker containers for you.
>>>>
>>>> Tim
>>>>
>>>> On Mon, Feb 29, 2016 at 7:36 AM, Ashish Soni wrote:
>>>>
>>>>> Yes, I read that, and there are not many details there.
>>>>>
>>>>> Is it true that we need to have Spark installed on each Mesos Docker
>>>>> container (master and slave)?
>>>>>
>>>>> Ashish
>>>>>
>>>>> On Fri, Feb 26, 2016 at 2:14 PM, Tim Chen wrote:
>>>>>
>>>>>> https://spark.apache.org/docs/latest/running-on-mesos.html should be
>>>>>> the best source. What problems were you running into?
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>> On Fri, Feb 26, 2016 at 11:06 AM, Yin Yang wrote:
>>>>>>
>>>>>>> Have you read this?
>>>>>>> https://spark.apache.org/docs/latest/running-on-mesos.html
>>>>>>>
>>>>>>> On Fri, Feb 26, 2016 at 11:03 AM, Ashish Soni wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> Is there any proper documentation on how to run Spark on Mesos? I
>>>>>>>> have been trying for the last few days and have not been able to
>>>>>>>> make it work.
>>>>>>>>
>>>>>>>> Please help
>>>>>>>>
>>>>>>>> Ashish


Re: Add Jars to Master/Worker classpath

2016-03-02 Thread Matthias Niehoff
No, not to the driver and executor, but to the master and worker instances of
the Spark standalone cluster.

On 2 March 2016 at 17:05, Igor Berman wrote:

> spark.driver.extraClassPath
> spark.executor.extraClassPath
>
> 2016-03-02 18:01 GMT+02:00 Matthias Niehoff <
> matthias.nieh...@codecentric.de>:
>
>> Hi,
>>
>> we want to add jars to the Master and Worker classpath, mainly for
>> logging reasons (we have a Redis appender that sends logs to redis ->
>> logstash -> elasticsearch).
>>
>> While it works when setting SPARK_CLASSPATH, this solution is, as far as I
>> know, deprecated and should not be used. Furthermore, we are also using
>> --driver-java-options and spark.executor.extraClassPath, which leads to
>> exceptions when running our apps in standalone cluster mode.
>>
>> So what is the best way to add jars to the master and worker classpath?
>>
>> Thank you
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>> 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
>> Executive board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen
>> Schütz
>>
>> This e-mail, including any attached files, contains confidential and/or
>> legally protected information. If you are not the intended recipient or
>> have received this e-mail in error, please notify the sender immediately
>> and delete this e-mail and any attached files without delay. Unauthorized
>> copying, use or opening of any attached files, as well as unauthorized
>> forwarding of this e-mail, is not permitted.
>>
>
>




Re: Add Jars to Master/Worker classpath

2016-03-02 Thread Igor Berman
spark.driver.extraClassPath
spark.executor.extraClassPath

2016-03-02 18:01 GMT+02:00 Matthias Niehoff :

> Hi,
>
> we want to add jars to the Master and Worker classpath, mainly for logging
> reasons (we have a Redis appender that sends logs to redis -> logstash ->
> elasticsearch).
>
> While it works when setting SPARK_CLASSPATH, this solution is, as far as I
> know, deprecated and should not be used. Furthermore, we are also using
> --driver-java-options and spark.executor.extraClassPath, which leads to
> exceptions when running our apps in standalone cluster mode.
>
> So what is the best way to add jars to the master and worker classpath?
>
> Thank you
>
>


Add Jars to Master/Worker classpath

2016-03-02 Thread Matthias Niehoff
Hi,

we want to add jars to the Master and Worker classpath, mainly for logging
reasons (we have a Redis appender that sends logs to redis -> logstash ->
elasticsearch).

While it works when setting SPARK_CLASSPATH, this solution is, as far as I
know, deprecated and should not be used. Furthermore, we are also using
--driver-java-options and spark.executor.extraClassPath, which leads to
exceptions when running our apps in standalone cluster mode.

So what is the best way to add jars to the master and worker classpath?

Thank you



Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-02 Thread Daniel Siegmann
In the past I have seen this happen when I filled up HDFS and some core
nodes became unhealthy. There was no longer anywhere to replicate the data.
From your command it looks like you should have 1 master and 2 core nodes
in your cluster. Can you verify both the core nodes are healthy?

On Wed, Mar 2, 2016 at 6:01 AM, Oleg Ruchovets  wrote:

> Here is my command:
>
> aws emr create-cluster --release-label emr-4.3.0 --name "ClusterJava8" \
>   --use-default-roles \
>   --applications Name=Ganglia Name=Hive Name=Hue Name=Mahout Name=Pig Name=Spark \
>   --ec2-attributes KeyName=CC-ES-Demo \
>   --instance-count 3 --instance-type m3.xlarge --use-default-roles \
>   --bootstrap-action Path=s3://crayon-emr-scripts/emr_java_8.sh
>
> I am using a bootstrap script to install Java 8.
>
> When I choose these applications (Name=Ganglia Name=Hive Name=Hue Name=Mahout
> Name=Pig Name=Spark) the problem is gone. Along the way I also fixed an
> "Lzo not found" exception. Now I have another problem, and I have no idea
> why it happens: I tried to copy a file to HDFS and got this exception (the
> file is very small, just a couple of KB).
>
>
>
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /input/test.txt._COPYING_ could only be replicated to 0 nodes instead of
> minReplication (=1).  There are 0 datanode(s) running and no node(s) are
> excluded in this operation.
> at
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:632)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:238)
> at com.sun.proxy.$Proxy9.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1441)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
> put: File /input/test.txt._COPYING_ could only be replicated to 0 nodes
> instead of minReplication (=1).  There are 0 datanode(s) running and no
> node(s) are excluded in this operation.
>
>
> On Wed, Mar 2, 2016 at 4:09 AM, Gourav Sengupta wrote:
>
>> Hi,
>>
>> Which region are you running the EMR clusters in? Are you tweaking any
>> of the Hadoop parameters before starting the clusters?
>>
>> If you are using the AWS CLI to start the cluster, just send the command
>> across.
>>
>> To date I have never faced any such issues in the Ireland region.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Mar 1, 2016 at 9:15 AM, Oleg Ruchovets 
>> wrote:
>>
>>> Hi, I installed EMR 4.3.0 with Spark. I tried to enter the Spark shell,
>>> but it looks like it doesn't work and throws exceptions.
>>> Please advise:
>>>
>>> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
>>> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
>>> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
>>> support was removed in 8.0
>>> 

Re: Configuring Ports for Network Security

2016-03-02 Thread Guru Prateek Pinnadhari
Thanks for your response.

End users and developers in our scenario need terminal / SSH access to the 
cluster. So cluster isolation from external networks is not an option.

We use a Hortonworks based hadoop cluster. Knox is useful but as users also 
have shell access, we need iptables.

Even with Knox in place, protocol mandates using iptables to allow 
communication only on specific ports between nodes.
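
To make the port-range question concrete, here is a minimal sketch, not a
tested firewall policy. It assumes, per my reading of the Spark configuration
docs, that when a port property is pinned to a fixed value, Spark retries
upward from it, so with spark.port.maxRetries it can bind anywhere from that
port to port + maxRetries. The port numbers, the 10.0.0.0/24 subnet and the
helper names are hypothetical, chosen only for illustration.

```python
from typing import Dict, List, Tuple


def port_ranges(base_ports: Dict[str, int], max_retries: int) -> Dict[str, Tuple[int, int]]:
    """Map each pinned port property to the inclusive range Spark may bind to."""
    return {name: (port, port + max_retries) for name, port in base_ports.items()}


def iptables_rules(ranges: Dict[str, Tuple[int, int]], source_cidr: str) -> List[str]:
    """Build one ACCEPT rule per range, limited to traffic from source_cidr."""
    return [
        f"iptables -A INPUT -p tcp -s {source_cidr} --dport {low}:{high} -j ACCEPT"
        for _, (low, high) in sorted(ranges.items())
    ]


# Hypothetical values; pick ports that suit your own cluster.
pinned = {"spark.driver.port": 40000, "spark.blockManager.port": 40100}
ranges = port_ranges(pinned, max_retries=16)
rules = iptables_rules(ranges, "10.0.0.0/24")
for rule in rules:
    print(rule)
```

The same pinned values would then be passed to the job (for example with
--conf on spark-submit), so that the firewall rules and the Spark settings
stay in step.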






On 3/2/16, 6:33 PM, "Jörn Franke"  wrote:

>You can make the nodes non-reachable from any computer external to the 
>cluster. Applications can be deployed on an edge node that is connected to the 
>cluster. 
>Do you use Hadoop for managing the cluster? Then you may want to look at 
>Apache Knox. 
>
>> On 02 Mar 2016, at 15:14, zgpinnadhari  wrote:
>> 
>> Hi
>> 
>> We want to use spark in a secure cluster with iptables enabled.
>> For this, we need a specific list of ports used by spark so that we can
>> whitelist them.
>> 
>> From what I could learn from -
>> http://spark.apache.org/docs/latest/security.html#configuring-ports-for-network-security
>>  
>> - there are several ports chosen "randomly" which pose a challenge while
>> coming up with specific iptables rules as we cannot allow any-any.
>> 
>> What is the recommendation here?
>> 
>> Can we specify a port range somewhere from which spark can choose randomly?
>> 
>> What do other secure / hardened clusters do?
>> 
>> Thanks!
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-Ports-for-Network-Security-tp26376.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>





org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow

2016-03-02 Thread dmt
Hi,

the following error is raised using Spark 1.5.2 or 1.6.0, in standalone
mode, on my computer.
Has anyone had the same problem, and do you know what might cause this
exception? Thanks in advance.

16/03/02 15:12:27 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 9,
192.168.1.36): java.lang.ClassCastException:
org.apache.spark.sql.types.GenericArrayData cannot be cast to
org.apache.spark.sql.catalyst.InternalRow
at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:50)
at
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:247)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
at
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
at
org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:117)
at
org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:115)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
at
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/03/02 15:12:27 INFO TaskSetManager: Starting task 9.1 in stage 0.0 (TID
17, 192.168.1.36, PROCESS_LOCAL, 2236 bytes)
16/03/02 15:12:27 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID
11) in 921 ms on 192.168.1.36 (10/17)
16/03/02 15:12:27 INFO TaskSetManager: Finished task 13.0 in stage 0.0 (TID
13) in 871 ms on 192.168.1.36 (11/17)
16/03/02 15:12:27 INFO TaskSetManager: Finished task 14.0 in stage 0.0 (TID
14) in 885 ms on 192.168.1.36 (12/17)
16/03/02 15:12:27 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID
8) in 981 ms on 192.168.1.36 (13/17)
16/03/02 15:12:27 INFO TaskSetManager: Finished task 15.0 in stage 0.0 (TID
15) in 844 ms on 192.168.1.36 (14/17)
16/03/02 15:12:27 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID
10) in 1007 ms on 192.168.1.36 (15/17)
16/03/02 15:12:28 INFO TaskSetManager: Lost task 9.1 in stage 0.0 (TID 17)
on executor 192.168.1.36: java.lang.ClassCastException
(org.apache.spark.sql.types.GenericArrayData cannot be cast to
org.apache.spark.sql.catalyst.InternalRow) [duplicate 1]
16/03/02 15:12:28 INFO TaskSetManager: Starting task 9.2 in stage 0.0 (TID
18, 192.168.1.36, PROCESS_LOCAL, 2236 bytes)
16/03/02 15:12:28 INFO TaskSetManager: Finished task 16.0 in stage 0.0 (TID
16) in 537 ms on 192.168.1.36 (16/17)
16/03/02 15:12:28 INFO TaskSetManager: Lost task 9.2 in stage 0.0 (TID 18)
on executor 192.168.1.36: java.lang.ClassCastException
(org.apache.spark.sql.types.GenericArrayData cannot be cast to
org.apache.spark.sql.catalyst.InternalRow) [duplicate 2]
16/03/02 15:12:28 INFO TaskSetManager: Starting task 9.3 in stage 0.0 (TID
19, 192.168.1.36, PROCESS_LOCAL, 2236 bytes)
16/03/02 15:12:29 WARN TaskSetManager: Lost task 9.3 in stage 0.0 (TID 19,
192.168.1.36): java.lang.ClassCastException

16/03/02 15:12:29 ERROR TaskSetManager: Task 9 in stage 0.0 failed 4 times;
aborting job
16/03/02 

Re: Configuring Ports for Network Security

2016-03-02 Thread Jörn Franke
You can make the nodes non-reachable from any computer external to the cluster. 
Applications can be deployed on an edge node that is connected to the cluster. 
Do you use Hadoop for managing the cluster? Then you may want to look at Apache 
Knox. 

> On 02 Mar 2016, at 15:14, zgpinnadhari  wrote:
> 
> Hi
> 
> We want to use spark in a secure cluster with iptables enabled.
> For this, we need a specific list of ports used by spark so that we can
> whitelist them.
> 
> From what I could learn from -
> http://spark.apache.org/docs/latest/security.html#configuring-ports-for-network-security
>  
> - there are several ports chosen "randomly" which pose a challenge while
> coming up with specific iptables rules as we cannot allow any-any.
> 
> What is the recommendation here?
> 
> Can we specify a port range somewhere from which spark can choose randomly?
> 
> What do other secure / hardened clusters do?
> 
> Thanks!
> 
> 
> 
> 



Re: Building a REST Service with Spark back-end

2016-03-02 Thread Guru Medasani
Hi Don,

Here is another REST interface for interacting with Spark from anywhere. 

https://github.com/cloudera/livy 

Here is an example that estimates Pi with Spark from Python, using the
requests library.

>>> import json, pprint, requests, textwrap
>>> # statements_url and headers come from the Livy session setup (not shown here)
>>> data = {
...   'code': textwrap.dedent("""\
...      val NUM_SAMPLES = 100000;
...      val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
...        val x = Math.random();
...        val y = Math.random();
...        if (x*x + y*y < 1) 1 else 0
...      }.reduce(_ + _);
...      println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
...      """)
... }
>>> r = requests.post(statements_url, data=json.dumps(data), headers=headers)
>>> pprint.pprint(r.json())
{u'id': 1,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.14004\nNUM_SAMPLES: Int = 100000\ncount: Int = 78501'},
             u'execution_count': 1,
             u'status': u'ok'},
 u'state': u'available'}


Guru Medasani
gdm...@gmail.com



> On Mar 2, 2016, at 7:47 AM, Todd Nist  wrote:
> 
> Have you looked at Apache Toree, http://toree.apache.org/.  This was
> formerly the Spark-Kernel from IBM but contributed to apache.
>
> https://github.com/apache/incubator-toree
>
> You can find a good overview on the spark-kernel here:
> http://www.spark.tc/how-to-enable-interactive-applications-against-apache-spark/
>
> Not sure if that is of value to you or not.
>
> HTH.
>
> -Todd
>
> On Tue, Mar 1, 2016 at 7:30 PM, Don Drake wrote:
> I'm interested in building a REST service that utilizes a Spark SQL Context 
> to return records from a DataFrame (or IndexedRDD?) and even add/update 
> records.
> 
> This will be a simple REST API, with only a few end-points.  I found this 
> example:
> 
> https://github.com/alexmasselot/spark-play-activator 
> 
> 
> which looks close to what I am interested in doing.  
> 
> Are there any other ideas or options if I want to run this in a YARN cluster?
> 
> Thanks.
> 
> -Don
> 
> -- 
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/ 
> https://twitter.com/dondrake 
> 800-733-2143 



Configuring Ports for Network Security

2016-03-02 Thread zgpinnadhari
Hi

We want to use spark in a secure cluster with iptables enabled.
For this, we need a specific list of ports used by spark so that we can
whitelist them.

From what I could learn from -
http://spark.apache.org/docs/latest/security.html#configuring-ports-for-network-security
 
- there are several ports chosen "randomly" which pose a challenge while
coming up with specific iptables rules as we cannot allow any-any.

What is the recommendation here?

Can we specify a port range somewhere from which spark can choose randomly?

What do other secure / hardened clusters do?

Thanks!







Re: Building a REST Service with Spark back-end

2016-03-02 Thread Todd Nist
Have you looked at Apache Toree, http://toree.apache.org/.  This was
formerly the Spark-Kernel from IBM but contributed to apache.

https://github.com/apache/incubator-toree

You can find a good overview on the spark-kernel here:
http://www.spark.tc/how-to-enable-interactive-applications-against-apache-spark/

Not sure if that is of value to you or not.

HTH.

-Todd

On Tue, Mar 1, 2016 at 7:30 PM, Don Drake  wrote:

> I'm interested in building a REST service that utilizes a Spark SQL
> Context to return records from a DataFrame (or IndexedRDD?) and even
> add/update records.
>
> This will be a simple REST API, with only a few end-points.  I found this
> example:
>
> https://github.com/alexmasselot/spark-play-activator
>
> which looks close to what I am interested in doing.
>
> Are there any other ideas or options if I want to run this in a YARN
> cluster?
>
> Thanks.
>
> -Don
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake 
> 800-733-2143
>


Re: SparkR Count vs Take performance

2016-03-02 Thread Dirceu Semighini Filho
Thanks Sun, this explains why I was getting so many jobs running: my RDDs
were empty.



2016-03-02 10:29 GMT-03:00 Sun, Rui :

> This is nothing to do with object serialization/deserialization. It is
> expected behavior that take(1) most likely runs slower than count() on an
> empty RDD.
>
> This is all about the algorithm with which take() is implemented. Take()
> 1. Reads one partition to get the elements
> 2. If the fetched elements do not satisfy the limit, it will estimate the
> number of additional partitions and fetch elements in them.
> Take() repeats the step 2 until it get the desired number of elements or
> it will go through all partitions.
>
> So take(1) on an empty RDD will go through all partitions in a sequential
> way.
>
> Comparing with take(), Count() also computes all partition, but the
> computation is parallel on all partitions at once.
>
> Take() implementation in SparkR is less optimized than that in Scala as
> SparkR won't estimate the number of additional partitions but will read
> just one partition in each fetch.
>
> -Original Message-
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: Wednesday, March 2, 2016 3:37 AM
> To: Dirceu Semighini Filho 
> Cc: user 
> Subject: Re: SparkR Count vs Take performance
>
> Yeah one surprising result is that you can't call isEmpty on an RDD of
> nonserializable objects. You can't do much with an RDD of nonserializable
> objects anyway, but they can exist as an intermediate stage.
>
> We could fix that pretty easily with a little copy and paste of the
> take() code; right now isEmpty is simple but has this drawback.
>
> On Tue, Mar 1, 2016 at 7:18 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
> > Great, I didn't noticed this isEmpty method.
> > Well serialization is been a problem in this project, we have noticed
> > a lot of time been spent in serializing and deserializing things to
> > send and get from the cluster.
> >
> > 2016-03-01 15:47 GMT-03:00 Sean Owen :
> >>
> >> There is an "isEmpty" method that basically does exactly what your
> >> second version does.
> >>
> >> I have seen it be unusually slow at times because it must copy 1
> >> element to the driver, and it's possible that's slow. It still
> >> shouldn't be slow in general, and I'd be surprised if it's slower
> >> than a count in all but pathological cases.
> >>
> >>
> >>
> >> On Tue, Mar 1, 2016 at 6:03 PM, Dirceu Semighini Filho
> >>  wrote:
> >> > Hello all.
> >> > I have a script that create a dataframe from this operation:
> >> >
> >> > mytable <- sql(sqlContext,("SELECT ID_PRODUCT, ... FROM mytable"))
> >> >
> >> > rSparkDf <- createPartitionedDataFrame(sqlContext,myRdataframe)
> >> > dFrame <-
> >> > join(mytable,rSparkDf,mytable$ID_PRODUCT==rSparkDf$ID_PRODUCT)
> >> >
> >> > After filtering this dFrame with the following:
> >> >
> >> > filteredDF <- filterRDD(toRDD(dFrame),function (row) {row['COLUMN']
> >> > %in% c("VALUES", ...)})
> >> >
> >> > I need to know if the resulting dataframe is empty, and to do that I
> >> > tried these two snippets:
> >> > if(count(filteredDF) > 0)
> >> > and
> >> > if(length(take(filteredDF,1)) > 0)
> >> > I thought that the second one, using take, should run faster than
> >> > count, but that didn't happen.
> >> > The take operation creates one job per partition of my RDD (which
> >> > was 200), and this makes it run slower than the count.
> >> > Is this the expected behaviour?
> >> >
> >> > Regards,
> >> > Dirceu
> >
> >
>
>
>


RE: SparkR Count vs Take performance

2016-03-02 Thread Sun, Rui
This has nothing to do with object serialization/deserialization. It is expected
behavior that take(1) most likely runs slower than count() on an empty RDD.

This is all about the algorithm with which take() is implemented. Take():
1. Reads one partition to get the elements.
2. If the fetched elements do not satisfy the limit, estimates the
number of additional partitions and fetches elements from them.
Take() repeats step 2 until it gets the desired number of elements or it
has gone through all partitions.

So take(1) on an empty RDD will go through all partitions sequentially.

Compared with take(), count() also computes all partitions, but the computation
runs in parallel on all partitions at once.

The take() implementation in SparkR is less optimized than the one in Scala, as
SparkR won't estimate the number of additional partitions but will read just
one partition in each fetch.
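
The difference described above can be sketched with a small pure-Python
simulation (no Spark involved). It only counts how many jobs each strategy
would launch; it is not the actual SparkR or Scala code. The function names
are hypothetical, and the growth factor of 4 is an assumption standing in for
the Scala-side estimate of how many extra partitions to scan.

```python
from typing import List, Sequence, Tuple

Partition = Sequence[int]


def take_one_partition_per_job(partitions: Sequence[Partition], n: int) -> Tuple[List[int], int]:
    """SparkR-style take as described above: each fetch (job) reads one partition."""
    taken: List[int] = []
    jobs = 0
    for part in partitions:
        if len(taken) >= n:
            break
        jobs += 1
        taken.extend(part[: n - len(taken)])
    return taken, jobs


def take_growing_batches(partitions: Sequence[Partition], n: int, growth: int = 4) -> Tuple[List[int], int]:
    """Scala-style take: each job scans a larger batch of partitions than the last."""
    taken: List[int] = []
    jobs = 0
    scanned = 0
    total = len(partitions)
    while len(taken) < n and scanned < total:
        batch = 1 if scanned == 0 else scanned * growth
        end = min(scanned + batch, total)
        jobs += 1
        for part in partitions[scanned:end]:
            taken.extend(part[: n - len(taken)])
        scanned = end
    return taken, jobs


def count_in_one_job(partitions: Sequence[Partition]) -> Tuple[int, int]:
    """count(): a single job that processes all partitions at once."""
    return sum(len(part) for part in partitions), 1


empty_rdd = [[] for _ in range(200)]  # 200 empty partitions, as in the thread
print(take_one_partition_per_job(empty_rdd, 1))
print(take_growing_batches(empty_rdd, 1))
print(count_in_one_job(empty_rdd))
```

On 200 empty partitions the one-partition-per-fetch strategy needs 200 jobs,
the growing-batch strategy needs 5, and count needs 1, which matches the
behaviour reported in this thread.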

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, March 2, 2016 3:37 AM
To: Dirceu Semighini Filho 
Cc: user 
Subject: Re: SparkR Count vs Take performance

Yeah one surprising result is that you can't call isEmpty on an RDD of 
nonserializable objects. You can't do much with an RDD of nonserializable 
objects anyway, but they can exist as an intermediate stage.

We could fix that pretty easily with a little copy and paste of the
take() code; right now isEmpty is simple but has this drawback.

On Tue, Mar 1, 2016 at 7:18 PM, Dirceu Semighini Filho 
 wrote:
> Great, I didn't noticed this isEmpty method.
> Well serialization is been a problem in this project, we have noticed 
> a lot of time been spent in serializing and deserializing things to 
> send and get from the cluster.
>
> 2016-03-01 15:47 GMT-03:00 Sean Owen :
>>
>> There is an "isEmpty" method that basically does exactly what your 
>> second version does.
>>
>> I have seen it be unusually slow at times because it must copy 1 
>> element to the driver, and it's possible that's slow. It still 
>> shouldn't be slow in general, and I'd be surprised if it's slower 
>> than a count in all but pathological cases.
>>
>>
>>
>> On Tue, Mar 1, 2016 at 6:03 PM, Dirceu Semighini Filho 
>>  wrote:
>> > Hello all.
>> > I have a script that create a dataframe from this operation:
>> >
>> > mytable <- sql(sqlContext,("SELECT ID_PRODUCT, ... FROM mytable"))
>> >
>> > rSparkDf <- createPartitionedDataFrame(sqlContext,myRdataframe)
>> > dFrame <- 
>> > join(mytable,rSparkDf,mytable$ID_PRODUCT==rSparkDf$ID_PRODUCT)
>> >
>> > After filtering this dFrame with the following:
>> >
>> > filteredDF <- filterRDD(toRDD(dFrame),function (row) {row['COLUMN']
>> > %in% c("VALUES", ...)})
>> >
>> > I need to know if the resulting dataframe is empty, and to do that I
>> > tried these two snippets:
>> > if(count(filteredDF) > 0)
>> > and
>> > if(length(take(filteredDF,1)) > 0)
>> > I thought that the second one, using take, should run faster than
>> > count, but that didn't happen.
>> > The take operation creates one job per partition of my RDD (which
>> > was 200), and this makes it run slower than the count.
>> > Is this the expected behaviour?
>> >
>> > Regards,
>> > Dirceu
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming: StorageLevel.MEMORY_AND_DISK_SER setting for KafkaUtils.createDirectStream

2016-03-02 Thread Vinti Maheshwari
Increasing SPARK_EXECUTOR_INSTANCES to 4 worked.
SPARK_EXECUTOR_INSTANCES="4" #Number of workers to start (Default: 2)

Regards,
Vinti




On Wed, Mar 2, 2016 at 4:28 AM, Vinti Maheshwari 
wrote:

> Thanks much Saisai. Got it.
> So I think increasing the worker executor memory might work. Trying that.
>
> Regards,
> ~Vinti
>
> On Wed, Mar 2, 2016 at 4:21 AM, Saisai Shao 
> wrote:
>
>> You don't have to specify a storage level for the direct Kafka API, since
>> it doesn't need to store the input data ahead of time. Only the
>> receiver-based approach lets you specify the storage level.
>>
>> Thanks
>> Saisai
>>
>> On Wed, Mar 2, 2016 at 7:08 PM, Vinti Maheshwari 
>> wrote:
>>
>>> Hi All,
>>>
>>> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my
>>> spark-streaming program, as I am currently getting a
>>> *MetadataFetchFailedException*. I am not sure where I should pass
>>> StorageLevel.MEMORY_AND_DISK, as it seems that createDirectStream
>>> doesn't accept that parameter.
>>>
>>>
>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
>>> StringDecoder](
>>>   ssc, kafkaParams, topicsSet)
>>>
>>>
>>>
>>> Thanks,
>>> ~Vinti
>>>
>>>
>>
>


Re: Spark streaming: StorageLevel.MEMORY_AND_DISK_SER setting for KafkaUtils.createDirectStream

2016-03-02 Thread Vinti Maheshwari
Thanks much Saisai. Got it.
So I think increasing the worker executor memory might work. Trying that.

Regards,
~Vinti

On Wed, Mar 2, 2016 at 4:21 AM, Saisai Shao  wrote:

> You don't have to specify a storage level for the direct Kafka API, since it
> doesn't need to store the input data ahead of time. Only the receiver-based
> approach lets you specify the storage level.
>
> Thanks
> Saisai
>
> On Wed, Mar 2, 2016 at 7:08 PM, Vinti Maheshwari 
> wrote:
>
>> Hi All,
>>
>> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
>> program, as I am currently getting a
>> *MetadataFetchFailedException*. I am not sure where I should pass
>> StorageLevel.MEMORY_AND_DISK, as it seems that createDirectStream
>> doesn't accept that parameter.
>>
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
>> StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>>
>>
>> Thanks,
>> ~Vinti
>>
>>
>


Re: Spark streaming: StorageLevel.MEMORY_AND_DISK_SER setting for KafkaUtils.createDirectStream

2016-03-02 Thread Saisai Shao
You don't have to specify a storage level for the direct Kafka API, since it
doesn't need to store the input data ahead of time. Only the receiver-based
approach lets you specify the storage level.

Thanks
Saisai

On Wed, Mar 2, 2016 at 7:08 PM, Vinti Maheshwari 
wrote:

> Hi All,
>
> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
> program, as I am currently getting a
> *MetadataFetchFailedException*. I am not sure where I should pass
> StorageLevel.MEMORY_AND_DISK, as it seems that createDirectStream doesn't
> accept that parameter.
>
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](
>   ssc, kafkaParams, topicsSet)
>
>
>
> Thanks,
> ~Vinti
>
>


Spark streaming: StorageLevel.MEMORY_AND_DISK_SER setting for KafkaUtils.createDirectStream

2016-03-02 Thread Vinti Maheshwari
Hi All,

I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
program, as I am currently getting a
*MetadataFetchFailedException*. I am not sure where I should pass
StorageLevel.MEMORY_AND_DISK, as it seems that createDirectStream doesn't
accept that parameter.


val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)


Full Error:

*org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0*
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

)

Thanks,
~Vinti


Re: [Spark 1.5.2]: Iterate through Dataframe columns and put it in map

2016-03-02 Thread Mohammad Tariq
Hi Divya,

You could call the *collect()* method provided by the DataFrame API. This will
give you an *Array[Row]*. You could then iterate over this array and create
your map. Something like this:

val mapOfVals = scala.collection.mutable.Map[String,String]()
// df is your DataFrame instance
val rows = df.collect()
rows.foreach(r => mapOfVals.put(r.getString(0), r.getString(1)))
println("KEYS : " + mapOfVals.keys)
println("VALS : " + mapOfVals.values)

Hope this helps!
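
One thing to watch for: in a Map[String,String], a later row with the same key replaces the earlier value, so "Car" would end up with only one model. If every model per key is needed, grouping is one option. The following is a minimal plain-Scala sketch, not Spark code: the `GroupModels` object and its names are made up for illustration, and the `rows` sequence of pairs stands in for what you might get from something like `df.collect().map(r => (r.getString(0), r.getString(1)))`.

```scala
object GroupModels {
  // Hypothetical stand-in for the collected rows: (Column1, Column2) pairs
  // taken from the sample dataset in the question.
  val rows: Seq[(String, String)] = Seq(
    ("Car", "Model1"),
    ("Bike", "Model2"),
    ("Car", "Model2"),
    ("Bike", "Model 2")
  )

  // Group the pairs by key and keep every value for that key,
  // in the order the rows were encountered.
  def groupModels(pairs: Seq[(String, String)]): Map[String, Seq[String]] =
    pairs.groupBy(_._1).map { case (key, group) => key -> group.map(_._2) }
}
```

With the sample data, `GroupModels.groupModels(GroupModels.rows)("Car")` holds both "Model1" and "Model2". Note that collect() still pulls every row to the driver, so this only suits data small enough to fit there.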






Tariq, Mohammad
about.me/mti



On Wed, Mar 2, 2016 at 3:52 PM, Divya Gehlot 
wrote:

> Hi,
>
> I need to iterate through the columns of a dataframe based on a certain
> condition and put the values in a map.
>
> Dataset
> Column1  Column2
> Car      Model1
> Bike     Model2
> Car      Model2
> Bike     Model 2
>
> I want to iterate through the above dataframe and put it in a map where
> Car is the key and Model1 and Model2 are the values.
>
>
> Thanks,
> Regards,
> Divya
>
>


spark streaming

2016-03-02 Thread Vinti Maheshwari
Hi All,

I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
program, as I am currently getting a
*MetadataFetchFailedException*. I am not sure where I should pass
StorageLevel.MEMORY_AND_DISK, as it seems that createDirectStream doesn't
accept that parameter.


val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)


Full Error:

*org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0*
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

)

Thanks,
~Vinti


Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-02 Thread Oleg Ruchovets
Here is my command:

   aws emr create-cluster --release-label emr-4.3.0 --name "ClusterJava8" \
     --use-default-roles \
     --applications Name=Ganglia Name=Hive Name=Hue Name=Mahout Name=Pig Name=Spark \
     --ec2-attributes KeyName=CC-ES-Demo \
     --instance-count 3 --instance-type m3.xlarge --use-default-roles \
     --bootstrap-action Path=s3://crayon-emr-scripts/emr_java_8.sh

I am using a bootstrap script to install Java 8.

When I choose the applications (Name=Ganglia Name=Hive Name=Hue Name=Mahout
Name=Pig Name=Spark) the problem is gone. Along the way I also fixed the
"Lzo not found" exception. Now I have another problem, and I have no idea
why it happens: I tried to copy a file to HDFS and got this exception (the
file is very small, just a couple of KB).



org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /input/test.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1).  There are 0 datanode(s) running and no node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:632)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1476)
    at org.apache.hadoop.ipc.Client.call(Client.java:1407)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:238)
    at com.sun.proxy.$Proxy9.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy10.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1441)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
put: File /input/test.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1).  There are 0 datanode(s) running and no node(s) are excluded in this operation.


On Wed, Mar 2, 2016 at 4:09 AM, Gourav Sengupta 
wrote:

> Hi,
>
> Which region are you running the EMR clusters in? Are you tweaking any of
> the Hadoop parameters before starting the clusters?
>
> If you are using the AWS CLI to start the cluster, just send across the command.
>
> To date, I have never faced any such issues in the Ireland region.
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Mar 1, 2016 at 9:15 AM, Oleg Ruchovets 
> wrote:
>
>> Hi, I have installed EMR 4.3.0 with Spark. I tried to start the Spark shell,
>> but it looks like it doesn't work and throws exceptions.
>> Please advise:
>>
>> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
>> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
>> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
>> support was removed in 8.0
>> 16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
>> 16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop
>> 16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
>> with modify permissions: Set(hadoop)
>> 16/03/01 09:11:49 INFO HttpServer: Starting HTTP Server
>> 16/03/01 09:11:49 INFO Utils: Successfully started service 'HTTP class
>> server' on port 47223.
>> Welcome to
>>     __
>> 

Inconsistent performance across multiple iterations of same application

2016-03-02 Thread Harsh Rathi
Hi,

I am running spark-submit with the same resources from a bash script on an
EC2 cluster, but the time taken to run the application varies a lot across
iterations, from 5 minutes to 60 minutes.
I restart the Spark cluster with the stop-all.sh and start-all.sh scripts
after every run of the application.
What else do I need to do so that each run of the app starts fresh?


Harsh Rathi


[Spark 1.5.2]: Iterate through Dataframe columns and put it in map

2016-03-02 Thread Divya Gehlot
Hi,

I need to iterate through the columns of a dataframe based on a certain
condition and put the values in a map.

Dataset
Column1  Column2
Car      Model1
Bike     Model2
Car      Model2
Bike     Model 2

I want to iterate through the above dataframe and put it in a map where
Car is the key and Model1 and Model2 are the values.


Thanks,
Regards,
Divya


Re: How to control the number of parquet files getting created under a partition ?

2016-03-02 Thread James Hammerton
Hi,

Based on the behaviour I've seen using parquet, the number of partitions in
the DataFrame will determine the number of files in each parquet partition.

I.e. when you use "PARTITION BY" you're actually partitioning twice: once
via the partitions Spark has created internally, and then again with the
partitions you specify in the "PARTITION BY" clause.

So if you have 10 partitions in your DataFrame, and save that as a parquet
file or table partitioned on a column with 3 values, you'll get 30 files,
10 per parquet partition.

You can reduce the number of partitions in the DataFrame by using
coalesce() before saving the data.
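
The arithmetic above can be sketched in plain Scala. This is only an illustration: `maxFiles` is a hypothetical helper, not a Spark API, and it assumes, as in the description above, that every DataFrame partition holds rows for every value of the partition column, so the result is best read as an upper bound.

```scala
object ParquetFileEstimate {
  // Upper bound on the number of files written: each DataFrame partition
  // can write one file into each parquet partition it has rows for.
  def maxFiles(dataFramePartitions: Int, distinctPartitionValues: Int): Int =
    dataFramePartitions * distinctPartitionValues

  // Files per parquet partition under the same assumption.
  def filesPerParquetPartition(dataFramePartitions: Int): Int =
    dataFramePartitions
}
```

For example, `ParquetFileEstimate.maxFiles(10, 3)` gives 30, while coalescing the DataFrame to 2 partitions first would bring the bound down to `maxFiles(2, 3)`, i.e. 6 files, 2 per parquet partition.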

Regards,

James


On 1 March 2016 at 21:01, SRK  wrote:

> Hi,
>
> How can I control the number of parquet files getting created under a
> partition? I have my sqlContext queries to create a table and insert the
> records as follows. It seems to create around 250 parquet files under each
> partition though I was expecting that to create around 2 or 3 files. Due to
> the large number of files, it takes a lot of time to scan the records. Any
> suggestions as to how to control the number of parquet files under each
> partition would be of great help.
>
>  sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS testUserDts
> (userId STRING, savedDate STRING) PARTITIONED BY (partitioner STRING)
> stored as PARQUET LOCATION '/user/testId/testUserDts' ")
>
>   sqlContext.sql(
> """from testUserDtsTemp ps   insert overwrite table testUserDts
> partition(partitioner)  select ps.userId, ps.savedDate ,  ps.partitioner
> """.stripMargin)
>
>
>
> Thanks!
>
>


rdd cache name

2016-03-02 Thread charles li
Hi there, I am a little confused about the *cache* in Spark.

First, is there any way to *customize the cached RDD name*? On the storage
page, the RDD Name column only shows the kind of RDD, which is not
convenient. I would like to give each one my own name, such as 'rdd 1',
'rdd of map', 'rdd of groupby' and so on.

Second, can someone tell me what exactly '*Fraction Cached*' means under
the hood?

Many thanks

-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: Spark Mllib kmeans execution

2016-03-02 Thread Sonal Goyal
It will run distributed
On Mar 2, 2016 3:00 PM, "Priya Ch"  wrote:

> Hi All,
>
>   I am running the k-means clustering algorithm as follows:
>
> val conf = new SparkConf
> val sc = new SparkContext(conf)
> .
> .
> val kmeans = new KMeans()
> val model = kmeans.run(RDD[Vector])
> .
> .
> .
> The 'kmeans' object gets created on the driver. Does *kmeans.run()* get
> executed on each partition of the RDD in a distributed fashion, or is the
> entire RDD brought to the driver and processed there?
>
> Thanks,
> Padma Ch
>
>
>


Spark Mllib kmeans execution

2016-03-02 Thread Priya Ch
Hi All,

  I am running the k-means clustering algorithm as follows:

val conf = new SparkConf
val sc = new SparkContext(conf)
.
.
val kmeans = new KMeans()
val model = kmeans.run(RDD[Vector])
.
.
.
The 'kmeans' object gets created on the driver. Does *kmeans.run()* get
executed on each partition of the RDD in a distributed fashion, or is the
entire RDD brought to the driver and processed there?

Thanks,
Padma Ch

