Fwd: Error:scalac: Error: assertion failed: List(object package$DebugNode, object package$DebugNode)

2015-12-30 Thread zml张明磊
I’m sorry. The error does not occur when I build Spark. It happens when running 
the example LogisticRegressionWithElasticNetExample.scala.

From: zml张明磊 [mailto:mingleizh...@ctrip.com]
Sent: December 31, 2015 15:01
To: user@spark.apache.org
Subject: Error:scalac: Error: assertion failed: List(object package$DebugNode, 
object package$DebugNode)

Hello,

Recently, I built Spark from apache/master and got the following error. 
Following the StackOverflow answer at 
http://stackoverflow.com/questions/24165184/scalac-assertion-failed-while-run-scalatest-in-idea,
I cannot find the Preferences > Scala setting it mentions in IntelliJ IDEA, and SBT 
does not work for me at our company; we use Maven instead. How can I fix or work 
around this? Lastly: happy new year to everyone.

Error:scalac: Error: assertion failed: List(object package$DebugNode, object package$DebugNode)
  java.lang.AssertionError: assertion failed: List(object package$DebugNode, object package$DebugNode)
   at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678)
   at scala.reflect.internal.Symbols$ClassSymbol.companionModule0(Symbols.scala:2988)
   at scala.reflect.internal.Symbols$ClassSymbol.companionModule(Symbols.scala:2991)
   at scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1371)
   at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120)
   at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583)
   at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557)
   at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553)
   at scala.tools.nsc.Global$Run.compile(Global.scala:1662)
   at xsbt.CachedCompiler0.run(CompilerInterface.scala:126)


thanks,
Minglei.


Re: Spark MLLib KMeans Performance on Amazon EC2 M3.2xlarge

2015-12-30 Thread Yanbo Liang
Hi Jia,

You can try inputRDD.persist(MEMORY_AND_DISK) and verify whether it
produces stable performance. With the MEMORY_AND_DISK storage level, the
partitions that don't fit in memory are spilled to disk and read from there
when they are needed.
Actually, it's not necessary to set such a large driver memory in your case,
because KMeans uses little driver memory as long as your k is not very large.
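
For example, a minimal sketch (inputRDD stands for the RDD of vectors you already
feed to KMeans):

import org.apache.spark.storage.StorageLevel

val cached = inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()  // materialize the cache once so the training iterations reuse it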

Cheers
Yanbo

2015-12-30 22:20 GMT+08:00 Jia Zou :

> I am running Spark MLLib KMeans in one EC2 M3.2xlarge instance with 8 CPU
> cores and 30GB memory. Executor memory is set to 15GB, and driver memory is
> set to 15GB.
>
> The observation is that, when input data size is smaller than 15GB, the
> performance is quite stable. However, when input data becomes larger than
> that, the performance will be extremely unpredictable. For example, for
> 15GB input, with inputRDD.persist(MEMORY_ONLY) , I've got three
> dramatically different testing results: 27mins, 61mins and 114 mins. (All
> settings are the same for the 3 tests, and I will create input data
> immediately before running each of the tests to keep OS buffer cache hot.)
>
> Anyone can help to explain this? Thanks very much!
>
>


K means clustering in spark

2015-12-30 Thread anjali . gautam09
Hi,

I am trying to use KMeans for clustering in Spark using Python. I implemented 
it on the data set that ships with Spark. It's a 3*4 matrix. 

Can anybody please help me with how the data should be oriented for KMeans? 
Also, how can I find out what the clusters are and which members belong to each?

Thanks 
Anjali





Re: Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"

2015-12-30 Thread SparkUser
Sounds like you guys are on the right track. This is purely FYI, because 
I haven't seen it posted; I'm just responding to the line in the original 
post that your data structure should fit in memory.


OK, two more disclaimers, "FWIW" and "maybe this is not relevant or 
already covered". OK, here goes...


From 
http://spark.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks:


Sometimes, you will get an OutOfMemoryError not because your RDDs don’t 
fit in memory, but because the working set of one of your tasks, such as 
one of the reduce tasks in groupByKey, was too large. Spark’s shuffle 
operations (sortByKey, groupByKey, reduceByKey, join, etc) build 
a hash table within each task to perform the grouping, which can often 
be large. The simplest fix here is to increase the level of 
parallelism, so that each task’s input set is smaller. Spark can 
efficiently support tasks as short as 200 ms, because it reuses one 
executor JVM across many tasks and it has a low task launching cost, so 
you can safely increase the level of parallelism to more than the number 
of cores in your clusters.
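
Concretely, "increase the level of parallelism" boils down to something like the
following sketch (pairs is only a placeholder for whatever pair RDD feeds the shuffle):

val counts = pairs.reduceByKey(_ + _, 200)  // ask for 200 reduce tasks instead of the default
// For DataFrame/SQL shuffles the analogous knob is spark.sql.shuffle.partitions.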


I would be curious if that helps at all. Sounds like an interesting 
problem you are working on.


Jim

On 12/29/2015 05:51 PM, Davies Liu wrote:

Hi Andy,

Could you change the logging level to INFO and post some logs here? There will be some 
logging about the memory usage of a task when an OOM occurs.

In 1.6, the memory for a task is: (HeapSize - 300M) * 0.75 / number of tasks. 
Is it possible that the heap is too small?
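
For example, with the 1024M heap and master="Local[2]" mentioned below, that works 
out to roughly (1024M - 300M) * 0.75 = about 543M of execution memory shared by the 
running tasks, on the order of 270M per task, so a single 32MB (33554432-byte) 
request can plausibly fail once other operators have taken their share.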

Davies

--
Davies Liu
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Tuesday, December 29, 2015, at 4:28 PM, Andy Davidson wrote:


Hi Michael
  
https://github.com/apache/spark/archive/v1.6.0.tar.gz
  
Both 1.6.0 and 1.5.2: my unit test works when I call repartition(1) before saving output. Coalesce still fails.
  
Coalesce(1) spark-1.5.2

Caused by:
java.io.IOException: Unable to acquire 33554432 bytes of memory
  
  
Coalesce(1) spark-1.6.0
  
Caused by:

java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
  
Hope this helps
  
Andy
  
From: Michael Armbrust 

Date: Monday, December 28, 2015 at 2:41 PM
To: Andrew Davidson 
Cc: "user @spark" 
Subject: Re: trouble understanding data frame memory usage 
"java.io.IOException: Unable to acquire memory"
  

Unfortunately, in 1.5 we didn't force operators to spill when they ran out of memory, 
so there is not a lot you can do. It would be awesome if you could test with 
1.6 and see if things are any better.
  
On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson  wrote:

I am using Spark 1.5.1. I am running into some memory problems with a Java unit test. Yes, 
I could fix it by setting -Xmx (it's set to 1024M); however, I want to better understand 
what is going on so I can write better code in the future. The test runs on a Mac, 
master="Local[2]"
  
I have a Java unit test that starts by reading a 672K ASCII file. My output data file is 152K. It seems strange that such a small amount of data would cause an out-of-memory exception. I am running a pretty standard machine learning process:
  
1. Load data
2. Create a ML pipeline
3. Transform the data
4. Train a model
5. Make predictions
6. Join the predictions back to my original data set
7. Coalesce(1), since I only have a small amount of data and want to save it in a single 
file
8. Save final results back to disk
  
  
Step 7: I am unable to call Coalesce(); it fails with “java.io.IOException: Unable to acquire memory”.
  
To try and figure out what is going on, I put log messages in to count the number of partitions.
  
Turns out I have 20 input files, each one winds up in a separate partition. Okay so after loading I call coalesce(1) and check to make sure I only have a single partition.
  
The total number of observations is 1998.
  
After calling step 7 I count the number of partitions and discover I have 224 partitions! Surprising, given that I called Coalesce(1) before I did anything with the data. My data set should easily fit in memory. When I save it to disk I get 202 files created, with 162 of them being empty!
  
In general I am not explicitly using cache.
  
Some of the data frames get registered as tables. I find it easier to use sql.
  
Some of the data frames get converted back to RDDs. I find it easier to create RDDs this way.
  
I put calls to unpersist(true) in several places.
  
  
private void memoryCheck(String name) {
    Runtime rt = Runtime.getRuntime();
    // Three placeholders, three arguments: the label plus total and free heap memory.
    logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {}",
        name,
        String.format("%,d", rt.totalMemory()),
        String.format("%,d", rt.freeMemory()));
}
  
  
  
Any idea how I can get a better understanding of what is going on? My goal is to learn 

SparkSQL integration issue with AWS S3a

2015-12-30 Thread KOSTIANTYN Kudriavtsev
Dear Spark community,

I faced the following issue when trying to access data on S3a; my code is
the following:

val sparkConf = new SparkConf()

val sc = new SparkContext(sparkConf)
sc.hadoopConfiguration.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")

val sqlContext = SQLContext.getOrCreate(sc)

val df = sqlContext.read.parquet(...)

df.count


It results in the following exception and log messages:

15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load
credentials from BasicAWSCredentialsProvider: *Access key or secret
key is null*
15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance
metadata service at URL:
http://x.x.x.x/latest/meta-data/iam/security-credentials/
15/12/30 
17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials
from InstanceProfileCredentialsProvider: The requested metadata is not
found at http://x.x.x.x/latest/meta-data/iam/security-credentials/
15/12/30 
17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
com.amazonaws.AmazonClientException: Unable to load AWS credentials
from any provider in the chain
at 
com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at 
com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at 
com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)


I run standalone spark 1.5.2 and using hadoop 2.7.1

any ideas/workarounds?

AWS credentials are correct for this bucket

Thank you,
Konstantin Kudryavtsev


How to register a Tuple3 with KryoSerializer?

2015-12-30 Thread Russ
I need to register with KryoSerializer a Tuple3 that is generated by a call to 
the sortBy() method that eventually calls collect() from 
Partitioner.RangePartitioner.sketch().
The IntelliJ IDEA debugger indicates that the element types for the Tuple3 are 
java.lang.Integer, java.lang.Integer and long[].  So, the question is, how 
should I specify the long[] type?
I have tried the following from my Scala code:
sparkConf.registerKryoClasses(Array(classOf[scala.Tuple3[java.lang.Integer,  
java.lang.Integer, Array[java.lang.Long]]]))
However, that approach throws the following exception which indicates that I 
have failed to register the Tuple3 correctly:
java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]
Can anyone suggest the correct way to register this Tuple3?  I suppose that I 
could register the tuple from a Java method, but it would be nice to 
avoid having to introduce any Java into my code.
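
One possible direction (a sketch only): the "scala.Tuple3[]" in the message looks 
like Java notation for an array of Tuple3, so the array class may need to be 
registered as well; note that classOf[Array[Long]] in Scala resolves to the 
primitive long[] class:

sparkConf.registerKryoClasses(Array(
  classOf[scala.Tuple3[_, _, _]],
  classOf[Array[scala.Tuple3[_, _, _]]],  // the array type named in the error
  classOf[Array[Long]]))                  // primitive long[]
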
Thanks.


Re: Run ad-hoc queries at runtime against cached RDDs

2015-12-30 Thread Chris Fregly
There are a few diff ways to apply approximation algorithms and
probabilistic data structures to your Spark data - including Spark's
countApproxDistinct() methods as you pointed out.
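
For the built-in route, a minimal sketch (an RDD of (userId, property) pairs is
assumed here purely for illustration):

val approxUsers = users
  .filter { case (_, prop) => prop == "prop1=X" }
  .keys
  .countApproxDistinct(0.01)  // ~1% relative accuracy, HyperLogLog under the hood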

There's also Twitter Algebird, and Redis HyperLogLog (PFCOUNT, PFADD).

Here are some examples from my *pipeline Github project*
that demonstrate how to
use these in a streaming context - if that's interesting to you at all:

*1) Algebird CountMin Sketch*

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/AlgebirdCountMinSketchTopK.scala

*2) Algebird HyperLogLog*

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/AlgebirdHyperLogLog.scala

*3) Redis HyperLogLog*

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/RedisHyperLogLog.scala

In addition, my *Global Advanced Apache Spark Meetup* (based in SF) has an
entire evening dedicated to this exact topic next month:

http://www.meetup.com/Advanced-Apache-Spark-Meetup/events/22616/.

Video, slides, and live streaming urls will be available the day of the
meetup.


On Mon, Dec 14, 2015 at 12:20 PM, Krishna Rao 
wrote:

> Thanks for the response Jörn. So to elaborate, I have a large dataset with
> userIds, each tagged with a property, e.g.:
>
> user_1    prop1=X
> user_2    prop1=Y    prop2=A
> user_3    prop2=B
>
>
> I would like to be able to get the number of distinct users that have a
> particular property (or combination of properties). The cardinality of each
> property is in the 1000s and will only grow, as will the number of
> properties. I'm happy with approximate values to trade accuracy for
> performance.
>
> Spark's performance when doing this via spark-shell is more than excellent
> using the "countApproxDistinct" method on a "javaRDD". However, I've no
> idea what's the best way to be able to run a query programmatically like I
> can do manually via spark-shell.
>
> Hope this clarifies things.
>
>
> On 14 December 2015 at 17:04, Jörn Franke  wrote:
>
>> Can you elaborate a little bit more on the use case? It looks a little
>> bit like an abuse of Spark in general . Interactive queries that are not
>> suitable for in-memory batch processing might be better supported by ignite
>> that has in-memory indexes, concept of hot, warm, cold data etc. or hive on
>> tez+llap .
>>
>> > On 14 Dec 2015, at 17:19, Krishna Rao  wrote:
>> >
>> > Hi all,
>> >
>> > What's the best way to run ad-hoc queries against a cached RDDs?
>> >
>> > For example, say I have an RDD that has been processed, and persisted
>> to memory-only. I want to be able to run a count (actually
>> "countApproxDistinct") after filtering by an, at compile time, unknown
>> (specified by query) value.
>> >
>> > I've experimented with using (abusing) Spark Streaming, by streaming
>> queries and running these against the cached RDD. However, as I say I don't
>> think that this is an intended use-case of Streaming.
>> >
>> > Cheers,
>> >
>> > Krishna
>>
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"

2015-12-30 Thread Chris Fregly
@Jim-

I'm wondering if those docs are outdated, as it's my understanding (please correct 
me if I'm wrong) that we should never be seeing OOMs: 1.5/Tungsten not only improved 
(reduced) the memory footprint of our data, but also introduced better task-level - 
and even key-level - external spilling before an OOM occurs.

Michael's comment seems to indicate there was a missed case that is fixed in 
1.6.

I personally see far too many OOMs for what I expected out of 1.5, so I'm 
anxious to try 1.6 and hopefully squash more of these edge cases.

while increasing parallelism is definitely a best practice and applies either 
way, the docs could use some updating, I feel, by the contributors of this code.

> On Dec 30, 2015, at 12:18 PM, SparkUser  wrote:
> 
> Sounds like you guys are on the right track, this is purely FYI because I 
> haven't seen it posted, just responding to the line in the original post that 
> your data structure should fit in memory.
> 
> OK two more disclaimers "FWIW" and "maybe this is not relevant or already 
> covered" OK here goes...
> 
>  from 
> http://spark.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
> 
> Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit 
> in memory, but because the working set of one of your tasks, such as one of 
> the reduce tasks in groupByKey, was too large. Spark’s shuffle operations 
> (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within 
> each task to perform the grouping, which can often be large. The simplest fix 
> here is to increase the level of parallelism, so that each task’s input set 
> is smaller. Spark can efficiently support tasks as short as 200 ms, because 
> it reuses one executor JVM across many tasks and it has a low task launching 
> cost, so you can safely increase the level of parallelism to more than the 
> number of cores in your clusters.
> 
> I would be curious if that helps at all. Sounds like an interesting problem 
> you are working on.
> 
> Jim
> 
>> On 12/29/2015 05:51 PM, Davies Liu wrote:
>> Hi Andy,  
>> 
>> Could you change logging level to INFO and post some here? There will be 
>> some logging about the memory usage of a task when OOM.  
>> 
>> In 1.6, the memory for a task is : (HeapSize  - 300M) * 0.75 / number of 
>> tasks. Is it possible that the heap is too small?
>> 
>> Davies  
>> 
>> --  
>> Davies Liu
>> Sent with Sparrow (http://www.sparrowmailapp.com/?sig)
>> 
>> 
>> On Tuesday, December 29, 2015, at 4:28 PM, Andy Davidson wrote:
>> 
>>> Hi Michael
>>>  
>>> https://github.com/apache/spark/archive/v1.6.0.tar.gz
>>>  
>>> Both 1.6.0 and 1.5.2: my unit test works when I call repartition(1) before 
>>> saving output. Coalesce still fails.
>>>  
>>> Coalesce(1) spark-1.5.2
>>> Caused by:
>>> java.io.IOException: Unable to acquire 33554432 bytes of memory
>>>  
>>>  
>>> Coalesce(1) spark-1.6.0
>>>  
>>> Caused by:  
>>> java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
>>>  
>>> Hope this helps
>>>  
>>> Andy
>>>  
>>> From: Michael Armbrust (mailto:mich...@databricks.com)
>>> Date: Monday, December 28, 2015 at 2:41 PM
>>> To: Andrew Davidson (mailto:a...@santacruzintegration.com)
>>> Cc: "user @spark" 
>>> Subject: Re: trouble understanding data frame memory usage 
>>> "java.io.IOException: Unable to acquire memory"
>>>  
 Unfortunately in 1.5 we didn't force operators to spill when they ran out of 
 memory so there is not a lot you can do. It would be awesome if you could 
 test with 1.6 and see if things are any better?
  
 On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson 
  
 wrote:
> I am using spark 1.5.1. I am running into some memory problems with a 
> java unit test. Yes I could fix it by setting –Xmx (its set to 1024M) how 
> ever I want to better understand what is going on so I can write better 
> code in the future. The test runs on a Mac, master="Local[2]"
>  
> I have a java unit test that starts by reading a 672K ascii file. I my 
> output data file is 152K. Its seems strange that such a small amount of 
> data would cause an out of memory exception. I am running a pretty 
> standard machine learning process
>  
> Load data
> create a ML pipeline
> transform the data
> Train a model
> Make predictions
> Join the predictions back to my original data set
> Coalesce(1), I only have a small amount of data and want to save it in a 
> single file
> Save final results back to disk
>  
>  
> Step 7: I am unable to call Coalesce() “java.io.IOException: Unable to 
> acquire memory”
>  
> To try and figure out what is going I put log messages in to count 

Re: SparkSQL Hive orc snappy table

2015-12-30 Thread Dawid Wysakowicz
I do understand that Snappy is not splittable as such, but ORCFile is. In
ORC, blocks are compressed with Snappy, so there should be no problem with it.

Anyway, ZLIB (used by default in both ORC and Parquet) is also not splittable,
but it works perfectly fine.

2015-12-30 16:26 GMT+01:00 Chris Fregly :

> Reminder that Snappy is not a splittable format.
>
> I've had success with Hive + LZF (splittable) and bzip2 (also splittable).
>
> Gzip is also not splittable, so you won't be utilizing your cluster to
> process this data in parallel as only 1 task can read and process
> unsplittable data - versus many tasks spread across the cluster.
>
> On Wed, Dec 30, 2015 at 6:45 AM, Dawid Wysakowicz <
> wysakowicz.da...@gmail.com> wrote:
>
>> Hasn't anyone used Spark with ORC and Snappy compression?
>>
>> 2015-12-29 18:25 GMT+01:00 Dawid Wysakowicz :
>>
>>> Hi,
>>>
>>> I have a table in Hive stored as ORC with compression = snappy. I try to
>>> execute a query on that table and it fails (previously I ran it on tables in
>>> ORC-ZLIB format and Parquet, so it is not a matter of the query).
>>>
>>> I managed to execute this query with Hive on Tez on those tables.
>>>
>>> The exception i get is as follows:
>>>
>>> 15/12/29 17:16:46 WARN scheduler.DAGScheduler: Creating new stage failed
>>> due to exception - job: 3
>>> java.lang.RuntimeException: serious problem
>>> at
>>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
>>> at
>>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
>>> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>>> at
>>> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.getPartitions(MapPartitionsWithPreparationRDD.scala:40)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>>> at org.apache.spark.ShuffleDependency.(Dependency.scala:82)
>>> at
>>> org.apache.spark.sql.execution.ShuffledRowRDD.getDependencies(ShuffledRowRDD.scala:59)
>>> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:226)
>>> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:224)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.dependencies(RDD.scala:224)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:388)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:405)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:370)
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:253)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:354)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:351)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:351)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:363)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:266)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:300)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:734)
>>> at
>>> 

DStream keyBy

2015-12-30 Thread Brian London
RDD has a method keyBy[K](f: T=>K) that acts as an alias for map(x =>
(f(x), x)) and is useful for generating pair RDDs.  Is there a reason this
method doesn't exist on DStream?  It's a fairly heavily used method and
allows clearer code than the more verbose map.
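
For now the workaround is just the one-liner that keyBy wraps, e.g. (a sketch; events
is a placeholder DStream[String]):

val keyed = events.map(e => (e.length, e))           // what keyBy(_.length) would do
// or, going through the underlying RDDs:
val keyed2 = events.transform(_.keyBy(_.length))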


Using Experimental Spark Features

2015-12-30 Thread David Newberger
Hi All,

I've been looking at the Direct Approach for streaming Kafka integration 
(http://spark.apache.org/docs/latest/streaming-kafka-integration.html) because 
it looks like a good fit for our use cases. My concern is that the feature is 
experimental according to the documentation. Has anyone used this approach yet, 
and if so, what has your experience been with it? If it helps, we'd be looking 
to implement it using Scala. Secondly, in general, what has people's experience 
been with using experimental features in Spark?
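
For reference, the direct approach boils down to something like this sketch
(assuming the spark-streaming-kafka artifact for Kafka 0.8 is on the classpath;
broker and topic names are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("DirectKafkaSketch")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

// Receiver-less direct stream; each record arrives as a (key, value) pair.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.map(_._2).count().print()   // count records per batch as a smoke test

ssc.start()
ssc.awaitTermination()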

Cheers,

David Newberger



Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Chris Fregly
are the credentials visible from each Worker node to all the Executor JVMs on 
each Worker?

> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Dear Spark community,
> 
> I faced the following issue with trying accessing data on S3a, my code is the 
> following:
> 
> val sparkConf = new SparkConf()
> 
> val sc = new SparkContext(sparkConf)
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
> val sqlContext = SQLContext.getOrCreate(sc)
> val df = sqlContext.read.parquet(...)
> df.count
> 
> It results in the following exception and log messages:
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from BasicAWSCredentialsProvider: Access key or secret key is null
> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
> metadata service at URL: 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from InstanceProfileCredentialsProvider: The requested metadata 
> is not found at http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
> com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
> provider in the chain
>   at 
> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
> 
> I run standalone spark 1.5.2 and using hadoop 2.7.1
> 
> any ideas/workarounds?
> 
> AWS credentials are correct for this bucket
> 
> Thank you,
> Konstantin Kudryavtsev


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread KOSTIANTYN Kudriavtsev
Chris,

 good question: as you can see from the code, I set them up on the driver, so I
expect they will be propagated to all nodes, won't they?

Thank you,
Konstantin Kudryavtsev

On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly  wrote:

> are the credentials visible from each Worker node to all the Executor JVMs
> on each Worker?
>
> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev <
> kudryavtsev.konstan...@gmail.com> wrote:
>
> Dear Spark community,
>
> I faced the following issue with trying accessing data on S3a, my code is
> the following:
>
> val sparkConf = new SparkConf()
>
> val sc = new SparkContext(sparkConf)
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>
> val sqlContext = SQLContext.getOrCreate(sc)
>
> val df = sqlContext.read.parquet(...)
>
> df.count
>
>
> It results in the following exception and log messages:
>
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from BasicAWSCredentialsProvider: *Access key or secret key is 
> null*
> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
> metadata service at URL: 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30  
> 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials from 
> InstanceProfileCredentialsProvider: The requested metadata is not found at 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30  
> 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
> com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
> provider in the chain
>   at 
> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
>
>
> I run standalone spark 1.5.2 and using hadoop 2.7.1
>
> any ideas/workarounds?
>
> AWS credentials are correct for this bucket
>
> Thank you,
> Konstantin Kudryavtsev
>
>


Re: Problem with WINDOW functions?

2015-12-30 Thread Vadim Tkachenko
Davies,

Thank you, I will wait on 1.6 release.

http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tt25833.html
?

On Wed, Dec 30, 2015 at 12:06 AM, Davies Liu  wrote:

> Window functions are improved in 1.6 release, could you try 1.6-RC4
> (or wait until next week for the final release)?
>
> Even In 1.6, the buffer of rows for window function does not support
> spilling (also does not use memory efficiently), there is a JIRA for
> it: https://issues.apache.org/jira/browse/SPARK-12295
>
> On Tue, Dec 29, 2015 at 5:28 PM, vadimtk  wrote:
> > Hi,
> >
> > I can't successfully execute a query with WINDOW function.
> >
> > The statements are following:
> >
> > val orcFile =
> >
> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(project)='EN'")
> > orcFile.registerTempTable("d1")
> >  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
> ORDER
> > BY pageviews DESC) as rank FROM d1").filter("rank <=
> > 20").sort($"day",$"rank").collect().foreach(println)
> >
> > with default
> > spark.driver.memory
> >
> > I am getting java.lang.OutOfMemoryError: Java heap space.
> > The same if I set spark.driver.memory=10g.
> >
> > When I set spark.driver.memory=45g (the box has 256GB of RAM) the
> execution
> > fails with a different error:
> >
> > 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no
> recent
> > heartbeats: 129324 ms exceeds timeout 12 ms
> >
> > And I see that GC takes a lot of time.
> >
> > What is a proper way to execute statements above?
> >
> > I see the similar problems reported
> >
> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
> >
> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tp25833.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Blaž Šnuderl
Try setting s3 credentials using keys specified here
https://github.com/Aloisius/hadoop-s3a/blob/master/README.md

Blaz
On Dec 30, 2015 6:48 PM, "KOSTIANTYN Kudriavtsev" <
kudryavtsev.konstan...@gmail.com> wrote:

> Dear Spark community,
>
> I faced the following issue with trying accessing data on S3a, my code is
> the following:
>
> val sparkConf = new SparkConf()
>
> val sc = new SparkContext(sparkConf)
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>
> val sqlContext = SQLContext.getOrCreate(sc)
>
> val df = sqlContext.read.parquet(...)
>
> df.count
>
>
> It results in the following exception and log messages:
>
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from BasicAWSCredentialsProvider: *Access key or secret key is 
> null*
> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
> metadata service at URL: 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30  
> 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials from 
> InstanceProfileCredentialsProvider: The requested metadata is not found at 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30  
> 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
> com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
> provider in the chain
>   at 
> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
>
>
> I run standalone spark 1.5.2 and using hadoop 2.7.1
>
> any ideas/workarounds?
>
> AWS credentials are correct for this bucket
>
> Thank you,
> Konstantin Kudryavtsev
>


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread KOSTIANTYN Kudriavtsev
Hi Blaz,

I did, the same result

Thank you,
Konstantin Kudryavtsev

On Wed, Dec 30, 2015 at 12:54 PM, Blaž Šnuderl  wrote:

> Try setting s3 credentials using keys specified here
> https://github.com/Aloisius/hadoop-s3a/blob/master/README.md
>
> Blaz
> On Dec 30, 2015 6:48 PM, "KOSTIANTYN Kudriavtsev" <
> kudryavtsev.konstan...@gmail.com> wrote:
>
>> Dear Spark community,
>>
>> I faced the following issue with trying accessing data on S3a, my code is
>> the following:
>>
>> val sparkConf = new SparkConf()
>>
>> val sc = new SparkContext(sparkConf)
>> sc.hadoopConfiguration.set("fs.s3a.impl", 
>> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>>
>> val sqlContext = SQLContext.getOrCreate(sc)
>>
>> val df = sqlContext.read.parquet(...)
>>
>> df.count
>>
>>
>> It results in the following exception and log messages:
>>
>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
>> credentials from BasicAWSCredentialsProvider: *Access key or secret key is 
>> null*
>> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
>> metadata service at URL: 
>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>> 15/12/30  
>> 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials from 
>> InstanceProfileCredentialsProvider: The requested metadata is not found at 
>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>> 15/12/30  
>> 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
>> com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
>> provider in the chain
>>  at 
>> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>>  at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
>>
>>
>> I run standalone spark 1.5.2 and using hadoop 2.7.1
>>
>> any ideas/workarounds?
>>
>> AWS credentials are correct for this bucket
>>
>> Thank you,
>> Konstantin Kudryavtsev
>>
>


Re: Can't submit job to stand alone cluster

2015-12-30 Thread SparkUser

Sorry need to clarify:

When you say:

   When the docs say "If your application is launched through Spark
   submit, then the application jar is automatically distributed to all
   worker nodes," it is actually saying that your executors get their
   jars from the driver. This is true whether you're running in client
   mode or cluster mode.


Don't you mean the master, not the driver? I thought the whole point of 
confusion is that people expect the driver to distribute jars but they 
have to be visible to the master on the file system local to the master?


I see a lot of people tripped up by this, and a nice mail from Greg Hill 
to the list cleared this up for me, but now I am confused again. I am a 
couple of days away from having a way to test this myself, so I am just "in 
theory" right now.


   On 12/29/2015 05:18 AM, Greg Hill wrote:

Yes, you have misunderstood, but so did I.  So the problem is that
--deploy-mode cluster runs the Driver on the cluster as well, and you
don't know which node it's going to run on, so every node needs access to
the JAR.  spark-submit does not pass the JAR along to the Driver, but the
Driver will pass it to the executors.  I ended up putting the JAR in HDFS
and passing an hdfs:// path to spark-submit.  This is a subtle difference
from Spark on YARN which does pass the JAR along to the Driver
automatically, and IMO should probably be fixed in spark-submit.  It's
really confusing for newcomers.



Thanks,

Jim


On 12/29/2015 04:36 PM, Daniel Valdivia wrote:

That makes things more clear! Thanks

Issue resolved

Sent from my iPhone

On Dec 29, 2015, at 2:43 PM, Annabel Melongo 
> wrote:



Thanks Andrew for this awesome explanation :)


On Tuesday, December 29, 2015 5:30 PM, Andrew Or 
> wrote:



Let me clarify a few things for everyone:

There are three *cluster managers*: standalone, YARN, and Mesos. Each 
cluster manager can run in two *deploy modes*, client or cluster. In 
client mode, the driver runs on the machine that submitted the 
application (the client). In cluster mode, the driver runs on one of 
the worker machines in the cluster.


When I say "standalone cluster mode" I am referring to the standalone 
cluster manager running in cluster deploy mode.


Here's how the resources are distributed in each mode (omitting Mesos):

*Standalone / YARN client mode. *The driver runs on the client
machine (i.e. machine that ran Spark submit) so it should already
have access to the jars. The executors then pull the jars from an
HTTP server started in the driver.

*Standalone cluster mode. *Spark submit does /not/ upload your
jars to the cluster, so all the resources you need must already
be on all of the worker machines. The executors, however,
actually just pull the jars from the driver as in client mode
instead of finding it in their own local file systems.

*YARN cluster mode. *Spark submit /does/ upload your jars to the
cluster. In particular, it puts the jars in HDFS so your driver
can just read from there. As in other deployments, the executors
pull the jars from the driver.


When the docs say "If your application is launched through Spark 
submit, then the application jar is automatically distributed to all 
worker nodes," it is actually saying that your executors get their 
jars from the driver. This is true whether you're running in client 
mode or cluster mode.


If the docs are unclear (and they seem to be), then we should update 
them. I have filed SPARK-12565 to track this.


Please let me know if there's anything else I can help clarify.

Cheers,
-Andrew




2015-12-29 13:07 GMT-08:00 Annabel Melongo >:


Andrew,

Now I see where the confusion lays. Standalone cluster mode, your
link, is nothing but a combination of client-mode and standalone
mode, my link, without YARN.

But I'm confused by this paragraph in your link:

If your application is launched through Spark submit, then the
application jar is automatically distributed to all worker nodes.
For any additional jars that your
application depends on, you should specify them through
the |--jars| flag using comma as a delimiter (e.g. |--jars
jar1,jar2|).

That can't be true; this is only the case when Spark runs on top
of YARN. Please correct me, if I'm wrong.

Thanks


On Tuesday, December 29, 2015 2:54 PM, Andrew Or
> wrote:



http://spark.apache.org/docs/latest/spark-standalone.html#launching-spark-applications

2015-12-29 11:48 GMT-08:00 Annabel Melongo
>:

  

Re: Can't submit job to stand alone cluster

2015-12-30 Thread Andrew Or
Hi Jim,

Just to clarify further:

   - *Driver *is the process with SparkContext. A driver represents an
   application (e.g. spark-shell, SparkPi) so there is exactly one driver in
   each application.


   - *Executor *is the process that runs the tasks scheduled by the driver.
   There should be at least one executor in each application.


   - *Master *is the process that handles scheduling of *applications*. It
   decides where drivers and executors are launched and how many cores and how
   much memory to give to each application. This only exists in standalone
   mode.


   - *Worker *is the process that actually launches the executor and driver
   JVMs (the latter only in cluster mode). It talks to the Master to decide
   what to launch with how much memory to give to the process. This only
   exists in standalone mode.

It is actually the *driver*, not the Master, that distributes jars to
executors. The Master is largely unconcerned with individual requirements
from an application apart from cores / memory constraints. This is because
we still need to distribute jars to executors in YARN and Mesos modes, so
the common process, the driver, has to do it.

I thought the whole point of confusion is that people expect the driver to
> distribute jars but they have to be visible to the master on the file
> system local to the master?


Actually the requirement is that the jars have to be visible to the machine
running the *driver*, not the Master. In client mode, your jars have to be
visible to the machine running spark-submit. In cluster mode, your jars
have to be visible to all machines running a Worker, since the driver can
be launched on any of them.

The nice email from Greg is spot-on.

Does that make sense?

-Andrew


2015-12-30 11:23 GMT-08:00 SparkUser :

> Sorry need to clarify:
>
> When you say:
>
> *When the docs say **"If your application is launched through Spark
> submit, then the application jar is automatically distributed to all worker
> nodes,"**it is actually saying that your executors get their jars from
> the driver. This is true whether you're running in client mode or cluster
> mode.*
>
>
> Don't you mean the master, not the driver? I thought the whole point of
> confusion is that people expect the driver to distribute jars but they have
> to be visible to the master on the file system local to the master?
>
> I see a lot of people tripped up by this and a nice mail from Greg Hill to
> the list cleared this up for me but now I am confused again. I am a couple
> days away from having a way to test this myself, so I am just "in theory"
> right now.
>
> On 12/29/2015 05:18 AM, Greg Hill wrote:
>
> Yes, you have misunderstood, but so did I.  So the problem is that
> --deploy-mode cluster runs the Driver on the cluster as well, and you
> don't know which node it's going to run on, so every node needs access to
> the JAR.  spark-submit does not pass the JAR along to the Driver, but the
> Driver will pass it to the executors.  I ended up putting the JAR in HDFS
> and passing an hdfs:// path to spark-submit.  This is a subtle difference
> from Spark on YARN which does pass the JAR along to the Driver
> automatically, and IMO should probably be fixed in spark-submit.  It's
> really confusing for newcomers.
>
>
> Thanks,
>
> Jim
>
>
> On 12/29/2015 04:36 PM, Daniel Valdivia wrote:
>
> That makes things more clear! Thanks
>
> Issue resolved
>
> Sent from my iPhone
>
> On Dec 29, 2015, at 2:43 PM, Annabel Melongo < 
> melongo_anna...@yahoo.com> wrote:
>
> Thanks Andrew for this awesome explanation :)
>
>
> On Tuesday, December 29, 2015 5:30 PM, Andrew Or < 
> and...@databricks.com> wrote:
>
>
> Let me clarify a few things for everyone:
>
> There are three *cluster managers*: standalone, YARN, and Mesos. Each
> cluster manager can run in two *deploy modes*, client or cluster. In
> client mode, the driver runs on the machine that submitted the application
> (the client). In cluster mode, the driver runs on one of the worker
> machines in the cluster.
>
> When I say "standalone cluster mode" I am referring to the standalone
> cluster manager running in cluster deploy mode.
>
> Here's how the resources are distributed in each mode (omitting Mesos):
>
> *Standalone / YARN client mode. *The driver runs on the client machine
> (i.e. machine that ran Spark submit) so it should already have access to
> the jars. The executors then pull the jars from an HTTP server started in
> the driver.
>
> *Standalone cluster mode. *Spark submit does *not* upload your jars to
> the cluster, so all the resources you need must already be on all of the
> worker machines. The executors, however, actually just pull the jars from
> the driver as in client mode instead of finding it in their own local file
> systems.
>
> *YARN cluster mode. *Spark submit *does* upload your jars to the cluster.
> In particular, it puts the 

Re: Problem with WINDOW functions?

2015-12-30 Thread Vadim Tkachenko
Gokula,

Thanks,
I will try this.
I am just SQL kind of guy :), but I will try your suggestion

Thanks,
Vadim


On Wed, Dec 30, 2015 at 1:07 PM, Gokula Krishnan D 
wrote:

> Hello Vadim -
>
> Alternatively, you can achieve this by using the *window functions*, which are
> available from 1.4.0
>
> *code_value.txt (Input)*
> =
> 1000,200,Descr-200,01
> 1000,200,Descr-200-new,02
> 1000,201,Descr-201,01
> 1000,202,Descr-202-new,03
> 1000,202,Descr-202,01
> 1000,202,Descr-202-old,02
>
> *Expected Output(DataFrame):*
> ==
> 1000,200,Descr-200-new,02
> 1000,201,Descr-201,01
> 1000,202,Descr-202-new,03
>
> ==
> *Code (Spark-Shell)*
> import org.apache.spark.{SparkContext,SparkConf}
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> val sqlSC = new org.apache.spark.sql.hive.HiveContext(sc)
> import sqlSC.implicits._
>
> case class data(batch_id:Int,code:String,descr:String,seq:Int)
>
> val input_RDD = sc.textFile("Data/Projects/Spark/Input/code_Value.txt")
>
> val data_RDD =
> input_RDD.map(line=>line.split(",")).map(x=>data(x(0).toInt,x(1),x(2),x(3).toInt))
> val data_DF  = data_RDD.toDF()
> val winSpec =
> Window.partitionBy(data_DF("code")).orderBy(data_DF("seq").desc)
> {data_DF.select($"batch_id",$"code",$"descr",$"seq",
> rowNumber.over(winSpec).alias("rn"))
>.filter($"rn"<=1)
>   .select($"batch_id",$"code",$"descr",$"seq")
>   .show}
>
> ==
>
>
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>
> On Wed, Dec 30, 2015 at 11:35 AM, Vadim Tkachenko 
> wrote:
>
>> Davies,
>>
>> Thank you, I will wait on 1.6 release.
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tt25833.html
>> ?
>>
>> On Wed, Dec 30, 2015 at 12:06 AM, Davies Liu 
>> wrote:
>>
>>> Window functions are improved in 1.6 release, could you try 1.6-RC4
>>> (or wait until next week for the final release)?
>>>
>>> Even In 1.6, the buffer of rows for window function does not support
>>> spilling (also does not use memory efficiently), there is a JIRA for
>>> it: https://issues.apache.org/jira/browse/SPARK-12295
>>>
>>> On Tue, Dec 29, 2015 at 5:28 PM, vadimtk  wrote:
>>> > Hi,
>>> >
>>> > I can't successfully execute a query with WINDOW function.
>>> >
>>> > The statements are following:
>>> >
>>> > val orcFile =
>>> >
>>> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(project)='EN'")
>>> > orcFile.registerTempTable("d1")
>>> >  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
>>> ORDER
>>> > BY pageviews DESC) as rank FROM d1").filter("rank <=
>>> > 20").sort($"day",$"rank").collect().foreach(println)
>>> >
>>> > with default
>>> > spark.driver.memory
>>> >
>>> > I am getting java.lang.OutOfMemoryError: Java heap space.
>>> > The same if I set spark.driver.memory=10g.
>>> >
>>> > When I set spark.driver.memory=45g (the box has 256GB of RAM) the
>>> execution
>>> > fails with a different error:
>>> >
>>> > 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no
>>> recent
>>> > heartbeats: 129324 ms exceeds timeout 12 ms
>>> >
>>> > And I see that GC takes a lot of time.
>>> >
>>> > What is a proper way to execute statements above?
>>> >
>>> > I see the similar problems reported
>>> >
>>> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
>>> >
>>> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tp25833.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>
>>
>


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Chris Fregly
couple things:

1) switch to IAM roles if at all possible - explicitly passing AWS
credentials is a long and lonely road in the end

2) one really bad workaround/hack is to run a job that hits every worker
and writes the credentials to the proper location (~/.awscredentials or
whatever)

^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle
autoscaling, but i'm mentioning it anyway as it is a temporary fix.

if you switch to IAM roles, things become a lot easier as you can authorize
all of the EC2 instances in the cluster - and handles autoscaling very well
- and at some point, you will want to autoscale.

On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> Chris,
>
>  good question, as you can see from the code I set up them on driver, so I
> expect they will be propagated to all nodes, won't them?
>
> Thank you,
> Konstantin Kudryavtsev
>
> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly  wrote:
>
>> are the credentials visible from each Worker node to all the Executor
>> JVMs on each Worker?
>>
>> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev <
>> kudryavtsev.konstan...@gmail.com> wrote:
>>
>> Dear Spark community,
>>
>> I faced the following issue with trying accessing data on S3a, my code is
>> the following:
>>
>> val sparkConf = new SparkConf()
>>
>> val sc = new SparkContext(sparkConf)
>> sc.hadoopConfiguration.set("fs.s3a.impl", 
>> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>>
>> val sqlContext = SQLContext.getOrCreate(sc)
>>
>> val df = sqlContext.read.parquet(...)
>>
>> df.count
>>
>>
>> It results in the following exception and log messages:
>>
>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
>> credentials from BasicAWSCredentialsProvider: *Access key or secret key is 
>> null*
>> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
>> metadata service at URL: 
>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>> 15/12/30  
>> 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials from 
>> InstanceProfileCredentialsProvider: The requested metadata is not found at 
>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>> 15/12/30  
>> 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
>> com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
>> provider in the chain
>>  at 
>> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>>  at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
>>
>>
>> I run standalone spark 1.5.2 and using hadoop 2.7.1
>>
>> any ideas/workarounds?
>>
>> AWS credentials are correct for this bucket
>>
>> Thank you,
>> Konstantin Kudryavtsev
>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread KOSTIANTYN Kudriavtsev
Chris,

thanks for the hint with IAM roles, but in my case I need to run different
jobs with different S3 permissions on the same cluster, so this approach
doesn't work for me as far as I understood it
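
(For reference, one per-job variant, a sketch only and untested here: any
"spark.hadoop.*" entry in SparkConf is copied into the Hadoop Configuration, so each
submitted job can carry its own keys; the values below are placeholders.)

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .set("spark.hadoop.fs.s3a.access.key", "---")
  .set("spark.hadoop.fs.s3a.secret.key", "---")
val sc = new SparkContext(sparkConf)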

Thank you,
Konstantin Kudryavtsev

On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly  wrote:

> couple things:
>
> 1) switch to IAM roles if at all possible - explicitly passing AWS
> credentials is a long and lonely road in the end
>
> 2) one really bad workaround/hack is to run a job that hits every worker
> and writes the credentials to the proper location (~/.awscredentials or
> whatever)
>
> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle
> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>
> if you switch to IAM roles, things become a lot easier as you can
> authorize all of the EC2 instances in the cluster - and handles autoscaling
> very well - and at some point, you will want to autoscale.
>
> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev <
> kudryavtsev.konstan...@gmail.com> wrote:
>
>> Chris,
>>
>>  good question, as you can see from the code I set up them on driver, so
>> I expect they will be propagated to all nodes, won't them?
>>
>> Thank you,
>> Konstantin Kudryavtsev
>>
>> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly  wrote:
>>
>>> are the credentials visible from each Worker node to all the Executor
>>> JVMs on each Worker?
>>>
>>> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev <
>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>
>>> Dear Spark community,
>>>
>>> I faced the following issue with trying accessing data on S3a, my code
>>> is the following:
>>>
>>> val sparkConf = new SparkConf()
>>>
>>> val sc = new SparkContext(sparkConf)
>>> sc.hadoopConfiguration.set("fs.s3a.impl", 
>>> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>>> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
>>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>>>
>>> val sqlContext = SQLContext.getOrCreate(sc)
>>>
>>> val df = sqlContext.read.parquet(...)
>>>
>>> df.count
>>>
>>>
>>> It results in the following exception and log messages:
>>>
>>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
>>> credentials from BasicAWSCredentialsProvider: *Access key or secret key is 
>>> null*
>>> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
>>> metadata service at URL: 
>>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials from 
>>> InstanceProfileCredentialsProvider: The requested metadata is not found at 
>>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>>> 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
>>> com.amazonaws.AmazonClientException: Unable to load AWS credentials from 
>>> any provider in the chain
>>> at 
>>> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>>> at 
>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>>> at 
>>> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>>> at 
>>> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>>> at 
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
>>>
>>>
>>> I run standalone spark 1.5.2 and using hadoop 2.7.1
>>>
>>> any ideas/workarounds?
>>>
>>> AWS credentials are correct for this bucket
>>>
>>> Thank you,
>>> Konstantin Kudryavtsev
>>>
>>>
>>
>
>
> --
>
> *Chris Fregly*
> Principal Data Solutions Engineer
> IBM Spark Technology Center, San Francisco, CA
> http://spark.tc | http://advancedspark.com
>


Spark Streaming: Output frequency different from read frequency in StatefulNetworkWordCount

2015-12-30 Thread Soumitra Johri
Hi, in the example :
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

the streaming frequency is 1 second; however, I do not want to print the
contents of the word counts every second. I want to print them every minute and
reset the word counts back to 0 every minute. How can I do that?

I have to print per-minute word counts with a streaming frequency of 1
second. I thought of using Scala schedulers, but then there can be
concurrency issues.

My algorithm is as follows :

   1. Read the words every 1 second
   2. Do a cumulative word count for 60 seconds
   3. After the end of every 60 seconds (1 minute), print the word counts and
   reset the counters to zero.

Any help would be appreciated!

Thanks

Warm Regards


Re: How to ignore case in dataframe groupby?

2015-12-30 Thread Eran Witkon
Drop the original column and rename the new column.
See df.drop & df.withColumnRenamed, for example:
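
Something along these lines (untested sketch, reusing the column names from
this thread):

import org.apache.spark.sql.functions.upper

val updated = df
  .withColumn("upper-code", upper(df("countrycode")))   // add the uppercased copy
  .drop("countrycode")                                   // drop the original column
  .withColumnRenamed("upper-code", "countrycode")        // rename so it reads as an in-place update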
Eran
On Wed, 30 Dec 2015 at 19:08 raja kbv  wrote:

> Solutions from Eran & Yanbo are working well. Thank you.
>
> @Eran,
>
> Your solution worked with a small change.
> DF.withColumn("upper-code",upper(df("countrycode"))).
>
> This creates a new column "upper-code". Is there a way to update the
> column or create a new df with update column?
>
> Thanks,
> Raja
>
> On Thursday, 24 December 2015 6:17 PM, Eran Witkon 
> wrote:
>
>
> Use DF.withColumn("upper-code",df("countrycode).toUpper))
> or just run a map function that does the same
>
> On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja 
> wrote:
>
> Hi,
> Values in a dataframe column named countrycode are in different cases. Eg:
> (US, us).  groupBy & count gives two rows but the requirement is to ignore
> case for this operation.
> 1) Is there a way to ignore case in groupBy? Or
> 2) Is there a way to update the dataframe column countrycode to uppercase?
>
> Thanks in advance.
>
> Regards,
> Raja
>
>
>
>


Re: Problem with WINDOW functions?

2015-12-30 Thread Gokula Krishnan D
Hello Vadim -

Alternatively, you can achieve this by using *window functions*, which are
available from 1.4.0

*code_value.txt (Input)*
=
1000,200,Descr-200,01
1000,200,Descr-200-new,02
1000,201,Descr-201,01
1000,202,Descr-202-new,03
1000,202,Descr-202,01
1000,202,Descr-202-old,02

*Expected Output(DataFrame):*
==
1000,200,Descr-200-new,02
1000,201,Descr-201,01
1000,202,Descr-202-new,03
==
*Code (Spark-Shell)*
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._   // brings rowNumber into scope

val sqlSC = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlSC.implicits._

case class data(batch_id: Int, code: String, descr: String, seq: Int)

val input_RDD = sc.textFile("Data/Projects/Spark/Input/code_Value.txt")

val data_RDD =
  input_RDD.map(line => line.split(",")).map(x => data(x(0).toInt, x(1), x(2), x(3).toInt))
val data_DF = data_RDD.toDF()

// One window per code value, ordered by seq descending,
// so row number 1 is the record with the highest seq.
val winSpec =
  Window.partitionBy(data_DF("code")).orderBy(data_DF("seq").desc)
{data_DF.select($"batch_id", $"code", $"descr", $"seq",
    rowNumber.over(winSpec).alias("rn"))
  .filter($"rn" === 1)
  .select($"batch_id", $"code", $"descr", $"seq")
  .show()}
==



Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Wed, Dec 30, 2015 at 11:35 AM, Vadim Tkachenko 
wrote:

> Davies,
>
> Thank you, I will wait on 1.6 release.
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tt25833.html
> ?
>
> On Wed, Dec 30, 2015 at 12:06 AM, Davies Liu 
> wrote:
>
>> Window functions are improved in 1.6 release, could you try 1.6-RC4
>> (or wait until next week for the final release)?
>>
>> Even In 1.6, the buffer of rows for window function does not support
>> spilling (also does not use memory efficiently), there is a JIRA for
>> it: https://issues.apache.org/jira/browse/SPARK-12295
>>
>> On Tue, Dec 29, 2015 at 5:28 PM, vadimtk  wrote:
>> > Hi,
>> >
>> > I can't successfully execute a query with WINDOW function.
>> >
>> > The statements are following:
>> >
>> > val orcFile =
>> >
>> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(project)='EN'")
>> > orcFile.registerTempTable("d1")
>> >  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
>> ORDER
>> > BY pageviews DESC) as rank FROM d1").filter("rank <=
>> > 20").sort($"day",$"rank").collect().foreach(println)
>> >
>> > with default
>> > spark.driver.memory
>> >
>> > I am getting java.lang.OutOfMemoryError: Java heap space.
>> > The same if I set spark.driver.memory=10g.
>> >
>> > When I set spark.driver.memory=45g (the box has 256GB of RAM) the
>> execution
>> > fails with a different error:
>> >
>> > 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no
>> recent
>> > heartbeats: 129324 ms exceeds timeout 12 ms
>> >
>> > And I see that GC takes a lot of time.
>> >
>> > What is a proper way to execute statements above?
>> >
>> > I see the similar problems reported
>> >
>> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
>> >
>> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tp25833.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>
>


Re: How to register a Tuple3 with KryoSerializer?

2015-12-30 Thread Shixiong(Ryan) Zhu
You can use "_", e.g.,
sparkConf.registerKryoClasses(Array(classOf[scala.Tuple3[_, _, _]]))
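
For example, a minimal sketch (app name is just a placeholder, and the
Array[Long] registration is only needed if Kryo also complains about the
long[] element type):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("KryoTuple3Example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Register Tuple3 with wildcard type parameters, plus the primitive long array
// that shows up as the third element.
sparkConf.registerKryoClasses(Array(
  classOf[scala.Tuple3[_, _, _]],
  classOf[Array[Long]]))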

Best Regards,
Shixiong(Ryan) Zhu

Software Engineer

Databricks Inc.

shixi...@databricks.com

databricks.com



On Wed, Dec 30, 2015 at 10:16 AM, Russ  wrote:

> I need to register with KryoSerializer a Tuple3 that is generated by a
> call to the sortBy() method that eventually calls collect() from
> Partitioner.RangePartitioner.sketch().
>
> The IntelliJ Idea debugger indicates that the for the Tuple3 are
> java.lang.Integer, java.lang.Integer and long[].  So, the question is, how
> should I specify the long[] type?
>
> I have tried the following from my Scala code:
>
> sparkConf.registerKryoClasses(Array(classOf[scala.Tuple3[java.lang.Integer,
> java.lang.Integer, Array[java.lang.Long]]]))
>
> However, that approach throws the following exception which indicates that
> I have failed to register the Tuple3 correctly:
>
> java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]
>
> Can anyone suggest the correct way to register this Tuple3?  I suppose
> that I could create register the tuple from a Java method but it would be
> nice to avoid having to introduce any Java into my code.
>
> Thanks.
>


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Jerry Lam
Hi Kostiantyn,

Can you define those properties in hdfs-site.xml and make sure it is visible in 
the class path when you spark-submit? It looks like a conf sourcing issue to 
me. 

Cheers,

Sent from my iPhone

> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Chris,
> 
> thanks for the hist with AIM roles, but in my case  I need to run different 
> jobs with different S3 permissions on the same cluster, so this approach 
> doesn't work for me as far as I understood it
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly  wrote:
>> couple things:
>> 
>> 1) switch to IAM roles if at all possible - explicitly passing AWS 
>> credentials is a long and lonely road in the end
>> 
>> 2) one really bad workaround/hack is to run a job that hits every worker and 
>> writes the credentials to the proper location (~/.awscredentials or whatever)
>> 
>> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle 
>> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>> 
>> if you switch to IAM roles, things become a lot easier as you can authorize 
>> all of the EC2 instances in the cluster - and handles autoscaling very well 
>> - and at some point, you will want to autoscale.
>> 
>>> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev 
>>>  wrote:
>>> Chris,
>>> 
>>>  good question, as you can see from the code I set up them on driver, so I 
>>> expect they will be propagated to all nodes, won't them?
>>> 
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> 
 On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly  wrote:
 are the credentials visible from each Worker node to all the Executor JVMs 
 on each Worker?
 
> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Dear Spark community,
> 
> I faced the following issue with trying accessing data on S3a, my code is 
> the following:
> 
> val sparkConf = new SparkConf()
> 
> val sc = new SparkContext(sparkConf)
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
> val sqlContext = SQLContext.getOrCreate(sc)
> val df = sqlContext.read.parquet(...)
> df.count
> 
> It results in the following exception and log messages:
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from BasicAWSCredentialsProvider: Access key or secret key is 
> null
> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
> metadata service at URL: 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from InstanceProfileCredentialsProvider: The requested 
> metadata is not found at 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 
> 3)
> com.amazonaws.AmazonClientException: Unable to load AWS credentials from 
> any provider in the chain
>   at 
> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
> 
> I run standalone spark 1.5.2 and using hadoop 2.7.1
> 
> any ideas/workarounds?
> 
> AWS credentials are correct for this bucket
> 
> Thank you,
> Konstantin Kudryavtsev
>> 
>> 
>> 
>> -- 
>> 
>> Chris Fregly
>> Principal Data Solutions Engineer
>> IBM Spark Technology Center, San Francisco, CA
>> http://spark.tc | http://advancedspark.com
> 


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread KOSTIANTYN Kudriavtsev
Hi Jerry,

I want to run different jobs on different S3 buckets - with different AWS creds
- on the same instances. Could you shed some light on whether it's possible to
achieve this with hdfs-site.xml?

Thank you,
Konstantin Kudryavtsev

On Wed, Dec 30, 2015 at 2:10 PM, Jerry Lam  wrote:

> Hi Kostiantyn,
>
> Can you define those properties in hdfs-site.xml and make sure it is
> visible in the class path when you spark-submit? It looks like a conf
> sourcing issue to me.
>
> Cheers,
>
> Sent from my iPhone
>
> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev <
> kudryavtsev.konstan...@gmail.com> wrote:
>
> Chris,
>
> thanks for the hist with AIM roles, but in my case  I need to run
> different jobs with different S3 permissions on the same cluster, so this
> approach doesn't work for me as far as I understood it
>
> Thank you,
> Konstantin Kudryavtsev
>
> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly  wrote:
>
>> couple things:
>>
>> 1) switch to IAM roles if at all possible - explicitly passing AWS
>> credentials is a long and lonely road in the end
>>
>> 2) one really bad workaround/hack is to run a job that hits every worker
>> and writes the credentials to the proper location (~/.awscredentials or
>> whatever)
>>
>> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle
>> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>>
>> if you switch to IAM roles, things become a lot easier as you can
>> authorize all of the EC2 instances in the cluster - and handles autoscaling
>> very well - and at some point, you will want to autoscale.
>>
>> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev <
>> kudryavtsev.konstan...@gmail.com> wrote:
>>
>>> Chris,
>>>
>>>  good question, as you can see from the code I set up them on driver, so
>>> I expect they will be propagated to all nodes, won't them?
>>>
>>> Thank you,
>>> Konstantin Kudryavtsev
>>>
>>> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly  wrote:
>>>
 are the credentials visible from each Worker node to all the Executor
 JVMs on each Worker?

 On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev <
 kudryavtsev.konstan...@gmail.com> wrote:

 Dear Spark community,

 I faced the following issue with trying accessing data on S3a, my code
 is the following:

 val sparkConf = new SparkConf()

 val sc = new SparkContext(sparkConf)
 sc.hadoopConfiguration.set("fs.s3a.impl", 
 "org.apache.hadoop.fs.s3a.S3AFileSystem")
 sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
 sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")

 val sqlContext = SQLContext.getOrCreate(sc)

 val df = sqlContext.read.parquet(...)

 df.count


 It results in the following exception and log messages:

 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
 credentials from BasicAWSCredentialsProvider: *Access key or secret key is 
 null*
 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
 metadata service at URL: 
 http://x.x.x.x/latest/meta-data/iam/security-credentials/
 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials 
 from InstanceProfileCredentialsProvider: The requested metadata is not 
 found at http://x.x.x.x/latest/meta-data/iam/security-credentials/
 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
 com.amazonaws.AmazonClientException: Unable to load AWS credentials from 
 any provider in the chain
at 
 com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at 
 com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at 
 com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at 
 com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at 
 org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)


 I run standalone spark 1.5.2 and using hadoop 2.7.1

 any ideas/workarounds?

 AWS credentials are correct for this bucket

 Thank you,
 Konstantin Kudryavtsev


>>>
>>
>>
>> --
>>
>> *Chris Fregly*
>> Principal Data Solutions Engineer
>> IBM Spark Technology Center, San Francisco, CA
>> http://spark.tc | http://advancedspark.com
>>
>
>


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Jerry Lam
Hi Kostiantyn,

I want to confirm first that it works by using hdfs-site.xml. If yes, you could 
define different spark-{user-x}.conf files and source them during spark-submit. Let 
us know if hdfs-site.xml works first. It should.

Best Regards,

Jerry

Sent from my iPhone

> On 30 Dec, 2015, at 2:31 pm, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Hi Jerry,
> 
> I want to run different jobs on different S3 buckets - different AWS creds - 
> on the same instances. Could you shed some light if it's possible to achieve 
> with hdfs-site?
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 2:10 PM, Jerry Lam  wrote:
>> Hi Kostiantyn,
>> 
>> Can you define those properties in hdfs-site.xml and make sure it is visible 
>> in the class path when you spark-submit? It looks like a conf sourcing issue 
>> to me. 
>> 
>> Cheers,
>> 
>> Sent from my iPhone
>> 
>>> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>>>  wrote:
>>> 
>>> Chris,
>>> 
>>> thanks for the hist with AIM roles, but in my case  I need to run different 
>>> jobs with different S3 permissions on the same cluster, so this approach 
>>> doesn't work for me as far as I understood it
>>> 
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> 
 On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly  wrote:
 couple things:
 
 1) switch to IAM roles if at all possible - explicitly passing AWS 
 credentials is a long and lonely road in the end
 
 2) one really bad workaround/hack is to run a job that hits every worker 
 and writes the credentials to the proper location (~/.awscredentials or 
 whatever)
 
 ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle 
 autoscaling, but i'm mentioning it anyway as it is a temporary fix.
 
 if you switch to IAM roles, things become a lot easier as you can 
 authorize all of the EC2 instances in the cluster - and handles 
 autoscaling very well - and at some point, you will want to autoscale.
 
> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev 
>  wrote:
> Chris,
> 
>  good question, as you can see from the code I set up them on driver, so 
> I expect they will be propagated to all nodes, won't them?
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly  wrote:
>> are the credentials visible from each Worker node to all the Executor 
>> JVMs on each Worker?
>> 
>>> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev 
>>>  wrote:
>>> 
>>> Dear Spark community,
>>> 
>>> I faced the following issue with trying accessing data on S3a, my code 
>>> is the following:
>>> 
>>> val sparkConf = new SparkConf()
>>> 
>>> val sc = new SparkContext(sparkConf)
>>> sc.hadoopConfiguration.set("fs.s3a.impl", 
>>> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>>> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
>>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>>> val sqlContext = SQLContext.getOrCreate(sc)
>>> val df = sqlContext.read.parquet(...)
>>> df.count
>>> 
>>> It results in the following exception and log messages:
>>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
>>> credentials from BasicAWSCredentialsProvider: Access key or secret key 
>>> is null
>>> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
>>> metadata service at URL: 
>>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
>>> credentials from InstanceProfileCredentialsProvider: The requested 
>>> metadata is not found at 
>>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>>> 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 
>>> (TID 3)
>>> com.amazonaws.AmazonClientException: Unable to load AWS credentials 
>>> from any provider in the chain
>>> at 
>>> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>>> at 
>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>>> at 
>>> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>>> at 
>>> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>>> at 
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
>>> 
>>> I run standalone spark 1.5.2 and using hadoop 2.7.1
>>> 
>>> any ideas/workarounds?
>>> 
>>> AWS credentials are correct for this bucket
>>> 
>>> Thank you,
>>> Konstantin 

Re: Spark Streaming: Output frequency different from read frequency in StatefulNetworkWordCount

2015-12-30 Thread Shixiong(Ryan) Zhu
You can use "reduceByKeyAndWindow", e.g.,

val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((x: Int,
y: Int) => x + y, Seconds(60), Seconds(60))
wordCounts.print()
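
For completeness, a rough sketch of the surrounding setup this assumes (the app
name, host and port below are placeholders, not values from the original post):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("PerMinuteWordCount")
val ssc = new StreamingContext(conf, Seconds(1))      // read every 1 second

val lines = ssc.socketTextStream("localhost", 9999)   // placeholder host/port
val words = lines.flatMap(_.split(" "))
// window == slide == 60s, so each output covers a fresh, non-overlapping minute
val wordCounts = words.map(x => (x, 1))
  .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(60))
wordCounts.print()

ssc.start()
ssc.awaitTermination()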


On Wed, Dec 30, 2015 at 12:00 PM, Soumitra Johri <
soumitra.siddha...@gmail.com> wrote:

> Hi, in the example :
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
>
> the streaming frequency is 1 seconds however I do not want to print the
> contents of the word-counts every minute and resent the word counts again
> back to 0 every minute. How can I do that ?
>
> I have to print per minute work counts with streaming frequency of 1
> second. I though of using scala schedulers but then there can be
> concurrency issues.
>
> My algorithm is as follows :
>
>1. Read the words every 1 second
>2. Do cumulative work count for 60 seconds
>3. After the end of every 60 second (1 minute ) print the workcounts
>and resent the counters to zero.
>
> Any help would be appreciated!
>
> Thanks
>
> Warm Regards
>


Re: Using Experminal Spark Features

2015-12-30 Thread Chris Fregly
A lot of folks are using the new Kafka Direct Stream API in production.

And a lot of folks who used the old Kafka Receiver-based API are migrating
over.

The usual downside to "Experimental" features in Spark is that the API
might change, so you'll need to rewrite some code.

Stability-wise, the Spark codebase has a TON of tests around every new
feature - including experimental features.

From experience, the Kafka Direct Stream API is very stable and a lot of
momentum is behind this implementation.
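
For reference, a minimal (untested) sketch of the direct API as of Spark 1.5/1.6;
the broker list, topic name and batch interval are placeholders, and the
spark-streaming-kafka artifact needs to be on the classpath:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaDirectExample")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")

// No receivers: each RDD partition maps 1:1 to a Kafka topic partition,
// and offsets are tracked by Spark rather than ZooKeeper.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.map(_._2).count().print()

ssc.start()
ssc.awaitTermination()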

Check out my *pipeline* Github project to see this impl fully configured
and working within a Docker instance:

https://github.com/fluxcapacitor/pipeline/wiki

Here's a link to the kafka, kafka-rest-api, and kafka schema registry
configuration:  https://github.com/fluxcapacitor/pipeline/tree/master/config

And here's a link to a sample app that uses the Kafka Direct API and stores
data in Cassandra:

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/store/Cassandra.scala

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/start-streaming-ratings-cassandra.sh

Here's a link to the Docker image which contains the installation scripts
for Confluent's Kafka Distribution:

https://github.com/fluxcapacitor/pipeline/blob/master/Dockerfile

All code is in Scala.

On Wed, Dec 30, 2015 at 11:26 AM, David Newberger <
david.newber...@wandcorp.com> wrote:

> Hi All,
>
>
>
> I’ve been looking at the Direct Approach for streaming Kafka integration (
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html)
> because it looks like a good fit for our use cases. My concern is the
> feature is experimental according to the documentation. Has anyone used
> this approach yet and if so what has you experience been with using it? If
> it helps we’d be looking to implement it using Scala. Secondly, in general
> what has people experience been with using experimental features in Spark?
>
>
>
> Cheers,
>
>
>
> David Newberger
>
>
>



-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: How to ignore case in dataframe groupby?

2015-12-30 Thread raja kbv
Solutions from Eran & Yanbo are working well. Thank you.

@Eran,

Your solution worked with a small change:
DF.withColumn("upper-code", upper(df("countrycode")))

This creates a new column "upper-code". Is there a way to update the column or 
create a new df with the updated column?

Thanks,
Raja

On Thursday, 24 December 2015 6:17 PM, Eran Witkon  
wrote:
 

Use DF.withColumn("upper-code",df("countrycode).toUpper)) or just run a map 
function that does the same
On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja  wrote:

Hi,
Values in a dataframe column named countrycode are in different cases. Eg: (US, 
us).  groupBy & count gives two rows but the requirement is to ignore case for 
this operation.
1) Is there a way to ignore case in groupBy? Or
2) Is there a way to update the dataframe column countrycode to uppercase?

Thanks in advance.

Regards,
Raja


  

Working offline with spark-core and sbt

2015-12-30 Thread Ashic Mahtab
Hello,

I'm trying to work offline with spark-core. I've got an empty project 
with the following:
name := "sbtSand"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
"joda-time" % "joda-time" % "2.9.1",
"org.apache.spark" %% "spark-core" % "1.5.2"
)
I can "sbt compile" this. But if I go offline, and "sbt clean", then "sbt 
compile", then it fails. If I remove the spark-core dependency (but leave 
joda-time in), then "sbt compile" succeeds, and the package from the ivy2 cache 
is used. I also added scalatest, and that works offline (assuming the package 
is in ~/.ivy2/cache). However, I can't find a way to work offline with 
spark-core. Is there a simple way to get this working?

Crossposted here from 
http://stackoverflow.com/questions/34537886/sbt-ivy-offline-work-and-weirdness 
as I'm wondering if somebody working with Spark has found a solution.
Thanks,
Ashic.

Re: [SparkSQL][Parquet] Read from nested parquet data

2015-12-30 Thread lin
Hi Yanbo, thanks for the quick response.

Looks like we'll need to do some workaround.
But before that, we'd like to dig into some related discussions first. We've
looked through the following URLs, but none seems helpful.

Mailing list threads:
http://search-hadoop.com/m/q3RTtLkgZl1K4oyx/v=threaded
JIRA:
Parquet pushdown for unionAll,
https://issues.apache.org/jira/browse/SPARK-3462
Spark SQL reads unneccesary nested fields from Parquet,
https://issues.apache.org/jira/browse/SPARK-4502
Parquet Predicate Pushdown Does Not Work with Nested Structures,
https://issues.apache.org/jira/browse/SPARK-5151

Any recommended threads, JIRA, or PR, please? Thanks. :)


On Wed, Dec 30, 2015 at 6:21 PM, Yanbo Liang  wrote:

> This problem has been discussed long before, but I think there is no
> straight forward way to read only col_g.
>
> 2015-12-30 17:48 GMT+08:00 lin :
>
>> Hi all,
>>
>> We are trying to read from nested parquet data. SQL is "select
>> col_b.col_d.col_g from some_table" and the data schema for some_table is:
>>
>> root
>>  |-- col_a: long (nullable = false)
>>  |-- col_b: struct (nullable = true)
>>  ||-- col_c: string (nullable = true)
>>  ||-- col_d: array (nullable = true)
>>  |||-- element: struct (containsNull = true)
>>  ||||-- col_e: integer (nullable = true)
>>  ||||-- col_f: string (nullable = true)
>>  ||||-- col_g: long (nullable = true)
>>
>> We expect to see only col_g are read and parsed from the parquet files;
>> however, we acually observed the whole col_b being read and parsed.
>>
>> As we dig in a little bit, seems that col_g is a GetArrayStructFields,
>> col_d is a GetStructField, and only col_b is an AttributeReference, so
>> PhysicalOperation.collectProjectsAndFilters() returns col_b instead of
>> col_g as projections.
>>
>> So we wonder, is there any way to read and parse only col_g instead of
>> the whole col_b? We use Spark 1.5.1 and Parquet 1.7.0.
>>
>> Thanks! :)
>>
>
>


RE: Working offline with spark-core and sbt

2015-12-30 Thread Ashic Mahtab
To answer my own question, it appears certain things (like parents, etc.) 
caused the issue. I was using sbt 0.13.8. Using 0.13.9 works fine.
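
If staying on 0.13.8, sbt's offline setting might also be worth a try (assuming
the ivy cache already contains spark-core and its parents); this is just a
guess, not something I've verified:

// in build.sbt
offline := true

or per invocation: sbt "set offline := true" compile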

From: as...@live.com
To: user@spark.apache.org
Subject: Working offline with spark-core and sbt
Date: Thu, 31 Dec 2015 02:07:26 +




Hello,I'm trying to work offline with spark-core. I've got an empty project 
with the following:
name := "sbtSand"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
"joda-time" % "joda-time" % "2.9.1",
"org.apache.spark" %% "spark-core" % "1.5.2"
)
I can "sbt compile" this. But if I go offline, and "sbt clean", then "sbt 
compile", then it fails. If I remove the spark-core dependency (but leave 
joda-time in), then "sbt compile" succeeds, and the package from the ivy2 cache 
is used. I also added scalatest, and that works offline (assuming the package 
is in ~/.ivy2/cache). However, I can't find a way to work offline with 
spark-core. Is there a simple way to get this working?

Crossposted here from 
http://stackoverflow.com/questions/34537886/sbt-ivy-offline-work-and-weirdness 
as I'm wondering if somebody working with Spark has found a solution.
Thanks,Ashic.   
  

Error while starting Zeppelin Service in HDP2.3.2

2015-12-30 Thread Divya Gehlot
Hi,
I am getting the following error while starting the Zeppelin service from
the Ambari server.

/var/lib/ambari-agent/data/errors-2408.txt

Traceback (most recent call last):
  File 
"/var/lib/ambari-agent/cache/stacks/HDP/2.3/services/ZEPPELIN/package/scripts/master.py",
line 295, in 
Master().execute()
  File 
"/usr/lib/python2.6/site-packages/resource_management/libraries/script/script.py",
line 216, in execute
method(env)
  File 
"/var/lib/ambari-agent/cache/stacks/HDP/2.3/services/ZEPPELIN/package/scripts/master.py",
line 230, in start
Execute (params.zeppelin_dir+'/bin/zeppelin-daemon.sh start >> ' +
params.zeppelin_log_file, user=params.zeppelin_user)
  File "/usr/lib/python2.6/site-packages/resource_management/core/base.py",
line 154, in __init__
self.env.run()
  File 
"/usr/lib/python2.6/site-packages/resource_management/core/environment.py",
line 152, in run
self.run_action(resource, action)
  File 
"/usr/lib/python2.6/site-packages/resource_management/core/environment.py",
line 118, in run_action
provider_action()
  File 
"/usr/lib/python2.6/site-packages/resource_management/core/providers/system.py",
line 260, in action_run
tries=self.resource.tries, try_sleep=self.resource.try_sleep)
  File "/usr/lib/python2.6/site-packages/resource_management/core/shell.py",
line 70, in inner
result = function(command, **kwargs)
  File "/usr/lib/python2.6/site-packages/resource_management/core/shell.py",
line 92, in checked_call
tries=tries, try_sleep=try_sleep)
  File "/usr/lib/python2.6/site-packages/resource_management/core/shell.py",
line 140, in _call_wrapper
result = _call(command, **kwargs_copy)
  File "/usr/lib/python2.6/site-packages/resource_management/core/shell.py",
line 290, in _call
err_msg = Logger.filter_text(("Execution of '%s' returned %d. %s")
% (command_alias, code, all_output))
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position
31: ordinal not in range(128)

stdout:   /var/lib/ambari-agent/data/output-2408.txt

2015-12-31 02:01:20,438 - Group['hadoop'] {}
2015-12-31 02:01:20,439 - Group['users'] {}
2015-12-31 02:01:20,439 - Group['zeppelin'] {}
2015-12-31 02:01:20,439 - Group['knox'] {}
2015-12-31 02:01:20,439 - Group['spark'] {}
2015-12-31 02:01:20,440 - User['hive'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,440 - User['oozie'] {'gid': 'hadoop', 'groups': [u'users']}
2015-12-31 02:01:20,441 - User['zeppelin'] {'gid': 'hadoop', 'groups':
[u'hadoop']}
2015-12-31 02:01:20,441 - User['ambari-qa'] {'gid': 'hadoop',
'groups': [u'users']}
2015-12-31 02:01:20,442 - User['flume'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,442 - User['hdfs'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,443 - User['knox'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,443 - User['spark'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,444 - User['mapred'] {'gid': 'hadoop', 'groups':
[u'hadoop']}
2015-12-31 02:01:20,444 - User['tez'] {'gid': 'hadoop', 'groups': [u'users']}
2015-12-31 02:01:20,445 - User['zookeeper'] {'gid': 'hadoop',
'groups': [u'hadoop']}
2015-12-31 02:01:20,445 - User['sqoop'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,446 - User['yarn'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,446 - User['hcat'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,447 - User['ams'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,447 -
File['/var/lib/ambari-agent/tmp/changeUid.sh'] {'content':
StaticFile('changeToSecureUid.sh'), 'mode': 0555}
2015-12-31 02:01:20,448 -
Execute['/var/lib/ambari-agent/tmp/changeUid.sh ambari-qa
/tmp/hadoop-ambari-qa,/tmp/hsperfdata_ambari-qa,/home/ambari-qa,/tmp/ambari-qa,/tmp/sqoop-ambari-qa']
{'not_if': '(test $(id -u ambari-qa) -gt 1000) || (false)'}
2015-12-31 02:01:20,452 - Skipping
Execute['/var/lib/ambari-agent/tmp/changeUid.sh ambari-qa
/tmp/hadoop-ambari-qa,/tmp/hsperfdata_ambari-qa,/home/ambari-qa,/tmp/ambari-qa,/tmp/sqoop-ambari-qa']
due to not_if
2015-12-31 02:01:20,453 - Group['hdfs'] {'ignore_failures': False}
2015-12-31 02:01:20,453 - User['hdfs'] {'ignore_failures': False,
'groups': [u'hadoop', u'hdfs']}
2015-12-31 02:01:20,453 - Directory['/etc/hadoop'] {'mode': 0755}
2015-12-31 02:01:20,465 -
File['/usr/hdp/current/hadoop-client/conf/hadoop-env.sh'] {'content':
InlineTemplate(...), 'owner': 'hdfs', 'group': 'hadoop'}
2015-12-31 02:01:20,466 -
Directory['/var/lib/ambari-agent/tmp/hadoop_java_io_tmpdir'] {'owner':
'hdfs', 'group': 'hadoop', 'mode': 0777}
2015-12-31 02:01:20,474 - Execute[('setenforce', '0')] {'not_if': '(!
which getenforce ) || (which getenforce && getenforce | grep -q
Disabled)', 'sudo': True, 'only_if': 'test -f /selinux/enforce'}
2015-12-31 02:01:20,482 - Skipping Execute[('setenforce', '0')] due to only_if
2015-12-31 02:01:20,482 - Directory['/var/log/hadoop'] {'owner':
'root', 'mode': 0775, 'group': 'hadoop', 'recursive': True,
'cd_access': 'a'}

Error:scalac: Error: assertion failed: List(object package$DebugNode, object package$DebugNode)

2015-12-30 Thread zml张明磊
Hello,

Recently, I build spark from apache/master and getting the following error. 
From stackoverflow 
http://stackoverflow.com/questions/24165184/scalac-assertion-failed-while-run-scalatest-in-idea,
 I can not find Preferences > Scala he said in Intellij IDEA. And SBT is not 
worked for me in our company. Use maven instead. How can I fix and work around 
it ? Last : happy new year to everyone.

Error:scalac: Error: assertion failed: List(object package$DebugNode, object 
package$DebugNode)
  java.lang.AssertionError: assertion failed: List(object 
package$DebugNode, object package$DebugNode)
   at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678)
   at 
scala.reflect.internal.Symbols$ClassSymbol.companionModule0(Symbols.scala:2988)
   at 
scala.reflect.internal.Symbols$ClassSymbol.companionModule(Symbols.scala:2991)
   at 
scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1371)
   at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120)
   at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583)
   at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557)
   at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553)
   at scala.tools.nsc.Global$Run.compile(Global.scala:1662)
   at xsbt.CachedCompiler0.run(CompilerInterface.scala:126)


thanks,
Minglei.


Re: [SparkSQL][Parquet] Read from nested parquet data

2015-12-30 Thread Yanbo Liang
This problem has been discussed long before, but I think there is no
straight forward way to read only col_g.

2015-12-30 17:48 GMT+08:00 lin :

> Hi all,
>
> We are trying to read from nested parquet data. SQL is "select
> col_b.col_d.col_g from some_table" and the data schema for some_table is:
>
> root
>  |-- col_a: long (nullable = false)
>  |-- col_b: struct (nullable = true)
>  ||-- col_c: string (nullable = true)
>  ||-- col_d: array (nullable = true)
>  |||-- element: struct (containsNull = true)
>  ||||-- col_e: integer (nullable = true)
>  ||||-- col_f: string (nullable = true)
>  ||||-- col_g: long (nullable = true)
>
> We expect to see only col_g are read and parsed from the parquet files;
> however, we acually observed the whole col_b being read and parsed.
>
> As we dig in a little bit, seems that col_g is a GetArrayStructFields,
> col_d is a GetStructField, and only col_b is an AttributeReference, so
> PhysicalOperation.collectProjectsAndFilters() returns col_b instead of
> col_g as projections.
>
> So we wonder, is there any way to read and parse only col_g instead of the
> whole col_b? We use Spark 1.5.1 and Parquet 1.7.0.
>
> Thanks! :)
>


Re: SparkSQL Hive orc snappy table

2015-12-30 Thread Dawid Wysakowicz
Hasn't anyone used Spark with ORC and Snappy compression?

2015-12-29 18:25 GMT+01:00 Dawid Wysakowicz :

> Hi,
>
> I have a table in hive stored as orc with compression = snappy. I try to
> execute a query on that table that fails (previously I run it on table in
> orc-zlib format and parquet so it is not the matter of query).
>
> I managed to execute this query with hive on tez on that tables.
>
> The exception i get is as follows:
>
> 15/12/29 17:16:46 WARN scheduler.DAGScheduler: Creating new stage failed
> due to exception - job: 3
> java.lang.RuntimeException: serious problem
> at
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
> at
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.getPartitions(MapPartitionsWithPreparationRDD.scala:40)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at org.apache.spark.ShuffleDependency.(Dependency.scala:82)
> at
> org.apache.spark.sql.execution.ShuffledRowRDD.getDependencies(ShuffledRowRDD.scala:59)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:226)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:224)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.dependencies(RDD.scala:224)
> at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:388)
> at
> org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:405)
> at
> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:370)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:253)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:354)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:351)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:351)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:363)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:266)
> at
> org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:300)
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:734)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1466)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IndexOutOfBoundsException: Index: 0
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1016)
> ... 48 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 0
> at java.util.Collections$EmptyList.get(Collections.java:4454)
> at
> org.apache.hadoop.hive.ql.io.orc.OrcProto$Type.getSubtypes(OrcProto.java:12240)
> at
> 

Re: Executor deregistered after 2mins (mesos, 1.6.0-rc4)

2015-12-30 Thread Adrian Bridgett
I've worked around this by setting 
spark.shuffle.io.connectionTimeout=3600s, uploading the spark tarball to 
HDFS again and restarting the shuffle service (not 100% sure that last 
step is needed).
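
For anyone hitting the same thing, the application-side setting is just the
following (a sketch; the external shuffle service reads its own copy of the
config, hence re-uploading the tarball and restarting the service):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.io.connectionTimeout", "3600s")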


I attempted (with my newbie Scala skills) to allow 
ExternalShuffleClient() to accept an optional closeIdleConnections 
parameter (defaulting to "true") so that the MesosExternalShuffleClient 
can set this to "false".  I then passed this into the TransportContext 
call.  However this didn't seem to work (maybe it's using the config 
from HDFS, not the local Spark config, which I thought the driver used).


Anyhow I'll do more testing and then raise a JIRA.

Adrian
--
*Adrian Bridgett* |  Sysadmin Engineer, OpenSignal 


_
Office: First Floor, Scriptor Court, 155-157 Farringdon Road, 
Clerkenwell, London, EC1R 3AD

Phone #: +44 777-377-8251
Skype: abridgett  |@adrianbridgett | 
LinkedIn link 

_


Re: Executor deregistered after 2mins (mesos, 1.6.0-rc4)

2015-12-30 Thread Adrian Bridgett

Hi Ted,

sorry, I should have been a bit more consistent in my cut and paste 
(there are nine nodes plus the driver) - I was concentrating on S9/6 (these logs 
are from that box - 10.1.201.165). The S1/4 lines are:


15/12/29 18:49:45 INFO CoarseMesosSchedulerBackend: Registered executor 
NettyRpcEndpointRef(null) (ip-10-1-202-114.ec2.internal:19891) with ID 
f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4
15/12/29 18:49:45 INFO ExecutorAllocationManager: New executor 
f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4 has registered (new total is 6)
15/12/29 18:49:45 INFO BlockManagerMasterEndpoint: Registering block 
manager ip-10-1-202-114.ec2.internal:14257 with 13.8 GB RAM, 
BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4, 
ip-10-1-202-114.ec2.internal, 14257)
15/12/29 18:58:07 WARN TaskSetManager: Lost task 21.0 in stage 1.0 (TID 
2149, ip-10-1-200-232.ec2.internal): 
FetchFailed(BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4, 
ip-10-1-202-114.ec2.internal, 7337), shuffleId=1, mapId=5, reduceId=21, 
message=
org.apache.spark.shuffle.FetchFailedException: 
java.lang.RuntimeException: Executor is not registered 
(appId=a9344e17-f767-4b1e-a32e-e98922d6ca43-0014, 
execId=f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4)
at 
org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:183)


I've tried to see how I can increase the idle timeout of the 
mesosExternalShuffleClient.registerDriverWithShuffleService call, as that 
seems to be the core issue.



On 29/12/2015 21:17, Ted Yu wrote:

Have you searched log for 'f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4' ?

In the snippet you posted, I don't see registration of this Executor.

Cheers

On Tue, Dec 29, 2015 at 12:43 PM, Adrian Bridgett 
> wrote:


We're seeing an "Executor is not registered" error on a Spark
(1.6.0rc4, mesos-0.26) cluster.  It seems as if the logic in
MesosExternalShuffleService.scala isn't working for some reason
(new in 1.6 I believe).

spark application sees this:
...
15/12/29 18:49:41 INFO MesosExternalShuffleClient: Successfully
registered app a9344e17-f767-4b1e-a32e-e98922d6ca43-0014 with
external shuffle service.
15/12/29 18:49:41 INFO MesosExternalShuffleClient: Successfully
registered app a9344e17-f767-4b1e-a32e-e98922d6ca43-0014 with
external shuffle service.
15/12/29 18:49:43 INFO CoarseMesosSchedulerBackend: Registered
executor NettyRpcEndpointRef(null)
(ip-10-1-201-165.ec2.internal:37660) with ID
f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6
15/12/29 18:49:43 INFO ExecutorAllocationManager: New executor
f02cb67a-3519-4655-b23a-edc0dd082bf1-S3/1 has registered (new
total is 1)
15/12/29 18:49:43 INFO BlockManagerMasterEndpoint: Registering
block manager ip-10-1-201-165.ec2.internal:53854 with 13.8 GB RAM,
BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6,
ip-10-1-201-165.ec2.internal, 53854)
15/12/29 18:49:43 INFO BlockManagerMasterEndpoint: Registering
block manager ip-10-1-201-132.ec2.internal:12793 with 13.8 GB RAM,
BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S3/1,
ip-10-1-201-132.ec2.internal, 12793)
15/12/29 18:49:43 INFO ExecutorAllocationManager: New executor
f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6 has registered (new
total is 2)
...
15/12/29 18:58:06 INFO BlockManagerInfo: Added broadcast_6_piece0
in memory on ip-10-1-201-165.ec2.internal:53854 (size: 5.2KB,
free: 13.8GB)
15/12/29 18:58:06 INFO MapOutputTrackerMasterEndpoint: Asked to
send map output locations for shuffle 1 to
ip-10-1-202-121.ec2.internal:59734
15/12/29 18:58:06 INFO MapOutputTrackerMaster: Size of output
statuses for shuffle 1 is 1671814 bytes
15/12/29 18:58:06 INFO MapOutputTrackerMasterEndpoint: Asked to
send map output locations for shuffle 1 to
ip-10-1-201-165.ec2.internal:37660
...
15/12/29 18:58:07 INFO TaskSetManager: Starting task 63.0 in stage
1.0 (TID 2191, ip-10-1-200-232.ec2.internal, partition
63,PROCESS_LOCAL, 2171 bytes)
15/12/29 18:58:07 WARN TaskSetManager: Lost task 21.0 in stage 1.0
(TID 2149, ip-10-1-200-232.ec2.internal):
FetchFailed(BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4,
ip-10-1-202-114.ec2.internal, 7337), shuffleId=1, mapId=5,
reduceId=21, message=
org.apache.spark.shuffle.FetchFailedException:
java.lang.RuntimeException: Executor is not registered
(appId=a9344e17-f767-4b1e-a32e-e98922d6ca43-0014,
execId=f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4)
at

org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:183)
...
15/12/29 18:58:07 INFO DAGScheduler: Resubmitting ShuffleMapStage
0 (reduceByKey at
/home/ubuntu/ajay/name-mapper/kpis/namemap_kpi_processor.py:48)
and ShuffleMapStage 1 (reduceByKey at

[SparkSQL][Parquet] Read from nested parquet data

2015-12-30 Thread lin
Hi all,

We are trying to read from nested parquet data. SQL is "select
col_b.col_d.col_g from some_table" and the data schema for some_table is:

root
 |-- col_a: long (nullable = false)
 |-- col_b: struct (nullable = true)
 ||-- col_c: string (nullable = true)
 ||-- col_d: array (nullable = true)
 |||-- element: struct (containsNull = true)
 ||||-- col_e: integer (nullable = true)
 ||||-- col_f: string (nullable = true)
 ||||-- col_g: long (nullable = true)

We expect to see only col_g are read and parsed from the parquet files;
however, we acually observed the whole col_b being read and parsed.

As we dig in a little bit, seems that col_g is a GetArrayStructFields,
col_d is a GetStructField, and only col_b is an AttributeReference, so
PhysicalOperation.collectProjectsAndFilters() returns col_b instead of
col_g as projections.

So we wonder, is there any way to read and parse only col_g instead of the
whole col_b? We use Spark 1.5.1 and Parquet 1.7.0.

Thanks! :)


Re: Problem with WINDOW functions?

2015-12-30 Thread Davies Liu
Window functions are improved in 1.6 release, could you try 1.6-RC4
(or wait until next week for the final release)?

Even In 1.6, the buffer of rows for window function does not support
spilling (also does not use memory efficiently), there is a JIRA for
it: https://issues.apache.org/jira/browse/SPARK-12295

On Tue, Dec 29, 2015 at 5:28 PM, vadimtk  wrote:
> Hi,
>
> I can't successfully execute a query with WINDOW function.
>
> The statements are following:
>
> val orcFile =
> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(project)='EN'")
> orcFile.registerTempTable("d1")
>  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day ORDER
> BY pageviews DESC) as rank FROM d1").filter("rank <=
> 20").sort($"day",$"rank").collect().foreach(println)
>
> with default
> spark.driver.memory
>
> I am getting java.lang.OutOfMemoryError: Java heap space.
> The same if I set spark.driver.memory=10g.
>
> When I set spark.driver.memory=45g (the box has 256GB of RAM) the execution
> fails with a different error:
>
> 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no recent
> heartbeats: 129324 ms exceeds timeout 12 ms
>
> And I see that GC takes a lot of time.
>
> What is a proper way to execute statements above?
>
> I see the similar problems reported
> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table
>
>
>
>
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tp25833.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark MLLib KMeans Performance on Amazon EC2 M3.2xlarge

2015-12-30 Thread Jia Zou
I am running Spark MLLib KMeans in one EC2 M3.2xlarge instance with 8 CPU
cores and 30GB memory. Executor memory is set to 15GB, and driver memory is
set to 15GB.

The observation is that, when input data size is smaller than 15GB, the
performance is quite stable. However, when input data becomes larger than
that, the performance will be extremely unpredictable. For example, for
15GB input, with inputRDD.persist(MEMORY_ONLY) , I've got three
dramatically different testing results: 27mins, 61mins and 114 mins. (All
settings are the same for the 3 tests, and I will create input data
immediately before running each of the tests to keep OS buffer cache hot.)

Anyone can help to explain this? Thanks very much!


Re: Executor deregistered after 2mins (mesos, 1.6.0-rc4)

2015-12-30 Thread Adrian Bridgett
To wrap this up, it's the shuffle manager sending the FIN so setting 
spark.shuffle.io.connectionTimeout to 3600s is the only workaround right 
now.  SPARK-12583 raised.


Adrian

--
*Adrian Bridgett*


Monitoring Spark HDFS Reads and Writes

2015-12-30 Thread alvarobrandon
Hello:

Is there any way of monitoring the number of bytes or blocks read and written
by a Spark application? I'm running Spark with YARN and I want to measure
how I/O-intensive a set of applications are. The closest thing I have seen is
the HDFS DataNode logs in YARN, but they don't seem to record Spark
application-specific reads and writes.

2015-12-21 18:29:15,347 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src:
/127.0.0.1:53805, dest: /127.0.0.1:50010, bytes: 72159, op: HDFS_WRITE,
cliID: DFSClient_NONMAPREDUCE_-1850086307_1, offset: 0, srvID:
a9edc8ad-fb09-4621-b469-76de587560c0, blockid:
BP-189543387-138.100.13.81-1450715936956:blk_1073741837_1013, duration:
2619119
hadoop-alvarobrandon-datanode-usuariop81.fi.upm.es.log:2015-12-21
18:29:15,429 INFO org.apache.hadoop.hdfs.server.d

Is there any trace about this kind of operations to be found in any log?

Thanks in advance
Best Regards.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Monitoring-Spark-HDFS-Reads-and-Writes-tp25838.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark job submission REST API

2015-12-30 Thread Fernando O.
One of the advantages of using spark-jobserver is that it lets you reuse
your contexts (create one context and run multiple jobs on it).

Since you can run multiple jobs in one context, you can also share RDDs
(NamedRDD) between jobs, i.e. create an MLlib model and share it without the
need to persist it.
It is also useful if you want to run multiple SQL queries, since you don't
need to create an SQLContext for every job.



On Thu, Dec 10, 2015 at 11:56 PM, manasdebashiskar 
wrote:

> We use ooyala job server. It is great. It has a great set of api's to
> cancel
> job. Create adhoc or persistent context etc.
> It has great support in remote deploy and tests too which helps faster
> coding.
>
> The current version is missing job progress bar but I could not find the
> same in the hidden spark api's either.
>
> In any case I think job server is better than the hidden api's because it
> is
> not hidden.
>
> ..Manas
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-submission-REST-API-tp25670p25674.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SparkSQL Hive orc snappy table

2015-12-30 Thread Chris Fregly
Reminder that Snappy is not a splittable format.

I've had success with Hive + LZF (splittable) and bzip2 (also splittable).

Gzip is also not splittable, so you won't be utilizing your cluster to
process this data in parallel as only 1 task can read and process
unsplittable data - versus many tasks spread across the cluster.

On Wed, Dec 30, 2015 at 6:45 AM, Dawid Wysakowicz <
wysakowicz.da...@gmail.com> wrote:

> Hasn't anyone used Spark with ORC and snappy compression?
>
> 2015-12-29 18:25 GMT+01:00 Dawid Wysakowicz :
>
>> Hi,
>>
>> I have a table in Hive stored as ORC with compression = snappy. I try to
>> execute a query on that table and it fails (previously I ran it on the same
>> table in ORC-ZLIB format and in Parquet, so it is not a matter of the query).
>>
>> I managed to execute this query with Hive on Tez on those tables.
>>
>> The exception I get is as follows:
>>
>> 15/12/29 17:16:46 WARN scheduler.DAGScheduler: Creating new stage failed
>> due to exception - job: 3
>> java.lang.RuntimeException: serious problem
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
>> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> at
>> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.getPartitions(MapPartitionsWithPreparationRDD.scala:40)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
>> at
>> org.apache.spark.sql.execution.ShuffledRowRDD.getDependencies(ShuffledRowRDD.scala:59)
>> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:226)
>> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:224)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.dependencies(RDD.scala:224)
>> at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:388)
>> at
>> org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:405)
>> at
>> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:370)
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:253)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:354)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:351)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:351)
>> at
>> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:363)
>> at
>> org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:266)
>> at
>> org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:300)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:734)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1466)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.IndexOutOfBoundsException: Index: 0
>> at