Re: Can't load a RandomForestClassificationModel in Spark job

2017-02-14 Thread Hollin Wilkins
Hey there,

Creating a new SparkContext on workers will not work, only the driver is
allowed to own a SparkContext. Are you trying to distribute your model to
workers so you can create a distributed scoring service? If so, it may be
worth looking into taking your models outside of a SparkContext and serving
them separately.

If this is your use case, take a look at MLeap. We use it in production to
serve high-volume realtime requests from Spark-trained models:
https://github.com/combust/mleap

Cheers,
Hollin
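
A minimal sketch of keeping model loading and scoring on the driver, which is
one way to respect the constraint described above. It assumes Spark 2.x, that
the input DataFrame already has an assembled features column, and that it
carries a string column named "profile" that selects the model. The class name,
method name, column name and the profile-to-path map are all hypothetical and
do not come from this thread.

```java
import java.util.Map;

import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

final class DriverSideScoring {
    private DriverSideScoring() {}

    // Runs on the driver. Each model is loaded once on the driver, and
    // transform() distributes the actual scoring to the executors.
    static Dataset<Row> scoreByProfile(Dataset<Row> input,
                                       Map<String, String> modelPathByProfile) {
        Dataset<Row> result = null; // stays null if the map is empty
        for (Map.Entry<String, String> entry : modelPathByProfile.entrySet()) {
            // load() needs the SparkContext, so it is called here, not inside a map().
            RandomForestClassificationModel model =
                RandomForestClassificationModel.load(entry.getValue());
            // "profile" is an assumed column name used to pick the rows for this model.
            Dataset<Row> slice =
                input.filter(functions.col("profile").equalTo(entry.getKey()));
            Dataset<Row> scored = model.transform(slice);
            result = (result == null) ? scored : result.union(scored);
        }
        return result;
    }
}
```

The point of this shape is that no model object and no SparkContext is ever
referenced from inside a function that is shipped to the workers.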

On Tue, Feb 14, 2017 at 4:46 PM, Jianhong Xia  wrote:

> Is there any update on this problem?
>
> I encountered the same issue that was mentioned here.
>
> I have CrossValidatorModel.transform(df) running on workers, which
> requires a DataFrame as input. However, we only have Arrays on workers.
> When we deploy our model in cluster mode, we cannot call
> createDataFrame on workers. It fails with this error:
>
> 17/02/13 20:21:27 ERROR Detector$: Error while detecting threats
> java.lang.NullPointerException
>  at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
>  at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:62)
>  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:270)
>  at com.mycompany.analytics.models.app.serializable.AppModeler.detection(modeler.scala:370)
>
> On the other hand, if we run in local mode, everything works fine.
>
> I just want to know whether anyone has successfully run machine
> learning models on the workers.
>
> Thanks,
> Jianhong
>
> *From:* Sumona Routh [mailto:sumos...@gmail.com]
> *Sent:* Thursday, January 12, 2017 6:20 PM
> *To:* ayan guha ; user@spark.apache.org
> *Subject:* Re: Can't load a RandomForestClassificationModel in Spark job
>
>
>
> Yes, I save it to S3 in a different process. It is actually the
> RandomForestClassificationModel.load method (passed an s3 path) where I
> run into problems.
> When you say you load it during map stages, do you mean that you are able
> to directly load a model from inside of a transformation? When I try this,
> it passes the function to a worker, and the load method itself appears to
> attempt to create a new SparkContext, which causes an NPE downstream
> (because creating a SparkContext on the worker is not an appropriate thing
> to do, according to various threads I've found).
>
> Maybe there is a different load function I should be using?
>
> Thanks!
>
> Sumona
>
>
>
> On Thu, Jan 12, 2017 at 6:26 PM ayan guha  wrote:
>
> Hi
>
>
>
> Given training and predictions are two different applications, I typically
> save model objects to hdfs and load it back during prediction map stages.
>
>
>
> Best
>
> Ayan
>
>
>
> On Fri, 13 Jan 2017 at 5:39 am, Sumona Routh  wrote:
>
> Hi all,
>
> I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.
>
> I encountered two frustrating issues and would really appreciate some
> advice:
>
> 1)  RandomForestClassificationModel is effectively not serializable (I
> assume it's referencing something that can't be serialized, since it itself
> extends serializable), so I ended up with the well-known exception:
> org.apache.spark.SparkException: Task not serializable.
> Basically, my original intention was to pass the model as a parameter
> because which model we use is dynamic based on what record we are
> predicting on.
>
> Has anyone else encountered this? Is this currently being addressed? I
> would expect objects from Spark's own libraries be able to be used
> seamlessly in their applications without these types of exceptions.
>
> 2) The RandomForestClassificationModel.load method appears to hang
> indefinitely when executed from inside a map function (which I assume is
> passed to the executor). So, I basically cannot load a model from a worker.
> We have multiple "profiles" that use differently trained models, which are
> accessed from within a map function to run predictions on different sets of
> data.
>
> The thread that is hanging has this as the latest (most pertinent) code:
> org.apache.spark.ml.util.DefaultParamsReader$.
> loadMetadata(ReadWrite.scala:391)
>
> Looking at the code in github, it appears that it is calling sc.textFile.
> I could not find anything stating that this particular function would not
> work from within a map function.
>
> Are there any suggestions as to how I can get this model to work on a real
> production job (either by allowing it to be serializable and passed around
> or loaded from a worker)?
>
> I've extensively POCed this model (saving, loading, transforming,
> training, etc.); however, this is the first time I'm attempting to use it
> from within a real application.
>
> Sumona
>
>


Re: Spark's execution plan debugging

2017-02-14 Thread ??????????
Hi all,

I have one question about this text:

-sortMergeJoin[c1#41,c1#98]

What do 41 and 98 stand for, please?
Thanks :)

---Original---
From: "Swapnil Shinde"
Date: 2017/2/11 07:38:42
To: "Yong Zhang";
Cc: "user@spark.apache.org";
Subject: Re: Spark's execution plan debugging


Thanks for your reply. I agree with your explanation of caching and can see
that it is working as expected. However, I am running the given snippet on
Spark 2.0.1, and even with caching I can see it going back to dataframes a & b.

On Thu, Feb 9, 2017 at 3:41 PM, Yong Zhang  wrote:
   
You may be misunderstanding what cache means. Caching a DF just means the data
can be retrieved directly from memory, instead of going to the parent
dependencies to get the data. In your example, even if C is cached, if you have
2 DFs derived from it, then C will be scanned 2 times in your application. The
data is retrieved directly from memory, though, instead of going to the A/B
DFs, which are the parent DFs that C is derived from.

In the execution plan, Spark can find out whether any DF in the chain is cached
and generates the right execution plan accordingly, as shown in the following
example (tested with Spark 1.6.3). As you can see, if C is NOT cached, then
your X has to go to A/B (scanning existing RDDs), but after C is cached, Spark
reads from "InMemoryColumnarTableScan". Caching has nothing to do with how many
times the data will be scanned.

 scala> x.explain
 == Physical Plan ==
 SortMergeJoin [c1#41], [c1#98]
 :- SortMergeJoin [c1#41], [d1#45]
 :  :- Sort [c1#41 ASC], false, 0
 :  :  +- TungstenExchange hashpartitioning(c1#41,200), None
 :  :     +- Project [_1#39 AS c1#41,_2#40 AS c2#42]
 :  :        +- Filter (_1#39 = a)
 :  :           +- Scan ExistingRDD[_1#39,_2#40]
 :  +- Sort [d1#45 ASC], false, 0
 :     +- TungstenExchange hashpartitioning(d1#45,200), None
 :        +- Project [_1#43 AS d1#45,_2#44 AS d2#46]
 :           +- Scan ExistingRDD[_1#43,_2#44]
 +- SortMergeJoin [c1#98], [d1#102]
    :- Sort [c1#98 ASC], false, 0
    :  +- TungstenExchange hashpartitioning(c1#98,200), None
    :     +- Project [_1#39 AS c1#98,_2#40 AS c2#99]
    :        +- Scan ExistingRDD[_1#39,_2#40]
    +- Sort [d1#102 ASC], false, 0
       +- TungstenExchange hashpartitioning(d1#102,200), None
          +- Project [_1#43 AS d1#102,_2#44 AS d2#103]
             +- Filter (_1#43 = b)
                +- Scan ExistingRDD[_1#43,_2#44]


 scala> c.cache
 res17: c.type = [c1: string, c2: int, d1: string, d2: int]


 scala> x.explain
 == Physical Plan ==
 SortMergeJoin [c1#41], [c1#98]
 :- Filter (c1#41 = a)
 :  +- InMemoryColumnarTableScan [c1#41,c2#42,d1#45,d2#46], [(c1#41 = a)], InMemoryRelation [c1#41,c2#42,d1#45,d2#46], true, 1, StorageLevel(true, true, false, true, 1), SortMergeJoin [c1#41], [d1#45], None
 +- Sort [c1#98 ASC], false, 0
    +- TungstenExchange hashpartitioning(c1#98,200), None
       +- Filter (d1#102 = b)
          +- InMemoryColumnarTableScan [c1#98,c2#99,d1#102,d2#103], [(d1#102 = b)], InMemoryRelation [c1#98,c2#99,d1#102,d2#103], true, 1, StorageLevel(true, true, false, true, 1), SortMergeJoin [c1#41], [d1#45], None
 
 
 
 

 
 
  
 From: Swapnil Shinde 
 Sent: Thursday, February 9, 2017 2:53 PM
 To: user@spark.apache.org
 Subject: Re: Spark's execution plan debugging 
 
  Any suggestions, please..
 
 On Wed, Feb 8, 2017 at 12:02 PM, Swapnil Shinde wrote:
 Hello,
 I am trying to figure out how Spark generates its execution plan with and
 without caching. I have this small example to illustrate what I am doing:

 val a = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3))).toDF("c1", "c2")
 val b = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3))).toDF("d1", "d2")
 val c = a.join(b, $"c1" === $"d1")
 val d1 = c.map(x => {
   val c1 = x.getAs[String]("c1")
   val c2 = x.getAs[Int]("c2")
   (c1, c2 * 2)
 }).toDF("nC1", "nC2")
 val d2 = c.map(x => {
   val d1 = x.getAs[String]("d1")
   val d2 = x.getAs[Int]("d2")
   (d1, d2 * 3)
 }).toDF("nD1", "nD2")
 val x = d1.as("a").join(d2.as("b"), $"a.nC1" === $"b.nD1")
 
 
 
 The generic DAG for dataframe 'x' would be something like this (Fig1):

 [Fig1 not preserved]

 The physical plan (x.explain) without any caching looks something like this:

 [plan not preserved]

 I am interpreting this as:

 [figure not preserved]

 As per my understanding, dataframe C is being used twice, so it would be good
 to cache it. I was hoping that if I cache 'c', the execution plan would look
 like the generic one (Fig1 above). However, I don't see it that way. Correct
 me if my understanding is wrong in interpreting the plan (here c is cached):

 [plan not preserved]

 I don't think caching 'c' is helping at all. Basically, input dataframes 'a' &
 'b' are being fetched twice. (In this example a and b are dataframes generated
 from a local collection, but the real-world case has large files.)
 
 Question:
   Why caching 'c' doesn't build 

Re: Spark Thrift Server - Skip header when load data from local file

2017-02-14 Thread ayan guha
I doubt you can do that. Create a staging table and then insert into the main
table after filtering out the header.
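
A rough sketch of the staging-table idea, driven over plain JDBC against the
Thrift server. Everything specific here is an assumption for illustration: the
staging table name tabname_staging, the JDBC URL passed in by the caller, and
the guess that the header row can be recognised because its first field is the
literal column name (for example 'id'). A Hive-compatible JDBC driver has to be
on the classpath at run time.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

final class StagingLoad {
    private StagingLoad() {}

    // Builds the statements: load the raw file (header included) into a staging
    // table, then copy every row except the header into the main table.
    // Inputs are concatenated into SQL, so they must come from a trusted source.
    static List<String> statements(String csvPath, String headerId) {
        return Arrays.asList(
            "create table if not exists tabname_staging(id string,name string) "
                + "row format delimited fields terminated by ','",
            "create table if not exists tabname(id string,name string)",
            "load data local inpath '" + csvPath
                + "' overwrite into table tabname_staging",
            "insert overwrite table tabname select id, name from tabname_staging "
                + "where id <> '" + headerId + "'");
    }

    // Executes the statements in order over one JDBC connection.
    static void run(String jdbcUrl, String csvPath, String headerId)
            throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement()) {
            for (String sql : statements(csvPath, headerId)) {
                stmt.execute(sql);
            }
        }
    }
}
```

Note that the filter also drops any real data row whose id happens to equal the
header text, so this only fits files where that cannot occur.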
On Wed, 15 Feb 2017 at 4:01 pm, kumar r  wrote:

> Hi,
>
> I want to load data from a csv file into a Spark table using the Spark
> thrift server. When I load, the header (first line) should be ignored.
>
> I have used the tblproperties("skip.header.line.count"="1") option, but it
> is not working and the first line is also included.
>
> Below is the spark sql query I have tried.
>
> create table tabname(id string,name string) row format delimited fields
> terminated by ',' tblproperties("skip.header.line.count"="1");
> load data local inpath 'tabname.csv' overwrite into table tabname;
>
> How can I achieve this? Is there any other solution or workaround?
>
-- 
Best Regards,
Ayan Guha


Spark Thrift Server - Skip header when load data from local file

2017-02-14 Thread kumar r
Hi,

I want to load data from a csv file into a Spark table using the Spark thrift
server. When I load, the header (first line) should be ignored.

I have used the tblproperties("skip.header.line.count"="1") option, but it is
not working and the first line is also included.

Below is the spark sql query I have tried.

create table tabname(id string,name string) row format delimited fields
terminated by ',' tblproperties("skip.header.line.count"="1");
load data local inpath 'tabname.csv' overwrite into table tabname;

How can I achieve this? Is there any other solution or workaround?


Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Mohammad Kargar
I will.

Thanks anyway
Mohammad

On Feb 14, 2017 7:24 PM, "Cody Koeninger"  wrote:

> Not sure what to tell you at that point - maybe compare what is
> present in ZK to a known working group.
>
> On Tue, Feb 14, 2017 at 9:06 PM, Mohammad Kargar 
> wrote:
> > Yes offset nodes are in zk and I can get the values.
> >
> > On Feb 14, 2017 6:54 PM, "Cody Koeninger"  wrote:
> >>
> >> Are there offset nodes in ZK?
> >>
> >> for a consumer group named mygroup, a topic named test, and partition
> >> 0 you should be able to connect to ZK and do e.g.
> >>
> >> get /consumers/mygroup/offsets/test/0
> >>
> >> If those don't exist, those are the ZK nodes you would need to make
> >> sure get created / updated from your spark job.
> >>
> >>
> >>
> >> On Tue, Feb 14, 2017 at 8:40 PM, Mohammad Kargar 
> >> wrote:
> >> > I'm running 0.10 version and
> >> >
> >> > ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
> >> >
> >> >  lists the group.
> >> >
> >> > On Feb 14, 2017 6:34 PM, "Cody Koeninger"  wrote:
> >> >>
> >> >> As far as I know, kafka-consumer-groups.sh wasn't available with 0.8
> >> >> versions of the kafka server.  Which version of the server are you
> >> >> running?
> >> >>
> >> >> At any rate, the low-level consumer used by the
> >> >> spark-streaming-kafka-0-8 integration isn't going to create a consumer
> >> >> group or otherwise interact with them.  If
> >> >>
> >> >> ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
> >> >>
> >> >> doesn't show the group you're expecting, have you used a consumer that
> >> >> would have created it?  Or did you explicitly try to create the
> >> >> related ZK nodes yourself?
> >> >>
> >> >>
> >> >> On Tue, Feb 14, 2017 at 8:07 PM, Mohammad Kargar 
> >> >> wrote:
> >> >> > Tried
> >> >> >
> >> >> > ${kafka-home}/bin/kafka-consumer-groups.sh --zookeeper zhost:zp
> >> >> > --describe
> >> >> > --group mygroup
> >> >> >
> >> >> > and got
> >> >> >
> >> >> > No topic available for consumer group provided!
> >> >> >
> >> >> > Let me know if you still need the code (have to make changes before
> >> >> > sharing).
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> >
> >> >> > On Tue, Feb 14, 2017 at 5:57 PM, Cody Koeninger <c...@koeninger.org>
> >> >> > wrote:
> >> >> >>
> >> >> >> Can you explain what wasn't successful and/or show code?
> >> >> >>
> >> >> >> On Tue, Feb 14, 2017 at 6:03 PM, Mohammad Kargar <mkar...@phemi.com>
> >> >> >> wrote:
> >> >> >> > As explained here, direct approach of integration between spark
> >> >> >> > streaming and kafka does not update offsets in Zookeeper, hence
> >> >> >> > Zookeeper-based Kafka monitoring tools will not show progress
> >> >> >> > (details).  We followed the recommended workaround to update the
> >> >> >> > zookeeper with the latest offset after each batch, but no success.
> >> >> >> > Wondering if there's any end-to-end example we can use?
> >> >> >> > Thanks,
> >> >> >> > Mohammad
> >> >> >> >
> >> >> >
> >> >> >
>


Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Cody Koeninger
Not sure what to tell you at that point - maybe compare what is
present in ZK to a known working group.

On Tue, Feb 14, 2017 at 9:06 PM, Mohammad Kargar  wrote:
> Yes offset nodes are in zk and I can get the values.
>
> On Feb 14, 2017 6:54 PM, "Cody Koeninger"  wrote:
>>
>> Are there offset nodes in ZK?
>>
>> for a consumer group named mygroup, a topic named test, and partition
>> 0 you should be able to connect to ZK and do e.g.
>>
>> get /consumers/mygroup/offsets/test/0
>>
>> If those don't exist, those are the ZK nodes you would need to make
>> sure get created / updated from your spark job.
>>
>>
>>
>> On Tue, Feb 14, 2017 at 8:40 PM, Mohammad Kargar 
>> wrote:
>> > I'm running 0.10 version and
>> >
>> > ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
>> >
>> >  lists the group.
>> >
>> > On Feb 14, 2017 6:34 PM, "Cody Koeninger"  wrote:
>> >>
>> >> As far as I know, kafka-consumer-groups.sh wasn't available with 0.8
>> >> versions of the kafka server.  Which version of the server are you
>> >> running?
>> >>
>> >> At any rate, the low-level consumer used by the
>> >> spark-streaming-kafka-0-8 integration isn't going to create a consumer
>> >> group or otherwise interact with them.  If
>> >>
>> >> ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
>> >>
>> >> doesn't show the group you're expecting, have you used a consumer that
>> >> would have created it?  Or did you explicitly try to create the
>> >> related ZK nodes yourself?
>> >>
>> >>
>> >> On Tue, Feb 14, 2017 at 8:07 PM, Mohammad Kargar 
>> >> wrote:
>> >> > Tried
>> >> >
>> >> > ${kafka-home}/bin/kafka-consumer-groups.sh --zookeeper zhost:zp
>> >> > --describe
>> >> > --group mygroup
>> >> >
>> >> > and got
>> >> >
>> >> > No topic available for consumer group provided!
>> >> >
>> >> > Let me know if you still need the code (have to make changes before
>> >> > sharing).
>> >> >
>> >> > Thanks
>> >> >
>> >> >
>> >> > On Tue, Feb 14, 2017 at 5:57 PM, Cody Koeninger 
>> >> > wrote:
>> >> >>
>> >> >> Can you explain what wasn't successful and/or show code?
>> >> >>
>> >> >> On Tue, Feb 14, 2017 at 6:03 PM, Mohammad Kargar 
>> >> >> wrote:
>> >> >> > As explained here, direct approach of integration between spark
>> >> >> > streaming and kafka does not update offsets in Zookeeper, hence
>> >> >> > Zookeeper-based Kafka monitoring tools will not show progress
>> >> >> > (details).  We followed the recommended workaround to update the
>> >> >> > zookeeper with the latest offset after each batch, but no success.
>> >> >> > Wondering if there's any end-to-end example we can use?
>> >> >> > Thanks,
>> >> >> > Mohammad
>> >> >> >
>> >> >
>> >> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: My spark job runs faster in spark 1.6 and much slower in spark 2.0

2017-02-14 Thread arun kumar Natva
Hi Jörn,
The input file has 30 billion rows and is 72 GB in size. The output is
expected to have readings for each account & day combination for 50k sample
accounts, which means the total output record count = 50k * 365 (about 18.25
million).


On Tue, Feb 14, 2017 at 6:29 PM, Jörn Franke  wrote:

> Can you check in the UI which tasks took most of the time?
>
> Even the 45 min looks a little bit much given that you only work most of
> the time with 50k rows
>
> On 15 Feb 2017, at 00:03, Timur Shenkao  wrote:
>
> Hello,
> I'm not sure that's your reason but check this discussion:
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-ML-Pipeline-performance-regression-between-1-6-and-2-x-td20803.html
>
> On Tue, Feb 14, 2017 at 9:25 PM, anatva  wrote:
>
>> Hi,
>> I am reading an ORC file, performing some joins and aggregations, and
>> finally generating a dense vector to perform analytics.
>>
>> The code runs in 45 minutes on spark 1.6 on a 4 node cluster. When the
>> same code is migrated to run on spark 2.0 on the same cluster, it takes
>> around 4-5 hours. It is surprising and frustrating.
>>
>> Can anyone take a look and help me with what I should change in order to
>> get at least the same performance in spark 2.0?
>>
>> spark-shell --master yarn-client --driver-memory 5G --executor-memory 20G \
>>   --num-executors 15 --executor-cores 5 --queue grid_analytics \
>>   --conf spark.yarn.executor.memoryOverhead=5120 \
>>   --conf spark.executor.heartbeatInterval=1200
>>
>> import sqlContext.implicits._
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.sql.functions.lit
>> import org.apache.hadoop._
>> import org.apache.spark.sql.functions.udf
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>> import org.apache.spark.ml.clustering.KMeans
>> import org.apache.spark.ml.feature.StandardScaler
>> import org.apache.spark.ml.feature.Normalizer
>>
>> Here are the steps:
>> 1. read orc file
>> 2. filter some of the records
>> 3. persist resulting data frame
>> 4. get distinct accounts from the df and get a sample of 50k accts from
>>    the distinct list
>> 5. join the above data frame with distinct 50k accounts to pull records
>>    for only those 50k accts
>> 6. perform a group by to get the avg, mean, sum, count of readings for the
>>    given 50k accts
>> 7. join the df obtained by GROUPBY with original DF
>> 8. convert the resultant DF to an RDD, do a groupByKey(), and generate a
>>    DENSE VECTOR
>> 9. convert RDD back to DF and store it in a parquet file
>>
>> The above steps worked fine in spark 1.6, but I'm not sure why they run
>> painfully long in spark 2.0.
>>
>> I am using spark 1.6 & spark 2.0 on HDP 2.5.3
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/My-spark-job-runs-faster-in-spark-1-6-
>> and-much-slower-in-spark-2-0-tp28390.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


-- 
Regards,
Arun Kumar Natva


Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Mohammad Kargar
Yes offset nodes are in zk and I can get the values.

On Feb 14, 2017 6:54 PM, "Cody Koeninger"  wrote:

> Are there offset nodes in ZK?
>
> for a consumer group named mygroup, a topic named test, and partition
> 0 you should be able to connect to ZK and do e.g.
>
> get /consumers/mygroup/offsets/test/0
>
> If those don't exist, those are the ZK nodes you would need to make
> sure get created / updated from your spark job.
>
>
>
> On Tue, Feb 14, 2017 at 8:40 PM, Mohammad Kargar 
> wrote:
> > I'm running 0.10 version and
> >
> > ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
> >
> >  lists the group.
> >
> > On Feb 14, 2017 6:34 PM, "Cody Koeninger"  wrote:
> >>
> >> As far as I know, kafka-consumer-groups.sh wasn't available with 0.8
> >> versions of the kafka server.  Which version of the server are you
> >> running?
> >>
> >> At any rate, the low-level consumer used by the
> >> spark-streaming-kafka-0-8 integration isn't going to create a consumer
> >> group or otherwise interact with them.  If
> >>
> >> ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
> >>
> >> doesn't show the group you're expecting, have you used a consumer that
> >> would have created it?  Or did you explicitly try to create the
> >> related ZK nodes yourself?
> >>
> >>
> >> On Tue, Feb 14, 2017 at 8:07 PM, Mohammad Kargar 
> >> wrote:
> >> > Tried
> >> >
> >> > ${kafka-home}/bin/kafka-consumer-groups.sh --zookeeper zhost:zp
> >> > --describe
> >> > --group mygroup
> >> >
> >> > and got
> >> >
> >> > No topic available for consumer group provided!
> >> >
> >> > Let me know if you still need the code (have to make changes before
> >> > sharing).
> >> >
> >> > Thanks
> >> >
> >> >
> >> > On Tue, Feb 14, 2017 at 5:57 PM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> Can you explain what wasn't successful and/or show code?
> >> >>
> >> >> On Tue, Feb 14, 2017 at 6:03 PM, Mohammad Kargar 
> >> >> wrote:
> >> >> > As explained here, direct approach of integration between spark
> >> >> > streaming and kafka does not update offsets in Zookeeper, hence
> >> >> > Zookeeper-based Kafka monitoring tools will not show progress
> >> >> > (details).  We followed the recommended workaround to update the
> >> >> > zookeeper with the latest offset after each batch, but no success.
> >> >> > Wondering if there's any end-to-end example we can use?
> >> >> >
> >> >> > Thanks,
> >> >> > Mohammad
> >> >> >
> >> >
> >> >
>


Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Cody Koeninger
Are there offset nodes in ZK?

for a consumer group named mygroup, a topic named test, and partition
0 you should be able to connect to ZK and do e.g.

get /consumers/mygroup/offsets/test/0

If those don't exist, those are the ZK nodes you would need to make
sure get created / updated from your spark job.
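
To make the node layout above concrete, here is a tiny helper that builds the
ZooKeeper path for one group, topic and partition. The path layout is taken
from the example in this message; the class and method names are made up for
illustration, and the helper only builds the string, it does not talk to
ZooKeeper.

```java
final class ZkOffsetPaths {
    private ZkOffsetPaths() {}

    // Path of the node that holds the committed offset for one partition,
    // following the /consumers/<group>/offsets/<topic>/<partition> layout.
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        // Prints the same path as the "get" example above.
        System.out.println(offsetPath("mygroup", "test", 0));
    }
}
```

A job that wants ZooKeeper-based monitoring tools to see progress would need to
write the latest offset of each partition to the matching path after each batch.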



On Tue, Feb 14, 2017 at 8:40 PM, Mohammad Kargar  wrote:
> I'm running 0.10 version and
>
> ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
>
>  lists the group.
>
> On Feb 14, 2017 6:34 PM, "Cody Koeninger"  wrote:
>>
>> As far as I know, kafka-consumer-groups.sh wasn't available with 0.8
>> versions of the kafka server.  Which version of the server are you
>> running?
>>
>> At any rate, the low-level consumer used by the
>> spark-streaming-kafka-0-8 integration isn't going to create a consumer
>> group or otherwise interact with them.  If
>>
>> ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
>>
>> doesn't show the group you're expecting, have you used a consumer that
>> would have created it?  Or did you explicitly try to create the
>> related ZK nodes yourself?
>>
>>
>> On Tue, Feb 14, 2017 at 8:07 PM, Mohammad Kargar 
>> wrote:
>> > Tried
>> >
>> > ${kafka-home}/bin/kafka-consumer-groups.sh --zookeeper zhost:zp
>> > --describe
>> > --group mygroup
>> >
>> > and got
>> >
>> > No topic available for consumer group provided!
>> >
>> > Let me know if you still need the code (have to make changes before
>> > sharing).
>> >
>> > Thanks
>> >
>> >
>> > On Tue, Feb 14, 2017 at 5:57 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> Can you explain what wasn't successful and/or show code?
>> >>
>> >> On Tue, Feb 14, 2017 at 6:03 PM, Mohammad Kargar 
>> >> wrote:
>> >> > As explained here, direct approach of integration between spark
>> >> > streaming and kafka does not update offsets in Zookeeper, hence
>> >> > Zookeeper-based Kafka monitoring tools will not show progress
>> >> > (details).  We followed the recommended workaround to update the
>> >> > zookeeper with the latest offset after each batch, but no success.
>> >> > Wondering if there's any end-to-end example we can use?
>> >> >
>> >> > Thanks,
>> >> > Mohammad
>> >> >
>> >
>> >




Re: Write JavaDStream to Kafka (how?)

2017-02-14 Thread Cody Koeninger
It looks like you're creating a kafka producer on the driver, and
attempting to write the string representation of
stringIntegerJavaPairRDD.  Instead, you probably want to be calling
stringIntegerJavaPairRDD.foreachPartition, so that producing to kafka
is being done on the executor.

Read

https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
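
A sketch of what the foreachPartition approach suggested above might look like
with the Spark 2.x Java API and the Kafka 0.10 producer client. The class name,
method name and parameters are hypothetical, and using String serializers for
both key and value is an assumption. The producer is created inside the
partition function, so it is constructed on the executor rather than serialized
from the driver.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

final class KafkaSink {
    private KafkaSink() {}

    // Writes every (word, count) pair of every batch to the given Kafka topic.
    static void writeCounts(JavaPairDStream<String, Integer> wordCounts,
                            String bootstrapServers, String topic) {
        wordCounts.foreachRDD(rdd -> rdd.foreachPartition(records -> {
            // This block runs on an executor, once per partition per batch.
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            // close() at the end of the try block flushes pending sends.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                while (records.hasNext()) {
                    Tuple2<String, Integer> record = records.next();
                    producer.send(new ProducerRecord<>(
                        topic, record._1(), String.valueOf(record._2())));
                }
            }
        }));
    }
}
```

Creating one producer per partition is simple but not free; the programming
guide linked above also discusses reusing connections across batches.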

On Fri, Feb 10, 2017 at 4:08 AM, Gutwein, Sebastian
 wrote:
> Hi,
>
> I'm new to Spark Streaming and want to run some end-to-end tests with Spark
> and Kafka.
>
> My program is running, but nothing arrives at the kafka topic. Can someone
> please help me?
>
> Where is my mistake? Does someone have a running example of writing a
> DStream to Kafka 0.10.1.0?
>
> The program looks as follows:
>
> import kafka.Kafka;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.rdd.RDD;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import scala.Int;
> import scala.Tuple2;
>
> import java.util.*;
> import java.util.regex.Pattern;
>
> /**
>  * Consumes messages from one or more topics in Kafka and does wordcount.
>  *
>  * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
>  *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
>  *   <group> is the name of kafka consumer group
>  *   <topics> is a list of one or more kafka topics to consume from
>  *   <numThreads> is the number of threads the kafka consumer should use
>  *
>  * To run this example:
>  *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
>  *    zoo03 my-consumer-group topic1,topic2 1`
>  */
>
> public final class JavaKafkaWordCountTest {
>   private static final Pattern SPACE = Pattern.compile(" ");
>
>   private JavaKafkaWordCountTest() {
>   }
>
>   public static void main(String[] args) throws Exception {
>     if (args.length < 4) {
>       System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
>       System.exit(1);
>     }
>
>     SparkConf sparkConf = new SparkConf().setAppName("GutweinKafkaWordCount");
>     // Create the context with 2 seconds batch size
>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
>
>     int numThreads = Integer.parseInt(args[3]);
>     Map<String, Integer> topicMap = new HashMap<>();
>     String[] topics = args[2].split(",");
>     for (String topic: topics) {
>       topicMap.put(topic, numThreads);
>     }
>
>     final JavaPairReceiverInputDStream<String, String> messages =
>         KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>       @Override
>       public String call(Tuple2<String, String> tuple2) {
>         return tuple2._2();
>       }
>     });
>
>     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
>       @Override
>       public Iterator<String> call(String x) {
>         return Arrays.asList(SPACE.split(x)).iterator();
>       }
>     });
>
>     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>       new PairFunction<String, String, Integer>() {
>         @Override
>         public Tuple2<String, Integer> call(String s) {
>           return new Tuple2<>(s, 1);
>         }
>       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>         @Override
>         public Integer call(Integer i1, Integer i2) {
>           return i1 + i2;
>         }
>       });
>
>     final KafkaWriter writer = new KafkaWriter("localhost:9081");
>
>     wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
>       @Override
>       public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
>         writer.writeToTopic("output", stringIntegerJavaPairRDD.toString());
>       }
>     });
>
>     wordCounts.print();
>     jssc.start();
>     jssc.awaitTermination();
>   }
>
>   public static class KafkaWriter {
>     Properties props = new Properties();
>     KafkaProducer producer;
>
>     // Constructor
>     KafkaWriter(String bootstrap_server){
>       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);
>       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>           "org.apache.kafka.common.serialization.StringSerializer");
>

Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Cody Koeninger
Can you explain what wasn't successful and/or show code?

On Tue, Feb 14, 2017 at 6:03 PM, Mohammad Kargar  wrote:
> As explained here, direct approach of integration between spark streaming
> and kafka does not update offsets in Zookeeper, hence Zookeeper-based Kafka
> monitoring tools will not show progress (details).  We followed the
> recommended workaround to update the zookeeper with the latest offset after
> each batch, but no success. Wondering if there's any end-to-end example we
> can use?
>
> Thanks,
> Mohammad
>




Re: How to specify default value for StructField?

2017-02-14 Thread Yong Zhang
You may be looking for something like "spark.sql.parquet.mergeSchema" for
ORC. Unfortunately, I don't think it is available, unless someone tells me I am
wrong.

You can create a JIRA to request this feature, but we all know that Parquet is
the first-class citizen format.

Yong
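
Since no ORC schema merging is mentioned as available here, one possible
workaround is to read the two kinds of files separately, add the missing column
with a default value, and union the results. This is only a sketch under strong
assumptions: that the files with the old schema (only f1) and the new schema
(f1 and f2) can be addressed by separate paths, that f2 is a string column, and
that Spark 2.x with ORC support is in use. The class name, method name and
parameters are hypothetical.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

final class OrcDefaults {
    private OrcDefaults() {}

    // Reads both sets of ORC files and returns one DataFrame with columns f1, f2.
    static Dataset<Row> readWithDefault(SparkSession spark,
                                        String oldSchemaPath,
                                        String newSchemaPath,
                                        String defaultF2) {
        // Files written with the old schema have no f2, so add it as a constant.
        Dataset<Row> oldRows = spark.read().orc(oldSchemaPath)
            .withColumn("f2", functions.lit(defaultF2));
        Dataset<Row> newRows = spark.read().orc(newSchemaPath);
        // union() matches columns by position, so select them in the same order.
        return oldRows.select("f1", "f2").union(newRows.select("f1", "f2"));
    }
}
```

If the files cannot be separated by path, this approach does not apply as
written.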


From: Begar, Veena 
Sent: Tuesday, February 14, 2017 10:37 AM
To: smartzjp; user@spark.apache.org
Subject: RE: How to specify default value for StructField?

Thanks, but it didn't work, because the folder has files with 2 different
schemas. It fails with the following exception:
org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input
columns: [f1];


-Original Message-
From: smartzjp [mailto:zjp_j...@163.com]
Sent: Tuesday, February 14, 2017 10:32 AM
To: Begar, Veena ; user@spark.apache.org
Subject: Re: How to specify default value for StructField?

You can try the below code.

val df = spark.read.format("orc").load("/user/hos/orc_files_test_together")
df.select("f1", "f2").show





On 2017/2/14

RE: Can't load a RandomForestClassificationModel in Spark job

2017-02-14 Thread Jianhong Xia
Is there any update on this problem?

I encountered the same issue that was mentioned here.

I have CrossValidatorModel.transform(df) running on workers, which requires a
DataFrame as input. However, we only have Arrays on workers. When we deploy
our model in cluster mode, we cannot call createDataFrame on workers. It
fails with this error:


17/02/13 20:21:27 ERROR Detector$: Error while detecting threats
java.lang.NullPointerException
 at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
 at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
 at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:62)
 at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:270)
 at com.mycompany.analytics.models.app.serializable.AppModeler.detection(modeler.scala:370)



On the other hand, if we run in local mode, everything works fine.

I just want to know whether anyone has successfully run machine learning
models on the workers.


Thanks,
Jianhong
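
The NullPointerException above is consistent with calling `createDataFrame` from executor code, where no usable SparkSession exists. A minimal sketch of the alternative, assuming the rows can be brought to (or already live on) the driver: build the DataFrame there and let `transform` distribute the work. `score_on_driver` is a hypothetical helper; `spark` stands for the driver's SparkSession and `model` for any fitted model with a `transform` method, such as a CrossValidatorModel.

```python
def score_on_driver(spark, model, rows, column_names):
    """Build a DataFrame on the driver and score it with a fitted model.

    Call this from driver code only, never inside map/foreach closures,
    because executors do not own a SparkSession.
    """
    df = spark.createDataFrame(rows, column_names)
    return model.transform(df)
```

The scoring itself still runs in parallel on the executors; only the construction of the DataFrame and the call to `transform` stay on the driver.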


From: Sumona Routh [mailto:sumos...@gmail.com]
Sent: Thursday, January 12, 2017 6:20 PM
To: ayan guha ; user@spark.apache.org
Subject: Re: Can't load a RandomForestClassificationModel in Spark job

Yes, I save it to S3 in a different process. It is actually the 
RandomForestClassificationModel.load method (passed an s3 path) where I run 
into problems.
When you say you load it during map stages, do you mean that you are able to 
directly load a model from inside of a transformation? When I try this, it 
passes the function to a worker, and the load method itself appears to attempt 
to create a new SparkContext, which causes an NPE downstream (because creating 
a SparkContext on the worker is not an appropriate thing to do, according to 
various threads I've found).
Maybe there is a different load function I should be using?
Thanks!
Sumona

On Thu, Jan 12, 2017 at 6:26 PM ayan guha wrote:
Hi

Given that training and prediction are two different applications, I typically
save model objects to HDFS and load them back during prediction map stages.

Best
Ayan

On Fri, 13 Jan 2017 at 5:39 am, Sumona Routh wrote:
Hi all,
I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.
I encountered two frustrating issues and would really appreciate some advice:

1)  RandomForestClassificationModel is effectively not serializable (I assume 
it's referencing something that can't be serialized, since it itself extends 
serializable), so I ended up with the well-known exception: 
org.apache.spark.SparkException: Task not serializable.
Basically, my original intention was to pass the model as a parameter,
because which model we use is dynamic based on what record we are
predicting on.
Has anyone else encountered this? Is this currently being addressed? I would
expect objects from Spark's own libraries to be usable seamlessly in
applications without these types of exceptions.
2) The RandomForestClassificationModel.load method appears to hang indefinitely 
when executed from inside a map function (which I assume is passed to the 
executor). So, I basically cannot load a model from a worker. We have multiple 
"profiles" that use differently trained models, which are accessed from within 
a map function to run predictions on different sets of data.
The thread that is hanging has this as the latest (most pertinent) code:
org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:391)
Looking at the code in github, it appears that it is calling sc.textFile. I 
could not find anything stating that this particular function would not work 
from within a map function.
Are there any suggestions as to how I can get this model to work on a real 
production job (either by allowing it to be serializable and passed around or 
loaded from a worker)?
I've extensively POCed this model (saving, loading, transforming, training,
etc.); however, this is the first time I'm attempting to use it from within a
real application.
Sumona


Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread bryan . jeffrey


Mohammad, 


We store our offsets in Cassandra,  and use that for tracking. This solved a 
few issues for us,  as it provides a good persistence mechanism even when 
you're reading from multiple clusters. 


Bryan Jeffrey 
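
The approach described above can be sketched as follows. This is a minimal illustration, not the poster's implementation: `OffsetStore` is a hypothetical in-memory stand-in for a Cassandra table, and the `(topic, partition, until_offset)` tuples are assumed to be derived from the offset ranges that the direct stream exposes for each batch.

```python
class OffsetStore:
    """Hypothetical in-memory stand-in for an external offset table."""

    def __init__(self):
        self._offsets = {}  # (topic, partition) -> last processed offset

    def save_batch(self, ranges):
        """Record the end offset of each (topic, partition, until_offset)."""
        for topic, partition, until_offset in ranges:
            key = (topic, partition)
            # Never move backwards if a batch is replayed.
            self._offsets[key] = max(until_offset, self._offsets.get(key, 0))

    def lag(self, latest):
        """Return per-partition lag given the brokers' latest offsets."""
        return {key: end - self._offsets.get(key, 0) for key, end in latest.items()}


store = OffsetStore()
store.save_batch([("events", 0, 120), ("events", 1, 95)])
store.save_batch([("events", 0, 180)])
print(store.lag({("events", 0): 200, ("events", 1): 95}))
```

A monitoring job could read the same table and compare it with the brokers' latest offsets, independently of ZooKeeper.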




On Tue, Feb 14, 2017 at 7:03 PM -0500, "Mohammad Kargar" wrote:

As explained here, direct approach of integration between spark streaming and 
kafka does not update offsets in Zookeeper, hence Zookeeper-based Kafka 
monitoring tools will not show progress (details).  We followed the recommended 
workaround to update the zookeeper with the latest offset after each batch, but 
no success. Wondering if there's any end-to-end example we can use?

Thanks,
Mohammad









Re: using spark-xml_2.10 to extract data from XML file

2017-02-14 Thread Hyukjin Kwon
Hi Carlo,


There was a bug in earlier versions of the library when accessing nested
values.

Otherwise, I suspect a separate issue with parsing malformed XML.

Could you maybe open an issue in
https://github.com/databricks/spark-xml/issues with your sample data?

I will stick with it until it is solved.


Thanks.
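
While the issue is open, one cheap check is whether every segment of the selected path exists in the schema, since a field declared in a user-supplied schema but absent from the data typically comes back as null. A minimal sketch using plain dictionaries as a stand-in for the printed schema tree; `first_missing_segment` is a hypothetical helper, and the tree copies the structure printed in the message quoted below, which may be abbreviated:

```python
def first_missing_segment(schema, path):
    """Walk `path` through nested dicts; return the first absent name, or None."""
    node = schema
    for name in path:
        if not isinstance(node, dict) or name not in node:
            return name
        node = node[name]
    return None


# Structure as printed in the quoted message (leaf fields map to None).
tree = {
    "xocs:item": {
        "bibrecord": {
            "head": {
                "abstracts": {
                    "abstract": {"_original": None, "_lang": None, "ce:para": None}
                }
            }
        }
    }
}

# Path used in CASE 2, split by hand because the names contain ':' characters.
case2 = ["xocs:item", "item", "bibrecord", "head", "abstracts", "abstract", "ce:para"]
print(first_missing_segment(tree, case2))
```

If the printed tree is complete, the extra `item` segment would be one candidate explanation for the nulls; if the tree was abbreviated, this check says nothing either way.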



2017-02-15 5:04 GMT+09:00 Carlo.Allocca :

> more specifically:
>
> Given the following XML data structure:
>
> This is the Structure of the XML file:
>
> xocs:doc
>  |-- xocs:item: struct (nullable = true)
>  |    |-- bibrecord: struct (nullable = true)
>  |    |    |-- head: struct (nullable = true)
>  |    |    |    |-- abstracts: struct (nullable = true)
>  |    |    |    |    |-- abstract: struct (nullable = true)
>  |    |    |    |    |    |-- _original: string (nullable = true)
>  |    |    |    |    |    |-- _lang: string (nullable = true)
>  |    |    |    |    |    |-- ce:para: string (nullable = true)
>
>
>
> CASE 1:
>
> String rowTag = "abstracts";
> Dataset<Row> df = (new XmlReader()).withAttributePrefix("_")
>     .withRowTag(rowTag).xmlFile(sqlContext, localxml);
> df.select(df.col("abstract.ce:para"),
>     df.col("abstract._original"), df.col("abstract._lang")).show();
>
> *I got the right values. *
>
> CASE 2:
>
> String rowTag = "xocs:doc";
> Dataset<Row> df = (new XmlReader()).withAttributePrefix("_")
>     .withRowTag(rowTag).xmlFile(sqlContext, localxml);
> df.select(df.col("xocs:item.item.bibrecord.head.abstracts.abstract.ce:para")).show();
>
> *I got null values.*
>
>
> My question is: how can I use String rowTag="xocs:doc"; and still get the
> right values for ….abstract.ce:para, etc.? What am I doing wrong?
>
> Many Thanks in advance.
> Best Regards,
> Carlo
>
>
>
> On 14 Feb 2017, at 17:35, carlo allocca  wrote:
>
> Dear All,
>
> I would like to ask for your help with the following issue when using
> spark-xml_2.10:
>
> Given an XML file with the following structure:
>
> xocs:doc
>  |-- xocs:item: struct (nullable = true)
>  |    |-- bibrecord: struct (nullable = true)
>  |    |    |-- head: struct (nullable = true)
>  |    |    |    |-- abstracts: struct (nullable = true)
>  |    |    |    |    |-- abstract: struct (nullable = true)
>  |    |    |    |    |    |-- _original: string (nullable = true)
>  |    |    |    |    |    |-- _lang: string (nullable = true)
>  |    |    |    |    |    |-- ce:para: string (nullable = true)
>
> Using the code below to extract all the info from the abstract:
>
> 1) I got "null" for each of the three values (_original, _lang and ce:para)
> when I use rowTag = "xocs:doc".
> 2) I got the right values when I use rowTag = "abstracts".
>
> Of course, I need to write a general parser that works at the xocs:doc level.
> I have been reading the documentation at
> https://github.com/databricks/spark-xml but it did not help me much to solve
> the above issue.
>
> Am I doing something wrong, or could it be related to a bug in the library I
> am using?
>
> Could you please advise?
>
> Many Thanks,
> Best Regards,
> carlo
>
>
>
>
>
> === Code:
> public static void main(String arr[]) {
>
> // xocs:item/item/bibrecord/head/abstracts  section
> StructType _abstract = new StructType(new StructField[]{
> new StructField("_original", DataTypes.StringType, true,
> Metadata.empty()),
> new StructField("_lang", DataTypes.StringType, true,
> Metadata.empty()),
> new StructField("ce:para", DataTypes.StringType, true,
> Metadata.empty())
> });
> StructType _abstracts = new StructType(new StructField[]{
> new StructField("abstract", _abstract, true, Metadata.empty())
> });
>
> StructType _head = new StructType(new StructField[]{
> new StructField("abstracts", _abstracts, true,
> Metadata.empty())
> });
>
> StructType _bibrecord = new StructType(new StructField[]{
> new StructField("head", _head, true, Metadata.empty())
>
> });
>
> StructType _item = new StructType(new StructField[]{
> new StructField("bibrecord", _bibrecord, true,
> Metadata.empty())
> });
>
> StructType _xocs_item = new StructType(new StructField[]{
> new StructField("item", _item, true, Metadata.empty()),});
>
> StructType rexploreXMLDataSchema = new StructType(new
> StructField[]{
> new StructField("xocs:item", _xocs_item, true,
> Metadata.empty()),});
>
> String localxml = "..filename.xml";
>
> SparkSession spark = SparkSession
> .builder()
> .master("local[2]")
> .appName("DatasetForCaseNew")
> .getOrCreate();
>
> String rowTag = "xocs:doc";
>
>
>
> SQLContext sqlContext = new SQLContext(spark);
> Dataset<Row> df = sqlContext.read()
> .format("com.databricks.spark.xml")
> .option("rowTag", rowTag)
> 

PySpark: use one column to index another (udf of two columns?)

2017-02-14 Thread apu
Let's say I have a Spark (PySpark) dataframe with the following schema:

root
 |-- myarray: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- myindices: array (nullable = true)
 |    |-- element: integer (containsNull = true)

It looks like:

+------------+---------+
|  myarray   |myindices|
+------------+---------+
|         [A]|      [0]|
|      [B, C]|      [1]|
|[D, E, F, G]|    [0,2]|
+------------+---------+

How can I use the second array to index the first?

My goal is to create a new dataframe which would look like:

+------------+---------+------+
|  myarray   |myindices|result|
+------------+---------+------+
|         [A]|      [0]|   [A]|
|      [B, C]|      [1]|   [C]|
|[D, E, F, G]|    [0,2]| [D,F]|
+------------+---------+------+

(It is safe to assume that the contents of myindices are always guaranteed
to be within the cardinality of myarray for the row in question, so there
are no out-of-bounds problems.)

It appears that the .getItem() method only works with single arguments, so
I might need a UDF here, but I know of no way to create a UDF that has more
than one column as input. Any solutions, with or without UDFs?
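
A UDF can take more than one column: each column passed at call time becomes one positional argument of the Python function. A minimal sketch, assuming the schema above; `pick` and `add_result_column` are names introduced here for illustration:

```python
def pick(values, indices):
    """Return the elements of `values` at the positions listed in `indices`.

    Assumes neither argument is None and every index is in range,
    as stated in the question.
    """
    return [values[i] for i in indices]


def add_result_column(df):
    """Append a `result` column built from `myarray` and `myindices`."""
    from pyspark.sql.functions import udf  # imported lazily; needs pyspark
    from pyspark.sql.types import ArrayType, StringType

    pick_udf = udf(pick, ArrayType(StringType()))
    # Two columns in, one array column out.
    return df.withColumn("result", pick_udf(df["myarray"], df["myindices"]))
```

Keeping `pick` as a plain function makes it easy to test without a cluster; rows with null arrays would need an extra guard.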


streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Mohammad Kargar
As explained here, the direct approach to integrating Spark Streaming with
Kafka does not update offsets in Zookeeper, hence Zookeeper-based Kafka
monitoring tools will not show progress (details). We followed the
recommended workaround to update Zookeeper with the latest offset after each
batch, but without success. Is there an end-to-end example we can use?

Thanks,
Mohammad


Re: My spark job runs faster in spark 1.6 and much slower in spark 2.0

2017-02-14 Thread Jörn Franke
Can you check in the UI which tasks took most of the time?

Even 45 minutes seems long, given that you work with only 50k rows most of
the time.

> On 15 Feb 2017, at 00:03, Timur Shenkao  wrote:
> 
> Hello,
> I'm not sure this is the cause, but check this discussion:
> 
> http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-ML-Pipeline-performance-regression-between-1-6-and-2-x-td20803.html
> 
>> On Tue, Feb 14, 2017 at 9:25 PM, anatva  wrote:
>> Hi,
>> I am reading an ORC file, and perform some joins, aggregations and finally
>> generate a dense vector to perform analytics.
>> 
>> The code runs in 45 minutes on spark 1.6 on a 4 node cluster. When the same
>> code is migrated to run on spark 2.0 on the same cluster, it takes around
>> 4-5 hours. It is surprising and frustrating.
>> 
>> Can anyone take a look and help me work out what I should change in order
>> to get at least the same performance in spark 2.0?
>> 
>> spark-shell --master yarn-client --driver-memory 5G --executor-memory 20G \
>> --num-executors 15 --executor-cores 5 --queue grid_analytics --conf
>> spark.yarn.executor.memoryOverhead=5120 --conf
>> spark.executor.heartbeatInterval=1200
>> 
>> import sqlContext.implicits._
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.sql.functions.lit
>> import org.apache.hadoop._
>> import org.apache.spark.sql.functions.udf
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>> import org.apache.spark.ml.clustering.KMeans
>> import org.apache.spark.ml.feature.StandardScaler
>> import org.apache.spark.ml.feature.Normalizer
>> 
>> here are the steps:
>> 1. read orc file
>> 2. filter some of the records
>> 3. persist resulting data frame
>> 4. get distinct accounts from the df and get a sample of 50k accts from the
>> distinct list
>> 5. join the above data frame with distinct 50k accounts to pull records for
>> only those 50k accts
>> 6. perform a group by to get the avg, mean, sum, count of readings for the
>> given 50k accts
>> 7. join the df obtained by GROUPBY with original DF
>> 8. convert the resultant DF to an RDD, do a groupbyKey(), and generate a
>> DENSE VECTOR
>> 9. convert RDD back to DF and store it in a parquet file
>> 
>> The above steps worked fine in spark 1.6 but i m not sure why they run
>> painfully long in spark 2.0.
>> 
>> I am using spark 1.6 & spark 2.0 on HDP 2.5.3
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/My-spark-job-runs-faster-in-spark-1-6-and-much-slower-in-spark-2-0-tp28390.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> 
> 


Re: My spark job runs faster in spark 1.6 and much slower in spark 2.0

2017-02-14 Thread Timur Shenkao
Hello,
I'm not sure this is the cause, but check this discussion:

http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-ML-Pipeline-performance-regression-between-1-6-and-2-x-td20803.html

On Tue, Feb 14, 2017 at 9:25 PM, anatva  wrote:

> Hi,
> I am reading an ORC file, and perform some joins, aggregations and finally
> generate a dense vector to perform analytics.
>
> The code runs in 45 minutes on spark 1.6 on a 4 node cluster. When the same
> code is migrated to run on spark 2.0 on the same cluster, it takes around
> 4-5 hours. It is surprising and frustrating.
>
> Can anyone take a look and help me work out what I should change in order
> to get at least the same performance in spark 2.0?
>
> spark-shell --master yarn-client --driver-memory 5G --executor-memory 20G \
> --num-executors 15 --executor-cores 5 --queue grid_analytics --conf
> spark.yarn.executor.memoryOverhead=5120 --conf
> spark.executor.heartbeatInterval=1200
>
> import sqlContext.implicits._
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.sql.functions.lit
> import org.apache.hadoop._
> import org.apache.spark.sql.functions.udf
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.feature.StandardScaler
> import org.apache.spark.ml.feature.Normalizer
>
> here are the steps:
> 1. read orc file
> 2. filter some of the records
> 3. persist resulting data frame
> 4. get distinct accounts from the df and get a sample of 50k accts from the
> distinct list
> 5. join the above data frame with distinct 50k accounts to pull records for
> only those 50k accts
> 6. perform a group by to get the avg, mean, sum, count of readings for the
> given 50k accts
> 7. join the df obtained by GROUPBY with original DF
> 8. convert the resultant DF to an RDD, do a groupbyKey(), and generate a
> DENSE VECTOR
> 9. convert RDD back to DF and store it in a parquet file
>
> The above steps worked fine in spark 1.6 but i m not sure why they run
> painfully long in spark 2.0.
>
> I am using spark 1.6 & spark 2.0 on HDP 2.5.3
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/My-spark-job-runs-faster-in-spark-1-6-and-much-slower-in-spark-2-0-tp28390.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: fault tolerant dataframe write with overwrite

2017-02-14 Thread Jörn Franke
If you use S3 you can first copy it into a temporary folder on HDFS. However,
for the checkpointing I would use the Spark implementation. You can also load
the file from S3 and checkpoint to HDFS.
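
The "keep the old copy until the new one is complete" behaviour asked about in the quoted thread below can be approximated by never overwriting: write each checkpoint to a new numbered directory, mark it complete, and read back the highest completed one. A minimal local-filesystem sketch; the `iter-N` naming, the `_DONE` marker, and both function names are assumptions made for illustration, and in a Spark job the write step would be the DataFrame write to that directory:

```python
import os
import tempfile

DONE_MARKER = "_DONE"  # hypothetical marker written only after a full write


def write_checkpoint(base_dir, iteration, payload):
    """Write one checkpoint into its own directory, then mark it complete."""
    target = os.path.join(base_dir, "iter-%08d" % iteration)
    os.makedirs(target)
    with open(os.path.join(target, "data.txt"), "w") as handle:
        handle.write(payload)
    # The marker is created last, so a crash mid-write leaves no marker.
    open(os.path.join(target, DONE_MARKER), "w").close()
    return target


def latest_complete(base_dir):
    """Return the newest directory that has a completion marker, or None."""
    complete = [
        name
        for name in sorted(os.listdir(base_dir))  # zero-padded, so sorted by iteration
        if os.path.exists(os.path.join(base_dir, name, DONE_MARKER))
    ]
    return os.path.join(base_dir, complete[-1]) if complete else None


base = tempfile.mkdtemp()
write_checkpoint(base, 20, "state after 20 iterations")
write_checkpoint(base, 40, "state after 40 iterations")
os.makedirs(os.path.join(base, "iter-00000060"))  # simulated crash: no marker
print(os.path.basename(latest_complete(base)))
```

Older directories can be deleted once a newer one carries its marker, which avoids relying on an atomic rename.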

> On 14 Feb 2017, at 17:43, Mendelson, Assaf  wrote:
> 
> Thanks, I didn’t know the Hadoop API supports file systems other than
> HDFS and the local file system (when there is 1 node).
> My main goal is indeed checkpointing: every N iterations I save the data
> for future use. The problem is that if I use overwrite mode, it first
> deletes the old data and then writes the new data, and that is what I am
> looking to solve.
>  
> I wasn’t aware of the issues with renaming in S3 (we are not currently using
> it; we just know we would probably need to support it or a similar store in
> the future). That said, how does Spark handle this case when writing a
> dataframe? Does it currently write everything to a temporary subdirectory
> and rename it at the end?
>  
> In any case, I was hoping for some way internal to spark to do a write which 
> does not harm the previous version of the dataframe on disk until a 
> successful writing of the new one.
> Thanks,
> Assaf.
>  
>  
> From: Steve Loughran [mailto:ste...@hortonworks.com] 
> Sent: Tuesday, February 14, 2017 3:25 PM
> To: Mendelson, Assaf
> Cc: Jörn Franke; user
> Subject: Re: fault tolerant dataframe write with overwrite
>  
>  
> On 14 Feb 2017, at 11:12, Mendelson, Assaf  wrote:
>  
> I know how to get the filesystem, the problem is that this means using Hadoop 
> directly so if in the future we change to something else (e.g. S3) I would 
> need to rewrite the code.
>  
> well, no, because the s3 and hdfs clients use the same API
>  
> FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020/users/stevel"), conf)
>  
> vs
>  
> FileSystem fs = FileSystem.get(URI.create("s3a://bucket1/dataset"), conf)
>  
> same for wasb://  (which, being consistent and with fast atomic rename, can 
> be used instead of HDFS), other cluster filesystems. If it's a native fs, 
> then file:// should work everywhere, or some derivative (as redhat do with 
> gluster)
> 
> 
> This also relate to finding the last iteration, I would need to use Hadoop 
> filesystem which is not agnostic to the deployment.
>  
>  
> see above. if you are using a spark cluster of size > 1 you will need some 
> distributed filesystem, which is going to have to provide a
>  
> If there is an issue here, it is that if you rely on FileSystem.rename()
> being an atomic O(1) operation then you are going to be disappointed on S3,
> as it's a non-atomic O(data) copy & delete whose failure state is "undefined".
>  
>  
> The solution here comes from having specific committer logic for the
> different object stores. You really, really don't want to go there. If you
> do, start by looking at the S3guard WiP one:
> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md
>  
> further reading: 
> http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores
> 
> 
> The Kryo serializer still costs much more than a dataframe write.
>  
> As for the use case, I am doing a very large number of iterations. So the 
> idea is that every X iterations I want to save to disk so that if something 
> crashes I do not have to begin from the first iteration but just from the 
> relevant iteration.
>  
>  
> sounds like you don't really want the output to always be the FS, more 
> checkpointing iterations. Couldn't you do something like every 20 iterations, 
> write() the relevant RDD to the DFS
>  
>  
> Basically I would have liked to see something like saving normally and the 
> original data would not be removed until a successful write.
> Assaf.
>  
> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Tuesday, February 14, 2017 12:54 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: fault tolerant dataframe write with overwrite
>  
> Normally you can fetch the filesystem interface from the configuration (I
> assume you mean URI).
> Managing to get the last iteration: I do not understand the issue. You can
> use the current timestamp as the directory name and at the end simply select
> the directory with the highest number.
>  
> Regarding checkpointing, you can also use the Kryo serializer to avoid some
> space overhead.
>  
> Aside from that, can you elaborate on the use case why you need to write 
> every iteration?
> 
> On 14 Feb 2017, at 11:22, Mendelson, Assaf  wrote:
> 
> Hi,
>  
> I have a case where I have an iterative process which overwrites the results 
> of a previous iteration.
> Every iteration I need to write a dataframe with the results.
> The problem is that when I write, if I simply overwrite the results of the 
> previous iteration, this is not fault tolerant. i.e. if the program crashes 
> in the middle of an iteration, the 

My spark job runs faster in spark 1.6 and much slower in spark 2.0

2017-02-14 Thread anatva
Hi,
I am reading an ORC file, performing some joins and aggregations, and finally
generating a dense vector to perform analytics.

The code runs in 45 minutes on spark 1.6 on a 4 node cluster. When the same
code is migrated to run on spark 2.0 on the same cluster, it takes around
4-5 hours. It is surprising and frustrating.

Can anyone take a look and help me work out what I should change in order to
get at least the same performance in spark 2.0?

spark-shell --master yarn-client --driver-memory 5G --executor-memory 20G \
--num-executors 15 --executor-cores 5 --queue grid_analytics --conf
spark.yarn.executor.memoryOverhead=5120 --conf
spark.executor.heartbeatInterval=1200

import sqlContext.implicits._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions.lit
import org.apache.hadoop._
import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.Normalizer

here are the steps:
1. read orc file
2. filter some of the records
3. persist resulting data frame
4. get distinct accounts from the df and get a sample of 50k accts from the
distinct list
5. join the above data frame with distinct 50k accounts to pull records for
only those 50k accts
6. perform a group by to get the avg, mean, sum, count of readings for the
given 50k accts
7. join the df obtained by GROUPBY with original DF
8. convert the resultant DF to an RDD, do a groupbyKey(), and generate a
DENSE VECTOR
9. convert RDD back to DF and store it in a parquet file

The above steps worked fine in spark 1.6 but I'm not sure why they run
painfully long in spark 2.0.

I am using spark 1.6 & spark 2.0 on HDP 2.5.3




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/My-spark-job-runs-faster-in-spark-1-6-and-much-slower-in-spark-2-0-tp28390.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: using spark-xml_2.10 to extract data from XML file

2017-02-14 Thread Carlo . Allocca
more specifically:

Given the following XML data structure:

This is the Structure of the XML file:

xocs:doc
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)
 |    |    |    |    |    |-- ce:para: string (nullable = true)


CASE 1:

String rowTag = "abstracts";
Dataset<Row> df = (new XmlReader()).withAttributePrefix("_")
    .withRowTag(rowTag).xmlFile(sqlContext, localxml);
df.select(df.col("abstract.ce:para"),
    df.col("abstract._original"), df.col("abstract._lang")).show();

I got the right values.

CASE 2:

String rowTag = "xocs:doc";
Dataset<Row> df = (new XmlReader()).withAttributePrefix("_")
    .withRowTag(rowTag).xmlFile(sqlContext, localxml);
df.select(df.col("xocs:item.item.bibrecord.head.abstracts.abstract.ce:para")).show();

I got null values.


My question is: how can I use String rowTag="xocs:doc"; and still get the
right values for ….abstract.ce:para, etc.? What am I doing wrong?

Many Thanks in advance.
Best Regards,
Carlo



On 14 Feb 2017, at 17:35, carlo allocca wrote:

Dear All,

I would like to ask for your help with the following issue when using
spark-xml_2.10:

Given an XML file with the following structure:

xocs:doc
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)
 |    |    |    |    |    |-- ce:para: string (nullable = true)

Using the code below to extract all the info from the abstract:

1) I got "null" for each of the three values (_original, _lang and ce:para)
when I use rowTag = "xocs:doc".
2) I got the right values when I use rowTag = "abstracts".

Of course, I need to write a general parser that works at the xocs:doc level.
I have been reading the documentation at
https://github.com/databricks/spark-xml but it did not help me much to solve
the above issue.

Am I doing something wrong, or could it be related to a bug in the library I
am using?

Could you please advise?

Many Thanks,
Best Regards,
carlo





=== Code:
public static void main(String arr[]) {

// xocs:item/item/bibrecord/head/abstracts  section
StructType _abstract = new StructType(new StructField[]{
new StructField("_original", DataTypes.StringType, true, 
Metadata.empty()),
new StructField("_lang", DataTypes.StringType, true, 
Metadata.empty()),
new StructField("ce:para", DataTypes.StringType, true, 
Metadata.empty())
});
StructType _abstracts = new StructType(new StructField[]{
new StructField("abstract", _abstract, true, Metadata.empty())
});

StructType _head = new StructType(new StructField[]{
new StructField("abstracts", _abstracts, true, Metadata.empty())
});

StructType _bibrecord = new StructType(new StructField[]{
new StructField("head", _head, true, Metadata.empty())

});

StructType _item = new StructType(new StructField[]{
new StructField("bibrecord", _bibrecord, true, Metadata.empty())
});

StructType _xocs_item = new StructType(new StructField[]{
new StructField("item", _item, true, Metadata.empty()),});

StructType rexploreXMLDataSchema = new StructType(new StructField[]{
new StructField("xocs:item", _xocs_item, true, Metadata.empty()),});

String localxml = "..filename.xml";

SparkSession spark = SparkSession
.builder()
.master("local[2]")
.appName("DatasetForCaseNew")
.getOrCreate();

String rowTag = "xocs:doc";



SQLContext sqlContext = new SQLContext(spark);
Dataset<Row> df = sqlContext.read()
.format("com.databricks.spark.xml")
.option("rowTag", rowTag)
.option("attributePrefix", "_")
.schema(rexploreXMLDataSchema)
.load(localxml);

df.printSchema();

df.select(

df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("_original"),

df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getItem("_lang"),

df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("ce:para")
  

Re: using spark-xml_2.10 to extract data from XML file

2017-02-14 Thread Carlo . Allocca
Dear All,

I would like to ask for your help with the following issue when using
spark-xml_2.10:

Given an XML file with the following structure:

xocs:doc
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)
 |    |    |    |    |    |-- ce:para: string (nullable = true)

Using the code below to extract all the info from the abstract:

1) I got "null" for each of the three values (_original, _lang and ce:para)
when I use rowTag = "xocs:doc".
2) I got the right values when I use rowTag = "abstracts".

Of course, I need to write a general parser that works at the xocs:doc level.
I have been reading the documentation at
https://github.com/databricks/spark-xml but it did not help me much to solve
the above issue.

Am I doing something wrong, or could it be related to a bug in the library I
am using?

Could you please advise?

Many Thanks,
Best Regards,
carlo





=== Code:
public static void main(String arr[]) {

    // xocs:item/item/bibrecord/head/abstracts  section
    StructType _abstract = new StructType(new StructField[]{
        new StructField("_original", DataTypes.StringType, true, Metadata.empty()),
        new StructField("_lang", DataTypes.StringType, true, Metadata.empty()),
        new StructField("ce:para", DataTypes.StringType, true, Metadata.empty())
    });
    StructType _abstracts = new StructType(new StructField[]{
        new StructField("abstract", _abstract, true, Metadata.empty())
    });

    StructType _head = new StructType(new StructField[]{
        new StructField("abstracts", _abstracts, true, Metadata.empty())
    });

    StructType _bibrecord = new StructType(new StructField[]{
        new StructField("head", _head, true, Metadata.empty())
    });

    StructType _item = new StructType(new StructField[]{
        new StructField("bibrecord", _bibrecord, true, Metadata.empty())
    });

    StructType _xocs_item = new StructType(new StructField[]{
        new StructField("item", _item, true, Metadata.empty())
    });

    StructType rexploreXMLDataSchema = new StructType(new StructField[]{
        new StructField("xocs:item", _xocs_item, true, Metadata.empty())
    });

    String localxml = "..filename.xml";

    SparkSession spark = SparkSession
        .builder()
        .master("local[2]")
        .appName("DatasetForCaseNew")
        .getOrCreate();

    String rowTag = "xocs:doc";

    SQLContext sqlContext = new SQLContext(spark);
    Dataset<Row> df = sqlContext.read()
        .format("com.databricks.spark.xml")
        .option("rowTag", rowTag)
        .option("attributePrefix", "_")
        .schema(rexploreXMLDataSchema)
        .load(localxml);

    df.printSchema();

    // Select the three abstract fields through the full nested path.
    df.select(
        df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("_original"),
        df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getItem("_lang"),
        df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("ce:para")
    ).show();

    //df.select(
    //    df.col("_original"),
    //    df.col("_lang"),
    //    df.col("ce:para")
    //).show();

    //df.select(
    //    df.col("abstract").getField("_original"),
    //    df.col("abstract").getField("_lang"),
    //    df.col("abstract").getField("ce:para")
    //).show();

    //df.select(
    //    df.col("head").getField("abstracts").getField("abstract").getField("_original"),
    //    df.col("head").getField("abstracts").getField("abstract").getField("_lang"),
    //    df.col("head").getField("abstracts").getField("abstract").getField("ce:para")
    //).show();
}




On 13 Feb 2017, at 18:17, Carlo.Allocca wrote:

Dear All,

I am using spark-xml_2.10 to parse and extract some data from XML files.

I am getting null values even though the XML file actually contains
values.

++--++
|xocs:item.bibrecord.head.abstracts.abstract._original 

Re: Strange behavior with 'not' and filter pushdown

2017-02-14 Thread Everett Anderson
Wrapping this up -- fix is in 2.1.0 and has been backported to the 2.0.x
branch, as well.
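
For readers skimming the quoted thread below, the problem is that the 2.0.2 plan combines a predicate with its own negation, which can never be true. A small plain-Python illustration with made-up rows (no Spark involved):

```python
rows = [{"username": "alice"}, {"username": None}, {"username": None}]


def intended(row):
    """not(username is not null): keeps rows whose username is null."""
    return not (row["username"] is not None)


def as_planned_in_2_0_2(row):
    """isnotnull(username) AND NOT isnotnull(username): always False."""
    is_not_null = row["username"] is not None
    return is_not_null and not is_not_null


print(sum(1 for row in rows if intended(row)))
print(sum(1 for row in rows if as_planned_in_2_0_2(row)))
```

The first count reflects the query as written; the second is zero for any input, which is why the extra `isnotnull` filter changes the result.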

On Mon, Feb 13, 2017 at 6:41 PM, Everett Anderson  wrote:

> Went ahead and opened
>
> https://issues.apache.org/jira/browse/SPARK-19586
>
> though I'd generally expect to just close it as fixed in 2.1.0 and roll on.
>
> On Sat, Feb 11, 2017 at 5:01 PM, Everett Anderson 
> wrote:
>
>> On the plus side, looks like this may be fixed in 2.1.0:
>>
>> == Physical Plan ==
>> *HashAggregate(keys=[], functions=[count(1)])
>> +- Exchange SinglePartition
>>+- *HashAggregate(keys=[], functions=[partial_count(1)])
>>   +- *Project
>>  +- *Filter NOT isnotnull(username#14)
>> +- *FileScan parquet [username#14] Batched: true, Format:
>> Parquet, Location: InMemoryFileIndex[file:/tmp/test_table],
>> PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
>> ReadSchema: struct
>>
>>
>>
>> On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson 
>> wrote:
>>
>>> Bumping this thread.
>>>
>>> Translating "where not(username is not null)" into a filter of  
>>> [IsNotNull(username),
>>> Not(IsNotNull(username))] seems like a rather severe bug.
>>>
>>> Spark 1.6.2:
>>>
>>> explain select count(*) from parquet_table where not( username is not
>>> null)
>>>
>>> == Physical Plan ==
>>> TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Final,isDistinct=false)],
>>> output=[_c0#1822L])
>>> +- TungstenExchange SinglePartition, None
>>>  +- TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Partial,isDistinct=false)],
>>> output=[count#1825L])
>>>  +- Project
>>>  +- Filter NOT isnotnull(username#1590)
>>>  +- Scan ParquetRelation[username#1590] InputPaths: ,
>>> PushedFilters: [Not(IsNotNull(username))]
>>>
>>> Spark 2.0.2
>>>
>>> explain select count(*) from parquet_table where not( username is not
>>> null)
>>>
>>> == Physical Plan ==
>>> *HashAggregate(keys=[], functions=[count(1)])
>>> +- Exchange SinglePartition
>>>  +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>>  +- *Project
>>>  +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>>>  +- *BatchedScan parquet default.[username#35] Format:
>>> ParquetFormat, InputPaths: , PartitionFilters: [],
>>> PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>> ReadSchema: struct
>>>
>>> Example to generate the above:
>>>
>>> // Create some fake data
>>>
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.Dataset
>>> import org.apache.spark.sql.types._
>>>
>>> val rowsRDD = sc.parallelize(Seq(
>>> Row(1, "fred"),
>>> Row(2, "amy"),
>>> Row(3, null)))
>>>
>>> val schema = StructType(Seq(
>>> StructField("id", IntegerType, nullable = true),
>>> StructField("username", StringType, nullable = true)))
>>>
>>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>>
>>> val path = "SOME PATH HERE"
>>>
>>> data.write.mode("overwrite").parquet(path)
>>>
>>> val testData = sqlContext.read.parquet(path)
>>>
>>> testData.registerTempTable("filter_test_table")
>>>
>>>
>>> %sql
>>> explain select count(*) from filter_test_table where not( username is
>>> not null)
>>>
>>>
>>> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <
>>> akosti...@nuna.com.invalid> wrote:
>>>
 Hi,

 I have an application where I’m filtering data with SparkSQL with
 simple WHERE clauses. I also want the ability to show the unmatched rows
 for any filter, and so am wrapping the previous clause in `NOT()` to get
 the inverse. Example:

 Filter:  username is not null
 Inverse filter:  NOT(username is not null)

 This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
 inverse filter always returns zero results. It looks like this is a problem
 with how the filter is getting pushed down to Parquet. Specifically, the
 pushdown includes both the “is not null” filter, AND “not(is not null)”,
 which would obviously result in zero matches. An example below:

 pyspark:
 > x = spark.sql('select my_id from my_table where username is not null')
 > y = spark.sql('select my_id from my_table where not(username is not null)')

 > x.explain()
 == Physical Plan ==
 *Project [my_id#6L]
 +- *Filter isnotnull(username#91)
+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
PartitionFilters: [], PushedFilters: [IsNotNull(username)],
ReadSchema: struct
 > y.explain()
 == Physical Plan ==
 *Project [my_id#6L]
 +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
PartitionFilters: [],
PushedFilters: 

RE: fault tolerant dataframe write with overwrite

2017-02-14 Thread Mendelson, Assaf
Thanks, I didn’t know the Hadoop API supports file systems other than 
HDFS and the local file system (when there is 1 node).
My main goal is indeed checkpointing: every N iterations I save the data 
for future use. The problem is that overwrite mode first deletes the old data 
and only then writes the new data, and that is what I am looking to solve.

I wasn’t aware of the issues with renaming in S3 (we are not currently using it, we 
just know we would probably need to support it or a similar store in the 
future). That said, how does Spark handle this case when writing a 
dataframe? Does it currently write everything to a temporary subdirectory and 
rename it at the end?

In any case, I was hoping for some way internal to spark to do a write which 
does not harm the previous version of the dataframe on disk until a successful 
writing of the new one.
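
To make the idea concrete, here is a minimal sketch of what I mean, using plain Python on a local directory rather than Spark or HDFS. Each iteration goes to its own directory, a marker file is written last, and older versions are removed only once a newer complete one exists. The function names and the iter_ prefix are made up for illustration; the marker name is borrowed from the _SUCCESS file that Hadoop output committers typically leave behind, and I am assuming the real store lists directories consistently.

```python
import os
import shutil
import tempfile

SUCCESS_MARKER = "_SUCCESS"  # marker name assumed for this sketch


def write_version(base_dir, iteration, payload):
    """Write one iteration into its own directory, marking it complete last."""
    target = os.path.join(base_dir, "iter_%08d" % iteration)
    os.makedirs(target)
    with open(os.path.join(target, "part-00000"), "w") as f:
        f.write(payload)
    # The marker is written only after the data, so a crash leaves no marker.
    open(os.path.join(target, SUCCESS_MARKER), "w").close()
    return target


def latest_complete(base_dir):
    """Return the newest directory that has a marker, or None."""
    complete = sorted(
        d for d in os.listdir(base_dir)
        if os.path.exists(os.path.join(base_dir, d, SUCCESS_MARKER))
    )
    return os.path.join(base_dir, complete[-1]) if complete else None


def prune_others(base_dir, keep):
    """Delete every version except `keep`."""
    for d in os.listdir(base_dir):
        path = os.path.join(base_dir, d)
        if path != keep:
            shutil.rmtree(path)


base = tempfile.mkdtemp()
write_version(base, 20, "state after 20 iterations")
write_version(base, 40, "state after 40 iterations")
# Simulate a crash in the middle of iteration 60: directory but no marker.
os.makedirs(os.path.join(base, "iter_%08d" % 60))

latest = latest_complete(base)
prune_others(base, latest)
remaining = sorted(os.listdir(base))
print(os.path.basename(latest), remaining)
```

The zero-padded names make a plain string sort match numeric order, so recovery is just "pick the last complete directory".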
Thanks,
Assaf.


From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Tuesday, February 14, 2017 3:25 PM
To: Mendelson, Assaf
Cc: Jörn Franke; user
Subject: Re: fault tolerant dataframe write with overwrite


On 14 Feb 2017, at 11:12, Mendelson, Assaf wrote:

I know how to get the filesystem, the problem is that this means using Hadoop 
directly so if in the future we change to something else (e.g. S3) I would need 
to rewrite the code.

well, no, because the s3 and hdfs clients use the same API

FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020/users/stevel"), conf)

vs

FileSystem fs = FileSystem.get(URI.create("s3a://bucket1/dataset"), conf)

same for wasb://  (which, being consistent and with fast atomic rename, can be 
used instead of HDFS), other cluster filesystems. If it's a native fs, then 
file:// should work everywhere, or some derivative (as redhat do with gluster)


This also relate to finding the last iteration, I would need to use Hadoop 
filesystem which is not agnostic to the deployment.


see above. if you are using a spark cluster of size > 1 you will need some 
distributed filesystem, which is going to have to provide a

If there is an issue here, it is that if you rely on FileSystem.rename() being 
an atomic O(1) operation then you are going to be disappointed on S3, as it's a 
non-atomic O(data) copy & delete whose failure state is "undefined".


The solution here comes from having specific committer logic for the different 
object stores. You really, really don't want to go there. If you do, 
start by looking at the S3guard WiP one: 
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

further reading: 
http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores


KryoSerializer still costs much more than a dataframe write.

As for the use case, I am doing a very large number of iterations. So the idea 
is that every X iterations I want to save to disk so that if something crashes 
I do not have to begin from the first iteration but just from the relevant 
iteration.


sounds like you don't really want the output to always be the FS, more 
checkpointing iterations. Couldn't you do something like every 20 iterations, 
write() the relevant RDD to the DFS


Basically I would have liked to see something like saving normally and the 
original data would not be removed until a successful write.
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, February 14, 2017 12:54 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: fault tolerant dataframe write with overwrite

Normally you can fetch the filesystem interface from the configuration ( I 
assume you mean URI).
Managing to get the last iteration: I do not understand the issue. You can have 
as the directory the current timestamp and at the end you simply select the 
directory with the highest number.

Regarding checkpointing, you can also use KryoSerializer to avoid some space 
overhead.

Aside from that, can you elaborate on the use case why you need to write every 
iteration?

On 14 Feb 2017, at 11:22, Mendelson, Assaf wrote:
Hi,

I have a case where I have an iterative process which overwrites the results of 
a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that when I write, if I simply overwrite the results of the 
previous iteration, this is not fault tolerant. i.e. if the program crashes in 
the middle of an iteration, the data from previous ones is lost as overwrite 
first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename but this is not 
the best way as it requires us to know the interfaces to the underlying file 
system (as well as requiring some extra work to manage which is the last one 
etc.)
I know I can also use checkpoint (although I haven’t fully tested the process 
there), however, checkpointing converts the 

Re: Parquet Gzipped Files

2017-02-14 Thread Benjamin Kim
Jörn,

I agree with you, but the vendor is a little difficult to work with. For now, I 
will try to decompress it from S3 and save it plainly into HDFS. If someone 
already has this example, please let me know.
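
In case it helps anyone else, here is a minimal sketch of the decompression step in plain Python, assuming the file has already been copied from S3 to local disk. My understanding is that the text-based readers split the decompressed bytes into lines of text, which cannot work for a binary Parquet file, so the file has to be gunzipped as raw bytes first. The file names are hypothetical and the demo uses stand-in bytes rather than a real Parquet file.

```python
import gzip
import os
import shutil
import tempfile


def gunzip(src_path, dst_path, chunk_size=1024 * 1024):
    """Stream-decompress src_path into dst_path without loading it all in memory."""
    with gzip.open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        shutil.copyfileobj(src, dst, chunk_size)
    return dst_path


# Demo with stand-in bytes; a real Parquet file starts and ends with b"PAR1".
workdir = tempfile.mkdtemp()
original = b"PAR1" + b"\x00" * 64 + b"PAR1"
gz_path = os.path.join(workdir, "vendor.parquet.gz")  # hypothetical file name
with gzip.open(gz_path, "wb") as f:
    f.write(original)

out_path = gunzip(gz_path, os.path.join(workdir, "vendor.parquet"))
with open(out_path, "rb") as f:
    restored = f.read()
print(restored == original)
```

After that, the plain file can be put into HDFS and read with spark.read.parquet as usual.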

Cheers,
Ben


> On Feb 13, 2017, at 9:50 AM, Jörn Franke  wrote:
> 
> Your vendor should use the parquet internal compression and not take a 
> parquet file and gzip it.
> 
>> On 13 Feb 2017, at 18:48, Benjamin Kim  wrote:
>> 
>> We are receiving files from an outside vendor who creates a Parquet data 
>> file and Gzips it before delivery. Does anyone know how to Gunzip the file 
>> in Spark and inject the Parquet data into a DataFrame? I thought using 
>> sc.textFile or sc.wholeTextFiles would automatically Gunzip the file, but 
>> I’m getting a decompression header error when trying to open the Parquet 
>> file.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 





RE: How to specify default value for StructField?

2017-02-14 Thread Begar, Veena
Thanks, it didn't work. Because, the folder has files from 2 different schemas. 
It fails with the following exception:
org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input 
columns: [f1];


-Original Message-
From: smartzjp [mailto:zjp_j...@163.com] 
Sent: Tuesday, February 14, 2017 10:32 AM
To: Begar, Veena ; user@spark.apache.org
Subject: Re: How to specify default value for StructField?

You can try the below code.

val df = spark.read.format("orc").load("/user/hos/orc_files_test_together")
df.select("f1","f2").show





On 2017/2/14 

Re: How to specify default value for StructField?

2017-02-14 Thread smartzjp
You can try the below code.

val df = spark.read.format("orc").load("/user/hos/orc_files_test_together")
df.select("f1","f2").show





On 2017/2/14 

Reusing HBase connection in transformations

2017-02-14 Thread DandyDev
Hi!

I'm struggling with the following problem: I have a couple of Spark
Streaming jobs that keep state (using mapWithState, and in one case
updateStateByKey) and write their results to HBase. One of the Streaming
jobs, needs the results that the other Streaming job writes to HBase.
How it's currently implemented, is that within the state function, data is
read from HBase that is used in calculations. The drawback, is that for each
time the state function is called, a new connection is opened to HBase.
In the Spark Streaming guide, it is suggested that you reuse the same
connection within one partition, but this applies only to /actions/ (ie.
foreachRDD). How would you do it for transformations (like mapWithState)?
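
To show the kind of reuse I am after, here is a minimal plain-Python sketch of a lazily created, process-wide connection that a state function could call into. This is only an assumption about what a solution might look like, not something I have working in Spark: FakeConnection stands in for a real HBase client, and state_function is a hypothetical stand-in for the function passed to mapWithState.

```python
import threading


class FakeConnection:
    """Stand-in for an HBase connection; counts how many were opened."""
    opened = 0

    def __init__(self):
        FakeConnection.opened += 1

    def get(self, key):
        return "value-for-%s" % key


_lock = threading.Lock()
_connection = None


def get_connection():
    """Create the connection on first use in this process, then reuse it."""
    global _connection
    if _connection is None:
        with _lock:
            # Re-check inside the lock so two threads cannot both create one.
            if _connection is None:
                _connection = FakeConnection()
    return _connection


def state_function(key, new_value, old_state):
    # Hypothetical state update: look up reference data, then combine it
    # with the previous state.
    reference = get_connection().get(key)
    return (old_state or 0) + new_value, reference


results = [state_function(k, 1, None) for k in ["a", "b", "c"]]
print(FakeConnection.opened)
```

The point is that the connection lives outside the function being serialized, so each worker process would open it once rather than once per call.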



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reusing-HBase-connection-in-transformations-tp28389.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Lost executor 4 Container killed by YARN for exceeding memory limits.

2017-02-14 Thread Thakrar, Jayesh
Nancy,

As the message from Spark indicates, spark.shuffle.memoryFraction is no longer 
used.
It’s a unified heap space for both data caching and other things.
Also, the previous 11 GB was not sufficient, and you are making the executor 
memory even smaller, not sure how it will work.
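
A rough worked example of where the 11 GB limit comes from, as a plain Python sketch. It assumes the documented default for spark.yarn.executor.memoryOverhead in this version line (10% of executor memory, with a 384 MB minimum) and ignores any rounding YARN applies to container sizes; the function name is made up for illustration.

```python
def container_mb(executor_memory_mb, overhead_mb=None):
    """Approximate memory requested from YARN per executor container."""
    if overhead_mb is None:
        # Assumed default: 10% of executor memory, with a 384 MB floor.
        overhead_mb = max(384, executor_memory_mb // 10)
    return executor_memory_mb + overhead_mb


original = container_mb(10 * 1024)      # --executor-memory 10G, default overhead
retry = container_mb(7 * 1024, 1024)    # 7G with memoryOverhead=1024
suggested = container_mb(14 * 1024)     # --executor-memory 14G, default overhead
print(original, retry, suggested)
```

Under those assumptions the original 10G setting gives an 11 GB container, which matches the limit in the log, while the 7G retry asks for only about 8 GB in total.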

From: nancy henry 
Date: Tuesday, February 14, 2017 at 1:04 AM
To: Conversant 
Cc: Jon Gregg , "user @spark" 
Subject: Re: Lost executor 4 Container killed by YARN for exceeding memory 
limits.

Hi,


How do I set these parameters while launching spark-shell?

spark.shuffle.memoryFraction=0.5

and

spark.yarn.executor.memoryOverhead=1024

I tried passing them like this, but I am getting the warning below

spark-shell --master yarn --deploy-mode client --driver-memory 16G 
--num-executors 500 executor-cores 4 --executor-memory 7G --conf 
spark.shuffle.memoryFraction=0.5 --conf spark.yarn.executor.memoryOverhead=1024

Warning
17/02/13 22:42:02 WARN SparkConf: Detected deprecated memory fraction settings: 
[spark.shuffle.memoryFraction]. As of Spark 1.6, execution and storage memory 
management are unified. All memory fractions used in the old model are now 
deprecated and no longer read. If you wish to use the old memory management, 
you may explicitly enable `spark.memory.useLegacyMode` (not recommended).



On Mon, Feb 13, 2017 at 11:23 PM, Thakrar, Jayesh wrote:
Nancy,

As your log output indicated, your executor exceeded its 11 GB memory limit.
While you might want to address the root cause/data volume as suggested by Jon, 
you can do an immediate test by changing your command as follows

spark-shell --master yarn --deploy-mode client --driver-memory 16G 
--num-executors 500 --executor-cores 7 --executor-memory 14G

This essentially increases your executor memory from 11 GB to 14 GB.
Note that it will result in a potentially large footprint - from 500x11 to 
500x14 GB.
You may want to consult with your DevOps/Operations/Spark Admin team first.

From: Jon Gregg
Date: Monday, February 13, 2017 at 8:58 AM
To: nancy henry
Cc: "user @spark"
Subject: Re: Lost executor 4 Container killed by YARN for exceeding memory 
limits.

Setting Spark's memoryOverhead configuration variable is recommended in your 
logs, and has helped me with these issues in the past.  Search for 
"memoryOverhead" here:  http://spark.apache.org/docs/latest/running-on-yarn.html

That said, you're running on a huge cluster as it is.  If it's possible to 
filter your tables down before the join (keeping just the rows/columns you 
need), that may be a better solution.

Jon

On Mon, Feb 13, 2017 at 5:27 AM, nancy henry wrote:
Hi All,

I am getting below error while I am trying to join 3 tables which are in ORC 
format in hive from 5 10gb tables through hive context in spark

Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB physical 
memory used. Consider boosting spark.yarn.executor.memoryOverhead.
17/02/13 02:21:19 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container 
killed by YARN for exceeding memory limits. 11.1 GB of 11 GB physical memory 
used


I am using below memory parameters to launch shell .. what else i could 
increase from these parameters or do I need to change any configuration 
settings please let me know

spark-shell --master yarn --deploy-mode client --driver-memory 16G 
--num-executors 500 executor-cores 7 --executor-memory 10G





Re: Dealing with missing columns in SPARK SQL in JSON

2017-02-14 Thread Sam Elamin
Ah, if that's the case then you might need to define the schema beforehand.
Either that, or if you want to infer it, ensure a JSON file exists with
the right schema so Spark infers the right columns,

essentially making both files one dataframe, if that makes sense.
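
Here is a minimal plain-Python sketch of what "define the schema beforehand" buys you: every record is projected onto a fixed column list, and absent keys become nulls. It does not use Spark; the function name is made up, and the column names come from the example in this thread. In Spark the analogous move would be passing an explicit schema to the reader (spark.read.schema(...).json(...)) instead of letting it infer one per file.

```python
import json

# Declared up front, like a schema; column names come from the thread's example.
COLUMNS = ["a", "b"]


def read_with_schema(json_line, columns=COLUMNS):
    """Parse one JSON record and project it onto the declared columns."""
    record = json.loads(json_line)
    # dict.get returns None for absent keys, which plays the role of SQL NULL.
    return tuple(record.get(name) for name in columns)


json1_rows = [read_with_schema('{"a": 1}')]
json2_rows = [read_with_schema('{"b": 2}')]
print(json1_rows, json2_rows)
```

With the columns fixed up front, "select a, b" has something to resolve against for both files.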

On Tue, Feb 14, 2017 at 3:04 PM, Aseem Bansal  wrote:

> Sorry if I trivialized the example. It is the same kind of file and
> sometimes it could have "a", sometimes "b", sometimes both. I just don't
> know. That is what I meant by missing columns.
>
> It would be good if I read any of the JSON and if I do spark sql and it
> gave me
>
> for json1.json
>
> a | b
> 1 | null
>
> for json2.json
>
> a | b
> null | 2
>
>
> On Tue, Feb 14, 2017 at 8:13 PM, Sam Elamin 
> wrote:
>
>> I may be missing something super obvious here but can't you combine them
>> into a single dataframe. Left join perhaps?
>>
>> Try writing it in SQL, "select a from json1 and b from json2", then run
>> explain to give you a hint to how to do it in code
>>
>> Regards
>> Sam
>> On Tue, 14 Feb 2017 at 14:30, Aseem Bansal  wrote:
>>
>>> Say I have two files containing single rows
>>>
>>> json1.json
>>>
>>> {"a": 1}
>>>
>>> json2.json
>>>
>>> {"b": 2}
>>>
>>> I read in this json file using spark's API into a dataframe one at a
>>> time. So I have
>>>
>>> Dataset json1DF
>>> and
>>> Dataset json2DF
>>>
>>> If I run "select a, b from __THIS__" in a SQLTransformer then I will get
>>> an exception as for json1DF does not have "b" and json2DF does not have "a"
>>>
>>> How could I handle this situation with missing columns in JSON?
>>>
>>
>


Re: Dealing with missing columns in SPARK SQL in JSON

2017-02-14 Thread Aseem Bansal
Sorry if I trivialized the example. It is the same kind of file and
sometimes it could have "a", sometimes "b", sometimes both. I just don't
know. That is what I meant by missing columns.

It would be good if I read any of the JSON and if I do spark sql and it
gave me

for json1.json

a | b
1 | null

for json2.json

a | b
null | 2


On Tue, Feb 14, 2017 at 8:13 PM, Sam Elamin  wrote:

> I may be missing something super obvious here but can't you combine them
> into a single dataframe. Left join perhaps?
>
> Try writing it in SQL, "select a from json1 and b from json2", then run
> explain to give you a hint to how to do it in code
>
> Regards
> Sam
> On Tue, 14 Feb 2017 at 14:30, Aseem Bansal  wrote:
>
>> Say I have two files containing single rows
>>
>> json1.json
>>
>> {"a": 1}
>>
>> json2.json
>>
>> {"b": 2}
>>
>> I read in this json file using spark's API into a dataframe one at a
>> time. So I have
>>
>> Dataset json1DF
>> and
>> Dataset json2DF
>>
>> If I run "select a, b from __THIS__" in a SQLTransformer then I will get
>> an exception as for json1DF does not have "b" and json2DF does not have "a"
>>
>> How could I handle this situation with missing columns in JSON?
>>
>


Re: Handling Skewness and Heterogeneity

2017-02-14 Thread Galen Marchetti
Anis,

If your random partitions are smaller than your smallest machine, and you
request executors for your spark jobs no larger than your smallest machine,
then spark/cluster manager will automatically assign many executors to your
larger machines.

As long as you request small executors, you will utilize your large boxes
effectively because they will run many more executors simultaneously than
the small boxes do.
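
For reference, the key-seeding approach from my earlier message (quoted below) can be sketched in plain Python as a two-stage aggregation. This is only an illustration for a sum: the function name, the number of salts, and the assumption that hot keys are known in advance are all choices made for the sketch, not something Spark does for you.

```python
import random
from collections import defaultdict


def salted_sum(pairs, hot_keys, num_salts=4, seed=0):
    rng = random.Random(seed)
    # Stage 1: aggregate on (key, salt) so a hot key is split across up to
    # num_salts groups instead of landing in one huge group.
    partial = defaultdict(int)
    for key, value in pairs:
        salt = rng.randrange(num_salts) if key in hot_keys else 0
        partial[(key, salt)] += value
    # Stage 2: drop the salt and merge the partial results per original key.
    final = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        final[key] += subtotal
    return dict(final), len(partial)


pairs = [("hot", 1)] * 1000 + [("cold", 1)] * 10
totals, num_groups = salted_sum(pairs, hot_keys={"hot"})
print(totals, num_groups)
```

This works because addition is associative; for other aggregations you need an equivalent way to merge the partial results.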

On Tue, Feb 14, 2017 at 5:09 PM, Anis Nasir  wrote:

> Thank you very much for your reply.
>
> I guess this approach balances the load across the cluster of machines.
>
> However, I am looking for something for a heterogeneous cluster for which
> the distribution is not known in advance.
>
> Cheers,
> Anis
>
>
> On Tue, 14 Feb 2017 at 20:19, Galen Marchetti 
> wrote:
>
>> Anis,
>>
>> I've typically seen people handle skew by seeding the keys corresponding
>> to high volumes with random values, then partitioning the dataset based on
>> the original key *and* the random value, then reducing.
>>
>> Ex: ( ,  ) -> ( ,
>> ,  )
>>
>> This transformation reduces the size of the huge partition, making it
>> tenable for spark, as long as you can figure out logic for aggregating the
>> results of the seeded partitions together again.
>>
>> On Tue, Feb 14, 2017 at 12:01 PM, Anis Nasir  wrote:
>>
>> Dear All,
>>
>> I have a few use cases for Spark Streaming where the Spark cluster consists of
>> heterogeneous machines.
>>
>> Additionally, there is skew present in both the input distribution (e.g.,
>> each tuple is drawn from a zipf distribution) and the service time (e.g.,
>> service time required for each tuple comes from a zipf distribution).
>>
>> I want to know how Spark will handle such use cases.
>>
>> Any help will be highly appreciated!
>>
>>
>> Regards,
>> Anis
>>
>>
>>
>>
>>


Re: Dealing with missing columns in SPARK SQL in JSON

2017-02-14 Thread Sam Elamin
I may be missing something super obvious here but can't you combine them
into a single dataframe. Left join perhaps?

Try writing it in SQL, "select a from json1 and b from json2", then run
explain to give you a hint to how to do it in code

Regards
Sam
On Tue, 14 Feb 2017 at 14:30, Aseem Bansal  wrote:

> Say I have two files containing single rows
>
> json1.json
>
> {"a": 1}
>
> json2.json
>
> {"b": 2}
>
> I read in this json file using spark's API into a dataframe one at a time.
> So I have
>
> Dataset json1DF
> and
> Dataset json2DF
>
> If I run "select a, b from __THIS__" in a SQLTransformer then I will get
> an exception as for json1DF does not have "b" and json2DF does not have "a"
>
> How could I handle this situation with missing columns in JSON?
>


Dealing with missing columns in SPARK SQL in JSON

2017-02-14 Thread Aseem Bansal
Say I have two files containing single rows

json1.json

{"a": 1}

json2.json

{"b": 2}

I read in this json file using spark's API into a dataframe one at a time.
So I have

Dataset json1DF
and
Dataset json2DF

If I run "select a, b from __THIS__" in a SQLTransformer then I will get an
exception as for json1DF does not have "b" and json2DF does not have "a"

How could I handle this situation with missing columns in JSON?


HiveContext on Spark 1.6 Linkage Error:ClassCastException

2017-02-14 Thread Enrico DUrso
Hello guys,
hope all of you are ok.
I am trying to use HiveContext on Spark 1.6, I am developing using Eclipse and 
I placed the hive-site.xml in the classPath, so doing I use the Hive instance 
running on my cluster instead
of creating a local metastore and a local warehouse.
So far so good: in this scenario select * and insert into queries work OK, but 
the problem arises when trying to drop tables and/or create new ones.
Provided that is not a permission problem, my issue is:
ClassCastException: attempting to cast jar 
file://.../com/sun/jersey/jersey-core/1.9/jersey-core-1.9.jar!javax/ws/rs/ext/RunTimeDelegate.class
 to jar cast jar 
file://.../com/sun/jersey/jersey-core/1.9/jersey-core-1.9.jar!javax/ws/rs/ext/RunTimeDelegate.class.

As you can see, it is attempting to cast a class to the same class from the same jar, and it throws the 
exception, I think because the same jar has been loaded before by a different 
classloader: one copy is loaded by 
org.apache.spark.sql.hive.client.IsolatedClientLoader and the other one by 
sun.misc.Launcher$AppClassLoader.

Any suggestion to fix this issue? The same happens when building the jar and 
running it with spark-submit (yarn RM).

Cheers,

best





NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions writing to Hive

2017-02-14 Thread nimrodo
Hi,

I'm trying to write a DataFrame to a Hive partitioned table. This works fine
from spark-shell; however, when I use spark-submit I get the following
exception:

Exception in thread "main" java.lang.NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(org.apache.hadoop.fs.Path, java.lang.String, java.util.Map, boolean, int, boolean, boolean, boolean)
    at java.lang.Class.getMethod(Class.java:1665)
    at org.apache.spark.sql.hive.client.Shim.findMethod(HiveShim.scala:114)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitionsMethod$lzycompute(HiveShim.scala:404)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitionsMethod(HiveShim.scala:403)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitions(HiveShim.scala:455)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:281)
    at org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:228)
    at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:227)
    at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:270)
    at org.apache.spark.sql.hive.client.ClientWrapper.loadDynamicPartitions(ClientWrapper.scala:561)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:225)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:127)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:276)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
    at com.pelephone.TrueCallLoader$.main(TrueCallLoader.scala:175)
    at com.pelephone.TrueCallLoader.main(TrueCallLoader.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can you help me find the problem?

Nimrod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodException-org-apache-hadoop-hive-ql-metadata-Hive-loadDynamicPartitions-writing-to-Hive-tp28388.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: fault tolerant dataframe write with overwrite

2017-02-14 Thread Steve Loughran

On 14 Feb 2017, at 11:12, Mendelson, Assaf wrote:

I know how to get the filesystem, the problem is that this means using Hadoop 
directly so if in the future we change to something else (e.g. S3) I would need 
to rewrite the code.

well, no, because the s3 and hdfs clients use the same API

FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020/users/stevel"), conf)

vs

FileSystem fs = FileSystem.get(URI.create("s3a://bucket1/dataset"), conf)

same for wasb://  (which, being consistent and with fast atomic rename, can be 
used instead of HDFS), other cluster filesystems. If it's a native fs, then 
file:// should work everywhere, or some derivative (as redhat do with gluster)

This also relate to finding the last iteration, I would need to use Hadoop 
filesystem which is not agnostic to the deployment.


see above. if you are using a spark cluster of size > 1 you will need some 
distributed filesystem, which is going to have to provide a

If there is an issue here, it is that if you rely on FileSystem.rename() being 
an atomic O(1) operation then you are going to be disappointed on S3, as it's a 
non-atomic O(data) copy & delete whose failure state is "undefined".


The solution here comes from having specific committer logic for the different 
object stores. You really, really don't want to go there. If you do, 
start by looking at the S3guard WiP one: 
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

further reading: 
http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores

KryoSerializer still costs much more than a dataframe write.

As for the use case, I am doing a very large number of iterations. So the idea 
is that every X iterations I want to save to disk so that if something crashes 
I do not have to begin from the first iteration but just from the relevant 
iteration.


sounds like you don't really want the output to always be the FS, more 
checkpointing iterations. Couldn't you do something like every 20 iterations, 
write() the relevant RDD to the DFS


Basically I would have liked to see something like saving normally and the 
original data would not be removed until a successful write.
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, February 14, 2017 12:54 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: fault tolerant dataframe write with overwrite

Normally you can fetch the filesystem interface from the configuration (I 
assume you mean the URI).
Managing to get the last iteration: I do not understand the issue. You can use 
the current timestamp as the directory name and at the end simply select the 
directory with the highest number.

Regarding checkpointing, you can also use KryoSerializer to avoid some space 
overhead.

Aside from that, can you elaborate on the use case: why do you need to write every 
iteration?

On 14 Feb 2017, at 11:22, Mendelson, Assaf wrote:
Hi,

I have a case where I have an iterative process which overwrites the results of 
a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that when I write, if I simply overwrite the results of the 
previous iteration, this is not fault tolerant. i.e. if the program crashes in 
the middle of an iteration, the data from previous ones is lost as overwrite 
first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename but this is not 
the best way as it requires us to know the interfaces to the underlying file 
system (as well as requiring some extra work to manage which is the last one 
etc.)
I know I can also use checkpoint (although I haven’t fully tested the process 
there), however, checkpointing converts the result to RDD which both takes more 
time and more space.
I was wondering if there is any efficient method of managing this from inside 
spark.
Thanks,
Assaf.



Re: Handling Skewness and Heterogeneity

2017-02-14 Thread Anis Nasir
Thank you very much for your reply.

I guess this approach balances the load across the cluster of machines.

However, I am looking for something for a heterogeneous cluster where the
distribution is not known a priori.

Cheers,
Anis


On Tue, 14 Feb 2017 at 20:19, Galen Marchetti wrote:

> Anis,
>
> I've typically seen people handle skew by seeding the keys corresponding
> to high volumes with random values, then partitioning the dataset based on
> the original key *and* the random value, then reducing.
>
> Ex: ( key, data ) -> ( key, random_value, data )
>
> This transformation reduces the size of the huge partition, making it
> tenable for spark, as long as you can figure out logic for aggregating the
> results of the seeded partitions together again.
>
> On Tue, Feb 14, 2017 at 12:01 PM, Anis Nasir  wrote:
>
> Dear All,
>
> I have few use cases for spark streaming where spark cluster consist of
> heterogenous machines.
>
> Additionally, there is skew present in both the input distribution (e.g.,
> each tuple is drawn from a zipf distribution) and the service time (e.g.,
> service time required for each tuple comes from a zipf distribution).
>
> I want to know who spark will handle such use cases.
>
> Any help will be highly appreciated!
>
>
> Regards,
> Anis
>
>
>
>
>


Re: how to fix the order of data

2017-02-14 Thread 萝卜丝炒饭
It works well now, thanks.

---Original---
From: "Sam Elamin"
Date: 2017/2/14 19:54:36
To: "??"<1427357...@qq.com>;
Cc: "user";
Subject: Re: how to fix the order of data


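It's because you are printing directly on the RDD: foreach runs on the
partitions in parallel, so the output order is not deterministic.

You can sort the DataFrame like below (in Spark 2.x the single column
produced by toDF() is named "value"):

input.toDF().sort("value").collect()

or, if you do not want to convert to a DataFrame, you can use
sortByKey([ascending], [numTasks]) on a key-value RDD (or sortBy on a plain RDD).

Regards

Sam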

On Tue, Feb 14, 2017 at 11:41 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
Hi all,
the following is my test code. I found that the output of val input is
different each time. How do I fix the order, please?

scala> val input = sc.parallelize(Array(1,2,3))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at :24

scala> input.foreach(print)
132
scala> input.foreach(print)
213
scala> input.foreach(print)
312

Re: how to fix the order of data

2017-02-14 Thread Sam Elamin
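It's because you are printing directly on the RDD: foreach runs on the
partitions in parallel, so the output order is not deterministic.

You can sort the DataFrame like below (in Spark 2.x the single column
produced by toDF() is named "value"):

 input.toDF().sort("value").collect()


or, if you do not want to convert to a DataFrame, you can use
*sortByKey*([*ascending*], [*numTasks*]) on a key-value RDD (or sortBy on a plain RDD).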
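
To make the ordering point concrete, here is a small plain-Python simulation (not Spark code; the three "partitions" and the thread pool are stand-ins assumed purely for illustration). Side effects issued from parallel workers can land in any order, whereas gathering the results partition by partition, which is roughly what collect() does on the driver, keeps the original order.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for an RDD with three partitions.
partitions = [[1], [2], [3]]

def process(partition):
    # Pretend work done on an executor; returns the partition's elements.
    return list(partition)

def collect(parts):
    """Gather results in partition order, like collect() on the driver."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        # map() yields results in submission order, whatever the finish order.
        results = pool.map(process, parts)
        return [x for part in results for x in part]

print(collect(partitions))
```

So if the goal is only to print the elements in a stable order, collecting first and printing on the driver is usually enough for small data; sorting is needed only when the desired order differs from the partition order.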


Regards

Sam





On Tue, Feb 14, 2017 at 11:41 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

>HI  all,
> the belowing is my test code. I found the output of val
> input is different. how do i fix the order please?
>
> scala> val input = sc.parallelize( Array(1,2,3))
> input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at
> parallelize at :24
>
> scala> input.foreach(print)
> 132
> scala> input.foreach(print)
> 213
> scala> input.foreach(print)
> 312


how to fix the order of data

2017-02-14 Thread 萝卜丝炒饭
Hi all,
the following is my test code. I found that the output of val input is different 
on every run. How do I fix the order, please?

scala> val input = sc.parallelize( Array(1,2,3))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize 
at :24

scala> input.foreach(print)
132
scala> input.foreach(print)
213
scala> input.foreach(print)
312

Re: Handling Skewness and Heterogeneity

2017-02-14 Thread Galen Marchetti
Anis,

I've typically seen people handle skew by seeding the keys corresponding to
high volumes with random values, then partitioning the dataset based on the
original key *and* the random value, then reducing.

Ex: ( key, data ) -> ( key, random_value, data )

This transformation reduces the size of the huge partition, making it
tenable for spark, as long as you can figure out logic for aggregating the
results of the seeded partitions together again.
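
A minimal sketch of that two-stage idea in plain Python, assuming a sum aggregation (which is associative, so partial results can be combined). The helper names `salt` and `sum_by_key`, the bucket count and the sample records are all made up for illustration; in Spark the two `sum_by_key` calls would correspond to two reduceByKey-style steps.

```python
import random
from collections import defaultdict

def salt(records, hot_keys, buckets, rng):
    """Turn (key, value) into ((key, salt), value); only hot keys get a random salt."""
    for key, value in records:
        s = rng.randrange(buckets) if key in hot_keys else 0
        yield (key, s), value

def sum_by_key(pairs):
    """Stand-in for a reduce-by-key with addition."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

records = [("hot", 1)] * 1000 + [("cold", 1)] * 10
rng = random.Random(42)

# Stage 1: reduce on (key, salt), so the hot key is spread over several groups.
partial = sum_by_key(salt(records, {"hot"}, buckets=8, rng=rng))
# Stage 2: drop the salt and reduce again on the original key.
final = sum_by_key((key, total) for (key, _salt), total in partial.items())
print(final)
```

The second stage only sees one row per (key, salt) pair, so it is cheap even for the hot key. Aggregations that are not associative need their own merge logic for the second stage.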

On Tue, Feb 14, 2017 at 12:01 PM, Anis Nasir  wrote:

> Dear All,
>
> I have few use cases for spark streaming where spark cluster consist of
> heterogenous machines.
>
> Additionally, there is skew present in both the input distribution (e.g.,
> each tuple is drawn from a zipf distribution) and the service time (e.g.,
> service time required for each tuple comes from a zipf distribution).
>
> I want to know who spark will handle such use cases.
>
> Any help will be highly appreciated!
>
>
> Regards,
> Anis
>
>
>


RE: fault tolerant dataframe write with overwrite

2017-02-14 Thread Mendelson, Assaf
I know how to get the filesystem. The problem is that this means using Hadoop 
directly, so if in the future we change to something else (e.g. S3) I would need 
to rewrite the code. This also relates to finding the last iteration: I would 
need to use the Hadoop filesystem, which is not agnostic to the deployment.

KryoSerializer still costs much more than a DataFrame write.

As for the use case, I am doing a very large number of iterations. So the idea 
is that every X iterations I want to save to disk so that if something crashes 
I do not have to begin from the first iteration but just from the relevant 
iteration.

Basically I would have liked to see something like saving normally and the 
original data would not be removed until a successful write.
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, February 14, 2017 12:54 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: fault tolerant dataframe write with overwrite

Normally you can fetch the filesystem interface from the configuration (I 
assume you mean the URI).
Managing to get the last iteration: I do not understand the issue. You can use 
the current timestamp as the directory name and at the end simply select the 
directory with the highest number.

Regarding checkpointing, you can also use KryoSerializer to avoid some space 
overhead.

Aside from that, can you elaborate on the use case: why do you need to write every 
iteration?

On 14 Feb 2017, at 11:22, Mendelson, Assaf wrote:
Hi,

I have a case where I have an iterative process which overwrites the results of 
a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that when I write, if I simply overwrite the results of the 
previous iteration, this is not fault tolerant. i.e. if the program crashes in 
the middle of an iteration, the data from previous ones is lost as overwrite 
first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename but this is not 
the best way as it requires us to know the interfaces to the underlying file 
system (as well as requiring some extra work to manage which is the last one 
etc.)
I know I can also use checkpoint (although I haven’t fully tested the process 
there), however, checkpointing converts the result to RDD which both takes more 
time and more space.
I was wondering if there is any efficient method of managing this from inside 
spark.
Thanks,
Assaf.


Re: fault tolerant dataframe write with overwrite

2017-02-14 Thread Jörn Franke
Normally you can fetch the filesystem interface from the configuration (I 
assume you mean the URI).
Managing to get the last iteration: I do not understand the issue. You can use 
the current timestamp as the directory name and at the end simply select the 
directory with the highest number.
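
A rough sketch of that pattern, using the local filesystem and plain Python as a stand-in for the cluster filesystem. The `iter-NNNNNNNN` naming, the helper names and the explicit `_SUCCESS` marker are assumptions for illustration (Spark's file output normally leaves a `_SUCCESS` file of its own when a write finishes). Each save goes to a fresh directory, so the previous one is untouched until the new one is complete.

```python
import os
import tempfile

def write_iteration(base, iteration, payload):
    """Write one iteration into its own directory, then mark it complete."""
    path = os.path.join(base, "iter-%08d" % iteration)
    os.makedirs(path)
    with open(os.path.join(path, "part-00000"), "w") as f:
        f.write(payload)
    # The marker is written last, so a crash mid-write leaves no marker.
    open(os.path.join(path, "_SUCCESS"), "w").close()
    return path

def latest_complete(base):
    """Return the highest-numbered directory that has a _SUCCESS marker."""
    done = [d for d in sorted(os.listdir(base))
            if os.path.exists(os.path.join(base, d, "_SUCCESS"))]
    return os.path.join(base, done[-1]) if done else None

base = tempfile.mkdtemp()
write_iteration(base, 20, "state after 20 iterations")
write_iteration(base, 40, "state after 40 iterations")
os.makedirs(os.path.join(base, "iter-00000060"))  # simulated crash: no marker
print(os.path.basename(latest_complete(base)))
```

Zero-padded names keep lexicographic and numeric order identical, and older complete directories can be deleted once a newer one carries its marker.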

Regarding checkpointing, you can also use KryoSerializer to avoid some space 
overhead.

Aside from that, can you elaborate on the use case: why do you need to write every 
iteration?

> On 14 Feb 2017, at 11:22, Mendelson, Assaf  wrote:
> 
> Hi,
>  
> I have a case where I have an iterative process which overwrites the results 
> of a previous iteration.
> Every iteration I need to write a dataframe with the results.
> The problem is that when I write, if I simply overwrite the results of the 
> previous iteration, this is not fault tolerant. i.e. if the program crashes 
> in the middle of an iteration, the data from previous ones is lost as 
> overwrite first removes the previous data and then starts writing.
>  
> Currently we simply write to a new directory and then rename but this is not 
> the best way as it requires us to know the interfaces to the underlying file 
> system (as well as requiring some extra work to manage which is the last one 
> etc.)
> I know I can also use checkpoint (although I haven’t fully tested the process 
> there), however, checkpointing converts the result to RDD which both takes 
> more time and more space.
> I was wondering if there is any efficient method of managing this from inside 
> spark.
> Thanks,
> Assaf.


Different Results When Performing PCA with Spark and R

2017-02-14 Thread Amlan Jyoti
Dear all,

I was exploring a use case of PCA and found that the results of 
Spark ML and R are different. 

More clearly,
 1) eigenMatrix_Spark EQUALS-TO eigenMatrix_R
 2) transformedData_Spark NOT-EQUALS-TO transformedData_R
 
Sample Spark Code
--
PCAModel pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(numberOfCol).fit(inputDataset);
DenseMatrix eigenMatrix_Spark = pca.pc();
Dataset<Row> transformedData_Spark = pca.transform(inputDataset.select("features"));

Sample R Code
- 
pc <- prcomp(mydata)
eigenMatrix_R <- pc$rotation
transformedData_R <- pc$x

**
 
 
After further analysis, I found out that:

- By default, R first mean-centers the input dataset and then uses 
this centered dataset to calculate both the Eigen Matrix and the 
Transformed Data. [It uses the parameter 'center = TRUE' for 
mean-centering.]
 
- Spark, on the other hand, probably mean-centers the input data only 
to calculate the Eigen Matrix, and uses the original dataset to compute 
the Transformed Data. [Generally, Transformed data = 
Eigen Matrix * Dataset ]
 
That is why the Eigen Matrices of Spark and R are the same, whereas 
the Transformed datasets differ.
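
If this explanation is right, the two sets of scores should differ only by a constant shift per component, namely the projection of the column means onto that component. A small plain-Python check of the arithmetic follows; the data and the component vector are made-up numbers, not output from Spark or R.

```python
def mean_columns(rows):
    """Column means of a list of equal-length rows."""
    n = len(rows)
    return [sum(col) / n for col in zip(*rows)]

def project(rows, component):
    """Dot product of every row with one principal component."""
    return [sum(x * w for x, w in zip(row, component)) for row in rows]

data = [[2.0, 0.0], [4.0, 2.0], [6.0, 4.0]]
component = [0.6, 0.8]  # hypothetical unit-length principal component

mu = mean_columns(data)
centered = [[x - m for x, m in zip(row, mu)] for row in data]

scores_raw = project(data, component)           # projection of the original data
scores_centered = project(centered, component)  # projection of the centered data
offset = sum(m * w for m, w in zip(mu, component))

# The two results differ by the same constant for every row.
print([round(r - c, 9) for r, c in zip(scores_raw, scores_centered)], round(offset, 9))
```

Under that assumption, subtracting the projected mean from the Spark output (or centering the features before calling transform) should reproduce the R scores, up to the sign of each component.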

So, can anyone please explain why Spark does not use the 
mean-centered input data when calculating the Transformed data (although 
it does when calculating the Eigen Matrix), as opposed to R?
 [Initial mean-centering of the input data is recommended for a good PCA 
analysis by many technical papers, and it is the default in R.]


With Best Regards
Amlan Jyoti




Re: [Spark Launcher] How to launch parallel jobs?

2017-02-14 Thread Cosmin Posteuca
Hi,

Egor is right: Spark creates one task for every partition, and every task runs on
a single core. But with different configurations Spark gives different
results:

1 executor with 4 cores takes 120 seconds
2 executors with 2 cores each take 60 seconds on two runs and 120 seconds on one run
4 executors with 1 core each take 60 seconds

Why does this happen? Why is it non-deterministic?

Thanks

2017-02-14 10:29 GMT+02:00 Cosmin Posteuca :

> Memory seems to be enough. My cluster has 22.5 gb total memory and my job
> use 6.88 gb. If i run twice this job, they will use 13.75 gb, but sometimes
> the cluster has a spike of memory of 19.5 gb.
>
> Thanks,
> Cosmin
>
> 2017-02-14 10:03 GMT+02:00 Mendelson, Assaf :
>
>> You should also check your memory usage.
>>
>> Let’s say for example you have 16 cores and 8 GB. And that you use 4
>> executors with 1 core each.
>>
>> When you use an executor, spark reserves it from yarn and yarn allocates
>> the number of cores (e.g. 1 in our case) and the memory. The memory is
>> actually more than you asked for. If you ask for 1GB it will in fact
>> allocate almost 1.5GB with overhead. In addition, it will probably allocate
>> an executor for the driver (probably with 1024MB memory usage).
>>
>> When you run your program and look in port 8080, you should look not only
>> on the VCores used out of the VCores total but also on the Memory used and
>> Memory total. You should also navigate to the executors (e.g.
>> applications->running on the left and then choose you application and
>> navigate all the way down to a single container). You can see there the
>> actual usage.
>>
>>
>>
>> BTW, it doesn’t matter how much memory your program wants but how much it
>> reserves. In your example it will not take the 50MB of the test but the
>> ~1.5GB (after overhead) per executor.
>>
>> Hope this helps,
>>
>> Assaf.
>>
>>
>>
>> *From:* Cosmin Posteuca [mailto:cosmin.poste...@gmail.com]
>> *Sent:* Tuesday, February 14, 2017 9:53 AM
>> *To:* Egor Pahomov
>> *Cc:* user
>> *Subject:* Re: [Spark Launcher] How to launch parallel jobs?
>>
>>
>>
>> Hi Egor,
>>
>>
>>
>> About the first problem i think you are right, it's make sense.
>>
>>
>>
>> About the second problem, i check available resource on 8088 port and
>> there show 16 available cores. I start my job with 4 executors with 1 core
>> each, and 1gb per executor. My job use maximum 50mb of memory(just for
>> test). From my point of view the resources are enough, and the problem i
>> think is from yarn configuration files, but i don't know what is missing.
>>
>>
>>
>> Thank you
>>
>>
>>
>> 2017-02-13 21:14 GMT+02:00 Egor Pahomov :
>>
>> About second problem: I understand this can be in two cases: when one job
>> prevents the other one from getting resources for executors or (2)
>> bottleneck is reading from disk, so you can not really parallel that. I
>> have no experience with second case, but it's easy to verify the fist one:
>> just look on you hadoop UI and verify, that both job get enough resources.
>>
>>
>>
>> 2017-02-13 11:07 GMT-08:00 Egor Pahomov :
>>
>> "But if i increase only executor-cores the finish time is the same".
>> More experienced ones can correct me, if I'm wrong, but as far as I
>> understand that: one partition processed by one spark task. Task is always
>> running on 1 core and not parallelized among cores. So if you have 5
>> partitions and you increased totall number of cores among cluster from 7 to
>> 10 for example - you have not gained anything. But if you repartition you
>> give an opportunity to process thing in more threads, so now more tasks can
>> execute in parallel.
>>
>>
>>
>> 2017-02-13 7:05 GMT-08:00 Cosmin Posteuca :
>>
>> Hi,
>>
>>
>>
>> I think i don't understand enough how to launch jobs.
>>
>>
>>
>> I have one job which takes 60 seconds to finish. I run it with following
>> command:
>>
>>
>>
>> spark-submit --executor-cores 1 \
>>
>>  --executor-memory 1g \
>>
>>  --driver-memory 1g \
>>
>>  --master yarn \
>>
>>  --deploy-mode cluster \
>>
>>  --conf spark.dynamicAllocation.enabled=true \
>>
>>  --conf spark.shuffle.service.enabled=true \
>>
>>  --conf spark.dynamicAllocation.minExecutors=1 \
>>
>>  --conf spark.dynamicAllocation.maxExecutors=4 \
>>
>>  --conf spark.dynamicAllocation.initialExecutors=4 \
>>
>>  --conf spark.executor.instances=4 \
>>
>> If i increase number of partitions from code and number of executors the app 
>> will finish faster, which it's ok. But if i increase only executor-cores the 
>> finish time is the same, and i don't understand why. I expect the time to be 
>> lower than initial time.
>>
>> My second problem is if i launch twice above code i expect that both jobs to 
>> finish in 60 seconds, but this don't happen. Both jobs finish after 120 
>> 

fault tolerant dataframe write with overwrite

2017-02-14 Thread Mendelson, Assaf
Hi,

I have a case where I have an iterative process which overwrites the results of 
a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that when I write, if I simply overwrite the results of the 
previous iteration, this is not fault tolerant. i.e. if the program crashes in 
the middle of an iteration, the data from previous ones is lost as overwrite 
first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename but this is not 
the best way as it requires us to know the interfaces to the underlying file 
system (as well as requiring some extra work to manage which is the last one 
etc.)
I know I can also use checkpoint (although I haven't fully tested the process 
there), however, checkpointing converts the result to RDD which both takes more 
time and more space.
I was wondering if there is any efficient method of managing this from inside 
spark.
Thanks,
Assaf.


Re: wholeTextfiles not parallel, runs out of memory

2017-02-14 Thread Jörn Franke
Well, 1) wholeTextFiles is designed to read each file as a whole, so a single 
file is never split across executors, and 2) you use .gz, which is not 
splittable, so you will have at most one executor per file anyway.
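
Since each .gz file ends up in a single task either way, the memory used per file matters. Below is a hedged plain-Python sketch of streaming a gzip file line by line instead of holding the whole decompressed text; the function name `iter_records` and the sample file are made up, and it assumes the file is reachable through a local path rather than HDFS. Within Spark, sc.textFile would similarly give you lines rather than whole files, still with one task per .gz file.

```python
import gzip
import os
import tempfile

PREFIX = "WARC-Record-ID:"

def iter_records(path):
    """Yield (record_id, line) pairs while streaming a .gz file line by line."""
    the_id = "init"
    with gzip.open(path, "rt", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.rstrip("\n")
            if line.startswith(PREFIX):
                the_id = line[len(PREFIX):].strip()
            yield the_id, line

# Tiny made-up file standing in for one .warc.gz input.
path = os.path.join(tempfile.mkdtemp(), "sample.warc.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write("WARC-Record-ID: <urn:uuid:1>\nhello\nWARC-Record-ID: <urn:uuid:2>\nworld\n")

for record_id, line in iter_records(path):
    print(record_id, "|", line)
```

Because it is a generator, only one line is held at a time, and each emitted pair stays small instead of growing with the file.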

> On 14 Feb 2017, at 09:36, Henry Tremblay  wrote:
> 
> When I use wholeTextFiles, spark does not run in parallel, and yarn runs out 
> of memory. 
> I have documented the steps below. First I copy 6 s3 files to hdfs. Then I 
> create an rdd by:
> 
> 
> sc.wholeTextFiles("/mnt/temp")
> 
> 
> Then I process the files line by line using a simple function. When I look at 
> my nodes, I see only one executor is running. (I assume the other is the name 
> node?) I then get an error message that yarn has run out of memory.
> 
> 
> Steps below:
> 
> 
> 
> [hadoop@ip-172-31-40-213 mnt]$ hadoop fs -ls /mnt/temp
> Found 6 items
> -rw-r--r--   3 hadoop hadoop 3684566 2017-02-14 07:58 
> /mnt/temp/CC-MAIN-20170116095122-00570-ip-10-171-10-70.ec2.internal.warc.gz
> -rw-r--r--   3 hadoop hadoop 3486510 2017-02-14 08:01 
> /mnt/temp/CC-MAIN-20170116095122-00571-ip-10-171-10-70.ec2.internal.warc.gz
> -rw-r--r--   3 hadoop hadoop 3498649 2017-02-14 08:05 
> /mnt/temp/CC-MAIN-20170116095122-00572-ip-10-171-10-70.ec2.internal.warc.gz
> -rw-r--r--   3 hadoop hadoop 4007644 2017-02-14 08:06 
> /mnt/temp/CC-MAIN-20170116095122-00573-ip-10-171-10-70.ec2.internal.warc.gz
> -rw-r--r--   3 hadoop hadoop 3990553 2017-02-14 08:07 
> /mnt/temp/CC-MAIN-20170116095122-00574-ip-10-171-10-70.ec2.internal.warc.gz
> -rw-r--r--   3 hadoop hadoop 3689213 2017-02-14 07:54 
> /mnt/temp/CC-MAIN-20170116095122-00575-ip-10-171-10-70.ec2.internal.warc.gz
> 
> 
> In [6]: rdd1 = sc.wholeTextFiles("mnt/temp")
> In [7]: rdd1.count()
> Out[7]: 6
> 
> from pyspark.sql import Row
> 
> def process_file(s):
>     # s is a (path, content) pair as produced by wholeTextFiles
>     text = s[1]
>     d = {}
>     l = text.split("\n")
>     final = []
>     the_id = "init"
>     for line in l:
>         if line[0:15] == 'WARC-Record-ID:':
>             the_id = line[15:]
>         d[the_id] = line
>         final.append(Row(**d))
>     return final
> 
> 
> In [8]: rdd2 = rdd1.map(process_file)
> In [9]: rdd2.take(1)
> 
> 
> 
> 
> 
> 17/02/14 08:25:25 ERROR YarnScheduler: Lost executor 2 on 
> ip-172-31-35-32.us-west-2.compute.internal: Container killed by YARN for 
> exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead.
> 17/02/14 08:25:25 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container 
> killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory 
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/14 08:25:25 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, 
> ip-172-31-35-32.us-west-2.compute.internal, executor 2): ExecutorLostFailure 
> (executor 2 exited caused by one of the running tasks) Reason: Container 
> killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory 
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/14 08:29:34 ERROR YarnScheduler: Lost executor 3 on 
> ip-172-31-45-106.us-west-2.compute.internal: Container killed by YARN for 
> exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead.
> 17/02/14 08:29:34 WARN TaskSetManager: Lost task 0.1 in stage 2.0 (TID 4, 
> ip-172-31-45-106.us-west-2.compute.internal, executor 3): ExecutorLostFailure 
> (executor 3 exited caused by one of the running tasks) Reason: Container 
> killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory 
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/14 08:29:34 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container 
> killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory 
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/14 08:33:44 ERROR YarnScheduler: Lost executor 4 on 
> ip-172-31-35-32.us-west-2.compute.internal: Container killed by YARN for 
> exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead.
> 17/02/14 08:33:44 WARN TaskSetManager: Lost task 0.2 in stage 2.0 (TID 5, 
> ip-172-31-35-32.us-west-2.compute.internal, executor 4): ExecutorLostFailure 
> (executor 4 exited caused by one of the running tasks) Reason: Container 
> killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory 
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/14 08:33:44 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container 
> killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory 
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 
> -- 
> Henry Tremblay
> Robert Half Technology


Re: [Spark Launcher] How to launch parallel jobs?

2017-02-14 Thread Cosmin Posteuca
Memory seems to be enough. My cluster has 22.5 GB of total memory and my job
uses 6.88 GB. If I run this job twice, the two runs will use 13.75 GB, but sometimes
the cluster has a memory spike of 19.5 GB.

Thanks,
Cosmin

2017-02-14 10:03 GMT+02:00 Mendelson, Assaf :

> You should also check your memory usage.
>
> Let’s say for example you have 16 cores and 8 GB. And that you use 4
> executors with 1 core each.
>
> When you use an executor, spark reserves it from yarn and yarn allocates
> the number of cores (e.g. 1 in our case) and the memory. The memory is
> actually more than you asked for. If you ask for 1GB it will in fact
> allocate almost 1.5GB with overhead. In addition, it will probably allocate
> an executor for the driver (probably with 1024MB memory usage).
>
> When you run your program and look in port 8080, you should look not only
> on the VCores used out of the VCores total but also on the Memory used and
> Memory total. You should also navigate to the executors (e.g.
> applications->running on the left and then choose you application and
> navigate all the way down to a single container). You can see there the
> actual usage.
>
>
>
> BTW, it doesn’t matter how much memory your program wants but how much it
> reserves. In your example it will not take the 50MB of the test but the
> ~1.5GB (after overhead) per executor.
>
> Hope this helps,
>
> Assaf.
>
>
>
> *From:* Cosmin Posteuca [mailto:cosmin.poste...@gmail.com]
> *Sent:* Tuesday, February 14, 2017 9:53 AM
> *To:* Egor Pahomov
> *Cc:* user
> *Subject:* Re: [Spark Launcher] How to launch parallel jobs?
>
>
>
> Hi Egor,
>
>
>
> About the first problem i think you are right, it's make sense.
>
>
>
> About the second problem, i check available resource on 8088 port and
> there show 16 available cores. I start my job with 4 executors with 1 core
> each, and 1gb per executor. My job use maximum 50mb of memory(just for
> test). From my point of view the resources are enough, and the problem i
> think is from yarn configuration files, but i don't know what is missing.
>
>
>
> Thank you
>
>
>
> 2017-02-13 21:14 GMT+02:00 Egor Pahomov :
>
> About second problem: I understand this can be in two cases: when one job
> prevents the other one from getting resources for executors or (2)
> bottleneck is reading from disk, so you can not really parallel that. I
> have no experience with second case, but it's easy to verify the fist one:
> just look on you hadoop UI and verify, that both job get enough resources.
>
>
>
> 2017-02-13 11:07 GMT-08:00 Egor Pahomov :
>
> "But if i increase only executor-cores the finish time is the same". More
> experienced ones can correct me, if I'm wrong, but as far as I understand
> that: one partition processed by one spark task. Task is always running on
> 1 core and not parallelized among cores. So if you have 5 partitions and
> you increased totall number of cores among cluster from 7 to 10 for example
> - you have not gained anything. But if you repartition you give an
> opportunity to process thing in more threads, so now more tasks can execute
> in parallel.
>
>
>
> 2017-02-13 7:05 GMT-08:00 Cosmin Posteuca :
>
> Hi,
>
>
>
> I think i don't understand enough how to launch jobs.
>
>
>
> I have one job which takes 60 seconds to finish. I run it with following
> command:
>
>
>
> spark-submit --executor-cores 1 \
>
>  --executor-memory 1g \
>
>  --driver-memory 1g \
>
>  --master yarn \
>
>  --deploy-mode cluster \
>
>  --conf spark.dynamicAllocation.enabled=true \
>
>  --conf spark.shuffle.service.enabled=true \
>
>  --conf spark.dynamicAllocation.minExecutors=1 \
>
>  --conf spark.dynamicAllocation.maxExecutors=4 \
>
>  --conf spark.dynamicAllocation.initialExecutors=4 \
>
>  --conf spark.executor.instances=4 \
>
> If i increase number of partitions from code and number of executors the app 
> will finish faster, which it's ok. But if i increase only executor-cores the 
> finish time is the same, and i don't understand why. I expect the time to be 
> lower than initial time.
>
> My second problem is if i launch twice above code i expect that both jobs to 
> finish in 60 seconds, but this don't happen. Both jobs finish after 120 
> seconds and i don't understand why.
>
> I run this code on AWS EMR, on 2 instances(4 cpu each, and each cpu has 2 
> threads). From what i saw in default EMR configurations, yarn is set on 
> FIFO(default) mode with CapacityScheduler.
>
> What do you think about this problems?
>
> Thanks,
>
> Cosmin
>
>
>
>
>
> --
>
>
> *Sincerely yours Egor Pakhomov*
>
>
>
>
>
> --
>
>
> *Sincerely yours Egor Pakhomov*
>
>
>


RE: [Spark Launcher] How to launch parallel jobs?

2017-02-14 Thread Mendelson, Assaf
You should also check your memory usage.
Let’s say for example you have 16 cores and 8 GB, and that you use 4 executors 
with 1 core each.
When you use an executor, Spark reserves it from YARN, and YARN allocates the 
number of cores (e.g. 1 in our case) and the memory. The memory is actually 
more than you asked for. If you ask for 1GB it will in fact allocate almost 
1.5GB with overhead. In addition, it will probably allocate a container for the 
driver (probably with 1024MB memory usage).
When you run your program and look at port 8088, you should look not only at 
the VCores used out of the VCores total but also at the Memory used and Memory 
total. You should also navigate to the executors (e.g. applications->running on 
the left, then choose your application and navigate all the way down to a 
single container). There you can see the actual usage.

BTW, what matters is not how much memory your program wants but how much it 
reserves. In your example it will not take the 50MB of the test but the ~1.5GB 
(after overhead) per executor.
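
A back-of-the-envelope sketch of where a figure like that comes from. It assumes the Spark 2.x defaults as I understand them: overhead is the larger of 384 MB and 10% of the executor memory, and YARN rounds each request up to a multiple of its allocation increment. The function name and parameters are made up for illustration, so check the values against your own cluster settings.

```python
import math

def container_mb(executor_mb, overhead_fraction=0.10, min_overhead_mb=384,
                 yarn_increment_mb=1):
    """Estimate the YARN container size for one executor, in MB."""
    overhead = max(min_overhead_mb, int(executor_mb * overhead_fraction))
    requested = executor_mb + overhead
    # YARN rounds each request up to a multiple of its allocation increment.
    return math.ceil(requested / yarn_increment_mb) * yarn_increment_mb

print(container_mb(1024))                          # 1408: 1 GB executor plus 384 MB overhead
print(container_mb(1024, yarn_increment_mb=1024))  # 2048: rounded up to 1024 MB steps
```

With 4 such executors plus a driver container, the reservation is several GB even though the job itself touches only about 50 MB.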
Hope this helps,
Assaf.

From: Cosmin Posteuca [mailto:cosmin.poste...@gmail.com]
Sent: Tuesday, February 14, 2017 9:53 AM
To: Egor Pahomov
Cc: user
Subject: Re: [Spark Launcher] How to launch parallel jobs?

Hi Egor,

About the first problem, I think you are right; it makes sense.

About the second problem, I checked the available resources on port 8088 and it 
shows 16 available cores. I start my job with 4 executors with 1 core each, and 
1 GB per executor. My job uses at most 50 MB of memory (just for a test). From my 
point of view the resources are enough, and I think the problem comes from the YARN 
configuration files, but I don't know what is missing.

Thank you

2017-02-13 21:14 GMT+02:00 Egor Pahomov:
About the second problem: I understand this can happen in two cases: (1) when one job 
prevents the other one from getting resources for executors, or (2) the bottleneck 
is reading from disk, so you cannot really parallelize that. I have no experience 
with the second case, but it's easy to verify the first one: just look at your Hadoop 
UI and verify that both jobs get enough resources.

2017-02-13 11:07 GMT-08:00 Egor Pahomov:
"But if i increase only executor-cores the finish time is the same". More 
experienced people can correct me if I'm wrong, but as far as I understand it: 
one partition is processed by one Spark task. A task always runs on 1 core and is 
not parallelized among cores. So if you have 5 partitions and you increase the 
total number of cores in the cluster from 7 to 10, for example, you have not 
gained anything. But if you repartition, you give Spark an opportunity to process 
things in more threads, so more tasks can execute in parallel.
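
The arithmetic behind this can be sketched in a few lines of plain Python. It assumes every task needs exactly one core and that all tasks take about the same time; the function name `waves` is made up for illustration.

```python
import math

def waves(partitions, executors, cores_per_executor):
    """Number of scheduling rounds needed when each task occupies one core."""
    slots = executors * cores_per_executor
    return math.ceil(partitions / slots)

# 5 partitions fit in one round with 7 cores and with 10 cores alike.
print(waves(5, 7, 1), waves(5, 10, 1))
# 8 partitions on 4 single-core executors need two rounds.
print(waves(8, 4, 1))
```

Extra cores only help once the number of partitions exceeds the number of task slots, which is why repartitioning changes the run time and adding idle cores does not.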

2017-02-13 7:05 GMT-08:00 Cosmin Posteuca:
Hi,

I think I don't understand well enough how to launch jobs.

I have one job which takes 60 seconds to finish. I run it with the following 
command:


spark-submit --executor-cores 1 \
 --executor-memory 1g \
 --driver-memory 1g \
 --master yarn \
 --deploy-mode cluster \
 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.shuffle.service.enabled=true \
 --conf spark.dynamicAllocation.minExecutors=1 \
 --conf spark.dynamicAllocation.maxExecutors=4 \
 --conf spark.dynamicAllocation.initialExecutors=4 \
 --conf spark.executor.instances=4 \

If I increase the number of partitions in the code and the number of executors, the app 
finishes faster, which is OK. But if I increase only executor-cores, the 
finish time is the same, and I don't understand why. I expect the time to be 
lower than the initial time.

My second problem is that if I launch the above command twice, I expect both jobs to 
finish in 60 seconds, but this doesn't happen. Both jobs finish after 120 seconds 
and I don't understand why.

I run this code on AWS EMR, on 2 instances (4 CPUs each, and each CPU has 2 
threads). From what I saw in the default EMR configuration, YARN is set to 
FIFO (default) mode with the CapacityScheduler.

What do you think about these problems?

Thanks,

Cosmin



--
Sincerely yours
Egor Pakhomov



--
Sincerely yours
Egor Pakhomov



Re: Is it better to Use Java or Python on Scala for Spark for using big data sets

2017-02-14 Thread Gourav Sengupta
Spark is written in Scala, and that is why it's so efficient and fast.

If you are looking for efficiency, speed, and easily maintainable code
(where you do not have to write thousands of lines of code instead of a few
lines), please use Scala functional programming. I have seen horrible JAVA
morons who use Scala for OOP and claim it's nothing new; please avoid them.

Regards,
Gourav

On Mon, Feb 13, 2017 at 5:57 PM, Spark User  wrote:

> Spark has more support for scala, by that I mean more APIs are available
> for scala compared to python or Java. Also scala code will be more concise
> and easy to read. Java is very verbose.
>
> On Thu, Feb 9, 2017 at 10:21 PM, Irving Duran 
> wrote:
>
>> I would say Java, since it will be somewhat similar to Scala.  Now, this
>> assumes that you have some app already written in Scala. If you don't, then
>> pick the language that you feel most comfortable with.
>>
>> Thank you,
>>
>> Irving Duran
>>
>> On Feb 9, 2017, at 11:59 PM, nancy henry 
>> wrote:
>>
>> Hi All,
>>
>> Is it better to Use Java or Python on Scala for Spark coding..
>>
>> Mainly My work is with getting file data which is in csv format  and I
>> have to do some rule checking and rule aggrgeation
>>
>> and put the final filtered data back to oracle so that real time apps can
>> use it..
>>
>
>


Handling Skewness and Heterogeneity

2017-02-14 Thread Anis Nasir
Dear All,

I have a few use cases for Spark Streaming where the Spark cluster consists of
heterogeneous machines.

Additionally, there is skew present in both the input distribution (e.g.,
each tuple is drawn from a Zipf distribution) and the service time (e.g.,
the service time required for each tuple comes from a Zipf distribution).

I want to know how Spark will handle such use cases.

Any help will be highly appreciated!


Regards,
Anis