spark-csv package - output to filename.csv?

2015-09-03 Thread Ewan Leith
Using the spark-csv package or outputting to text files, you end up with files 
named:

test.csv/part-00

rather than a more user-friendly "test.csv", even if there's only 1 part file.

We can merge the files using Hadoop's FileUtil.copyMerge with something like this
code from http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/


import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.SparkContext

def merge(sc: SparkContext, srcPath: String, dstPath: String): Unit = {
  val srcFileSystem = FileSystem.get(new URI(srcPath), sc.hadoopConfiguration)
  val dstFileSystem = FileSystem.get(new URI(dstPath), sc.hadoopConfiguration)
  // Remove any previous output, then concatenate the part files into a single file.
  dstFileSystem.delete(new Path(dstPath), true)
  FileUtil.copyMerge(srcFileSystem, new Path(srcPath), dstFileSystem,
    new Path(dstPath), true, sc.hadoopConfiguration, null)
}
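
For context, a sketch of how the helper above might be invoked (the output paths
and the spark-csv save call are illustrative only, not our actual job):

// Sketch only: write the part files with spark-csv, then collapse them into one file.
df.write.format("com.databricks.spark.csv").save("/tmp/test.csv.parts")
merge(sc, "/tmp/test.csv.parts", "/tmp/test.csv")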

but does anyone know a way without dropping down to Hadoop.fs code?

Thanks,
Ewan


Re: Input size increasing every iteration of gradient boosted trees [1.4]

2015-09-03 Thread Sean Owen
Since it sounds like this has been encountered 3 times, and I've
personally seen it and mostly verified it, I think it's legit enough
for a JIRA: SPARK-10433. I am sorry to say I don't know what is going
on here, though.
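
Purely as a mitigation sketch while the cause is unknown (and assuming Strategy
exposes setCheckpointInterval in this MLlib version), periodically checkpointing
the intermediate RDDs should at least keep their lineage, and hence the reported
input size, from growing without bound:

// Mitigation sketch only, not a confirmed fix; the checkpoint dir is hypothetical.
sc.setCheckpointDir("hdfs:///tmp/gbt-checkpoints")

val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.treeStrategy.setUseNodeIdCache(true)
// Truncate the lineage of the cached node-id / working RDDs every 10 iterations.
boostingStrategy.treeStrategy.setCheckpointInterval(10)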

On Thu, Sep 3, 2015 at 1:56 PM, Peter Rudenko  wrote:
> Confirm, having the same issue (1.4.1 mllib package). For smaller datasets
> accuracy degraded also. Haven’t tested yet in 1.5 with the ml package
> implementation.
>
> val boostingStrategy = BoostingStrategy.defaultParams("Classification")
> boostingStrategy.setNumIterations(30)
> boostingStrategy.setLearningRate(1.0)
> boostingStrategy.treeStrategy.setMaxDepth(3)
> boostingStrategy.treeStrategy.setMaxBins(128)
> boostingStrategy.treeStrategy.setSubsamplingRate(1.0)
> boostingStrategy.treeStrategy.setMinInstancesPerNode(1)
> boostingStrategy.treeStrategy.setUseNodeIdCache(true)
> boostingStrategy.treeStrategy.setCategoricalFeaturesInfo(
>
> mapAsJavaMap(categoricalFeatures).asInstanceOf[java.util.Map[java.lang.Integer,
> java.lang.Integer]])
>
> val model = GradientBoostedTrees.train(instances, boostingStrategy)
>
> Thanks,
> Peter Rudenko
>
> On 2015-08-14 00:33, Sean Owen wrote:
>
> Not that I have any answer at this point, but I was discussing this
> exact same problem with Johannes today. An input size of ~20K records
> was growing each iteration by ~15M records. I could not see why on a
> first look.
>
> @jkbradley I know it's not much info but does that ring any bells? I
> think Johannes even has an instance of this up and running for
> examination.
>
> On Thu, Aug 13, 2015 at 10:04 PM, Matt Forbes
>  wrote:
>
> I am training a boosted trees model on a couple million input samples (with
> around 300 features) and am noticing that the input size of each stage is
> increasing each iteration. For each new tree, the first step seems to be
> building the decision tree metadata, which does a .count() on the input
> data, so this is the step I've been using to track the input size changing.
> Here is what I'm seeing:
>
> count at DecisionTreeMetadata.scala:111
> 1. Input Size / Records: 726.1 MB / 1295620
> 2. Input Size / Records: 106.9 GB / 64780816
> 3. Input Size / Records: 160.3 GB / 97171224
> 4. Input Size / Records: 214.8 GB / 129680959
> 5. Input Size / Records: 268.5 GB / 162533424
> 
> Input Size / Records: 1912.6 GB / 1382017686
> 
>
> This step goes from taking less than 10s up to 5 minutes by the 15th or so
> iteration. I'm not quite sure what could be causing this. I am passing a
> memory-only cached RDD[LabeledPoint] to GradientBoostedTrees.train
>
> Does anybody have some insight? Is this a bug or could it be an error on my
> part?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Hbase Lookup

2015-09-03 Thread Tao Lu
Yes, Ayan, your approach will work.

Or alternatively, use Spark and write a Scala/Java function which
implements logic similar to your Pig UDF.

Both approaches look similar.

Personally, I would go with the Spark solution; it will be slightly faster, and
easier if you already have a Spark cluster set up on top of your Hadoop
cluster in your infrastructure.
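
For question 3 in the quoted mail below (avoiding one HBase connection per
record), a rough Spark-side sketch, assuming the HBase 1.x client API and with
purely hypothetical table and record names, is to open one connection per
partition:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// inputRecords: RDD[(String, String)] of (key, incomingValue) -- hypothetical shape
val looked = inputRecords.mapPartitions { records =>
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("my_table"))
  // Materialize before closing the connection, since the iterator is lazy.
  val out = records.map { case (key, incoming) =>
    val existing = table.get(new Get(Bytes.toBytes(key)))
    (key, incoming, existing.isEmpty)   // compare/update logic would go here
  }.toList
  table.close(); conn.close()
  out.iterator
}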

Cheers,
Tao


On Thu, Sep 3, 2015 at 1:15 AM, ayan guha  wrote:

> Thanks for your info. I am planning to implement a pig udf to do record
> look ups. Kindly let me know if this is a good idea.
>
> Best
> Ayan
>
> On Thu, Sep 3, 2015 at 2:55 PM, Jörn Franke  wrote:
>
>>
>> You may check if it makes sense to write a coprocessor doing an upsert
>> for you, if it does not exist already. Maybe phoenix for Hbase supports
>> this already.
>>
>> Another alternative, if the records do not have an unique Id, is to put
>> them into a text index engine, such as Solr or Elasticsearch, which does in
>> this case a fast matching with relevancy scores.
>>
>>
>> You can use also Spark and Pig there. However, I am not sure if Spark is
>> suitable for these one row lookups. Same holds for Pig.
>>
>>
>> Le mer. 2 sept. 2015 à 23:53, ayan guha  a écrit :
>>
>> Hello group
>>
>> I am trying to use pig or spark in order to achieve following:
>>
>> 1. Write a batch process which will read from a file
>> 2. Lookup hbase to see if the record exists. If so then need to compare
>> incoming values with hbase and update fields which do not match. Else
>> create a new record.
>>
>> My questions:
>> 1. Is this a good use case for pig or spark?
>> 2. Is there any way to read hbase for each incoming record in pig without
>> writing map reduce code?
>> 3. In case of spark I think we have to connect to hbase for every record.
>> Is thr any other way?
>> 4. What is the best connector for hbase which gives this functionality?
>>
>> Best
>>
>> Ayan
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 

Thanks!
Tao


Re: Hbase Lookup

2015-09-03 Thread Tao Lu
But I don't see how it works here with Phoenix or an HBase coprocessor.
Remember we are joining 2 big data sets here: one is the big file in HDFS,
the other is the records in HBase. The driving force comes from the Hadoop cluster.




On Thu, Sep 3, 2015 at 11:37 AM, Jörn Franke  wrote:

> If you use pig or spark you increase the complexity from an operations
> management perspective significantly. Spark should be seen from a platform
> perspective if it make sense. If you can do it directly with hbase/phoenix
> or only hbase coprocessor then this should be preferred. Otherwise you pay
> more money for maintenance and development.
>
> Le jeu. 3 sept. 2015 à 17:16, Tao Lu  a écrit :
>
>> Yes. Ayan, you approach will work.
>>
>> Or alternatively, use Spark, and write a Scala/Java function which
>> implements similar logic in your Pig UDF.
>>
>> Both approaches look similar.
>>
>> Personally, I would go with Spark solution, it will be slightly faster,
>> and easier if you already have Spark cluster setup on top of your hadoop
>> cluster in your infrastructure.
>>
>> Cheers,
>> Tao
>>
>>
>> On Thu, Sep 3, 2015 at 1:15 AM, ayan guha  wrote:
>>
>>> Thanks for your info. I am planning to implement a pig udf to do record
>>> look ups. Kindly let me know if this is a good idea.
>>>
>>> Best
>>> Ayan
>>>
>>> On Thu, Sep 3, 2015 at 2:55 PM, Jörn Franke 
>>> wrote:
>>>

 You may check if it makes sense to write a coprocessor doing an upsert
 for you, if it does not exist already. Maybe phoenix for Hbase supports
 this already.

 Another alternative, if the records do not have an unique Id, is to put
 them into a text index engine, such as Solr or Elasticsearch, which does in
 this case a fast matching with relevancy scores.


 You can use also Spark and Pig there. However, I am not sure if Spark
 is suitable for these one row lookups. Same holds for Pig.


 Le mer. 2 sept. 2015 à 23:53, ayan guha  a écrit :

 Hello group

 I am trying to use pig or spark in order to achieve following:

 1. Write a batch process which will read from a file
 2. Lookup hbase to see if the record exists. If so then need to compare
 incoming values with hbase and update fields which do not match. Else
 create a new record.

 My questions:
 1. Is this a good use case for pig or spark?
 2. Is there any way to read hbase for each incoming record in pig
 without writing map reduce code?
 3. In case of spark I think we have to connect to hbase for every
 record. Is thr any other way?
 4. What is the best connector for hbase which gives this functionality?

 Best

 Ayan



>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> 
>> Thanks!
>> Tao
>>
>


-- 

Thanks!
Tao


Re: Small File to HDFS

2015-09-03 Thread nibiau
My main question in case of HAR usage is: is it possible to use Pig on it, and
what about performance?

- Original Message -
From: "Jörn Franke" 
To: nib...@free.fr, user@spark.apache.org
Sent: Thursday, September 3, 2015 15:54:42
Subject: Re: Small File to HDFS




Store them as hadoop archive (har) 


On Wed, Sep 2, 2015 at 18:07, < nib...@free.fr > wrote:


Hello, 
I'am currently using Spark Streaming to collect small messages (events) , size 
being <50 KB , volume is high (several millions per day) and I have to store 
those messages in HDFS. 
I understood that storing small files can be problematic in HDFS , how can I 
manage it ? 

Tks 
Nicolas 

- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



pySpark window functions are not working in the same way as Spark/Scala ones

2015-09-03 Thread Sergey Shcherbakov
Hello all,

I'm experimenting with Spark 1.4.1 window functions
and have come to a problem in pySpark that I've described in a Stackoverflow
question


In essence, the following

wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank")).collect()
df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
func.lead(df.b,1).over(wSpec).alias("next")).collect()

does not work in pySpark: exception for the first collect() and None output
from window function in the second collect().

While the same example in Spark/Scala works fine:

val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"),
lead(df("b"),1).over(wSpec).alias("next"))

Am I doing anything wrong or this is a pySpark issue indeed?


Best Regards,
Sergey

PS: Here is the full pySpark shell example:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])
wSpec = Window.orderBy(df.a).rowsBetween(-1,1)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
# ==> Failure org.apache.spark.sql.AnalysisException: Window function rank
does not take a frame specification.
df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
func.lead(df.b,1).over(wSpec).alias("next"))
# ===>  org.apache.spark.sql.AnalysisException: Window function lag does
not take a frame specification.;


wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
# ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more
arguments are expected.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
func.lead(df.b,1).over(wSpec).alias("next")).collect()
# [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202,
next=None), Row(a=3, prev=None, b=303, next=None)]


Re: Small File to HDFS

2015-09-03 Thread Jörn Franke
Store them as hadoop archive (har)

On Wed, Sep 2, 2015 at 18:07,   wrote:

> Hello,
> I'am currently using Spark Streaming to collect small messages (events) ,
> size being <50 KB , volume is high (several millions per day) and I have to
> store those messages in HDFS.
> I understood that storing small files can be problematic in HDFS , how can
> I manage it ?
>
> Tks
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Small File to HDFS

2015-09-03 Thread Tao Lu
Your requirements conflict with each other.
1. You want to dump all your messages somewhere
2. You want to be able to update/delete individual message
3. You don't want to introduce another NoSQL database (like HBase) since you
already have all messages stored in MongoDB

My suggestion is:
1. Don't introduce another layer of complexity
2. After Spark Streaming, don't store the raw data again
3. If your MongoDB is for OLTP, then simply clone your MongoDB in your OLAP
environment

Cheers,
Tao




On Thu, Sep 3, 2015 at 10:17 AM,  wrote:

> My main question in case of HAR usage is , is it possible to use Pig on it
> and what about performances ?
>
> - Mail original -
> De: "Jörn Franke" 
> À: nib...@free.fr, user@spark.apache.org
> Envoyé: Jeudi 3 Septembre 2015 15:54:42
> Objet: Re: Small File to HDFS
>
>
>
>
> Store them as hadoop archive (har)
>
>
> Le mer. 2 sept. 2015 à 18:07, < nib...@free.fr > a écrit :
>
>
> Hello,
> I'am currently using Spark Streaming to collect small messages (events) ,
> size being <50 KB , volume is high (several millions per day) and I have to
> store those messages in HDFS.
> I understood that storing small files can be problematic in HDFS , how can
> I manage it ?
>
> Tks
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Thanks!
Tao


Re: Slow Mongo Read from Spark

2015-09-03 Thread Jörn Franke
You might think about another storage layer than MongoDB
(HDFS+ORC+compression or HDFS+Parquet+compression) to improve performance.

On Thu, Sep 3, 2015 at 9:15, Akhil Das  wrote:

> On SSD you will get around 30-40MB/s on a single machine (on 4 cores).
>
> Thanks
> Best Regards
>
> On Mon, Aug 31, 2015 at 3:13 PM, Deepesh Maheshwari <
> deepesh.maheshwar...@gmail.com> wrote:
>
>> tried it,,gives the same above exception
>>
>> Exception in thread "main" java.io.IOException: No FileSystem for scheme:
>> mongodb
>>
>> In you case, do you have used above code.
>> What read throughput , you get?
>>
>> On Mon, Aug 31, 2015 at 2:04 PM, Akhil Das 
>> wrote:
>>
>>> FYI, newAPIHadoopFile and newAPIHadoopRDD uses the NewHadoopRDD class
>>> itself underneath and it doesnt mean it will only read from HDFS. Give it a
>>> shot if you haven't tried it already (it just the inputformat and the
>>> reader which are different from your approach).
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Aug 31, 2015 at 1:14 PM, Deepesh Maheshwari <
>>> deepesh.maheshwar...@gmail.com> wrote:
>>>
 Hi Akhil,

 This code snippet is from below link

 https://github.com/crcsmnky/mongodb-spark-demo/blob/master/src/main/java/com/mongodb/spark/demo/Recommender.java

 Here it reading data from HDFS file system but in our case i need to
 read from mongodb.

 I have tried it earlier and now again tried it but is giving below
 error which is self explanantory.

 Exception in thread "main" java.io.IOException: No FileSystem for
 scheme: mongodb

 On Mon, Aug 31, 2015 at 1:03 PM, Akhil Das 
 wrote:

> Here's a piece of code which works well for us (spark 1.4.1)
>
> Configuration bsonDataConfig = new Configuration();
> bsonDataConfig.set("mongo.job.input.format",
> "com.mongodb.hadoop.BSONFileInputFormat");
>
> Configuration predictionsConfig = new Configuration();
> predictionsConfig.set("mongo.output.uri", mongodbUri);
>
> JavaPairRDD bsonRatingsData =
> sc.newAPIHadoopFile(
> ratingsUri, BSONFileInputFormat.class, Object.class,
> BSONObject.class, bsonDataConfig);
>
>
> Thanks
> Best Regards
>
> On Mon, Aug 31, 2015 at 12:59 PM, Deepesh Maheshwari <
> deepesh.maheshwar...@gmail.com> wrote:
>
>> Hi, I am using 1.3.0
>>
>> I am not getting constructor for above values
>>
>> [image: Inline image 1]
>>
>> So, i tried to shuffle the values in constructor .
>> [image: Inline image 2]
>>
>> But, it is giving this error.Please suggest
>> [image: Inline image 3]
>>
>> Best Regards
>>
>> On Mon, Aug 31, 2015 at 12:43 PM, Akhil Das <
>> ak...@sigmoidanalytics.com> wrote:
>>
>>> Can you try with these key value classes and see the performance?
>>>
>>> inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
>>>
>>>
>>> keyClassName = "org.apache.hadoop.io.Text"
>>> valueClassName = "org.apache.hadoop.io.MapWritable"
>>>
>>>
>>> Taken from databricks blog
>>> 
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Aug 31, 2015 at 12:26 PM, Deepesh Maheshwari <
>>> deepesh.maheshwar...@gmail.com> wrote:
>>>
 Hi, I am trying to read mongodb in Spark newAPIHadoopRDD.

 / Code */

 config.set("mongo.job.input.format",
 "com.mongodb.hadoop.MongoInputFormat");
 config.set("mongo.input.uri",SparkProperties.MONGO_OUTPUT_URI);
 config.set("mongo.input.query","{host: 'abc.com'}");

 JavaSparkContext sc=new JavaSparkContext("local", "MongoOps");

 JavaPairRDD mongoRDD =
 sc.newAPIHadoopRDD(config,
 com.mongodb.hadoop.MongoInputFormat.class,
 Object.class,
 BSONObject.class);

 long count=mongoRDD.count();

 There are about 1.5million record.
 Though i am getting data but read operation took around 15min to
 read whole.

 Is this Api really too slow or am i missing something.
 Please suggest if there is an alternate approach to read data from
 Mongo faster.

 Thanks,
 Deepesh

>>>
>>>
>>
>

>>>
>>
>


Re: Tuning - tasks per core

2015-09-03 Thread Igor Berman
Suppose you have 1 job that does some transformation, suppose you have X
cores in your cluster and you are willing to give all of them to your job,
and suppose you have no shuffles (to keep it simple).

Set the number of partitions of your input data to 2X or 3X; thus you'll get
2-3 tasks per core.
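
A minimal sketch of that (the input path is hypothetical, and defaultParallelism
is only a rough stand-in for the total cores given to the job):

// Size the partition count relative to the cores so each core gets ~2-3 tasks.
val totalCores = sc.defaultParallelism          // or executors * cores per executor, if known
val input = sc.textFile("hdfs:///data/input")   // hypothetical input path
  .repartition(3 * totalCores)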

On 3 September 2015 at 15:56, Hans van den Bogert 
wrote:

> The tuning documentations tells us to have 2-3 tasks per CPU core
>
> > In general, we recommend 2-3 tasks per CPU core in your cluster.
>
> I’m wondering how you’d actually accomplish this.
> Setting spark.task.cpus to a fraction like 0.5 or 0.3 does not work.
>
> Perhaps I’m misunderstanding, any advice is welcome,
>
> Regards,
>
> Hans
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Small File to HDFS

2015-09-03 Thread Martin Menzel
Hello Nicolas,

I solved a similar problem using FSDataOutputStream

http://blog.woopi.org/wordpress/files/hadoop-2.6.0-javadoc/org/apache/hadoop/fs/FSDataOutputStream.html

Each entry can be a ArrayWritable

http://blog.woopi.org/wordpress/files/hadoop-2.6.0-javadoc/org/apache/hadoop/io/ArrayWritable.html

so that you can put an arbitrary sequence of Writable subtypes into one
entry/element.
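
A minimal sketch of that write path (path and payload are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{ArrayWritable, Text, Writable}

val fs = FileSystem.get(new Configuration())
// One big file per time bucket; each small event becomes one ArrayWritable entry.
val out = fs.create(new Path("/data/events/2015-09-03/events.bin"))
try {
  val entry = new ArrayWritable(classOf[Text],
    Array[Writable](new Text("event-key"), new Text("event-payload")))
  entry.write(out)   // the Writable serializes itself onto the underlying DataOutput
} finally {
  out.close()
}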

During my tests, the best performance results were achieved when using the
binary ZLIB library. GZIP or bzip2 delivered slightly smaller files, but
the processing time was several times slower in my cases.

Regarding update and delete:

As far as I know, HDFS does not support update and delete. Tools like HBase
realize this by using several HDFS files and rewriting them from time to
time. Depending on the frequency with which you need to update / delete data,
you can think about housekeeping your HDFS file yourself. You delete an entry
by writing a delete flag somewhere else, referring to the key of your single
data entry. If you have too many deleted entries and you really want to clean
up the HDFS file, you can rewrite it using MapReduce, for example.

In the update case you can use a similar technique. Just append the new /
updated version of your dataset and write a delete flag for the old version
somewhere else.

In summary, you should weigh the complexity of your own hand-made solution
against an HBase solution (as mentioned before). If you don't like key-value
store databases, you can also have a look at Phoenix on top of HBase, which
delivers a very SQL-like access layer.

The only restriction I know of for HBase is that a single dataset should not
be bigger than the HDFS block size.

I hope the comments help you. If you have questions, don't hesitate to
contact me.

Good luck

Martin



2015-09-03 16:17 GMT+02:00 :

> My main question in case of HAR usage is , is it possible to use Pig on it
> and what about performances ?
>
> - Mail original -
> De: "Jörn Franke" 
> À: nib...@free.fr, user@spark.apache.org
> Envoyé: Jeudi 3 Septembre 2015 15:54:42
> Objet: Re: Small File to HDFS
>
>
>
>
> Store them as hadoop archive (har)
>
>
> Le mer. 2 sept. 2015 à 18:07, < nib...@free.fr > a écrit :
>
>
> Hello,
> I'am currently using Spark Streaming to collect small messages (events) ,
> size being <50 KB , volume is high (several millions per day) and I have to
> store those messages in HDFS.
> I understood that storing small files can be problematic in HDFS , how can
> I manage it ?
>
> Tks
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: How to Take the whole file as a partition

2015-09-03 Thread Ewan Leith
Have a look at sparkContext.binaryFiles; it works like wholeTextFiles but
returns a PortableDataStream per file. It might be a workable solution, though
you'll need to handle the binary to UTF-8 (or equivalent) conversion.
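
A rough sketch of that approach (the input path is hypothetical):

import java.nio.charset.StandardCharsets

// One (filename, PortableDataStream) pair per file; each file stays whole in one task.
val files = sc.binaryFiles("hdfs:///data/input")
val contents = files.map { case (path, stream) =>
  (path, new String(stream.toArray(), StandardCharsets.UTF_8))
}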

Thanks,
Ewan

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: 03 September 2015 15:22
To: user@spark.apache.org
Subject: How to Take the whole file as a partition

Hi All,

I have 1000 files, from 500MB to 1-2GB at this moment, and I want Spark to
read them as partitions at the file level, which means turning the FileSplit
off.

I know there are some solutions, but they are not very good in my case:
1. I can't use the wholeTextFiles method, because my files are too big and I
don't want to risk the performance.
2. I tried to use newAPIHadoopFile and turn off the file split:

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
        LongWritable.class, Text.class, hadoopConf).values()
    .map(new Function<Text, String>() {
        @Override
        public String call(Text arg0) throws Exception {
            return arg0.toString();
        }
    });

This works in some cases, but it truncates some lines (I am not sure why, but
it looks like there is a limit on this file reading). I have a feeling that
Spark truncates the file at 2GB. In any case it happens (the same data has no
issue when I use MapReduce for the input): Spark sometimes truncates very big
files when trying to read all of them.

3. Another way I can do it is to distribute the file names as the input of the
Spark job and open a stream inside the function to read each file directly.
This is what I am planning to do, but I think it is ugly. I want to know if
anyone has a better solution for it.

BTW: the files are currently in text format, but they might be in Parquet
format later; that is also a reason I don't like my third option.

Regards,

Shuai


NOT IN in Spark SQL

2015-09-03 Thread Pietro Gentile
Hi all,

How can I use the "NOT IN" clause in Spark SQL 1.2?

It keeps giving me syntax errors, but the query is correct SQL.
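
As a possible workaround sketch (table and column names are hypothetical),
older Spark SQL versions usually accept a LEFT OUTER JOIN plus an IS NULL
filter in place of NOT IN:

val result = sqlContext.sql("""
  SELECT a.*
  FROM table_a a
  LEFT OUTER JOIN table_b b ON a.id = b.id
  WHERE b.id IS NULL
""")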

Thanks in advance,
Best regards,

Pietro.


Re: Hbase Lookup

2015-09-03 Thread Jörn Franke
If you use Pig or Spark you increase the complexity from an operations
management perspective significantly. Spark should be seen from a platform
perspective, if it makes sense. If you can do it directly with HBase/Phoenix
or only an HBase coprocessor then this should be preferred. Otherwise you pay
more money for maintenance and development.

On Thu, Sep 3, 2015 at 17:16, Tao Lu  wrote:

> Yes. Ayan, you approach will work.
>
> Or alternatively, use Spark, and write a Scala/Java function which
> implements similar logic in your Pig UDF.
>
> Both approaches look similar.
>
> Personally, I would go with Spark solution, it will be slightly faster,
> and easier if you already have Spark cluster setup on top of your hadoop
> cluster in your infrastructure.
>
> Cheers,
> Tao
>
>
> On Thu, Sep 3, 2015 at 1:15 AM, ayan guha  wrote:
>
>> Thanks for your info. I am planning to implement a pig udf to do record
>> look ups. Kindly let me know if this is a good idea.
>>
>> Best
>> Ayan
>>
>> On Thu, Sep 3, 2015 at 2:55 PM, Jörn Franke  wrote:
>>
>>>
>>> You may check if it makes sense to write a coprocessor doing an upsert
>>> for you, if it does not exist already. Maybe phoenix for Hbase supports
>>> this already.
>>>
>>> Another alternative, if the records do not have an unique Id, is to put
>>> them into a text index engine, such as Solr or Elasticsearch, which does in
>>> this case a fast matching with relevancy scores.
>>>
>>>
>>> You can use also Spark and Pig there. However, I am not sure if Spark is
>>> suitable for these one row lookups. Same holds for Pig.
>>>
>>>
>>> Le mer. 2 sept. 2015 à 23:53, ayan guha  a écrit :
>>>
>>> Hello group
>>>
>>> I am trying to use pig or spark in order to achieve following:
>>>
>>> 1. Write a batch process which will read from a file
>>> 2. Lookup hbase to see if the record exists. If so then need to compare
>>> incoming values with hbase and update fields which do not match. Else
>>> create a new record.
>>>
>>> My questions:
>>> 1. Is this a good use case for pig or spark?
>>> 2. Is there any way to read hbase for each incoming record in pig
>>> without writing map reduce code?
>>> 3. In case of spark I think we have to connect to hbase for every
>>> record. Is thr any other way?
>>> 4. What is the best connector for hbase which gives this functionality?
>>>
>>> Best
>>>
>>> Ayan
>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> 
> Thanks!
> Tao
>


How to Take the whole file as a partition

2015-09-03 Thread Shuai Zheng
Hi All,

 

I have 1000 files, from 500MB to 1-2GB at this moment, and I want Spark to
read them as partitions at the file level, which means turning the FileSplit
off.

I know there are some solutions, but they are not very good in my case:

1. I can't use the wholeTextFiles method, because my files are too big and I
don't want to risk the performance.

2. I tried to use newAPIHadoopFile and turn off the file split:

 

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
        LongWritable.class, Text.class, hadoopConf).values()
    .map(new Function<Text, String>() {
        @Override
        public String call(Text arg0) throws Exception {
            return arg0.toString();
        }
    });

 

This works in some cases, but it truncates some lines (I am not sure why, but
it looks like there is a limit on this file reading). I have a feeling that
Spark truncates the file at 2GB. In any case it happens (the same data has no
issue when I use MapReduce for the input): Spark sometimes truncates very big
files when trying to read all of them.

3. Another way I can do it is to distribute the file names as the input of the
Spark job and open a stream inside the function to read each file directly.
This is what I am planning to do, but I think it is ugly. I want to know if
anyone has a better solution for it.

BTW: the files are currently in text format, but they might be in Parquet
format later; that is also a reason I don't like my third option.

 

Regards,

 

Shuai



Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-03 Thread Dmitry Goldenberg
I'm seeing an oddity where I initially set the batchdurationmillis to 1
second and it works fine:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(batchDurationMillis));

Then I tried changing the value to 10 seconds. The change didn't seem to
take. I've bounced the Spark workers and the consumers and now I'm seeing
RDD's coming in once around 10 seconds (not always 10 seconds according to
the logs).

However, now I'm trying to change the value to 20 seconds and it's just not
taking. I've bounced Spark master, workers, and consumers and the value
seems "stuck" at 10 seconds.

Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.

Thanks.

- Dmitry


Re: How to Take the whole file as a partition

2015-09-03 Thread Tao Lu
Your situation is special. It seems to me Spark may not fit well in your
case.

You want to process the individual files (500MB~2GB) as a whole, and you want
good performance.

You may want to write your own Scala/Java programs, distribute them along
with those files across your cluster, and run them in parallel.

If you insist on using Spark, maybe option 3 is closer.
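
A bare sketch of option 3 (file names are hypothetical): parallelize the file
names and open each file inside the task:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fileNames = Seq("hdfs:///data/f1.txt", "hdfs:///data/f2.txt")   // hypothetical
val contents = sc.parallelize(fileNames, fileNames.size).map { name =>
  val fs = FileSystem.get(new java.net.URI(name), new Configuration())
  val in = fs.open(new Path(name))
  try scala.io.Source.fromInputStream(in).mkString finally in.close()
}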

Cheers,
Tao


On Thu, Sep 3, 2015 at 10:22 AM, Shuai Zheng  wrote:

> Hi All,
>
>
>
> I have 1000 files, from 500M to 1-2GB at this moment. And I want my spark
> can read them as partition on the file level. Which means want the
> FileSplit turn off.
>
>
>
> I know there are some solutions, but not very good in my case:
>
> 1, I can’t use WholeTextFiles method, because my file is too big, I don’t
> want to risk the performance.
>
> 2, I try to use newAPIHadoopFile and turnoff the file split:
>
>
>
> lines =
> ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
> LongWritable.class, Text.class, hadoopConf).values()
>
>
> .map(new Function() {
>
>
> @Override
>
>
> public String call(Text arg0) throws Exception {
>
>
> return arg0.toString();
>
>
> }
>
>
> });
>
>
>
> This works for some cases, but it truncate some lines (I am not sure why,
> but it looks like there is a limit on this file reading). I have a feeling
> that the spark truncate this file on 2GB bytes. Anyway it happens (because
> same data has no issue when I use mapreduce to do the input), the spark
> sometimes do a trunc on very big file if try to read all of them.
>
>
>
> 3, I can do another way is distribute the file name as the input of the
> Spark and in function open stream to read the file directly. This is what I
> am planning to do but I think it is ugly. I want to know anyone have better
> solution for it?
>
>
>
> BTW: the file currently in text format, but it might be parquet format
> later, that is also reason I don’t like my third option.
>
>
>
> Regards,
>
>
>
> Shuai
>



-- 

Thanks!
Tao


Re: Slow Mongo Read from Spark

2015-09-03 Thread Deepesh Maheshwari
Because of the existing architecture, I am bound to use MongoDB.

Please suggest something for this.

On Thu, Sep 3, 2015 at 9:10 PM, Jörn Franke  wrote:

> You might think about another storage layer not being mongodb
> (hdfs+orc+compression or hdfs+parquet+compression)  to improve performance
>
> Le jeu. 3 sept. 2015 à 9:15, Akhil Das  a
> écrit :
>
>> On SSD you will get around 30-40MB/s on a single machine (on 4 cores).
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Aug 31, 2015 at 3:13 PM, Deepesh Maheshwari <
>> deepesh.maheshwar...@gmail.com> wrote:
>>
>>> tried it,,gives the same above exception
>>>
>>> Exception in thread "main" java.io.IOException: No FileSystem for
>>> scheme: mongodb
>>>
>>> In you case, do you have used above code.
>>> What read throughput , you get?
>>>
>>> On Mon, Aug 31, 2015 at 2:04 PM, Akhil Das 
>>> wrote:
>>>
 FYI, newAPIHadoopFile and newAPIHadoopRDD uses the NewHadoopRDD class
 itself underneath and it doesnt mean it will only read from HDFS. Give it a
 shot if you haven't tried it already (it just the inputformat and the
 reader which are different from your approach).

 Thanks
 Best Regards

 On Mon, Aug 31, 2015 at 1:14 PM, Deepesh Maheshwari <
 deepesh.maheshwar...@gmail.com> wrote:

> Hi Akhil,
>
> This code snippet is from below link
>
> https://github.com/crcsmnky/mongodb-spark-demo/blob/master/src/main/java/com/mongodb/spark/demo/Recommender.java
>
> Here it reading data from HDFS file system but in our case i need to
> read from mongodb.
>
> I have tried it earlier and now again tried it but is giving below
> error which is self explanantory.
>
> Exception in thread "main" java.io.IOException: No FileSystem for
> scheme: mongodb
>
> On Mon, Aug 31, 2015 at 1:03 PM, Akhil Das  > wrote:
>
>> Here's a piece of code which works well for us (spark 1.4.1)
>>
>> Configuration bsonDataConfig = new Configuration();
>> bsonDataConfig.set("mongo.job.input.format",
>> "com.mongodb.hadoop.BSONFileInputFormat");
>>
>> Configuration predictionsConfig = new Configuration();
>> predictionsConfig.set("mongo.output.uri", mongodbUri);
>>
>> JavaPairRDD bsonRatingsData =
>> sc.newAPIHadoopFile(
>> ratingsUri, BSONFileInputFormat.class, Object.class,
>> BSONObject.class, bsonDataConfig);
>>
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Aug 31, 2015 at 12:59 PM, Deepesh Maheshwari <
>> deepesh.maheshwar...@gmail.com> wrote:
>>
>>> Hi, I am using 1.3.0
>>>
>>> I am not getting constructor for above values
>>>
>>> [image: Inline image 1]
>>>
>>> So, i tried to shuffle the values in constructor .
>>> [image: Inline image 2]
>>>
>>> But, it is giving this error.Please suggest
>>> [image: Inline image 3]
>>>
>>> Best Regards
>>>
>>> On Mon, Aug 31, 2015 at 12:43 PM, Akhil Das <
>>> ak...@sigmoidanalytics.com> wrote:
>>>
 Can you try with these key value classes and see the performance?

 inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"


 keyClassName = "org.apache.hadoop.io.Text"
 valueClassName = "org.apache.hadoop.io.MapWritable"


 Taken from databricks blog
 

 Thanks
 Best Regards

 On Mon, Aug 31, 2015 at 12:26 PM, Deepesh Maheshwari <
 deepesh.maheshwar...@gmail.com> wrote:

> Hi, I am trying to read mongodb in Spark newAPIHadoopRDD.
>
> / Code */
>
> config.set("mongo.job.input.format",
> "com.mongodb.hadoop.MongoInputFormat");
> config.set("mongo.input.uri",SparkProperties.MONGO_OUTPUT_URI);
> config.set("mongo.input.query","{host: 'abc.com'}");
>
> JavaSparkContext sc=new JavaSparkContext("local", "MongoOps");
>
> JavaPairRDD mongoRDD =
> sc.newAPIHadoopRDD(config,
> com.mongodb.hadoop.MongoInputFormat.class,
> Object.class,
> BSONObject.class);
>
> long count=mongoRDD.count();
>
> There are about 1.5million record.
> Though i am getting data but read operation took around 15min to
> read whole.
>
> Is this Api really too slow or am i missing something.
> Please suggest if there is an alternate approach to read data from
> Mongo faster.
>
> Thanks,
> Deepesh
>


>>>
>>
>

>>>
>>


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-03 Thread Ricardo Luis Silva Paiva
Good tip. I will try that.

Thank you.

On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger  wrote:

> Yeah, in general if you're changing the jar you can't recover the
> checkpoint.
>
> If you're just changing parameters, why not externalize those in a
> configuration file so your jar doesn't change?  I tend to stick even my
> app-specific parameters in an external spark config so everything is in one
> place.
>
> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <
> ricardo.pa...@corp.globo.com> wrote:
>
>> Hi,
>>
>> Is there a way to submit an app code change, keeping the checkpoint data
>> or do I need to erase the checkpoint folder every time I re-submit the
>> spark app with a new jar?
>>
>> I have an app that count pageviews streaming from Kafka, and deliver a
>> file every hour from the past 24 hours. I'm using reduceByKeyAndWindow with
>> the reduce and inverse functions set.
>>
>> I'm doing some code improvements and would like to keep the data from the
>> past hours, so when I re-submit a code change, I would keep delivering the
>> pageviews aggregation without need to wait for 24 hours of new data.
>> Sometimes I'm just changing the submission parameters, like number of
>> executors, memory and cores.
>>
>> Many thanks,
>>
>> Ricardo
>>
>> --
>> Ricardo Paiva
>> Big Data / Semântica
>> *globo.com* 
>>
>
>


-- 
Ricardo Paiva
Big Data / Semântica
*globo.com* 


spark.shuffle.spill=false ignored?

2015-09-03 Thread Eric Walker
Hi,

I am using Spark 1.3.1 on EMR with lots of memory.  I have attempted to run
a large pyspark job several times, specifying `spark.shuffle.spill=false`
in different ways.  It seems that the setting is ignored, at least
partially, and some of the tasks start spilling large amounts of data to
disk.  The job has been fast enough in the past, but once it starts
spilling to disk it lands on Miller's planet [1].

Is this expected behavior?  Is it a misconfiguration on my part, e.g.,
could there be an incompatible setting that is overriding
`spark.shuffle.spill=false`?  Is it something that goes back to Spark
1.3.1?  Is it something that goes back to EMR?  When I've allowed the job
to continue on for a while, I've started to see Kryo stack traces in the
tasks that are spilling to disk.  The stack traces mention there not being
enough disk space, although a `df` shows plenty of space (perhaps after the
fact, when temporary files have been cleaned up).

Has anyone run into something like this before?  I would be happy to see
OOM errors, because that would be consistent with one understanding of what
might be going on, but I haven't yet.

Eric


[1] https://www.youtube.com/watch?v=v7OVqXm7_Pk=active


Re: large number of import-related function calls in PySpark profile

2015-09-03 Thread Priedhorsky, Reid

On Sep 2, 2015, at 11:31 PM, Davies Liu 
> wrote:

Could you have a short script to reproduce this?

Good point. Here you go. This is Python 3.4.3 on Ubuntu 15.04.

import pandas as pd  # must be in default path for interpreter
import pyspark

LEN = 260
ITER_CT = 1

conf = pyspark.SparkConf()
conf.set('spark.python.profile', 'true')
sc = pyspark.SparkContext(conf=conf)

a = sc.broadcast(pd.Series(range(LEN)))

def map_(i):
   b = pd.Series(range(LEN))
   for i in range(ITER_CT):
  b.corr(a.value)
   return None

shards = sc.parallelize(range(sc.defaultParallelism), sc.defaultParallelism)
data = shards.map(map_)
data.collect()

sc.show_profiles()

Run as:

$ spark-submit --master local[4] demo.py

Here’s a profile excerpt:


Profile of 

Re: Too many open files issue

2015-09-03 Thread Sigurd Knippenberg
I don't think that is the issue. I have it set up to run in a thread pool,
but I have set the pool size to 1 for this test until I get this resolved.
I am having some problems with using the Spark web portal since it is
picking a random port, and with the way my environment is set up, by the time
I have figured out which port it is using the job has finished. But what I
did do is add some logging, and I added collecting the RDD record count to
make sure the last logging statements were in fact executed after the RDD
process ran. I added the logging statements in the job flow:

val directories = findModifiedFiles()
directories.foreach(directory => {
  log 'Starting directory processor for $directory'
  val rdd = sparkContext.newAPIHadoopFile(directory)
    .filter(...)
    .map(...)
    .reduceByKey(...)

  rdd.foreachPartition(iterator => {
    iterator.foreach(tuple => {
      // send data to kafka
    })
  })

  val count = rdd.count
  log 'Processed $count records for $directory'
  log 'Finished directory processor for $directory'
})

This results in these log lines until the "Too many open files in system"
errors started happening after which it only printed the first log line for
each iteration (as expected since it's throwing an exception).

Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/15
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/15
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/15
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/16
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/16
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/16
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/17
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/17
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/17
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/18
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/18
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/18
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/19
Processed 4 records for /user/oozie/topics/testtopic/hourly/2015/04/19
Finished directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/19
Starting directory processor for
/user/oozie/topics/testtopic/hourly/2015/04/20

Just to see what would happen if I made the job run slower and had GC run
more frequently (in case it had something to do with that), I added the
following to each loop iteration:
  System.gc()
  Thread.sleep(5000)

But besides making the job run a lot longer it did not change anything.

Sigurd


On Wed, Sep 2, 2015 at 9:40 AM, Saisai Shao  wrote:

> Here is the code in which NewHadoopRDD register close handler and be
> called when the task is completed (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
> ).
>
> From my understanding, possibly the reason is that this `foreach` code in
> your implementation is not executed Spark job one by one in loop as
> expected, on the contrary all the jobs are submitted to the DAGScheduler
> simultaneously, since each job has no dependency to others, Spark's
> scheduler will unwrap the loop and submit jobs in parallelism, so maybe
> several map stages are running and pending, this makes your node out of
> file handler.
>
> You could check Spark web portal to see if there's several map stages
> running simultaneously, or some of them are running while others are
> pending.
>
> Thanks
> Jerry
>
>
> On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg  > wrote:
>
>> Yep. I know. It's was set to 32K when I ran this test. If I bump it to
>> 64K the issue goes away. It still doesn't make sense to me that the Spark
>> job doesn't release its file handles until the end of the job instead of
>> doing that while my loop iterates.
>>
>> Sigurd
>>
>> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
>> wrote:
>>
>>>
>>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
>>> wrote:
>>>
>>> I know I can adjust the max open files allowed by the OS but I'd rather
>>> fix the underlaying issue.
>>>
>>>
>>>
>>> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>>>
>>> https://wiki.apache.org/hadoop/TooManyOpenFiles
>>>
>>
>>
>


Re: Spark partitions from CassandraRDD

2015-09-03 Thread Ankur Srivastava
Hi Alaa,

Partitioning when using CassandraRDD depends on the partition key of your
Cassandra table.

If you see only 1 partition in the RDD, it means all the rows you have
selected have the same partition_key in C*.
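
A quick sketch to double-check, reusing the call from your mail (the split-size
property name in the comment is an assumption and may differ between connector
versions):

val rdd = sc.cassandraTable[RES](keyspace, res_name).where(res_where)
println(s"partitions = ${rdd.partitions.length}")   // how the connector split the scan

// If the selected data is smaller than one split, everything may land in a single
// partition; the split size is tunable in the Spark conf (property name assumed):
// conf.set("spark.cassandra.input.split.size_in_mb", "16")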

Thanks
Ankur


On Thu, Sep 3, 2015 at 11:54 AM, Alaa Zubaidi (PDF) 
wrote:

> Hi,
>
> I testing Spark and Cassandra, Spark 1.4, Cassandra 2.1.7 cassandra spark
> connector 1.4, running in standalone mode.
>
> I am getting 4000 rows from Cassandra (4mb row), where the row keys are
> random.
> .. sc.cassandraTable[RES](keyspace,res_name).where(res_where).cache
>
> I am expecting that it will generate few partitions.
> However, I can ONLY see 1 partition.
> I cached the CassandraRDD and in the UI storage tab it shows ONLY 1
> partition.
>
> Any idea, why I am getting 1 partition?
>
> Thanks,
> Alaa
>
>
>
> *This message may contain confidential and privileged information. If it
> has been sent to you in error, please reply to advise the sender of the
> error and then immediately permanently delete it and all attachments to it
> from your systems. If you are not the intended recipient, do not read,
> copy, disclose or otherwise use this message or any attachments to it. The
> sender disclaims any liability for such unauthorized use. PLEASE NOTE that
> all incoming e-mails sent to PDF e-mail accounts will be archived and may
> be scanned by us and/or by external service providers to detect and prevent
> threats to our systems, investigate illegal or inappropriate behavior,
> and/or eliminate unsolicited promotional e-mails (“spam”). If you have any
> concerns about this process, please contact us at *
> *legal.departm...@pdf.com* *.*


Re: Ranger-like Security on Spark

2015-09-03 Thread Matei Zaharia
If you run on YARN, you can use Kerberos, be authenticated as the right user, 
etc in the same way as MapReduce jobs.

Matei

> On Sep 3, 2015, at 1:37 PM, Daniel Schulz  
> wrote:
> 
> Hi,
> 
> I really enjoy using Spark. An obstacle to sell it to our clients currently 
> is the missing Kerberos-like security on a Hadoop with simple authentication. 
> Are there plans, a proposal, or a project to deliver a Ranger plugin or 
> something similar to Spark. The target is to differentiate users and their 
> privileges when reading and writing data to HDFS? Is Kerberos my only option 
> then?
> 
> Kind regards, Daniel.
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem while loading saved data

2015-09-03 Thread Ewan Leith
From that, I'd guess that HDFS isn't set up between the nodes, or for some
reason writes are defaulting to file:///path/ rather than hdfs:///path/




-- Original message--

From: Amila De Silva

Date: Thu, 3 Sep 2015 17:12

To: Ewan Leith;

Cc: user@spark.apache.org;

Subject:Re: Problem while loading saved data


Hi Ewan,

Yes, 'people.parquet' is from the first attempt, and in that attempt it tried to
save the same people.json.

It seems that the same folder is created on both nodes and the contents of the
files are distributed between the two servers.

On the master node (this is the same node which runs the IPython Notebook) this is
what I have:

people.parquet
└── _SUCCESS

On the slave I get,
people.parquet
└── _temporary
└── 0
├── task_201509030057_4699_m_00
│   └── part-r-0-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
├── task_201509030057_4699_m_01
│   └── part-r-1-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
└── _temporary

I have zipped and attached both the folders.

On Thu, Sep 3, 2015 at 5:58 PM, Ewan Leith 
> wrote:
Your error log shows you attempting to read from 'people.parquet2', not
‘people.parquet’ as you’ve put below; is that just from a different attempt?

Otherwise, it’s an odd one! There aren’t _SUCCESS, _common_metadata and
_metadata files under people.parquet as you’ve listed below, which would
normally be created when the write completes. Can you show us your write output?


Thanks,
Ewan



From: Amila De Silva [mailto:jaa...@gmail.com]
Sent: 03 September 2015 05:44
To: Guru Medasani >
Cc: user@spark.apache.org
Subject: Re: Problem while loading saved data

Hi Guru,

Thanks for the reply.

Yes, I checked if the file exists. But instead of a single file what I found 
was a directory having the following structure.

people.parquet
└── _temporary
└── 0
├── task_201509030057_4699_m_00
│   └── part-r-0-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
├── task_201509030057_4699_m_01
│   └── part-r-1-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
└── _temporary


On Thu, Sep 3, 2015 at 7:13 AM, Guru Medasani 
> wrote:
Hi Amila,

Error says that the ‘people.parquet’ file does not exist. Can you manually 
check to see if that file exists?


Py4JJavaError: An error occurred while calling o53840.parquet.

: java.lang.AssertionError: assertion failed: No schema defined, and no Parquet 
data file or summary file found under file:/home/ubuntu/ipython/people.parquet2.



Guru Medasani
gdm...@gmail.com



On Sep 2, 2015, at 8:25 PM, Amila De Silva 
> wrote:

Hi All,

I have a two node spark cluster, to which I'm connecting using IPython notebook.
To see how data saving/loading works, I simply created a dataframe using 
people.json using the Code below;

df = sqlContext.read.json("examples/src/main/resources/people.json")

Then called the following to save the dataframe as a parquet.
df.write.save("people.parquet")

Tried loading the saved dataframe using;
df2 = sqlContext.read.parquet('people.parquet');

But this simply fails giving the following exception


---

Py4JJavaError Traceback (most recent call last)

 in ()

> 1 df2 = sqlContext.read.parquet('people.parquet2');



/srv/spark/python/pyspark/sql/readwriter.pyc in parquet(self, *path)

154 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 
'int')]

155 """

--> 156 return 
self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))

157

158 @since(1.4)



/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in 
__call__(self, *args)

536 answer = self.gateway_client.send_command(command)

537 return_value = get_return_value(answer, self.gateway_client,

--> 538 self.target_id, self.name)

539

540 for temp_arg in temp_args:



/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)

298 raise Py4JJavaError(

299 'An error occurred while calling {0}{1}{2}.\n'.

--> 300 format(target_id, '.', name), value)

301 else:

302 raise Py4JError(



Py4JJavaError: An error occurred while calling o53840.parquet.

: java.lang.AssertionError: assertion failed: No schema defined, and no Parquet 
data file or summary file found under file:/home/ubuntu/ipython/people.parquet2.

   at scala.Predef$.assert(Predef.scala:179)

   at 

SparkSQL without access to arrays?

2015-09-03 Thread Terry
Hi, 
i'm using Spark 1.4.1.
Here is de printSchema after load my json file:

root
 |-- result: struct (nullable = true)
 |    |-- negative_votes: long (nullable = true)
 |    |-- players: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- account_id: long (nullable = true)
 |    |    |    |-- assists: long (nullable = true)
 |    |    |    |-- deaths: long (nullable = true)
 |    |    |    |-- denies: long (nullable = true)
 |    |-- positive_votes: long (nullable = true)
 |    |-- radiant_captain: long (nullable = true)
 |    |-- radiant_win: boolean (nullable = true)

Why is it not possible to do:


Is there an alternative to make this work?
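
As a sketch of one way to reach the fields inside the players array from Scala
(assuming org.apache.spark.sql.functions.explode is available in your Spark
version), flatten the array first:

import org.apache.spark.sql.functions.explode

// One row per player, then the nested fields become ordinary columns.
val players = df.select(explode(df("result.players")).as("player"))
players.select("player.account_id", "player.assists", "player.deaths").show()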



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-without-access-to-arrays-tp24572.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: large number of import-related function calls in PySpark profile

2015-09-03 Thread Davies Liu
I think this is not a problem of PySpark; you would also see this if you
profile this script:

```
list(map(map_, range(sc.defaultParallelism)))
```

81777/80874    0.086    0.000    0.360    0.000 <frozen importlib._bootstrap>:2264(_handle_fromlist)



On Thu, Sep 3, 2015 at 11:16 AM, Priedhorsky, Reid  wrote:
>
> On Sep 2, 2015, at 11:31 PM, Davies Liu  wrote:
>
> Could you have a short script to reproduce this?
>
>
> Good point. Here you go. This is Python 3.4.3 on Ubuntu 15.04.
>
> import pandas as pd  # must be in default path for interpreter
> import pyspark
>
> LEN = 260
> ITER_CT = 1
>
> conf = pyspark.SparkConf()
> conf.set('spark.python.profile', 'true')
> sc = pyspark.SparkContext(conf=conf)
>
> a = sc.broadcast(pd.Series(range(LEN)))
>
> def map_(i):
>b = pd.Series(range(LEN))
>for i in range(ITER_CT):
>   b.corr(a.value)
>return None
>
> shards = sc.parallelize(range(sc.defaultParallelism), sc.defaultParallelism)
> data = shards.map(map_)
> data.collect()
>
> sc.show_profiles()
>
>
> Run as:
>
> $ spark-submit --master local[4] demo.py
>
>
> Here’s a profile excerpt:
>
> 
> Profile of 

Spark partitions from CassandraRDD

2015-09-03 Thread Alaa Zubaidi (PDF)
Hi,

I testing Spark and Cassandra, Spark 1.4, Cassandra 2.1.7 cassandra spark
connector 1.4, running in standalone mode.

I am getting 4000 rows from Cassandra (4mb row), where the row keys are
random.
.. sc.cassandraTable[RES](keyspace,res_name).where(res_where).cache

I am expecting that it will generate few partitions.
However, I can ONLY see 1 partition.
I cached the CassandraRDD and in the UI storage tab it shows ONLY 1
partition.

Any idea, why I am getting 1 partition?

Thanks,
Alaa

-- 
*This message may contain confidential and privileged information. If it 
has been sent to you in error, please reply to advise the sender of the 
error and then immediately permanently delete it and all attachments to it 
from your systems. If you are not the intended recipient, do not read, 
copy, disclose or otherwise use this message or any attachments to it. The 
sender disclaims any liability for such unauthorized use. PLEASE NOTE that 
all incoming e-mails sent to PDF e-mail accounts will be archived and may 
be scanned by us and/or by external service providers to detect and prevent 
threats to our systems, investigate illegal or inappropriate behavior, 
and/or eliminate unsolicited promotional e-mails (“spam”). If you have any 
concerns about this process, please contact us at *
*legal.departm...@pdf.com* *.*


Re: Small File to HDFS

2015-09-03 Thread nibiau
A HAR archive seems a good idea, but just one last question to be sure to make the
best choice:
- Is it possible to override (remove/replace) a file inside the HAR?
Basically the names of my small files will be the keys of my records, and
sometimes I will need to replace the content of a file with new content
(remove/replace).


Tks a lot
Nicolas

- Original Message -
From: "Jörn Franke" 
To: nib...@free.fr
Cc: user@spark.apache.org
Sent: Thursday, September 3, 2015 19:29:42
Subject: Re: Small File to HDFS



HAR is transparent and has hardly any performance overhead. You may decide not to
compress, or to use a fast compression algorithm such as Snappy (recommended).



On Thu, Sep 3, 2015 at 16:17, < nib...@free.fr > wrote:


My main question in case of HAR usage is , is it possible to use Pig on it and 
what about performances ? 

- Mail original - 
De: "Jörn Franke" < jornfra...@gmail.com > 
À: nib...@free.fr , user@spark.apache.org 
Envoyé: Jeudi 3 Septembre 2015 15:54:42 
Objet: Re: Small File to HDFS 




Store them as hadoop archive (har) 


Le mer. 2 sept. 2015 à 18:07, < nib...@free.fr > a écrit : 


Hello, 
I'am currently using Spark Streaming to collect small messages (events) , size 
being <50 KB , volume is high (several millions per day) and I have to store 
those messages in HDFS. 
I understood that storing small files can be problematic in HDFS , how can I 
manage it ? 

Tks 
Nicolas 

- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark partitions from CassandraRDD

2015-09-03 Thread Alaa Zubaidi (PDF)
Thanks Ankur,

But I grabbed some keys from the Spark results and ran "nodetool -h
getendpoints " and it showed the data is coming from at least 2 nodes?
Regards,
Alaa

On Thu, Sep 3, 2015 at 12:06 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi Alaa,
>
> Partition when using CassandraRDD depends on your partition key in
> Cassandra table.
>
> If you see only 1 partition in the RDD it means all the rows you have
> selected have same partition_key in C*
>
> Thanks
> Ankur
>
>
> On Thu, Sep 3, 2015 at 11:54 AM, Alaa Zubaidi (PDF) 
> wrote:
>
>> Hi,
>>
>> I testing Spark and Cassandra, Spark 1.4, Cassandra 2.1.7 cassandra spark
>> connector 1.4, running in standalone mode.
>>
>> I am getting 4000 rows from Cassandra (4mb row), where the row keys are
>> random.
>> .. sc.cassandraTable[RES](keyspace,res_name).where(res_where).cache
>>
>> I am expecting that it will generate few partitions.
>> However, I can ONLY see 1 partition.
>> I cached the CassandraRDD and in the UI storage tab it shows ONLY 1
>> partition.
>>
>> Any idea, why I am getting 1 partition?
>>
>> Thanks,
>> Alaa
>>
>>
>>
>> *This message may contain confidential and privileged information. If it
>> has been sent to you in error, please reply to advise the sender of the
>> error and then immediately permanently delete it and all attachments to it
>> from your systems. If you are not the intended recipient, do not read,
>> copy, disclose or otherwise use this message or any attachments to it. The
>> sender disclaims any liability for such unauthorized use. PLEASE NOTE that
>> all incoming e-mails sent to PDF e-mail accounts will be archived and may
>> be scanned by us and/or by external service providers to detect and prevent
>> threats to our systems, investigate illegal or inappropriate behavior,
>> and/or eliminate unsolicited promotional e-mails (“spam”). If you have any
>> concerns about this process, please contact us at *
>> *legal.departm...@pdf.com* *.*
>
>
>


-- 

Alaa Zubaidi
PDF Solutions, Inc.
333 West San Carlos Street, Suite 1000
San Jose, CA 95110  USA
Tel: 408-283-5639
fax: 408-938-6479
email: alaa.zuba...@pdf.com



Re: spark 1.4.1 - LZFException

2015-09-03 Thread Yadid Ayzenberg

Hi Akhil,

No, it seems I have plenty of more disk space available on that node.
I look at the logs and one minute before that exception I am seeing the 
following exception.


15/09/03 12:51:39 ERROR TransportChannelHandler: Connection to 
/x.y.z.w:44892 has been quiet for 12 ms while there are outstanding 
requests. Assuming connection is dead; please adjust 
spark.network.timeout if this is wrong.
15/09/03 12:51:39 ERROR TransportResponseHandler: Still have 8 requests 
outstanding when connection from /18.85.28.197:44892 is closed
15/09/03 12:51:39 ERROR OneForOneBlockFetcher: Failed while starting 
block fetches

java.io.IOException: Connection from /x.y.z.w:44892 closed
at 
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at 
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)

at java.lang.Thread.run(Thread.java:745)

You think that is related to the problem ?

Yadid
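
If the quiet-connection timeouts are the trigger, a minimal sketch of raising the setting
the error message itself points at (values are illustrative, and switching the codec is
only an experiment to rule out lzf-specific corruption, not a confirmed fix):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("long-running-app")            // illustrative name
      .set("spark.network.timeout", "600s")      // default is 120s; raise it if long GC pauses make idle connections look dead
      .set("spark.io.compression.codec", "lz4")  // optional: try another codec to see whether the problem follows lzf
    val sc = new SparkContext(conf)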

On 8/28/15 1:31 AM, Akhil Das wrote:
Is it filling up your disk space? Can you look a bit more in the 
executor logs to see whats going on


Thanks
Best Regards

On Sun, Aug 23, 2015 at 1:27 AM, Yadid Ayzenberg > wrote:




Hi All,

We have a spark standalone cluster running 1.4.1 and we are
setting spark.io.compression.codec to lzf.
I have a long running interactive application which behaves as
normal, but after a few days I get the following exception in
multiple jobs. Any ideas on what could be causing this ?

Yadid



Job aborted due to stage failure: Task 27 in stage 286.0 failed 4 times, 
most recent failure: Lost task 27.3 in stage 286.0 (TID 516817, xx.yy.zz.ww): 
com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: 
Corrupt input data, block did not start with 2 byte signature ('ZV') followed 
by type byte, 2-byte length)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
at 
org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
at 
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:200)
at 
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:197)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
   

RE: How to Take the whole file as a partition

2015-09-03 Thread Shuai Zheng
Hi, 

 

Will there any way to change the default split size when load data for Spark? 
By default it is 64M, I know how to change this in Hadoop Mapreduce, but not 
sure how to do this in Spark.

 

Regards,

 

Shuai
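
One knob to try is the FileInputFormat minimum split size, passed through the Hadoop
configuration handed to newAPIHadoopFile. A minimal sketch (inputPath as in the Java
snippet further down this thread; the 4 GB value is only illustrative, and whether it
takes effect also depends on the input format's isSplitable implementation):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    // ask for splits of at least 4 GB so each input file lands in a single split/partition
    hadoopConf.setLong("mapreduce.input.fileinputformat.split.minsize", 4L * 1024 * 1024 * 1024)

    val lines = sc.newAPIHadoopFile(inputPath, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], hadoopConf).map(_._2.toString)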

 

From: Tao Lu [mailto:taolu2...@gmail.com] 
Sent: Thursday, September 03, 2015 11:07 AM
To: Shuai Zheng
Cc: user
Subject: Re: How to Take the whole file as a partition

 

You situation is special. It seems to me Spark may not fit well in your case. 

 

You want to process the individual files (500M~2G) as a whole, you want good 
performance. 

 

You may want to write our own Scala/Java programs and distribute it along with 
those files across your cluster, and run them in parallel. 

 

If you insist on using Spark, maybe option 3 is closer. 

 

Cheers,

Tao

 

 

On Thu, Sep 3, 2015 at 10:22 AM, Shuai Zheng  wrote:

Hi All,

 

I have 1000 files, from 500M to 1-2GB at this moment, and I want Spark to read each of them 
as a single partition at the file level, which means turning the FileSplit off.

 

I know there are some solutions, but not very good in my case:

1, I can't use the WholeTextFiles method, because my files are too big and I don't want 
to risk the performance.

2, I tried to use newAPIHadoopFile and turn off the file split:

 

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
            LongWritable.class, Text.class, hadoopConf).values()
        .map(new Function<Text, String>() {
            @Override
            public String call(Text arg0) throws Exception {
                return arg0.toString();
            }
        });

 

This works for some cases, but it truncates some lines (I am not sure why, but it looks like 
there is a limit on this file reading). I have a feeling that Spark truncates the file at 2 GB. 
In any case it happens (the same data has no issue when I use MapReduce to do the input): Spark 
sometimes truncates a very big file if I try to read all of it.

 

3, Another way I can do it is to distribute the file names as the input to Spark and open a 
stream inside the function to read each file directly. This is what I am planning to do, but 
I think it is ugly. Does anyone have a better solution for it?

 

BTW: the files are currently in text format, but they might be in Parquet format later; that 
is also a reason I don't like my third option.

 

Regards,

 

Shuai





 

-- 



Thanks!
Tao



Re: Ranger-like Security on Spark

2015-09-03 Thread Daniel Schulz
Hi Matei,

Thanks for your answer.

My question is regarding simple authenticated Spark-on-YARN only, without 
Kerberos. So when I run Spark on YARN and HDFS, Spark will pass through my HDFS 
user and only be able to access files I am entitled to read/write? Will it 
enforce HDFS ACLs and Ranger policies as well?

Best regards, Daniel.

> On 03 Sep 2015, at 21:16, Matei Zaharia  wrote:
> 
> If you run on YARN, you can use Kerberos, be authenticated as the right user, 
> etc in the same way as MapReduce jobs.
> 
> Matei
> 
>> On Sep 3, 2015, at 1:37 PM, Daniel Schulz  
>> wrote:
>> 
>> Hi,
>> 
>> I really enjoy using Spark. An obstacle to sell it to our clients currently 
>> is the missing Kerberos-like security on a Hadoop with simple 
>> authentication. Are there plans, a proposal, or a project to deliver a 
>> Ranger plugin or something similar to Spark. The target is to differentiate 
>> users and their privileges when reading and writing data to HDFS? Is 
>> Kerberos my only option then?
>> 
>> Kind regards, Daniel.
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parquet partitioning for unique identifier

2015-09-03 Thread Kohki Nishio
let's say I have data like this:

   ID    |   Some1   |  Some2    | Some3    |
   A1    | kdsfajfsa | dsafsdafa | fdsfafa  |
   A2    | dfsfafasd | 23jfdsjkj | 980dfs   |
   A3    | 99989df   | jksdljas  | 48dsaas  |
   ..
   Z00.. | fdsafdsfa | fdsdafdas | 89sdaff  |

My understanding is that if I give the column 'ID' to use for partitioning,
it's going to generate a file per entry since it's unique, no? Using JSON,
I create 1000 files, separated as specified in the parallelize parameter. But
JSON is large and a bit slow, so I'd like to try Parquet to see what happens.
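
If the goal is mainly to bound the number of output files, one option is to derive a coarse
bucket from the unique ID and partition on that instead of on the ID itself. A minimal sketch
assuming the 1.4+ DataFrameWriter API (the "ID" column comes from the example above; the
bucket count of 256, the "bucket" column name and the output path are only illustrative):

    import org.apache.spark.sql.functions.udf

    // map the unique identifier into a bounded number of buckets
    val bucketOf = udf((id: String) => ((id.hashCode % 256) + 256) % 256)

    df.withColumn("bucket", bucketOf(df("ID")))
      .write
      .partitionBy("bucket")      // 256 directories instead of one per unique ID
      .parquet("/path/to/output")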

On Wed, Sep 2, 2015 at 11:15 PM, Adrien Mogenet <
adrien.moge...@contentsquare.com> wrote:

> Any code / Parquet schema to provide? I'm not sure to understand which
> step fails right there...
>
> On 3 September 2015 at 04:12, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> Did you specify partitioning column while saving data..
>> On Sep 3, 2015 5:41 AM, "Kohki Nishio"  wrote:
>>
>>> Hello experts,
>>>
>>> I have a huge json file (> 40G) and trying to use Parquet as a file
>>> format. Each entry has a unique identifier but other than that, it doesn't
>>> have 'well balanced value' column to partition it. Right now it just throws
>>> OOM and couldn't figure out what to do with it.
>>>
>>> It would be ideal if I could provide a partitioner based on the unique
>>> identifier value like computing its hash value or something.  One of the
>>> option would be to produce a hash value and add it as a separate column,
>>> but it doesn't sound right to me. Is there any other ways I can try ?
>>>
>>> Regards,
>>> --
>>> Kohki Nishio
>>>
>>
>
>
> --
>
> *Adrien Mogenet*
> Head of Backend/Infrastructure
> adrien.moge...@contentsquare.com
> (+33)6.59.16.64.22
> http://www.contentsquare.com
> 50, avenue Montaigne - 75008 Paris
>



-- 
Kohki Nishio


Re: spark-submit not using conf/spark-defaults.conf

2015-09-03 Thread Davies Liu
I think it's a missing feature.

On Wed, Sep 2, 2015 at 10:58 PM, Axel Dahl  wrote:
> So a bit more investigation, shows that:
>
> if I have configured spark-defaults.conf with:
>
> "spark.files  library.py"
>
> then if I call
>
> "spark-submit.py -v test.py"
>
> I see that my "spark.files" default option has been replaced with
> "spark.files  test.py",  basically spark-submit is overwriting
> spark.files with the name of the script.
>
> Is this a bug or is there another way to add default libraries without
> having to specify them on the command line?
>
> Thanks,
>
> -Axel
>
>
>
> On Wed, Sep 2, 2015 at 10:34 PM, Davies Liu  wrote:
>>
>> This should be a bug, could you create a JIRA for it?
>>
>> On Wed, Sep 2, 2015 at 4:38 PM, Axel Dahl  wrote:
>> > in my spark-defaults.conf I have:
>> > spark.files   file1.zip, file2.py
>> > spark.master   spark://master.domain.com:7077
>> >
>> > If I execute:
>> > bin/pyspark
>> >
>> > I can see it adding the files correctly.
>> >
>> > However if I execute
>> >
>> > bin/spark-submit test.py
>> >
>> > where test.py relies on the file1.zip, I get and error.
>> >
>> > If I i instead execute
>> >
>> > bin/spark-submit --py-files file1.zip test.py
>> >
>> > It works as expected.
>> >
>> > How do I get spark-submit to import the spark-defaults.conf file or what
>> > should I start checking to figure out why one works and the other
>> > doesn't?
>> >
>> > Thanks,
>> >
>> > -Axel
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Resource allocation issue - is it possible to submit a new job in existing application under a different user?

2015-09-03 Thread Steve Loughran
If it's running the Thrift server from Hive, it's got a SQL API for you to 
connect to...

On 3 Sep 2015, at 17:03, Dhaval Patel 
> wrote:

I am accessing a shared cluster mode Spark environment. However, there is an 
existing application (SparkSQL/Thrift Server), running under a different user, 
that occupies all available cores. Please see attached screenshot to get an 
idea about current resource utilization.

Is there a way I can use this application to submit my jobs (under different 
user than mapr) without restarting this app with reduced number of  cores 
(spark.deploy.defaultCores)?

What could be an optimal solution for this type of resource-sharing issue? Fair 
Scheduler or any other approach? - 
http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application



\



Resource allocation issue - is it possible to submit a new job in existing application under a different user?

2015-09-03 Thread Dhaval Patel
I am accessing a shared cluster mode Spark environment. However, there is
an existing application (SparkSQL/Thrift Server), running under a different
user, that occupies all available cores. Please see attached screenshot to
get an idea about current resource utilization.

Is there a way I can use this application to submit my jobs (under
different user than mapr) without restarting this app with reduced number
of  cores (spark.deploy.defaultCores)?

What could be an optimal solution for this type of resource-sharing issue?
Fair Scheduler or any other approach? -
http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application


[image: Inline image 1]
\[image: Inline image 3]


Re: Exceptions in threads in executor code don't get caught properly

2015-09-03 Thread Wayne Song
Sorry, I guess my code and the exception didn't make it to the mailing
list.  Here's my code:

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Test app")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(Array(1, 2, 3))
val rdd1 = rdd.map({x =>
  throw new Exception("Test exception 12345")
  x
})
rdd1.foreachPartition(part => {
  val t = new Thread(new Runnable {
override def run(): Unit = {
  for (row <- part) {
Console.println(s"Got $row")
  }
}
  })
  t.start()
  t.join()
})
  }

And here's the exception:

15/08/31 19:42:20 ERROR SparkUncaughtExceptionHandler: Uncaught exception
in thread Thread[Thread-7,5,main]
java.lang.Exception: Test exception 12345
at TestApp$anonfun$1.apply$mcII$sp(TestApp.scala:15)
at TestApp$anonfun$1.apply(TestApp.scala:14)
at TestApp$anonfun$1.apply(TestApp.scala:14)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at TestApp$anonfun$main$1$anon$1.run(TestApp.scala:21)
at java.lang.Thread.run(Thread.java:745)

Anyways, I realized that this is occurs because of the
SparkUncaughtExceptionHandler, which executors set as their threads'
default uncaught exception handler.  Any exceptions that happen in the main
thread get caught here:

https://github.com/apache/spark/blob/69c9c177160e32a2fbc9b36ecc52156077fca6fc/core/src/main/scala/org/apache/spark/executor/Executor.scala#L294

And thus get logged to task metrics.  The SparkUncaughtExceptionHandler,
however, catches stuff in subthreads, and therefore those exceptions don't
get logged in task metrics (and thus not in the web UI).

My fix was to catch exceptions within my threads, like this:

rdd1.foreachPartition(part => {
  var exception: Exception = null
  val t = new Thread(new Runnable {
    override def run(): Unit = {
      try {
        for (row <- part) {
          Console.println(s"Got $row")
        }
      } catch {
        case e: Exception => exception = e
      }
    }
  })
  t.start()
  t.join()
  // re-throw on the task's main thread so Spark records it in task metrics
  if (exception != null) {
    throw exception
  }
})

It'd be nice if the uncaught exception handler in executors logged
exceptions to task metrics, but I'm not sure how feasible that'd be.

On Thu, Sep 3, 2015 at 12:26 AM, Akhil Das 
wrote:

> [image: Inline image 1]
>
> I'm not able to find the piece of code that you wrote, but you can use a
> try...catch to catch your user specific exceptions and log it in the logs.
>
> Something like this:
>
> myRdd.map(x => try{ //something }catch{ case e:Exception =>
> log.error("Whoops!! :" + e) })
>
>
>
>
> Thanks
> Best Regards
>
> On Tue, Sep 1, 2015 at 1:22 AM, Wayne Song  wrote:
>
>> We've been running into a situation where exceptions in rdd.map() calls
>> will
>> not get recorded and shown on the web UI properly.  We've discovered that
>> this seems to occur because we're creating our own threads in
>> foreachPartition() calls.  If I have code like this:
>>
>>
>>
>> The tasks on the executors will fail because rdd1 will raise an exception
>> for each record as we iterate across the "part" iterator inside the thread
>> in the foreachPartition call.  Usually, exceptions in Spark apps show up
>> in
>> the web UI on the application detail page, making problems easy to debug.
>> However, if any exceptions get raised inside of these user threads, they
>> don't show up in the web UI (instead, it just says that the executor was
>> lost), and in the executor logs, we see errors like:
>>
>>
>>
>> What's going on here?  Why are these exceptions not caught?  And is there
>> a
>> way to have user threads register their exceptions properly?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-in-threads-in-executor-code-don-t-get-caught-properly-tp24525.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Ranger-like Security on Spark

2015-09-03 Thread Daniel Schulz
Hi,

I really enjoy using Spark. An obstacle to selling it to our clients currently is the missing 
Kerberos-like security on a Hadoop cluster with simple authentication. Are there plans, a 
proposal, or a project to deliver a Ranger plugin or something similar for Spark? The target 
is to differentiate users and their privileges when reading and writing data to HDFS. Is 
Kerberos my only option then?

Kind regards, Daniel.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



VaryMax Rotation and other questions for PCA in Spark MLLIB

2015-09-03 Thread Behzad Altaf
Hi All,

Hope you are doing good.

We are using Spark MLLIB (1.4.1) PCA functionality for
dimensionality reduction.

So far we are able to condense n features into k features using
https://spark.apache.org/docs/1.4.1/mllib-dimensionality-reduction.html#principal-component-analysis-pca

The requirements, as per our data scientist, are as follows:

a) We need to find Varimax Rotation (
https://en.wikipedia.org/wiki/Varimax_rotation) for the data - I could not
find anything on this in the documentation. Would anybody please help with
this.

b) We also need to find out which features are getting combined together
so that we can understand the feature condensation that is taking place. Is
there a way to see this? E.g. we have a CSV with the feature names in its header;
the header is dropped but preserved for later use. We move from n features
to k in PCA, i.e. from n columns to k. What are those k columns made up of? How
do we find this out?

c) Is there a way to preserve the primary key of the row that is getting
dropped in the analysis. i.e. when preparing the feature vector the PK is
dropped. (General Knowledge question :-))

Any help is appreciated.

Thanks in Advance,
~BA
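
For (b), the principal-component loadings themselves show how each original feature
contributes to each of the k derived columns. A minimal sketch (it assumes the features
are already available as an RDD[Vector] named vectors, and that k is the number of
components kept; both names are placeholders):

    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val mat = new RowMatrix(vectors)              // vectors: RDD[Vector] with the n original features
    val pc = mat.computePrincipalComponents(k)    // local n x k matrix of loadings
    // entry (i, j) is the weight of original feature i in principal component j, so pairing
    // row index i with the saved CSV header tells you what each of the k columns is made of
    println(pc)

As far as I can tell, (a) is not covered by the MLlib API, so a Varimax rotation would have
to be applied to this local loadings matrix outside MLlib.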


RE: Unbale to run Group BY on Large File

2015-09-03 Thread SAHA, DEBOBROTA
Hi Silvio,

I am trying to group the data from an Oracle RAW table by loading the raw table 
into an RDD first and then registering that as a table in Spark.

Thanks,
Debobrota

From: Silvio Fiorito [mailto:silvio.fior...@granturing.com]
Sent: Wednesday, September 02, 2015 5:03 PM
To: SAHA, DEBOBROTA; 'user@spark.apache.org'
Subject: Re: Unbale to run Group BY on Large File

Unfortunately, groupBy is not the most efficient operation. What is it you’re 
trying to do? It may be possible with one of the other *byKey transformations.

From: "SAHA, DEBOBROTA"
Date: Wednesday, September 2, 2015 at 7:46 PM
To: "'user@spark.apache.org'"
Subject: Unbale to run Group BY on Large File

Hi ,

I am getting below error while I am trying to select data using SPARK SQL from 
a RDD table.

java.lang.OutOfMemoryError: GC overhead limit exceeded
"Spark Context Cleaner" java.lang.InterruptedException


The file or table size is around 113 GB and I am running SPARK 1.4 on a 
standalone cluster. Tried to extend the heap size but extending to 64GB also 
didn’t help.

I would really appreciate any help on this.

Thanks,
Debobrota
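
To make the *byKey suggestion concrete, a minimal sketch (rawRdd, extractKey and extractValue
are placeholders, since the actual row layout is not shown in the thread); reduceByKey and
aggregateByKey combine values on the map side before the shuffle, so they avoid materializing
whole groups the way GROUP BY / groupByKey does:

    // hypothetical: pull a grouping key and a numeric value out of each raw row
    val pairs = rawRdd.map(row => (extractKey(row), extractValue(row)))

    // per-key sum without building full groups in memory
    val sums = pairs.reduceByKey(_ + _)

    // or keep several aggregates at once, e.g. sum and count
    val stats = pairs.aggregateByKey((0L, 0L))(
      (acc, v) => (acc._1 + v, acc._2 + 1L),       // fold a value into the per-partition accumulator
      (a, b)   => (a._1 + b._1, a._2 + b._2))      // merge accumulators across partitions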


Re: pySpark window functions are not working in the same way as Spark/Scala ones

2015-09-03 Thread Davies Liu
This is a known bug in 1.4.1, fixed in 1.4.2 and 1.5 (both are not
released yet).

On Thu, Sep 3, 2015 at 7:41 AM, Sergey Shcherbakov
 wrote:
> Hello all,
>
> I'm experimenting with Spark 1.4.1 window functions
> and have come to a problem in pySpark that I've described in a Stackoverflow
> question
>
> In essence, the
>
> wSpec = Window.orderBy(df.a)
> df.select(df.a, func.rank().over(wSpec).alias("rank")).collect()
> df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
> func.lead(df.b,1).over(wSpec).alias("next")).collect()
>
> does not work in pySpark: exception for the first collect() and None output
> from window function in the second collect().
>
> While the same example in Spark/Scala works fine:
>
> val wSpec = Window.orderBy("a")
> df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
> df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"),
> lead(df("b"),1).over(wSpec).alias("next"))
>
> Am I doing anything wrong or this is a pySpark issue indeed?
>
>
> Best Regards,
> Sergey
>
> PS: Here is the full pySpark shell example:
>
> from pyspark.sql.window import Window
> import pyspark.sql.functions as func
>
> l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
> df = sqlContext.createDataFrame(l,["a","b"])
> wSpec = Window.orderBy(df.a).rowsBetween(-1,1)
> df.select(df.a, func.rank().over(wSpec).alias("rank"))
> # ==> Failure org.apache.spark.sql.AnalysisException: Window function rank
> does not take a frame specification.
> df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
> func.lead(df.b,1).over(wSpec).alias("next"))
> # ===>  org.apache.spark.sql.AnalysisException: Window function lag does not
> take a frame specification.;
>
>
> wSpec = Window.orderBy(df.a)
> df.select(df.a, func.rank().over(wSpec).alias("rank"))
> # ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more
> arguments are expected.
>
> df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
> func.lead(df.b,1).over(wSpec).alias("next")).collect()
> # [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202,
> next=None), Row(a=3, prev=None, b=303, next=None)]
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Small File to HDFS

2015-09-03 Thread Jörn Franke
HAR is transparent and has hardly any performance overhead. You may decide not
to compress, or use a fast compression algorithm such as Snappy
(recommended)
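
For reference, a minimal sketch of building an archive and reading it back (all paths and
names are illustrative; the archive is built once with the standard hadoop archive tool,
and Spark then addresses it through the har:// scheme):

    // built beforehand on the command line, e.g.:
    //   hadoop archive -archiveName events.har -p /data/events/2015-09-03 /data/archives
    // then read the archived part files back through the har:// filesystem
    val events = sc.textFile("har:///data/archives/events.har/*")
    println(events.count())

Pig should be able to LOAD the same har:// paths, since they resolve through the normal
Hadoop FileSystem layer, though I have not measured the performance.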

Le jeu. 3 sept. 2015 à 16:17,   a écrit :

> My main question in case of HAR usage is , is it possible to use Pig on it
> and what about performances ?
>
> - Mail original -
> De: "Jörn Franke" 
> À: nib...@free.fr, user@spark.apache.org
> Envoyé: Jeudi 3 Septembre 2015 15:54:42
> Objet: Re: Small File to HDFS
>
>
>
>
> Store them as hadoop archive (har)
>
>
> Le mer. 2 sept. 2015 à 18:07, < nib...@free.fr > a écrit :
>
>
> Hello,
> I'am currently using Spark Streaming to collect small messages (events) ,
> size being <50 KB , volume is high (several millions per day) and I have to
> store those messages in HDFS.
> I understood that storing small files can be problematic in HDFS , how can
> I manage it ?
>
> Tks
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: large number of import-related function calls in PySpark profile

2015-09-03 Thread Davies Liu
The slowness in PySpark may be related to the search path added by PySpark;
could you show the sys.path?

On Thu, Sep 3, 2015 at 1:38 PM, Priedhorsky, Reid  wrote:
>
> On Sep 3, 2015, at 12:39 PM, Davies Liu  wrote:
>
> I think this is not a problem of PySpark, you also saw this if you
> profile this script:
>
> ```
> list(map(map_, range(sc.defaultParallelism)))
> ```
>
> 81777/808740.0860.0000.3600.000  importlib._bootstrap>:2264(_handle_fromlist)
>
>
> Thanks. Yes, I think you’re right; they seem to be coming from Pandas. Plain
> NumPy calculations do not generate the numerous import-related calls.
>
> That said, I’m still not sure why the time consumed in my real program is so
> much more (~20% rather than ~1%). I will see if I can figure out a better
> test program, or maybe try a different approach.
>
> Reid

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Resource allocation issue - is it possible to submit a new job in existing application under a different user?

2015-09-03 Thread Dhaval Patel
Yes, you're right, and I can connect to it through Tableau and similar tools, but I don't
know how I can connect from a shell where I can submit more jobs to this
application.

Any insight on how can I connect using shell?
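
The Thrift server speaks the HiveServer2 protocol, so from a shell the usual route is
beeline or any JDBC client rather than spark-shell. A minimal sketch in Scala (host, port,
credentials and query are illustrative; it assumes the Hive JDBC driver is on the classpath):

    import java.sql.DriverManager

    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://thrift-host:10000/default", "myuser", "")
    val rs = conn.createStatement().executeQuery("SELECT count(*) FROM some_table")
    while (rs.next()) println(rs.getLong(1))
    conn.close()

Note that this only runs SQL inside the existing Thrift server application; it does not let
you submit arbitrary new Spark jobs under it.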

On Thu, Sep 3, 2015 at 1:39 PM, Steve Loughran 
wrote:

> If its running the thrift server from hive, it's got a SQL API for you to
> connect to...
>
> On 3 Sep 2015, at 17:03, Dhaval Patel  wrote:
>
> I am accessing a shared cluster mode Spark environment. However, there is
> an existing application (SparkSQL/Thrift Server), running under a different
> user, that occupies all available cores. Please see attached screenshot to
> get an idea about current resource utilization.
>
> Is there a way I can use this application to submit my jobs (under
> different user than mapr) without restarting this app with reduced number
> of  cores (spark.deploy.defaultCores)?
>
> What could be optimal solution for this type of resource sharing issues?
> Fari Scheduler or any other approach? -
> http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
>
>
> 
> \
>
>
>


Re: Hbase Lookup

2015-09-03 Thread ayan guha
Hi

Thanks for your comments. My driving point is that instead of loading the HBase data
entirely, I want to do a record-by-record lookup, and that is best done in a UDF or map
function. I also would have loved to do it in Spark, but there is no production cluster
here yet :(

@Franke: I do not have enough competency on coprocessors, so I am not able to visualize
the solution you are suggesting; it would be really helpful if you could shed some more
light on it.

Best
Ayan
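
In case the Spark route becomes possible later, a minimal sketch of the record-by-record
lookup with one HBase connection per partition (it assumes the HBase 1.x client API;
inputRdd, rec.key and the table name are placeholders):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
    import org.apache.hadoop.hbase.util.Bytes

    val checked = inputRdd.mapPartitions { records =>
      val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
      val table = conn.getTable(TableName.valueOf("my_table"))
      val out = records.map { rec =>
        val result = table.get(new Get(Bytes.toBytes(rec.key)))   // one lookup per incoming record
        (rec, result.isEmpty)                                      // true => no existing row, so insert
      }.toList                                                     // materialize before closing the connection
      table.close()
      conn.close()
      out.iterator
    }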

On Fri, Sep 4, 2015 at 1:44 AM, Tao Lu  wrote:

> But I don't see how it works here with phoenix or hbase coprocessor.
> Remember we are joining 2 big data sets here, one is the big file in HDFS,
> and records in HBASE. The driving force comes from Hadoop cluster.
>
>
>
>
> On Thu, Sep 3, 2015 at 11:37 AM, Jörn Franke  wrote:
>
>> If you use pig or spark you increase the complexity from an operations
>> management perspective significantly. Spark should be seen from a platform
>> perspective if it make sense. If you can do it directly with hbase/phoenix
>> or only hbase coprocessor then this should be preferred. Otherwise you pay
>> more money for maintenance and development.
>>
>> Le jeu. 3 sept. 2015 à 17:16, Tao Lu  a écrit :
>>
>>> Yes. Ayan, you approach will work.
>>>
>>> Or alternatively, use Spark, and write a Scala/Java function which
>>> implements similar logic in your Pig UDF.
>>>
>>> Both approaches look similar.
>>>
>>> Personally, I would go with Spark solution, it will be slightly faster,
>>> and easier if you already have Spark cluster setup on top of your hadoop
>>> cluster in your infrastructure.
>>>
>>> Cheers,
>>> Tao
>>>
>>>
>>> On Thu, Sep 3, 2015 at 1:15 AM, ayan guha  wrote:
>>>
 Thanks for your info. I am planning to implement a pig udf to do record
 look ups. Kindly let me know if this is a good idea.

 Best
 Ayan

 On Thu, Sep 3, 2015 at 2:55 PM, Jörn Franke 
 wrote:

>
> You may check if it makes sense to write a coprocessor doing an upsert
> for you, if it does not exist already. Maybe phoenix for Hbase supports
> this already.
>
> Another alternative, if the records do not have an unique Id, is to
> put them into a text index engine, such as Solr or Elasticsearch, which
> does in this case a fast matching with relevancy scores.
>
>
> You can use also Spark and Pig there. However, I am not sure if Spark
> is suitable for these one row lookups. Same holds for Pig.
>
>
> Le mer. 2 sept. 2015 à 23:53, ayan guha  a
> écrit :
>
> Hello group
>
> I am trying to use pig or spark in order to achieve following:
>
> 1. Write a batch process which will read from a file
> 2. Lookup hbase to see if the record exists. If so then need to
> compare incoming values with hbase and update fields which do not match.
> Else create a new record.
>
> My questions:
> 1. Is this a good use case for pig or spark?
> 2. Is there any way to read hbase for each incoming record in pig
> without writing map reduce code?
> 3. In case of spark I think we have to connect to hbase for every
> record. Is thr any other way?
> 4. What is the best connector for hbase which gives this functionality?
>
> Best
>
> Ayan
>
>
>


 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>>
>>> --
>>> 
>>> Thanks!
>>> Tao
>>>
>>
>
>
> --
> 
> Thanks!
> Tao
>



-- 
Best Regards,
Ayan Guha


Re: Small File to HDFS

2015-09-03 Thread Jörn Franke
Well, it is the same as in normal HDFS: deleting the file and putting a new one with
the same name works.

Le jeu. 3 sept. 2015 à 21:18,   a écrit :

> HAR archive seems a good idea , but just a last question to be sure to do
> the best choice :
> - Is it possible to override (remove/replace) a file inside the HAR ?
> Basically the name of my small files will be the keys of my records , and
> sometimes I will need to replace the content of a file by a new content
> (remove/replace)
>
>
> Tks a lot
> Nicolas
>
> - Mail original -
> De: "Jörn Franke" 
> À: nib...@free.fr
> Cc: user@spark.apache.org
> Envoyé: Jeudi 3 Septembre 2015 19:29:42
> Objet: Re: Small File to HDFS
>
>
>
> Har is transparent and hardly any performance overhead. You may decide not
> to compress or use a fast compression algorithm, such as snappy
> (recommended)
>
>
>
> Le jeu. 3 sept. 2015 à 16:17, < nib...@free.fr > a écrit :
>
>
> My main question in case of HAR usage is , is it possible to use Pig on it
> and what about performances ?
>
> - Mail original -
> De: "Jörn Franke" < jornfra...@gmail.com >
> À: nib...@free.fr , user@spark.apache.org
> Envoyé: Jeudi 3 Septembre 2015 15:54:42
> Objet: Re: Small File to HDFS
>
>
>
>
> Store them as hadoop archive (har)
>
>
> Le mer. 2 sept. 2015 à 18:07, < nib...@free.fr > a écrit :
>
>
> Hello,
> I'am currently using Spark Streaming to collect small messages (events) ,
> size being <50 KB , volume is high (several millions per day) and I have to
> store those messages in HDFS.
> I understood that storing small files can be problematic in HDFS , how can
> I manage it ?
>
> Tks
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: large number of import-related function calls in PySpark profile

2015-09-03 Thread Priedhorsky, Reid

On Sep 3, 2015, at 12:39 PM, Davies Liu 
> wrote:

I think this is not a problem of PySpark, you also saw this if you
profile this script:

```
list(map(map_, range(sc.defaultParallelism)))
```

81777/80874    0.086    0.000    0.360    0.000 <frozen importlib._bootstrap>:2264(_handle_fromlist)

Thanks. Yes, I think you’re right; they seem to be coming from Pandas. Plain 
NumPy calculations do not generate the numerous import-related calls.

That said, I’m still not sure why the time consumed in my real program is so 
much more (~20% rather than ~1%). I will see if I can figure out a better test 
program, or maybe try a different approach.

Reid


spark-shell does not see conf folder content on emr-4

2015-09-03 Thread Alexander Pivovarov
Hi Everyone

My question is specific to running spark-1.4.1 on emr-4.0.0

spark installed to /usr/lib/spark
conf folder linked to /etc/spark/conf
spark-shell location /usr/bin/spark-shell

I noticed that if I run spark-shell it does not read /etc/spark/conf folder
files (e.g. spark-env.sh and log4j configuration)

To solve the problem I have to add /etc/spark/conf to SPARK_CLASSPATH
export SPARK_CLASSPATH=/etc/spark/conf

How do I configure Spark on emr-4 to avoid the manual step of adding /etc/spark/conf
to SPARK_CLASSPATH?

Alex


Re: Parquet partitioning for unique identifier

2015-09-03 Thread Adrien Mogenet
Any code / Parquet schema to provide? I'm not sure to understand which step
fails right there...

On 3 September 2015 at 04:12, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> Did you specify partitioning column while saving data..
> On Sep 3, 2015 5:41 AM, "Kohki Nishio"  wrote:
>
>> Hello experts,
>>
>> I have a huge json file (> 40G) and trying to use Parquet as a file
>> format. Each entry has a unique identifier but other than that, it doesn't
>> have 'well balanced value' column to partition it. Right now it just throws
>> OOM and couldn't figure out what to do with it.
>>
>> It would be ideal if I could provide a partitioner based on the unique
>> identifier value like computing its hash value or something.  One of the
>> option would be to produce a hash value and add it as a separate column,
>> but it doesn't sound right to me. Is there any other ways I can try ?
>>
>> Regards,
>> --
>> Kohki Nishio
>>
>


-- 

*Adrien Mogenet*
Head of Backend/Infrastructure
adrien.moge...@contentsquare.com
(+33)6.59.16.64.22
http://www.contentsquare.com
50, avenue Montaigne - 75008 Paris


RE: FlatMap Explanation

2015-09-03 Thread Zalzberg, Idan (Agoda)
Hi,
Yes, I can explain

1 to 3 -> 1,2,3
2 to 3- > 2,3
3 to 3 -> 3
3 to 3 -> 3

flatMap then concatenates the results, so you get

1,2,3, 2,3, 3,3

You should get the same with any scala collection

Cheers
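
The same thing, runnable in spark-shell for reference (x.to(3) is Scala's inclusive range
from x up to 3):

    val rdd = sc.parallelize(Seq(1, 2, 3, 3))
    rdd.flatMap(x => x.to(3)).collect()
    // Array(1, 2, 3, 2, 3, 3, 3)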

From: Ashish Soni [mailto:asoni.le...@gmail.com]
Sent: Thursday, September 03, 2015 9:06 AM
To: user 
Subject: FlatMap Explanation

Hi ,
Can some one please explain the output of the flat map
data in RDD as below
{1, 2, 3, 3}

rdd.flatMap(x => x.to(3))

output as below

{1, 2, 3, 2, 3, 3, 3}
i am not able to understand how the output came as above.
Thanks,




Re: Slow Mongo Read from Spark

2015-09-03 Thread Akhil Das
On SSD you will get around 30-40MB/s on a single machine (on 4 cores).

Thanks
Best Regards

On Mon, Aug 31, 2015 at 3:13 PM, Deepesh Maheshwari <
deepesh.maheshwar...@gmail.com> wrote:

> tried it,,gives the same above exception
>
> Exception in thread "main" java.io.IOException: No FileSystem for scheme:
> mongodb
>
> In you case, do you have used above code.
> What read throughput , you get?
>
> On Mon, Aug 31, 2015 at 2:04 PM, Akhil Das 
> wrote:
>
>> FYI, newAPIHadoopFile and newAPIHadoopRDD uses the NewHadoopRDD class
>> itself underneath and it doesnt mean it will only read from HDFS. Give it a
>> shot if you haven't tried it already (it just the inputformat and the
>> reader which are different from your approach).
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Aug 31, 2015 at 1:14 PM, Deepesh Maheshwari <
>> deepesh.maheshwar...@gmail.com> wrote:
>>
>>> Hi Akhil,
>>>
>>> This code snippet is from below link
>>>
>>> https://github.com/crcsmnky/mongodb-spark-demo/blob/master/src/main/java/com/mongodb/spark/demo/Recommender.java
>>>
>>> Here it reading data from HDFS file system but in our case i need to
>>> read from mongodb.
>>>
>>> I have tried it earlier and now again tried it but is giving below error
>>> which is self explanantory.
>>>
>>> Exception in thread "main" java.io.IOException: No FileSystem for
>>> scheme: mongodb
>>>
>>> On Mon, Aug 31, 2015 at 1:03 PM, Akhil Das 
>>> wrote:
>>>
 Here's a piece of code which works well for us (spark 1.4.1)

 Configuration bsonDataConfig = new Configuration();
 bsonDataConfig.set("mongo.job.input.format",
 "com.mongodb.hadoop.BSONFileInputFormat");

 Configuration predictionsConfig = new Configuration();
 predictionsConfig.set("mongo.output.uri", mongodbUri);

 JavaPairRDD<Object, BSONObject> bsonRatingsData =
 sc.newAPIHadoopFile(
 ratingsUri, BSONFileInputFormat.class, Object.class,
 BSONObject.class, bsonDataConfig);


 Thanks
 Best Regards

 On Mon, Aug 31, 2015 at 12:59 PM, Deepesh Maheshwari <
 deepesh.maheshwar...@gmail.com> wrote:

> Hi, I am using 1.3.0
>
> I am not getting constructor for above values
>
> [image: Inline image 1]
>
> So, i tried to shuffle the values in constructor .
> [image: Inline image 2]
>
> But, it is giving this error.Please suggest
> [image: Inline image 3]
>
> Best Regards
>
> On Mon, Aug 31, 2015 at 12:43 PM, Akhil Das <
> ak...@sigmoidanalytics.com> wrote:
>
>> Can you try with these key value classes and see the performance?
>>
>> inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
>>
>>
>> keyClassName = "org.apache.hadoop.io.Text"
>> valueClassName = "org.apache.hadoop.io.MapWritable"
>>
>>
>> Taken from databricks blog
>> 
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Aug 31, 2015 at 12:26 PM, Deepesh Maheshwari <
>> deepesh.maheshwar...@gmail.com> wrote:
>>
>>> Hi, I am trying to read mongodb in Spark newAPIHadoopRDD.
>>>
>>> / Code */
>>>
>>> config.set("mongo.job.input.format",
>>> "com.mongodb.hadoop.MongoInputFormat");
>>> config.set("mongo.input.uri",SparkProperties.MONGO_OUTPUT_URI);
>>> config.set("mongo.input.query","{host: 'abc.com'}");
>>>
>>> JavaSparkContext sc=new JavaSparkContext("local", "MongoOps");
>>>
>>> JavaPairRDD<Object, BSONObject> mongoRDD =
>>> sc.newAPIHadoopRDD(config,
>>> com.mongodb.hadoop.MongoInputFormat.class,
>>> Object.class,
>>> BSONObject.class);
>>>
>>> long count=mongoRDD.count();
>>>
>>> There are about 1.5million record.
>>> Though i am getting data but read operation took around 15min to
>>> read whole.
>>>
>>> Is this Api really too slow or am i missing something.
>>> Please suggest if there is an alternate approach to read data from
>>> Mongo faster.
>>>
>>> Thanks,
>>> Deepesh
>>>
>>
>>
>

>>>
>>
>


Re: Managing httpcomponent dependency in Spark/Solr

2015-09-03 Thread Igor Berman
not sure if it will help, but have you checked
https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html

On 31 August 2015 at 19:33, Oliver Schrenk  wrote:

> Hi,
>
> We are running a distibuted indexing service for Solr (4.7) on a Spark
> (1.2) cluster. Now we wanted to upgrade to Solr 5.3 and are running into
> problems with dependencies.
>
> Solr 5 brings in org.apache.httpcomponents httpclient 4.4.1 (1) and the
> prebuilt binary for Spark 1.2.2 for CDH 4 brings in httpclient 4.2.5 which
> crashes indexing Solr via SolrJ.
>
>
>
> Is there a way of not bringing this dependency in? Is there a way of
> having a different classloader for my client application? I saw that there
> is `spark.driver.userClassPathFirst`, is it something that would help?
>
>
>
> Cheers,
> Olive
>
>
>
> (1)
> https://github.com/apache/lucene-solr/blob/trunk/lucene/ivy-versions.properties#L163
> (2) cd $HOME/Downloads
> wget http://apache.xl-mirror.nl/spark/spark-1.2.2/spark-1.2.2-bin-cdh4.tgz
> tar xzvf spark-1.2.2-bin-cdh4.tgz
> unzip -p
> spark-1.2.2-bin-cdh4/lib/spark-assembly-1.2.2-hadoop2.0.0-mr1-cdh4.2.0.jar
> org/apache/http/version.properties | grep release
> info.release   = 4.2.5
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Connection closed error while running Terasort

2015-09-03 Thread Akhil Das
Can you look a bit deeper in the executor logs? It may be that it hit
GC overhead etc., which led to the connection failures.

Thanks
Best Regards

On Tue, Sep 1, 2015 at 5:43 AM, Suman Somasundar <
suman.somasun...@oracle.com> wrote:

> Hi,
>
>
>
> I am getting the following error while trying to run a 10GB terasort under
> Yarn with 8 nodes.
>
> The command is:
>
> spark-submit --class com.github.ehiggs.spark.terasort.TeraSort --master
> yarn-cluster --num-executors 10 --executor-memory 32g
> spark-terasort-master/target/spark-terasort-1.0-SNAPSHOT-jar-with-dependencies.jar
> hdfs://hadoop-solaris-a:8020/user/hadoop/terasort/input-10
> hdfs://hadoop-solaris-a:8020/user/hadoop/terasort/output-10
>
>
>
> What might be causing this error?
>
>
>
> 15/08/31 17:09:48 ERROR server.TransportRequestHandler: Error sending
> result
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1867783019052,
> chunkIndex=0},
> buffer=FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1441064487503_0001/blockmgr-c3c8dbb3-9ae2-4e45-b537-fd0beeff98b5/3e/shuffle_1_9_0.data,
> offset=0, length=1059423784}} to /199.199.35.5:52486; closing connection
>
> java.io.IOException: Broken pipe
>
> at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>
> at
> sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:443)
>
> at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:575)
>
> at
> org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
>
> at
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
>
> at
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
>
> at
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
>
> at
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
>
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
>
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:321)
>
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:519)
>
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 15/08/31 17:10:48 ERROR server.TransportChannelHandler: Connection to
> hadoop-solaris-c/199.199.35.4:48540 has been quiet for 12 ms while
> there are outstanding requests. Assuming connection is dead; please adjust
> spark.network.timeout if this is wrong.
>
> 15/08/31 17:10:48 ERROR client.TransportResponseHandler: Still have 1
> requests outstanding when connection from hadoop-solaris-c/
> 199.199.35.4:48540 is closed
>
> 15/08/31 17:10:48 INFO shuffle.RetryingBlockFetcher: Retrying fetch (3/3)
> for 1 outstanding blocks after 5000 ms
>
> 15/08/31 17:10:49 ERROR server.TransportRequestHandler: Error sending
> result
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1867783019053,
> chunkIndex=0},
> buffer=FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1441064487503_0001/blockmgr-c3c8dbb3-9ae2-4e45-b537-fd0beeff98b5/1b/shuffle_1_6_0.data,
> offset=0, length=1052128440}} to /199.199.35.6:45201; closing connection
>
> java.nio.channels.ClosedChannelException
>
> 15/08/31 17:10:53 INFO client.TransportClientFactory: Found inactive
> connection to hadoop-solaris-c/199.199.35.4:48540, creating a new one.
>
> 15/08/31 17:11:31 ERROR server.TransportRequestHandler: Error sending
> result
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1867783019054,
> chunkIndex=0},
> buffer=FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1441064487503_0001/blockmgr-c3c8dbb3-9ae2-4e45-b537-fd0beeff98b5/1b/shuffle_1_6_0.data,
> offset=0, length=1052128440}} to /199.199.35.10:55082; closing connection
>
> java.nio.channels.ClosedChannelException
>
> 15/08/31 17:11:31 ERROR server.TransportRequestHandler: Error sending
> result
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1867783019055,
> chunkIndex=0},
> buffer=FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1441064487503_0001/blockmgr-c3c8dbb3-9ae2-4e45-b537-fd0beeff98b5/3e/shuffle_1_9_0.data,
> offset=0, length=1059423784}} to /199.199.35.7:54328; closing connection
>
> java.nio.channels.ClosedChannelException
>
> 15/08/31 17:11:53 ERROR server.TransportRequestHandler: Error sending
> result
> 

Re: Error using spark.driver.userClassPathFirst=true

2015-09-03 Thread Akhil Das
It's messing up your classpath; there was a discussion that happened here
previously:
https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/spark-on-yarn-java-lang-UnsatisfiedLinkError-NativeCodeLoader/td-p/22724

Thanks
Best Regards

On Tue, Sep 1, 2015 at 4:58 PM, cgalan  wrote:

> Hi,
>
> When I am submitting a spark job in the mode "yarn-cluster" with the
> parameter "spark.driver.userClassPathFirst", my job fails; but if I don't
> use this params, my job is concluded with success.. My environment is some
> nodes with CDH5.4 and Spark 1.3.0.
>
> Spark submit with fail:
> spark-submit --class Main --master yarn-cluster --conf
> spark.driver.userClassPathFirst=true --conf
> spark.executor.userClassPathFirst=true Main.jar
>
> Spark-submit with success:
> spark-submit --class Main --master yarn-cluster Main.jar
>
> Error with Snappy:
> java.lang.UnsatisfiedLinkError:
> org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
> at org.xerial.snappy.SnappyNative.maxCompressedLength(Native
> Method)
> at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
> at
> org.xerial.snappy.SnappyOutputStream.(SnappyOutputStream.java:79)
> at
>
> org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:157)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
> at scala.Option.map(Option.scala:145)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:199)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:84)
> at
>
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> at
>
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
> at
>
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
> at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:761)
> at org.apache.spark.SparkContext.textFile(SparkContext.scala:589)
>
> My example error code:
> public static void main(String[] args) {
>
> SparkConf conf = new SparkConf().setAppName("Prueba");
> //  conf.setMaster("local"); //Comentar para lanzar en el
> cluster
>
> JavaSparkContext sc = new JavaSparkContext(conf);
> JavaRDD<String> asd = sc.textFile("text.txt");
> asd.count();
>
> sc.close();
> }
>
> Does anyone have any suggestions? The problem of why I need to use
> "spark.driver.userClassPathFirst=true" is because I use commons-cli-1.3.1
> in
> my project, and the spark classpath have one previous version, so  I create
> a shade-jar, but without "spark.driver.userClassPathFirst=true" I have
> problems of conflicts with the dependencies between my spark.jar and spark
> classpath that one class uses the previous class version of classpath
> instead of the last version.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-using-spark-driver-userClassPathFirst-true-tp24536.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Managing httpcomponent dependency in Spark/Solr

2015-09-03 Thread Akhil Das
You can also set *spark.executor.userClassPathFirst*. There are a couple of
classpath configurations available to override the defaults; you can find them
here:
http://spark.apache.org/docs/latest/configuration.html#runtime-environment

Thanks
Best Regards

On Mon, Aug 31, 2015 at 10:03 PM, Oliver Schrenk 
wrote:

> Hi,
>
> We are running a distibuted indexing service for Solr (4.7) on a Spark
> (1.2) cluster. Now we wanted to upgrade to Solr 5.3 and are running into
> problems with dependencies.
>
> Solr 5 brings in org.apache.httpcomponents httpclient 4.4.1 (1) and the
> prebuilt binary for Spark 1.2.2 for CDH 4 brings in httpclient 4.2.5 which
> crashes indexing Solr via SolrJ.
>
>
>
> Is there a way of not bringing this dependency in? Is there a way of
> having a different classloader for my client application? I saw that there
> is `spark.driver.userClassPathFirst`, is it something that would help?
>
>
>
> Cheers,
> Olive
>
>
>
> (1)
> https://github.com/apache/lucene-solr/blob/trunk/lucene/ivy-versions.properties#L163
> (2) cd $HOME/Downloads
> wget http://apache.xl-mirror.nl/spark/spark-1.2.2/spark-1.2.2-bin-cdh4.tgz
> tar xzvf spark-1.2.2-bin-cdh4.tgz
> unzip -p
> spark-1.2.2-bin-cdh4/lib/spark-assembly-1.2.2-hadoop2.0.0-mr1-cdh4.2.0.jar
> org/apache/http/version.properties | grep release
> info.release   = 4.2.5
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Exceptions in threads in executor code don't get caught properly

2015-09-03 Thread Akhil Das
[image: Inline image 1]

I'm not able to find the piece of code that you wrote, but you can use a
try...catch to catch your user specific exceptions and log it in the logs.

Something like this:

myRdd.map(x => try{ //something }catch{ case e:Exception =>
log.error("Whoops!! :" + e) })




Thanks
Best Regards

On Tue, Sep 1, 2015 at 1:22 AM, Wayne Song  wrote:

> We've been running into a situation where exceptions in rdd.map() calls
> will
> not get recorded and shown on the web UI properly.  We've discovered that
> this seems to occur because we're creating our own threads in
> foreachPartition() calls.  If I have code like this:
>
>
>
> The tasks on the executors will fail because rdd1 will raise an exception
> for each record as we iterate across the "part" iterator inside the thread
> in the foreachPartition call.  Usually, exceptions in Spark apps show up in
> the web UI on the application detail page, making problems easy to debug.
> However, if any exceptions get raised inside of these user threads, they
> don't show up in the web UI (instead, it just says that the executor was
> lost), and in the executor logs, we see errors like:
>
>
>
> What's going on here?  Why are these exceptions not caught?  And is there a
> way to have user threads register their exceptions properly?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-in-threads-in-executor-code-don-t-get-caught-properly-tp24525.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Small File to HDFS

2015-09-03 Thread nibiau
OK, but then some questions:
- Sometimes I have to remove some messages from HDFS (cancel/replace cases); is that possible?
- In the case of a big zip file, is it possible to easily run Pig on it directly?

Tks
Nicolas

- Mail original -
De: "Tao Lu" 
À: nib...@free.fr
Cc: "Ted Yu" , "user" 
Envoyé: Mercredi 2 Septembre 2015 19:09:23
Objet: Re: Small File to HDFS


You may consider storing it in one big HDFS file, and to keep appending new 
messages to it. 


For instance, 
one message -> zip it -> append it to the HDFS as one line 


On Wed, Sep 2, 2015 at 12:43 PM, < nib...@free.fr > wrote: 


Hi, 
I already store them in MongoDB in parralel for operational access and don't 
want to add an other database in the loop 
Is it the only solution ? 

Tks 
Nicolas 

- Mail original - 
De: "Ted Yu" < yuzhih...@gmail.com > 
À: nib...@free.fr 
Cc: "user" < user@spark.apache.org > 
Envoyé: Mercredi 2 Septembre 2015 18:34:17 
Objet: Re: Small File to HDFS 




Instead of storing those messages in HDFS, have you considered storing them in 
key-value store (e.g. hbase) ? 


Cheers 


On Wed, Sep 2, 2015 at 9:07 AM, < nib...@free.fr > wrote: 


Hello, 
I'am currently using Spark Streaming to collect small messages (events) , size 
being <50 KB , volume is high (several millions per day) and I have to store 
those messages in HDFS. 
I understood that storing small files can be problematic in HDFS , how can I 
manage it ? 

Tks 
Nicolas 

- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 



- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 





-- 


 Thanks! 
Tao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Code generation for GPU

2015-09-03 Thread kiran lonikar
Hi,

I am speaking at the Spark Europe Summit on exploiting GPUs for columnar
DataFrame operations.
I was going through various blogs, talks and JIRAs given by all the key
spark folks and trying to figure out where to make changes for this
proposal.

First of all, I must thank the recent progress in project Tungsten that has
made my job easier. The changes for code generation make it possible for me to
generate OpenCL code for expressions instead of the existing Java/Scala code,
and to run the OpenCL code on GPUs through the Java library JavaCL.

However, before starting the work, I have a few questions/doubts as below:


   1. I found where the code generation happens in the Spark code from these blogs:
   https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html,
   https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html and
   https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html.
   However, I could not find where the generated code is executed. A major
   part of my changes will be there, since this executor will now have to send
   vectors of columns to GPU RAM, invoke execution, and get the results back
   to CPU RAM. Thus, the existing executor will change significantly.
   2. On the project tungsten blog, in the third section (Code Generation), it is
   mentioned that you plan to increase the level of code generation from
   record-at-a-time expression evaluation to vectorized expression evaluation.
   Has this been implemented? If not, how do I implement this? I will need access
   to the columnar ByteBuffer objects in the DataFrame to do this; having
   row-by-row access to the data would defeat the purpose of this exercise.
   In particular, I need access to
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
   in the executor of the generated code.
   3. One thing that confuses me is the change from 1.4 to 1.5, possibly
   due to JIRA https://issues.apache.org/jira/browse/SPARK-7956 and pull
   request https://github.com/apache/spark/pull/6479/files. This changed
   the code generation from quasiquotes (q) to the string s operator, which makes
   it simpler for me to generate OpenCL code, which is string based. The
   question is: is this branch stable now? Should I make my changes on Spark 1.4,
   Spark 1.5, or the master branch?
   4. How do I tune the batch size (number of rows in the ByteBuffer)? Is
   it through the property spark.sql.inMemoryColumnarStorage.batchSize?
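For point 4, a minimal sketch of setting that property follows; whether it is the right knob for the GPU path is exactly the open question, and the value is only an illustration:

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
// or at submit time: --conf spark.sql.inMemoryColumnarStorage.batchSize=10000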


Thanks in anticipation,

Kiran
PS:

Other things I found useful were:

Spark DataFrames: https://www.brighttalk.com/webcast/12891/166495
Apache Spark 1.5: https://www.brighttalk.com/webcast/12891/168177

The links to JavaCL/ScalaCL:

Library to execute OpenCL code through Java:
https://github.com/nativelibs4java/JavaCL
Library to convert Scala code to OpenCL and execute on GPUs:
https://github.com/nativelibs4java/ScalaCL


Re: FlatMap Explanation

2015-09-03 Thread Ashish Soni
Thanks a lot everyone.
Very Helpful.

Ashish

On Thu, Sep 3, 2015 at 2:19 AM, Zalzberg, Idan (Agoda) <
idan.zalzb...@agoda.com> wrote:

> Hi,
>
> Yes, I can explain
>
>
>
> 1 to 3 -> 1,2,3
>
> 2 to 3- > 2,3
>
> 3 to 3 -> 3
>
> 3 to 3 -> 3
>
>
>
> Flat map that concatenates the results, so you get
>
>
>
> 1,2,3, 2,3, 3,3
>
>
>
> You should get the same with any scala collection
>
>
>
> Cheers
>
>
>
> *From:* Ashish Soni [mailto:asoni.le...@gmail.com]
> *Sent:* Thursday, September 03, 2015 9:06 AM
> *To:* user 
> *Subject:* FlatMap Explanation
>
>
>
> Hi ,
>
> Can some one please explain the output of the flat map
>
> data in RDD as below
>
> {1, 2, 3, 3}
>
> rdd.flatMap(x => x.to(3))
>
> output as below
>
> {1, 2, 3, 2, 3, 3, 3}
>
> i am not able to understand how the output came as above.
>
> Thanks,
>
> --
> This message is confidential and is for the sole use of the intended
> recipient(s). It may also be privileged or otherwise protected by copyright
> or other legal rules. If you have received it by mistake please let us know
> by reply email and delete it from your system. It is prohibited to copy
> this message or disclose its content to anyone. Any confidentiality or
> privilege is not waived or lost by any mistaken delivery or unauthorized
> disclosure of the message. All messages sent to and from Agoda may be
> monitored to ensure compliance with company policies, to protect the
> company's interests and to remove potential malware. Electronic messages
> may be intercepted, amended, lost or deleted, or contain viruses.
>
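
A minimal spark-shell sketch of the flatMap behaviour explained above:

val rdd = sc.parallelize(Seq(1, 2, 3, 3))

// Each element x expands to the range x.to(3), and flatMap concatenates the results.
rdd.flatMap(x => x.to(3)).collect()
// Array(1, 2, 3, 2, 3, 3, 3)

// The same holds for a plain Scala collection:
Seq(1, 2, 3, 3).flatMap(_.to(3))
// List(1, 2, 3, 2, 3, 3, 3)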


INDEXEDRDD in PYSPARK

2015-09-03 Thread shahid ashraf
Hi Folks

Any resource to get started using https://github.com/amplab/spark-indexedrdd
in pyspark

-- 
with Regards
Shahid Ashraf


Re: Small File to HDFS

2015-09-03 Thread Ndjido Ardo Bar
Hi Nibiau,

Hbase seems to be a good solution to your problem. As you may know, storing
your messages as key-value pairs in Hbase saves you the overhead of manually
resizing blocks of data using zip files.
An added advantage, along with the fact that Hbase uses HDFS for storage, is
the capability of updating your records, for example with the "put" function.

Cheers,
Ardo

> On 03 Sep 2015, at 13:35, nib...@free.fr wrote:
> 
> Ok but so some questions :
> - Sometimes I have to remove some messages from HDFS (cancel/replace cases) , 
> is it possible ?
> - In the case of a big zip file, is it possible to easily process Pig on it 
> directly ?
> 
> Tks
> Nicolas
> 
> - Mail original -
> De: "Tao Lu" 
> À: nib...@free.fr
> Cc: "Ted Yu" , "user" 
> Envoyé: Mercredi 2 Septembre 2015 19:09:23
> Objet: Re: Small File to HDFS
> 
> 
> You may consider storing it in one big HDFS file, and to keep appending new 
> messages to it. 
> 
> 
> For instance, 
> one message -> zip it -> append it to the HDFS as one line 
> 
> 
> On Wed, Sep 2, 2015 at 12:43 PM, < nib...@free.fr > wrote: 
> 
> 
> Hi, 
> I already store them in MongoDB in parralel for operational access and don't 
> want to add an other database in the loop 
> Is it the only solution ? 
> 
> Tks 
> Nicolas 
> 
> - Mail original - 
> De: "Ted Yu" < yuzhih...@gmail.com > 
> À: nib...@free.fr 
> Cc: "user" < user@spark.apache.org > 
> Envoyé: Mercredi 2 Septembre 2015 18:34:17 
> Objet: Re: Small File to HDFS 
> 
> 
> 
> 
> Instead of storing those messages in HDFS, have you considered storing them 
> in key-value store (e.g. hbase) ? 
> 
> 
> Cheers 
> 
> 
> On Wed, Sep 2, 2015 at 9:07 AM, < nib...@free.fr > wrote: 
> 
> 
> Hello, 
> I'am currently using Spark Streaming to collect small messages (events) , 
> size being <50 KB , volume is high (several millions per day) and I have to 
> store those messages in HDFS. 
> I understood that storing small files can be problematic in HDFS , how can I 
> manage it ? 
> 
> Tks 
> Nicolas 
> 
> - 
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> - 
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> 
> 
>  Thanks! 
> Tao
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Small File to HDFS

2015-09-03 Thread Ted Yu
Agree with Ardo.
The API provided by HBase is versatile; there is checkAndPut as well.

Cheers
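
A minimal sketch of the put/checkAndPut calls being discussed, assuming the HBase 1.x client API; the table, column family and qualifier names are hypothetical:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = conn.getTable(TableName.valueOf("messages"))

val messageBytes = Bytes.toBytes("...zipped or raw message payload...")
val put = new Put(Bytes.toBytes("message-id-123"))
put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("payload"), messageBytes)

// Plain upsert (also how a cancel/replace can overwrite an earlier version):
table.put(put)

// Conditional write: only applied when the stored cell matches the expected value
// (null means "only if the cell does not exist yet").
table.checkAndPut(Bytes.toBytes("message-id-123"),
  Bytes.toBytes("d"), Bytes.toBytes("payload"), null, put)

table.close()
conn.close()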


> On Sep 3, 2015, at 5:00 AM, Ndjido Ardo Bar  wrote:
> 
> Hi Nibiau,
> 
> Hbase seems to be a good solution to your problems. As you may know storing 
> yours messages as a key-value pairs in Hbase saves you the overhead of 
> manually resizing blocks of data using zip files. 
> The added advantage along with the fact that Hbase uses HDFS for storage, is 
> the capability of updating your records for example with the "put" function. 
> 
> Cheers,
> Ardo
> 
>> On 03 Sep 2015, at 13:35, nib...@free.fr wrote:
>> 
>> Ok but so some questions :
>> - Sometimes I have to remove some messages from HDFS (cancel/replace cases) 
>> , is it possible ?
>> - In the case of a big zip file, is it possible to easily process Pig on it 
>> directly ?
>> 
>> Tks
>> Nicolas
>> 
>> - Mail original -
>> De: "Tao Lu" 
>> À: nib...@free.fr
>> Cc: "Ted Yu" , "user" 
>> Envoyé: Mercredi 2 Septembre 2015 19:09:23
>> Objet: Re: Small File to HDFS
>> 
>> 
>> You may consider storing it in one big HDFS file, and to keep appending new 
>> messages to it. 
>> 
>> 
>> For instance, 
>> one message -> zip it -> append it to the HDFS as one line 
>> 
>> 
>> On Wed, Sep 2, 2015 at 12:43 PM, < nib...@free.fr > wrote: 
>> 
>> 
>> Hi, 
>> I already store them in MongoDB in parralel for operational access and don't 
>> want to add an other database in the loop 
>> Is it the only solution ? 
>> 
>> Tks 
>> Nicolas 
>> 
>> - Mail original - 
>> De: "Ted Yu" < yuzhih...@gmail.com > 
>> À: nib...@free.fr 
>> Cc: "user" < user@spark.apache.org > 
>> Envoyé: Mercredi 2 Septembre 2015 18:34:17 
>> Objet: Re: Small File to HDFS 
>> 
>> 
>> 
>> 
>> Instead of storing those messages in HDFS, have you considered storing them 
>> in key-value store (e.g. hbase) ? 
>> 
>> 
>> Cheers 
>> 
>> 
>> On Wed, Sep 2, 2015 at 9:07 AM, < nib...@free.fr > wrote: 
>> 
>> 
>> Hello, 
>> I'am currently using Spark Streaming to collect small messages (events) , 
>> size being <50 KB , volume is high (several millions per day) and I have to 
>> store those messages in HDFS. 
>> I understood that storing small files can be problematic in HDFS , how can I 
>> manage it ? 
>> 
>> Tks 
>> Nicolas 
>> 
>> - 
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
>> 
>> - 
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
>> 
>> 
>> 
>> -- 
>> 
>> 
>>  Thanks! 
>> Tao
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running Examples

2015-09-03 Thread delbert
hi folks,

i am running into the same issue.

running the script (with sc. instead of spark. and filling in NUM_SAMPLES)
works fine for me.

running on Windows 10, admin PowerShell console, started with the command: 
./bin/spark-shell --master local[4]

delbert
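
The script referred to above appears to be the Pi-estimation example from the Spark docs; for reference, a spark-shell (Scala) version of it looks roughly like this:

val NUM_SAMPLES = 100000
val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x * x + y * y < 1     // point falls inside the unit quarter-circle
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")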



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Examples-tp11380p24574.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Hbase Lookup

2015-09-03 Thread Ted Yu
Ayan:
Please read this:
http://hbase.apache.org/book.html#cp

Cheers

On Thu, Sep 3, 2015 at 2:13 PM, ayan guha  wrote:

> Hi
>
> Thanks for your comments. My driving point is instead of loading Hbase
> data entirely I want to process record by record lookup and that is best
> done in UDF or map function. I also would loved to do it in Spark but no
> production cluster yet here :(
>
> @Franke: I do not have enough competency on coprocessors so I am not able
> to visualize the solution as you are suggesting, so it would be really
> helpful if you shed some more light to it?
>
> Best
> Ayan
>
> On Fri, Sep 4, 2015 at 1:44 AM, Tao Lu  wrote:
>
>> But I don't see how it works here with phoenix or hbase coprocessor.
>> Remember we are joining 2 big data sets here, one is the big file in HDFS,
>> and records in HBASE. The driving force comes from Hadoop cluster.
>>
>>
>>
>>
>> On Thu, Sep 3, 2015 at 11:37 AM, Jörn Franke 
>> wrote:
>>
>>> If you use pig or spark you increase the complexity from an operations
>>> management perspective significantly. Spark should be seen from a platform
>>> perspective if it make sense. If you can do it directly with hbase/phoenix
>>> or only hbase coprocessor then this should be preferred. Otherwise you pay
>>> more money for maintenance and development.
>>>
>>> Le jeu. 3 sept. 2015 à 17:16, Tao Lu  a écrit :
>>>
 Yes. Ayan, you approach will work.

 Or alternatively, use Spark, and write a Scala/Java function which
 implements similar logic in your Pig UDF.

 Both approaches look similar.

 Personally, I would go with Spark solution, it will be slightly faster,
 and easier if you already have Spark cluster setup on top of your hadoop
 cluster in your infrastructure.

 Cheers,
 Tao


 On Thu, Sep 3, 2015 at 1:15 AM, ayan guha  wrote:

> Thanks for your info. I am planning to implement a pig udf to do
> record look ups. Kindly let me know if this is a good idea.
>
> Best
> Ayan
>
> On Thu, Sep 3, 2015 at 2:55 PM, Jörn Franke 
> wrote:
>
>>
>> You may check if it makes sense to write a coprocessor doing an
>> upsert for you, if it does not exist already. Maybe phoenix for Hbase
>> supports this already.
>>
>> Another alternative, if the records do not have an unique Id, is to
>> put them into a text index engine, such as Solr or Elasticsearch, which
>> does in this case a fast matching with relevancy scores.
>>
>>
>> You can use also Spark and Pig there. However, I am not sure if Spark
>> is suitable for these one row lookups. Same holds for Pig.
>>
>>
>> Le mer. 2 sept. 2015 à 23:53, ayan guha  a
>> écrit :
>>
>> Hello group
>>
>> I am trying to use pig or spark in order to achieve following:
>>
>> 1. Write a batch process which will read from a file
>> 2. Lookup hbase to see if the record exists. If so then need to
>> compare incoming values with hbase and update fields which do not match.
>> Else create a new record.
>>
>> My questions:
>> 1. Is this a good use case for pig or spark?
>> 2. Is there any way to read hbase for each incoming record in pig
>> without writing map reduce code?
>> 3. In case of spark I think we have to connect to hbase for every
>> record. Is thr any other way?
>> 4. What is the best connector for hbase which gives this
>> functionality?
>>
>> Best
>>
>> Ayan
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



 --
 
 Thanks!
 Tao

>>>
>>
>>
>> --
>> 
>> Thanks!
>> Tao
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
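
For the Spark-side approach discussed above (one HBase lookup per incoming record), mapPartitions lets a single connection be reused per partition rather than opened per record. A minimal sketch, assuming the HBase 1.x client; the input layout, table name and column names are hypothetical:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

val incoming = sc.textFile("hdfs:///data/incoming.txt")
  .map(_.split('\t'))
  .map(fields => (fields(0), fields(1)))            // (key, incomingValue)

val compared = incoming.mapPartitions { iter =>
  // One HBase connection per partition instead of one per record.
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("lookup"))
  val results = iter.map { case (key, incomingValue) =>
    val res = table.get(new Get(Bytes.toBytes(key)))
    val existing = Option(res.getValue(Bytes.toBytes("d"), Bytes.toBytes("v")))
      .map(b => Bytes.toString(b))
    (key, incomingValue, existing)                  // decide update vs. insert downstream
  }.toList                                          // materialize before closing the connection
  table.close()
  conn.close()
  results.iterator
}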


different Row objects?

2015-09-03 Thread Wei Chen
Hey Friends,

Recently I have been using Spark 1.3.1, mainly pyspark.sql. I noticed that
the Row object collected directly from a DataFrame is different from the
Row object we directly defined from Row(*arg, **kwarg).

>>>from pyspark.sql.types import Row
>>>aaa = Row(a=1, b=2, c=Row(a=1, b=2))
>>>tuple(sc.parallelize([aaa]).toDF().collect()[0])

(1, 2, (1, 2))

>>>tuple(aaa)

(1, 2, Row(a=1, b=2))


This matters to me because I wanted to be able to create a DataFrame
with one of the columns being a Row object by
sqlcontext.createDataFrame(data, schema) where I specifically pass in
the schema. However, if the data is RDD of Row objects like "aaa" in
my example, it'll fail in __verify_type function.



Thank you,

Wei


Re: Ranger-like Security on Spark

2015-09-03 Thread Matei Zaharia
Even simple Spark-on-YARN should run as the user that submitted the job, yes, 
so HDFS ACLs should be enforced. Not sure how it plays with the rest of Ranger.

Matei

> On Sep 3, 2015, at 4:57 PM, Jörn Franke  wrote:
> 
> Well if it needs to read from hdfs then it will adhere to the permissions 
> defined there And/or in ranger. However, I am not aware that you can protect 
> dataframes, tables or streams in general in Spark.
> 
> Le jeu. 3 sept. 2015 à 21:47, Daniel Schulz  > a écrit :
> Hi Matei,
> 
> Thanks for your answer.
> 
> My question is regarding simple authenticated Spark-on-YARN only, without 
> Kerberos. So when I run Spark on YARN and HDFS, Spark will pass through my 
> HDFS user and only be able to access files I am entitled to read/write? Will 
> it enforce HDFS ACLs and Ranger policies as well?
> 
> Best regards, Daniel.
> 
> > On 03 Sep 2015, at 21:16, Matei Zaharia  > > wrote:
> >
> > If you run on YARN, you can use Kerberos, be authenticated as the right 
> > user, etc in the same way as MapReduce jobs.
> >
> > Matei
> >
> >> On Sep 3, 2015, at 1:37 PM, Daniel Schulz  >> > wrote:
> >>
> >> Hi,
> >>
> >> I really enjoy using Spark. An obstacle to sell it to our clients 
> >> currently is the missing Kerberos-like security on a Hadoop with simple 
> >> authentication. Are there plans, a proposal, or a project to deliver a 
> >> Ranger plugin or something similar to Spark. The target is to 
> >> differentiate users and their privileges when reading and writing data to 
> >> HDFS? Is Kerberos my only option then?
> >>
> >> Kind regards, Daniel.
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> >> 
> >> For additional commands, e-mail: user-h...@spark.apache.org 
> >> 
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> > 
> > For additional commands, e-mail: user-h...@spark.apache.org 
> > 
> >
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 



Re: Ranger-like Security on Spark

2015-09-03 Thread Marcelo Vanzin
On Thu, Sep 3, 2015 at 5:15 PM, Matei Zaharia  wrote:
> Even simple Spark-on-YARN should run as the user that submitted the job,
> yes, so HDFS ACLs should be enforced. Not sure how it plays with the rest of
> Ranger.

It's slightly more complicated than that (without kerberos, the
underlying process runs as the same user running the YARN daemons, but
the connections to HDFS and other Hadoop services identify as the user
who submitted the application), but the end effect is what Matei
describes. I also do not know about how Ranger enforces things.

Also note that "simple authentication" is not secure at all. You're
basically just asking your users to be nice instead of actually
enforcing anything. Any user can tell YARN that he's actually someone
else when starting the application, and YARN will believe him. Just
say "HADOOP_USER_NAME=somebodyelse" and you're good to go!

> On Sep 3, 2015, at 4:57 PM, Jörn Franke  wrote:
>
> Well if it needs to read from hdfs then it will adhere to the permissions
> defined there And/or in ranger. However, I am not aware that you can protect
> dataframes, tables or streams in general in Spark.
>
>
> Le jeu. 3 sept. 2015 à 21:47, Daniel Schulz  a
> écrit :
>>
>> Hi Matei,
>>
>> Thanks for your answer.
>>
>> My question is regarding simple authenticated Spark-on-YARN only, without
>> Kerberos. So when I run Spark on YARN and HDFS, Spark will pass through my
>> HDFS user and only be able to access files I am entitled to read/write? Will
>> it enforce HDFS ACLs and Ranger policies as well?
>>
>> Best regards, Daniel.
>>
>> > On 03 Sep 2015, at 21:16, Matei Zaharia  wrote:
>> >
>> > If you run on YARN, you can use Kerberos, be authenticated as the right
>> > user, etc in the same way as MapReduce jobs.
>> >
>> > Matei
>> >
>> >> On Sep 3, 2015, at 1:37 PM, Daniel Schulz
>> >>  wrote:
>> >>
>> >> Hi,
>> >>
>> >> I really enjoy using Spark. An obstacle to sell it to our clients
>> >> currently is the missing Kerberos-like security on a Hadoop with simple
>> >> authentication. Are there plans, a proposal, or a project to deliver a
>> >> Ranger plugin or something similar to Spark. The target is to 
>> >> differentiate
>> >> users and their privileges when reading and writing data to HDFS? Is
>> >> Kerberos my only option then?


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Ranger-like Security on Spark

2015-09-03 Thread Ruslan Dautkhanov
You could define access in Sentry and enable permission sync with HDFS, so
you could just grant access in Hive on a per-database or per-table basis. It
should work for Spark too, as Sentry will propagate the grants to HDFS ACLs.

http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/sg_hdfs_sentry_sync.html




-- 
Ruslan Dautkhanov

On Thu, Sep 3, 2015 at 1:46 PM, Daniel Schulz 
wrote:

> Hi Matei,
>
> Thanks for your answer.
>
> My question is regarding simple authenticated Spark-on-YARN only, without
> Kerberos. So when I run Spark on YARN and HDFS, Spark will pass through my
> HDFS user and only be able to access files I am entitled to read/write?
> Will it enforce HDFS ACLs and Ranger policies as well?
>
> Best regards, Daniel.
>
> > On 03 Sep 2015, at 21:16, Matei Zaharia  wrote:
> >
> > If you run on YARN, you can use Kerberos, be authenticated as the right
> user, etc in the same way as MapReduce jobs.
> >
> > Matei
> >
> >> On Sep 3, 2015, at 1:37 PM, Daniel Schulz 
> wrote:
> >>
> >> Hi,
> >>
> >> I really enjoy using Spark. An obstacle to sell it to our clients
> currently is the missing Kerberos-like security on a Hadoop with simple
> authentication. Are there plans, a proposal, or a project to deliver a
> Ranger plugin or something similar to Spark. The target is to differentiate
> users and their privileges when reading and writing data to HDFS? Is
> Kerberos my only option then?
> >>
> >> Kind regards, Daniel.
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Does Spark.ml LogisticRegression assumes only Double valued features?

2015-09-03 Thread njoshi
Hi,

I was looking at the Spark 1.5 DataFrame/Row API and the implementation of
logistic regression. As I understand it, the train method therein first converts
the data frame to an RDD[LabeledPoint] as:

override protected def train(dataset: DataFrame): LogisticRegressionModel = {
  // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
  val instances = extractLabeledPoints(dataset).map {
    case LabeledPoint(label: Double, features: Vector) => (label, features)
  }
  ...

and then it proceeds to feature standardization, etc.

What I am confused about is this: a DataFrame is of type RDD[Row], and Row is
allowed to have any value types; for example (1, true, "a string", null) seems
to be a valid row of a data frame. If that is so, what does extractLabeledPoints
above mean? It seems it selects only Array[Double] as the feature values in a
Vector (dense or sparse). What happens if a column in the data frame is a set of
strings? Also, what happens to integer categorical values?

Thanks in advance,
Nikhil
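
As background for the question above: spark.ml's LogisticRegression expects the label column to be a Double and the features column to be a Vector of Doubles, so string and categorical columns are normally indexed and assembled into that Vector first. A minimal sketch with hypothetical column names:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

val indexer = new StringIndexer()
  .setInputCol("country")           // a string column
  .setOutputCol("countryIndex")     // becomes a Double index

val assembler = new VectorAssembler()
  .setInputCols(Array("countryIndex", "age", "income"))  // numeric columns only
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(indexer, assembler, lr))
// val model = pipeline.fit(trainingDF)   // trainingDF is a hypothetical DataFrame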



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-ml-LogisticRegression-assumes-only-Double-valued-features-tp24575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: different Row objects?

2015-09-03 Thread Davies Liu
This was fixed in 1.5; could you download 1.5-RC3 to test it?

On Thu, Sep 3, 2015 at 4:45 PM, Wei Chen  wrote:
> Hey Friends,
>
> Recently I have been using Spark 1.3.1, mainly pyspark.sql. I noticed that
> the Row object collected directly from a DataFrame is different from the Row
> object we directly defined from Row(*arg, **kwarg).
>
> >>> from pyspark.sql.types import Row
> >>> aaa = Row(a=1, b=2, c=Row(a=1, b=2))
> >>> tuple(sc.parallelize([aaa]).toDF().collect()[0])
>
> (1, 2, (1, 2))
>
> >>> tuple(aaa)
>
> (1, 2, Row(a=1, b=2))
>
>
> This matters to me because I wanted to be able to create a DataFrame with
> one of the columns being a Row object by sqlcontext.createDataFrame(data,
> schema) where I specifically pass in the schema. However, if the data is RDD
> of Row objects like "aaa" in my example, it'll fail in __verify_type
> function.
>
>
>
> Thank you,
>
> Wei

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



repartition on direct kafka stream

2015-09-03 Thread Shushant Arora
1. Does repartitioning a direct Kafka stream shuffle only the offsets, or the
actual Kafka messages, across executors?

Say I have a direct Kafka stream:

directKafkaStream.repartition(numexecutors).mapPartitions(new
FlatMapFunction>, String>(){
...
}

Say originally I have 5*numexecutors partitions in Kafka.

Should only the offset ranges be shuffled to executors, not the actual Kafka
messages? But I am seeing a very large shuffle data read/write size on the
streaming UI. When I remove this repartition, shuffle read/write becomes
0.


Re: repartition on direct kafka stream

2015-09-03 Thread Saisai Shao
Yes: not the offset ranges but the actual data will be shuffled when you
use repartition().

Thanks
Saisai

On Fri, Sep 4, 2015 at 12:42 PM, Shushant Arora 
wrote:

> 1.Does repartitioning on direct kafka stream shuffles only the offsets or
> exact kafka messages across executors?
>
> Say I have a direct kafkastream
>
> directKafkaStream.repartition(numexecutors).mapPartitions(new
> FlatMapFunction>, String>(){
> ...
> }
>
> Say originally I have 5*numexceutor partitons in kafka.
>
> Now only the offset ranges should be shuffled to executors not exact kafka
> messages? But I am seeing a very large size of shuffles data read/write on
> streaming ui. When I remove this repartition - shuffle read /write becomes
> 0.
>
>


Re: spark-submit not using conf/spark-defaults.conf

2015-09-03 Thread Axel Dahl
logged it here:

https://issues.apache.org/jira/browse/SPARK-10436

On Thu, Sep 3, 2015 at 10:32 AM, Davies Liu  wrote:

> I think it's a missing feature.
>
> On Wed, Sep 2, 2015 at 10:58 PM, Axel Dahl  wrote:
> > So a bit more investigation, shows that:
> >
> > if I have configured spark-defaults.conf with:
> >
> > "spark.files  library.py"
> >
> > then if I call
> >
> > "spark-submit.py -v test.py"
> >
> > I see that my "spark.files" default option has been replaced with
> > "spark.files  test.py",  basically spark-submit is overwriting
> > spark.files with the name of the script.
> >
> > Is this a bug or is there another way to add default libraries without
> > having to specify them on the command line?
> >
> > Thanks,
> >
> > -Axel
> >
> >
> >
> > On Wed, Sep 2, 2015 at 10:34 PM, Davies Liu 
> wrote:
> >>
> >> This should be a bug, could you create a JIRA for it?
> >>
> >> On Wed, Sep 2, 2015 at 4:38 PM, Axel Dahl 
> wrote:
> >> > in my spark-defaults.conf I have:
> >> > spark.files   file1.zip, file2.py
> >> > spark.master   spark://master.domain.com:7077
> >> >
> >> > If I execute:
> >> > bin/pyspark
> >> >
> >> > I can see it adding the files correctly.
> >> >
> >> > However if I execute
> >> >
> >> > bin/spark-submit test.py
> >> >
> >> > where test.py relies on the file1.zip, I get and error.
> >> >
> >> > If I i instead execute
> >> >
> >> > bin/spark-submit --py-files file1.zip test.py
> >> >
> >> > It works as expected.
> >> >
> >> > How do I get spark-submit to import the spark-defaults.conf file or
> what
> >> > should I start checking to figure out why one works and the other
> >> > doesn't?
> >> >
> >> > Thanks,
> >> >
> >> > -Axel
> >
> >
>


Re: Ranger-like Security on Spark

2015-09-03 Thread Jörn Franke
Well, if it needs to read from HDFS then it will adhere to the permissions
defined there and/or in Ranger. However, I am not aware that you can
protect DataFrames, tables or streams in general in Spark.

On Thu, Sep 3, 2015 at 21:47, Daniel Schulz 
wrote:

> Hi Matei,
>
> Thanks for your answer.
>
> My question is regarding simple authenticated Spark-on-YARN only, without
> Kerberos. So when I run Spark on YARN and HDFS, Spark will pass through my
> HDFS user and only be able to access files I am entitled to read/write?
> Will it enforce HDFS ACLs and Ranger policies as well?
>
> Best regards, Daniel.
>
> > On 03 Sep 2015, at 21:16, Matei Zaharia  wrote:
> >
> > If you run on YARN, you can use Kerberos, be authenticated as the right
> user, etc in the same way as MapReduce jobs.
> >
> > Matei
> >
> >> On Sep 3, 2015, at 1:37 PM, Daniel Schulz 
> wrote:
> >>
> >> Hi,
> >>
> >> I really enjoy using Spark. An obstacle to sell it to our clients
> currently is the missing Kerberos-like security on a Hadoop with simple
> authentication. Are there plans, a proposal, or a project to deliver a
> Ranger plugin or something similar to Spark. The target is to differentiate
> users and their privileges when reading and writing data to HDFS? Is
> Kerberos my only option then?
> >>
> >> Kind regards, Daniel.
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Parsing Avro from Kafka Message

2015-09-03 Thread Daniel Haviv
Hi,
I'm reading messages from Kafka where the value is an Avro file.
I would like to parse the contents of the message and work with it as a
DataFrame, like with the spark-avro package, but instead of files, pass it an
RDD.

How can this be achieved?

Thank you.
Daniel
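
One possible approach (a sketch, not a confirmed recipe): if each Kafka value is a complete Avro container file, the bytes can be decoded with Avro's GenericDatumReader and turned into Rows for createDataFrame. kafkaValues below stands for an RDD[Array[Byte]] of message values (for example messages.map(_._2) from one batch), and the field names are hypothetical:

import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.mutable.ArrayBuffer

val rows = kafkaValues.flatMap { bytes =>
  // Each message value is assumed to be a full Avro container file.
  val reader = new DataFileReader[GenericRecord](
    new SeekableByteArrayInput(bytes), new GenericDatumReader[GenericRecord]())
  val buf = ArrayBuffer[Row]()
  while (reader.hasNext) {
    val rec = reader.next()
    buf += Row(String.valueOf(rec.get("id")), String.valueOf(rec.get("payload")))
  }
  reader.close()
  buf
}

val schema = StructType(Seq(
  StructField("id", StringType), StructField("payload", StringType)))
val df = sqlContext.createDataFrame(rows, schema)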


Re: Input size increasing every iteration of gradient boosted trees [1.4]

2015-09-03 Thread Peter Rudenko
Confirm, having the same issue (1.4.1 mllib package). For a smaller
dataset, accuracy degraded as well. Haven't tested yet in 1.5 with the ml
package implementation.

val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.setNumIterations(30)
boostingStrategy.setLearningRate(1.0)
boostingStrategy.treeStrategy.setMaxDepth(3)
boostingStrategy.treeStrategy.setMaxBins(128)
boostingStrategy.treeStrategy.setSubsamplingRate(1.0)
boostingStrategy.treeStrategy.setMinInstancesPerNode(1)
boostingStrategy.treeStrategy.setUseNodeIdCache(true)
boostingStrategy.treeStrategy.setCategoricalFeaturesInfo(
  mapAsJavaMap(categoricalFeatures).asInstanceOf[java.util.Map[java.lang.Integer, java.lang.Integer]])

val model = GradientBoostedTrees.train(instances, boostingStrategy)


Thanks,
Peter Rudenko

On 2015-08-14 00:33, Sean Owen wrote:


Not that I have any answer at this point, but I was discussing this
exact same problem with Johannes today. An input size of ~20K records
was growing each iteration by ~15M records. I could not see why on a
first look.

@jkbradley I know it's not much info but does that ring any bells? I
think Johannes even has an instance of this up and running for
examination.

On Thu, Aug 13, 2015 at 10:04 PM, Matt Forbes
 wrote:

I am training a boosted trees model on a couple million input samples (with
around 300 features) and am noticing that the input size of each stage is
increasing each iteration. For each new tree, the first step seems to be
building the decision tree metadata, which does a .count() on the input
data, so this is the step I've been using to track the input size changing.
Here is what I'm seeing:

count at DecisionTreeMetadata.scala:111
1. Input Size / Records: 726.1 MB / 1295620
2. Input Size / Records: 106.9 GB / 64780816
3. Input Size / Records: 160.3 GB / 97171224
4. Input Size / Records: 214.8 GB / 129680959
5. Input Size / Records: 268.5 GB / 162533424

Input Size / Records: 1912.6 GB / 1382017686


This step goes from taking less than 10s up to 5 minutes by the 15th or so
iteration. I'm not quite sure what could be causing this. I am passing a
memory-only cached RDD[LabeledPoint] to GradientBoostedTrees.train

Does anybody have some insight? Is this a bug or could it be an error on my
part?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


​


RE: Problem while loading saved data

2015-09-03 Thread Ewan Leith
Your error log shows you attempting to read from 'people.parquet2', not
'people.parquet' as you've put below. Is that just from a different attempt?

Otherwise, it's an odd one! There are no _SUCCESS, _common_metadata or
_metadata files under the people.parquet listing you've shown below, and those would
normally be created when the write completes. Can you show us your write output?


Thanks,
Ewan



From: Amila De Silva [mailto:jaa...@gmail.com]
Sent: 03 September 2015 05:44
To: Guru Medasani 
Cc: user@spark.apache.org
Subject: Re: Problem while loading saved data

Hi Guru,

Thanks for the reply.

Yes, I checked if the file exists. But instead of a single file what I found 
was a directory having the following structure.

people.parquet
└── _temporary
└── 0
├── task_201509030057_4699_m_00
│   └── part-r-0-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
├── task_201509030057_4699_m_01
│   └── part-r-1-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
└── _temporary


On Thu, Sep 3, 2015 at 7:13 AM, Guru Medasani 
> wrote:
Hi Amila,

Error says that the ‘people.parquet’ file does not exist. Can you manually 
check to see if that file exists?


Py4JJavaError: An error occurred while calling o53840.parquet.

: java.lang.AssertionError: assertion failed: No schema defined, and no Parquet 
data file or summary file found under file:/home/ubuntu/ipython/people.parquet2.



Guru Medasani
gdm...@gmail.com



On Sep 2, 2015, at 8:25 PM, Amila De Silva 
> wrote:

Hi All,

I have a two node spark cluster, to which I'm connecting using IPython notebook.
To see how data saving/loading works, I simply created a dataframe using 
people.json using the Code below;

df = sqlContext.read.json("examples/src/main/resources/people.json")

Then called the following to save the dataframe as a parquet.
df.write.save("people.parquet")

Tried loading the saved dataframe using;
df2 = sqlContext.read.parquet('people.parquet');

But this simply fails giving the following exception


---

Py4JJavaError Traceback (most recent call last)

 in ()

> 1 df2 = sqlContext.read.parquet('people.parquet2');



/srv/spark/python/pyspark/sql/readwriter.pyc in parquet(self, *path)

154 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 
'int')]

155 """

--> 156 return 
self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))

157

158 @since(1.4)



/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in 
__call__(self, *args)

536 answer = self.gateway_client.send_command(command)

537 return_value = get_return_value(answer, self.gateway_client,

--> 538 self.target_id, self.name)

539

540 for temp_arg in temp_args:



/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)

298 raise Py4JJavaError(

299 'An error occurred while calling {0}{1}{2}.\n'.

--> 300 format(target_id, '.', name), value)

301 else:

302 raise Py4JError(



Py4JJavaError: An error occurred while calling o53840.parquet.

: java.lang.AssertionError: assertion failed: No schema defined, and no Parquet 
data file or summary file found under file:/home/ubuntu/ipython/people.parquet2.

   at scala.Predef$.assert(Predef.scala:179)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.org$apache$spark$sql$parquet$ParquetRelation2$MetadataCache$$readSchema(newParquet.scala:429)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)

   at scala.Option.orElse(Option.scala:257)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:369)

   at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache$lzycompute(newParquet.scala:126)

   at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache(newParquet.scala:124)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)

   at scala.Option.getOrElse(Option.scala:120)

   at 

RE: spark 1.4.1 saveAsTextFile (and Parquet) is slow on emr-4.0.0

2015-09-03 Thread Ewan Leith
For those who have similar issues on EMR writing Parquet files: if you update
mapred-site.xml with the following properties:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
<property><name>parquet.enable.summary-metadata</name><value>false</value></property>
<property><name>spark.sql.parquet.output.committer.class</name><value>org.apache.spark.sql.parquet.DirectParquetOutputCommitter</value></property>

then you get Parquet files written directly to S3 without the use of temporary files,
and the summary-metadata files (which can cause a performance hit when writing large
Parquet datasets to S3) are disabled.

The easiest way to add these across the cluster is via the --configurations flag
on the "aws emr create-cluster" command.

Thanks,
Ewan


From: Alexander Pivovarov [mailto:apivova...@gmail.com]
Sent: 03 September 2015 00:12
To: Neil Jonkers 
Cc: user@spark.apache.org
Subject: Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

Hi Neil

Yes! it helps!!! I do  not see _temporary in console output anymore.  
saveAsTextFile is fast now.
2015-09-02 23:07:00,022 INFO  [task-result-getter-0] scheduler.TaskSetManager 
(Logging.scala:logInfo(59)) - Finished task 18.0 in stage 0.0 (TID 18) in 4398 
ms on ip-10-0-24-103.ec2.internal (1/24)
2015-09-02 23:07:01,887 INFO  [task-result-getter-2] scheduler.TaskSetManager 
(Logging.scala:logInfo(59)) - Finished task 5.0 in stage 0.0 (TID 5) in 6282 ms 
on ip-10-0-26-14.ec2.internal (24/24)
2015-09-02 23:07:01,888 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - ResultStage 0 (saveAsTextFile at :22) 
finished in 6.319 s
2015-09-02 23:07:02,123 INFO  [main] s3n.Jets3tNativeFileSystemStore 
(Jets3tNativeFileSystemStore.java:storeFile(141)) - s3.putObject foo-bar 
tmp/test40_141_24_406/_SUCCESS 0

Thank you!

On Wed, Sep 2, 2015 at 12:54 AM, Neil Jonkers 
> wrote:
Hi,
Can you set the following parameters in your mapred-site.xml file please:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
You can also config this at cluster launch time with the following 
Classification via EMR console:

classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]


Thank you

On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov 
> wrote:
I checked previous emr config (emr-3.8)
mapred-site.xml has the following setting
 
<property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
 


On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov 
> wrote:
Should I use DirectOutputCommitter?
spark.hadoop.mapred.output.committer.class  
com.appsflyer.spark.DirectOutputCommitter



On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov 
> wrote:
I run spark 1.4.1 in amazom aws emr 4.0.0

For some reason spark saveAsTextFile is very slow on emr 4.0.0 in comparison to 
emr 3.8  (was 5 sec, now 95 sec)

Actually saveAsTextFile says that it's done in 4.356 sec but after that I see 
lots of INFO messages with 404 error from com.amazonaws.latency logger for next 
90 sec

spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" + 
"A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")

2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - ResultStage 5 (saveAsTextFile at :22) 
finished in 4.356 s
2015-09-01 21:16:17,637 INFO  [task-result-getter-2] cluster.YarnScheduler 
(Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all 
completed, from pool
2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at :22, 
took 4.547829 s
2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem 
(S3NativeFileSystem.java:listStatus(896)) - listStatus 
s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency 
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], 
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found 
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 
3B2F06FD11682D22), S3 Extended Request ID: 
C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ], 
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], 
AWSRequestID=[3B2F06FD11682D22], 
ServiceEndpoint=[https://foo-bar.s3.amazonaws.com], Exception=1, 
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, 
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923], 
HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544], 
RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency 
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], 
ServiceName=[Amazon S3], 

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-03 Thread Tathagata Das
Are you accidentally recovering from checkpoint files which have 10 seconds
as the batch interval?
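
If that is the case, the interval stored in the checkpoint wins. A minimal sketch of the getOrCreate pattern, with the checkpoint path, app name and stream setup left as placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"      // placeholder path
val sparkConf = new SparkConf().setAppName("my-app")  // placeholder conf

// On recovery, the batch interval stored in the checkpoint is used and the
// creating function below is never called, so a new Seconds(...) value only
// takes effect once the old checkpoint directory is cleared or replaced.
val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
  val newSsc = new StreamingContext(sparkConf, Seconds(20))
  newSsc.checkpoint(checkpointDir)
  // ... create the direct Kafka stream and wire up the processing here ...
  newSsc
})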


On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg 
wrote:

> I'm seeing an oddity where I initially set the batchdurationmillis to 1
> second and it works fine:
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.milliseconds(batchDurationMillis));
>
> Then I tried changing the value to 10 seconds. The change didn't seem to
> take. I've bounced the Spark workers and the consumers and now I'm seeing
> RDD's coming in once around 10 seconds (not always 10 seconds according to
> the logs).
>
> However, now I'm trying to change the value to 20 seconds and it's just
> not taking. I've bounced Spark master, workers, and consumers and the value
> seems "stuck" at 10 seconds.
>
> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>
> Thanks.
>
> - Dmitry
>
>


DataFrame creation delay?

2015-09-03 Thread Isabelle Phan
Hello,

I am using SparkSQL to query some Hive tables. Most of the time, when I
create a DataFrame using the sqlContext.sql("select * from table") command,
DataFrame creation takes less than 0.5 seconds.
But for this one table it takes almost 12 seconds!

scala>  val start = scala.compat.Platform.currentTime; val logs =
sqlContext.sql("select * from temp.log"); val execution =
scala.compat.Platform.currentTime - start
15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from temp.log
15/09/04 12:07:02 INFO ParseDriver: Parse Completed
start: Long = 1441336022731
logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
log_time: string, tag: string, dt: string, test_id: int]
execution: Long = 11567

This table has 3.6 B rows, and 2 partitions (on dt and test_id columns).
I have created DataFrames on even larger tables and do not see such delay.
So my questions are:
- What can impact DataFrame creation time?
- Is it related to the table partitions?


Thanks very much for your help!

Isabelle


Re: How to determine the value for spark.sql.shuffle.partitions?

2015-09-03 Thread Isabelle Phan
+1

I had the exact same question as I am working on my first Spark
applications.
Hope someone can share some best practices.

Thanks!

Isabelle

On Tue, Sep 1, 2015 at 2:17 AM, Romi Kuntsman  wrote:

> Hi all,
>
> The number of partition greatly affect the speed and efficiency of
> calculation, in my case in DataFrames/SparkSQL on Spark 1.4.0.
>
> Too few partitions with large data cause OOM exceptions.
> Too many partitions on small data cause a delay due to overhead.
>
> How do you programmatically determine the optimal number of partitions and
> cores in Spark, as a function of:
>
>1. available memory per core
>2. number of records in input data
>3. average/maximum record size
>4. cache configuration
>5. shuffle configuration
>6. serialization
>7. etc?
>
> Any general best practices?
>
> Thanks!
>
> Romi K.
>
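
While the real question here is how to choose good values, the knob itself is just a SQL conf; a minimal sketch (200 happens to be the default and is only an illustration):

sqlContext.setConf("spark.sql.shuffle.partitions", "200")
// or at submit time: --conf spark.sql.shuffle.partitions=200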