Re: Connect the two tables in spark sql

2016-03-01 Thread Mich Talebzadeh
If you are using spark-sql, as opposed to spark-shell, then you can just use
UNION as in SQL for this. Pretty straightforward.

SELECT * from TABLE_A
UNION
SELECT * from TABLE_B
ORDER BY COLUMN_A, COLUMN_B;

Example

spark-sql> SELECT * FROM dummy where id = 1
 > UNION
 > SELECT * FROM dummy2 where id = 10
 > ORDER by id;
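
For what it's worth, a rough DataFrame sketch of the same union (untested, and
assuming the two tables are also visible to a SQLContext named sqlContext):

import org.apache.spark.sql.functions.col

val a = sqlContext.table("dummy").where(col("id") === 1)
val b = sqlContext.table("dummy2").where(col("id") === 10)

// unionAll keeps duplicates; add .distinct() for SQL UNION semantics
val result = a.unionAll(b).orderBy("id")
result.show()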

HTH


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 2 March 2016 at 06:40, Mao, Wei  wrote:

> It should be a “union” operation instead of “join”.
>
>
>
> And in addition to Ted's answer, if you are working with the DataSet API:
>
>
>
> def union(other: Dataset[T]): Dataset[T] = withPlan[T](other){ (left, right) =>
>
>
>
> Thanks,
>
> William
>
>
>
> From: Ted Yu [mailto:yuzhih...@gmail.com]
> Sent: Wednesday, March 2, 2016 11:41 AM
> To: Angel Angel
> Cc: user
> Subject: Re: Connect the two tables in spark sql
>
>
>
> You only showed one record from each table.
>
>
>
> Have you looked at the following method in DataFrame ?
>
>
>
>   def unionAll(other: DataFrame): DataFrame = withPlan {
>
>
>
> On Tue, Mar 1, 2016 at 7:13 PM, Angel Angel 
> wrote:
>
> Hello Sir/Madam,
>
>
>
> I am using the spark sql for the data operation.
>
>
>
> I have two tables with the same fields.
>
>
>
> Table 1
>
> name     address   phone Number
> sagar    india
>
> Table 2
>
> name     address   phone Number
> jaya     india     222
>
> I want to join these tables in the following way.
>
> Result Table
>
> name     address   phone Number
> jaya     india     222
> sagar    india
>
> How can I join these tables? I tried using the join command, but it adds the
> table to the left side.
>
>
>
> Please help me to solve this query.
>
>
>
> Thanks,
>
>
>


Re: SPARK SQL HiveContext Error

2016-03-01 Thread Gourav Sengupta
Hi Mich,
thanks a ton for your kind response, but this error was happening because
the derby classes were being loaded more than once.

In my second email I mentioned the steps that I took in order to resolve
the issue.


Thanks and Regards,
Gourav

On Tue, Mar 1, 2016 at 8:54 PM, Mich Talebzadeh 
wrote:

> Hi Gourav,
>
> Did you modify the following line in your code
>
>  val conf = new
> SparkConf().setAppName("IdeaProjects").setMaster("local[*]").set("spark.driver.allowMultipleContexts",
> "true")
>
> I checked every line in your code they work fine in spark shell with the
> following package added
>
> spark-shell --master spark://50.140.197.217:7077 --packages
> amplab:succinct:0.1.6
>
> Can you explain how it worked?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 March 2016 at 18:20, Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> FIRST ATTEMPT:
>> Used build.sbt in IntelliJ; it gave me nightmares with several
>> incompatibility and library issues, even though the sbt version was
>> compliant with the scala version.
>>
>> SECOND ATTEMPT:
>> Created a new project with no entries in the build.sbt file and imported all
>> the jars in $SPARK_HOME/lib/*.jar into the project. This started causing the
>> issues I reported earlier.
>>
>> FINAL ATTEMPT:
>> Removed from the import (i.e. from the dependencies) all the jars that had
>> the word derby in their name, and this resolved the issue.
>>
>> Please note that the following additional jars were included in the
>> library folder than the ones which are usually supplied with the SPARK
>> distribution:
>> 1. ojdbc7.jar
>> 2. spark-csv***jar file
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Mar 1, 2016 at 5:19 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am getting the error "java.lang.SecurityException: sealing
>>> violation: can't seal package org.apache.derby.impl.services.locks: already
>>> loaded" after running the following code in SCALA.
>>>
>>> I do not have any other instances of sparkContext running from my system.
>>>
>>> I will be grateful for if anyone could kindly help me out.
>>>
>>>
>>> Environment:
>>> SCALA: 1.6
>>> OS: MAC OS X
>>>
>>> 
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.sql.types._
>>> import org.apache.spark.sql.SQLContext
>>>
>>> // Import SuccinctRDD
>>> import edu.berkeley.cs.succinct._
>>>
>>> object test1 {
>>>   def main(args: Array[String]) {
>>> //the below line returns nothing
>>> println(SparkContext.jarOfClass(this.getClass).toString())
>>> val logFile = "/tmp/README.md" // Should be some file on your system
>>>
>>> val conf = new 
>>> SparkConf().setAppName("IdeaProjects").setMaster("local[*]")
>>> val sc = new SparkContext(conf)
>>> val logData = sc.textFile(logFile, 2).cache()
>>> val numAs = logData.filter(line => line.contains("a")).count()
>>> val numBs = logData.filter(line => line.contains("b")).count()
>>> println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
>>>
>>>
>>> // Create a Spark RDD as a collection of articles; ctx is the 
>>> SparkContext
>>> val articlesRDD = sc.textFile("/tmp/README.md").map(_.getBytes)
>>>
>>> // Compress the Spark RDD into a Succinct Spark RDD, and persist it in 
>>> memory
>>> // Note that this is a time consuming step (usually at 8GB/hour/core) 
>>> since data needs to be compressed.
>>> // We are actively working on making this step faster.
>>> val succinctRDD = articlesRDD.succinct.persist()
>>>
>>>
>>> // SuccinctRDD supports a set of powerful primitives directly on 
>>> compressed RDD
>>> // Let us start by counting the number of occurrences of "Berkeley" 
>>> across all Wikipedia articles
>>> val count = succinctRDD.count("the")
>>>
>>> // Now suppose we want to find all offsets in the collection at which 
>>> "Berkeley" occurs; and
>>> // create an RDD containing all resulting offsets
>>> val offsetsRDD = succinctRDD.search("and")
>>>
>>> // Let us look at the first ten results in the above RDD
>>> val offsets = offsetsRDD.take(10)
>>>
>>> // Finally, let us extract 20 bytes before and after one of the 
>>> occurrences of "Berkeley"
>>> val offset = offsets(0)
>>> val data = succinctRDD.extract(offset - 20, 40)
>>>
>>> println(data)
>>> println(">>>")
>>>
>>>
>>> // Create a schema
>>> val citySchema = StructType(Seq(
>>>   StructField("Name", StringType, false),
>>>   StructField("Length", IntegerType, true),
>>>   StructField("Area", DoubleType, false),
>>>   

Re: Spark UI standalone "crashes" after an application finishes

2016-03-01 Thread Gourav Sengupta
Hi Teng,

I was not asking the question; I was describing what to expect from the SPARK UI
depending on how you start the SPARK application.

Thanks and Regards,
Gourav

On Tue, Mar 1, 2016 at 8:30 PM, Teng Qiu  wrote:

> as Gourav said, the application UI on port 4040 will no longer be available
> after your spark app finishes. You should go to the spark master's UI
> (port 8080) and take a look at "completed applications"...
>
> refer to the doc: http://spark.apache.org/docs/latest/monitoring.html
> and read the first "note that" :)
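
As a side note, a minimal sketch of keeping a finished application inspectable by
writing event logs (the log directory below is only an illustration):

import org.apache.spark.{SparkConf, SparkContext}

// With event logging enabled, a completed application can still be browsed
// from the standalone master UI ("completed applications") or a history server.
val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")
val sc = new SparkContext(conf)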
>
> 2016-03-01 21:13 GMT+01:00 Gourav Sengupta :
> > Hi,
> >
> > in case you are submitting your SPARK jobs, the UI is only available
> > while the job is running.
> >
> > Else, if you are starting a SPARK cluster in standalone mode, on HADOOP,
> > etc., then the SPARK UI remains alive.
> >
> > The other way to keep the SPARK UI alive is to use a Jupyter notebook for
> > Python or Scala (see Apache Toree) or to use Zeppelin.
> >
> >
> > Regards,
> > Gourav Sengupta
> >
> > On Mon, Feb 29, 2016 at 11:48 PM, Sumona Routh 
> wrote:
> >>
> >> Hi there,
> >> I've been doing some performance tuning of our Spark application, which
> is
> >> using Spark 1.2.1 standalone. I have been using the spark metrics to
> graph
> >> out details as I run the jobs, as well as the UI to review the tasks and
> >> stages.
> >>
> >> I notice that after my application completes, or is near completion, the
> >> UI "crashes." I get a Connection Refused response. Sometimes, the page
> >> eventually recovers and will load again, but sometimes I end up having
> to
> >> restart the Spark master to get it back. When I look at my graphs on the
> >> app, the memory consumption (of driver, executors, and what I believe
> to be
> >> the daemon (spark.jvm.total.used)) appears to be healthy. Monitoring the
> >> master machine itself, memory and CPU appear healthy as well.
> >>
> >> Has anyone else seen this issue? Are there logs for the UI itself, and
> >> where might I find those?
> >>
> >> Thanks!
> >> Sumona
> >
> >
>


RE: Connect the two tables in spark sql

2016-03-01 Thread Mao, Wei
It should be a “union” operation instead of “join”.

And in addition to Ted's answer, if you are working with the DataSet API:

def union(other: Dataset[T]): Dataset[T] = withPlan[T](other){ (left, right) =>
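
For reference, a small hypothetical usage sketch of that Dataset union on Spark 1.6
(the Record case class and the sample rows are made up for illustration):

import sqlContext.implicits._

case class Record(name: String, address: String, phone: String)

val ds1 = Seq(Record("sagar", "india", "111")).toDS()   // made-up phone number
val ds2 = Seq(Record("jaya", "india", "222")).toDS()

// Dataset.union keeps duplicates, i.e. it behaves like SQL UNION ALL
val combined = ds1.union(ds2)
combined.show()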

Thanks,
William

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, March 2, 2016 11:41 AM
To: Angel Angel
Cc: user
Subject: Re: Connect the two tables in spark sql

You only showed one record from each table.

Have you looked at the following method in DataFrame ?

  def unionAll(other: DataFrame): DataFrame = withPlan {

On Tue, Mar 1, 2016 at 7:13 PM, Angel Angel 
> wrote:
Hello Sir/Madam,

I am using the spark sql for the data operation.

I have two tables with the same fields.

Table 1

name     address   phone Number
sagar    india

Table 2

name     address   phone Number
jaya     india     222

I want to join these tables in the following way.

Result Table

name     address   phone Number
jaya     india     222
sagar    india

How can I join these tables? I tried using the join command, but it adds the table to
the left side.

Please help me to solve this query.

Thanks,



Re: Spark executor Memory profiling

2016-03-01 Thread Nirav Patel
Thanks Nilesh,

Thanks for sharing those docs. I have come across most of those tunings in the
past and, believe me, I have tuned the heck out of this job. What I can't
believe is that spark needs 4x more resources than MapReduce to run the same job
(for a dataset on the order of >100GB).
I was able to run my job only after giving a ridiculous amount of executor memory
(20g) and overhead (4g)! The MapReduce counterpart worked with just a 2gb mapper
and a 6gb reducer!

I still have not been able to take a heap dump of a stuck spark executor so
that I can analyze memory usage. But there's definitely some limitation in the
framework which doesn't make it ideal at all for shuffle-heavy jobs!



On Sat, Feb 20, 2016 at 10:29 PM, Kuchekar 
wrote:

> Hi Nirav,
>
> I recently attended the Spark Summit East 2016 and almost
> every talk about errors faced by community and/or tuning topics for Spark
> mentioned this being the main problem (Executor lost and JVM out of
> memory).
>
>  Check out this blog that explains how to tune spark,
> and this cheat sheet for tuning spark.
>
> Hope this helps, keep the community posted what resolved your issue if it
> does.
>
> Thanks.
>
> Kuchekar, Nilesh
>
> On Sat, Feb 20, 2016 at 11:29 AM, Nirav Patel 
> wrote:
>
>> Thanks Nilesh. I don't think there's heavy communication between driver
>> and executor. However, I'll try the settings you suggested.
>>
>> I cannot replace groupBy with reduceBy as it's not an associative
>> operation.
>>
>> It is very frustrating, to be honest. It was a piece of cake with map
>> reduce compared to the amount of time I am putting into tuning spark to make
>> things work. To remove the doubt that an executor might be running multiple
>> tasks (executor.cores) and hence forcing tasks to share memory, I set
>> executor.cores to 1, so a single task has all of the 15gb at its disposal!
>> That is already 3 times what it needs for the most skewed key. I am going to
>> need to profile for sure to understand what the spark executors are doing
>> there. They are certainly not willing to explain the situation, other than
>> saying 'use reduceBy'.
>>
>>
>>
>>
>>
>> On Thu, Feb 11, 2016 at 9:42 AM, Kuchekar 
>> wrote:
>>
>>> Hi Nirav,
>>>
>>>   I faced similar issue with Yarn, EMR 1.5.2 and
>>> following Spark Conf helped me. You can set the values accordingly
>>>
>>> conf = (SparkConf().set("spark.master", "yarn-client").setAppName("HalfWay")
>>>         .set("spark.driver.memory", "15G").set("spark.yarn.am.memory", "15G"))
>>>
>>> conf = conf.set("spark.driver.maxResultSize", "10G") \
>>>            .set("spark.storage.memoryFraction", "0.6") \
>>>            .set("spark.shuffle.memoryFraction", "0.6") \
>>>            .set("spark.yarn.executor.memoryOverhead", "4000")
>>>
>>> conf = conf.set("spark.executor.cores", "4").set("spark.executor.memory", "15G") \
>>>            .set("spark.executor.instances", "6")
>>>
>>> Is it also possible to use reduceBy in place of groupBy that might help
>>> the shuffling too.
>>>
>>>
>>> Kuchekar, Nilesh
>>>
>>> On Wed, Feb 10, 2016 at 8:09 PM, Nirav Patel 
>>> wrote:
>>>
 We have been trying to solve a memory issue with a spark job that
 processes 150GB of data (on disk). It does a groupBy operation; some of the
 executors will receive somewhere around 2-4M scala case objects to work
 with. We are using the following spark config:

 "executorInstances": "15",

  "executorCores": "1", (we reduce it to one so single task gets
 all the executorMemory! at least that's the assumption here)

  "executorMemory": "15000m",

  "minPartitions": "2000",

  "taskCpus": "1",

  "executorMemoryOverhead": "1300",

  "shuffleManager": "tungsten-sort",

   "storageFraction": "0.4"


 This is a snippet of what we see in the spark UI for a job that fails.

 This is a *stage* of this job that fails:

 Stage Id: 5 (retry 15)   Pool Name: prod
 Description: map at SparkDataJobs.scala:210
 Submitted: 2016/02/09 21:30:06   Duration: 13 min
 Tasks (Succeeded/Total): 130/389 (16 failed)
 Shuffle Read: 1982.6 MB   Shuffle Write: 818.7 MB
 Failure Reason: org.apache.spark.shuffle.FetchFailedException: Error in opening
 FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/fasd/appcache/application_1454975800192_0447/blockmgr-abb77b52-9761-457a-b67d-42a15b975d76/0c/shuffle_0_39_0.data,
 offset=11421300, length=2353}

 This is one of the *task* attempts from the above stage that threw OOM:

 2 22361 0 FAILED PROCESS_LOCAL 38 / nd1.mycom.local 

Re: Connect the two tables in spark sql

2016-03-01 Thread Ted Yu
You only showed one record from each table.

Have you looked at the following method in DataFrame ?

  def unionAll(other: DataFrame): DataFrame = withPlan {

On Tue, Mar 1, 2016 at 7:13 PM, Angel Angel  wrote:

> Hello Sir/Madam,
>
> I am using the spark sql for the data operation.
>
> I have two tables with the same fields.
>
> Table 1
> name address  phone Number
>  sagar  india  
>
>
>
>
>
> Table 2
> name address  phone Number
>  jaya  india  222
>
>
>
>
> I want to join these tables in the following way.
>
>
> Result Table
> name address  phone Number
>  jaya  india  222
>  sagar  india  
>
> How can I join these tables? I tried using the join command, but it adds the
> table to the left side.
>
> Please help me to solve this query.
>
> Thanks,
>


Connect the two tables in spark sql

2016-03-01 Thread Angel Angel
Hello Sir/Madam,

I am using the spark sql for the data operation.

I have two tables with the same fields.

Table 1
name address  phone Number
 sagar  india  





Table 2
name address  phone Number
 jaya  india  222




I want to join these tables in the following way.


Result Table
name address  phone Number
 jaya  india  222
 sagar  india  

How can I join these tables? I tried using the join command, but it adds the table
to the left side.

Please help me to solve this query.

Thanks,


Does anyone have spark code style guide xml file ?

2016-03-01 Thread Minglei Zhang
Hello,

Appreciate if you have xml file with the following style code ?
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide

thanks.


Re: Does anyone have spark code style guide xml file ?

2016-03-01 Thread Ted Yu
See this in source repo:

./.idea/projectCodeStyle.xml

On Tue, Mar 1, 2016 at 6:55 PM, zml张明磊  wrote:

> Hello,
>
>
>
> Appreciate if you have xml file with the following style code ?
>
> https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
>
>
>
> thanks.
>


Does anyone have spark code style guide xml file ?

2016-03-01 Thread zml张明磊
Hello,

Appreciate if you have xml file with the following style code ?
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide

thanks.


Re: Spark executor killed without apparent reason

2016-03-01 Thread Ted Yu
Using pastebin seems to be better.

The attachment may not go through.

FYI

On Tue, Mar 1, 2016 at 6:07 PM, Jeff Zhang  wrote:

> Do you mind attaching the whole yarn app log?
>
> On Wed, Mar 2, 2016 at 10:03 AM, Nirav Patel 
> wrote:
>
>> Hi Ryan,
>>
>> I did search "OutOfMemoryError" earlier and just now but it doesn't
>> indicate anything else.
>>
>> Another thing is Job fails at "saveAsHadoopDataset" call to huge rdd.
>> Most of the executors fails at this stage. I don't understand that as well.
>> Because that should be a straight write job to filesystem. All the complex
>> joins and logic were in previous stages which all ran successfully.
>>
>> Thanks
>> Nirav
>>
>> On Wed, Mar 2, 2016 at 2:22 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Could you search "OutOfMemoryError" in the executor logs? It could be
>>> "OufOfMemoryError: Direct Buffer Memory" or something else.
>>>
>>> On Tue, Mar 1, 2016 at 6:23 AM, Nirav Patel 
>>> wrote:
>>>
 Hi,

 We are using spark 1.5.2 on yarn. We have a spark application utilizing
 about 15GB executor memory and 1500 overhead. However, at certain stage we
 notice higher GC time (almost same as task time) spent. These executors are
 bound to get killed at some point. However, nodemanager or resource manager
 logs doesn't indicate failure due to 'beyond physical/virtual memory
 limits' nor I see any 'heap space' or 'gc overhead exceeded' errors in
 executor logs. Some of these high GC executor gets killed eventually but I
 can't seem to find reason. Based on application logs it seems like executor
 didn't respond to driver for long period of time and connection was reset.

 Following are logs from 'yarn logs -applicationId appId_1232_xxx'


 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage
 8.0 (TID 15318). 2099 bytes result sent to driver
 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got
 assigned task 15333
 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage
 8.0 (TID 15333)
 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125
 non-empty blocks out of 3007 blocks
 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
 remote fetches in 10 ms
 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
 maprnode5 has been quiet for 12 ms while there are outstanding
 requests. Assuming connection is dead; please adjust spark.network.timeout
 if this is wrong.
 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
 requests outstanding when connection from maprnode5 is closed
 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
 starting block fetches
 java.io.IOException: Connection from maprnode5 closed
 at
 org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
 at
 io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
 at
 io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
 at
 io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
 at
 io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
 at
 io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
 at
 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
 

Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-01 Thread Nirav Patel
Hi,

I have a spark job that runs on yarn and keeps failing at the line where I do:


val hConf = HBaseConfiguration.create
hConf.setInt("hbase.client.scanner.caching", 1)
hConf.setBoolean("hbase.cluster.distributed", true)

new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)
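
The jobConfig above is not shown in the post; for context, a typical (hypothetical)
way to build it for HBase's old-API TableOutputFormat looks roughly like this,
where the table name is only a placeholder:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf

val hConf = HBaseConfiguration.create
hConf.setInt("hbase.client.scanner.caching", 1)
hConf.setBoolean("hbase.cluster.distributed", true)

// "my_table" is a placeholder, not taken from the original post
val jobConfig = new JobConf(hConf)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "my_table")

// hbaseRdd is then expected to be an RDD[(ImmutableBytesWritable, Put)]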


Basically, at this stage multiple executors fail after high GC
activity. However, none of the executor logs, driver logs or node
manager logs indicate any OutOfMemory errors, GC Overhead Exceeded
errors or memory-limits-exceeded errors. I don't see any other reason
for the executor failures either.



Driver Logs:

Failing Oozie Launcher, Main class
[org.apache.oozie.action.hadoop.SparkMain], main() threw exception,
Job aborted due to stage failure: Task 388 in stage 22.0 failed 4
times, most recent failure: Lost task 388.3 in stage 22.0 (TID 32141,
maprnode5): ExecutorLostFailure (executor 5 lost)
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 388 in stage 22.0 failed 4 times, most recent failure: Lost task
388.3 in stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure
(executor 5 lost)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)



Executor logs:


16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage
8.0 (TID 15318). 2099 bytes result sent to driver
16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 15333
16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage
8.0 (TID 15333)
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting
125 non-empty blocks out of 3007 blocks
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
remote fetches in 10 ms
16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
maprnode5 has been quiet for 12 ms while there are outstanding
requests. Assuming connection is dead; please adjust
spark.network.timeout if this is wrong.
16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
requests outstanding when connection from maprnode5 is closed
16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
starting block fetches
java.io.IOException: Connection from maprnode5 closed
at 
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at 
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at 

Re: Spark executor killed without apparent reason

2016-03-01 Thread Jeff Zhang
Do you mind attaching the whole yarn app log?

On Wed, Mar 2, 2016 at 10:03 AM, Nirav Patel  wrote:

> Hi Ryan,
>
> I did search "OutOfMemoryError" earlier and just now but it doesn't
> indicate anything else.
>
> Another thing is Job fails at "saveAsHadoopDataset" call to huge rdd. Most
> of the executors fails at this stage. I don't understand that as well.
> Because that should be a straight write job to filesystem. All the complex
> joins and logic were in previous stages which all ran successfully.
>
> Thanks
> Nirav
>
> On Wed, Mar 2, 2016 at 2:22 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Could you search "OutOfMemoryError" in the executor logs? It could be
>> "OufOfMemoryError: Direct Buffer Memory" or something else.
>>
>> On Tue, Mar 1, 2016 at 6:23 AM, Nirav Patel 
>> wrote:
>>
>>> Hi,
>>>
>>> We are using spark 1.5.2 on yarn. We have a spark application utilizing
>>> about 15GB executor memory and 1500 overhead. However, at certain stage we
>>> notice higher GC time (almost same as task time) spent. These executors are
>>> bound to get killed at some point. However, nodemanager or resource manager
>>> logs doesn't indicate failure due to 'beyond physical/virtual memory
>>> limits' nor I see any 'heap space' or 'gc overhead exceeded' errors in
>>> executor logs. Some of these high GC executor gets killed eventually but I
>>> can't seem to find reason. Based on application logs it seems like executor
>>> didn't respond to driver for long period of time and connection was reset.
>>>
>>> Following are logs from 'yarn logs -applicationId appId_1232_xxx'
>>>
>>>
>>> 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage
>>> 8.0 (TID 15318). 2099 bytes result sent to driver
>>> 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got
>>> assigned task 15333
>>> 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage
>>> 8.0 (TID 15333)
>>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125
>>> non-empty blocks out of 3007 blocks
>>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
>>> remote fetches in 10 ms
>>> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
>>> maprnode5 has been quiet for 12 ms while there are outstanding
>>> requests. Assuming connection is dead; please adjust spark.network.timeout
>>> if this is wrong.
>>> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
>>> requests outstanding when connection from maprnode5 is closed
>>> 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
>>> starting block fetches
>>> java.io.IOException: Connection from maprnode5 closed
>>> at
>>> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
>>> at
>>> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>> at
>>> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
>>> at
>>> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
>>> at
>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>> at
>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>> at java.lang.Thread.run(Thread.java:744)
>>> 

Re: Spark executor killed without apparent reason

2016-03-01 Thread Nirav Patel
Hi Ryan,

I did search "OutOfMemoryError" earlier and just now but it doesn't
indicate anything else.

Another thing is that the job fails at the "saveAsHadoopDataset" call on a huge rdd.
Most of the executors fail at this stage. I don't understand that either,
because that should be a straight write job to the filesystem. All the complex
joins and logic were in previous stages, which all ran successfully.

Thanks
Nirav

On Wed, Mar 2, 2016 at 2:22 AM, Shixiong(Ryan) Zhu 
wrote:

> Could you search "OutOfMemoryError" in the executor logs? It could be
> "OufOfMemoryError: Direct Buffer Memory" or something else.
>
> On Tue, Mar 1, 2016 at 6:23 AM, Nirav Patel  wrote:
>
>> Hi,
>>
>> We are using spark 1.5.2 on yarn. We have a spark application utilizing
>> about 15GB executor memory and 1500 overhead. However, at certain stage we
>> notice higher GC time (almost same as task time) spent. These executors are
>> bound to get killed at some point. However, nodemanager or resource manager
>> logs doesn't indicate failure due to 'beyond physical/virtual memory
>> limits' nor I see any 'heap space' or 'gc overhead exceeded' errors in
>> executor logs. Some of these high GC executor gets killed eventually but I
>> can't seem to find reason. Based on application logs it seems like executor
>> didn't respond to driver for long period of time and connection was reset.
>>
>> Following are logs from 'yarn logs -applicationId appId_1232_xxx'
>>
>>
>> 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage
>> 8.0 (TID 15318). 2099 bytes result sent to driver
>> 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got
>> assigned task 15333
>> 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0
>> (TID 15333)
>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125
>> non-empty blocks out of 3007 blocks
>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
>> remote fetches in 10 ms
>> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
>> maprnode5 has been quiet for 12 ms while there are outstanding
>> requests. Assuming connection is dead; please adjust spark.network.timeout
>> if this is wrong.
>> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
>> requests outstanding when connection from maprnode5 is closed
>> 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
>> starting block fetches
>> java.io.IOException: Connection from maprnode5 closed
>> at
>> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>> at
>> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
>> at
>> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>> at java.lang.Thread.run(Thread.java:744)
>> 16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3)
>> for 6 outstanding blocks after 5000 ms
>> 16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive
>> connection to maprnode5, creating a new 

Re: Update edge weight in graphx

2016-03-01 Thread 王肇康
Naveen,

You can "modify" an RDD by creating a new RDD based on the existing one. You
can add vertices and edges via **union** operation and create a new graph with
the new vertex RDD and the edge RDD. By this way, you can "modify" the old
graph.
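
A small illustrative sketch of that pattern (all names and values below are made up):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// A tiny graph with Long vertex attributes and Double edge weights.
val vertices: RDD[(VertexId, Long)] = sc.parallelize(Seq((1L, 1L), (2L, 2L)))
val edges: RDD[Edge[Double]] = sc.parallelize(Seq(Edge(1L, 2L, 1.0)))
val graph = Graph(vertices, edges)

// "Update" every edge weight by deriving a new, immutable graph.
val reweighted = graph.mapEdges(e => e.attr * 0.9)

// "Add" a vertex and an edge by unioning new RDDs and building a fresh graph.
val newVertices: RDD[(VertexId, Long)] = sc.parallelize(Seq((3L, 3L)))
val newEdges: RDD[Edge[Double]] = sc.parallelize(Seq(Edge(2L, 3L, 2.5)))
val grown = Graph(graph.vertices.union(newVertices), reweighted.edges.union(newEdges))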

  

Best wishes,

Zhaokang

> On Mar 2 2016, at 6:49 am, Mohammed Guller moham...@glassbeam.com wrote:
>
> Like RDDs, Graphs are also immutable.
>
> Mohammed
> Author: Big Data Analytics with Spark
>
> -Original Message-
> From: naveen.marri [mailto:naveenkumarmarri6...@gmail.com]
> Sent: Monday, February 29, 2016 9:11 PM
> To: user@spark.apache.org
> Subject: Update edge weight in graphx
>
> Hi,
>
> I'm trying to implement an algorithm using graphx which involves updating
> edge weight during every iteration. The format is [Node]-[Node]--[Weight]
> Ex:
> I checked the graphx docs but didn't find any resources to change the
> weight of an edge for the same RDD.
> I know RDDs are immutable; is there any way to do this in graphx?
> Also, is there any way to dynamically add vertices and edges to the graph
> within the same RDD?
>
> Regards,
> Naveen



Re: Configure Spark Resource on AWS CLI Not Working

2016-03-01 Thread Jonathan Kelly
Weiwei,

Please see this documentation for configuring Spark and other apps on EMR
4.x:
http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-configure-apps.html
This documentation about what has changed between 3.x and 4.x should also
be helpful:
http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-release-differences.html

~ Jonathan

On Fri, Feb 26, 2016 at 6:38 PM Weiwei Zhang 
wrote:

> Hi there,
>
> I am trying to configure memory for spark using AWS CLI. However, I got
> the following message:
>
> *A client error (ValidationException) occurred when calling the RunJobFlow
> operation: Cannot specify args for application 'Spark' when release label
> is used.*
>
> In the aws 'create-cluster' command, I have '--release-label emr-4.0.0
> --applications Name=Hadoop
> Name=Spark,Args=[-d,num-executors=4,spark.executor.memory=3000M,spark.driver.memory=4000M]'
> and it seems like I cannot specify args when there is '--release-label'.
> How do I get around this?
>
> I also tried using a JSON configuration file saved in an S3 bucket and adding
> "--configurations http://path/bucket/config.json" to the command, but it
> gave me a 403 error (access denied). But when I did "aws s3 ls
> (s3://bucket)" I could see that bucket and the config.json in the bucket.
>
> Please advise. Thank you very much.
>
> Best Regards,
> Vivian
>


Building a REST Service with Spark back-end

2016-03-01 Thread Don Drake
I'm interested in building a REST service that utilizes a Spark SQL Context
to return records from a DataFrame (or IndexedRDD?) and even add/update
records.

This will be a simple REST API, with only a few end-points.  I found this
example:

https://github.com/alexmasselot/spark-play-activator

which looks close to what I am interested in doing.

Are there any other ideas or options if I want to run this in a YARN
cluster?

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: scikit learn on EMR PySpark

2016-03-01 Thread Jonathan Kelly
Hi, Myles,

We do not install scikit-learn or spark-sklearn on EMR clusters by default,
but you may install them yourself by just doing "sudo pip install
scikit-learn spark-sklearn" (either by ssh'ing to the master instance and
running this manually, or by running it as an EMR Step).

~ Jonathan

On Tue, Mar 1, 2016 at 3:20 PM Gartland, Myles 
wrote:

> New to Spark and MLlib. Coming from scikit-learn.
>
> I am launching my Spark 1.6 instance through AWS EMR and pyspark. All the
> examples using Mllib work fine.
>
> But I have seen a couple examples where you can combine scikit learn
> packages and syntax with mllib.
>
> Like in this example-
> https://databricks.com/blog/2016/02/08/auto-scaling-scikit-learn-with-spark.html
>
> However, it does not seem that Pyspark on AWS EMR comes with scikit (or
> other standard pydata packages) loaded.
>
> Is this something you can/should load on pyspark and how would you do it?
>
> Thanks for assisting.
>
>
> Myles
>


Re: DataSet Evidence

2016-03-01 Thread Michael Armbrust
Hey Steve,

This isn't possible today, but it would not be hard to allow.  You should
open a feature request JIRA.

Michael

On Mon, Feb 29, 2016 at 4:55 PM, Steve Lewis  wrote:

> I have a relatively complex Java object that I would like to use in a
> dataset.
>
> If I say
>
>   Encoder evidence = Encoders.kryo(MyType.class);
>   JavaRDD rddMyType = generateRDD(); // some code
>   Dataset datasetMyType = sqlCtx.createDataset(rddMyType.rdd(), evidence);
>
> I get one column - the whole object.
>
> The object is a bean with all fields having getters and setters, but some of
> the fields are other complex java objects.
>
> It would be fine to serialize the objects in those fields with Kryo or Java
> serialization, but the bean serializer treats all referenced objects as beans,
> and some lack the required getters and setters.
>
> How can I get my columns with the bean serializer even if some of the values
> in the columns are not bean types?
>
>


Re: Mapper side join with DataFrames API

2016-03-01 Thread Michael Armbrust
It's helpful to always include the output of df.explain(true) when you are
asking about performance.
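
For example (a two-line hypothetical sketch, assuming the join from the question below):

val joined = sqlContext.sql("SELECT * FROM A INNER JOIN B ON A.a = B.a")
joined.explain(true)   // prints the parsed, analyzed, optimized and physical plans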

On Mon, Feb 29, 2016 at 6:14 PM, Deepak Gopalakrishnan 
wrote:

> Hello All,
>
> I'm trying to join 2 dataframes A and B with a
>
> sqlContext.sql("SELECT * FROM A INNER JOIN B ON A.a=B.a");
>
> Now what I have done is that I have registeredTempTables for A and B after
> loading these DataFrames from different sources. I need the join to be
> really fast and I was wondering if there is a way to use the SQL statement
> and then being able to do a mapper side join ( say my table B is small) ?
>
> I read some articles on using broadcast to do mapper side joins. Could I
> do something like this and then execute my sql statement to achieve mapper
> side join ?
>
> DataFrame B = sparkContext.broadcast(B);
> B.registerTempTable("B");
>
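
As an aside, rather than sparkContext.broadcast on a DataFrame, the usual way to
ask for a map-side join in the DataFrame API is the broadcast hint -- a rough
Scala sketch, assuming A and B are the two registered DataFrames from this question:

import org.apache.spark.sql.functions.broadcast

// Hint that B is small enough to ship to every executor, so the join can be
// done map-side without shuffling A.
val joined = A.join(broadcast(B), A("a") === B("a"))
joined.explain(true)   // the physical plan should show a broadcast join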
>
> I have a join as stated above and I see in my executor logs the below :
>
> 16/02/29 17:02:35 INFO TaskSetManager: Finished task 198.0 in stage 7.0
> (TID 1114) in 20354 ms on localhost (196/200)
>
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 200 non-empty
> blocks out of 200 blocks
>
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 0 ms
>
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty
> blocks out of 128 blocks
>
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 0 ms
>
> 16/02/29 17:03:03 INFO Executor: Finished task 199.0 in stage 7.0 (TID
> 1115). 2511 bytes result sent to driver
>
> 16/02/29 17:03:03 INFO TaskSetManager: Finished task 199.0 in stage 7.0
> (TID 1115) in 27621 ms on localhost (197/200)
>
> *16/02/29 17:07:06 INFO UnsafeExternalSorter: Thread 124 spilling sort
> data of 256.0 KB to disk (0  time so far)*
>
>
> Now, I have around 10G of executor memory and my memory fraction should be
> the default (0.75 as per the documentation), and my memory usage is < 1.5G
> (obtained from the Storage tab on the Spark dashboard), but it still says it is
> spilling sort data. I'm a little surprised why this happens even when I
> have enough memory free.
>
> Any inputs will be greatly appreciated!
>
> Thanks
> --
> Regards,
> *Deepak Gopalakrishnan*
> *Mobile*:+918891509774
> *Skype* : deepakgk87
> http://myexps.blogspot.com
>
>


Re: Checkpoint RDD ReliableCheckpointRDD at foreachRDD has different number of partitions from original RDD MapPartitionsRDD at reduceByKeyAndWindow

2016-03-01 Thread RK
I had an incorrect variable name in line 70 while sanitizing the code for this
email. Here is the actual code:

45    val windowedEventCounts = events.reduceByKeyAndWindow(_ + _, _ - _, 30, 5, filterFunc = filterFunction)
      val usefulEvents = windowedEventCounts.filter { case (event, count) => { count > requestThreshold } }
70    usefulEvents.foreachRDD(filteredEvents => { ... })


Thanks,
RK

From: RK
To: User
Sent: Tuesday, March 1, 2016 3:17 PM
Subject: Checkpoint RDD ReliableCheckpointRDD at foreachRDD has different
number of partitions from original RDD MapPartitionsRDD at reduceByKeyAndWindow
Here is a code snippet in my spark job. I added the numbers at the start of
the code lines to show the relevant line numbers in the exception.

45    val windowedEventCounts = events.reduceByKeyAndWindow(_ + _, _ - _, 30, 5, filterFunc = filterFunction)
      val usefulEvents = windowedEventCounts.filter { case (event, count) => { count > requestThreshold } }
70    usefulEvents.foreachRDD(events => { ... })

Every once in a while, I see this error in my log files:

org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD[28052] at
foreachRDD at EventProcessor.scala:70(103) has different number of partitions
from original RDD MapPartitionsRDD[28050] at reduceByKeyAndWindow at
EventProcessor.scala:45(108)

Has anyone seen this issue, and under what circumstances will this exception occur?

Thanks,
RK

scikit learn on EMR PySpark

2016-03-01 Thread Gartland, Myles
New to Spark and MLlib. Coming from scikit-learn.

I am launching my Spark 1.6 instance through AWS EMR and pyspark. All the 
examples using Mllib work fine.

But I have seen a couple examples where you can combine scikit learn packages 
and syntax with mllib.

Like in this example- 
https://databricks.com/blog/2016/02/08/auto-scaling-scikit-learn-with-spark.html

However, it does not seem that Pyspark on AWS EMR comes with scikit (or other 
standard pydata packages) loaded.

Is this something you can/should load on pyspark and how would you do it?

Thanks for assisting.


Myles


Checkpoint RDD ReliableCheckpointRDD at foreachRDD has different number of partitions from original RDD MapPartitionsRDD at reduceByKeyAndWindow

2016-03-01 Thread RK
Here is a code snippet in my spark job. I added the numbers at the start of
the code lines to show the relevant line numbers in the exception.

45    val windowedEventCounts = events.reduceByKeyAndWindow(_ + _, _ - _, 30, 5, filterFunc = filterFunction)
      val usefulEvents = windowedEventCounts.filter { case (event, count) => { count > requestThreshold } }
70    usefulEvents.foreachRDD(events => { ... })

Every once in a while, I see this error in my log files:

org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD[28052] at
foreachRDD at EventProcessor.scala:70(103) has different number of partitions
from original RDD MapPartitionsRDD[28050] at reduceByKeyAndWindow at
EventProcessor.scala:45(108)

Has anyone seen this issue, and under what circumstances will this exception occur?

Thanks,
RK

RE: Update edge weight in graphx

2016-03-01 Thread Mohammed Guller
Like RDDs, Graphs are also immutable.

Mohammed
Author: Big Data Analytics with Spark


-Original Message-
From: naveen.marri [mailto:naveenkumarmarri6...@gmail.com] 
Sent: Monday, February 29, 2016 9:11 PM
To: user@spark.apache.org
Subject: Update edge weight in graphx

Hi,

 I'm trying to implement an algorithm using graphx which involves updating
edge weight during every iteration. The format is [Node]-[Node]--[Weight]
  Ex:
  I checked the graphx docs but didn't find any resources to change the
weight of an edge for the same RDD.
 I know RDDs are immutable; is there any way to do this in graphx?
 Also, is there any way to dynamically add vertices and edges to the graph
within the same RDD?

 Regards,
 Naveen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Update-edge-weight-in-graphx-tp26367.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [Proposal] Enabling time series analysis on spark metrics

2016-03-01 Thread Reynold Xin
Is the suggestion just to use a different config (and maybe fallback to
appid) in order to publish metrics? Seems reasonable.


On Tue, Mar 1, 2016 at 8:17 AM, Karan Kumar 
wrote:

> +dev mailing list
>
> Time series analysis on metrics becomes quite useful when running spark
> jobs using a workflow manager like oozie.
>
> Would love to take this up if the community thinks its worthwhile.
>
> On Tue, Feb 23, 2016 at 2:59 PM, Karan Kumar 
> wrote:
>
>> HI
>>
>> Spark at the moment uses the application ID to report metrics. I was thinking
>> that we could create an option to export metrics under a user-controlled key.
>> This would allow us to do time series analysis on counters by dumping these
>> counters into a DB such as graphite.
>>
>> One of the approaches I had in mind was allowing a user to set a property
>> via the spark client. If that property is set, use the property value to
>> report metrics; else use the current implementation of reporting metrics
>> on the appid.
>>
>> Thoughts?
>>
>> --
>> Thanks
>> Karan
>>
>
>
>
> --
> Thanks
> Karan
>


Re: Converting array to DF

2016-03-01 Thread Ashok Kumar
Thanks, great:

val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6))
weights.toSeq.toDF("weights","value").orderBy(desc("value")).collect.foreach(println)
 

On Tuesday, 1 March 2016, 20:52, Shixiong(Ryan) Zhu wrote:

For Array, you need to call `toSeq` first. Scala can convert an Array to
ArrayOps automatically; however, that is not a `Seq`, so you need to call `toSeq`
explicitly.
On Tue, Mar 1, 2016 at 1:02 AM, Ashok Kumar  
wrote:

Thank you sir. This works OK:

import sqlContext.implicits._
val weights = Seq(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6))
weights.toDF("weights","value").orderBy(desc("value")).collect.foreach(println)

Please, why did Array not work?

On Tuesday, 1 March 2016, 8:51, Jeff Zhang  wrote:
 

 Change Array to Seq and import sqlContext.implicits._


On Tue, Mar 1, 2016 at 4:38 PM, Ashok Kumar  
wrote:

Hi,

I have this:

val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6))
weights.toDF("weights","value")

I want to convert the Array to a DF but I get this:

weights: Array[(String, Int)] = Array((a,3), (b,2), (c,5), (d,1), (e,9), (f,4), (g,6))
:33: error: value toDF is not a member of Array[(String, Int)]
  weights.toDF("weights","value")

I want to label the columns and print out the contents in value order. Please,
I don't know why I am getting this error.

Thanks




--
Best Regards

Jeff Zhang

Re: Spark executor killed without apparent reason

2016-03-01 Thread Shixiong(Ryan) Zhu
Could you search "OutOfMemoryError" in the executor logs? It could be
"OufOfMemoryError: Direct Buffer Memory" or something else.

On Tue, Mar 1, 2016 at 6:23 AM, Nirav Patel  wrote:

> Hi,
>
> We are using spark 1.5.2 on yarn. We have a spark application utilizing
> about 15GB executor memory and 1500 overhead. However, at certain stage we
> notice higher GC time (almost same as task time) spent. These executors are
> bound to get killed at some point. However, nodemanager or resource manager
> logs doesn't indicate failure due to 'beyond physical/virtual memory
> limits' nor I see any 'heap space' or 'gc overhead exceeded' errors in
> executor logs. Some of these high GC executor gets killed eventually but I
> can't seem to find reason. Based on application logs it seems like executor
> didn't respond to driver for long period of time and connection was reset.
>
> Following are logs from 'yarn logs -applicationId appId_1232_xxx'
>
>
> 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0
> (TID 15318). 2099 bytes result sent to driver
> 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned
> task 15333
> 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0
> (TID 15333)
> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125
> non-empty blocks out of 3007 blocks
> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
> remote fetches in 10 ms
> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
> maprnode5 has been quiet for 12 ms while there are outstanding
> requests. Assuming connection is dead; please adjust spark.network.timeout
> if this is wrong.
> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
> requests outstanding when connection from maprnode5 is closed
> 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
> starting block fetches
> java.io.IOException: Connection from maprnode5 closed
> at
> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:744)
> 16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3)
> for 6 outstanding blocks after 5000 ms
> 16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive
> connection to maprnode5, creating a new one.
> 16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in
> connection from maprnode5
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at
> 

How to control the number of parquet files getting created under a partition ?

2016-03-01 Thread SRK
Hi,

How can I control the number of parquet files getting created under a
partition? I have my sqlContext queries to create a table and insert the
records as follows. It seems to create around 250 parquet files under each
partition, though I was expecting it to create around 2 or 3 files. Due to
the large number of files, it takes a lot of time to scan the records. Any
suggestions as to how to control the number of parquet files under each
partition would be of great help.

 sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS testUserDts
(userId STRING, savedDate STRING) PARTITIONED BY (partitioner STRING) 
stored as PARQUET LOCATION '/user/testId/testUserDts' ")

  sqlContext.sql(
"""from testUserDtsTemp ps   insert overwrite table testUserDts 
partition(partitioner)  select ps.userId, ps.savedDate ,  ps.partitioner
""".stripMargin)



Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-the-number-of-parquet-files-getting-created-under-a-partition-tp26374.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [ERROR]: Spark 1.5.2 + Hbase 1.1 + Hive 1.2 + HbaseIntegration

2016-03-01 Thread Teng Qiu
Also make sure that hbase-site.xml is on your classpath on all
nodes, both master and workers, and also on the client.

Normally I put it into $SPARK_HOME/conf/, so the Spark cluster is
started with this conf file.

BTW @Ted, did you try inserting into an HBase table with Spark's
HiveContext? I got this issue:
https://issues.apache.org/jira/browse/SPARK-6628

and there is a patch available: https://issues.apache.org/jira/browse/HIVE-11166


2016-03-01 15:16 GMT+01:00 Ted Yu :
> 16/03/01 01:36:31 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal):
> java.lang.RuntimeException: hbase-default.xml file seems to be for an older
> version of HBase (null), this version is 1.1.2.2.3.4.0-3485
>
> The above was likely caused by some component being built with different
> release of hbase.
>
> Try setting "hbase.defaults.for.version.skip" to true.
>
> Cheers
>
>
> On Mon, Feb 29, 2016 at 9:12 PM, Ted Yu  wrote:
>>
>> 16/02/29 23:09:34 INFO ZooKeeper: Initiating client connection,
>> connectString=localhost:2181 sessionTimeout=9
>> watcher=hconnection-0x26fa89a20x0, quorum=localhost:2181, baseZNode=/hbase
>>
>> Since baseZNode didn't match what you set in hbase-site.xml, the cause was
>> likely that hbase-site.xml being inaccessible to your Spark job.
>>
>> Please add it in your classpath.
>>
>> On Mon, Feb 29, 2016 at 8:42 PM, Ted Yu  wrote:
>>>
>>> 16/02/29 23:09:34 INFO ClientCnxn: Opening socket connection to server
>>> localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL
>>> (unknown error)
>>>
>>> Is your cluster secure cluster ?
>>>
>>> bq. Trace :
>>>
>>> Was there any output after 'Trace :' ?
>>>
>>> Was hbase-site.xml accessible to your Spark job ?
>>>
>>> Thanks
>>>
>>> On Mon, Feb 29, 2016 at 8:27 PM, Divya Gehlot 
>>> wrote:

 Hi,
 I am getting error when I am trying to connect hive table (which is
 being created through HbaseIntegration) in spark

 Steps I followed :
 Hive Table creation code  :
 CREATE EXTERNAL TABLE IF NOT EXISTS TEST(NAME STRING,AGE INT)
 STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
 WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,0:AGE")
 TBLPROPERTIES ("hbase.table.name" = "TEST",
 "hbase.mapred.output.outputtable" = "TEST");


 DESCRIBE TEST ;
 col_namedata_typecomment
 namestring from deserializer
 age   int from deserializer


 Spark Code :
 import org.apache.spark._
 import org.apache.spark.sql._

 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 hiveContext.sql("from TEST SELECT  NAME").collect.foreach(println)


 Starting Spark shell
 spark-shell --jars
 /usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
 --driver-class-path
 /usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
 --packages com.databricks:spark-csv_2.10:1.3.0  --master yarn-client -i
 /TestDivya/Spark/InstrumentCopyToHDFSHive.scala

 Stack Trace :

> Stack SQL context available as sqlContext.
> Loading /TestDivya/Spark/InstrumentCopyToHDFSHive.scala...
> import org.apache.spark._
> import org.apache.spark.sql._
> 16/02/29 23:09:29 INFO HiveContext: Initializing execution hive,
> version 1.2.1
> 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
> 2.7.1.2.3.4.0-3485
> 16/02/29 23:09:29 INFO ClientWrapper: Loaded
> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
> 2.7.1.2.3.4.0-3485
> 16/02/29 23:09:29 INFO HiveContext: default warehouse location is
> /user/hive/warehouse
> 16/02/29 23:09:29 INFO HiveContext: Initializing
> HiveMetastoreConnection version 1.2.1 using Spark classes.
> 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
> 2.7.1.2.3.4.0-3485

Re: Converting array to DF

2016-03-01 Thread Shixiong(Ryan) Zhu
For an Array, you need to call `toSeq` first. Scala can convert an Array to
ArrayOps automatically, but that is not a `Seq`, so the `toDF` implicit does
not apply and you need to call `toSeq` explicitly.
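
A minimal sketch of the fix, assuming `sqlContext` (e.g. in the Spark shell)
is already available:

import sqlContext.implicits._
import org.apache.spark.sql.functions.desc

val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1))
// Array has no toDF; convert it to a Seq first so the implicit applies
weights.toSeq.toDF("weights", "value")
  .orderBy(desc("value"))
  .collect()
  .foreach(println)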

On Tue, Mar 1, 2016 at 1:02 AM, Ashok Kumar 
wrote:

> Thank you sir
>
> This works OK
> import sqlContext.implicits._
> val weights = Seq(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f",
> 4), ("g", 6))
>
> weights.toDF("weights","value").orderBy(desc("value")).collect.foreach(println)
>
> Please why Array did not work?
>
>
> On Tuesday, 1 March 2016, 8:51, Jeff Zhang  wrote:
>
>
> Change Array to Seq and import sqlContext.implicits._
>
>
>
> On Tue, Mar 1, 2016 at 4:38 PM, Ashok Kumar 
> wrote:
>
> Hi,
>
> I have this
>
> val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9),
> ("f", 4), ("g", 6))
> weights.toDF("weights","value")
>
> I want to convert the Array to DF but I get thisor
>
> weights: Array[(String, Int)] = Array((a,3), (b,2), (c,5), (d,1), (e,9),
> (f,4), (g,6))
> :33: error: value toDF is not a member of Array[(String, Int)]
>   weights.toDF("weights","value")
>
> I want to label columns and print out the contents in value order please I
> don't know why I am getting this error
>
> Thanks
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: Shuffle guarantees

2016-03-01 Thread Corey Nolet
Never mind, a look at the ExternalSorter class tells me that the iterator for
each key that's only partially ordered ends up being merge-sorted by
equality after the fact. Wanted to post my finding here for others who
may have the same question.
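
For anyone else wondering, here is a tiny illustrative check (not Spark's
ExternalSorter code; it assumes a live SparkContext `sc`) showing that two
distinct keys sharing a hashCode are still kept separate:

// "Aa" and "BB" are a well-known hashCode collision on the JVM
val rdd = sc.parallelize(Seq(("Aa", 1), ("BB", 2), ("Aa", 3)))
println("Aa".hashCode == "BB".hashCode)            // true: same shuffle bucket
rdd.reduceByKey(_ + _).collect().foreach(println)  // (Aa,4) and (BB,2) stay distinct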


On Tue, Mar 1, 2016 at 3:05 PM, Corey Nolet  wrote:

> The reason I'm asking, I see a comment in the ExternalSorter class that
> says this:
>
> "If we need to aggregate by key, we either use a total ordering from the
> ordering parameter, or read the keys with the same hash code and compare
> them with each other for equality to merge values".
>
> How can this be assumed if the object used for the key, for instance, in
> the case where a HashPartitioner is used, cannot assume ordering and
> therefore cannot assume a comparator can be used?
>
> On Tue, Mar 1, 2016 at 2:56 PM, Corey Nolet  wrote:
>
>> So if I'm using reduceByKey() with a HashPartitioner, I understand that
>> the hashCode() of my key is used to create the underlying shuffle files.
>>
>> Is anything other than hashCode() used in the shuffle files when the data
>> is pulled into the reducers and run through the reduce function? The reason
>> I'm asking is because there's a possibility of hashCode() colliding in two
>> different objects which end up hashing to the same number, right?
>>
>>
>>
>


Re: Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException

2016-03-01 Thread Shixiong(Ryan) Zhu
Could you show the full companion object? It looks weird to have
`override` in a companion object of a case class.

On Tue, Mar 1, 2016 at 11:16 AM, Yuval Itzchakov  wrote:

> As I said, it is the method which eventually serializes the object. It is
> declared inside a companion object of a case class.
>
> The problem is that Spark will still try to serialize the method, as it
> needs to execute on the worker. How will that change the fact that
> `EncodeJson[T]` is not serializable?
>
>
> On Tue, Mar 1, 2016, 21:12 Shixiong(Ryan) Zhu 
> wrote:
>
>> Don't know where "argonaut.EncodeJson$$anon$2" comes from. However, you
>> can always put your codes into an method of an "object". Then just call it
>> like a Java static method.
>>
>> On Tue, Mar 1, 2016 at 10:30 AM, Yuval.Itzchakov 
>> wrote:
>>
>>> I have a small snippet of code which relays on  argonaut
>>>    for JSON serialization which is ran from a
>>> `PairRDDFunctions.mapWithState` once a session is completed.
>>>
>>> This is the code snippet (not that important):
>>>
>>>   override def sendMessage(pageView: PageView): Unit = {
>>> Future {
>>>   LogHolder.logger.info(s"Sending pageview: ${pageView.id} to
>>> automation")
>>>   try {
>>> Http(url)
>>>   .postData(pageView.asJson.toString)
>>>   .option(HttpOptions.connTimeout(timeOutMilliseconds))
>>>   .asString
>>>   .throwError
>>>   }
>>>   catch {
>>> case NonFatal(e) => LogHolder.logger.error("Failed to send
>>> pageview", e)
>>>   }
>>> }
>>>   }
>>>
>>> argonaut relys on a user implementation of a trait called
>>> `EncodeJson[T]`,
>>> which tells argonaut how to serialize and deserialize the object.
>>>
>>> The problem is, that the trait `EncodeJson[T]` is not serializable, thus
>>> throwing a NotSerializableException:
>>>
>>> Caused by: java.io.NotSerializableException: argonaut.EncodeJson$$anon$2
>>> Serialization stack:
>>> - object not serializable (class: argonaut.EncodeJson$$anon$2,
>>> value: argonaut.EncodeJson$$anon$2@6415f61e)
>>>
>>> This is obvious and understandable.
>>>
>>> The question I have is - What possible ways are there to work around
>>> this?
>>> I'm currently depended on a third-party library which I can't control of
>>> change to implement Serializable in anyway. I've seen this  this
>>> StackOverflow answer
>>> <
>>> http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
>>> >
>>> but couldn't implement any reasonable workaround.
>>>
>>> Anyone have any ideas?
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-non-serializable-third-party-JSON-serializable-on-a-spark-worker-node-throws-NotSerializablen-tp26372.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>


Re: Spark UI standalone "crashes" after an application finishes

2016-03-01 Thread Teng Qiu
as Gourav said, the application UI on port 4040 will no more available
after your spark app finished. you should go to spark master's UI
(port 8080), and take a look "completed applications"...

refer to doc: http://spark.apache.org/docs/latest/monitoring.html
read the first "note that" :)

2016-03-01 21:13 GMT+01:00 Gourav Sengupta :
> Hi,
>
> in case you are submitting your SPARK jobs then the UI is only available
> when the job is running.
>
> Else if you are starting a SPARK cluster in standalone mode or HADOOP or
> etc, then the SPARK UI remains alive.
>
> The other way to keep the SPARK UI alive is to use the Jupyter notebook for
> Python or Scala (see Apache Toree) or use Zeppelin.
>
>
> Regards,
> Gourav Sengupta
>
> On Mon, Feb 29, 2016 at 11:48 PM, Sumona Routh  wrote:
>>
>> Hi there,
>> I've been doing some performance tuning of our Spark application, which is
>> using Spark 1.2.1 standalone. I have been using the spark metrics to graph
>> out details as I run the jobs, as well as the UI to review the tasks and
>> stages.
>>
>> I notice that after my application completes, or is near completion, the
>> UI "crashes." I get a Connection Refused response. Sometimes, the page
>> eventually recovers and will load again, but sometimes I end up having to
>> restart the Spark master to get it back. When I look at my graphs on the
>> app, the memory consumption (of driver, executors, and what I believe to be
>> the daemon (spark.jvm.total.used)) appears to be healthy. Monitoring the
>> master machine itself, memory and CPU appear healthy as well.
>>
>> Has anyone else seen this issue? Are there logs for the UI itself, and
>> where might I find those?
>>
>> Thanks!
>> Sumona
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark UI standalone "crashes" after an application finishes

2016-03-01 Thread Gourav Sengupta
Hi,

If you are submitting your SPARK jobs, then the application UI is only
available while the job is running.

If instead you are starting a SPARK cluster in standalone mode, on HADOOP,
etc., then the SPARK UI remains alive.

The other way to keep the SPARK UI alive is to use the Jupyter notebook for
Python or Scala (see Apache Toree) or use Zeppelin.


Regards,
Gourav Sengupta

On Mon, Feb 29, 2016 at 11:48 PM, Sumona Routh  wrote:

> Hi there,
> I've been doing some performance tuning of our Spark application, which is
> using Spark 1.2.1 standalone. I have been using the spark metrics to graph
> out details as I run the jobs, as well as the UI to review the tasks and
> stages.
>
> I notice that after my application completes, or is near completion, the
> UI "crashes." I get a Connection Refused response. Sometimes, the page
> eventually recovers and will load again, but sometimes I end up having to
> restart the Spark master to get it back. When I look at my graphs on the
> app, the memory consumption (of driver, executors, and what I believe to be
> the daemon (spark.jvm.total.used)) appears to be healthy. Monitoring the
> master machine itself, memory and CPU appear healthy as well.
>
> Has anyone else seen this issue? Are there logs for the UI itself, and
> where might I find those?
>
> Thanks!
> Sumona
>


Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-01 Thread Gourav Sengupta
Hi,

Which region are you running the EMR clusters in? Is there any tweaking of
the HADOOP parameters that you are doing before starting the clusters?

If you are using the AWS CLI to start the cluster, just send across the command.

I have never, to date, faced any such issue in the Ireland region.


Regards,
Gourav Sengupta

On Tue, Mar 1, 2016 at 9:15 AM, Oleg Ruchovets  wrote:

> Hi , I am installed EMR 4.3.0 with spark. I tries to enter spark shell but
> it looks it does't work and throws exceptions.
> Please advice:
>
> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
> support was removed in 8.0
> 16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
> 16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop
> 16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
> with modify permissions: Set(hadoop)
> 16/03/01 09:11:49 INFO HttpServer: Starting HTTP Server
> 16/03/01 09:11:49 INFO Utils: Successfully started service 'HTTP class
> server' on port 47223.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.6.0
>   /_/
>
> Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_71)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 16/03/01 09:11:53 INFO SparkContext: Running Spark version 1.6.0
> 16/03/01 09:11:53 INFO SecurityManager: Changing view acls to: hadoop
> 16/03/01 09:11:53 INFO SecurityManager: Changing modify acls to: hadoop
> 16/03/01 09:11:53 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
> with modify permissions: Set(hadoop)
> 16/03/01 09:11:54 INFO Utils: Successfully started service 'sparkDriver'
> on port 52143.
> 16/03/01 09:11:54 INFO Slf4jLogger: Slf4jLogger started
> 16/03/01 09:11:54 INFO Remoting: Starting remoting
> 16/03/01 09:11:54 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@172.31.39.37:42989]
> 16/03/01 09:11:54 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 42989.
> 16/03/01 09:11:54 INFO SparkEnv: Registering MapOutputTracker
> 16/03/01 09:11:54 INFO SparkEnv: Registering BlockManagerMaster
> 16/03/01 09:11:54 INFO DiskBlockManager: Created local directory at
> /mnt/tmp/blockmgr-afaf0e7f-086e-49f1-946d-798e605a3fdc
> 16/03/01 09:11:54 INFO MemoryStore: MemoryStore started with capacity
> 518.1 MB
> 16/03/01 09:11:55 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/03/01 09:11:55 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 16/03/01 09:11:55 INFO SparkUI: Started SparkUI at
> http://172.31.39.37:4040
> 16/03/01 09:11:55 INFO RMProxy: Connecting to ResourceManager at /
> 172.31.39.37:8032
> 16/03/01 09:11:55 INFO Client: Requesting a new application from cluster
> with 2 NodeManagers
> 16/03/01 09:11:55 INFO Client: Verifying our application has not requested
> more than the maximum memory capability of the cluster (11520 MB per
> container)
> 16/03/01 09:11:55 INFO Client: Will allocate AM container, with 896 MB
> memory including 384 MB overhead
> 16/03/01 09:11:55 INFO Client: Setting up container launch context for our
> AM
> 16/03/01 09:11:55 INFO Client: Setting up the launch environment for our
> AM container
> 16/03/01 09:11:55 INFO Client: Preparing resources for our AM container
> 16/03/01 09:11:56 INFO Client: Uploading resource
> file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar ->
> hdfs://
> 172.31.39.37:8020/user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
> 16/03/01 09:11:56 INFO MetricsSaver: MetricsConfigRecord
> disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec:
> 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500
> lastModified: 1456818856695
> 16/03/01 09:11:56 INFO MetricsSaver: Created MetricsSaver
> j-2FT6QNFSPTHNX:i-5f6bcadb:SparkSubmit:04807 period:60
> /mnt/var/em/raw/i-5f6bcadb_20160301_SparkSubmit_04807_raw.bin
> 16/03/01 09:11:56 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
> could only be replicated to 0 nodes instead of minReplication (=1).  There
> are 0 datanode(s) running and no node(s) are excluded in this operation.
> at
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
> at
> 

Re: Shuffle guarantees

2016-03-01 Thread Corey Nolet
The reason I'm asking is that I see a comment in the ExternalSorter class that
says this:

"If we need to aggregate by key, we either use a total ordering from the
ordering parameter, or read the keys with the same hash code and compare
them with each other for equality to merge values".

How can this be assumed if the object used for the key, for instance in
the case where a HashPartitioner is used, has no assumed ordering, and
therefore no comparator can be assumed?

On Tue, Mar 1, 2016 at 2:56 PM, Corey Nolet  wrote:

> So if I'm using reduceByKey() with a HashPartitioner, I understand that
> the hashCode() of my key is used to create the underlying shuffle files.
>
> Is anything other than hashCode() used in the shuffle files when the data
> is pulled into the reducers and run through the reduce function? The reason
> I'm asking is because there's a possibility of hashCode() colliding in two
> different objects which end up hashing to the same number, right?
>
>
>


Shuffle guarantees

2016-03-01 Thread Corey Nolet
So if I'm using reduceByKey() with a HashPartitioner, I understand that the
hashCode() of my key is used to create the underlying shuffle files.

Is anything other than hashCode() used in the shuffle files when the data
is pulled into the reducers and run through the reduce function? The reason
I'm asking is that there's a possibility of hashCode() colliding for two
different objects which end up hashing to the same number, right?


Re: Spark 1.5 on Mesos

2016-03-01 Thread Timothy Chen
Can you go through the Mesos UI and look at the driver/executor log from the stderr
file and see what the problem is?

Tim

> On Mar 1, 2016, at 8:05 AM, Ashish Soni  wrote:
> 
> Not sure what is the issue but i am getting below error  when i try to run 
> spark PI example
> 
> Blacklisting Mesos slave value: "5345asdasdasdkas234234asdasdasdasd"
>due to too many failures; is Spark installed on it?
> WARN TaskSchedulerImpl: Initial job has not accepted any resources; check 
> your cluster UI to ensure that workers are registered and have sufficient 
> resources
> 
>> On Mon, Feb 29, 2016 at 1:39 PM, Sathish Kumaran Vairavelu 
>>  wrote:
>> May be the Mesos executor couldn't find spark image or the constraints are 
>> not satisfied. Check your Mesos UI if you see Spark application in the 
>> Frameworks tab
>> 
>>> On Mon, Feb 29, 2016 at 12:23 PM Ashish Soni  wrote:
>>> What is the Best practice , I have everything running as docker container 
>>> in single host ( mesos and marathon also as docker container )  and 
>>> everything comes up fine but when i try to launch the spark shell i get 
>>> below error
>>> 
>>> 
>>> SQL context available as sqlContext.
>>> 
>>> scala> val data = sc.parallelize(1 to 100)
>>> data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at 
>>> parallelize at :27
>>> 
>>> scala> data.count
>>> [Stage 0:>  (0 + 0) 
>>> / 2]16/02/29 18:21:12 WARN TaskSchedulerImpl: Initial job has not accepted 
>>> any resources; check your cluster UI to ensure that workers are registered 
>>> and have sufficient resources
>>> 16/02/29 18:21:27 WARN TaskSchedulerImpl: Initial job has not accepted any 
>>> resources; check your cluster UI to ensure that workers are registered and 
>>> have sufficient resources
>>>  
>>> 
>>> 
 On Mon, Feb 29, 2016 at 12:04 PM, Tim Chen  wrote:
 No you don't have to run Mesos in docker containers to run Spark in docker 
 containers.
 
 Once you have Mesos cluster running you can then specfiy the Spark 
 configurations in your Spark job (i.e: 
 spark.mesos.executor.docker.image=mesosphere/spark:1.6) and Mesos will 
 automatically launch docker containers for you.
 
 Tim
 
> On Mon, Feb 29, 2016 at 7:36 AM, Ashish Soni  
> wrote:
> Yes i read that and not much details here.
> 
> Is it true that we need to have spark installed on each mesos docker 
> container ( master and slave ) ...
> 
> Ashish
> 
>> On Fri, Feb 26, 2016 at 2:14 PM, Tim Chen  wrote:
>> https://spark.apache.org/docs/latest/running-on-mesos.html should be the 
>> best source, what problems were you running into?
>> 
>> Tim
>> 
>>> On Fri, Feb 26, 2016 at 11:06 AM, Yin Yang  wrote:
>>> Have you read this ?
>>> https://spark.apache.org/docs/latest/running-on-mesos.html
>>> 
 On Fri, Feb 26, 2016 at 11:03 AM, Ashish Soni  
 wrote:
 Hi All , 
 
 Is there any proper documentation as how to run spark on mesos , I am 
 trying from the last few days and not able to make it work.
 
 Please help
 
 Ashish
> 


Re: SparkR Count vs Take performance

2016-03-01 Thread Sean Owen
Yeah one surprising result is that you can't call isEmpty on an RDD of
nonserializable objects. You can't do much with an RDD of
nonserializable objects anyway, but they can exist as an intermediate
stage.

We could fix that pretty easily with a little copy and paste of the
take() code; right now isEmpty is simple but has this drawback.
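
For reference, a rough sketch of what isEmpty boils down to today
(paraphrased, not the exact Spark source): it short-circuits on zero
partitions and otherwise takes one element, which is the step that copies an
element back to the driver.

import org.apache.spark.rdd.RDD

// equivalent in spirit to calling rdd.isEmpty
def isEmptyLike[T](rdd: RDD[T]): Boolean =
  rdd.partitions.length == 0 || rdd.take(1).length == 0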

On Tue, Mar 1, 2016 at 7:18 PM, Dirceu Semighini Filho
 wrote:
> Great, I didn't noticed this isEmpty method.
> Well serialization is been a problem in this project, we have noticed a lot
> of time been spent in serializing and deserializing things to send and get
> from the cluster.
>
> 2016-03-01 15:47 GMT-03:00 Sean Owen :
>>
>> There is an "isEmpty" method that basically does exactly what your
>> second version does.
>>
>> I have seen it be unusually slow at times because it must copy 1
>> element to the driver, and it's possible that's slow. It still
>> shouldn't be slow in general, and I'd be surprised if it's slower than
>> a count in all but pathological cases.
>>
>>
>>
>> On Tue, Mar 1, 2016 at 6:03 PM, Dirceu Semighini Filho
>>  wrote:
>> > Hello all.
>> > I have a script that create a dataframe from this operation:
>> >
>> > mytable <- sql(sqlContext,("SELECT ID_PRODUCT, ... FROM mytable"))
>> >
>> > rSparkDf <- createPartitionedDataFrame(sqlContext,myRdataframe)
>> > dFrame <- join(mytable,rSparkDf,mytable$ID_PRODUCT==rSparkDf$ID_PRODUCT)
>> >
>> > After filtering this dFrame with this:
>> >
>> >
>> > I tried to execute the following
>> > filteredDF <- filterRDD(toRDD(dFrame),function (row) {row['COLUMN'] %in%
>> > c("VALUES", ...)})
>> > Now I need to know if the resulting dataframe is empty, and to do that I
>> > tried this two codes:
>> > if(count(filteredDF) > 0)
>> > and
>> > if(length(take(filteredDF,1)) > 0)
>> > I thought that the second one, using take, shoule run faster than count,
>> > but
>> > that didn't happen.
>> > The take operation creates one job per partition of my rdd (which was
>> > 200)
>> > and this make it to run slower than the count.
>> > Is this the expected behaviour?
>> >
>> > Regards,
>> > Dirceu
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-01 Thread Alexander Pivovarov
EMR-4.3.0 and Spark-1.6.0 work fine for me.
I use r3.2xlarge boxes (spot); even 3 slave boxes work fine.

I use the following settings (in json)

[
  {
"Classification": "spark-defaults",
"Properties": {
  "spark.driver.extraJavaOptions": "-Dfile.encoding=UTF-8",
  "spark.executor.extraJavaOptions": "-Dfile.encoding=UTF-8"
}
  },
  {
"Classification": "spark",
"Properties": {
  "maximizeResourceAllocation": "true"
}
  },
  {
"Classification": "spark-log4j",
"Properties": {
  "log4j.logger.com.amazon": "WARN",
  "log4j.logger.com.amazonaws": "WARN",
  "log4j.logger.amazon.emr": "WARN",
  "log4j.logger.akka": "WARN"
}
  }
]


BTW, Oleg, you do not need to cd to /usr/bin.

Just ssh to the master box as hadoop and type
$ spark-shell

You can look at how Spark runs in the web UI at :8088.

On Tue, Mar 1, 2016 at 10:38 AM, Daniel Siegmann <
daniel.siegm...@teamaol.com> wrote:

> How many core nodes does your cluster have?
>
> On Tue, Mar 1, 2016 at 4:15 AM, Oleg Ruchovets 
> wrote:
>
>> Hi , I am installed EMR 4.3.0 with spark. I tries to enter spark shell
>> but it looks it does't work and throws exceptions.
>> Please advice:
>>
>> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
>> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
>> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
>> support was removed in 8.0
>> 16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
>> 16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop
>> 16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
>> with modify permissions: Set(hadoop)
>> 16/03/01 09:11:49 INFO HttpServer: Starting HTTP Server
>> 16/03/01 09:11:49 INFO Utils: Successfully started service 'HTTP class
>> server' on port 47223.
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 1.6.0
>>   /_/
>>
>> Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_71)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>> 16/03/01 09:11:53 INFO SparkContext: Running Spark version 1.6.0
>> 16/03/01 09:11:53 INFO SecurityManager: Changing view acls to: hadoop
>> 16/03/01 09:11:53 INFO SecurityManager: Changing modify acls to: hadoop
>> 16/03/01 09:11:53 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
>> with modify permissions: Set(hadoop)
>> 16/03/01 09:11:54 INFO Utils: Successfully started service 'sparkDriver'
>> on port 52143.
>> 16/03/01 09:11:54 INFO Slf4jLogger: Slf4jLogger started
>> 16/03/01 09:11:54 INFO Remoting: Starting remoting
>> 16/03/01 09:11:54 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://sparkDriverActorSystem@172.31.39.37:42989]
>> 16/03/01 09:11:54 INFO Utils: Successfully started service
>> 'sparkDriverActorSystem' on port 42989.
>> 16/03/01 09:11:54 INFO SparkEnv: Registering MapOutputTracker
>> 16/03/01 09:11:54 INFO SparkEnv: Registering BlockManagerMaster
>> 16/03/01 09:11:54 INFO DiskBlockManager: Created local directory at
>> /mnt/tmp/blockmgr-afaf0e7f-086e-49f1-946d-798e605a3fdc
>> 16/03/01 09:11:54 INFO MemoryStore: MemoryStore started with capacity
>> 518.1 MB
>> 16/03/01 09:11:55 INFO SparkEnv: Registering OutputCommitCoordinator
>> 16/03/01 09:11:55 INFO Utils: Successfully started service 'SparkUI' on
>> port 4040.
>> 16/03/01 09:11:55 INFO SparkUI: Started SparkUI at
>> http://172.31.39.37:4040
>> 16/03/01 09:11:55 INFO RMProxy: Connecting to ResourceManager at /
>> 172.31.39.37:8032
>> 16/03/01 09:11:55 INFO Client: Requesting a new application from cluster
>> with 2 NodeManagers
>> 16/03/01 09:11:55 INFO Client: Verifying our application has not
>> requested more than the maximum memory capability of the cluster (11520 MB
>> per container)
>> 16/03/01 09:11:55 INFO Client: Will allocate AM container, with 896 MB
>> memory including 384 MB overhead
>> 16/03/01 09:11:55 INFO Client: Setting up container launch context for
>> our AM
>> 16/03/01 09:11:55 INFO Client: Setting up the launch environment for our
>> AM container
>> 16/03/01 09:11:55 INFO Client: Preparing resources for our AM container
>> 16/03/01 09:11:56 INFO Client: Uploading resource
>> file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar ->
>> hdfs://
>> 172.31.39.37:8020/user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
>> 16/03/01 09:11:56 INFO MetricsSaver: MetricsConfigRecord
>> disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec:
>> 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500
>> lastModified: 1456818856695
>> 16/03/01 09:11:56 INFO MetricsSaver: Created MetricsSaver
>> 

Re: Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException

2016-03-01 Thread Yuval Itzchakov
As I said, it is the method which eventually serializes the object. It is
declared inside a companion object of a case class.

The problem is that Spark will still try to serialize the method, as it
needs to execute on the worker. How will that change the fact that
`EncodeJson[T]` is not serializable?

On Tue, Mar 1, 2016, 21:12 Shixiong(Ryan) Zhu 
wrote:

> Don't know where "argonaut.EncodeJson$$anon$2" comes from. However, you
> can always put your codes into an method of an "object". Then just call it
> like a Java static method.
>
> On Tue, Mar 1, 2016 at 10:30 AM, Yuval.Itzchakov 
> wrote:
>
>> I have a small snippet of code which relays on  argonaut
>>    for JSON serialization which is ran from a
>> `PairRDDFunctions.mapWithState` once a session is completed.
>>
>> This is the code snippet (not that important):
>>
>>   override def sendMessage(pageView: PageView): Unit = {
>> Future {
>>   LogHolder.logger.info(s"Sending pageview: ${pageView.id} to
>> automation")
>>   try {
>> Http(url)
>>   .postData(pageView.asJson.toString)
>>   .option(HttpOptions.connTimeout(timeOutMilliseconds))
>>   .asString
>>   .throwError
>>   }
>>   catch {
>> case NonFatal(e) => LogHolder.logger.error("Failed to send
>> pageview", e)
>>   }
>> }
>>   }
>>
>> argonaut relys on a user implementation of a trait called `EncodeJson[T]`,
>> which tells argonaut how to serialize and deserialize the object.
>>
>> The problem is, that the trait `EncodeJson[T]` is not serializable, thus
>> throwing a NotSerializableException:
>>
>> Caused by: java.io.NotSerializableException: argonaut.EncodeJson$$anon$2
>> Serialization stack:
>> - object not serializable (class: argonaut.EncodeJson$$anon$2,
>> value: argonaut.EncodeJson$$anon$2@6415f61e)
>>
>> This is obvious and understandable.
>>
>> The question I have is - What possible ways are there to work around this?
>> I'm currently depended on a third-party library which I can't control of
>> change to implement Serializable in anyway. I've seen this  this
>> StackOverflow answer
>> <
>> http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
>> >
>> but couldn't implement any reasonable workaround.
>>
>> Anyone have any ideas?
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-non-serializable-third-party-JSON-serializable-on-a-spark-worker-node-throws-NotSerializablen-tp26372.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Sample sql query using pyspark

2016-03-01 Thread Maurin Lenglart
Hi,
Thanks for the hint. I have tried removing the limit from the query but the
result is still the same. If I understand correctly, the "sample()" function
takes a sample of the result of the query rather than sampling the original
table that I am querying.

I have a business use case to sample a lot of different queries in prod, so I
can't just insert 100 rows into another table.

I am thinking about doing something like (pseudo code):
my_df = SELECT * FROM my_table WHERE `event_date` >= '2015-11-14' AND
`event_date` <= '2016-02-19'
my_sample = my_df.sample(False, 0.1)
result = my_sample.groupBy("Category").agg(sum("bookings"), sum("dealviews"))


Thanks for your answer.



From: James Barney >
Date: Tuesday, March 1, 2016 at 7:01 AM
To: maurin lenglart >
Cc: "user@spark.apache.org" 
>
Subject: Re: Sample sql query using pyspark

Maurin,

I don't know the technical reason why but: try removing the 'limit 100' part of 
your query. I was trying to do something similar the other week and what I 
found is that each executor doesn't necessarily get the same 100 rows. Joins 
would fail or result with a bunch of nulls when keys weren't found between the 
slices of 100 rows.

Once I removed the 'limit ' part of my query, all the results were the same 
across the board and taking samples worked again.

If the amount of data is too large, or you're trying to just test on a smaller 
size, just define another table and insert only 100 rows into that table.

I hope that helps!

On Tue, Mar 1, 2016 at 3:10 AM, Maurin Lenglart 
> wrote:
Hi,
I am trying to get a sample of a sql query in to make the query run faster.
My query look like this :
SELECT `Category` as `Category`,sum(`bookings`) as `bookings`,sum(`dealviews`) 
as `dealviews` FROM groupon_dropbox WHERE  `event_date` >= '2015-11-14' AND 
`event_date` <= '2016-02-19' GROUP BY `Category` LIMIT 100

The table is partitioned by event_date. And the code I am using is:
 df = self.df_from_sql(sql, srcs)

results = df.sample(False, 0.5).collect()

 The results are a little bit different, but the execution time is almost the 
same. Am I missing something?


thanks



Re: SparkR Count vs Take performance

2016-03-01 Thread Dirceu Semighini Filho
Great, I hadn't noticed this isEmpty method.
Well, serialization has been a problem in this project; we have noticed a lot
of time being spent serializing and deserializing things to send to and get
from the cluster.

2016-03-01 15:47 GMT-03:00 Sean Owen :

> There is an "isEmpty" method that basically does exactly what your
> second version does.
>
> I have seen it be unusually slow at times because it must copy 1
> element to the driver, and it's possible that's slow. It still
> shouldn't be slow in general, and I'd be surprised if it's slower than
> a count in all but pathological cases.
>
>
>
> On Tue, Mar 1, 2016 at 6:03 PM, Dirceu Semighini Filho
>  wrote:
> > Hello all.
> > I have a script that create a dataframe from this operation:
> >
> > mytable <- sql(sqlContext,("SELECT ID_PRODUCT, ... FROM mytable"))
> >
> > rSparkDf <- createPartitionedDataFrame(sqlContext,myRdataframe)
> > dFrame <- join(mytable,rSparkDf,mytable$ID_PRODUCT==rSparkDf$ID_PRODUCT)
> >
> > After filtering this dFrame with this:
> >
> >
> > I tried to execute the following
> > filteredDF <- filterRDD(toRDD(dFrame),function (row) {row['COLUMN'] %in%
> > c("VALUES", ...)})
> > Now I need to know if the resulting dataframe is empty, and to do that I
> > tried this two codes:
> > if(count(filteredDF) > 0)
> > and
> > if(length(take(filteredDF,1)) > 0)
> > I thought that the second one, using take, shoule run faster than count,
> but
> > that didn't happen.
> > The take operation creates one job per partition of my rdd (which was
> 200)
> > and this make it to run slower than the count.
> > Is this the expected behaviour?
> >
> > Regards,
> > Dirceu
>


Re: Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException

2016-03-01 Thread Shixiong(Ryan) Zhu
Don't know where "argonaut.EncodeJson$$anon$2" comes from. However, you can
always put your codes into an method of an "object". Then just call it like
a Java static method.
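
A minimal sketch of that idea, with a made-up codec class standing in for
argonaut's EncodeJson (the real instance is assumed to be equally
non-serializable):

// stand-in for a non-serializable third-party codec
class NonSerializableCodec[T](render: T => String) {
  def encode(value: T): String = render(value)
}

case class PageView(id: String, url: String)

object PageViewCodec {
  // built once per executor JVM; referenced, not captured, by closures
  private val codec =
    new NonSerializableCodec[PageView](pv => s"""{"id":"${pv.id}","url":"${pv.url}"}""")
  def toJson(pv: PageView): String = codec.encode(pv)
}

// inside an RDD/DStream operation the closure only refers to the object:
// rdd.map(pv => PageViewCodec.toJson(pv))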

On Tue, Mar 1, 2016 at 10:30 AM, Yuval.Itzchakov  wrote:

> I have a small snippet of code which relays on  argonaut
>    for JSON serialization which is ran from a
> `PairRDDFunctions.mapWithState` once a session is completed.
>
> This is the code snippet (not that important):
>
>   override def sendMessage(pageView: PageView): Unit = {
> Future {
>   LogHolder.logger.info(s"Sending pageview: ${pageView.id} to
> automation")
>   try {
> Http(url)
>   .postData(pageView.asJson.toString)
>   .option(HttpOptions.connTimeout(timeOutMilliseconds))
>   .asString
>   .throwError
>   }
>   catch {
> case NonFatal(e) => LogHolder.logger.error("Failed to send
> pageview", e)
>   }
> }
>   }
>
> argonaut relys on a user implementation of a trait called `EncodeJson[T]`,
> which tells argonaut how to serialize and deserialize the object.
>
> The problem is, that the trait `EncodeJson[T]` is not serializable, thus
> throwing a NotSerializableException:
>
> Caused by: java.io.NotSerializableException: argonaut.EncodeJson$$anon$2
> Serialization stack:
> - object not serializable (class: argonaut.EncodeJson$$anon$2,
> value: argonaut.EncodeJson$$anon$2@6415f61e)
>
> This is obvious and understandable.
>
> The question I have is - What possible ways are there to work around this?
> I'm currently depended on a third-party library which I can't control of
> change to implement Serializable in anyway. I've seen this  this
> StackOverflow answer
> <
> http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
> >
> but couldn't implement any reasonable workaround.
>
> Anyone have any ideas?
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-non-serializable-third-party-JSON-serializable-on-a-spark-worker-node-throws-NotSerializablen-tp26372.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SparkR Count vs Take performance

2016-03-01 Thread Sean Owen
There is an "isEmpty" method that basically does exactly what your
second version does.

I have seen it be unusually slow at times because it must copy 1
element to the driver, and it's possible that's slow. It still
shouldn't be slow in general, and I'd be surprised if it's slower than
a count in all but pathological cases.



On Tue, Mar 1, 2016 at 6:03 PM, Dirceu Semighini Filho
 wrote:
> Hello all.
> I have a script that create a dataframe from this operation:
>
> mytable <- sql(sqlContext,("SELECT ID_PRODUCT, ... FROM mytable"))
>
> rSparkDf <- createPartitionedDataFrame(sqlContext,myRdataframe)
> dFrame <- join(mytable,rSparkDf,mytable$ID_PRODUCT==rSparkDf$ID_PRODUCT)
>
> After filtering this dFrame with this:
>
>
> I tried to execute the following
> filteredDF <- filterRDD(toRDD(dFrame),function (row) {row['COLUMN'] %in%
> c("VALUES", ...)})
> Now I need to know if the resulting dataframe is empty, and to do that I
> tried this two codes:
> if(count(filteredDF) > 0)
> and
> if(length(take(filteredDF,1)) > 0)
> I thought that the second one, using take, shoule run faster than count, but
> that didn't happen.
> The take operation creates one job per partition of my rdd (which was 200)
> and this make it to run slower than the count.
> Is this the expected behaviour?
>
> Regards,
> Dirceu

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-01 Thread Daniel Siegmann
How many core nodes does your cluster have?

On Tue, Mar 1, 2016 at 4:15 AM, Oleg Ruchovets  wrote:

> Hi , I am installed EMR 4.3.0 with spark. I tries to enter spark shell but
> it looks it does't work and throws exceptions.
> Please advice:
>
> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
> support was removed in 8.0
> 16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
> 16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop
> 16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
> with modify permissions: Set(hadoop)
> 16/03/01 09:11:49 INFO HttpServer: Starting HTTP Server
> 16/03/01 09:11:49 INFO Utils: Successfully started service 'HTTP class
> server' on port 47223.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.6.0
>   /_/
>
> Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_71)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 16/03/01 09:11:53 INFO SparkContext: Running Spark version 1.6.0
> 16/03/01 09:11:53 INFO SecurityManager: Changing view acls to: hadoop
> 16/03/01 09:11:53 INFO SecurityManager: Changing modify acls to: hadoop
> 16/03/01 09:11:53 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
> with modify permissions: Set(hadoop)
> 16/03/01 09:11:54 INFO Utils: Successfully started service 'sparkDriver'
> on port 52143.
> 16/03/01 09:11:54 INFO Slf4jLogger: Slf4jLogger started
> 16/03/01 09:11:54 INFO Remoting: Starting remoting
> 16/03/01 09:11:54 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@172.31.39.37:42989]
> 16/03/01 09:11:54 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 42989.
> 16/03/01 09:11:54 INFO SparkEnv: Registering MapOutputTracker
> 16/03/01 09:11:54 INFO SparkEnv: Registering BlockManagerMaster
> 16/03/01 09:11:54 INFO DiskBlockManager: Created local directory at
> /mnt/tmp/blockmgr-afaf0e7f-086e-49f1-946d-798e605a3fdc
> 16/03/01 09:11:54 INFO MemoryStore: MemoryStore started with capacity
> 518.1 MB
> 16/03/01 09:11:55 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/03/01 09:11:55 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 16/03/01 09:11:55 INFO SparkUI: Started SparkUI at
> http://172.31.39.37:4040
> 16/03/01 09:11:55 INFO RMProxy: Connecting to ResourceManager at /
> 172.31.39.37:8032
> 16/03/01 09:11:55 INFO Client: Requesting a new application from cluster
> with 2 NodeManagers
> 16/03/01 09:11:55 INFO Client: Verifying our application has not requested
> more than the maximum memory capability of the cluster (11520 MB per
> container)
> 16/03/01 09:11:55 INFO Client: Will allocate AM container, with 896 MB
> memory including 384 MB overhead
> 16/03/01 09:11:55 INFO Client: Setting up container launch context for our
> AM
> 16/03/01 09:11:55 INFO Client: Setting up the launch environment for our
> AM container
> 16/03/01 09:11:55 INFO Client: Preparing resources for our AM container
> 16/03/01 09:11:56 INFO Client: Uploading resource
> file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar ->
> hdfs://
> 172.31.39.37:8020/user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
> 16/03/01 09:11:56 INFO MetricsSaver: MetricsConfigRecord
> disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec:
> 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500
> lastModified: 1456818856695
> 16/03/01 09:11:56 INFO MetricsSaver: Created MetricsSaver
> j-2FT6QNFSPTHNX:i-5f6bcadb:SparkSubmit:04807 period:60
> /mnt/var/em/raw/i-5f6bcadb_20160301_SparkSubmit_04807_raw.bin
> 16/03/01 09:11:56 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
> could only be replicated to 0 nodes instead of minReplication (=1).  There
> are 0 datanode(s) running and no node(s) are excluded in this operation.
> at
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
> at
> 

Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException

2016-03-01 Thread Yuval.Itzchakov
I have a small snippet of code which relies on argonaut for JSON
serialization and is run from a `PairRDDFunctions.mapWithState` once a
session is completed.

This is the code snippet (not that important):

  override def sendMessage(pageView: PageView): Unit = {
    Future {
      LogHolder.logger.info(s"Sending pageview: ${pageView.id} to automation")
      try {
        Http(url)
          .postData(pageView.asJson.toString)
          .option(HttpOptions.connTimeout(timeOutMilliseconds))
          .asString
          .throwError
      }
      catch {
        case NonFatal(e) => LogHolder.logger.error("Failed to send pageview", e)
      }
    }
  }

argonaut relies on a user implementation of a trait called `EncodeJson[T]`,
which tells argonaut how to serialize and deserialize the object.

The problem is, that the trait `EncodeJson[T]` is not serializable, thus
throwing a NotSerializableException:

Caused by: java.io.NotSerializableException: argonaut.EncodeJson$$anon$2
Serialization stack:
- object not serializable (class: argonaut.EncodeJson$$anon$2,
value: argonaut.EncodeJson$$anon$2@6415f61e)

This is obvious and understandable.

The question I have is: what possible ways are there to work around this?
I currently depend on a third-party library which I can't control or change
to implement Serializable in any way. I've seen this StackOverflow answer

http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou

but couldn't implement any reasonable workaround.

Anyone have any ideas?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-non-serializable-third-party-JSON-serializable-on-a-spark-worker-node-throws-NotSerializablen-tp26372.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Get rid of FileAlreadyExistsError

2016-03-01 Thread Peter Halliday
I haven't tried spark.hadoop.validateOutputSpecs. However, it seems that has to
do with the existence of the output directory itself and not the files. Maybe
I'm wrong?

Peter



> On Mar 1, 2016, at 11:53 AM, Sabarish Sasidharan 
>  wrote:
> 
> Have you tried spark.hadoop.validateOutputSpecs?
> 
> On 01-Mar-2016 9:43 pm, "Peter Halliday"  > wrote:
> http://pastebin.com/vbbFzyzb 
> 
> The problem seems to be to be two fold.  First, the ParquetFileWriter in 
> Hadoop allows for an overwrite flag that Spark doesn’t allow to be set.  The 
> second is that the DirectParquetOutputCommitter has an abortTask that’s 
> empty.  I see SPARK-8413 open on this too, but no plans on changing this.  
> I’m surprised not to see this fixed yet.
> 
> Peter Halliday 
> 
> 
> 
>> On Mar 1, 2016, at 10:01 AM, Ted Yu > > wrote:
>> 
>> Do you mind pastebin'ning the stack trace with the error so that we know 
>> which part of the code is under discussion ?
>> 
>> Thanks
>> 
>> On Tue, Mar 1, 2016 at 7:48 AM, Peter Halliday > > wrote:
>> I have a Spark application that has a Task seem to fail, but it actually did 
>> write out some of the files that were assigned it.  And Spark assigns 
>> another executor that task, and it gets a FileAlreadyExistsException.  The 
>> Hadoop code seems to allow for files to be overwritten, but I see the 1.5.1 
>> version of this code doesn’t allow for this to be passed in.  Is that 
>> correct?
>> 
>> Peter Halliday
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
>> 
> 



Re: SPARK SQL HiveContext Error

2016-03-01 Thread Gourav Sengupta
Hi,

FIRST ATTEMPT:
Use build.sbt in IntelliJ and it was giving me nightmares with several
incompatibility and library issues though the sbt version was compliant
with the scala version

SECOND ATTEMPT:
Created a new project with no entries in build.sbt file and imported all
the files in $SPARK_HOME/lib/*jar into the project. This started causing
issues I reported earlier

FINAL ATTEMPT:
removed all the files from the import (removing them from dependencies)
which had the word derby in it and this resolved the issue.

Please note that the following additional jars were included in the library
folder beyond the ones which are usually supplied with the SPARK distribution:
1. ojdbc7.jar
2. spark-csv***jar file


Regards,
Gourav Sengupta

On Tue, Mar 1, 2016 at 5:19 PM, Gourav Sengupta 
wrote:

> Hi,
>
> I am getting the error  "*java.lang.SecurityException: sealing violation:
> can't seal package org.apache.derby.impl.services.locks: already loaded"*
>   after running the following code in SCALA.
>
> I do not have any other instances of sparkContext running from my system.
>
> I will be grateful for if anyone could kindly help me out.
>
>
> Environment:
> SCALA: 1.6
> OS: MAC OS X
>
> 
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.SQLContext
>
> // Import SuccinctRDD
> import edu.berkeley.cs.succinct._
>
> object test1 {
>   def main(args: Array[String]) {
> //the below line returns nothing
> println(SparkContext.jarOfClass(this.getClass).toString())
> val logFile = "/tmp/README.md" // Should be some file on your system
>
> val conf = new 
> SparkConf().setAppName("IdeaProjects").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val logData = sc.textFile(logFile, 2).cache()
> val numAs = logData.filter(line => line.contains("a")).count()
> val numBs = logData.filter(line => line.contains("b")).count()
> println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
>
>
> // Create a Spark RDD as a collection of articles; ctx is the SparkContext
> val articlesRDD = sc.textFile("/tmp/README.md").map(_.getBytes)
>
> // Compress the Spark RDD into a Succinct Spark RDD, and persist it in 
> memory
> // Note that this is a time consuming step (usually at 8GB/hour/core) 
> since data needs to be compressed.
> // We are actively working on making this step faster.
> val succinctRDD = articlesRDD.succinct.persist()
>
>
> // SuccinctRDD supports a set of powerful primitives directly on 
> compressed RDD
> // Let us start by counting the number of occurrences of "Berkeley" 
> across all Wikipedia articles
> val count = succinctRDD.count("the")
>
> // Now suppose we want to find all offsets in the collection at which 
> "Berkeley" occurs; and
> // create an RDD containing all resulting offsets
> val offsetsRDD = succinctRDD.search("and")
>
> // Let us look at the first ten results in the above RDD
> val offsets = offsetsRDD.take(10)
>
> // Finally, let us extract 20 bytes before and after one of the 
> occurrences of "Berkeley"
> val offset = offsets(0)
> val data = succinctRDD.extract(offset - 20, 40)
>
> println(data)
> println(">>>")
>
>
> // Create a schema
> val citySchema = StructType(Seq(
>   StructField("Name", StringType, false),
>   StructField("Length", IntegerType, true),
>   StructField("Area", DoubleType, false),
>   StructField("Airport", BooleanType, true)))
>
> // Create an RDD of Rows with some data
> val cityRDD = sc.parallelize(Seq(
>   Row("San Francisco", 12, 44.52, true),
>   Row("Palo Alto", 12, 22.33, false),
>   Row("Munich", 8, 3.14, true)))
>
>
> val hiveContext = new HiveContext(sc)
>
> //val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
>   }
> }
>
>
> -
>
>
>
> Regards,
> Gourav Sengupta
>


Re: Spark streaming from Kafka best fit

2016-03-01 Thread Cody Koeninger
You don't need a number of executor cores equal to the number of partitions. An
executor can and will work on multiple partitions within a batch, one after
the other. The real issue is whether you are able to keep your processing
time under your batch time, so that delay doesn't increase.
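
A rough sketch of the direct approach (Spark 1.x Kafka integration; the
broker comes from the original code, the topic name is a placeholder, and
`ssc` is assumed to exist already): each Kafka partition becomes one RDD
partition, and the task that fetches a partition also runs the narrow first
stage on it.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "kafka.server.ip:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my_topic"))

// 224 Kafka partitions -> 224 tasks per batch; fewer cores than tasks is fine
// as long as total processing time stays under the batch interval
stream.foreachRDD(rdd => println(s"partitions in this batch: ${rdd.partitions.length}"))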

On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar 
wrote:

> Thanks Cody!
>
> I understand what you said and if I am correct it will be using 224
> executor cores just for fetching + stage-1 processing of 224 partitions. I
> will obviously need more cores for processing further stages and fetching
> next batch.
>
> I will start with higher number of executor cores and see how it goes.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger  wrote:
>
>> > "How do I keep a balance of executors which receive data from Kafka
>> and which process data"
>>
>> I think you're misunderstanding how the direct stream works.  The
>> executor which receives data is also the executor which processes data,
>> there aren't separate receivers.  If it's a single stage worth of work
>> (e.g. straight map / filter), the processing of a given partition is going
>> to be done by the executor that read it from kafka.  If you do something
>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>> processing.  The question of which executor works on which tasks is up to
>> the scheduler (and getPreferredLocations, which only matters if you're
>> running spark on the same nodes as kafka)
>>
>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>> jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> Hello all,
>>>
>>> I see that there are as of today 3 ways one can read from Kafka in spark
>>> streaming:
>>> 1. KafkaUtils.createStream() (here
>>> )
>>> 2. KafkaUtils.createDirectStream() (here
>>> )
>>> 3. Kafka-spark-consumer (here
>>> )
>>>
>>> My spark streaming application has to read from 1 kafka topic with
>>> around 224 partitions, consuming data at around 150MB/s (~90,000
>>> messages/sec) which reduces to around 3MB/s (~1400 messages/sec) after
>>> filtering. After filtering I need to maintain top 1 URL counts. I don't
>>> really care about exactly once semantics as I am interested in rough
>>> estimate.
>>>
>>> Code:
>>>
>>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>>> sparkConf.setAppName("KafkaReader")
>>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
>>> createStreamingContext)
>>>
>>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>> val kafkaParams = Map[String, String](
>>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>>   "group.id" -> consumer_group
>>> )
>>>
>>> val lineStreams = (1 to N).map{ _ =>
>>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>>> }
>>>
>>> ssc.union(
>>>   lineStreams.map(stream => {
>>>   stream.map(ParseStringToLogRecord)
>>> .filter(record => isGoodRecord(record))
>>> .map(record => record.url)
>>>   })
>>> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute moving window, 28 will probably help in parallelism
>>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>>   .mapPartitions(iter => {
>>>     iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 1000).iterator
>>>   }, true)
>>>   .foreachRDD((latestRDD, rddTime) => {
>>>     printTopFromRDD(rddTime, latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
>>>   })
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> Questions:
>>>
>>> a) I used #2 but I found that I couldn't control how many executors will
>>> be actually fetching from Kafka. How do I keep a balance of executors which
>>> receive data from Kafka and which process data? Do they keep changing for
>>> every batch?
>>>
>>> b) Now I am trying to use #1 creating multiple DStreams, filtering them
>>> and then doing a union. I don't understand why the number of events
>>> processed per 120-second batch would change drastically. PFA the events/sec
>>> graph while running with 1 receiver. How to debug this?
>>>
>>> c) What will be the most suitable method to integrate with Kafka from
>>> above 3? Any recommendations for getting maximum performance, running the
>>> streaming application reliably in production environment?
>>>
>>> --
>>> Thanks
>>> Jatin Kumar
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org

SparkR Count vs Take performance

2016-03-01 Thread Dirceu Semighini Filho
Hello all.
I have a script that create a dataframe from this operation:

mytable <- sql(sqlContext,("SELECT ID_PRODUCT, ... FROM mytable"))

rSparkDf <- createPartitionedDataFrame(sqlContext,myRdataframe)
dFrame <- join(mytable,rSparkDf,mytable$ID_PRODUCT==rSparkDf$ID_PRODUCT)

After that, I filtered this dFrame with the following:

filteredDF <- filterRDD(toRDD(dFrame), function (row) {row['COLUMN'] %in%
c("VALUES", ...)})

Now I need to know if the resulting dataframe is empty, and to do that I
tried these two pieces of code:
if(count(filteredDF) > 0)
and
if(length(take(filteredDF,1)) > 0)
I thought that the second one, using take, should run faster than count,
but that didn't happen.
The take operation creates one job per partition of my RDD (which had 200
partitions), and this makes it run slower than the count.
Is this the expected behaviour?

Regards,
Dirceu
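
As a side note for readers on the Scala API: the usual cheap emptiness check
there works on the DataFrame itself rather than on an RDD conversion. A sketch
(someDF is a placeholder DataFrame; whether SparkR's toRDD/filterRDD path can
benefit from the same shortcut is not settled in this thread):

// Sketch (Scala API): check emptiness without converting to an RDD first.
val isEmpty = someDF.limit(1).count() == 0
// or, equivalently:
val alsoEmpty = someDF.take(1).isEmpty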


Re: Spark streaming from Kafka best fit

2016-03-01 Thread Jatin Kumar
Thanks Cody!

I understand what you said and if I am correct it will be using 224
executor cores just for fetching + stage-1 processing of 224 partitions. I
will obviously need more cores for processing further stages and fetching
next batch.

I will start with higher number of executor cores and see how it goes.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger  wrote:

> > "How do I keep a balance of executors which receive data from Kafka and
> which process data"
>
> I think you're misunderstanding how the direct stream works.  The executor
> which receives data is also the executor which processes data, there aren't
> separate receivers.  If it's a single stage worth of work (e.g. straight
> map / filter), the processing of a given partition is going to be done by
> the executor that read it from kafka.  If you do something involving a
> shuffle (e.g. reduceByKey), other executors will do additional processing.
> The question of which executor works on which tasks is up to the scheduler
> (and getPreferredLocations, which only matters if you're running spark on
> the same nodes as kafka)
>
> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> Hello all,
>>
>> I see that there are as of today 3 ways one can read from Kafka in spark
>> streaming:
>> 1. KafkaUtils.createStream() (here
>> )
>> 2. KafkaUtils.createDirectStream() (here
>> )
>> 3. Kafka-spark-consumer (here
>> )
>>
>> My spark streaming application has to read from 1 kafka topic with around
>> 224 partitions, consuming data at around 150MB/s (~90,000 messages/sec)
>> which reduces to around 3MB/s (~1400 messages/sec) after filtering. After
>> filtering I need to maintain top 1 URL counts. I don't really care
>> about exactly once semantics as I am interested in rough estimate.
>>
>> Code:
>>
>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>> sparkConf.setAppName("KafkaReader")
>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
>> createStreamingContext)
>>
>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>> val kafkaParams = Map[String, String](
>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>   "group.id" -> consumer_group
>> )
>>
>> val lineStreams = (1 to N).map{ _ =>
>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>> }
>>
>> ssc.union(
>>   lineStreams.map(stream => {
>>   stream.map(ParseStringToLogRecord)
>> .filter(record => isGoodRecord(record))
>> .map(record => record.url)
>>   })
>> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute moving window, 28 will probably help in parallelism
>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>   .mapPartitions(iter => {
>>     iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 1000).iterator
>>   }, true)
>>   .foreachRDD((latestRDD, rddTime) => {
>>     printTopFromRDD(rddTime, latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
>>   })
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> Questions:
>>
>> a) I used #2 but I found that I couldn't control how many executors will
>> be actually fetching from Kafka. How do I keep a balance of executors which
>> receive data from Kafka and which process data? Do they keep changing for
>> every batch?
>>
>> b) Now I am trying to use #1 creating multiple DStreams, filtering them
>> and then doing a union. I don't understand why the number of events
>> processed per 120-second batch would change drastically. PFA the events/sec
>> graph while running with 1 receiver. How to debug this?
>>
>> c) What will be the most suitable method to integrate with Kafka from
>> above 3? Any recommendations for getting maximum performance, running the
>> streaming application reliably in production environment?
>>
>> --
>> Thanks
>> Jatin Kumar
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>


I need some help making datasets with known columns from a JavaBean

2016-03-01 Thread Steve Lewis
I asked a similar question a day or so ago but this is a much more concrete
example showing the difficulty I am running into

I am trying to use DataSets. I have an object which I want to encode with
its fields as columns. The object is a well behaved Java Bean.
However one field is an object (or  a collection of objects) which are not
beans.
My simple code case is like this.
What I want is a DataSet of MyBeans with columns count,name and unBean

/**
 * This class is a good Java bean but one field holds an object
 * which is not a bean
 */
public class MyBean  implements Serializable {
private int m_count;
private String m_Name;
private MyUnBean m_UnBean;

public MyBean(int count, String name, MyUnBean unBean) {
m_count = count;
m_Name = name;
m_UnBean = unBean;
}

public int getCount() {return m_count; }
public void setCount(int count) {m_count = count;}
public String getName() {return m_Name;}
public void setName(String name) {m_Name = name;}
public MyUnBean getUnBean() {return m_UnBean;}
public void setUnBean(MyUnBean unBean) {m_UnBean = unBean;}
}
/**
 * This is a Java object which is not a bean
 * no getters or setters but is serializable
 */
public class MyUnBean implements Serializable {
public final int count;
public final String name;

public MyUnBean(int count, String name) {
this.count = count;
this.name = name;
}
}

/**
 * This code creates a list of objects containing MyBean -
 * a Java Bean containing one field which is not bean
 * It then attempts and fails to use a bean encoder
 * to make a DataSet
 */
public class DatasetTest {
public static final Random RND = new Random();
public static final int LIST_SIZE = 100;

public static String makeName() {
return Integer.toString(RND.nextInt());
}

public static MyUnBean makeUnBean() {
return new MyUnBean(RND.nextInt(), makeName());
}

public static MyBean makeBean() {
return new MyBean(RND.nextInt(), makeName(), makeUnBean());
}

/**
 * Make a list of MyBeans
 * @return
 */
public static List<MyBean> makeBeanList() {
List<MyBean> holder = new ArrayList<MyBean>();
for (int i = 0; i < LIST_SIZE; i++) {
holder.add(makeBean());
}
return holder;
}

public static SQLContext getSqlContext() {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("BeanTest") ;
Option<String> option = sparkConf.getOption("spark.master");
if (!option.isDefined())// use local over nothing
sparkConf.setMaster("local[*]");
JavaSparkContext ctx = new JavaSparkContext(sparkConf) ;
return new SQLContext(ctx);
}


public static void main(String[] args) {
SQLContext sqlContext = getSqlContext();

Encoder<MyBean> evidence = Encoders.bean(MyBean.class);
Encoder<MyUnBean> evidence2 = Encoders.javaSerialization(MyUnBean.class);

List<MyBean> holder = makeBeanList();
Dataset<MyBean> beanSet = sqlContext.createDataset(holder, evidence);

long count = beanSet.count();
if(count != LIST_SIZE)
throw new IllegalStateException("bad count");

}



}



This is the last section of the log showing the errors I get

16/03/01 09:21:31 INFO SparkUI: Started SparkUI at http://169.254.87.23:4040
16/03/01 09:21:31 INFO Executor: Starting executor ID driver on host
localhost
16/03/01 09:21:31 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 61922.
16/03/01 09:21:31 INFO NettyBlockTransferService: Server created on 61922
16/03/01 09:21:31 INFO BlockManagerMaster: Trying to register BlockManager
16/03/01 09:21:31 INFO BlockManagerMasterEndpoint: Registering block
manager localhost:61922 with 5.1 GB RAM, BlockManagerId(driver, localhost,
61922)
16/03/01 09:21:31 INFO BlockManagerMaster: Registered BlockManager
Exception in thread "main" java.lang.UnsupportedOperationException: no
encoder found for com.lordjoe.testing.MyUnBean
at
org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor(JavaTypeInference.scala:403)
at
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor$1.apply(JavaTypeInference.scala:400)
at
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor$1.apply(JavaTypeInference.scala:393)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at 
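
For what it's worth, one workaround sometimes used on Spark 1.6 (sketched here
in Scala, and an assumption rather than something confirmed in this thread) is
to encode the whole object with Kryo. The trade-off is that the Dataset is then
stored as a single binary column, so the count/name/unBean columns are lost;
keeping named columns requires making the nested type a proper bean as well.

import org.apache.spark.sql.{Dataset, Encoders, SQLContext}

// Sketch: Kryo-encode the whole MyBean. count/map/filter still work, but the
// data is held as one opaque binary column rather than per-field columns.
def toKryoDataset(sqlContext: SQLContext, beans: java.util.List[MyBean]): Dataset[MyBean] = {
  val kryoEncoder = Encoders.kryo(classOf[MyBean])
  sqlContext.createDataset(beans)(kryoEncoder)
}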

Re: Save DataFrame to Hive Table

2016-03-01 Thread Mich Talebzadeh
Do these files have the same schema? Probably yes.

Can they each be read as an RDD, converted to a DF, and registered as a
temporary table, with a UNION ALL then applied across those temporary tables?

Alternatively if these files have different names, they can be put on the
same HDFS staging sub-directory and read in one go. This is an example for
csv files all in "/data/stg/table2" directory

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")


HTH,


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 March 2016 at 17:00,  wrote:

> Good day colleagues. Quick question on Parquet and Dataframes. Right now I
> have the 4 parquet files stored in HDFS under the same path:
>
> /path/to/parquets/parquet1, /path/to/parquets/parquet2,
> /path/to/parquets/parquet3, /path/to/parquets/parquet4…
>
> I want to perform a union on all this parquet files. Is there any other
> way of doing this different to DataFrame’s unionAll?
>
>
>
> Thank you very much in advance.
>
>
>
> Andres Fernandez
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Tuesday, March 01, 2016 1:50 PM
> *To:* Jeff Zhang
> *Cc:* Yogesh Vyas; user@spark.apache.org
> *Subject:* Re: Save DataFrame to Hive Table
>
>
>
> Hi
>
>
>
> It seems that your code is not specifying which database is your table
> created
>
>
>
> Try this
>
>
>
> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> scala> // Choose a database
>
> scala> HiveContext.sql("show databases").show
>
>
>
> scala> HiveContext.sql("use test")  // I chose test database
>
> scala> HiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT,
> value STRING)")
>
> scala> HiveContext.sql("desc TableName").show
> ++-+---+
> |col_name|data_type|comment|
> ++-+---+
> | key|  int|   null|
> |   value|   string|   null|
> ++-+---+
>
>
>
> // create a simple DF
>
>
>
> val a = Seq((1, "Mich"), (2, "James"))
>
> val b = a.toDF
>
>
>
> //Let me keep it simple. Create a temporary table and do a simple
> insert/select. No need to convolute it
>
>
>
> b.registerTempTable("tmp")
>
>
>
> // Remember this temporary table is created in the SQL context, NOT the HiveContext,
> // so the HiveContext will NOT see that table
>
> //
>
> HiveContext.sql("INSERT INTO TableName SELECT * FROM tmp")
> org.apache.spark.sql.AnalysisException: no such table tmp; line 1 pos 36
>
>
>
> // This will work
>
>
>
> sql("INSERT INTO TableName SELECT * FROM tmp")
>
>
>
> sql("select count(1) from TableName").show
> +---+
> |_c0|
> +---+
> |  2|
> +---+
>
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
> On 1 March 2016 at 06:33, Jeff Zhang  wrote:
>
> The following line does not execute the sql so the table is not created.
> Add .show() at the end to execute the sql.
>
> hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value
> STRING)")
>
>
>
> On Tue, Mar 1, 2016 at 2:22 PM, Yogesh Vyas  wrote:
>
> Hi,
>
> I have created a DataFrame in Spark, now I want to save it directly
> into the hive table. How to do it.?
>
> I have created the hive table using following hiveContext:
>
> HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(sc.sc
> ());
> hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key
> INT, value STRING)");
>
> I am using the following to save it into hive:
> DataFrame.write().mode(SaveMode.Append).insertInto("TableName");
>
> But it gives the error:
> Exception in thread "main" java.lang.RuntimeException: Table Not
> Found: TableName
> at scala.sys.package$.error(package.scala:27)
> at
> org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:139)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:266)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
> at
> 

Re: Save DataFrame to Hive Table

2016-03-01 Thread Silvio Fiorito
Just do:

val df = sqlContext.read.load(“/path/to/parquets/*”)

If you do df.explain it’ll show the multiple input paths.

From: "andres.fernan...@wellsfargo.com" 
>
Date: Tuesday, March 1, 2016 at 12:00 PM
To: "user@spark.apache.org" 
>
Subject: RE: Save DataFrame to Hive Table

Good day colleagues. Quick question on Parquet and Dataframes. Right now I have 
the 4 parquet files stored in HDFS under the same path:
/path/to/parquets/parquet1, /path/to/parquets/parquet2, 
/path/to/parquets/parquet3, /path/to/parquets/parquet4…
I want to perform a union on all this parquet files. Is there any other way of 
doing this different to DataFrame’s unionAll?

Thank you very much in advance.

Andres Fernandez

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Tuesday, March 01, 2016 1:50 PM
To: Jeff Zhang
Cc: Yogesh Vyas; user@spark.apache.org
Subject: Re: Save DataFrame to Hive Table

Hi

It seems that your code is not specifying which database is your table created

Try this

scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> // Choose a database
scala> HiveContext.sql("show databases").show

scala> HiveContext.sql("use test")  // I chose test database
scala> HiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value 
STRING)")
scala> HiveContext.sql("desc TableName").show
++-+---+
|col_name|data_type|comment|
++-+---+
| key|  int|   null|
|   value|   string|   null|
++-+---+

// create a simple DF

val a = Seq((1, "Mich"), (2, "James"))
val b = a.toDF

//Let me keep it simple. Create a temporary table and do a simple 
insert/select. No need to convolute it

b.registerTempTable("tmp")

// Remember this temporary table is created in the SQL context, NOT the HiveContext,
// so the HiveContext will NOT see that table
//
HiveContext.sql("INSERT INTO TableName SELECT * FROM tmp")
org.apache.spark.sql.AnalysisException: no such table tmp; line 1 pos 36

// This will work

sql("INSERT INTO TableName SELECT * FROM tmp")

sql("select count(1) from TableName").show
+---+
|_c0|
+---+
|  2|
+---+

HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 1 March 2016 at 06:33, Jeff Zhang 
> wrote:

The following line does not execute the sql so the table is not created.  Add 
.show() at the end to execute the sql.

hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value STRING)")

On Tue, Mar 1, 2016 at 2:22 PM, Yogesh Vyas 
> wrote:
Hi,

I have created a DataFrame in Spark, now I want to save it directly
into the hive table. How to do it.?

I have created the hive table using following hiveContext:

HiveContext hiveContext = new 
org.apache.spark.sql.hive.HiveContext(sc.sc());
hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key
INT, value STRING)");

I am using the following to save it into hive:
DataFrame.write().mode(SaveMode.Append).insertInto("TableName");

But it gives the error:
Exception in thread "main" java.lang.RuntimeException: Table Not
Found: TableName
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:139)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:266)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at 

What version of twitter4j should I use with Spark Streaming?UPDATING thread

2016-03-01 Thread Alonso Isidoro Roman
Hi, I read this post recommending the use of twitter4j-3.0.3* and it is not
working for me. I have tried to load these jars within spark-shell without any
luck. This is the output:


   1. *MacBook-Pro-Retina-de-Alonso:spark-1.6 aironman$ ls *jar*
   2. mysql-connector-java-5.1.30.jar twitter4j-async-3.0.3.jar
   twitter4j-examples-3.0.3.jartwitter4j-stream-3.0.3.jar
   3. spark-streaming-twitter_2.10-1.0.0.jar  twitter4j-core-3.0.3.jar
 twitter4j-media-support-3.0.3.jar
   4. *MacBook-Pro-Retina-de-Alonso:spark-1.6 aironman$* bin/spark-shell
   --jars spark-streaming-twitter_2.10-1.0.0.jar twitter4j-async-3.0.3.jar
   twitter4j-core-3.0.3.jar twitter4j-examples-3.0.3.jar
   twitter4j-media-support-3.0.3.jar twitter4j-stream-3.0.3.jar
   5. log4j:WARN No appenders could be found for logger
   (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
   6. log4j:WARN Please initialize the log4j system properly.
   7. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
   for more info.
   8. 2016-03-01 18:00:33.542 java[35711:11354206] Unable to load realm
   info from SCDynamicStore
   9. Using Spark's repl log4j profile:
   org/apache/spark/log4j-defaults-repl.properties
   10. To adjust logging level use sc.setLogLevel("INFO")
   11. error: error while loading , error in opening zip file
   12.
   13. Failed to initialize compiler: object scala.runtime in compiler
   mirror not found.
   14. ** Note that as of 2.8 scala does not assume use of the java
   classpath.
   15. ** For the old behavior pass -usejavacp to scala, or if using a
   Settings
   16. ** object programatically, settings.usejavacp.value = true.
   17. 16/03/01 18:00:34 WARN SparkILoop$SparkILoopInterpreter: Warning:
   compiler accessed before init set up.  Assuming no postInit code.
   18.
   19. Failed to initialize compiler: object scala.runtime in compiler
   mirror not found.
   20. ** Note that as of 2.8 scala does not assume use of the java
   classpath.
   21. ** For the old behavior pass -usejavacp to scala, or if using a
   Settings
   22. ** object programatically, settings.usejavacp.value = true.
   23. Exception in thread "main" java.lang.AssertionError: assertion
   failed: null
   24. at scala.Predef$.assert(Predef.scala:179)
   25. at
   org.apache.spark.repl.SparkIMain.initializeSynchronous(SparkIMain.scala:247)
   26. at
   
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:990)
   27. at
   
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
   28. at
   
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
   29. at
   
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   30. at org.apache.spark.repl.SparkILoop.org
   $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
   31. at
   org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
   32. at org.apache.spark.repl.Main$.main(Main.scala:31)
   33. at org.apache.spark.repl.Main.main(Main.scala)
   34. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   35. at
   sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   36. at
   
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   37. at java.lang.reflect.Method.invoke(Method.java:606)
   38. at
   
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
   39. at
   org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
   40. at
   org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
   41. at
   org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
   42. at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   43. MacBook-Pro-Retina-de-Alonso:spark-1.6 aironman$




Do I have to update the jars' version? I see that the current version of
twitter4j is 4.0.4...

thanks

Alonso Isidoro Roman.

Mis citas preferidas (de hoy) :
"Si depurar es el proceso de quitar los errores de software, entonces
programar debe ser el proceso de introducirlos..."
 -  Edsger Dijkstra

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming
must be the process of putting ..."
  - Edsger Dijkstra

"If you pay peanuts you get monkeys"


RE: Union Parquet, DataFrame

2016-03-01 Thread Andres.Fernandez
Worked perfectly. Thanks very much Silvio.

From: Silvio Fiorito [mailto:silvio.fior...@granturing.com]
Sent: Tuesday, March 01, 2016 2:14 PM
To: Fernandez, Andres; user@spark.apache.org
Subject: Re: Union Parquet, DataFrame

Just replied to your other email, but here’s the same thing:

Just do:

val df = sqlContext.read.load(“/path/to/parquets/*”)

If you do df.explain it’ll show the multiple input paths.

From: "andres.fernan...@wellsfargo.com" 
>
Date: Tuesday, March 1, 2016 at 12:01 PM
To: "user@spark.apache.org" 
>
Subject: Union Parquet, DataFrame

Good day colleagues. Quick question on Parquet and Dataframes. Right now I have 
the 4 parquet files stored in HDFS under the same path:
/path/to/parquets/parquet1, /path/to/parquets/parquet2, 
/path/to/parquets/parquet3, /path/to/parquets/parquet4…
I want to perform a union on all this parquet files. Is there any other way of 
doing this different to DataFrame’s unionAll?

Thank you very much in advance.

Andres Fernandez



Fwd: Starting SPARK application in cluster mode from an IDE

2016-03-01 Thread Gourav Sengupta
Hi,

I would be grateful if someone could kindly respond to this query.


Thanks and Regards,
Gourav Sengupta

-- Forwarded message --
From: Gourav Sengupta 
Date: Sat, Feb 27, 2016 at 12:39 AM
Subject: Starting SPARK application in cluster mode from an IDE
To: user 


Hi,

The problem description is mentioned below - why should I be able to create
the SPARK application using Python but not SCALA (using an IDE like
IntelliJ or Eclipse)?

SPARK Environment:
-
SPARK Version: 1.6.0
OS: MAC OS X 10.11.3
IDE:  IntelliJ


Created a SBT project in IntelliJ using the details in this page:
-
http://spark.apache.org/docs/latest/quick-start.html


The following code in SCALA fails to create an application in a locally
running SPARK cluster (set up by running ./sbin/start-master.sh and
./sbin/start-slaves.sh):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf


object test {
  def main(args: Array[String]) {
//the below line returns nothing
println(SparkContext.jarOfClass(this.getClass).toString())
val logFile = "/tmp/README.md" // Should be some file on your system
val conf = new
SparkConf().setAppName("IdeaProjects").setMaster("spark://systemhostname:7077")
//val conf = new
SparkConf().setAppName("IdeaProjects").setMaster("local[*]")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}


The following code runs fine
--

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf


object test {
  def main(args: Array[String]) {
//the below line returns nothing
println(SparkContext.jarOfClass(this.getClass).toString())
val logFile = "/tmp/README.md" // Should be some file on your system
//val conf = new
SparkConf().setAppName("IdeaProjects").setMaster("spark://systemhostname:7077")
val conf = new SparkConf().setAppName("IdeaProjects").setMaster("local[*]")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}


But creating an application using Python works quite fine, as the following
code runs fine:
---
from pyspark import SparkConf, SparkContext
conf =
SparkConf().setMaster("spark://systemhostname:7077").setAppName("test").set("spark.executor.memory",
"1g").set("spark.executor.cores", "2")
conf.getAll()
sc = SparkContext(conf = conf)


Further description and links to this issue is mentioned here:
http://stackoverflow.com/questions/33222045/classnotfoundexception-anonfun-when-deploy-scala-code-to-spark
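
One possible explanation (an assumption, not confirmed in this thread) is that
when the driver is started straight from the IDE against spark://..., the
compiled application classes (including the anonymous function classes) are
never shipped to the executors, whereas PySpark ships its lambdas as pickled
code. A sketch of one common remedy is to point SparkConf at the packaged jar
(the path below is a placeholder for wherever sbt/IntelliJ writes it):

// Sketch only: jar path is a placeholder (e.g. the output of `sbt package`).
val conf = new SparkConf()
  .setAppName("IdeaProjects")
  .setMaster("spark://systemhostname:7077")
  .setJars(Seq("target/scala-2.10/ideaprojects_2.10-1.0.jar"))
val sc = new SparkContext(conf)

Alternatively, packaging the application and launching it with spark-submit
avoids the problem entirely.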


Thanks and Regards,
Gourav Sengupta


Re: Union Parquet, DataFrame

2016-03-01 Thread Silvio Fiorito
Just replied to your other email, but here’s the same thing:

Just do:

val df = sqlContext.read.load(“/path/to/parquets/*”)

If you do df.explain it’ll show the multiple input paths.

From: "andres.fernan...@wellsfargo.com" 
>
Date: Tuesday, March 1, 2016 at 12:01 PM
To: "user@spark.apache.org" 
>
Subject: Union Parquet, DataFrame

Good day colleagues. Quick question on Parquet and Dataframes. Right now I have 
the 4 parquet files stored in HDFS under the same path:
/path/to/parquets/parquet1, /path/to/parquets/parquet2, 
/path/to/parquets/parquet3, /path/to/parquets/parquet4…
I want to perform a union on all this parquet files. Is there any other way of 
doing this different to DataFrame’s unionAll?

Thank you very much in advance.

Andres Fernandez



SPARK SQL HiveContext Error

2016-03-01 Thread Gourav Sengupta
Hi,

I am getting the error  "*java.lang.SecurityException: sealing violation:
can't seal package org.apache.derby.impl.services.locks: already loaded"*
after running the following code in SCALA.

I do not have any other instances of sparkContext running from my system.

I would be grateful if anyone could kindly help me out.


Environment:
SCALA: 1.6
OS: MAC OS X



import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext

// Import SuccinctRDD
import edu.berkeley.cs.succinct._

object test1 {
  def main(args: Array[String]) {
//the below line returns nothing
println(SparkContext.jarOfClass(this.getClass).toString())
val logFile = "/tmp/README.md" // Should be some file on your system

val conf = new SparkConf().setAppName("IdeaProjects").setMaster("local[*]")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))


// Create a Spark RDD as a collection of articles; ctx is the SparkContext
val articlesRDD = sc.textFile("/tmp/README.md").map(_.getBytes)

// Compress the Spark RDD into a Succinct Spark RDD, and persist it in memory.
// Note that this is a time consuming step (usually at 8GB/hour/core) since
// data needs to be compressed. We are actively working on making this step faster.
val succinctRDD = articlesRDD.succinct.persist()


// SuccinctRDD supports a set of powerful primitives directly on the compressed RDD.
// Let us start by counting the number of occurrences of "Berkeley" across all
// Wikipedia articles
val count = succinctRDD.count("the")

// Now suppose we want to find all offsets in the collection at which "Berkeley"
// occurs; and create an RDD containing all resulting offsets
val offsetsRDD = succinctRDD.search("and")

// Let us look at the first ten results in the above RDD
val offsets = offsetsRDD.take(10)

// Finally, let us extract 20 bytes before and after one of the occurrences of "Berkeley"
val offset = offsets(0)
val data = succinctRDD.extract(offset - 20, 40)

println(data)
println(">>>")


// Create a schema
val citySchema = StructType(Seq(
  StructField("Name", StringType, false),
  StructField("Length", IntegerType, true),
  StructField("Area", DoubleType, false),
  StructField("Airport", BooleanType, true)))

// Create an RDD of Rows with some data
val cityRDD = sc.parallelize(Seq(
  Row("San Francisco", 12, 44.52, true),
  Row("Palo Alto", 12, 22.33, false),
  Row("Munich", 8, 3.14, true)))


val hiveContext = new HiveContext(sc)

//val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  }
}


-



Regards,
Gourav Sengupta


Union Parquet, DataFrame

2016-03-01 Thread Andres.Fernandez
Good day colleagues. Quick question on Parquet and Dataframes. Right now I have 
the 4 parquet files stored in HDFS under the same path:
/path/to/parquets/parquet1, /path/to/parquets/parquet2, 
/path/to/parquets/parquet3, /path/to/parquets/parquet4…
I want to perform a union on all this parquet files. Is there any other way of 
doing this different to DataFrame’s unionAll?

Thank you very much in advance.

Andres Fernandez



Re: Does pyspark still lag far behind the Scala API in terms of features

2016-03-01 Thread Jules Damji
Hello Joshua,

comments are inline...

> On Mar 1, 2016, at 5:03 AM, Joshua Sorrell  wrote:
> 
> I haven't used Spark in the last year and a half. I am about to start a 
> project with a new team, and we need to decide whether to use pyspark or 
> Scala.

Indeed, good questions, and they do come up a lot in trainings that I have
attended, where this inevitable question is raised.
I believe it depends on your comfort zone and your appetite for newer things.

True, for the most part the Apache Spark committers have been committed to
keeping the APIs at parity across all the language offerings, even though in
some cases, in particular Python, they have lagged by a minor release. The
extent to which they're committed to level parity is a good sign. It may be
the case with some experimental APIs that they lag behind, but for the most
part they have been admirably consistent.

With Python there’s a minor performance hit, since there’s an extra level of 
indirection in the architecture and an additional Python PID that the executors 
launch to execute your pickled Python lambdas. Other than that it boils down to 
your comfort zone. I recommend looking at Sameer's slides from the Advanced
Spark for DevOps training, where he walks through the PySpark and Python
architecture.
> 
> We are NOT a java shop. So some of the build tools/procedures will require 
> some learning overhead if we go the Scala route. What I want to know is: is 
> the Scala version of Spark still far enough ahead of pyspark to be well worth 
> any initial training overhead?  

If you are a very advanced Python shop, with in-house libraries you have
written in Python or ML libs that don't exist in the Scala version, and the
porting effort and gap are too large, then perhaps it makes sense to stay put
with Python.

However, I believe investing in Scala (or having some members of your group
learn it) is worthwhile for a few reasons. One, you will get the
performance gain, especially now with Tungsten (not sure how it relates to 
Python, but some other knowledgeable people on the list, please chime in). Two, 
since Spark is written in Scala, it gives you an enormous advantage to read 
sources (which are well documented and highly readable) should you have to 
consult or learn nuances of certain API method or action not covered 
comprehensively in the docs. And finally, there’s a long term benefit in 
learning Scala for reasons other than Spark. For example, writing other 
scalable and distributed applications.
> 
> Particularly, we will be using Spark Streaming. I know a couple of years ago 
> that practically forced the decision to use Scala.  Is this still the case?

You'll notice that certain API calls are not available, at least for now, in
Python. http://spark.apache.org/docs/latest/streaming-programming-guide.html 



Cheers
Jules

--
The Best Ideas Are Simple
Jules S. Damji
e-mail:dmat...@comcast.net
e-mail:jules.da...@gmail.com



RE: Save DataFrame to Hive Table

2016-03-01 Thread Andres.Fernandez
Good day colleagues. Quick question on Parquet and Dataframes. Right now I have 
the 4 parquet files stored in HDFS under the same path:
/path/to/parquets/parquet1, /path/to/parquets/parquet2, 
/path/to/parquets/parquet3, /path/to/parquets/parquet4…
I want to perform a union on all this parquet files. Is there any other way of 
doing this different to DataFrame’s unionAll?

Thank you very much in advance.

Andres Fernandez

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Tuesday, March 01, 2016 1:50 PM
To: Jeff Zhang
Cc: Yogesh Vyas; user@spark.apache.org
Subject: Re: Save DataFrame to Hive Table

Hi

It seems that your code is not specifying which database is your table created

Try this

scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> // Choose a database
scala> HiveContext.sql("show databases").show

scala> HiveContext.sql("use test")  // I chose test database
scala> HiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value 
STRING)")
scala> HiveContext.sql("desc TableName").show
++-+---+
|col_name|data_type|comment|
++-+---+
| key|  int|   null|
|   value|   string|   null|
++-+---+

// create a simple DF

val a = Seq((1, "Mich"), (2, "James"))
val b = a.toDF

//Let me keep it simple. Create a temporary table and do a simple 
insert/select. No need to convolute it

b.registerTempTable("tmp")

// Remember this temporary table is created in the SQL context, NOT the HiveContext,
// so the HiveContext will NOT see that table
//
HiveContext.sql("INSERT INTO TableName SELECT * FROM tmp")
org.apache.spark.sql.AnalysisException: no such table tmp; line 1 pos 36

// This will work

sql("INSERT INTO TableName SELECT * FROM tmp")

sql("select count(1) from TableName").show
+---+
|_c0|
+---+
|  2|
+---+

HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 1 March 2016 at 06:33, Jeff Zhang 
> wrote:

The following line does not execute the sql so the table is not created.  Add 
.show() at the end to execute the sql.

hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value STRING)")

On Tue, Mar 1, 2016 at 2:22 PM, Yogesh Vyas 
> wrote:
Hi,

I have created a DataFrame in Spark, now I want to save it directly
into the hive table. How to do it.?

I have created the hive table using following hiveContext:

HiveContext hiveContext = new 
org.apache.spark.sql.hive.HiveContext(sc.sc());
hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key
INT, value STRING)");

I am using the following to save it into hive:
DataFrame.write().mode(SaveMode.Append).insertInto("TableName");

But it gives the error:
Exception in thread "main" java.lang.RuntimeException: Table Not
Found: TableName
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:139)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:266)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at 

Real time anomaly system

2016-03-01 Thread Priya Ch
Hi,

  I am trying to build a real-time anomaly detection system using Spark,
Kafka, Cassandra and Akka. I have the network intrusion dataset (KDD Cup 1999).
How can I build the system using this? My plan is to treat a certain part of
the data as historical data for training my model, and to simulate the other
data as a stream coming in through Kafka.

Should I use Spark Streaming for re-training the model on the incoming stream?

How can I use Akka in this for alerting purposes?


Re: Save DataFrame to Hive Table

2016-03-01 Thread Mich Talebzadeh
Hi

It seems that your code is not specifying which database is your table
created

Try this

scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> // Choose a database
scala> HiveContext.sql("show databases").show

scala> HiveContext.sql("use test")  // I chose test database
scala> HiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT,
value STRING)")
scala> HiveContext.sql("desc TableName").show
++-+---+
|col_name|data_type|comment|
++-+---+
| key|  int|   null|
|   value|   string|   null|
++-+---+

// create a simple DF

val a = Seq((1, "Mich"), (2, "James"))
val b = a.toDF

//Let me keep it simple. Create a temporary table and do a simple
insert/select. No need to convolute it

b.registerTempTable("tmp")

// Remember this temporary table is created in the SQL context, NOT the HiveContext,
// so the HiveContext will NOT see that table
//
HiveContext.sql("INSERT INTO TableName SELECT * FROM tmp")
org.apache.spark.sql.AnalysisException: no such table tmp; line 1 pos 36

// This will work

sql("INSERT INTO TableName SELECT * FROM tmp")

sql("select count(1) from TableName").show
+---+
|_c0|
+---+
|  2|
+---+

HTH
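
A related end-to-end sketch for the original question (illustration only,
assuming the target table lives in the database currently selected on the
HiveContext) is to create and register everything through the HiveContext
itself:

import org.apache.spark.sql.SaveMode

// Sketch: build the DataFrame on the HiveContext so that the temporary data
// and the Hive table are visible to the same context.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value STRING)")
val df = hiveContext.createDataFrame(Seq((1, "Mich"), (2, "James"))).toDF("key", "value")
df.write.mode(SaveMode.Append).insertInto("TableName")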


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 March 2016 at 06:33, Jeff Zhang  wrote:

> The following line does not execute the sql so the table is not created.
> Add .show() at the end to execute the sql.
>
> hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value
> STRING)")
>
> On Tue, Mar 1, 2016 at 2:22 PM, Yogesh Vyas  wrote:
>
>> Hi,
>>
>> I have created a DataFrame in Spark, now I want to save it directly
>> into the hive table. How to do it.?
>>
>> I have created the hive table using following hiveContext:
>>
>> HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(sc.sc
>> ());
>> hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key
>> INT, value STRING)");
>>
>> I am using the following to save it into hive:
>> DataFrame.write().mode(SaveMode.Append).insertInto("TableName");
>>
>> But it gives the error:
>> Exception in thread "main" java.lang.RuntimeException: Table Not
>> Found: TableName
>> at scala.sys.package$.error(package.scala:27)
>> at
>> org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:139)
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:266)
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
>> at
>> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
>> at
>> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
>> at scala.collection.immutable.List.foldLeft(List.scala:84)
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:918)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:917)
>> at
>> 

Spark Submit using Convert to Marthon REST API

2016-03-01 Thread Ashish Soni
Hi All ,

Can someone please help me translate the below spark-submit command into a
Marathon JSON request?

docker run -it --rm -e SPARK_MASTER="mesos://10.0.2.15:5050"  -e
SPARK_IMAGE="spark_driver:latest" spark_driver:latest
/opt/spark/bin/spark-submit  --name "PI Example" --class
org.apache.spark.examples.SparkPi --driver-memory 1g --executor-memory 1g
--executor-cores 1 /opt/spark/lib/spark-examples-1.6.0-hadoop2.6.0.jar

Thanks,


performance of personalized page rank

2016-03-01 Thread Cesar Flores
I would like to know if someone can help me with the following question. I have
a network of around ~5 billion edges and ~500 million nodes. I need to
generate a personalized page rank for each node of my network.

How many executors do you think I may need to execute this task in a
reasonable amount of time (i.e. less than 12 hours)?


Best
-- 
Cesar Flores


Re: Recommendation for a good book on Spark, beginner to moderate knowledge

2016-03-01 Thread Robin East
Mohammed and I both obviously have a certain bias here but I have to agree with 
him - the documentation is pretty good but other sources are necessary to 
supplement. (Good) books are a curated source of information that can short-cut 
a lot of the learning. 
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 1 Mar 2016, at 16:13, Mohammed Guller  wrote:
> 
> I agree that the Spark official documentation is pretty good. However, a book 
> also serves a useful purpose. It provides a structured roadmap for learning a 
> new technology. Everything is nicely organized for the reader. For somebody 
> who has just started learning Spark, the amount of material on the Internet 
> can be overwhelming. There are ton of blogs and presentations on the 
> Internet. A beginner could easily spend months reading them and still be 
> lost. If you are experienced, it is easy to figure out what to read and what 
> to skip.
>  
> I also agree that a book becomes outdated at some point, but not right away. 
> For example, a book covering DataFrames and Spark ML is not outdated yet.
>  
> Mohammed
> Author: Big Data Analytics with Spark 
> 
>  
> From: charles li [mailto:charles.up...@gmail.com 
> ] 
> Sent: Monday, February 29, 2016 1:39 AM
> To: Ashok Kumar
> Cc: User
> Subject: Re: Recommendation for a good book on Spark, beginner to moderate 
> knowledge
>  
> since Spark is under active development, a book you take to learn it will be
> somewhat outdated to some degree.
>  
> I would suggest learning it in several ways, as below:
>  
> spark official document, trust me, you will go through this several times
> if you want to learn it well : http://spark.apache.org/ 
> 
> spark summit, lots of videos and slide, high quality : 
> https://spark-summit.org/ 
> databricks' blog : https://databricks.com/blog 
> attend spark meetup : http://www.meetup.com/ 
> try spark 3-party package if needed and convenient : 
> http://spark-packages.org/ 
> and I just start to blog my spark learning memo on my blog: 
> http://litaotao.github.io  
>  
> in a word, I think the best way to learn it is official document + databricks 
> blog + others' blog ===>>> your blog [ tutorial by you or just memo for your 
> learning ]
>  
> On Mon, Feb 29, 2016 at 4:50 PM, Ashok Kumar  > wrote:
> Thank you all for valuable advice. Much appreciated
>  
> Best
>  
> 
> On Sunday, 28 February 2016, 21:48, Ashok Kumar  > wrote:
>  
> 
>   Hi Gurus,
>  
> Appreciate if you recommend me a good book on Spark or documentation for 
> beginner to moderate knowledge
>  
> I very much like to skill myself on transformation and action methods.
>  
> FYI, I have already looked at examples on net. However, some of them not 
> clear at least to me.
>  
> Warmest regards
>  
> 
> 
> 
>  
> -- 
> --
> a spark lover, a quant, a developer and a good man.
>  
> http://github.com/litaotao 


RE: Recommendation for a good book on Spark, beginner to moderate knowledge

2016-03-01 Thread Mohammed Guller
I agree that the Spark official documentation is pretty good. However, a book 
also serves a useful purpose. It provides a structured roadmap for learning a 
new technology. Everything is nicely organized for the reader. For somebody who 
has just started learning Spark, the amount of material on the Internet can be 
overwhelming. There are ton of blogs and presentations on the Internet. A 
beginner could easily spend months reading them and still be lost. If you are 
experienced, it is easy to figure out what to read and what to skip.

I also agree that a book becomes outdated at some point, but not right away. 
For example, a book covering DataFrames and Spark ML is not outdated yet.

Mohammed
Author: Big Data Analytics with 
Spark

From: charles li [mailto:charles.up...@gmail.com]
Sent: Monday, February 29, 2016 1:39 AM
To: Ashok Kumar
Cc: User
Subject: Re: Recommendation for a good book on Spark, beginner to moderate 
knowledge

since Spark is under active development, a book you take to learn it will be
somewhat outdated to some degree.

I would suggest learning it in several ways, as below:


  *   spark official document, trust me, you will go through this several
times if you want to learn it well : http://spark.apache.org/
  *   spark summit, lots of videos and slide, high quality : 
https://spark-summit.org/
  *   databricks' blog : https://databricks.com/blog
  *   attend spark meetup : http://www.meetup.com/
  *   try spark 3-party package if needed and convenient : 
http://spark-packages.org/
  *   and I just start to blog my spark learning memo on my blog: 
http://litaotao.github.io

in a word, I think the best way to learn it is official document + databricks 
blog + others' blog ===>>> your blog [ tutorial by you or just memo for your 
learning ]

On Mon, Feb 29, 2016 at 4:50 PM, Ashok Kumar 
> wrote:
Thank you all for valuable advice. Much appreciated

Best

On Sunday, 28 February 2016, 21:48, Ashok Kumar 
> wrote:

  Hi Gurus,

Appreciate if you recommend me a good book on Spark or documentation for 
beginner to moderate knowledge

I very much like to skill myself on transformation and action methods.

FYI, I have already looked at examples on net. However, some of them not clear 
at least to me.

Warmest regards




--
--
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: [Proposal] Enabling time series analysis on spark metrics

2016-03-01 Thread Karan Kumar
+dev mailing list

Time series analysis on metrics becomes quite useful when running spark
jobs using a workflow manager like oozie.

Would love to take this up if the community thinks it's worthwhile.

On Tue, Feb 23, 2016 at 2:59 PM, Karan Kumar 
wrote:

> HI
>
> Spark at the moment uses the application ID to report metrics. I was thinking
> that we could create an option to export metrics on a user-controlled key.
> This will allow us to do time series analysis on counters by dumping these
> counters in a DB such as graphite.
>
> One of the approaches I had in mind was allowing a user to set a property
> via the spark client. If that property is set, use the property value to
> report metrics, else use the current implementation of reporting metrics
> on the appid.
>
> Thoughts?
>
> --
> Thanks
> Karan
>



-- 
Thanks
Karan


Re: Get rid of FileAlreadyExistsError

2016-03-01 Thread Peter Halliday
http://pastebin.com/vbbFzyzb

The problem seems to be twofold.  First, the ParquetFileWriter in Hadoop
allows for an overwrite flag that Spark doesn’t allow to be set.  The second is 
that the DirectParquetOutputCommitter has an abortTask that’s empty.  I see 
SPARK-8413 open on this too, but no plans on changing this.  I’m surprised not 
to see this fixed yet.

Peter Halliday 



> On Mar 1, 2016, at 10:01 AM, Ted Yu  wrote:
> 
> Do you mind pastebin'ning the stack trace with the error so that we know 
> which part of the code is under discussion ?
> 
> Thanks
> 
> On Tue, Mar 1, 2016 at 7:48 AM, Peter Halliday  > wrote:
> I have a Spark application that has a Task seem to fail, but it actually did 
> write out some of the files that were assigned it.  And Spark assigns another 
> executor that task, and it gets a FileAlreadyExistsException.  The Hadoop 
> code seems to allow for files to be overwritten, but I see the 1.5.1 version 
> of this code doesn’t allow for this to be passed in.  Is that correct?
> 
> Peter Halliday
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: Spark 1.5 on Mesos

2016-03-01 Thread Ashish Soni
Not sure what the issue is, but I am getting the below error when I try to run
the Spark Pi example

Blacklisting Mesos slave value: "5345asdasdasdkas234234asdasdasdasd"
   due to too many failures; is Spark installed on it?
WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered
and have sufficient resources


On Mon, Feb 29, 2016 at 1:39 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> May be the Mesos executor couldn't find spark image or the constraints are
> not satisfied. Check your Mesos UI if you see Spark application in the
> Frameworks tab
>
> On Mon, Feb 29, 2016 at 12:23 PM Ashish Soni 
> wrote:
>
>> What is the best practice? I have everything running as docker containers
>> on a single host (mesos and marathon also as docker containers), and
>> everything comes up fine, but when I try to launch the spark shell I get
>> the below error
>>
>>
>> SQL context available as sqlContext.
>>
>> scala> val data = sc.parallelize(1 to 100)
>> data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>> parallelize at :27
>>
>> scala> data.count
>> [Stage 0:>  (0 +
>> 0) / 2]16/02/29 18:21:12 WARN TaskSchedulerImpl: Initial job has not
>> accepted any resources; check your cluster UI to ensure that workers are
>> registered and have sufficient resources
>> 16/02/29 18:21:27 WARN TaskSchedulerImpl: Initial job has not accepted
>> any resources; check your cluster UI to ensure that workers are registered
>> and have sufficient resources
>>
>>
>>
>> On Mon, Feb 29, 2016 at 12:04 PM, Tim Chen  wrote:
>>
>>> No you don't have to run Mesos in docker containers to run Spark in
>>> docker containers.
>>>
>>> Once you have Mesos cluster running you can then specfiy the Spark
>>> configurations in your Spark job (i.e: 
>>> spark.mesos.executor.docker.image=mesosphere/spark:1.6)
>>> and Mesos will automatically launch docker containers for you.
>>>
>>> Tim
>>>
>>> On Mon, Feb 29, 2016 at 7:36 AM, Ashish Soni 
>>> wrote:
>>>
 Yes, I read that, but there are not many details there.

 Is it true that we need to have Spark installed on each Mesos Docker
 container (master and slave)?

 Ashish

 On Fri, Feb 26, 2016 at 2:14 PM, Tim Chen  wrote:

> https://spark.apache.org/docs/latest/running-on-mesos.html should be
> the best source, what problems were you running into?
>
> Tim
>
> On Fri, Feb 26, 2016 at 11:06 AM, Yin Yang  wrote:
>
>> Have you read this ?
>> https://spark.apache.org/docs/latest/running-on-mesos.html
>>
>> On Fri, Feb 26, 2016 at 11:03 AM, Ashish Soni 
>> wrote:
>>
>>> Hi All ,
>>>
>>> Is there any proper documentation as how to run spark on mesos , I
>>> am trying from the last few days and not able to make it work.
>>>
>>> Please help
>>>
>>> Ashish
>>>
>>
>>
>

>>>
>>


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-03-01 Thread Sachin Aggarwal
Hi,

I used this code for a graceful shutdown of my streaming app; this may not be
the best way, so correct me if there is a better approach.

sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  // stop(stopSparkContext = true, stopGracefully = true)
  ssc.stop(true, true)
  println("Application stopped")
}

class StopContextThread(ssc: StreamingContext) extends Runnable {
  def run {
    // same graceful stop, so it can be triggered from a separate thread
    ssc.stop(true, true)
  }
}

and I use this whenever I want to start a graceful shutdown:

val thread: Thread = new Thread(new StopContextThread(ssc))

thread.start
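
For the test-oriented shutdown Lars describes below (poll the output until a
predicate holds, with a long timeout that fails the test), a minimal Scala sketch
could look like this; the condition function, timeout and poll interval are all
assumptions, not code from this thread:

import scala.annotation.tailrec

// Block until `condition` holds, or fail after `timeoutMs`.
def awaitCondition(condition: () => Boolean,
                   timeoutMs: Long = 30000L,
                   pollMs: Long = 10L): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  @tailrec
  def loop(): Unit = {
    if (condition()) ()                                    // predicate fulfilled, test can stop the stream
    else if (System.currentTimeMillis() > deadline)
      sys.error(s"Condition not met within $timeoutMs ms") // fail instead of hanging
    else { Thread.sleep(pollMs); loop() }
  }
  loop()
}

// Usage: poll the output channel (e.g. a database table or an output Kafka topic),
// then trigger the graceful stop shown above.
// awaitCondition(() => outputContainsExpectedRecords())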


On Tue, Mar 1, 2016 at 2:32 PM, Lars Albertsson  wrote:

> If you wait for an inactivity period before ending a test case, you
> get the choice of using a long timeout, resulting in slow tests, or a
> short timeout, resulting in brittle tests. Both options will make
> developers waste time, and harm developer productivity.
>
> I suggest that you terminate the test case based on the test predicate
> getting fulfilled, with a long timeout in case of test failure. I
> presume that your application produces output to a channel, e.g.
> database or Kafka topic, which the test oracle can inspect for test
> completeness.
>
> The flow becomes:
>
> 1. Start the stream source component and output component locally,
> e.g. Kafka+Cassandra
> 2. Start the Spark streaming application, typically with a local master.
> 3. Feed the input into the stream source, e.g. a Kafka test topic.
> 4. Let the test oracle loop, polling for a condition to be met on the
> output, e.g. existence of a database entry or a message on the Kafka
> output topic. Sleep for a short period (10 ms) between polls, and fail
> the test if the condition is not met after a long time (30 s).
> 5. Terminate the streaming application.
> 6. Terminate the stream source and output components.
>
> I have used this strategy for testing both financial transaction
> systems and spark streaming applications, and it has resulted in both
> fast and reliable tests, without strong coupling between production
> code and test code.
>
> The base strategy fails if you need to test for the absence of an
> output event, e.g. "when my streaming application sees message X, it
> should filter it and not produce output". You then need to send another
> input event (poison pill pattern), and terminate on the output effects of
> the poison pill event.
>
> If you test with multiple streaming executors, remember that there are
> no order guarantees between executors; you will need to make sure that
> all executors receive poison pills, and test for the pill effects of
> all of them.
>
> Starting the source+output components and the Spark context can be
> slow. I recommend that you provide the option to reuse the test
> fixture between test cases for speed. For example, if you start and
> stop the fixture once for each test class, rather than once per test
> method, you save a lot of time.
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
>
> On Fri, Feb 26, 2016 at 8:24 AM, Mao, Wei  wrote:
> > I would argue against making it configurable unless there is a real
> > production use case. If it’s just for testing, there are a bunch of ways to
> > achieve it. For example, you can mark globally whether the test stream is
> > finished, and stop the ssc on another thread when the status of that mark
> > changes.
> >
> >
> >
> > Back to the original exception: blindly calling “Option.get” is never a
> > good practice. It would be better to pre-validate or use
> > getOption/getOrElse.
> >
> >
> >
> > Thanks,
> >
> > William
> >
> >
> >
> > From: Cheng, Hao [mailto:hao.ch...@intel.com]
> > Sent: Thursday, February 25, 2016 1:03 AM
> > To: Daniel Siegmann; Ashutosh Kumar
> > Cc: Hemant Bhanawat; Ted Yu; Femi Anthony; user
> > Subject: RE: Spark Streaming - graceful shutdown when stream has no more
> > data
> >
> >
> >
> > This is very interesting: how to shut down the streaming job gracefully
> > once there has been no input data for some time.
> >
> >
> >
> > A doable solution: you could count the input data using an
> > Accumulator, and another thread (on the master node) would always get the
> > latest accumulator value; if there is no change in the accumulator value
> > for some time, then shut down the streaming job.
> >
> >
> >
> > From: Daniel Siegmann [mailto:daniel.siegm...@teamaol.com]
> > Sent: Wednesday, February 24, 2016 12:30 AM
> > To: Ashutosh Kumar 
> > Cc: Hemant Bhanawat ; Ted Yu  >;
> > Femi Anthony ; user 
> > Subject: Re: Spark Streaming - graceful shutdown when stream has no more
> > data
> >
> >
> >
> > During testing you will typically be using some finite data. You want the
> > stream to shut down automatically when that data has been consumed so
> your
> > test shuts down gracefully.
> >
> > Of course once the code is running in production you'll want it to keep
> > waiting for new records. So 

Re: Get rid of FileAlreadyExistsError

2016-03-01 Thread Ted Yu
Do you mind pastebin'ning the stack trace with the error so that we know
which part of the code is under discussion ?

Thanks

On Tue, Mar 1, 2016 at 7:48 AM, Peter Halliday  wrote:

> I have a Spark application that has a Task seem to fail, but it actually
> did write out some of the files that were assigned it.  And Spark assigns
> another executor that task, and it gets a FileAlreadyExistsException.  The
> Hadoop code seems to allow for files to be overwritten, but I see the 1.5.1
> version of this code doesn’t allow for this to be passed in.  Is that
> correct?
>
> Peter Halliday
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Get rid of FileAlreadyExistsError

2016-03-01 Thread Peter Halliday
I have a Spark application in which a task seems to fail, but it actually did 
write out some of the files that were assigned to it.  Spark then assigns that task 
to another executor, and it gets a FileAlreadyExistsException.  The Hadoop code 
seems to allow for files to be overwritten, but I see that the 1.5.1 version of this 
code doesn’t allow for this to be passed in.  Is that correct?

Peter Halliday
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: local class incompatible: stream classdesc

2016-03-01 Thread Ted Yu
RDD serialized by one release of Spark is not guaranteed to be readable by
another release of Spark.

Please check whether there are mixed Spark versions.

FYI:
http://stackoverflow.com/questions/10378855/java-io-invalidclassexception-local-class-incompatible


On Tue, Mar 1, 2016 at 7:35 AM, Nesrine BEN MUSTAPHA <
nesrine.benmusta...@gmail.com> wrote:

> Hi,
>
> I installed a standalone spark cluster with two workers. I developed a
> Java Application that use the maven dependency of spark (same version as
> the spark cluster).
>
> In my class Spark jobs I have only two methods considered as two different
> jobs:
>
> the first one is the example of spark word count as follows:
>
> public static List<Tuple2<String, Integer>> sparkWordCount(final String inputFilePath,
>         final String outputFilePath, final String masterUrl) {
>
>     final SparkConf conf = new SparkConf();
>     conf.set("spark.ui.port", "7077");
>     conf.setAppName("My First Agent Spark Application");
>     final String[] jars = {
>         "target/spark-1.0-SNAPSHOT-jar-with-dependencies.jar"
>     };
>     conf.setJars(jars);
>     conf.set("spark.eventLog.dir", "/Users/nbs/spark-1.6.0/tmp/spark-events");
>     conf.set("spark.eventLog.enabled", "true");
>     conf.setSparkHome("/Users/nbs/spark-1.6.0/");
>     conf.setMaster("spark://Nesrines-Mac-mini.local:7077");
>
>     final JavaSparkContext sc = new JavaSparkContext(conf);
>     try {
>         final JavaRDD<String> rdd = sc.textFile(inputFilePath);
>         final JavaPairRDD<String, Integer> counts = rdd.flatMap(new FlatMapFunction<String, String>() {
>             @Override
>             public Iterable<String> call(final String x) {
>                 return Arrays.asList(x.split(" "));
>             }
>         }).mapToPair(new PairFunction<String, String, Integer>() {
>             @Override
>             public Tuple2<String, Integer> call(final String x) {
>                 return new Tuple2<>(x, 1);
>             }
>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>             @Override
>             public Integer call(final Integer x, final Integer y) {
>                 return x + y;
>             }
>         });
>         // counts.saveAsTextFile(outputFilePath);
>         final List<Tuple2<String, Integer>> results = counts.collect();
>         return results;
>     } finally {
>         sc.close();
>     }
> }
>
>
> When I try to execute this job and try to get the answer. an exception is
> invoked when the collect() instruction is called.
>
> PS: others jobs are executed without problems.
>
> Thanks,
>
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 0.0 (TID 5, 172.22.142.19): java.io.InvalidClassException:
> com.intrinsec.ict.common.spark.SparkJobs$1; local class incompatible:
> stream classdesc serialVersionUID = -1239009763274695582, local class
> serialVersionUID = -88119554882139439
>
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
>
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>
> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at 

Re: Spark Streaming: java.lang.StackOverflowError

2016-03-01 Thread Cody Koeninger
What code is triggering the stack overflow?

On Mon, Feb 29, 2016 at 11:13 PM, Vinti Maheshwari 
wrote:

> Hi All,
>
> I am getting the error below in a spark-streaming application; I am using Kafka
> for the input stream. When I was using a socket it was working fine, but when
> I changed to Kafka it started giving this error. Does anyone have an idea why it's
> throwing the error? Do I need to change my batch time and checkpointing time?
>
>
>
> ERROR StreamingContext: Error starting the context, marking it as stopped
> java.lang.StackOverflowError
>
> My program:
>
> def main(args: Array[String]): Unit = {
>
> // Function to create and setup a new StreamingContext
> def functionToCreateContext(): StreamingContext = {
>   val conf = new SparkConf().setAppName("HBaseStream")
>   val sc = new SparkContext(conf)
>   // create a StreamingContext, the main entry point for all streaming
>   // functionality
>   val ssc = new StreamingContext(sc, Seconds(5))
>   val brokers = args(0)
>   val topics= args(1)
>   val topicsSet = topics.split(",").toSet
>   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>   val messages = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](
> ssc, kafkaParams, topicsSet)
>
>   val inputStream = messages.map(_._2)
> //val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
>   ssc.checkpoint(checkpointDirectory)
>   inputStream.print(1)
>   val parsedStream = inputStream
> .map(line => {
>   val splitLines = line.split(",")
>   (splitLines(1), splitLines.slice(2, 
> splitLines.length).map((_.trim.toLong)))
> })
>   import breeze.linalg.{DenseVector => BDV}
>   import scala.util.Try
>
>   val state: DStream[(String, Array[Long])] = 
> parsedStream.updateStateByKey(
> (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>   prev.map(_ +: current).orElse(Some(current))
> .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
> })
>
>   state.checkpoint(Duration(1))
>   state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>   ssc
> }
> // Get StreamingContext from checkpoint data or create a new one
> val context = StreamingContext.getOrCreate(checkpointDirectory, 
> functionToCreateContext _)
>   }
> }
>
> Regards,
> ~Vinti
>


local class incompatible: stream classdesc

2016-03-01 Thread Nesrine BEN MUSTAPHA
Hi,

I installed a standalone Spark cluster with two workers. I developed a Java
application that uses the Maven dependency of Spark (the same version as the
Spark cluster).

In my SparkJobs class I have only two methods, considered as two different
jobs:

the first one is the example of Spark word count, as follows:

public static List<Tuple2<String, Integer>> sparkWordCount(final String inputFilePath,
        final String outputFilePath, final String masterUrl) {

    final SparkConf conf = new SparkConf();
    conf.set("spark.ui.port", "7077");
    conf.setAppName("My First Agent Spark Application");
    final String[] jars = {
        "target/spark-1.0-SNAPSHOT-jar-with-dependencies.jar"
    };
    conf.setJars(jars);
    conf.set("spark.eventLog.dir", "/Users/nbs/spark-1.6.0/tmp/spark-events");
    conf.set("spark.eventLog.enabled", "true");
    conf.setSparkHome("/Users/nbs/spark-1.6.0/");
    conf.setMaster("spark://Nesrines-Mac-mini.local:7077");

    final JavaSparkContext sc = new JavaSparkContext(conf);
    try {
        final JavaRDD<String> rdd = sc.textFile(inputFilePath);
        final JavaPairRDD<String, Integer> counts = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(final String x) {
                return Arrays.asList(x.split(" "));
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(final String x) {
                return new Tuple2<>(x, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(final Integer x, final Integer y) {
                return x + y;
            }
        });
        // counts.saveAsTextFile(outputFilePath);
        final List<Tuple2<String, Integer>> results = counts.collect();
        return results;
    } finally {
        sc.close();
    }
}


When I try to execute this job and get the answer, an exception is
thrown when the collect() instruction is called.

PS: other jobs are executed without problems.

Thanks,




org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
0.0 (TID 5, 172.22.142.19): java.io.InvalidClassException:
com.intrinsec.ict.common.spark.SparkJobs$1; local class incompatible:
stream classdesc serialVersionUID = -1239009763274695582, local class
serialVersionUID = -88119554882139439

at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)

at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)

at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)

at scala.collection.immutable.$colon$colon.readObject(List.scala:362)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)


Re: Spark UI standalone "crashes" after an application finishes

2016-03-01 Thread Sumona Routh
Thanks Shixiong!
To clarify for others, yes, I was speaking of the UI at port 4040, and I do
have event logging enabled, so I can review jobs after the fact. We hope to
upgrade our version of Spark soon, so I'll write back if that resolves it.

Sumona
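
(A practical aside on Sea's point below about the master heap: for a standalone
master that is controlled by SPARK_DAEMON_MEMORY in conf/spark-env.sh, e.g. the
sketch below; the value is only illustrative.)

# conf/spark-env.sh
export SPARK_DAEMON_MEMORY=4g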

On Mon, Feb 29, 2016 at 8:27 PM Sea <261810...@qq.com> wrote:

> Hi, Sumona:
>   It's a bug in older Spark versions; in Spark 1.6.0 it is fixed.
>   After the application completes, the Spark master will load the event log into
> memory, and it is synchronous because of the actor. If the event log is big, the
> Spark master will hang for a long time and you cannot submit any applications; if
> your master memory is too small, your master will die!
>   The solution in Spark 1.6 is not very good: the operation is async,
> and so you still need to set a big Java heap for the master.
>
>
>
> -- Original Message --
> *From:* "Shixiong(Ryan) Zhu";;
> *Sent:* Tuesday, 1 March 2016, 8:02 AM
> *To:* "Sumona Routh";
> *Cc:* "user@spark.apache.org";
> *Subject:* Re: Spark UI standalone "crashes" after an application finishes
>
> Do you mean you cannot access Master UI after your application completes?
> Could you check the master log?
>
> On Mon, Feb 29, 2016 at 3:48 PM, Sumona Routh  wrote:
>
>> Hi there,
>> I've been doing some performance tuning of our Spark application, which
>> is using Spark 1.2.1 standalone. I have been using the spark metrics to
>> graph out details as I run the jobs, as well as the UI to review the tasks
>> and stages.
>>
>> I notice that after my application completes, or is near completion, the
>> UI "crashes." I get a Connection Refused response. Sometimes, the page
>> eventually recovers and will load again, but sometimes I end up having to
>> restart the Spark master to get it back. When I look at my graphs on the
>> app, the memory consumption (of driver, executors, and what I believe to be
>> the daemon (spark.jvm.total.used)) appears to be healthy. Monitoring the
>> master machine itself, memory and CPU appear healthy as well.
>>
>> Has anyone else seen this issue? Are there logs for the UI itself, and
>> where might I find those?
>>
>> Thanks!
>> Sumona
>>
>
>


Re: Spark streaming from Kafka best fit

2016-03-01 Thread Cody Koeninger
> "How do I keep a balance of executors which receive data from Kafka and
which process data"

I think you're misunderstanding how the direct stream works.  The executor
which receives data is also the executor which processes data, there aren't
separate receivers.  If it's a single stage worth of work (e.g. straight
map / filter), the processing of a given partition is going to be done by
the executor that read it from kafka.  If you do something involving a
shuffle (e.g. reduceByKey), other executors will do additional processing.
The question of which executor works on which tasks is up to the scheduler
(and getPreferredLocations, which only matters if you're running spark on
the same nodes as kafka)
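
For completeness, a minimal direct-stream setup for this kind of job looks roughly
like the sketch below; the broker list and topic name are placeholders, and `ssc`
is assumed to be the StreamingContext from the code further down:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// No receivers: each Kafka partition becomes one Spark partition, and the executor
// that reads a partition also runs the narrow (map/filter) stages on it.
val kafkaParams = Map[String, String]("metadata.broker.list" -> "kafka.server.ip:9092")
val topicsSet = Set("mytopic")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet).map(_._2)
// the existing parse/filter/window pipeline can then hang off `lines` unchanged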

On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> Hello all,
>
> I see that there are as of today 3 ways one can read from Kafka in spark
> streaming:
> 1. KafkaUtils.createStream()
> 2. KafkaUtils.createDirectStream()
> 3. Kafka-spark-consumer
>
> My spark streaming application has to read from 1 kafka topic with around
> 224 partitions, consuming data at around 150MB/s (~90,000 messages/sec)
> which reduces to around 3MB/s (~1400 messages/sec) after filtering. After
> filtering I need to maintain top 1 URL counts. I don't really care
> about exactly once semantics as I am interested in rough estimate.
>
> Code:
>
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
> sparkConf.setAppName("KafkaReader")
> val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
> val kafkaParams = Map[String, String](
>   "metadata.broker.list" -> "kafka.server.ip:9092",
>   "group.id" -> consumer_group
> )
>
> val lineStreams = (1 to N).map{ _ =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
> }
>
> ssc.union(
>   lineStreams.map(stream => {
>   stream.map(ParseStringToLogRecord)
> .filter(record => isGoodRecord(record))
> .map(record => record.url)
>   })
> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute moving 
> window, 28 will probably help in parallelism
>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>   .mapPartitions(iter => {
> iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
> 1000).iterator
>   }, true)
>   .foreachRDD((latestRDD, rddTime) => {
>   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
> record._1)).sortByKey(false).take(1000))
>   })
>
> ssc.start()
> ssc.awaitTermination()
>
> Questions:
>
> a) I used #2 but I found that I couldn't control how many executors will
> be actually fetching from Kafka. How do I keep a balance of executors which
> receive data from Kafka and which process data? Do they keep changing for
> every batch?
>
> b) Now I am trying to use #1, creating multiple DStreams, filtering them
> and then doing a union. I don't understand why the number of events
> processed per 120-second batch would change drastically. PFA the events/sec
> graph while running with 1 receiver. How to debug this?
>
> c) What will be the most suitable method to integrate with Kafka from
> above 3? Any recommendations for getting maximum performance, running the
> streaming application reliably in production environment?
>
> --
> Thanks
> Jatin Kumar
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Sample sql query using pyspark

2016-03-01 Thread James Barney
Maurin,

I don't know the technical reason why, but try removing the 'limit 100'
part of your query. I was trying to do something similar the other week, and
what I found is that each executor doesn't necessarily get the same 100
rows. Joins would fail or result in a bunch of nulls when keys weren't
found between the slices of 100 rows.

Once I removed the 'limit' part of my query, all the results were the
same across the board and taking samples worked again.

If the amount of data is too large, or you're trying to just test on a
smaller size, just define another table and insert only 100 rows into that
table.

I hope that helps!

On Tue, Mar 1, 2016 at 3:10 AM, Maurin Lenglart 
wrote:

> Hi,
> I am trying to take a sample of a SQL query in order to make the query run
> faster.
> My query look like this :
> SELECT `Category` as `Category`,sum(`bookings`) as
> `bookings`,sum(`dealviews`) as `dealviews` FROM groupon_dropbox WHERE
>  `event_date` >= '2015-11-14' AND `event_date` <= '2016-02-19' GROUP BY
> `Category` LIMIT 100
>
> The table is partitioned by event_date. And the code I am using is:
>  df = self.df_from_sql(sql, srcs)
>
> results = df.sample(False, 0.5).collect()
>
>  The results are a little bit different, but the execution time is almost the 
> same. Am I missing something?
>
>
> thanks
>
>


Spark executor killed without apparent reason

2016-03-01 Thread Nirav Patel
Hi,

We are using Spark 1.5.2 on YARN. We have a Spark application utilizing
about 15GB executor memory and 1500 MB memory overhead. However, at a certain stage
we notice high GC time (almost the same as task time). These executors are
bound to get killed at some point. However, the nodemanager and resource manager
logs don't indicate failure due to 'beyond physical/virtual memory
limits', nor do I see any 'heap space' or 'gc overhead exceeded' errors in the
executor logs. Some of these high-GC executors get killed eventually, but I
can't seem to find the reason. Based on the application logs it seems like the
executor didn't respond to the driver for a long period of time and the connection was reset.

Following are logs from 'yarn logs -applicationId appId_1232_xxx'


16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0
(TID 15318). 2099 bytes result sent to driver
16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 15333
16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0
(TID 15333)
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125
non-empty blocks out of 3007 blocks
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
remote fetches in 10 ms
16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
maprnode5 has been quiet for 12 ms while there are outstanding
requests. Assuming connection is dead; please adjust spark.network.timeout
if this is wrong.
16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
requests outstanding when connection from maprnode5 is closed
16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
starting block fetches
java.io.IOException: Connection from maprnode5 closed
at
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at
io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:744)
16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3)
for 6 outstanding blocks after 5000 ms
16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive
connection to maprnode5, creating a new one.
16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in
connection from maprnode5
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at
io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at

Re: [ERROR]: Spark 1.5.2 + Hbase 1.1 + Hive 1.2 + HbaseIntegration

2016-03-01 Thread Ted Yu
16/03/01 01:36:31 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal):
java.lang.RuntimeException: hbase-default.xml file seems to be for an older
version of HBase (null), this version is 1.1.2.2.3.4.0-3485

The above was likely caused by some component being built with a different
release of HBase.

Try setting "hbase.defaults.for.version.skip" to true.
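
That is an HBase configuration property; a sketch of the entry, assuming you add it
to the same hbase-site.xml you put on the job's classpath:

<property>
  <name>hbase.defaults.for.version.skip</name>
  <value>true</value>
</property>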

Cheers

On Mon, Feb 29, 2016 at 9:12 PM, Ted Yu  wrote:

> 16/02/29 23:09:34 INFO ZooKeeper: Initiating client connection,
> connectString=localhost:2181 sessionTimeout=9
> watcher=hconnection-0x26fa89a20x0, quorum=localhost:2181, baseZNode=/hbase
>
> Since baseZNode didn't match what you set in hbase-site.xml, the cause was
> likely that hbase-site.xml being inaccessible to your Spark job.
>
> Please add it in your classpath.
>
> On Mon, Feb 29, 2016 at 8:42 PM, Ted Yu  wrote:
>
>> 16/02/29 23:09:34 INFO ClientCnxn: Opening socket connection to server
>> localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using
>> SASL (unknown error)
>>
>> Is your cluster secure cluster ?
>>
>> bq. Trace :
>>
>> Was there any output after 'Trace :' ?
>>
>> Was hbase-site.xml accessible to your Spark job ?
>>
>> Thanks
>>
>> On Mon, Feb 29, 2016 at 8:27 PM, Divya Gehlot 
>> wrote:
>>
>>> Hi,
>>> I am getting an error when I am trying to connect to a Hive table (which
>>> was created through HBaseIntegration) from Spark.
>>>
>>> Steps I followed :
>>> *Hive Table creation code  *:
>>> CREATE EXTERNAL TABLE IF NOT EXISTS TEST(NAME STRING,AGE INT)
>>> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
>>> WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,0:AGE")
>>> TBLPROPERTIES ("hbase.table.name" = "TEST",
>>> "hbase.mapred.output.outputtable" = "TEST");
>>>
>>>
>>> *DESCRIBE TEST ;*
>>> col_namedata_typecomment
>>> namestring from deserializer
>>> age   int from deserializer
>>>
>>>
>>> *Spark Code :*
>>> import org.apache.spark._
>>> import org.apache.spark.sql._
>>>
>>> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> hiveContext.sql("from TEST SELECT  NAME").collect.foreach(println)
>>>
>>>
>>> *Starting Spark shell*
>>> spark-shell --jars
>>> /usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
>>> --driver-class-path
>>> /usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
>>> --packages com.databricks:spark-csv_2.10:1.3.0  --master yarn-client -i
>>> /TestDivya/Spark/InstrumentCopyToHDFSHive.scala
>>>
>>> *Stack Trace* :
>>>
>>> Stack SQL context available as sqlContext.
 Loading /TestDivya/Spark/InstrumentCopyToHDFSHive.scala...
 import org.apache.spark._
 import org.apache.spark.sql._
 16/02/29 23:09:29 INFO HiveContext: Initializing execution hive,
 version 1.2.1
 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
 2.7.1.2.3.4.0-3485
 16/02/29 23:09:29 INFO ClientWrapper: Loaded
 org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
 2.7.1.2.3.4.0-3485
 16/02/29 23:09:29 INFO HiveContext: default warehouse location is
 /user/hive/warehouse
 16/02/29 23:09:29 INFO HiveContext: Initializing
 HiveMetastoreConnection version 1.2.1 using Spark classes.
 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
 2.7.1.2.3.4.0-3485
 16/02/29 23:09:29 INFO ClientWrapper: Loaded
 org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
 2.7.1.2.3.4.0-3485
 16/02/29 23:09:30 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 16/02/29 23:09:30 INFO metastore: Trying to connect to metastore with
 URI thrift://ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal:9083
 16/02/29 23:09:30 INFO metastore: Connected to metastore.
 16/02/29 23:09:30 WARN DomainSocketFactory: The short-circuit local
 reads feature cannot be 

Re: Suggested Method to Write to Kafka

2016-03-01 Thread Sathish Dhinakaran
http://stackoverflow.com/questions/31590592/how-to-write-to-kafka-from-spark-streaming
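
The pattern that discussion usually boils down to (not necessarily the exact code in
the linked answer) is creating the producer inside foreachPartition rather than on
the driver, so it is never serialized with the closure. A minimal Scala sketch; the
broker list, output topic and the `results` DStream[String] are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

results.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One producer per partition/task; never ship a producer object from the driver.
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { value =>
      producer.send(new ProducerRecord[String, String]("output-topic", value))
    }
    producer.close()
  }
}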

On Tue, Mar 1, 2016 at 8:54 AM, Bryan Jeffrey 
wrote:

> Hello.
>
> Is there a suggested method and/or some example code to write results from
> a Spark streaming job back to Kafka?
>
> I'm using Scala and Spark 1.4.1.
>
> Regards,
>
> Bryan Jeffrey
>


Suggested Method to Write to Kafka

2016-03-01 Thread Bryan Jeffrey
Hello.

Is there a suggested method and/or some example code to write results from
a Spark streaming job back to Kafka?

I'm using Scala and Spark 1.4.1.

Regards,

Bryan Jeffrey


Re: Spark for client

2016-03-01 Thread Todd Nist
You could also look at Apache Toree, http://toree.apache.org/
(GitHub: https://github.com/apache/incubator-toree).  This used to be the
Spark Kernel from IBM but has been contributed to Apache.

Good overview here on its features,
http://www.spark.tc/how-to-enable-interactive-applications-against-apache-spark/.
Specifically this section on usage:

Usage

Using the kernel as the backbone of communication, we have enabled several
higher-level applications to interact with Apache Spark:

-> Livesheets, a line-of-business tool for data exploration

-> A RESTful query engine running on top of Spark SQL

-> A demonstration of a PHP application utilizing Apache Spark at ZendCon 2014

-> IPython notebook running the Spark Kernel underneath

HTH.
Todd

On Tue, Mar 1, 2016 at 4:10 AM, Mich Talebzadeh 
wrote:

> Thanks Mohannad.
>
> Installed Anaconda 3 that contains Jupyter. Now I want to access Spark on
> Scala from Jupyter. What is the easiest way of doing it without using
> Python!
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 March 2016 at 08:18, Mohannad Ali  wrote:
>
>> Jupyter (http://jupyter.org/) also supports Spark and generally it's a
>> beast allows you to do so much more.
>> On Mar 1, 2016 00:25, "Mich Talebzadeh" 
>> wrote:
>>
>>> Thank you very much both
>>>
>>> Zeppelin looks promising. Basically as I understand runs an agent on a
>>> given port (I chose 21999) on the host that Spark is installed. I created a
>>> notebook and running scripts through there. One thing for sure notebook
>>> just returns the results rather all other stuff that one does not need/.
>>>
>>> Cheers,
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 29 February 2016 at 19:22, Minudika Malshan 
>>> wrote:
>>>
 +Adding resources
 https://zeppelin.incubator.apache.org/docs/latest/interpreter/spark.html
 https://zeppelin.incubator.apache.org

 Minudika Malshan
 Undergraduate
 Department of Computer Science and Engineering
 University of Moratuwa.
 *Mobile : +94715659887 <%2B94715659887>*
 *LinkedIn* : https://lk.linkedin.com/in/minudika



 On Tue, Mar 1, 2016 at 12:51 AM, Minudika Malshan <
 minudika...@gmail.com> wrote:

> Hi,
>
> I think zeppelin spark interpreter will give a solution to your
> problem.
>
> Regards.
> Minudika
>
> Minudika Malshan
> Undergraduate
> Department of Computer Science and Engineering
> University of Moratuwa.
> *Mobile : +94715659887 <%2B94715659887>*
> *LinkedIn* : https://lk.linkedin.com/in/minudika
>
>
>
> On Tue, Mar 1, 2016 at 12:35 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> Zeppelin?
>>
>> Regards
>> Sab
>> On 01-Mar-2016 12:27 am, "Mich Talebzadeh" 
>> wrote:
>>
>>> Hi,
>>>
>>> Is there such thing as Spark for client much like RDBMS client that
>>> have cut down version of their big brother useful for client 
>>> connectivity
>>> but cannot be used as server.
>>>
>>> Thanks
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>
>

>>>
>


Does pyspark still lag far behind the Scala API in terms of features

2016-03-01 Thread Joshua Sorrell
I haven't used Spark in the last year and a half. I am about to start a
project with a new team, and we need to decide whether to use pyspark or
Scala.

We are NOT a java shop. So some of the build tools/procedures will require
some learning overhead if we go the Scala route. What I want to know is: is
the Scala version of Spark still far enough ahead of pyspark to be well
worth any initial training overhead?

Particularly, we will be using Spark Streaming. I know a couple of years
ago that practically forced the decision to use Scala.  Is this still the
case?

Thanks in advance!


Dropping parquet file partitions

2016-03-01 Thread sparkuser2345
Is there a way to drop parquet file partitions through Spark? I'm
partitioning a parquet file by a date field and I would like to drop old
partitions in a file system agnostic manner. I guess I could read the whole
parquet file into a DataFrame, filter out the dates to be dropped, and
overwrite the parquet file, but that is quite a heavy task if there are lots
of partitions and I only need to drop a couple. 
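
For reference, the heavy rewrite described above would look roughly like the sketch
below (paths, partition column and cutoff date are placeholders, and `sqlContext` is
assumed); writing to a new location avoids reading and overwriting the same path in
one job:

import org.apache.spark.sql.SaveMode

val df = sqlContext.read.parquet("/data/events")
df.filter(df("event_date") >= "2015-01-01")      // keep only the partitions to retain
  .write.mode(SaveMode.Overwrite)
  .partitionBy("event_date")
  .parquet("/data/events_pruned")                // then swap the directories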



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dropping-parquet-file-partitions-tp26368.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Mllib Logistic Regression performance relative to Mahout

2016-03-01 Thread Sonal Goyal
You can also check whether you are caching your input, so that the features are not
re-read/recomputed on every iteration.
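
Concretely, something like the sketch below; `training` is a placeholder for your
labelled-point RDD and the rest of the pipeline is assumed:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

val training: RDD[LabeledPoint] = ???   // placeholder: your labelled feature vectors
training.cache()                        // keep the input in memory across optimizer iterations
val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(training)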

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Mon, Feb 29, 2016 at 1:02 PM, Yashwanth Kumar 
wrote:

> Hi,
> If your features are numeric, try feature scaling and feed the scaled data to Spark
> Logistic Regression; it might improve the rate.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-Logistic-Regression-performance-relative-to-Mahout-tp26346p26358.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is spark.driver.maxResultSize used correctly ?

2016-03-01 Thread Jeff Zhang
Checked the code again. It looks like currently the task result will be loaded
into memory no matter whether it is a DirectTaskResult or an IndirectTaskResult.
Previously I thought an IndirectTaskResult could be loaded into memory later, which
would save memory; RDD#collectAsIterator is what I thought might save
memory.
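
(If the method meant here is RDD#toLocalIterator, the driver-side pattern it enables
is sketched below; it streams results back one partition at a time instead of
materializing everything with collect(). `sc` is assumed and the data is illustrative.)

val rdd = sc.parallelize(1 to 1000000, 100)
rdd.toLocalIterator.take(10).foreach(println)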

On Tue, Mar 1, 2016 at 5:00 PM, Reynold Xin  wrote:

> How big of a deal is this though? If I am reading your email correctly,
> either way this job will fail. You simply want it to fail earlier in the
> executor side, rather than collecting it and fail on the driver side?
>
>
> On Sunday, February 28, 2016, Jeff Zhang  wrote:
>
>> data skew might be possible, but not the common case. I think we should
>> design for the common case, for the skew case, we may can set some
>> parameter of fraction to allow user to tune it.
>>
>> On Sat, Feb 27, 2016 at 4:51 PM, Reynold Xin  wrote:
>>
>>> But sometimes you might have skew and almost all the result data are in
>>> one or a few tasks though.
>>>
>>>
>>> On Friday, February 26, 2016, Jeff Zhang  wrote:
>>>

 My job get this exception very easily even when I set large value of
 spark.driver.maxResultSize. After checking the spark code, I found
 spark.driver.maxResultSize is also used in Executor side to decide whether
 DirectTaskResult/InDirectTaskResult sent. This doesn't make sense to me.
 Using  spark.driver.maxResultSize / taskNum might be more proper. Because
 if  spark.driver.maxResultSize is 1g and we have 10 tasks each has 200m
 output. Then even the output of each task is less than
  spark.driver.maxResultSize so DirectTaskResult will be sent to driver, but
 the total result size is 2g which will cause exception in driver side.


 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
 LogisticRegression.scala:283, took 33.796379 s

 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Total size of serialized results of 1 tasks (1085.0
 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)


 --
 Best Regards

 Jeff Zhang

>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


-- 
Best Regards

Jeff Zhang


EMR 4.3.0 spark 1.6 shell problem

2016-03-01 Thread Oleg Ruchovets
Hi, I installed EMR 4.3.0 with Spark. I tried to enter the Spark shell, but it
looks like it doesn't work and throws exceptions.
Please advise:

[hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
[hadoop@ip-172-31-39-37 bin]$ ./spark-shell
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support
was removed in 8.0
16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop
16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(hadoop); users
with modify permissions: Set(hadoop)
16/03/01 09:11:49 INFO HttpServer: Starting HTTP Server
16/03/01 09:11:49 INFO Utils: Successfully started service 'HTTP class
server' on port 47223.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
  /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
16/03/01 09:11:53 INFO SparkContext: Running Spark version 1.6.0
16/03/01 09:11:53 INFO SecurityManager: Changing view acls to: hadoop
16/03/01 09:11:53 INFO SecurityManager: Changing modify acls to: hadoop
16/03/01 09:11:53 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(hadoop); users
with modify permissions: Set(hadoop)
16/03/01 09:11:54 INFO Utils: Successfully started service 'sparkDriver' on
port 52143.
16/03/01 09:11:54 INFO Slf4jLogger: Slf4jLogger started
16/03/01 09:11:54 INFO Remoting: Starting remoting
16/03/01 09:11:54 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@172.31.39.37:42989]
16/03/01 09:11:54 INFO Utils: Successfully started service
'sparkDriverActorSystem' on port 42989.
16/03/01 09:11:54 INFO SparkEnv: Registering MapOutputTracker
16/03/01 09:11:54 INFO SparkEnv: Registering BlockManagerMaster
16/03/01 09:11:54 INFO DiskBlockManager: Created local directory at
/mnt/tmp/blockmgr-afaf0e7f-086e-49f1-946d-798e605a3fdc
16/03/01 09:11:54 INFO MemoryStore: MemoryStore started with capacity 518.1
MB
16/03/01 09:11:55 INFO SparkEnv: Registering OutputCommitCoordinator
16/03/01 09:11:55 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
16/03/01 09:11:55 INFO SparkUI: Started SparkUI at http://172.31.39.37:4040
16/03/01 09:11:55 INFO RMProxy: Connecting to ResourceManager at /
172.31.39.37:8032
16/03/01 09:11:55 INFO Client: Requesting a new application from cluster
with 2 NodeManagers
16/03/01 09:11:55 INFO Client: Verifying our application has not requested
more than the maximum memory capability of the cluster (11520 MB per
container)
16/03/01 09:11:55 INFO Client: Will allocate AM container, with 896 MB
memory including 384 MB overhead
16/03/01 09:11:55 INFO Client: Setting up container launch context for our
AM
16/03/01 09:11:55 INFO Client: Setting up the launch environment for our AM
container
16/03/01 09:11:55 INFO Client: Preparing resources for our AM container
16/03/01 09:11:56 INFO Client: Uploading resource
file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar ->
hdfs://
172.31.39.37:8020/user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
16/03/01 09:11:56 INFO MetricsSaver: MetricsConfigRecord disabledInCluster:
false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60
disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500
lastModified: 1456818856695
16/03/01 09:11:56 INFO MetricsSaver: Created MetricsSaver
j-2FT6QNFSPTHNX:i-5f6bcadb:SparkSubmit:04807 period:60
/mnt/var/em/raw/i-5f6bcadb_20160301_SparkSubmit_04807_raw.bin
16/03/01 09:11:56 WARN DFSClient: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
could only be replicated to 0 nodes instead of minReplication (=1).  There
are 0 datanode(s) running and no node(s) are excluded in this operation.
at
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:632)
at 

Re: Spark for client

2016-03-01 Thread Mich Talebzadeh
Thanks Mohannad.

I installed Anaconda 3, which contains Jupyter. Now I want to access Spark with
Scala from Jupyter. What is the easiest way of doing that without using
Python?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 March 2016 at 08:18, Mohannad Ali  wrote:

> Jupyter (http://jupyter.org/) also supports Spark and generally it's a
> beast allows you to do so much more.
> On Mar 1, 2016 00:25, "Mich Talebzadeh"  wrote:
>
>> Thank you very much both
>>
>> Zeppelin looks promising. Basically as I understand runs an agent on a
>> given port (I chose 21999) on the host that Spark is installed. I created a
>> notebook and running scripts through there. One thing for sure notebook
>> just returns the results rather all other stuff that one does not need/.
>>
>> Cheers,
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 29 February 2016 at 19:22, Minudika Malshan 
>> wrote:
>>
>>> +Adding resources
>>> https://zeppelin.incubator.apache.org/docs/latest/interpreter/spark.html
>>> https://zeppelin.incubator.apache.org
>>>
>>> Minudika Malshan
>>> Undergraduate
>>> Department of Computer Science and Engineering
>>> University of Moratuwa.
>>> *Mobile : +94715659887 <%2B94715659887>*
>>> *LinkedIn* : https://lk.linkedin.com/in/minudika
>>>
>>>
>>>
>>> On Tue, Mar 1, 2016 at 12:51 AM, Minudika Malshan >> > wrote:
>>>
 Hi,

 I think zeppelin spark interpreter will give a solution to your
 problem.

 Regards.
 Minudika

 Minudika Malshan
 Undergraduate
 Department of Computer Science and Engineering
 University of Moratuwa.
 *Mobile : +94715659887 <%2B94715659887>*
 *LinkedIn* : https://lk.linkedin.com/in/minudika



 On Tue, Mar 1, 2016 at 12:35 AM, Sabarish Sasidharan <
 sabarish.sasidha...@manthan.com> wrote:

> Zeppelin?
>
> Regards
> Sab
> On 01-Mar-2016 12:27 am, "Mich Talebzadeh" 
> wrote:
>
>> Hi,
>>
>> Is there such thing as Spark for client much like RDBMS client that
>> have cut down version of their big brother useful for client connectivity
>> but cannot be used as server.
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>

>>>
>>


Re: Is spark.driver.maxResultSize used correctly ?

2016-03-01 Thread Reynold Xin
How big of a deal is this though? If I am reading your email correctly,
either way this job will fail. You simply want it to fail earlier in the
executor side, rather than collecting it and fail on the driver side?

On Sunday, February 28, 2016, Jeff Zhang  wrote:

> data skew might be possible, but not the common case. I think we should
> design for the common case, for the skew case, we may can set some
> parameter of fraction to allow user to tune it.
>
> On Sat, Feb 27, 2016 at 4:51 PM, Reynold Xin  > wrote:
>
>> But sometimes you might have skew and almost all the result data are in
>> one or a few tasks though.
>>
>>
>> On Friday, February 26, 2016, Jeff Zhang > > wrote:
>>
>>>
>>> My job get this exception very easily even when I set large value of
>>> spark.driver.maxResultSize. After checking the spark code, I found
>>> spark.driver.maxResultSize is also used in Executor side to decide whether
>>> DirectTaskResult/InDirectTaskResult sent. This doesn't make sense to me.
>>> Using  spark.driver.maxResultSize / taskNum might be more proper. Because
>>> if  spark.driver.maxResultSize is 1g and we have 10 tasks each has 200m
>>> output. Then even the output of each task is less than
>>>  spark.driver.maxResultSize so DirectTaskResult will be sent to driver, but
>>> the total result size is 2g which will cause exception in driver side.
>>>
>>>
>>> 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
>>> LogisticRegression.scala:283, took 33.796379 s
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Total size of serialized results of 1 tasks (1085.0
>>> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Converting array to DF

2016-03-01 Thread Jeff Zhang
Change Array to Seq and import sqlContext.implicits._
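
In other words, a minimal sketch against a 1.x SQLContext (the `sqlContext` value is
assumed to be the one from the shell or your application):

import sqlContext.implicits._

val weights = Seq(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6))
val df = weights.toDF("weights", "value")
df.orderBy("value").show()   // prints the rows ordered by the value column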



On Tue, Mar 1, 2016 at 4:38 PM, Ashok Kumar 
wrote:

> Hi,
>
> I have this
>
> val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9),
> ("f", 4), ("g", 6))
> weights.toDF("weights","value")
>
> I want to convert the Array to a DF but I get this:
>
> weights: Array[(String, Int)] = Array((a,3), (b,2), (c,5), (d,1), (e,9),
> (f,4), (g,6))
> <console>:33: error: value toDF is not a member of Array[(String, Int)]
>   weights.toDF("weights","value")
>
> I want to label columns and print out the contents in value order please I
> don't know why I am getting this error
>
> Thanks
>
>


-- 
Best Regards

Jeff Zhang


Converting array to DF

2016-03-01 Thread Ashok Kumar
Hi,

I have this:

val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6))
weights.toDF("weights","value")

I want to convert the Array to a DF, but I get this:

weights: Array[(String, Int)] = Array((a,3), (b,2), (c,5), (d,1), (e,9), (f,4), (g,6))
<console>:33: error: value toDF is not a member of Array[(String, Int)]
  weights.toDF("weights","value")

I want to label the columns and print out the contents in value order. I
don't know why I am getting this error.

Thanks


Re: Support virtualenv in PySpark

2016-03-01 Thread Jeff Zhang
I may not have expressed it clearly. This method tries to create the virtualenv
before the Python worker starts, and the virtualenv is application-scoped: after
the Spark application finishes, the virtualenv is cleaned up. And the
virtualenvs don't need to be at the same path on each node (in my POC, it is
the YARN container working directory). That means users don't need to
manually install packages on each node (sometimes you even can't install
packages on the cluster due to security reasons). This is the biggest benefit
and purpose: users can create a virtualenv on demand without touching each
node, even when they are not administrators.  The con is the extra cost of
installing the required packages before starting the Python worker. But if it
is an application which will run for several hours, then the extra cost can
be ignored.

On Tue, Mar 1, 2016 at 4:15 PM, Mohannad Ali  wrote:

> Hello Jeff,
>
> Well this would also mean that you have to manage the same virtualenv
> (same path) on all nodes and install your packages into it the same way you
> would install them into the default Python path.
>
> In any case, at the moment you can already do what you proposed by creating
> identical virtualenvs on all nodes at the same path and changing the Spark
> Python path to point to the virtualenv.
>
> Best Regards,
> Mohannad
> On Mar 1, 2016 06:07, "Jeff Zhang"  wrote:
>
>> I have created a JIRA for this feature; comments and feedback are welcome
>> on how to improve it and whether it's valuable for users.
>>
>> https://issues.apache.org/jira/browse/SPARK-13587
>>
>>
>> Here's some background info and status of this work.
>>
>>
>> Currently, it's not easy for users to add third-party Python packages in
>> PySpark.
>>
>>- One way is to use --py-files (suitable for simple dependencies, but
>>not suitable for complicated dependencies, especially with transitive
>>dependencies)
>>- Another way is to install packages manually on each node (time
>>consuming, and not easy to switch to a different environment)
>>
>> Python now has two different virtualenv implementations: one is the native
>> virtualenv, the other is through conda.
>>
>> I have implemented a POC for this feature. Here's a simple command showing
>> how to use virtualenv in PySpark:
>>
>> bin/spark-submit --master yarn --deploy-mode client --conf 
>> "spark.pyspark.virtualenv.enabled=true" --conf 
>> "spark.pyspark.virtualenv.type=conda" --conf 
>> "spark.pyspark.virtualenv.requirements=/Users/jzhang/work/virtualenv/conda.txt"
>>  --conf "spark.pyspark.virtualenv.path=/Users/jzhang/anaconda/bin/conda"  
>> ~/work/virtualenv/spark.py
>>
>> There are 4 properties that need to be set:
>>
>>- spark.pyspark.virtualenv.enabled (enable virtualenv)
>>- spark.pyspark.virtualenv.type (native/conda are supported, default
>>is native)
>>- spark.pyspark.virtualenv.requirements (requirement file for the
>>dependencies)
>>- spark.pyspark.virtualenv.path (path to the executable for
>>virtualenv/conda)
>>
>>
>>
>>
>>
>>
>> Best Regards
>>
>> Jeff Zhang
>>
>


-- 
Best Regards

Jeff Zhang


Re: Spark for client

2016-03-01 Thread Mohannad Ali
Jupyter (http://jupyter.org/) also supports Spark, and generally it's a
beast that allows you to do so much more.
On Mar 1, 2016 00:25, "Mich Talebzadeh"  wrote:

> Thank you very much, both.
>
> Zeppelin looks promising. As I understand it, it runs an agent on a given
> port (I chose 21999) on the host where Spark is installed. I created a
> notebook and I am running scripts through there. One thing for sure: the
> notebook just returns the results rather than all the other output that one
> does not need.
>
> Cheers,
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 29 February 2016 at 19:22, Minudika Malshan 
> wrote:
>
>> +Adding resources
>> https://zeppelin.incubator.apache.org/docs/latest/interpreter/spark.html
>> https://zeppelin.incubator.apache.org
>>
>> Minudika Malshan
>> Undergraduate
>> Department of Computer Science and Engineering
>> University of Moratuwa.
>> *Mobile : +94715659887 <%2B94715659887>*
>> *LinkedIn* : https://lk.linkedin.com/in/minudika
>>
>>
>>
>> On Tue, Mar 1, 2016 at 12:51 AM, Minudika Malshan 
>> wrote:
>>
>>> Hi,
>>>
>>> I think zeppelin spark interpreter will give a solution to your problem.
>>>
>>> Regards.
>>> Minudika
>>>
>>> Minudika Malshan
>>> Undergraduate
>>> Department of Computer Science and Engineering
>>> University of Moratuwa.
>>> *Mobile : +94715659887 <%2B94715659887>*
>>> *LinkedIn* : https://lk.linkedin.com/in/minudika
>>>
>>>
>>>
>>> On Tue, Mar 1, 2016 at 12:35 AM, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
 Zeppelin?

 Regards
 Sab
 On 01-Mar-2016 12:27 am, "Mich Talebzadeh" 
 wrote:

> Hi,
>
> Is there such a thing as Spark for clients, much like RDBMS clients that
> are cut-down versions of their big brother, useful for client connectivity
> but that cannot be used as a server?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>

>>>
>>
>


Re: Support virtualenv in PySpark

2016-03-01 Thread Mohannad Ali
Hello Jeff,

Well this would also mean that you have to manage the same virtualenv (same
path) on all nodes and install your packages into it the same way you would
install them into the default Python path.

In any case, at the moment you can already do what you proposed by creating
identical virtualenvs on all nodes at the same path and changing the Spark
Python path to point to the virtualenv.

Best Regards,
Mohannad
On Mar 1, 2016 06:07, "Jeff Zhang"  wrote:

> I have created a JIRA for this feature; comments and feedback are welcome
> on how to improve it and whether it's valuable for users.
>
> https://issues.apache.org/jira/browse/SPARK-13587
>
>
> Here's some background info and status of this work.
>
>
> Currently, it's not easy for users to add third-party Python packages in
> PySpark.
>
>- One way is to use --py-files (suitable for simple dependencies, but
>not suitable for complicated dependencies, especially with transitive
>dependencies)
>- Another way is to install packages manually on each node (time consuming,
>and not easy to switch to a different environment)
>
> Python now has two different virtualenv implementations: one is the native
> virtualenv, the other is through conda.
>
> I have implemented a POC for this feature. Here's a simple command showing
> how to use virtualenv in PySpark:
>
> bin/spark-submit --master yarn --deploy-mode client --conf 
> "spark.pyspark.virtualenv.enabled=true" --conf 
> "spark.pyspark.virtualenv.type=conda" --conf 
> "spark.pyspark.virtualenv.requirements=/Users/jzhang/work/virtualenv/conda.txt"
>  --conf "spark.pyspark.virtualenv.path=/Users/jzhang/anaconda/bin/conda"  
> ~/work/virtualenv/spark.py
>
> There are 4 properties that need to be set:
>
>- spark.pyspark.virtualenv.enabled (enable virtualenv)
>- spark.pyspark.virtualenv.type (native/conda are supported, default
>is native)
>- spark.pyspark.virtualenv.requirements (requirement file for the
>dependencies)
>- spark.pyspark.virtualenv.path (path to the executable for
>virtualenv/conda)
>
>
>
>
>
>
> Best Regards
>
> Jeff Zhang
>


Sample sql query using pyspark

2016-03-01 Thread Maurin Lenglart
Hi,
I am trying to take a sample of a SQL query to make the query run faster.
My query look like this :
SELECT `Category` as `Category`,sum(`bookings`) as `bookings`,sum(`dealviews`) 
as `dealviews` FROM groupon_dropbox WHERE  `event_date` >= '2015-11-14' AND 
`event_date` <= '2016-02-19' GROUP BY `Category` LIMIT 100

The table is partitioned by event_date. And the code I am using is:
 df = self.df_from_sql(sql, srcs)

results = df.sample(False, 0.5).collect()

 The results are a little bit different, but the execution time is almost the 
same. Am I missing something?


thanks


Re: [Help]: DataframeNAfunction fill method throwing exception

2016-03-01 Thread ai he
Hi Divya,

I guess the error is thrown from spark-csv: spark-csv tries to parse the
string "null" as a double.

The workaround is to add the nullValue option, like .option("nullValue",
"null"). However, this nullValue feature is not included in the current
spark-csv 1.3 release; just check out the master branch of spark-csv and use a
local Ivy publish to make it work.
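For example, a minimal sketch of the read with that option in place (assuming a spark-csv build that includes nullValue, the usual sqlContext from spark-shell, and the redacted HDFS path from the thread):

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val nulltestSchema = StructType(Seq(
  StructField("name", StringType, false),
  StructField("age", DoubleType, true)))

val dfnulltest = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("nullValue", "null")   // treat the literal string "null" as a SQL NULL
  .schema(nulltestSchema)
  .load("hdfs://xx.xx.xx.xxx:8020/TestDivya/Spark/nulltest.csv")

// na.fill now works because "null" is no longer parsed as a number
dfnulltest.na.fill(0, Seq("age")).show()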

Best,
Ai

On Thu, Feb 25, 2016 at 11:34 PM Divya Gehlot 
wrote:

> Hi Jan,
> Thanks for the help.
> Alas, your suggestion also didn't work.
>
> scala> import org.apache.spark.sql.types.{StructType, StructField,
>> StringType,IntegerType,LongType,DoubleType, FloatType};
>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>> IntegerType, LongType, DoubleType, FloatType}
>> scala> val nulltestSchema = StructType(Seq(StructField("name",
>> StringType, false),StructField("age", DoubleType, true)))
>> nulltestSchema: org.apache.spark.sql.types.StructType =
>> StructType(StructField(name,StringType,false),
>> StructField(age,DoubleType,true))
>>
> scala> val dfnulltest =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").schema(nulltestSchema).load("hdfs://xx.xx.xx.xxx:8020/TestDivya/Spark/nulltest.csv")
>
>
>> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
>>
> scala> dfnulltest.selectExpr("name", "coalesce(age, 0) as age")
>> res0: org.apache.spark.sql.DataFrame = [name: string, age: double]
>> scala> val dfresult = dfnulltest.selectExpr("name", "coalesce(age, 0) as
>> age")
>> dfresult: org.apache.spark.sql.DataFrame = [name: string, age: double]
>> scala> dfresult.show
>
>
>  java.text.ParseException: Unparseable number: "null"
> at java.text.NumberFormat.parse(NumberFormat.java:350)
>
>
> On 26 February 2016 at 15:15, Jan Štěrba  wrote:
>
>> just use coalesce function
>>
>> df.selectExpr("name", "coalesce(age, 0) as age")
>>
>> --
>> Jan Sterba
>> https://twitter.com/honzasterba | http://flickr.com/honzasterba |
>> http://500px.com/honzasterba
>>
>> On Fri, Feb 26, 2016 at 5:27 AM, Divya Gehlot 
>> wrote:
>>
>>> Hi,
>>> I have a dataset which looks like the one below:
>>> name age
>>> alice 35
>>> bob null
>>> peter 24
>>> I need to replace the null values of the columns with 0,
>>> so I referred to the Spark API DataframeNAfunctions.scala
>>> 
>>>
>>>  I tried the code below, but it's throwing an exception:
>>> scala> import org.apache.spark.sql.types.{StructType, StructField,
>>> StringType,IntegerType,LongType,DoubleType, FloatType};
>>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>>> IntegerType, LongType, DoubleType, FloatType}
>>>
>>> scala> val nulltestSchema = StructType(Seq(StructField("name",
>>> StringType, false),StructField("age", DoubleType, true)))
>>> nulltestSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(name,StringType,false),
>>> StructField(age,DoubleType,true))
>>>
>>> scala> val dfnulltest =
>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>>> "true").schema(nulltestSchema).load("hdfs://
>>> 172.31.29.201:8020/TestDivya/Spark/nulltest.csv")
>>> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
>>>
>>> scala> val dfchangenull =
>>> dfnulltest.na.fill(0,Seq("age")).select("name","age")
>>> dfchangenull: org.apache.spark.sql.DataFrame = [name: string, age:
>>> double]
>>>
>>> scala> dfchangenull.show
>>> 16/02/25 23:15:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID
>>> 2, ip-172-31-22-135.ap-southeast-1.compute.internal):
>>> java.text.ParseException: Unparseable number: "null"
>>> at java.text.NumberFormat.parse(NumberFormat.java:350)
>>>
>>>
>>
>>
>

