Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-26 Thread Abhishek Anand
Any insights on this ?

On Fri, Feb 26, 2016 at 1:21 PM, Abhishek Anand 
wrote:

> On changing the default compression codec from snappy to lzf, the
> errors are gone!
>
> How can I fix this while keeping snappy as the codec?
>
> Is there any downside to using lzf, given that snappy is the default
> codec that ships with Spark?
>
>
> Thanks !!!
> Abhi
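For reference, a minimal sketch of how the codec is selected explicitly; the property is
spark.io.compression.codec and it can equally go into spark-defaults.conf. As far as I know,
the main trade-off is that lzf is pure Java while snappy relies on a native library, so lzf is
usually a bit slower but avoids native-library issues on new machines.

import org.apache.spark.SparkConf

// "snappy" is the default in this Spark version; "lzf" and "lz4" are the alternatives.
val conf = new SparkConf()
  .setAppName("streaming-job")                 // hypothetical app name
  .set("spark.io.compression.codec", "lzf")    // or "snappy" / "lz4"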
>
> On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand 
> wrote:
>
>> Hi ,
>>
>> I am getting the following exception on running my spark streaming job.
>>
>> The same job had been running fine for a long time, but after I added two new
>> machines to my cluster I see it failing with the following exception.
>>
>>
>>
>> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
>> (TID 22594)
>> java.io.IOException: java.lang.reflect.InvocationTargetException
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>> at
>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>> at
>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>> at org.apache.spark.broadcast.TorrentBroadcast.org
>> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>> ... 11 more
>> Caused by: java.lang.IllegalArgumentException
>> at
>> org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
>> ... 20 more
>>
>>
>>
>> Thanks !!!
>> Abhi
>>
>
>


.cache() changes contents of RDD

2016-02-26 Thread Yan Yang
Hi

I am pretty new to Spark, and after experimenting with our pipelines I ran
into this weird issue.

The Scala code is as below:

val input = sc.newAPIHadoopRDD(...)
val rdd = input.map(...)
rdd.cache()
rdd.saveAsTextFile(...)

I found rdd to consist of 80+K identical rows. To be more precise, the
number of rows is right, but all are identical.

The truly weird part is if I remove rdd.cache(), everything works just
fine. I have encountered this issue on a few occasions.

Thanks
Yan
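One cause that commonly produces exactly this symptom (an assumption here, since the map
function isn't shown) is that Hadoop RecordReaders reuse the same Writable object for every
record: without cache() each row is consumed before it is overwritten, but with cache() the
RDD ends up holding many references to that one reused object. A minimal sketch of the usual
workaround, with hypothetical Writable types, is to copy to immutable values before caching:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Hypothetical input types; substitute the InputFormat actually used.
val input = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/in")

// Copy the reused Writables into immutable JVM values *before* caching.
val rdd = input.map { case (offset, line) => (offset.get, line.toString) }
rdd.cache()
rdd.saveAsTextFile("hdfs:///path/out")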


RE: Standalone vs. Mesos for production installation on a smallish cluster

2016-02-26 Thread Mohammed Guller
I think you may be referring to Spark Survey 2015. According to that survey, 
48% use standalone, 40% use YARN and only 11% use Mesos (the numbers don’t add 
up to 100 – probably because of rounding error).

Mohammed
Author: Big Data Analytics with 
Spark

From: Igor Berman [mailto:igor.ber...@gmail.com]
Sent: Friday, February 26, 2016 3:52 AM
To: Petr Novak
Cc: user
Subject: Re: Standalone vs. Mesos for production installation on a smallish 
cluster

IMHO most production clusters are standalone.
There was a presentation from Spark Summit with some stats inside (can't find
it right now); standalone was in 1st place.
It was from Matei:
https://databricks.com/resources/slides

On 26 February 2016 at 13:40, Petr Novak 
> wrote:
Hi all,
I believe it used to say in the documentation that Standalone mode is not for
production. I'm either wrong or it was already removed.

For a small cluster of between 5 and 10 nodes, is Standalone recommended for
production? I would like to go with Mesos, but the question is whether there is
real add-on value for production, mainly from a stability perspective.

Can I expect that adding Mesos will improve stability compared to Standalone to
an extent that justifies the somewhat increased complexity?

I know it is hard to answer because Mesos layer itself is going to add some 
bugs as well.

Are there unique features enabled by Mesos specific to Spark? E.g. adaptive 
resources for jobs or whatever?

In the future, once the cluster grows and more services run on Mesos, we plan
to use Mesos. The question is whether it is worth going with it immediately,
even if its utility is not directly needed at this point.

Many thanks,
Petr



RE: Get all vertexes with outDegree equals to 0 with GraphX

2016-02-26 Thread Mohammed Guller
Here is another solution (minGraph is the graph from your code. I assume that 
is your original graph):

val graphWithNoOutEdges = minGraph.filter(
  graph => graph.outerJoinVertices(graph.outDegrees) {(vId, vData, 
outDegreesOpt) => outDegreesOpt.getOrElse(0)},
  vpred = (vId: VertexId, vOutDegrees: Int) => vOutDegrees == 0
)

val verticesWithNoOutEdges = graphWithNoOutEdges.vertices

Mohammed
Author: Big Data Analytics with 
Spark

From: Guillermo Ortiz [mailto:konstt2...@gmail.com]
Sent: Friday, February 26, 2016 5:46 AM
To: Robin East
Cc: user
Subject: Re: Get all vertexes with outDegree equals to 0 with GraphX

Yes, I am not really happy with that "collect".
I was taking a look at using the subgraph method and other options and didn't
figure out anything easy or direct.

I'm going to try your idea.

2016-02-26 14:16 GMT+01:00 Robin East 
>:
Whilst I can think of other ways to do it I don’t think they would be 
conceptually or syntactically any simpler. GraphX doesn’t have the concept of 
built-in vertex properties which would make this simpler - a vertex in GraphX 
is a Vertex ID (Long) and a bunch of custom attributes that you assign. This 
means you have to find a way of ‘pushing’ the vertex degree into the graph so 
you can do comparisons (cf a join in relational databases) or as you have done 
create a list and filter against that (cf filtering against a sub-query in 
relational database).

One thing I would point out is that you probably want to avoid
finalVertexes.collect() for a large-scale system - this will pull all the
vertices into the driver and then push them out to the executors again as part
of the filter operation. A better strategy for large graphs would be:

1. build a graph based on the existing graph where the vertex attribute is the 
vertex degree - the GraphX documentation shows how to do this
2. filter this “degrees” graph to just give you 0 degree vertices
3. use graph.mask, passing in the 0-degree graph, to get the original graph with
just 0-degree vertices

Just one variation on several possibilities, the key point is that everything 
is just a graph transformation until you call an action on the resulting graph
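A minimal sketch of those three steps (the helper name is mine; `graph` stands for the
original property graph):

import org.apache.spark.graphx._
import scala.reflect.ClassTag

// Sketch of steps 1 to 3 above.
def verticesWithNoOutEdges[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[VD] = {
  // 1. push the out-degree into the vertex attribute (vertices with no entry get 0)
  val degreeGraph = graph.outerJoinVertices(graph.outDegrees) { (vid, attr, degOpt) => degOpt.getOrElse(0) }
  // 2. keep only the 0-out-degree vertices
  val zeroDegree = degreeGraph.subgraph(vpred = (vid, deg) => deg == 0)
  // 3. mask the original graph with the 0-degree graph to recover the original attributes
  graph.mask(zeroDegree).vertices
}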
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 26 Feb 2016, at 11:59, Guillermo Ortiz 
> wrote:

I'm new to GraphX. I need to get the vertices without out edges.
I guess it's pretty easy, but I did it in a pretty complicated and inefficient way.


val vertices: RDD[(VertexId, (List[String], List[String]))] =
  sc.parallelize(Array((1L, (List("a"), List[String]())),
(2L, (List("b"), List[String]())),
(3L, (List("c"), List[String]())),
(4L, (List("d"), List[String]())),
(5L, (List("e"), List[String]())),
(6L, (List("f"), List[String]()

// Create an RDD for edges
val relationships: RDD[Edge[Boolean]] =
  sc.parallelize(Array(Edge(1L, 2L, true), Edge(2L, 3L, true), Edge(3L, 4L, 
true), Edge(5L, 2L, true)))

// build the graph from the vertices and edges above
val minGraph = Graph(vertices, relationships)

val out = minGraph.outDegrees.map(vertex => vertex._1)

val finalVertexes = minGraph.vertices.keys.subtract(out)

// There must be something better than this way..
val nodes = finalVertexes.collect()
val result = minGraph.vertices.filter(v => nodes.contains(v._1))



What's a good way to do this operation? It seems that it should be pretty
easy.




Configure Spark Resource on AWS CLI Not Working

2016-02-26 Thread Weiwei Zhang
Hi there,

I am trying to configure memory for spark using AWS CLI. However, I got the
following message:

*A client error (ValidationException) occurred when calling the RunJobFlow
operation: Cannot specify args for application 'Spark' when release label
is used.*

In the aws 'create-cluster' command, I have '--release-label emr-4.0.0
--applications Name=Hadoop
Name=Spark,Args=[-d,num-executors=4,spark.executor.memory=3000M,spark.driver.memory=4000M]'
and it seems like I cannot specify args when there is '--release-label'.
How do I get around this?

I also tried using a JSON configuration file saved in an S3 bucket and added
"--configurations http://path/bucket/config.json" to the command, but it
gave me a 403 error (access denied). But when I did "aws s3 ls
(s3://bucket)" I could see that bucket and the config.json in the bucket.
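For what it's worth, with a release label the memory settings usually go through
--configurations rather than Args. A sketch of the JSON, where the classification name and
property keys are assumptions to check against the EMR 4.x docs:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executor.instances": "4",
      "spark.executor.memory": "3000M",
      "spark.driver.memory": "4000M"
    }
  }
]

If the S3-hosted file keeps returning 403, passing a local file (e.g.
--configurations file://./config.json) is a commonly suggested alternative.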

Please advise. Thank you very much.

Best Regards,
Vivian


Re: PySpark : couldn't pickle object of type class T

2016-02-26 Thread Anoop Shiralige
Hi Jeff,

Thank you for looking into the post.

I had explored the spark-avro option earlier. Since we have a union of multiple
complex data types in our avro schema, we couldn't use it.
A couple of things I tried:

   - https://stackoverflow.com/questions/31261376/how-to-read-pyspark-avro-file-and-extract-the-values :
     "Spark Exception : Unions may only consist of concrete type and null"
   - Use of DataFrame/Dataset : serialization problem.

For now, I got it working by modifying AvroConversionUtils to address the
union of multiple data types.

Thanks,
AnoopShiralige


On Thu, Feb 25, 2016 at 7:25 AM, Jeff Zhang  wrote:

> Avro Record is not supported by the pickler; you need to create a custom
> pickler for it. But I don't think it is worth doing that. Actually you can
> use the spark-avro package to load avro data and then convert it to an RDD if
> necessary.
>
> https://github.com/databricks/spark-avro
>
>
> On Thu, Feb 11, 2016 at 10:38 PM, Anoop Shiralige <
> anoop.shiral...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am working with Spark 1.6.0 and the pySpark shell specifically. I have a
>> JavaRDD[org.apache.avro.GenericRecord] which I have converted to a pythonRDD
>> in the following way.
>>
>> javaRDD = sc._jvm.java.package.loadJson("path to data", sc._jsc)
>> javaPython = sc._jvm.SerDe.javaToPython(javaRDD)
>> from pyspark.rdd import RDD
>> pythonRDD=RDD(javaPython,sc)
>>
>> pythonRDD.first()
>>
>> However, every time I try to call the collect() or first() method on the
>> pythonRDD I get the following error:
>>
>> 16/02/11 06:19:19 ERROR python.PythonRunner: Python worker exited
>> unexpectedly (crashed)
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>   File
>>
>> "/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/worker.py",
>> line 98, in main
>> command = pickleSer._read_with_length(infile)
>>   File
>>
>> "/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 156, in _read_with_length
>> length = read_int(stream)
>>   File
>>
>> "/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 545, in read_int
>> raise EOFError
>> EOFError
>>
>> at
>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>> at
>>
>> org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>> at
>> org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>> at
>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: net.razorvine.pickle.PickleException: couldn't pickle object of
>> type class org.apache.avro.generic.GenericData$Record
>> at net.razorvine.pickle.Pickler.save(Pickler.java:142)
>> at
>> net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:493)
>> at net.razorvine.pickle.Pickler.dispatch(Pickler.java:205)
>> at net.razorvine.pickle.Pickler.save(Pickler.java:137)
>> at net.razorvine.pickle.Pickler.dump(Pickler.java:107)
>> at net.razorvine.pickle.Pickler.dumps(Pickler.java:92)
>> at
>>
>> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
>> at
>>
>> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:110)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at
>>
>> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:110)
>> at
>>
>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
>> at
>>
>> org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
>> at
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
>> at
>>
>> org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
>>
>> Thanks for your time,
>> AnoopShiralige
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-couldn-t-pickle-object-of-type-class-T-tp26204.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>

RE: Clarification on RDD

2016-02-26 Thread Mohammed Guller
HDFS, as the name implies, is a distributed file system. A file stored on HDFS
is already distributed. So if you create an RDD from an HDFS file, the created
RDD just points to the file partitions on different nodes.

You can read more about HDFS here.

http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
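A small illustration of both points (the path is hypothetical):

// Creating the RDD only records metadata; no HDFS blocks are read yet.
val textFile = sc.textFile("hdfs:///data/README.md")

// Roughly one partition per HDFS block, living on the nodes that hold the blocks.
println(textFile.partitions.length)

// Only an action such as count() actually reads the data, and each partition is
// read where it already resides rather than being copied to every node.
println(textFile.count())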

Mohammed
Author: Big Data Analytics with 
Spark

From: Ashok Kumar [mailto:ashok34...@yahoo.com.INVALID]
Sent: Friday, February 26, 2016 9:41 AM
To: User
Subject: Clarification on RDD

Hi,

Spark doco says

Spark’s primary abstraction is a distributed collection of items called a 
Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop 
InputFormats (such as HDFS files) or by transforming other RDDs

example:

val textFile = sc.textFile("README.md")


my question is: when an RDD is created like above from a file stored on HDFS, does
that mean that the data is distributed among all the nodes in the cluster, or is the
data from the md file copied to each node of the cluster so that each node has a
complete copy of the data? Is the data actually moved around, or is nothing copied
over until an action like COUNT() is performed on the RDD?

Thanks



Re: s3 access through proxy

2016-02-26 Thread Gourav Sengupta
Hi,

why are you trying to access data in S3 via another network? Does that not
cause huge network overhead, data transmission losses (as data is
getting transferred over the internet) and other inconsistencies?

Have you tried using the AWS CLI? Using the "aws s3 sync" command you can copy all
the files in s3://bucket/ or s3://bucket/key/ to your local system. And
then you can point your Spark cluster to the local data store and run the
queries. Of course that depends on the data volume as well.

Regards,
Gourav Sengupta

On Fri, Feb 26, 2016 at 7:29 PM, Joshua Buss  wrote:

> Hello,
>
> I'm trying to use spark with google cloud storage, but from a network where
> I cannot talk to the outside internet directly.  This means we have a proxy
> set up for all requests heading towards GCS.
>
> So far, I've had good luck with projects that talk to S3 through libraries
> like boto (for python) and the AWS SDK (for node.js), because these both
> appear compatible with what Google calls "interoperability mode".
>
> Spark, (or whatever it uses for s3 connectivity under the hood, maybe
> "JetS3t"?), on the other hand, doesn't appear to be compatible.
> Furthermore, I can't use hadoop Google Cloud Storage connector because it
> doesn't have any properties exposed for setting up a proxy host.
>
>  When I set the following core-site.xml values for the s3a connector, I get an
> AmazonS3Exception:
>
> Caused by: com.cloudera.com.amazonaws.services.s3.model.AmazonS3Exception:
> The provided security credentials are not valid. (Service: Amazon S3;
> Status
> Code: 403; Error Code: InvalidSecurity; Request ID: null), S3 Extended
> Request ID: null
>
> I know this isn't real xml, I just condensed it a bit for readability:
>   fs.s3a.access.key
>   
>   fs.s3a.secret.key
>   
>   fs.s3a.endpoint
>   https://storage.googleapis.com
>   fs.s3a.connection.ssl.enabled
>   True
>   fs.s3a.proxy.host
>   proxyhost
>   fs.s3a.proxy.port
>   12345
>
> I'd inspect the traffic manually to learn more, but it's all encrypted with
> SSL of course.  Any suggestions?
>
> I feel like I should also mention that since this is critical to get
> working
> asap, I'm also going down the route of using another local proxy like this
> one (written in python)
> because boto handles the interoperability mode correctly and is designed to
> "look like actual s3" to clients of it.
>
> Using this approach I can get a basic read to work with spark but this has
> numerous drawbacks, including being very fragile, not supporting writes,
> and
> not to mention I'm sure it'll be a huge performance bottleneck once we
> start
> trying to run larger workloads ...so... I'd like to get the native s3a/s3n
> connectors working if at all possible, but need some help.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/s3-access-through-proxy-tp26347.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


SparkML Using Pipeline API locally on driver

2016-02-26 Thread Eugene Morozov
Hi everyone.

I have a requirement to run prediction for random forest model locally on a
web-service without touching spark at all in some specific cases. I've
achieved that with previous mllib API (java 8 syntax):

public List> predictLocally(RandomForestModel
model, List data) {
return data.stream()
.map(point -> new Tuple2<>(model.predict(point.features()),
point.label()))
.collect(Collectors.toList());
}

So I have an instance of the trained model and can use it any way I want.
The question is whether it's possible to run this on the driver itself with
the following:
DataFrame predictions = model.transform(test);
because AFAIU test has to be a DataFrame, which means it's going to be run
on the cluster.

The use case for running it on the driver is a very small amount of data for
prediction - much faster to handle it this way than using the Spark cluster.
Thank you.
--
Be well!
Jean Morozov


Re: Spark SQL support for sub-queries

2016-02-26 Thread Mich Talebzadeh
Ok let us try this

val d = HiveContext.table("test.dummy")
d.registerTempTable("tmp")
//Obtain boundary values
var minValue : Int = HiveContext.sql("SELECT minRow.id AS minValue FROM
(SELECT min(struct(id)) as minRow FROM tmp) AS
a").collect.apply(0).getInt(0)
var maxValue : Int = HiveContext.sql("SELECT maxRow.id AS maxValue FROM
(SELECT max(struct(id)) as maxRow FROM tmp) AS
b").collect.apply(0).getInt(0)
d.filter( col("id") === lit(minValue) || col("id") ===
lit(maxValue)).orderBy(col("id")).show


This works OK as well



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 26 February 2016 at 23:21, Yin Yang  wrote:

> I tried the following:
>
> scala> Seq((2, "a", "test"), (2, "b", "foo")).toDF("id", "a",
> "b").registerTempTable("test")
>
> scala> val df = sql("SELECT maxRow.* FROM (SELECT max(struct(id, b, a)) as
> maxRow FROM test) a")
> df: org.apache.spark.sql.DataFrame = [id: int, b: string ... 1 more field]
>
> scala> df.show
> +---+----+---+
> | id|   b|  a|
> +---+----+---+
> |  2|test|  a|
> +---+----+---+
>
> Looks like the sort order is governed by the order given in struct().
>
> Nice feature.
>
> On Fri, Feb 26, 2016 at 12:30 PM, Michael Armbrust  > wrote:
>
>> There will probably be some subquery support in 2.0.  That particular
>> query would be more efficient to express as an argmax however.  Here is
>> an example in Spark 1.6.
>>
>> On Thu, Feb 25, 2016 at 2:58 PM, Mohammad Tariq 
>> wrote:
>>
>>> AFAIK, this isn't supported yet. A ticket
>>>  is in progress
>>> though.
>>>
>>>
>>>
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>> 
>>>
>>>
>>> On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>


 Hi,



 I guess the following confirms that Spark does not support sub-queries



 val d = HiveContext.table("test.dummy")

 d.registerTempTable("tmp")

 HiveContext.sql("select * from tmp where id IN (select max(id) from
 tmp)")

 It crashes

 The SQL works OK in Hive itself on the underlying table!

 select * from dummy where id IN (select max(id) from dummy);



 Thanks


 --

 Dr Mich Talebzadeh

 LinkedIn  
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com

 NOTE: The information in this email is proprietary and confidential. This 
 message is for the designated recipient only, if you are not the intended 
 recipient, you should destroy it immediately. Any information in this 
 message shall not be understood as given or endorsed by Cloud Technology 
 Partners Ltd, its subsidiaries or their employees, unless expressly so 
 stated. It is the responsibility of the recipient to ensure that this 
 email is virus free, therefore neither Cloud Technology partners Ltd, its 
 subsidiaries nor their employees accept any responsibility.



>>>
>>
>


Re: Spark SQL support for sub-queries

2016-02-26 Thread Yin Yang
I tried the following:

scala> Seq((2, "a", "test"), (2, "b", "foo")).toDF("id", "a",
"b").registerTempTable("test")

scala> val df = sql("SELECT maxRow.* FROM (SELECT max(struct(id, b, a)) as
maxRow FROM test) a")
df: org.apache.spark.sql.DataFrame = [id: int, b: string ... 1 more field]

scala> df.show
+---+----+---+
| id|   b|  a|
+---+----+---+
|  2|test|  a|
+---+----+---+

Looks like the sort order is governed by the order given in struct().

Nice feature.

On Fri, Feb 26, 2016 at 12:30 PM, Michael Armbrust 
wrote:

> There will probably be some subquery support in 2.0.  That particular
> query would be more efficient to express as an argmax however.  Here is
> an example in Spark 1.6.
>
> On Thu, Feb 25, 2016 at 2:58 PM, Mohammad Tariq 
> wrote:
>
>> AFAIK, this isn't supported yet. A ticket
>>  is in progress though.
>>
>>
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>> 
>>
>>
>> On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I guess the following confirms that Spark does not support sub-queries
>>>
>>>
>>>
>>> val d = HiveContext.table("test.dummy")
>>>
>>> d.registerTempTable("tmp")
>>>
>>> HiveContext.sql("select * from tmp where id IN (select max(id) from
>>> tmp)")
>>>
>>> It crashes
>>>
>>> The SQL works OK in Hive itself on the underlying table!
>>>
>>> select * from dummy where id IN (select max(id) from dummy);
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>> --
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> http://talebzadehmich.wordpress.com
>>>
>>> NOTE: The information in this email is proprietary and confidential. This 
>>> message is for the designated recipient only, if you are not the intended 
>>> recipient, you should destroy it immediately. Any information in this 
>>> message shall not be understood as given or endorsed by Cloud Technology 
>>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>>> stated. It is the responsibility of the recipient to ensure that this email 
>>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>>> subsidiaries nor their employees accept any responsibility.
>>>
>>>
>>>
>>
>


Re: Spark SQL support for sub-queries

2016-02-26 Thread Michael Armbrust
There will probably be some subquery support in 2.0.  That particular query
would be more efficient to express as an argmax however.  Here is an
example in Spark 1.6.

On Thu, Feb 25, 2016 at 2:58 PM, Mohammad Tariq  wrote:

> AFAIK, this isn't supported yet. A ticket
>  is in progress though.
>
>
>
>
> Tariq, Mohammad
> about.me/mti
> 
>
>
> On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>>
>>
>> I guess the following confirms that Spark does not support sub-queries
>>
>>
>>
>> val d = HiveContext.table("test.dummy")
>>
>> d.registerTempTable("tmp")
>>
>> HiveContext.sql("select * from tmp where id IN (select max(id) from tmp)")
>>
>> It crashes
>>
>> The SQL works OK in Hive itself on the underlying table!
>>
>> select * from dummy where id IN (select max(id) from dummy);
>>
>>
>>
>> Thanks
>>
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This 
>> message is for the designated recipient only, if you are not the intended 
>> recipient, you should destroy it immediately. Any information in this 
>> message shall not be understood as given or endorsed by Cloud Technology 
>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>> stated. It is the responsibility of the recipient to ensure that this email 
>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>> subsidiaries nor their employees accept any responsibility.
>>
>>
>>
>


Attempting to aggregate multiple values

2016-02-26 Thread Daniel Imberman
Hi all,

So over the past few days I've been attempting to create a function that
takes an RDD[U], and creates three MMaps. I've been attempting to aggregate
these values but I'm running into a major issue.

When I initially tried to use separate aggregators for each map, I noticed
a significant slowdown due to the fact that I was running three aggregates.
To battle this issue I created one aggregator that takes in all three
values as a tuple and acts something like this:

val zeroValue: (A, B, C) = ??? // (accum1.zero, accum2.zero, accum3.zero)
def seqOp(r: (A, B, C), t: T): (A, B, C) = r match {
  // (accum1.addAccumulator(a, t), ..., accum3..addAccumulator(c, t))
  case (a, b, c) =>  ??? }
def combOp(r1: (A, B, C), r2: (A, B, C)): (A, B, C) = (r1, r2) match {
  // (acc1.addInPlace(a1, a2), ..., acc3.addInPlace(c1, c2))
  case ((a1, b1, c1), (a2, b2, c2)) => ???}
val rdd: RDD[T] = ???
val accums: (A, B, C) = rdd.aggregate(zeroValue)(seqOp, combOp)


However, upon building this joint aggregator I've noticed an obscene amount
of garbage collection which is grinding my progress to a halt. My current
theory is that because I'm using a tuple of maps rather than individual
mutable maps that the system is creating way too many objects. Has anyone
run into a problem like this before? Does anyone have any suggestions for
aggregating multiple values without creating a new object every time?
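In case it helps, a minimal sketch of one way to do this with a single mutable accumulator
object instead of a tuple; the record type (String, Double) and the three example maps are
hypothetical stand-ins for the real T and the three MMaps, and add/merge mutate in place so
no new container is allocated per record:

import scala.collection.mutable

// One accumulator object holding all three maps; add/merge mutate in place and
// return `this`, so no new tuples or maps are created per record.
final class Accums extends Serializable {
  val counts = mutable.Map.empty[String, Long]
  val sums   = mutable.Map.empty[String, Double]
  val mins   = mutable.Map.empty[String, Double]

  def add(key: String, value: Double): Accums = {
    counts(key) = counts.getOrElse(key, 0L) + 1L
    sums(key)   = sums.getOrElse(key, 0.0) + value
    mins(key)   = math.min(mins.getOrElse(key, Double.MaxValue), value)
    this
  }

  def merge(other: Accums): Accums = {
    other.counts.foreach { case (k, v) => counts(k) = counts.getOrElse(k, 0L) + v }
    other.sums.foreach   { case (k, v) => sums(k)   = sums.getOrElse(k, 0.0) + v }
    other.mins.foreach   { case (k, v) => mins(k)   = math.min(mins.getOrElse(k, Double.MaxValue), v) }
    this
  }
}

// Hypothetical RDD[(String, Double)] standing in for RDD[T].
val rdd = sc.parallelize(Seq(("a", 1.0), ("b", 2.0), ("a", 3.0)))

val accums = rdd.aggregate(new Accums())(
  (acc, rec) => acc.add(rec._1, rec._2),
  (a, b)     => a.merge(b)
)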


Re: DirectFileOutputCommiter

2016-02-26 Thread Alexander Pivovarov
DirectOutputCommitter doc says:
The FileOutputCommitter is required for HDFS + speculation, which allows
only one writer at
 a time for a file (so two people racing to write the same file would not
work). However, S3
 supports multiple writers outputting to the same file, where visibility is
guaranteed to be
 atomic. This is a monotonic operation: all writers should be writing the
same data, so which
 one wins is immaterial.

The AWS impl is better because it uses DirectFileOutputCommitter only for
s3n:// files:
https://gist.github.com/apivovarov/bb215f08318318570567

But for some reason it does not work for me.
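For reference, a sketch of how such a committer is normally wired up; the class names below
are assumptions (whatever fully qualified names your build actually contains), only the
property names are standard:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // old mapred API (saveAsHadoopFile, Hive insert paths): pass the Hadoop property through
  .set("spark.hadoop.mapred.output.committer.class",
       "com.example.DirectOutputCommitter")          // hypothetical class name
  // Parquet output has its own property in Spark 1.5/1.6
  .set("spark.sql.parquet.output.committer.class",
       "com.example.DirectParquetOutputCommitter")   // hypothetical class name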

On Fri, Feb 26, 2016 at 11:50 AM, Reynold Xin  wrote:

> It could lose data in speculation mode, or if any job fails.
>
> On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman 
> wrote:
>
>> Takeshi, do you know the reason why they wanted to remove this commiter
>> in SPARK-10063?
>> the jira has no info inside
>> as far as I understand the direct committer can't be used when either of
>> two is true
>> 1. speculation mode
>> 2. append mode(ie. not creating new version of data but appending to
>> existing data)
>>
>> On 26 February 2016 at 08:24, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> Great work!
>>> What is the concrete performance gain of the committer on s3?
>>> I'd like to know.
>>>
>>> I think there is no direct committer for files because these kinds of
>>> committers have risks
>>> of losing data (See: SPARK-10063).
>>> Until this is resolved, ISTM files cannot support direct commits.
>>>
>>> thanks,
>>>
>>>
>>>
>>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>>>
 yes, should be this one
 https://gist.github.com/aarondav/c513916e72101bbe14ec

 then need to set it in spark-defaults.conf :
 https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13

 Am Freitag, 26. Februar 2016 schrieb Yin Yang :
 > The header of DirectOutputCommitter.scala says Databricks.
 > Did you get it from Databricks ?
 > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
 >>
 >> interesting in this topic as well, why the DirectFileOutputCommitter
 not included?
 >> we added it in our fork,
 under 
 core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
 >> moreover, this DirectFileOutputCommitter is not working for the
 insert operations in HiveContext, since the Committer is called by hive
 (means uses dependencies in hive package)
 >> we made some hack to fix this, you can take a look:
 >>
 https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
 >>
 >> may bring some ideas to other spark contributors to find a better
 way to use s3.
 >>
 >> 2016-02-22 23:18 GMT+01:00 igor.berman :
 >>>
 >>> Hi,
 >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
 alikes
 >>> especially when working with s3?
 >>> I know that there is one impl in spark distro for parquet format,
 but not
 >>> for files -  why?
 >>>
 >>> Imho, it can bring huge performance boost.
 >>> Using default FileOutputCommiter with s3 has big overhead at commit
 stage
 >>> when all parts are copied one-by-one to destination dir from
 _temporary,
 >>> which is bottleneck when number of partitions is high.
 >>>
 >>> Also, wanted to know if there are some problems when using
 >>> DirectFileOutputCommitter?
 >>> If writing one partition directly will fail in the middle is spark
 will
 >>> notice this and will fail job(say after all retries)?
 >>>
 >>> thanks in advance
 >>>
 >>>
 >>>
 >>>
 >>> --
 >>> View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
 >>> Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 >>>
 >>>
 >>>
 >>
 >
 >

>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>


Re: d.filter("id in max(id)")

2016-02-26 Thread Michael Armbrust
You can do max on a struct to get the max value for the first column, along
with the values for the other columns in the row (an argmax).

Here is an example in Spark 1.6.
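A sketch of that pattern applied here, assuming HiveContext is your hive context and that the
table has some second column (other_col is made up):

val d = HiveContext.table("table")
d.registerTempTable("tmp_table")
// max(struct(...)) orders by the first struct field, so this returns the row with the max ID.
val top = HiveContext.sql(
  "SELECT maxRow.* FROM (SELECT max(struct(ID, other_col)) AS maxRow FROM tmp_table) t")
top.show()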

On Thu, Feb 25, 2016 at 8:24 AM, Ashok Kumar 
wrote:

> Hi,
>
> How can I make that work?
>
> val d = HiveContext.table("table")
>
> select * from table where ID = MAX(ID) from table
>
> Thanks
>


Re: Spark SQL support for sub-queries

2016-02-26 Thread Mich Talebzadeh
Good stuff

I decided to do some boundary value analysis by getting records where the
ID (unique value) is IN (min() and max()).

Unfortunately, Hive SQL does not yet support more than one sub-query expression.
For example this operation is perfectly valid in Oracle

select * from dummy where id IN (select MIN(ID) from dummy) OR id IN
(select MAX(ID) from dummy);

It comes back with two rows

In Hive SQL you get the following error

select * from dummy where id IN (select MIN(ID) from dummy) OR id IN
(select MAX(ID) from dummy);
FAILED: SemanticException [Error 10249]: Line 1:66 Unsupported SubQuery
Expression 'id': Only 1 SubQuery expression is supported.

So the solution I found was to use UNION in Hive

SELECT * FROM dummy WHERE id IN (SELECT MIN(ID) FROM dummy)
UNION
SELECT * FROM dummy WHERE id IN (SELECT MAX(ID) FROM dummy)
ORDER BY id

It took 2 min, 6 sec to return two rows

In FP I decided to do this

val d = HiveContext.table("test.dummy")
//Obtain boundary values
val minValue: Int = d.agg(min(col("id"))).collect.apply(0).getInt(0)
val maxValue: Int = d.agg(max(col("id"))).collect.apply(0).getInt(0)
d.filter( col("id") === lit(minValue) || col("id") ===
lit(maxValue)).orderBy(col("id")).show

It returns the rows in 1 min. Now I am not sure whether the FP code is
the most optimised, but it still wins.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 26 February 2016 at 14:51, Yin Yang  wrote:

> Since collect is involved, the approach would be slower compared to the
> SQL Mich gave in his first email.
>
> On Fri, Feb 26, 2016 at 1:42 AM, Michał Zieliński <
> zielinski.mich...@gmail.com> wrote:
>
>> You need to collect the value.
>>
>> val m: Int = d.agg(max($"id")).collect.apply(0).getInt(0)
>> d.filter(col("id") === lit(m))
>>
>> On 26 February 2016 at 09:41, Mich Talebzadeh 
>> wrote:
>>
>>> Can this be done using DFs?
>>>
>>>
>>>
>>> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>
>>> scala> val d = HiveContext.table("test.dummy")
>>> d: org.apache.spark.sql.DataFrame = [id: int, clustered: int, scattered:
>>> int, randomised: int, random_string: string, small_vc: string, padding:
>>> string]
>>>
>>> scala>  var m = d.agg(max($"id"))
>>> m: org.apache.spark.sql.DataFrame = [max(id): int]
>>>
>>> How can I join these two? In other words I want to get all rows with id
>>> = m here?
>>>
>>> d.filter($"id" = m)  ?
>>>
>>> Thanks
>>>
>>> On 25/02/2016 22:58, Mohammad Tariq wrote:
>>>
>>> AFAIK, this isn't supported yet. A ticket
>>>  is in progress
>>> though.
>>>
>>>
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>>
>>>
>>>
>>> On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>


 Hi,



 I guess the following confirms that Spark does not support sub-queries



 val d = HiveContext.table("test.dummy")

 d.registerTempTable("tmp")

 HiveContext.sql("select * from tmp where id IN (select max(id) from
 tmp)")

 It crashes

 The SQL works OK in Hive itself on the underlying table!

 select * from dummy where id IN (select max(id) from dummy);



 Thanks

>>>
>>
>


Re: DirectFileOutputCommiter

2016-02-26 Thread Reynold Xin
It could lose data in speculation mode, or if any job fails.

On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman  wrote:

> Takeshi, do you know the reason why they wanted to remove this commiter in
> SPARK-10063?
> the jira has no info inside
> as far as I understand the direct committer can't be used when either of
> two is true
> 1. speculation mode
> 2. append mode(ie. not creating new version of data but appending to
> existing data)
>
> On 26 February 2016 at 08:24, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> Great work!
>> What is the concrete performance gain of the committer on s3?
>> I'd like to know.
>>
>> I think there is no direct committer for files because these kinds of
>> committers have risks
>> of losing data (See: SPARK-10063).
>> Until this is resolved, ISTM files cannot support direct commits.
>>
>> thanks,
>>
>>
>>
>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>>
>>> yes, should be this one
>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>
>>> then need to set it in spark-defaults.conf :
>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>
>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>> > The header of DirectOutputCommitter.scala says Databricks.
>>> > Did you get it from Databricks ?
>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>>> >>
>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>> not included?
>>> >> we added it in our fork,
>>> under 
>>> core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>> insert operations in HiveContext, since the Committer is called by hive
>>> (means uses dependencies in hive package)
>>> >> we made some hack to fix this, you can take a look:
>>> >>
>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>> >>
>>> >> may bring some ideas to other spark contributors to find a better way
>>> to use s3.
>>> >>
>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>>> >>>
>>> >>> Hi,
>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>> alikes
>>> >>> especially when working with s3?
>>> >>> I know that there is one impl in spark distro for parquet format,
>>> but not
>>> >>> for files -  why?
>>> >>>
>>> >>> Imho, it can bring huge performance boost.
>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>> stage
>>> >>> when all parts are copied one-by-one to destination dir from
>>> _temporary,
>>> >>> which is bottleneck when number of partitions is high.
>>> >>>
>>> >>> Also, wanted to know if there are some problems when using
>>> >>> DirectFileOutputCommitter?
>>> >>> If writing one partition directly will fail in the middle is spark
>>> will
>>> >>> notice this and will fail job(say after all retries)?
>>> >>>
>>> >>> thanks in advance
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>> >>> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >>>
>>> >>>
>>> >>
>>> >
>>> >
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


s3 access through proxy

2016-02-26 Thread Joshua Buss
Hello,

I'm trying to use spark with google cloud storage, but from a network where
I cannot talk to the outside internet directly.  This means we have a proxy
set up for all requests heading towards GCS.

So far, I've had good luck with projects that talk to S3 through libraries
like boto (for python) and the AWS SDK (for node.js), because these both
appear compatible with what Google calls "interoperability mode".

Spark, (or whatever it uses for s3 connectivity under the hood, maybe
"JetS3t"?), on the other hand, doesn't appear to be compatible. 
Furthermore, I can't use hadoop Google Cloud Storage connector because it
doesn't have any properties exposed for setting up a proxy host.

 When I set the following core-site.xml values for the s3a connector, I get an
AmazonS3Exception:

Caused by: com.cloudera.com.amazonaws.services.s3.model.AmazonS3Exception:
The provided security credentials are not valid. (Service: Amazon S3; Status
Code: 403; Error Code: InvalidSecurity; Request ID: null), S3 Extended
Request ID: null

I know this isn't real xml, I just condensed it a bit for readability:
  fs.s3a.access.key
  
  fs.s3a.secret.key
  
  fs.s3a.endpoint
  https://storage.googleapis.com
  fs.s3a.connection.ssl.enabled
  True
  fs.s3a.proxy.host
  proxyhost
  fs.s3a.proxy.port
  12345

I'd inspect the traffic manually to learn more, but it's all encrypted with
SSL of course.  Any suggestions?

I feel like I should also mention that since this is critical to get working
asap, I'm also going down the route of using another local proxy like this
one (written in python)
because boto handles the interoperability mode correctly and is designed to
"look like actual s3" to clients of it.

Using this approach I can get a basic read to work with spark but this has
numerous drawbacks, including being very fragile, not supporting writes, and
not to mention I'm sure it'll be a huge performance bottleneck once we start
trying to run larger workloads ...so... I'd like to get the native s3a/s3n
connectors working if at all possible, but need some help.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/s3-access-through-proxy-tp26347.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.5 on Mesos

2016-02-26 Thread Tim Chen
https://spark.apache.org/docs/latest/running-on-mesos.html should be the
best source, what problems were you running into?

Tim

On Fri, Feb 26, 2016 at 11:06 AM, Yin Yang  wrote:

> Have you read this ?
> https://spark.apache.org/docs/latest/running-on-mesos.html
>
> On Fri, Feb 26, 2016 at 11:03 AM, Ashish Soni 
> wrote:
>
>> Hi All ,
>>
>> Is there any proper documentation as how to run spark on mesos , I am
>> trying from the last few days and not able to make it work.
>>
>> Please help
>>
>> Ashish
>>
>
>


Spark 1.5 on Mesos

2016-02-26 Thread Ashish Soni
Hi All ,

Is there any proper documentation on how to run Spark on Mesos? I have been
trying for the last few days and have not been able to make it work.

Please help

Ashish


Re: Spark 1.5 on Mesos

2016-02-26 Thread Yin Yang
Have you read this ?
https://spark.apache.org/docs/latest/running-on-mesos.html

On Fri, Feb 26, 2016 at 11:03 AM, Ashish Soni  wrote:

> Hi All ,
>
> Is there any proper documentation as how to run spark on mesos , I am
> trying from the last few days and not able to make it work.
>
> Please help
>
> Ashish
>


Re: Standalone vs. Mesos for production installation on a smallish cluster

2016-02-26 Thread Tim Chen
Mesos does provide some benefits and features, such as the ability to
launch all the Spark pieces in Docker and also Mesos resource scheduling
features (weights, roles), and if you plan to also use HDFS/Cassandra there
are existing frameworks that are actively maintained by us.

That said, when there are just 5 nodes and you just want to use Spark without
any other frameworks and without added complexity, I would also suggest using
Standalone.

Tim

On Fri, Feb 26, 2016 at 3:51 AM, Igor Berman  wrote:

> Imho most of production clusters are standalone
> there was some presentation from spark summit with some stats inside(can't
> find right now), so standalone was at 1st place
> it was from Matei
> https://databricks.com/resources/slides
>
> On 26 February 2016 at 13:40, Petr Novak  wrote:
>
>> Hi all,
>> I believe that it used to be in documentation that Standalone mode is not
>> for production. I'm either wrong or it was already removed.
>>
>> Having a small cluster between 5-10 nodes is Standalone recommended for
>> production? I would like to go with Mesos but the question is if there is
>> real add-on value for production, mainly from stability perspective.
>>
>> Can I expect that adding Mesos will improve stability compared to
>> Standalone to the extent to justify itself according to somewhat increased
>> complexity?
>>
>> I know it is hard to answer because Mesos layer itself is going to add
>> some bugs as well.
>>
>> Are there unique features enabled by Mesos specific to Spark? E.g.
>> adaptive resources for jobs or whatever?
>>
>> In the future once cluster will grow and more services running on Mesos,
>> we plan to use Mesos. The question is if it does worth to go with it
>> immediately even maybe its utility is not directly needed at this point.
>>
>> Many thanks,
>> Petr
>>
>
>


Re: Saving and Loading Dataframes

2016-02-26 Thread Raj Kumar
Thanks for the response Yanbo. Here is the source (it uses the 
sample_libsvm_data.txt file used in the
mllib examples).

-Raj
— IOTest.scala -

import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame

object IOTest {
  val InputFile = "/tmp/sample_libsvm_data.txt"
  val OutputDir ="/tmp/out"

  val sconf = new SparkConf().setAppName("test").setMaster("local[*]")
  val sqlc  = new SQLContext( new SparkContext( sconf ))
  val df = sqlc.read.format("libsvm").load( InputFile  )
  df.show; df.printSchema

  df.write.format("json").mode("overwrite").save( OutputDir )
  val data = sqlc.read.format("json").load( OutputDir )
  data.show; data.printSchema

  def main( args: Array[String]):Unit = {}
}
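For comparison, a sketch of the same round trip via Parquet, reusing sqlc and df from IOTest
above; the assumption (worth verifying on your version) is that Parquet keeps the Spark SQL
schema, including the vector UDT, in its metadata, so the reloaded frame keeps
label: double / features: vector:

// Hypothetical Parquet variant of the JSON round trip above.
df.write.format("parquet").mode("overwrite").save("/tmp/out_parquet")
val reloaded = sqlc.read.format("parquet").load("/tmp/out_parquet")
reloaded.show; reloaded.printSchema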


---

On Feb 26, 2016, at 12:47 AM, Yanbo Liang 
> wrote:

Hi Raj,

Could you share your code which can help others to diagnose this issue? Which 
version did you use?
I can not reproduce this problem in my environment.

Thanks
Yanbo

2016-02-26 10:49 GMT+08:00 raj.kumar 
>:
Hi,

I am using mllib. I use the ml vectorization tools to create the vectorized
input dataframe for
the ml/mllib machine-learning models with schema:

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

To avoid repeated vectorization, I am trying to save and load this dataframe
using
   df.write.format("json").mode("overwrite").save( url )
val data = Spark.sqlc.read.format("json").load( url )

However when I load the dataframe, the newly loaded dataframe has the
following schema:
root
 |-- features: struct (nullable = true)
 ||-- indices: array (nullable = true)
 |||-- element: long (containsNull = true)
 ||-- size: long (nullable = true)
 ||-- type: long (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = true)
 |-- label: double (nullable = true)

which the machine-learning models do not recognize.

Is there a way I can save and load this dataframe without the schema
changing?
I assume it has to do with the fact that Vector is not a basic type.

thanks
-Raj





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saving-and-Loading-Dataframes-tp26339.html
Sent from the Apache Spark User List mailing list archive at 
Nabble.com.






Clarification on RDD

2016-02-26 Thread Ashok Kumar
 Hi,
Spark doco says
Spark’s primary abstraction is a distributed collection of items called a 
Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop 
InputFormats (such as HDFS files) or by transforming other RDDs
example:
val textFile = sc.textFile("README.md")

my question is: when an RDD is created like above from a file stored on HDFS, does
that mean that the data is distributed among all the nodes in the cluster, or is the
data from the md file copied to each node of the cluster so that each node has a
complete copy of the data? Is the data actually moved around, or is nothing copied
over until an action like COUNT() is performed on the RDD?
Thanks


Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread yuhang.chenn
Thanks a lot.

Sent from WPS Mail

On Feb 27, 2016, at 1:02 AM, Cody Koeninger wrote:
> Yes.
>
> On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn wrote:
>> Thanks a lot.
>> And I got another question: What would happen if I didn't set
>> "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to
>> consume all the messages in Kafka?
>>
>> Sent from WPS Mail
>>
>> On Feb 25, 2016, at 11:58 AM, Cody Koeninger wrote:
>>> The per partition offsets are part of the rdd as defined on the driver.
>>> Have you read
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>>> and/or watched
>>> https://www.youtube.com/watch?v=fXnNEq1v3VA
>>>
>>> On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote:
>>>> Hi, as far as I know, there is a 1:1 mapping between Spark partitions and
>>>> Kafka partitions, and in Spark's fault-tolerance mechanism, if a partition
>>>> fails, another partition will be used to recompute that data. And my
>>>> questions are below:
>>>>
>>>> When a partition (worker node) fails in Spark Streaming,
>>>> 1. Is its computation passed to another partition, or does it just wait for
>>>> the failed partition to restart?
>>>> 2. How does the restarted partition know the offset range it should consume
>>>> from Kafka? It should consume the same data as the before-failed one, right?




Re: Hbase in spark

2016-02-26 Thread Ted Yu
I know little about your use case.

Did you mean that your data is relatively evenly distributed in Spark
domain but showed skew in the bulk load phase ?

On Fri, Feb 26, 2016 at 9:02 AM, Renu Yadav  wrote:

> Hi Ted,
>
> Thanks for the reply. I am using the spark-hbase module only, but the problem
> is that when I do the bulk load it shows data skew and takes time to create the
> HFile.
> On 26 Feb 2016 10:25 p.m., "Ted Yu"  wrote:
>
>> In hbase, there is hbase-spark module which supports bulk load.
>> This module is to be backported in the upcoming 1.3.0 release.
>>
>> There is some pending work, such as HBASE-15271 .
>>
>> FYI
>>
>> On Fri, Feb 26, 2016 at 8:50 AM, Renu Yadav  wrote:
>>
>>> Has anybody implemented bulk load into hbase using spark?
>>>
>>> I need help to optimize its performance.
>>>
>>> Please help.
>>>
>>>
>>> Thanks & Regards,
>>> Renu Yadav
>>>
>>
>>


Re: kafka streaming topic partitions vs executors

2016-02-26 Thread Cody Koeninger
Spark in general isn't a good fit if you're trying to make sure that
certain tasks only run on certain executors.

You can look at overriding getPreferredLocations and increasing the value
of spark.locality.wait, but even then, what do you do when an executor
fails?
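For reference, a minimal sketch of the locality knob mentioned above; the value is an
arbitrary example, and it only biases scheduling, it does not pin tasks to executors:

import org.apache.spark.SparkConf

// Larger values make the scheduler wait longer for a preferred executor before
// running the task somewhere else.
val conf = new SparkConf().set("spark.locality.wait", "10s")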

On Fri, Feb 26, 2016 at 8:08 AM, patcharee 
wrote:

> Hi,
>
> I am working a streaming application integrated with Kafka by the API
> createDirectStream. The application streams a topic which contains 10
> partitions (on Kafka). It executes with 10 workers (--num-executors 10).
> When it reads data from Kafka/ZooKeeper, Spark creates 10 tasks (the same as
> the topic's partitions). However, some executors are given more than 1 task
> and work on these tasks sequentially.
>
> Why Spark does not distribute these 10 tasks to 10 executors? How to do
> that?
>
> Thanks,
> Patcharee
>
>
>


Mllib Logistic Regression performance relative to Mahout

2016-02-26 Thread raj.kumar
Hi,

We are trying to port over some code that uses Mahout Logistic Regression to
Mllib Logistic Regression and our preliminary performance tests indicate a
performance bottleneck. It is not clear to me if this is due to one of three
factors:

o Comparing apples to oranges
o Inadequate tuning
o Insufficient parallelism

The test results and the code that produced the results are below. I am
hoping that someone can shed some light on the performance problem we are
having. 

thanks much
-Raj

P.S. Apologies if this is a duplicate posting. I got a response to a
previous posting that suggested that the posting may not have correctly
registered. 

- Mahout LR vs. Mllib LR -
Data   Cluster   MLLIbMahout
sizetype   Train Test  Rate  Train   Test  Rate
--  -    -   
100local[*]   .03.154  1.111  100
100Cluster[6]   .036   .09  59  1   9100
500,000 local[*]32  983  326   1086   82
500,000 Cluster[6] 8   483  310   877 81

All rates are in records/milliseconds
The 100 dataset is the sample_libsvm_data.txt
My cluster was a set of 6 worker-machines on aws
Rate indicate the % of the test set that were labeled correctly
The latest versions of mllib (1.6) and Mahout (0.9) were used in the tests
 
MllMahout.scala

  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-Logistic-Regression-performance-relative-to-Mahout-tp26346.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread Cody Koeninger
Yes.
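For reference, a minimal sketch of capping the per-partition rate asked about below; the
value is an arbitrary example and is interpreted as records per second per Kafka partition:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")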

On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn 
wrote:

> Thanks a lot.
> And I got another question: What would happen if I didn't set
> "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streamning try to
> consume all the messages in Kafka?
>
> Sent from WPS Mail
> On Feb 25, 2016, at 11:58 AM, Cody Koeninger wrote:
>
> The per partition offsets are part of the rdd as defined on the driver.
> Have you read
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>
> and/or watched
>
> https://www.youtube.com/watch?v=fXnNEq1v3VA
>
> On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen 
> wrote:
>
>> Hi, as far as I know, there is a 1:1 mapping between Spark partitions and
>> Kafka partitions, and in Spark's fault-tolerance mechanism, if a partition
>> fails, another partition will be used to recompute that data. And my
>> questions are below:
>>
>> When a partition (worker node) fails in Spark Streaming,
>> 1. Is its computation passed to another partition, or just waits for the
>> failed partition to restart?
>> 2. How does the restarted partition know the offset range it should
>> consume from Kafka? It should consume the same data as the before-failed
>> one, right?
>>
>
>


Re: Hbase in spark

2016-02-26 Thread Ted Yu
In hbase, there is hbase-spark module which supports bulk load.
This module is to be backported in the upcoming 1.3.0 release.

There is some pending work, such as HBASE-15271 .

FYI

On Fri, Feb 26, 2016 at 8:50 AM, Renu Yadav  wrote:

> Has anybody implemented bulk load into hbase using spark?
>
> I need help to optimize its performance.
>
> Please help.
>
>
> Thanks & Regards,
> Renu Yadav
>


Hbase in spark

2016-02-26 Thread Renu Yadav
Has anybody implemented bulk load into hbase using spark?

I need help to optimize its performance.

Please help.


Thanks & Regards,
Renu Yadav


Re: Spark SQL support for sub-queries

2016-02-26 Thread Yin Yang
Since collect is involved, the approach would be slower compared to the SQL
Mich gave in his first email.

On Fri, Feb 26, 2016 at 1:42 AM, Michał Zieliński <
zielinski.mich...@gmail.com> wrote:

> You need to collect the value.
>
> val m: Int = d.agg(max($"id")).collect.apply(0).getInt(0)
> d.filter(col("id") === lit(m))
>
> On 26 February 2016 at 09:41, Mich Talebzadeh 
> wrote:
>
>> Can this be done using DFs?
>>
>>
>>
>> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> scala> val d = HiveContext.table("test.dummy")
>> d: org.apache.spark.sql.DataFrame = [id: int, clustered: int, scattered:
>> int, randomised: int, random_string: string, small_vc: string, padding:
>> string]
>>
>> scala>  var m = d.agg(max($"id"))
>> m: org.apache.spark.sql.DataFrame = [max(id): int]
>>
>> How can I join these two? In other words I want to get all rows with id =
>> m here?
>>
>> d.filter($"id" = m)  ?
>>
>> Thanks
>>
>> On 25/02/2016 22:58, Mohammad Tariq wrote:
>>
>> AFAIK, this isn't supported yet. A ticket
>>  is in progress though.
>>
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>>
>>
>> On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I guess the following confirms that Spark does not support sub-queries
>>>
>>>
>>>
>>> val d = HiveContext.table("test.dummy")
>>>
>>> d.registerTempTable("tmp")
>>>
>>> HiveContext.sql("select * from tmp where id IN (select max(id) from
>>> tmp)")
>>>
>>> It crashes
>>>
>>> The SQL works OK in Hive itself on the underlying table!
>>>
>>> select * from dummy where id IN (select max(id) from dummy);
>>>
>>>
>>>
>>> Thanks
>>>
>>
>


Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

2016-02-26 Thread Romain Sagean
It seems like some libraries are missing. I'm not good at compiling and I
don't know how to use Gradle, but for sbt I use the sbt-assembly plugin (
https://github.com/sbt/sbt-assembly) to include all dependencies and make a
fat jar. For Gradle I have found this:
https://github.com/musketyr/gradle-fatjar-plugin.

My complete build.sbt is below for reference:


import AssemblyKeys._

name := "ON-3_geolocation"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

libraryDependencies ++= Seq(
  "org.apache.spark"       %% "spark-core"               % "1.3.0",
  "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"
)

retrieveManaged := true

assemblySettings

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")           => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")       => MergeStrategy.discard
  case "log4j.properties"                                   => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/")  => MergeStrategy.filterDistinctLines
  case "reference.conf"                                     => MergeStrategy.concat
  case _                                                    => MergeStrategy.first
}
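
On the streaming side of the original question, the per-partition trick from the
batch code quoted below should carry over, since the (non-serializable) lookup
object is then created on the executors. A rough sketch, assuming ssc is your
StreamingContext, lines is a DStream[String] of raw log lines, and extractIp is a
placeholder for however you pull the IP out of a line:

import com.snowplowanalytics.maxmind.iplookups.IpLookups
import org.apache.spark.SparkFiles

ssc.sparkContext.addFile("/home/your_user/GeoLiteCity.dat")

val enriched = lines.mapPartitions { rows =>
  // build the lookup object once per partition, on the executor side
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { line =>
    val ip = extractIp(line)  // placeholder for your own parsing
    (line, (ipLookups.performLookups(ip)._1).map(_.countryName).getOrElse(""))
  }
}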


2016-02-26 7:47 GMT+01:00 Zhun Shen :

> Hi,
>
> thanks for you advice. I tried your method, I use Gradle to manage my
> scala code. 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0’ was
> imported in Gradle.
>
> spark version: 1.6.0
> scala: 2.10.4
> scala-maxmind-iplookups: 0.2.0
>
> I run my test, got the error as below:
> java.lang.NoClassDefFoundError:
> scala/collection/JavaConversions$JMapWrapperLike
> at
> com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)
>
>
>
>
> On Feb 24, 2016, at 1:10 AM, romain sagean  wrote:
>
> I realize I forgot the sbt part
>
> resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/;
> 
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.3.0",
>   "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"
> )
>
> otherwise, to process streaming log I use logstash with kafka as input.
> You can set kafka as output if you need to do some extra calculation with
> spark.
>
> Le 23/02/2016 15:07, Romain Sagean a écrit :
>
> Hi,
> I use maxmind geoip with spark (no streaming). To make it work you should
> use mapPartition. I don't know if something similar exist for spark
> streaming.
>
> my code for reference:
>
>   def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
> val lookupResult = ipLookups.performLookups(ip)
> val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
> val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
> val latitude =
> (lookupResult._1).map(_.latitude).getOrElse(None).toString
> val longitude =
> (lookupResult._1).map(_.longitude).getOrElse(None).toString
> return List(countryName, city, latitude, longitude)
>   }
> sc.addFile("/home/your_user/GeoLiteCity.dat")
>
> //load your data in my_data rdd
>
> my_data.mapPartitions { rows =>
> val ipLookups = IpLookups(geoFile =
> Some(SparkFiles.get("GeoLiteCity.dat")))
> rows.map { row => row ::: parseIP(row(3),ipLookups) }
> }
>
> Le 23/02/2016 14:28, Zhun Shen a écrit :
>
> Hi all,
>
> Currently, I sent nginx log to Kafka then I want to use Spark Streaming to
> parse the log and enrich the IP info with geoip libs from Maxmind.
>
> I found this one 
> https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but spark
> streaming throw error and told that the lib was not Serializable.
>
> Does anyone there way to process the IP info in Spark Streaming? Many
> thanks.
>
>
>
>
>


-- 
*Romain Sagean*

*romain.sag...@hupi.fr *


kafka streaming topic partitions vs executors

2016-02-26 Thread patcharee

Hi,

I am working on a streaming application integrated with Kafka through the 
createDirectStream API. The application streams a topic which contains 10 
partitions (on Kafka). It executes with 10 workers (--num-executors 10). 
When it reads data from Kafka/ZooKeeper, Spark creates 10 tasks (the same 
as the topic's partitions). However, some executors are given more than one 
task and work on these tasks sequentially.


Why does Spark not distribute these 10 tasks to 10 executors? How can I do 
that?


Thanks,
Patcharee




Re: Get all vertexes with outDegree equals to 0 with GraphX

2016-02-26 Thread Guillermo Ortiz
Yes, I am not really happy with that "collect".
I was taking a look at the subgraph method and other options and didn't
figure out anything easy or direct.

I'm going to try your idea.

2016-02-26 14:16 GMT+01:00 Robin East :

> Whilst I can think of other ways to do it I don’t think they would be
> conceptually or syntactically any simpler. GraphX doesn’t have the concept
> of built-in vertex properties which would make this simpler - a vertex in
> GraphX is a Vertex ID (Long) and a bunch of custom attributes that you
> assign. This means you have to find a way of ‘pushing’ the vertex degree
> into the graph so you can do comparisons (cf a join in relational
> databases) or as you have done create a list and filter against that (cf
> filtering against a sub-query in relational database).
>
> One thing I would point out is that you probably want to avoid
> finalVerexes.collect() for a large-scale system - this will pull all the
> vertices into the driver and then push them out to the executors again as
> part of the filter operation. A better strategy for large graphs would be:
>
> 1. build a graph based on the existing graph where the vertex attribute is
> the vertex degree - the GraphX documentation shows how to do this
> 2. filter this “degrees” graph to just give you 0 degree vertices
> 3 use graph.mask passing in the 0-degree graph to get the original graph
> with just 0 degree vertices
>
> Just one variation on several possibilities, the key point is that
> everything is just a graph transformation until you call an action on the
> resulting graph
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 26 Feb 2016, at 11:59, Guillermo Ortiz  wrote:
>
> I'm new to GraphX. I need to get the vertices without out edges.
> I guess that it's pretty easy, but I did it in a pretty complicated and
> inefficient way.
>
> val vertices: RDD[(VertexId, (List[String], List[String]))] =
>   sc.parallelize(Array((1L, (List("a"), List[String]())),
> (2L, (List("b"), List[String]())),
> (3L, (List("c"), List[String]())),
> (4L, (List("d"), List[String]())),
> (5L, (List("e"), List[String]())),
> (6L, (List("f"), List[String]()
>
> // Create an RDD for edges
> val relationships: RDD[Edge[Boolean]] =
>   sc.parallelize(Array(Edge(1L, 2L, true), Edge(2L, 3L, true), Edge(3L, 4L, 
> true), Edge(5L, 2L, true)))
>
> val out = minGraph.outDegrees.map(vertex => vertex._1)
>
> val finalVertexes = minGraph.vertices.keys.subtract(out)
>
> //It must be something better than this way..
> val nodes = finalVertexes.collect()
> val result = minGraph.vertices.filter(v => nodes.contains(v._1))
>
>
> What's the good way to do this operation? It seems that it should be pretty 
> easy.
>
>
>


Java/Spark Library for interacting with Spark API

2016-02-26 Thread Hans van den Bogert
Hi, 

Does anyone know of a Java/Scala library (not simply an HTTP library) for 
interacting with Spark through its REST/HTTP API? My “problem” is that 
interacting through REST involves a lot of work mapping the JSON to sensible 
Spark/Scala objects. 

As a simple example, I hope there is a library which allows me to do something 
like this (not a prerequisite, only as an example):

sparkHost(“10.0.01”).getApplications().first().getJobs().first().status

In a broader scope, is using the REST API the only way for a different (JVM) 
process to retrieve information from Spark? 

Regards,

Hans
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dynamic allocation Spark

2016-02-26 Thread Alvaro Brandon
That was exactly it. I had the worker and master processes of Spark
standalone running together with YARN and somehow the resource manager
didn't see the nodes. It's working now.

Thanks for the tip :-)
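
For reference, the settings from the original mail together with the executor
bounds that are usually tuned alongside them (the values below are purely
illustrative):

spark.dynamicAllocation.enabled              true
spark.shuffle.service.enabled                true
spark.dynamicAllocation.minExecutors         1
spark.dynamicAllocation.maxExecutors         20
spark.dynamicAllocation.executorIdleTimeout  60s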


2016-02-26 12:33 GMT+01:00 Jeff Zhang :

> Check the RM UI to ensure you have available resources. I suspect it might
> be that you didn't configure yarn correctly, so NM didn't start properly
> and you have no resource.
>
> On Fri, Feb 26, 2016 at 7:14 PM, alvarobrandon 
> wrote:
>
>> Hello everyone:
>>
>> I'm trying the dynamic allocation in Spark with YARN. I have followed the
>> following configuration steps:
>> 1. Copy the spark-*-yarn-shuffle.jar to the nodemanager classpath. "cp
>> /opt/spark/lib/spark-*-yarn-shuffle.jar /opt/hadoop/share/hadoop/yarn"
>> 2. Added the shuffle service of spark in yarn-site.xml
>> <property>
>>   <name>yarn.nodemanager.aux-services</name>
>>   <value>mapreduce_shuffle,spark_shuffle</value>
>>   <description>shuffle implementation</description>
>> </property>
>> 3. Enabled the class for the shuffle service in yarn-site.xml
>> <property>
>>   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
>>   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
>>   <description>enable the class for dynamic allocation</description>
>> </property>
>> 4. Activated the dynamic allocation in the spark defaults
>> spark.dynamicAllocation.enabled true
>> spark.shuffle.service.enabled   true
>>
>> When I launch my application it just stays in the queue accepted but it
>> never actually runs.
>> 16/02/26 11:11:46 INFO yarn.Client: Application report for
>> application_1456482268159_0001 (state: ACCEPTED)
>>
>> Am I missing something?
>>
>> Thanks in advance as always
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Dynamic-allocation-Spark-tp26344.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Get all vertexes with outDegree equals to 0 with GraphX

2016-02-26 Thread Robin East
Whilst I can think of other ways to do it I don’t think they would be 
conceptually or syntactically any simpler. GraphX doesn’t have the concept of 
built-in vertex properties which would make this simpler - a vertex in GraphX 
is a Vertex ID (Long) and a bunch of custom attributes that you assign. This 
means you have to find a way of ‘pushing’ the vertex degree into the graph so 
you can do comparisons (cf a join in relational databases) or as you have done 
create a list and filter against that (cf filtering against a sub-query in 
relational database). 

One thing I would point out is that you probably want to avoid 
finalVerexes.collect() for a large-scale system - this will pull all the 
vertices into the driver and then push them out to the executors again as part 
of the filter operation. A better strategy for large graphs would be:

1. build a graph based on the existing graph where the vertex attribute is the 
vertex degree - the GraphX documentation shows how to do this
2. filter this “degrees” graph to just give you 0 degree vertices
3 use graph.mask passing in the 0-degree graph to get the original graph with 
just 0 degree vertices

Just one variation on several possibilities, the key point is that everything 
is just a graph transformation until you call an action on the resulting graph
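
As a rough illustration of steps 1-3, assuming the existing graph is the
minGraph value from the original code (untested):

// 1. push the out-degree into the graph as the vertex attribute (0 where absent)
val degreeGraph = minGraph.outerJoinVertices(minGraph.outDegrees) {
  (vid, attr, outDeg) => outDeg.getOrElse(0)
}

// 2. keep only the vertices whose out-degree is 0
val zeroOutDegree = degreeGraph.subgraph(vpred = (vid, deg) => deg == 0)

// 3. mask the original graph with the 0-degree graph to recover the original attributes
val verticesWithoutOutEdges = minGraph.mask(zeroOutDegree).vertices
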
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 26 Feb 2016, at 11:59, Guillermo Ortiz  wrote:
> 
> I'm new to GraphX. I need to get the vertices without out edges.
> I guess that it's pretty easy, but I did it in a pretty complicated and 
> inefficient way. 
> 
> val vertices: RDD[(VertexId, (List[String], List[String]))] =
>   sc.parallelize(Array((1L, (List("a"), List[String]())),
> (2L, (List("b"), List[String]())),
> (3L, (List("c"), List[String]())),
> (4L, (List("d"), List[String]())),
> (5L, (List("e"), List[String]())),
> (6L, (List("f"), List[String]()
> 
> // Create an RDD for edges
> val relationships: RDD[Edge[Boolean]] =
>   sc.parallelize(Array(Edge(1L, 2L, true), Edge(2L, 3L, true), Edge(3L, 4L, 
> true), Edge(5L, 2L, true)))
> val out = minGraph.outDegrees.map(vertex => vertex._1)
> val finalVertexes = minGraph.vertices.keys.subtract(out)
> //It must be something better than this way..
> val nodes = finalVertexes.collect()
> val result = minGraph.vertices.filter(v => nodes.contains(v._1))
> 
> What's the good way to do this operation? It seems that it should be pretty 
> easy.



Get all vertexes with outDegree equals to 0 with GraphX

2016-02-26 Thread Guillermo Ortiz
I'm new to GraphX. I need to get the vertices without out edges.
I guess that it's pretty easy, but I did it in a pretty complicated and
inefficient way.

val vertices: RDD[(VertexId, (List[String], List[String]))] =
  sc.parallelize(Array((1L, (List("a"), List[String]())),
(2L, (List("b"), List[String]())),
(3L, (List("c"), List[String]())),
(4L, (List("d"), List[String]())),
(5L, (List("e"), List[String]())),
(6L, (List("f"), List[String]()

// Create an RDD for edges
val relationships: RDD[Edge[Boolean]] =
  sc.parallelize(Array(Edge(1L, 2L, true), Edge(2L, 3L, true),
Edge(3L, 4L, true), Edge(5L, 2L, true)))

val out = minGraph.outDegrees.map(vertex => vertex._1)

val finalVertexes = minGraph.vertices.keys.subtract(out)

//It must be something better than this way..
val nodes = finalVertexes.collect()
val result = minGraph.vertices.filter(v => nodes.contains(v._1))


What's the good way to do this operation? It seems that it should be
pretty easy.


Re: Bug in DiskBlockManager subDirs logic?

2016-02-26 Thread Igor Berman
I've experienced this kind of output when an executor was killed (e.g. by the
OOM killer) or was lost for some reason,
i.e. check the machine to see whether the executor was restarted...

On 26 February 2016 at 08:37, Takeshi Yamamuro 
wrote:

> Hi,
>
> Could you make simple codes to reproduce the issue?
> I'm not exactly sure why shuffle data on temp dir. are wrongly deleted.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 6:00 AM, Zee Chen  wrote:
>
>> Hi,
>>
>> I am debugging a situation where SortShuffleWriter sometimes fail to
>> create a file, with the following stack trace:
>>
>> 16/02/23 11:48:46 ERROR Executor: Exception in task 13.0 in stage
>> 47827.0 (TID 1367089)
>> java.io.FileNotFoundException:
>>
>> /tmp/spark-9dd8dca9-6803-4c6c-bb6a-0e9c0111837c/executor-129dfdb8-9422-4668-989e-e789703526ad/blockmgr-dda6e340-7859-468f-b493-04e4162d341a/00/temp_shuffle_69fe1673-9ff2-462b-92b8-683d04669aad
>> (No such file or directory)
>> at java.io.FileOutputStream.open0(Native Method)
>> at java.io.FileOutputStream.open(FileOutputStream.java:270)
>> at java.io.FileOutputStream.(FileOutputStream.java:213)
>> at
>> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>> at
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:110)
>> at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> I checked the linux file system (ext4) and saw the /00/ subdir is
>> missing. I went through the heap dump of the
>> CoarseGrainedExecutorBackend jvm proc and found that
>> DiskBlockManager's subDirs list had more non-null 2-hex subdirs than
>> present on the file system! As a test I created all 64 2-hex subdirs
>> by hand and then the problem went away.
>>
>> So had anybody else seen this problem? Looking at the relevant logic
>> in DiskBlockManager and it hasn't changed much since the fix to
>> https://issues.apache.org/jira/browse/SPARK-6468
>>
>> My configuration:
>> spark-1.5.1, hadoop-2.6.0, standalone, oracle jdk8u60
>>
>> Thanks,
>> Zee
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Standalone vs. Mesos for production installation on a smallish cluster

2016-02-26 Thread Igor Berman
Imho most production clusters are standalone.
There was a presentation from Spark Summit with some stats inside (can't
find it right now), and standalone was in 1st place.
It was from Matei:
https://databricks.com/resources/slides

On 26 February 2016 at 13:40, Petr Novak  wrote:

> Hi all,
> I believe that it used to be in documentation that Standalone mode is not
> for production. I'm either wrong or it was already removed.
>
> Having a small cluster between 5-10 nodes is Standalone recommended for
> production? I would like to go with Mesos but the question is if there is
> real add-on value for production, mainly from stability perspective.
>
> Can I expect that adding Mesos will improve stability compared to
> Standalone to the extent to justify itself according to somewhat increased
> complexity?
>
> I know it is hard to answer because Mesos layer itself is going to add
> some bugs as well.
>
> Are there unique features enabled by Mesos specific to Spark? E.g.
> adaptive resources for jobs or whatever?
>
> In the future once cluster will grow and more services running on Mesos,
> we plan to use Mesos. The question is if it does worth to go with it
> immediately even maybe its utility is not directly needed at this point.
>
> Many thanks,
> Petr
>


Re: DirectFileOutputCommiter

2016-02-26 Thread Igor Berman
Takeshi, do you know the reason why they wanted to remove this committer in
SPARK-10063? The JIRA has no info inside.
As far as I understand, the direct committer can't be used when either of the
two is true:
1. speculation mode
2. append mode (i.e. not creating a new version of the data but appending to
existing data)

On 26 February 2016 at 08:24, Takeshi Yamamuro 
wrote:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread yuhang.chenn
Thanks a lot.
And I got another question: What would happen if I didn't set "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to consume all the messages in Kafka?
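
(For reference, without that setting the direct stream puts no cap on a batch, so
the first batch after a restart can try to pull everything between the stored
offsets and the current head of each partition. A minimal sketch of capping it,
with a purely illustrative value:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-direct-stream")  // name is illustrative
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // max records per second per partition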

Sent from WPS Mail client

On Feb 25, 2016 at 11:58 AM, Cody Koeninger wrote:
> The per partition offsets are part of the rdd as defined on the driver. Have you read
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
> and/or watched
> https://www.youtube.com/watch?v=fXnNEq1v3VA
>
> On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote:
>> Hi, as far as I know, there is a 1:1 mapping between Spark partition and Kafka partition, and in Spark's fault-tolerance mechanism, if a partition failed, another partition will be used to recompute those data. And my questions are below:
>> When a partition (worker node) fails in Spark Streaming,
>> 1. Is its computation passed to another partition, or just waits for the failed partition to restart?
>> 2. How does the restarted partition know the offset range it should consume from Kafka? It should consume the same data as the before-failed one, right?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DirectFileOutputCommiter

2016-02-26 Thread Igor Berman
Alexander,
The implementation you've attached supports both modes, configured by the property "
mapred.output.direct." + fs.getClass().getSimpleName().
As soon as you see a _temporary dir, the mode is probably off, i.e. the default
impl is working and you are experiencing some other problem.
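
As a sketch, enabling it usually comes down to spark-defaults entries like the
ones below. The committer class name depends on which of the linked
implementations you actually deploy (the one here is a placeholder), and the
second switch only applies to the Amazon-style implementation that follows the
"mapred.output.direct." + SimpleName convention described above:

spark.hadoop.mapred.output.committer.class             com.example.DirectOutputCommitter
spark.hadoop.mapred.output.direct.NativeS3FileSystem   true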

On 26 February 2016 at 10:57, Alexander Pivovarov 
wrote:

> Amazon uses the following impl
> https://gist.github.com/apivovarov/bb215f08318318570567
> But for some reason Spark show error at the end of the job
>
> 16/02/26 08:16:54 INFO scheduler.DAGScheduler: ResultStage 0
> (saveAsTextFile at :28) finished in 14.305 s
> 16/02/26 08:16:54 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose
> tasks have all completed, from pool
> 16/02/26 08:16:54 INFO scheduler.DAGScheduler: Job 0 finished:
> saveAsTextFile at :28, took 14.467271 s
> java.io.FileNotFoundException: File
> s3n://my-backup/test/test1/_temporary/0 does not exist.
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:564)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
> at
> org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
> at
> org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:112)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1214)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>
>
> Another implementation works fine
> https://gist.github.com/aarondav/c513916e72101bbe14ec
>
> On Thu, Feb 25, 2016 at 10:24 PM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> Great work!
>> What is the concrete performance gain of the committer on s3?
>> I'd like to know.
>>
>> I think there is no direct committer for files because these kinds of
>> committer has risks
>> to loss data (See: SPARK-10063).
>> Until this resolved, ISTM files cannot support direct commits.
>>
>> thanks,
>>
>>
>>
>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>>
>>> yes, should be this one
>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>
>>> then need to set it in spark-defaults.conf :
>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>
>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>> > The header of DirectOutputCommitter.scala says Databricks.
>>> > Did you get it from Databricks ?
>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>>> >>
>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>> not included?
>>> >> we added it in our fork,
>>> under 
>>> core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>> insert operations in HiveContext, since the Committer is called by hive
>>> (means uses dependencies in hive package)
>>> >> we made some hack to fix this, you can take a look:
>>> >>
>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>> >>
>>> >> may bring some ideas to other spark contributors to find a better way
>>> to use s3.
>>> >>
>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>>> >>>
>>> >>> Hi,
>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>> alikes
>>> >>> especially when working with s3?
>>> >>> I know that there is one impl in spark distro for parquet format,
>>> but not
>>> >>> for files -  why?
>>> >>>
>>> >>> Imho, it can bring huge performance boost.
>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>> stage
>>> >>> when all parts are copied one-by-one to destination dir from
>>> _temporary,
>>> >>> which is bottleneck when number of partitions is high.
>>> >>>
>>> >>> Also, wanted to know if there are some problems when using
>>> >>> DirectFileOutputCommitter?
>>> >>> If writing one partition directly will fail in the middle is spark
>>> will
>>> >>> notice this and will fail job(say after all retries)?
>>> >>>
>>> >>> thanks in advance
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>> 

Standalone vs. Mesos for production installation on a smallish cluster

2016-02-26 Thread Petr Novak
Hi all,
I believe that it used to be in documentation that Standalone mode is not
for production. I'm either wrong or it was already removed.

Having a small cluster between 5-10 nodes is Standalone recommended for
production? I would like to go with Mesos but the question is if there is
real add-on value for production, mainly from stability perspective.

Can I expect that adding Mesos will improve stability compared to
Standalone to an extent that justifies the somewhat increased
complexity?

I know it is hard to answer because Mesos layer itself is going to add some
bugs as well.

Are there unique features enabled by Mesos specific to Spark? E.g. adaptive
resources for jobs or whatever?

In the future, once the cluster grows and more services are running on Mesos, we
plan to use Mesos. The question is whether it is worth going with it
immediately, even if its utility is not directly needed at this point.

Many thanks,
Petr


Re: DirectFileOutputCommiter

2016-02-26 Thread Igor Berman
The performance gain is in the commit stage, when data is moved from the _temporary
directory to the destination directory.
Since S3 is a key-value store, the move operation is really a copy operation.


On 26 February 2016 at 08:24, Takeshi Yamamuro 
wrote:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Dynamic allocation Spark

2016-02-26 Thread Jeff Zhang
Check the RM UI to ensure you have available resources. I suspect it might
be that you didn't configure YARN correctly, so the NM didn't start properly
and you have no resources.

On Fri, Feb 26, 2016 at 7:14 PM, alvarobrandon 
wrote:

> Hello everyone:
>
> I'm trying the dynamic allocation in Spark with YARN. I have followed the
> following configuration steps:
> 1. Copy the spark-*-yarn-shuffle.jar to the nodemanager classpath. "cp
> /opt/spark/lib/spark-*-yarn-shuffle.jar /opt/hadoop/share/hadoop/yarn"
> 2. Added the shuffle service of spark in yarn-site.xml
> <property>
>   <name>yarn.nodemanager.aux-services</name>
>   <value>mapreduce_shuffle,spark_shuffle</value>
>   <description>shuffle implementation</description>
> </property>
> 3. Enabled the class for the shuffle service in yarn-site.xml
> <property>
>   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
>   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
>   <description>enable the class for dynamic allocation</description>
> </property>
> 4. Activated the dynamic allocation in the spark defaults
> spark.dynamicAllocation.enabled true
> spark.shuffle.service.enabled   true
>
> When I launch my application it just stays in the queue accepted but it
> never actually runs.
> 16/02/26 11:11:46 INFO yarn.Client: Application report for
> application_1456482268159_0001 (state: ACCEPTED)
>
> Am I missing something?
>
> Thanks in advance as always
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Dynamic-allocation-Spark-tp26344.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Dynamic allocation Spark

2016-02-26 Thread alvarobrandon
Hello everyone:

I'm trying the dynamic allocation in Spark with YARN. I have followed the
following configuration steps:
1. Copy the spark-*-yarn-shuffle.jar to the nodemanager classpath. "cp
/opt/spark/lib/spark-*-yarn-shuffle.jar /opt/hadoop/share/hadoop/yarn"
2. Added the shuffle service of spark in yarn-site.xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
  <description>shuffle implementation</description>
</property>
3. Enabled the class for the shuffle service in yarn-site.xml
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  <description>enable the class for dynamic allocation</description>
</property>
4. Activated the dynamic allocation in the spark defaults
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled   true

When I launch my application it just stays in the queue accepted but it
never actually runs.
16/02/26 11:11:46 INFO yarn.Client: Application report for
application_1456482268159_0001 (state: ACCEPTED)

Am I missing something?

Thanks in advance as always 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dynamic-allocation-Spark-tp26344.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No event log in /tmp/spark-events

2016-02-26 Thread Jeff Zhang
If event logging is enabled, there should be a log line like the following, but I don't
see it in your log.

16/02/26 19:10:01 INFO EventLoggingListener: Logging events to
file:///Users/jzhang/Temp/spark-events/local-1456485001491

Could you add "--verbose" in spark-submit command to check whether your
configuration is picked up correct ?
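
For reference, event logging itself is switched on with the two properties below
(the directory is illustrative), either in spark-defaults.conf or via --conf on
spark-submit:

spark.eventLog.enabled  true
spark.eventLog.dir      file:///tmp/spark-events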

On Fri, Feb 26, 2016 at 7:01 PM, alvarobrandon 
wrote:

> Just write /tmp/sparkserverlog without the file part.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/No-event-log-in-tmp-spark-events-tp26318p26343.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: No event log in /tmp/spark-events

2016-02-26 Thread alvarobrandon
Just write /tmp/sparkserverlog without the file part. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-event-log-in-tmp-spark-events-tp26318p26343.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DirectFileOutputCommiter

2016-02-26 Thread Teng Qiu
Hi, thanks :) The performance gain is huge. We have an INSERT INTO query where
ca. 30GB in JSON format is written to s3 at the end; without
DirectOutputCommitter and our hack in Hive and InsertIntoHiveTable.scala
it took more than 40min, with our changes only 15min.

DirectOutputCommitter works for SparkContext and SQLContext, but for
HiveContext it only solved the problem with the "staging folder" in the target
table; the problem for HiveContext is here:
https://github.com/apache/spark/blob/v1.6.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L132

Besides the staging folder created by the Committer, Hive will use a temp location
as well... so we made some hack for this:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando#diff-d579db9a8f27e0bbef37720ab14ec3f6R134

The main idea is: we added an internal var runID and use the HiveConf setting
spark.hive.insert.skip.temp to stop Hive from using the temp location. But with
this hack we need to change Hive's implementation... we put our Hive.java
file under
sql/hive/src/main/java/org/apache/hadoop/hive/ql/metadata/Hive.java

you can find the full change using this link:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando#diff-d579db9a8f27e0bbef37720ab14ec3f6R134


I would like to forward this discussion to spark-dev; hope the Spark team can
think about it, and hope there will be a better solution for this, like
some more official hack :D


2016-02-26 7:24 GMT+01:00 Takeshi Yamamuro :

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Is spark.driver.maxResultSize used correctly ?

2016-02-26 Thread Jeff Zhang
My job gets this exception very easily even when I set a large value for
spark.driver.maxResultSize. After checking the Spark code, I found that
spark.driver.maxResultSize is also used on the executor side to decide whether a
DirectTaskResult or an IndirectTaskResult is sent. This doesn't make sense to me.
Using spark.driver.maxResultSize / taskNum might be more proper. Because
if spark.driver.maxResultSize is 1g and we have 10 tasks each with 200m of
output, then the output of each task is less than
spark.driver.maxResultSize, so a DirectTaskResult will be sent to the driver, but
the total result size is 2g, which causes an exception on the driver side.


16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
LogisticRegression.scala:283, took 33.796379 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Total size of serialized results of 1 tasks (1085.0 MB)
is bigger than spark.driver.maxResultSize (1024.0 MB)
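
(Until the check itself is changed, the only user-side workaround seems to be
raising the cap for such jobs, e.g. with an illustrative value:)

spark-submit --conf spark.driver.maxResultSize=4g ...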


-- 
Best Regards

Jeff Zhang


Re: Survival Curves using AFT implementation in Spark

2016-02-26 Thread Yanbo Liang
Hi Stuti,

AFTSurvivalRegression does not support computing the predicted survival
functions/curves currently.
I don't know whether the quantile predictions can help you, you can refer
the example

.
Maybe we can add this feature later.

Thanks
Yanbo
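
For reference, a minimal sketch of getting quantile predictions out of the 1.6
API; the probabilities and column names are illustrative, and training /
test_final are assumed to be DataFrames with the usual label/censor/features
columns:

import org.apache.spark.ml.regression.AFTSurvivalRegression

val aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(Array(0.1, 0.25, 0.5, 0.75, 0.9))
  .setQuantilesCol("quantiles")

val model = aft.fit(training)
// each row then carries the predicted survival times at the requested probabilities
model.transform(test_final).select("prediction", "quantiles").show(false)

Plotting those times against 1 - probability gives a rough per-instance survival
curve.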

2016-02-26 14:35 GMT+08:00 Stuti Awasthi :

> Hi All,
>
> I wanted to apply Survival Analysis using Spark AFT algorithm
> implementation. Now I perform the same in R using coxph model and passing
> the model in Survfit() function to generate survival curves
>
> Then I can visualize the survival curve on validation data to understand
> how good my model fits.
>
>
>
> R: Code
>
> fit <- coxph(Surv(futime, fustat) ~ age, data = ovarian)
>
> plot(survfit(fit,newdata=data.frame(age=60)))
>
>
>
> I wanted to achieve something similar with Spark. Hence I created the AFT
> model using Spark and passed my Test dataframe for prediction. The result
> of prediction is single prediction value for single input data which is as
> expected. But now how can I use this model to generate the Survival curves
> for visualization.
>
>
>
> Eg: Spark Code model.transform(test_final).show()
>
>
>
> standardized_features|   prediction|
>
> +-+-+
>
> | [0.0,0.0,0.743853...|48.33071792204102|
>
> +-+-+
>
>
>
> Can any suggest how to use the developed model for plotting Survival
> Curves for “test_final” data which is a dataframe feature[vector].
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
>
>
>


Re: Restricting number of cores not resulting in reduction in parallelism

2016-02-26 Thread ankursaxena86
The issue is resolved now. I figured out that I wasn't aware of a hard-coding
of the Spark master parameter as local[4] in the program code, which was
causing the parallel execution despite my trying to limit cores and
executors from the command-line options. It's a revelation for me that
parameters set in the source code take precedence over the command-line
arguments passed with spark-submit.

Anyways, thanks to everyone who responded.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Restricting-number-of-cores-not-resulting-in-reduction-in-parallelism-tp26319p26342.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL support for sub-queries

2016-02-26 Thread Mich Talebzadeh
Thanks, much appreciated.

On 26 February 2016 at 09:54, Michał Zieliński 
wrote:

> Spark has a great documentation
> 
>  and
> guides :
>
> lit and col are here
> 
> getInt is here
> 
> apply(0) is just a method on Array which is returned by collect (here
> 
> )
>
> On 26 February 2016 at 10:47, Mich Talebzadeh 
> wrote:
>
>> Thanks Michael. Great
>>
>>  d.filter(col("id") === lit(m)).show
>>
>> BTW where all these methods like lit etc are documented. Also I guess any
>> action call like apply(0) or getInt(0) refers to the "current" parameter?
>>
>> Regards
>>
>> On 26 February 2016 at 09:42, Michał Zieliński <
>> zielinski.mich...@gmail.com> wrote:
>>
>>> You need to collect the value.
>>>
>>> val m: Int = d.agg(max($"id")).collect.apply(0).getInt(0)
>>> d.filter(col("id") === lit(m))
>>>
>>> On 26 February 2016 at 09:41, Mich Talebzadeh >> > wrote:
>>>
 Can this be done using DFs?



 scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

 scala> val d = HiveContext.table("test.dummy")
 d: org.apache.spark.sql.DataFrame = [id: int, clustered: int,
 scattered: int, randomised: int, random_string: string, small_vc: string,
 padding: string]

 scala>  var m = d.agg(max($"id"))
 m: org.apache.spark.sql.DataFrame = [max(id): int]

 How can I join these two? In other words I want to get all rows with id
 = m here?

 d.filter($"id" = m)  ?

 Thanks

 On 25/02/2016 22:58, Mohammad Tariq wrote:

 AFAIK, this isn't supported yet. A ticket
  is in progress
 though.



 [image: http://] 

 Tariq, Mohammad
 about.me/mti
 [image: http://]



 On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
 mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
>
>
> I guess the following confirms that Spark does not support sub-queries
>
>
>
> val d = HiveContext.table("test.dummy")
>
> d.registerTempTable("tmp")
>
> HiveContext.sql("select * from tmp where id IN (select max(id) from
> tmp)")
>
> It crashes
>
> The SQL works OK in Hive itself on the underlying table!
>
> select * from dummy where id IN (select max(id) from dummy);
>
>
>
> Thanks
>

>>>
>>
>


Re: Spark SQL support for sub-queries

2016-02-26 Thread Michał Zieliński
Spark has great documentation and guides:

lit and col are here
getInt is here
apply(0) is just a method on Array, which is returned by collect (here)

On 26 February 2016 at 10:47, Mich Talebzadeh 
wrote:

> Thanks Michael. Great
>
>  d.filter(col("id") === lit(m)).show
>
> BTW where all these methods like lit etc are documented. Also I guess any
> action call like apply(0) or getInt(0) refers to the "current" parameter?
>
> Regards
>
> On 26 February 2016 at 09:42, Michał Zieliński <
> zielinski.mich...@gmail.com> wrote:
>
>> You need to collect the value.
>>
>> val m: Int = d.agg(max($"id")).collect.apply(0).getInt(0)
>> d.filter(col("id") === lit(m))
>>
>> On 26 February 2016 at 09:41, Mich Talebzadeh 
>> wrote:
>>
>>> Can this be done using DFs?
>>>
>>>
>>>
>>> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>
>>> scala> val d = HiveContext.table("test.dummy")
>>> d: org.apache.spark.sql.DataFrame = [id: int, clustered: int, scattered:
>>> int, randomised: int, random_string: string, small_vc: string, padding:
>>> string]
>>>
>>> scala>  var m = d.agg(max($"id"))
>>> m: org.apache.spark.sql.DataFrame = [max(id): int]
>>>
>>> How can I join these two? In other words I want to get all rows with id
>>> = m here?
>>>
>>> d.filter($"id" = m)  ?
>>>
>>> Thanks
>>>
>>> On 25/02/2016 22:58, Mohammad Tariq wrote:
>>>
>>> AFAIK, this isn't supported yet. A ticket
>>>  is in progress
>>> though.
>>>
>>>
>>>
>>> [image: http://] 
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>> [image: http://]
>>>
>>>
>>>
>>> On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>


 Hi,



 I guess the following confirms that Spark does not support sub-queries



 val d = HiveContext.table("test.dummy")

 d.registerTempTable("tmp")

 HiveContext.sql("select * from tmp where id IN (select max(id) from
 tmp)")

 It crashes

 The SQL works OK in Hive itself on the underlying table!

 select * from dummy where id IN (select max(id) from dummy);



 Thanks

>>>
>>
>


Re: Spark SQL support for sub-queries

2016-02-26 Thread Mich Talebzadeh
Thanks Michael. Great

 d.filter(col("id") === lit(m)).show

BTW, where are all these methods like lit etc. documented? Also, I guess any
action call like apply(0) or getInt(0) refers to the "current" parameter?

Regards

On 26 February 2016 at 09:42, Michał Zieliński 
wrote:

> You need to collect the value.
>
> val m: Int = d.agg(max($"id")).collect.apply(0).getInt(0)
> d.filter(col("id") === lit(m))
>
> On 26 February 2016 at 09:41, Mich Talebzadeh 
> wrote:
>
>> Can this be done using DFs?
>>
>>
>>
>> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> scala> val d = HiveContext.table("test.dummy")
>> d: org.apache.spark.sql.DataFrame = [id: int, clustered: int, scattered:
>> int, randomised: int, random_string: string, small_vc: string, padding:
>> string]
>>
>> scala>  var m = d.agg(max($"id"))
>> m: org.apache.spark.sql.DataFrame = [max(id): int]
>>
>> How can I join these two? In other words I want to get all rows with id =
>> m here?
>>
>> d.filter($"id" = m)  ?
>>
>> Thanks
>>
>> On 25/02/2016 22:58, Mohammad Tariq wrote:
>>
>> AFAIK, this isn't supported yet. A ticket
>>  is in progress though.
>>
>>
>>
>> [image: http://] 
>>
>> Tariq, Mohammad
>> about.me/mti
>> [image: http://]
>>
>>
>>
>> On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I guess the following confirms that Spark does not support sub-queries
>>>
>>>
>>>
>>> val d = HiveContext.table("test.dummy")
>>>
>>> d.registerTempTable("tmp")
>>>
>>> HiveContext.sql("select * from tmp where id IN (select max(id) from
>>> tmp)")
>>>
>>> It crashes
>>>
>>> The SQL works OK in Hive itself on the underlying table!
>>>
>>> select * from dummy where id IN (select max(id) from dummy);
>>>
>>>
>>>
>>> Thanks
>>>
>>
>


Re: Spark SQL support for sub-queries

2016-02-26 Thread Michał Zieliński
You need to collect the value.

val m: Int = d.agg(max($"id")).collect.apply(0).getInt(0)
d.filter(col("id") === lit(m))

On 26 February 2016 at 09:41, Mich Talebzadeh 
wrote:

> Can this be done using DFs?
>
>
>
> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> scala> val d = HiveContext.table("test.dummy")
> d: org.apache.spark.sql.DataFrame = [id: int, clustered: int, scattered:
> int, randomised: int, random_string: string, small_vc: string, padding:
> string]
>
> scala>  var m = d.agg(max($"id"))
> m: org.apache.spark.sql.DataFrame = [max(id): int]
>
> How can I join these two? In other words I want to get all rows with id =
> m here?
>
> d.filter($"id" = m)  ?
>
> Thanks
>
> On 25/02/2016 22:58, Mohammad Tariq wrote:
>
> AFAIK, this isn't supported yet. A ticket
>  is in progress though.
>
>
>
> [image: http://] 
>
> Tariq, Mohammad
> about.me/mti
> [image: http://]
>
>
>
> On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>>
>>
>> I guess the following confirms that Spark does not support sub-queries
>>
>>
>>
>> val d = HiveContext.table("test.dummy")
>>
>> d.registerTempTable("tmp")
>>
>> HiveContext.sql("select * from tmp where id IN (select max(id) from tmp)")
>>
>> It crashes
>>
>> The SQL works OK in Hive itself on the underlying table!
>>
>> select * from dummy where id IN (select max(id) from dummy);
>>
>>
>>
>> Thanks
>>
>


Re: select * from mytable where column1 in (select max(column1) from mytable)

2016-02-26 Thread ayan guha
But can't I just use HiveContext and use hive's functionality, which does
support subqueries?

On Fri, Feb 26, 2016 at 4:28 PM, Mohammad Tariq  wrote:

> Spark doesn't support subqueries in WHERE clause, IIRC. It supports
> subqueries only in the FROM clause as of now. See this ticket
>  for more on this.
>
>
>
> [image: http://]
>
> Tariq, Mohammad
> about.me/mti
> [image: http://]
> 
>
>
> On Fri, Feb 26, 2016 at 7:01 AM, ayan guha  wrote:
>
>> Why is this not working for you? Are you trying on dataframe? What error
>> are you getting?
>>
>> On Thu, Feb 25, 2016 at 10:23 PM, Ashok Kumar <
>> ashok34...@yahoo.com.invalid> wrote:
>>
>>> Hi,
>>>
>>> What is the equivalent of this in Spark please
>>>
>>> select * from mytable where column1 in (select max(column1) from mytable)
>>>
>>> Thanks
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Fwd: Spark SQL support for sub-queries

2016-02-26 Thread Mich Talebzadeh
Can this be done using DFs?



scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

scala> val d = HiveContext.table("test.dummy")
d: org.apache.spark.sql.DataFrame = [id: int, clustered: int, scattered:
int, randomised: int, random_string: string, small_vc: string, padding:
string]

scala>  var m = d.agg(max($"id"))
m: org.apache.spark.sql.DataFrame = [max(id): int]

How can I join these two? In other words I want to get all rows with id = m
here?

d.filter($"id" = m)  ?

Thanks

On 25/02/2016 22:58, Mohammad Tariq wrote:

AFAIK, this isn't supported yet. A ticket
 is in progress though.



[image: http://] 

Tariq, Mohammad
about.me/mti
[image: http://]



On Fri, Feb 26, 2016 at 4:16 AM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
>
>
> I guess the following confirms that Spark does not support sub-queries
>
>
>
> val d = HiveContext.table("test.dummy")
>
> d.registerTempTable("tmp")
>
> HiveContext.sql("select * from tmp where id IN (select max(id) from tmp)")
>
> It crashes
>
> The SQL works OK in Hive itself on the underlying table!
>
> select * from dummy where id IN (select max(id) from dummy);
>
>
>
> Thanks
>


Re: DirectFileOutputCommiter

2016-02-26 Thread Alexander Pivovarov
Amazon uses the following impl
https://gist.github.com/apivovarov/bb215f08318318570567
But for some reason Spark shows an error at the end of the job

16/02/26 08:16:54 INFO scheduler.DAGScheduler: ResultStage 0
(saveAsTextFile at :28) finished in 14.305 s
16/02/26 08:16:54 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose
tasks have all completed, from pool
16/02/26 08:16:54 INFO scheduler.DAGScheduler: Job 0 finished:
saveAsTextFile at :28, took 14.467271 s
java.io.FileNotFoundException: File s3n://my-backup/test/test1/_temporary/0
does not exist.
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:564)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
at
org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:112)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1214)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)


Another implementation works fine
https://gist.github.com/aarondav/c513916e72101bbe14ec
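
A direct committer of this kind typically makes the task/job commit steps
no-ops, so tasks write straight to the final output path and nothing passes
through _temporary. A minimal sketch of that idea (loosely modeled on the
aarondav gist above; class and property names here are illustrative, not the
exact gist contents):

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  // Nothing to promote at commit time: the task output is already in place.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}

// Wired in via the old mapred API property, e.g. in spark-defaults.conf
// (assumed setting, adjust to your build):
// spark.hadoop.mapred.output.committer.class  com.example.DirectOutputCommitter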

On Thu, Feb 25, 2016 at 10:24 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because this kind of
> committer risks losing data (see SPARK-10063).
> Until that is resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then you need to set it in spark-defaults.conf:
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> On Friday, 26 February 2016, Yin Yang  wrote:
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>> >>
>> >> interested in this topic as well; why is the DirectFileOutputCommitter
>> >> not included?
>> >> we added it in our fork, under
>> >> core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> >> operations in HiveContext, since the Committer is called by hive (meaning
>> >> it uses the dependencies in the hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> it may give other spark contributors some ideas for finding a better way
>> >> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>> >>>
>> >>> Hi,
>> >>> I wanted to understand whether anybody uses DirectFileOutputCommitter or
>> >>> the like, especially when working with s3.
>> >>> I know that there is one impl in the spark distro for the parquet format,
>> >>> but not for plain files - why?
>> >>>
>> >>> Imho, it can bring a huge performance boost.
>> >>> Using the default FileOutputCommitter with s3 has a big overhead at the
>> >>> commit stage, when all parts are copied one-by-one from _temporary to
>> >>> the destination dir, which is a bottleneck when the number of partitions
>> >>> is high.
>> >>>
>> >>> Also, I wanted to know whether there are problems when using
>> >>> DirectFileOutputCommitter.
>> >>> If writing one partition directly fails in the middle, will spark
>> >>> notice this and fail the job (say, after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: When I merge some datas,can't go on...

2016-02-26 Thread Jeff Zhang
rdd.map(e=>e.split("\\s")).map(e=>(e(0),e(1))).groupByKey()
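
A slightly fuller sketch of the same idea (assuming whitespace-separated
integer pairs and a result small enough to collect to the driver):

val file1 = sc.textFile("1.txt")
val grouped = file1
  .map(_.split("\\s+"))
  .map(a => (a(0).toInt, a(1).toInt))
  .groupByKey()                       // RDD[(Int, Iterable[Int])]
  .mapValues(_.toList)                // note: ordering within each list is not guaranteed

// Only if the grouped result fits in driver memory:
val asMap = grouped.collectAsMap()    // e.g. Map(1 -> List(2, 3, 4, 5, 6, 7), 2 -> List(4, 5, 7, 9))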

On Fri, Feb 26, 2016 at 3:20 PM, Bonsen  wrote:

> I have a file, like 1.txt:
> 1 2
> 1 3
> 1 4
> 1 5
> 1 6
> 1 7
> 2 4
> 2 5
> 2 7
> 2 9
>
> I want to merge them, with a result like this:
> Map(1 -> List(2,3,4,5,6,7), 2 -> List(4,5,7,9))
> What should I do?
> val file1 = sc.textFile("1.txt")
> val q1 = file1.flatMap(_.split(' ')) ??? Maybe I should change RDD[Int] to
> RDD[(Int, Int)]?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/When-I-merge-some-datas-can-t-go-on-tp26341.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang