Re: [Spark Launcher] How to launch parallel jobs?

2017-02-13 Thread Cosmin Posteuca
Hi Egor,

About the first problem, I think you are right, it makes sense.

About the second problem, I checked the available resources on port 8088 and it
shows 16 available cores. I start my job with 4 executors with 1 core each,
and 1 GB per executor. My job uses a maximum of 50 MB of memory (just for testing).
From my point of view the resources are enough, and I think the problem is
in the YARN configuration files, but I don't know what is missing.

Thank you

2017-02-13 21:14 GMT+02:00 Egor Pahomov :

> About the second problem: I understand this can happen in two cases: (1) one job
> prevents the other one from getting resources for executors, or (2) the
> bottleneck is reading from disk, so you cannot really parallelize that. I
> have no experience with the second case, but it's easy to verify the first one:
> just look at your Hadoop UI and verify that both jobs get enough resources.
>
> 2017-02-13 11:07 GMT-08:00 Egor Pahomov :
>
>> "But if i increase only executor-cores the finish time is the same".
>> More experienced ones can correct me if I'm wrong, but as far as I
>> understand it: one partition is processed by one Spark task. A task always
>> runs on 1 core and is not parallelized across cores. So if you have 5
>> partitions and you increase the total number of cores in the cluster from 7 to
>> 10, for example, you have not gained anything. But if you repartition, you
>> give Spark an opportunity to process things in more threads, so now more tasks can
>> execute in parallel.
>>
>> 2017-02-13 7:05 GMT-08:00 Cosmin Posteuca :
>>
>>> Hi,
>>>
>>> I think I don't understand well enough how to launch jobs.
>>>
>>> I have one job which takes 60 seconds to finish. I run it with the following
>>> command:
>>>
>>> spark-submit --executor-cores 1 \
>>>  --executor-memory 1g \
>>>  --driver-memory 1g \
>>>  --master yarn \
>>>  --deploy-mode cluster \
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.shuffle.service.enabled=true \
>>>  --conf spark.dynamicAllocation.minExecutors=1 \
>>>  --conf spark.dynamicAllocation.maxExecutors=4 \
>>>  --conf spark.dynamicAllocation.initialExecutors=4 \
>>>  --conf spark.executor.instances=4 \
>>>
>>> If I increase the number of partitions from code and the number of executors, the
>>> app will finish faster, which is OK. But if I increase only
>>> executor-cores, the finish time is the same, and I don't understand why. I
>>> expect the time to be lower than the initial time.
>>>
>>> My second problem is that if I launch the above code twice, I expect both jobs
>>> to finish in 60 seconds, but this doesn't happen. Both jobs finish after 120
>>> seconds and I don't understand why.
>>>
>>> I run this code on AWS EMR, on 2 instances (4 CPUs each, and each CPU has 2
>>> threads). From what I saw in the default EMR configuration, YARN is set to
>>> FIFO (default) mode with the CapacityScheduler.
>>>
>>> What do you think about these problems?
>>>
>>> Thanks,
>>>
>>> Cosmin
>>>
>>>
>>
>>
>> --
>>
>>
>> *Sincerely yours, Egor Pakhomov*
>>
>
>
>
> --
>
>
> *Sincerely yours, Egor Pakhomov*
>


Re: Lost executor 4 Container killed by YARN for exceeding memory limits.

2017-02-13 Thread nancy henry
Hi,


How do I set these parameters while launching spark-shell?

spark.shuffle.memoryFraction=0.5

and

spark.yarn.executor.memoryOverhead=1024


I tried giving them like this, but I get the warning below:

spark-shell --master yarn --deploy-mode client --driver-memory 16G
--num-executors 500 executor-cores 4 --executor-memory 7G --conf
spark.shuffle.memoryFraction=0.5 --conf
spark.yarn.executor.memoryOverhead=1024

Warning
17/02/13 22:42:02 WARN SparkConf: Detected deprecated memory fraction
settings: [spark.shuffle.memoryFraction]. As of Spark 1.6, execution and
storage memory management are unified. All memory fractions used in the old
model are now deprecated and no longer read. If you wish to use the old
memory management, you may explicitly enable `spark.memory.useLegacyMode`
(not recommended).
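A sketch of the Spark 2.x equivalents of those two settings (values are
illustrative only; the same keys can also be passed to spark-shell with --conf,
and spark.yarn.executor.memoryOverhead must be set before the context starts):

// Sketch: the legacy spark.shuffle.memoryFraction is replaced by the unified
// spark.memory.fraction since Spark 1.6.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-tuning-sketch")
  .config("spark.memory.fraction", "0.6")                 // unified execution + storage fraction
  .config("spark.yarn.executor.memoryOverhead", "1024")   // per-executor off-heap headroom, in MB
  .getOrCreate()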



On Mon, Feb 13, 2017 at 11:23 PM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Nancy,
>
>
>
> As your log output indicated, your executor exceeded its 11 GB memory limit.
>
> While you might want to address the root cause/data volume as suggested by
> Jon, you can do an immediate test by changing your command as follows
>
>
>
> spark-shell --master yarn --deploy-mode client --driver-memory 16G
> --num-executors 500 executor-cores 7 --executor-memory 14G
>
>
>
> This essentially increases your executor memory from 11 GB to 14 GB.
>
> Note that it will result in a potentially large footprint - from 500x11 to
> 500x14 GB.
>
> You may want to consult with your DevOps/Operations/Spark Admin team first.
>
>
>
> *From: *Jon Gregg 
> *Date: *Monday, February 13, 2017 at 8:58 AM
> *To: *nancy henry 
> *Cc: *"user @spark" 
> *Subject: *Re: Lost executor 4 Container killed by YARN for exceeding
> memory limits.
>
>
>
> Setting Spark's memoryOverhead configuration variable is recommended in
> your logs, and has helped me with these issues in the past.  Search for
> "memoryOverhead" here:  http://spark.apache.org/docs/
> latest/running-on-yarn.html
>
>
>
> That said, you're running on a huge cluster as it is.  If it's possible to
> filter your tables down before the join (keeping just the rows/columns you
> need), that may be a better solution.
>
>
>
> Jon
>
>
>
> On Mon, Feb 13, 2017 at 5:27 AM, nancy henry 
> wrote:
>
> Hi All,
>
>
>
> I am getting the below error while I am trying to join 3 tables, which are in
> ORC format in Hive, from 5 10 GB tables through the Hive context in Spark
>
>
>
> Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB
> physical memory used. Consider boosting spark.yarn.executor.
> memoryOverhead.
>
> 17/02/13 02:21:19 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB
> physical memory used
>
>
>
>
>
> I am using the below memory parameters to launch the shell. What else could I
> increase from these parameters, or do I need to change any configuration
> settings? Please let me know
>
>
>
> spark-shell --master yarn --deploy-mode client --driver-memory 16G
> --num-executors 500 executor-cores 7 --executor-memory 10G
>
>
>
>
>


Re: Strange behavior with 'not' and filter pushdown

2017-02-13 Thread Everett Anderson
Went ahead and opened

https://issues.apache.org/jira/browse/SPARK-19586

though I'd generally expect to just close it as fixed in 2.1.0 and roll on.

On Sat, Feb 11, 2017 at 5:01 PM, Everett Anderson  wrote:

> On the plus side, looks like this may be fixed in 2.1.0:
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition
>+- *HashAggregate(keys=[], functions=[partial_count(1)])
>   +- *Project
>  +- *Filter NOT isnotnull(username#14)
> +- *FileScan parquet [username#14] Batched: true, Format:
> Parquet, Location: InMemoryFileIndex[file:/tmp/test_table],
> PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
> ReadSchema: struct
>
>
>
> On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson 
> wrote:
>
>> Bumping this thread.
>>
>> Translating "where not(username is not null)" into a filter of  
>> [IsNotNull(username),
>> Not(IsNotNull(username))] seems like a rather severe bug.
>>
>> Spark 1.6.2:
>>
>> explain select count(*) from parquet_table where not( username is not
>> null)
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)],
>> output=[_c0#1822L])
>> +- TungstenExchange SinglePartition, None
>>  +- TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Partial,isDistinct=false)],
>> output=[count#1825L])
>>  +- Project
>>  +- Filter NOT isnotnull(username#1590)
>>  +- Scan ParquetRelation[username#1590] InputPaths: ,
>> PushedFilters: [Not(IsNotNull(username))]
>>
>> Spark 2.0.2
>>
>> explain select count(*) from parquet_table where not( username is not
>> null)
>>
>> == Physical Plan ==
>> *HashAggregate(keys=[], functions=[count(1)])
>> +- Exchange SinglePartition
>>  +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>  +- *Project
>>  +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>>  +- *BatchedScan parquet default.[username#35] Format:
>> ParquetFormat, InputPaths: , PartitionFilters: [],
>> PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>> ReadSchema: struct
>>
>> Example to generate the above:
>>
>> // Create some fake data
>>
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.Dataset
>> import org.apache.spark.sql.types._
>>
>> val rowsRDD = sc.parallelize(Seq(
>> Row(1, "fred"),
>> Row(2, "amy"),
>> Row(3, null)))
>>
>> val schema = StructType(Seq(
>> StructField("id", IntegerType, nullable = true),
>> StructField("username", StringType, nullable = true)))
>>
>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>
>> val path = "SOME PATH HERE"
>>
>> data.write.mode("overwrite").parquet(path)
>>
>> val testData = sqlContext.read.parquet(path)
>>
>> testData.registerTempTable("filter_test_table")
>>
>>
>> %sql
>> explain select count(*) from filter_test_table where not( username is not
>> null)
>>
>>
>> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <
>> akosti...@nuna.com.invalid> wrote:
>>
>>> Hi,
>>>
>>> I have an application where I’m filtering data with SparkSQL with simple
>>> WHERE clauses. I also want the ability to show the unmatched rows for any
>>> filter, and so am wrapping the previous clause in `NOT()` to get the
>>> inverse. Example:
>>>
>>> Filter:  username is not null
>>> Inverse filter:  NOT(username is not null)
>>>
>>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>>> inverse filter always returns zero results. It looks like this is a problem
>>> with how the filter is getting pushed down to Parquet. Specifically, the
>>> pushdown includes both the “is not null” filter, AND “not(is not null)”,
>>> which would obviously result in zero matches. An example below:
>>>
>>> pyspark:
>>> > x = spark.sql('select my_id from my_table where username is not null')
>>> > y = spark.sql('select my_id from my_table where not(username is not null)')
>>>
>>> > x.explain()
>>> == Physical Plan ==
>>> *Project [my_id#6L]
>>> +- *Filter isnotnull(username#91)
>>>+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>>Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>>PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>>>ReadSchema: struct
>>> > y.explain()
>>> == Physical Plan ==
>>> *Project [my_id#6L]
>>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>>>+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>>Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>>PartitionFilters: [],
>>>PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>>ReadSchema: struct
>>>
>>> Presently I’m working around this by using the new functionality of NOT
>>> EXISTS in Spark 2, but that seems like overkill.
>>>
>>> Any help appreciated.
>>>
>>>
>>> *Alexi Kostibas*
>>> Engineering
>>> *Nuna*
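For this particular predicate, a workaround sketch (assuming the inverse filter
really is just "username is null" and not an arbitrary negated clause) is to
express it positively, so only IsNull is pushed down:

// df stands for the DataFrame backed by the Parquet table above (an assumption).
val unmatched = df.filter(df("username").isNull)   // pushes IsNull(username) only
unmatched.count()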

Re: Case class with POJO - encoder issues

2017-02-13 Thread Michael Armbrust
You are right, you need that PR.  I pinged the author, but otherwise it
would be great if someone could carry it over the finish line.

On Sat, Feb 11, 2017 at 4:19 PM, Jason White 
wrote:

> I'd like to create a Dataset using some classes from Geotools to do some
> geospatial analysis. In particular, I'm trying to use Spark to distribute
> the work based on ID and label fields that I extract from the polygon data.
>
> My simplified case class looks like this:
> implicit val geometryEncoder: Encoder[Geometry] = Encoders.kryo[Geometry]
> case class IndexedGeometry(label: String, tract: Geometry)
>
> When I try to create a dataset using this case class, it gives me this error
> message:
> Exception in thread "main" java.lang.UnsupportedOperationException: No
> Encoder found for com.vividsolutions.jts.geom.Geometry
> - field (class: "com.vividsolutions.jts.geom.Geometry", name: "tract")
> - root class: "org.me.HelloWorld.IndexedGeometry"
>
> If I add another encoder for my case class...:
> implicit val indexedGeometryEncoder: Encoder[IndexedGeometry] =
> Encoders.kryo[IndexedGeometry]
>
> ...it works, but now the entire dataset has a single field, "value", and
> it's a binary blob.
>
> Is there a way to do what I'm trying to do?
> I believe this PR is related, but it's been idle since December:
> https://github.com/apache/spark/pull/15918
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Case-class-with-POJO-encoder-issues-tp28381.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
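Until that PR is carried over the finish line, a workaround sketch (not the PR's
approach) is to kryo-encode only the POJO field and keep the plain fields as real
columns, for example with a tuple encoder:

// Sketch: "label" stays visible to Catalyst; only the Geometry is a Kryo blob.
import org.apache.spark.sql.{Encoder, Encoders}
import com.vividsolutions.jts.geom.Geometry

implicit val labelledGeometry: Encoder[(String, Geometry)] =
  Encoders.tuple(Encoders.STRING, Encoders.kryo[Geometry])

// geometries: Seq[(String, Geometry)] built from the polygon data (assumed)
// val ds = spark.createDataset(geometries)   // columns: _1 (label), _2 (binary tract)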


Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-13 Thread Nick Pentreath
The original Uber authors provided this performance test result:
https://docs.google.com/document/d/19BXg-67U83NVB3M0I84HVBVg3baAVaESD_mrg_-vLro

This was for MinHash only though, so it's not clear what the
scalability is like for the other metric types.

The SignRandomProjectionLSH is not yet in Spark master (see
https://issues.apache.org/jira/browse/SPARK-18082). It could be that there are
some implementation details that would make a difference here.

By the way, what is the join threshold you use in approx join?

Could you perhaps create a JIRA ticket with the details in order to track
this?
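For reference, the threshold is the third argument to approxSimilarityJoin; a
minimal sketch with the built-in MinHashLSH in 2.1.0 (dfA/dfB with a "features"
vector column and an "id" column are assumed):

import org.apache.spark.ml.feature.MinHashLSH

val lsh = new MinHashLSH()
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = lsh.fit(dfA)
// 0.6 here is the maximum Jaccard distance accepted by the join.
val pairs = model.approxSimilarityJoin(dfA, dfB, 0.6)
pairs.select("datasetA.id", "datasetB.id", "distCol").show()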


On Sun, 12 Feb 2017 at 22:52 nguyen duc Tuan  wrote:

> In the end, I switched back to the LSH implementation that I used before (
> https://github.com/karlhigley/spark-neighbors ). I can run on my dataset
> now. If someone has any suggestions, please tell me.
> Thanks.
>
> 2017-02-12 9:25 GMT+07:00 nguyen duc Tuan :
>
> Hi Timur,
> 1) Our data is already transformed to a dataset of Vectors.
> 2) If I use RandomSignProjectLSH, the job dies after I call
> approximateSimilarJoin. I tried to use MinHash instead, but the job is still
> slow. I don't think the problem is related to GC. The time for GC is
> small compared with the time for computation. Here are some screenshots of my
> job.
> Thanks
>
> 2017-02-12 8:01 GMT+07:00 Timur Shenkao :
>
> Hello,
>
> 1) Are you sure that your data is "clean"?  No unexpected missing values?
> No strings in unusual encoding? No additional or missing columns ?
> 2) How long does your job run? What about garbage collector parameters?
> Have you checked what happens with jconsole / jvisualvm ?
>
> Sincerely yours, Timur
>
> On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan 
> wrote:
>
> Hi Nick,
> Because we use *RandomSignProjectionLSH*, the only parameter for
> LSH is the number of hashes. I tried with a small number of hashes (2) but the
> error still happens. And it happens when I call the similarity join. After
> transformation, the size of the dataset is about 4G.
>
> 2017-02-11 3:07 GMT+07:00 Nick Pentreath :
>
> What other params are you using for the lsh transformer?
>
> Are the issues occurring during transform or during the similarity join?
>
>
> On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan 
> wrote:
>
> hi Das,
> In general, I will apply them to larger datasets, so I want to use LSH,
> which is more scalable than the approaches you suggested. Have you
> tried LSH in Spark 2.1.0 before? If yes, how do you set the
> parameters/configuration to make it work?
> Thanks.
>
> 2017-02-10 19:21 GMT+07:00 Debasish Das :
>
> If it is 7m rows and 700k features (or say 1m features) brute force row
> similarity will run fine as well...check out spark-4823...you can compare
> quality with approximate variant...
> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan"  wrote:
>
> Hi everyone,
> Since Spark 2.1.0 introduces LSH (
> http://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing),
> we want to use LSH to find approximate nearest neighbors. Basically, we
> have a dataset with about 7M rows. We want to use cosine distance to measure
> the similarity between items, so we use *RandomSignProjectionLSH* (
> https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db) instead
> of *BucketedRandomProjectionLSH*. I tried to tune some configurations such
> as serialization, memory fraction, executor memory (~6G), number of
> executors (~20), memory overhead..., but nothing works. I often get the error
> "java.lang.OutOfMemoryError: Java heap space" while running. I know that
> this implementation was done by an engineer at Uber, but I don't know the right
> configuration to run the algorithm at scale. Do they need very big
> memory to run it?
>
> Any help would be appreciated.
> Thanks
>
>
>
>
>
>
>


How to specify default value for StructField?

2017-02-13 Thread vbegar
Hello,

I specified a StructType like this: 

*val mySchema = StructType(Array(StructField("f1", StringType,
true),StructField("f2", StringType, true)))*

I have many ORC files stored in the HDFS location:
/user/hos/orc_files_test_together

These files use different schemas: some of them have only the f1 column and
others have both the f1 and f2 columns.

I read the data from these files to a dataframe:
*val df =
spark.read.format("orc").schema(mySchema).load("/user/hos/orc_files_test_together")*

But now when I run the following command to see the data, it fails:
*df.show*

The error message says the "f2" column doesn't exist.

Since I have specified the nullable attribute as true for the f2 column, why does
it fail?

Or, is there any way to specify a default value for a StructField?

Because, in an Avro schema, we can specify the default value in this way and
can read Avro files in a folder which have 2 different schemas (either only the
f1 column or both the f1 and f2 columns):

{
   "type": "record",
   "name": "myrecord",
   "fields":
   [
      {
         "name": "f1",
         "type": "string",
         "default": ""
      },
      {
         "name": "f2",
         "type": "string",
         "default": ""
      }
   ]
}

Wondering why it doesn't work with ORC files.
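For what it's worth, a workaround sketch (there is no default-value mechanism on
StructField here, and the sub-paths below are hypothetical splits of the folder by
schema): normalize the columns yourself and union the pieces:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Supply the "default" manually when a column is missing.
def withDefault(df: DataFrame, name: String, default: String): DataFrame =
  if (df.columns.contains(name)) df else df.withColumn(name, lit(default))

val parts = Seq("/user/hos/orc_f1_only", "/user/hos/orc_f1_and_f2")   // hypothetical
  .map(p => withDefault(spark.read.orc(p), "f2", ""))
  .map(_.select("f1", "f2"))

val df = parts.reduce(_ union _)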

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-specify-default-value-for-StructField-tp28386.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.1.0 issue with spark-shell and pyspark

2017-02-13 Thread jerrytim
I came across the same problem while running my code at "model.save(sc, path)".

Error info:
IllegalArgumentException: u"Error while instantiating
'org.apache.spark.sql.hive.HiveSessionState':"

My platform is Mac, I installed Spark with Hadoop prebuilt. Then I
integrated PySpark with Jupyter.

Anyone has any ideas?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-1-0-issue-with-spark-shell-and-pyspark-tp28339p28385.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Launcher] How to launch parallel jobs?

2017-02-13 Thread Egor Pahomov
About the second problem: I understand this can happen in two cases: (1) one job
prevents the other one from getting resources for executors, or (2) the
bottleneck is reading from disk, so you cannot really parallelize that. I
have no experience with the second case, but it's easy to verify the first one:
just look at your Hadoop UI and verify that both jobs get enough resources.

2017-02-13 11:07 GMT-08:00 Egor Pahomov :

> "But if i increase only executor-cores the finish time is the same". More
> experienced ones can correct me, if I'm wrong, but as far as I understand
> that: one partition processed by one spark task. Task is always running on
> 1 core and not parallelized among cores. So if you have 5 partitions and
> you increased totall number of cores among cluster from 7 to 10 for example
> - you have not gained anything. But if you repartition you give an
> opportunity to process thing in more threads, so now more tasks can execute
> in parallel.
>
> 2017-02-13 7:05 GMT-08:00 Cosmin Posteuca :
>
>> Hi,
>>
>> I think I don't understand well enough how to launch jobs.
>>
>> I have one job which takes 60 seconds to finish. I run it with the following
>> command:
>>
>> spark-submit --executor-cores 1 \
>>  --executor-memory 1g \
>>  --driver-memory 1g \
>>  --master yarn \
>>  --deploy-mode cluster \
>>  --conf spark.dynamicAllocation.enabled=true \
>>  --conf spark.shuffle.service.enabled=true \
>>  --conf spark.dynamicAllocation.minExecutors=1 \
>>  --conf spark.dynamicAllocation.maxExecutors=4 \
>>  --conf spark.dynamicAllocation.initialExecutors=4 \
>>  --conf spark.executor.instances=4 \
>>
>> If I increase the number of partitions from code and the number of executors, the app
>> will finish faster, which is OK. But if I increase only executor-cores, the
>> finish time is the same, and I don't understand why. I expect the time to be
>> lower than the initial time.
>>
>> My second problem is that if I launch the above code twice, I expect both jobs to
>> finish in 60 seconds, but this doesn't happen. Both jobs finish after 120
>> seconds and I don't understand why.
>>
>> I run this code on AWS EMR, on 2 instances (4 CPUs each, and each CPU has 2
>> threads). From what I saw in the default EMR configuration, YARN is set to
>> FIFO (default) mode with the CapacityScheduler.
>>
>> What do you think about these problems?
>>
>> Thanks,
>>
>> Cosmin
>>
>>
>
>
> --
>
>
> *Sincerely yours, Egor Pakhomov*
>



-- 


*Sincerely yours, Egor Pakhomov*


Re: [Spark Launcher] How to launch parallel jobs?

2017-02-13 Thread Egor Pahomov
"But if i increase only executor-cores the finish time is the same". More
experienced ones can correct me, if I'm wrong, but as far as I understand
that: one partition processed by one spark task. Task is always running on
1 core and not parallelized among cores. So if you have 5 partitions and
you increased totall number of cores among cluster from 7 to 10 for example
- you have not gained anything. But if you repartition you give an
opportunity to process thing in more threads, so now more tasks can execute
in parallel.
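As a concrete illustration of that point, a sketch with made-up numbers:

// With only 5 partitions at most 5 tasks can run at once, however many cores exist;
// repartitioning to (at least) the number of available cores lets more tasks run in parallel.
val availableCores = 10                                   // illustrative
val data = spark.range(10000000L).toDF("id").repartition(availableCores * 2)
data.groupBy(data("id") % 100).count().collect()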

2017-02-13 7:05 GMT-08:00 Cosmin Posteuca :

> Hi,
>
> I think I don't understand well enough how to launch jobs.
>
> I have one job which takes 60 seconds to finish. I run it with the following
> command:
>
> spark-submit --executor-cores 1 \
>  --executor-memory 1g \
>  --driver-memory 1g \
>  --master yarn \
>  --deploy-mode cluster \
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.shuffle.service.enabled=true \
>  --conf spark.dynamicAllocation.minExecutors=1 \
>  --conf spark.dynamicAllocation.maxExecutors=4 \
>  --conf spark.dynamicAllocation.initialExecutors=4 \
>  --conf spark.executor.instances=4 \
>
> If I increase the number of partitions from code and the number of executors, the app
> will finish faster, which is OK. But if I increase only executor-cores, the
> finish time is the same, and I don't understand why. I expect the time to be
> lower than the initial time.
>
> My second problem is that if I launch the above code twice, I expect both jobs to
> finish in 60 seconds, but this doesn't happen. Both jobs finish after 120
> seconds and I don't understand why.
>
> I run this code on AWS EMR, on 2 instances (4 CPUs each, and each CPU has 2
> threads). From what I saw in the default EMR configuration, YARN is set to
> FIFO (default) mode with the CapacityScheduler.
>
> What do you think about these problems?
>
> Thanks,
>
> Cosmin
>
>


-- 


*Sincerely yours, Egor Pakhomov*


using spark-xml_2.10 to extract data from XML file

2017-02-13 Thread Carlo . Allocca
Dear All,

I am using spark-xml_2.10 to parse and extract some data from XML files.

I have the issue of getting null values whereas the XML file actually contains
values.

+------------------------------------------------------+---------------------------------------------------+----------------------------------------------------+
| xocs:item.bibrecord.head.abstracts.abstract._original | xocs:item.bibrecord.head.abstracts.abstract._lang | xocs:item.bibrecord.head.abstracts.abstract.ce:para |
+------------------------------------------------------+---------------------------------------------------+----------------------------------------------------+
| null                                                  | null                                              | null                                               |
+------------------------------------------------------+---------------------------------------------------+----------------------------------------------------+

Below, I report an example of the XML that I am processing and the code I am using
to parse it.

What am I doing wrong?

Please, any help on this would be very appreciated.

Many Thanks in advance,
Best Regards,
Carlo




= An example


SPARK prints the following schema:

root
 |-- xocs:item: struct (nullable = true)
 ||-- bibrecord: struct (nullable = true)
 |||-- head: struct (nullable = true)
 ||||-- abstracts: struct (nullable = true)
 |||||-- abstract: struct (nullable = true)
 ||||||-- _original: string (nullable = true)
 ||||||-- _lang: string (nullable = true)



 XML file example

XML file structure:


 
   
 

df = sqlContext.read()
.format("com.databricks.spark.xml")
.option("rowTag", rowTag)
.option("attributePrefix", "_")
.schema(rexploreXMLDataSchema)
.load(localxml);

df.printSchema();

df.select(

df.col("xocs:item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("_original"),

df.col("xocs:item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getItem("_lang"),

df.col("xocs:item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("ce:para")
).show();

}

}


-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.


Re: Driver hung and happened out of memory while writing to console progress bar

2017-02-13 Thread Spark User
How much memory have you allocated to the driver? The driver stores some state
for tracking the task, stage and job history that you can see in the Spark
console; it does take up a significant portion of the heap, anywhere from
200 MB - 1 GB, depending on your map-reduce steps.

Either way, that is a good place to start: check how much memory you
have allocated to the driver. If it is sufficient, like on the order of
2-3 GB+ at least, then you will have to take heap dumps of the driver
process periodically and find out which objects grow over time.

On Fri, Feb 10, 2017 at 9:34 AM, Ryan Blue 
wrote:

> This isn't related to the progress bar, it just happened while in that
> section of code. Something else is taking memory in the driver, usually a
> broadcast table or something else that requires a lot of memory and happens
> on the driver.
>
> You should check your driver memory settings and the query plan (if this
> was SparkSQL) for this stage to investigate further.
>
> rb
>
> On Thu, Feb 9, 2017 at 8:41 PM, John Fang 
> wrote:
>
>> the spark version is 2.1.0
>>
>> --
>> From: 方孝健(玄弟) 
>> Sent: Friday, 10 February 2017, 12:35
>> To: spark-dev ; spark-user 
>> Subject: Driver hung and happened out of memory while writing to console
>> progress bar
>>
>> [Stage 172:==>                                               (10328 + 93) / 16144]
>> [Stage 172:==>                                               (10329 + 93) / 16144]
>> [Stage 172:==>                                               (10330 + 93) / 16144]
>>     ... (the same console progress bar line repeats continuously, the completed
>>     task count creeping from 10328 to 10363 out of 16144, with 92-94 running) ...
>> [Stage 172:==>                                               (10363 + 93) / 16144]

Re: Question about best Spark tuning

2017-02-13 Thread Spark User
My take on the 2-3 tasks per CPU core is that you want to ensure you are
utilizing the cores to the max, which means it will help you with scaling
and performance. The question would be: why not 1 task per core? The reason
is that you can probably get a good handle on the average execution time
per task, but the p90+ execution time can be spiky. In that case you don't
want the long-pole task(s) to slow down your entire batch (which is in
general what you would tune your application for). So by having 2-3 tasks
per CPU core, you can further break down the work into smaller chunks, hence
completing tasks quicker, and let the Spark scheduler (which is low cost and
efficient based on my observation; it is never the bottleneck) do the work
of distributing the work among the tasks.
I have experimented with 1 task per core, 2-3 tasks per core and all the
way up to 20+ tasks per core. The performance difference was similar
between 3 tasks per core and 20+ tasks per core. But it does make a
difference in performance when you compare 1 task per core vs. 2-3 tasks
per core.
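A rough sizing sketch of that guideline (the cluster numbers are made up):

// Illustrative arithmetic only.
val numExecutors     = 20
val coresPerExecutor = 4
val tasksPerCore     = 3
val targetPartitions = numExecutors * coresPerExecutor * tasksPerCore   // = 240

// df is whatever DataFrame is about to be shuffled/aggregated (assumed).
// val tuned = df.repartition(targetPartitions)
// (spark.default.parallelism itself has to be set at submit time.)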

Hope this explanation makes sense.
Best,
Bharath


On Thu, Feb 9, 2017 at 2:11 PM, Ji Yan  wrote:

> Dear spark users,
>
> From this site https://spark.apache.org/docs/latest/tuning.html where it
> offers recommendation on setting the level of parallelism
>
> Clusters will not be fully utilized unless you set the level of
>> parallelism for each operation high enough. Spark automatically sets the
>> number of “map” tasks to run on each file according to its size (though you
>> can control it through optional parameters to SparkContext.textFile,
>> etc), and for distributed “reduce” operations, such as groupByKey and
>> reduceByKey, it uses the largest parent RDD’s number of partitions. You
>> can pass the level of parallelism as a second argument (see the
>> spark.PairRDDFunctions documentation), or set the config property
>> spark.default.parallelism to
>> change the default. *In general, we recommend 2-3 tasks per CPU core in
>> your cluster*.
>
>
> Do people have a general theory/intuition about why it is a good idea to
> have 2-3 tasks running per CPU core?
>
> Thanks
> Ji
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>


Re: Is it better to Use Java or Python on Scala for Spark for using big data sets

2017-02-13 Thread Spark User
Spark has more support for Scala; by that I mean more APIs are available
for Scala compared to Python or Java. Also, Scala code will be more concise
and easier to read. Java is very verbose.

On Thu, Feb 9, 2017 at 10:21 PM, Irving Duran 
wrote:

> I would say Java, since it will be somewhat similar to Scala.  Now, this
> assumes that you have some app already written in Scala. If you don't, then
> pick the language that you feel most comfortable with.
>
> Thank you,
>
> Irving Duran
>
> On Feb 9, 2017, at 11:59 PM, nancy henry  wrote:
>
> Hi All,
>
> Is it better to use Java, Python, or Scala for Spark coding?
>
> Mainly my work is with getting file data which is in CSV format, and I
> have to do some rule checking and rule aggregation,
>
> and put the final filtered data back to Oracle so that real-time apps can
> use it.
>


Re: Lost executor 4 Container killed by YARN for exceeding memory limits.

2017-02-13 Thread Thakrar, Jayesh
Nancy,

As your log output indicated, your executor exceeded its 11 GB memory limit.
While you might want to address the root cause/data volume as suggested by Jon, 
you can do an immediate test by changing your command as follows

spark-shell --master yarn --deploy-mode client --driver-memory 16G 
--num-executors 500 executor-cores 7 --executor-memory 14G

This essentially increases your executor memory from 11 GB to 14 GB.
Note that it will result in a potentially large footprint - from 500x11 to 
500x14 GB.
You may want to consult with your DevOps/Operations/Spark Admin team first.

From: Jon Gregg 
Date: Monday, February 13, 2017 at 8:58 AM
To: nancy henry 
Cc: "user @spark" 
Subject: Re: Lost executor 4 Container killed by YARN for exceeding memory 
limits.

Setting Spark's memoryOverhead configuration variable is recommended in your 
logs, and has helped me with these issues in the past.  Search for 
"memoryOverhead" here:  http://spark.apache.org/docs/latest/running-on-yarn.html

That said, you're running on a huge cluster as it is.  If it's possible to 
filter your tables down before the join (keeping just the rows/columns you 
need), that may be a better solution.

Jon

On Mon, Feb 13, 2017 at 5:27 AM, nancy henry 
> wrote:
Hi All,

I am getting the below error while I am trying to join 3 tables, which are in ORC
format in Hive, from 5 10 GB tables through the Hive context in Spark

Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB physical 
memory used. Consider boosting spark.yarn.executor.memoryOverhead.
17/02/13 02:21:19 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container 
killed by YARN for exceeding memory limits. 11.1 GB of 11 GB physical memory 
used


I am using the below memory parameters to launch the shell. What else could I
increase from these parameters, or do I need to change any configuration
settings? Please let me know

spark-shell --master yarn --deploy-mode client --driver-memory 16G 
--num-executors 500 executor-cores 7 --executor-memory 10G




Re: Parquet Gzipped Files

2017-02-13 Thread Jörn Franke
Your vendor should use Parquet's internal compression and not take a Parquet
file and gzip it.
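What that looks like on the producer side, sketched (paths are placeholders):

// gzip applied inside the Parquet file, per column chunk, so the result is still
// a normal .parquet file readable with spark.read.parquet.
val df = spark.read.parquet("/path/to/input")
df.write
  .option("compression", "gzip")
  .parquet("/path/to/output")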

> On 13 Feb 2017, at 18:48, Benjamin Kim  wrote:
> 
> We are receiving files from an outside vendor who creates a Parquet data file 
> and Gzips it before delivery. Does anyone know how to Gunzip the file in 
> Spark and inject the Parquet data into a DataFrame? I thought using 
> sc.textFile or sc.wholeTextFiles would automatically Gunzip the file, but I’m 
> getting a decompression header error when trying to open the Parquet file.
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Parquet Gzipped Files

2017-02-13 Thread Benjamin Kim
We are receiving files from an outside vendor who creates a Parquet data file 
and Gzips it before delivery. Does anyone know how to Gunzip the file in Spark 
and inject the Parquet data into a DataFrame? I thought using sc.textFile or 
sc.wholeTextFiles would automatically Gunzip the file, but I’m getting a 
decompression header error when trying to open the Parquet file.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: is dataframe thread safe?

2017-02-13 Thread Mark Hamstra
If you update the data, then you don't have the same DataFrame anymore. If
you don't do as Assaf did, caching and forcing evaluation of the
DataFrame before using that DataFrame concurrently, then you'll still get
consistent and correct results, but not necessarily efficient results. If
the fully materialized, cached results are not yet available when multiple
concurrent Jobs try to use the DataFrame, then you can end up with more
than one Job doing the same work to generate what needs to go in the cache.
To avoid that kind of work duplication you need some mechanism to ensure
that only one action/Job is run to populate the cache before multiple
actions/Jobs can then use the cached results efficiently.
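A minimal sketch of that pattern: one action materializes the cache, then
concurrent jobs reuse it (path and column names are made up):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val df = spark.read.parquet("/tmp/events").cache()
df.count()                                        // a single job populates the cache

val f1 = Future { df.filter(df("kind") === "a").count() }
val f2 = Future { df.filter(df("kind") === "b").count() }
val (a, b) = (Await.result(f1, 10.minutes), Await.result(f2, 10.minutes))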

On Mon, Feb 13, 2017 at 9:15 AM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> How about having a thread that updates and caches a dataframe in memory, next
> to other threads requesting this dataframe; is it thread safe?
>
> 2017-02-13 9:02 GMT+01:00 Reynold Xin :
>
>> Yes your use case should be fine. Multiple threads can transform the same
>> data frame in parallel since they create different data frames.
>>
>>
>> On Sun, Feb 12, 2017 at 9:07 AM Mendelson, Assaf 
>> wrote:
>>
>>> Hi,
>>>
>>> I was wondering if dataframe is considered thread safe. I know the spark
>>> session and spark context are thread safe (and actually have tools to
>>> manage jobs from different threads) but the question is, can I use the same
>>> dataframe in both threads.
>>>
>>> The idea would be to create a dataframe in the main thread and then in
>>> two sub threads do different transformations and actions on it.
>>>
>>> I understand that some things might not be thread safe (e.g. if I
>>> unpersist in one thread it would affect the other. Checkpointing would
>>> cause similar issues), however, I can’t find any documentation as to what
>>> operations (if any) are thread safe.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Assaf.
>>>
>>
>


Re: is dataframe thread safe?

2017-02-13 Thread vincent gromakowski
How about having a thread that updates and caches a dataframe in memory, next
to other threads requesting this dataframe; is it thread safe?

2017-02-13 9:02 GMT+01:00 Reynold Xin :

> Yes your use case should be fine. Multiple threads can transform the same
> data frame in parallel since they create different data frames.
>
>
> On Sun, Feb 12, 2017 at 9:07 AM Mendelson, Assaf 
> wrote:
>
>> Hi,
>>
>> I was wondering if dataframe is considered thread safe. I know the spark
>> session and spark context are thread safe (and actually have tools to
>> manage jobs from different threads) but the question is, can I use the same
>> dataframe in both threads.
>>
>> The idea would be to create a dataframe in the main thread and then in
>> two sub threads do different transformations and actions on it.
>>
>> I understand that some things might not be thread safe (e.g. if I
>> unpersist in one thread it would affect the other. Checkpointing would
>> cause similar issues), however, I can’t find any documentation as to what
>> operations (if any) are thread safe.
>>
>>
>>
>> Thanks,
>>
>> Assaf.
>>
>


Re: Order of rows not preserved after cache + count + coalesce

2017-02-13 Thread Jon Gregg
Spark has a zipWithIndex function for RDDs (
http://stackoverflow.com/a/26081548) that adds an index column right after
you create an RDD, and I believe it preserves order.  Then you can sort it
by the index after the cache step.

I haven't tried this with a Dataframe but this answer seems promising:
http://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex
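A sketch of that approach in Scala, reusing the small three-row DataFrame from the
report below:

// Tag each row with its original position, then sort by it after cache/coalesce.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val df = spark.range(3).toDF("n")
val indexed = spark.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, i) => Row.fromSeq(row.toSeq :+ i) },
  StructType(df.schema.fields :+ StructField("idx", LongType, nullable = false)))

indexed.cache()
indexed.count()
indexed.coalesce(2).orderBy("idx").drop("idx").show()   // order restored explicitly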



On Mon, Feb 13, 2017 at 8:34 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> RDDs and DataFrames do not guarantee any specific ordering of data. They
> are like tables in a SQL database. The only way to get a guaranteed
> ordering of rows is to explicitly specify an orderBy() clause in your
> statement. Any ordering you see otherwise is incidental.
> ​
>
> On Mon, Feb 13, 2017 at 7:52 AM David Haglund (external) <
> david.hagl...@husqvarnagroup.com> wrote:
>
>> Hi,
>>
>>
>>
>> I found something that surprised me, I expected the order of the rows to
>> be preserved, so I suspect this might be a bug. The problem is illustrated
>> with the Python example below:
>>
>>
>>
>> In [1]:
>>
>> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>>
>> df.cache()
>>
>> df.count()
>>
>> df.coalesce(2).rdd.glom().collect()
>>
>> Out[1]:
>>
>> [[Row(n=1)], [Row(n=0), Row(n=2)]]
>>
>>
>>
>> Note how n=1 comes before n=0, above.
>>
>>
>>
>>
>>
>> If I remove the cache line I get the rows in the correct order and the
>> same if I use df.rdd.count() instead of df.count(), see examples below:
>>
>>
>>
>> In [2]:
>>
>> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>>
>> df.count()
>>
>> df.coalesce(2).rdd.glom().collect()
>>
>> Out[2]:
>>
>> [[Row(n=0)], [Row(n=1), Row(n=2)]]
>>
>>
>>
>> In [3]:
>>
>> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>>
>> df.cache()
>>
>> df.rdd.count()
>>
>> df.coalesce(2).rdd.glom().collect()
>>
>> Out[3]:
>>
>> [[Row(n=0)], [Row(n=1), Row(n=2)]]
>>
>>
>>
>>
>>
>> I use spark 2.1.0 and pyspark.
>>
>>
>>
>> Regards,
>>
>> /David
>>
>> The information in this email may be confidential and/or legally
>> privileged. It has been sent for the sole use of the intended recipient(s).
>> If you are not an intended recipient, you are strictly prohibited from
>> reading, disclosing, distributing, copying or using this email or any of
>> its contents, in any way whatsoever. If you have received this email in
>> error, please contact the sender by reply email and destroy all copies of
>> the original message. Please also be advised that emails are not a secure
>> form for communication, and may contain errors.
>>
>


[Spark Launcher] How to launch parallel jobs?

2017-02-13 Thread Cosmin Posteuca
Hi,

I think I don't understand well enough how to launch jobs.

I have one job which takes 60 seconds to finish. I run it with the following
command:

spark-submit --executor-cores 1 \
 --executor-memory 1g \
 --driver-memory 1g \
 --master yarn \
 --deploy-mode cluster \
 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.shuffle.service.enabled=true \
 --conf spark.dynamicAllocation.minExecutors=1 \
 --conf spark.dynamicAllocation.maxExecutors=4 \
 --conf spark.dynamicAllocation.initialExecutors=4 \
 --conf spark.executor.instances=4 \

If I increase the number of partitions from code and the number of executors,
the app will finish faster, which is OK. But if I increase only
executor-cores, the finish time is the same, and I don't understand
why. I expect the time to be lower than the initial time.

My second problem is that if I launch the above code twice, I expect both
jobs to finish in 60 seconds, but this doesn't happen. Both jobs finish
after 120 seconds and I don't understand why.

I run this code on AWS EMR, on 2 instances (4 CPUs each, and each CPU
has 2 threads). From what I saw in the default EMR configuration, YARN is
set to FIFO (default) mode with the CapacityScheduler.

What do you think about these problems?

Thanks,

Cosmin


Re: Lost executor 4 Container killed by YARN for exceeding memory limits.

2017-02-13 Thread Jon Gregg
Setting Spark's memoryOverhead configuration variable is recommended in
your logs, and has helped me with these issues in the past.  Search for
"memoryOverhead" here:
http://spark.apache.org/docs/latest/running-on-yarn.html

That said, you're running on a huge cluster as it is.  If it's possible to
filter your tables down before the join (keeping just the rows/columns you
need), that may be a better solution.

Jon

On Mon, Feb 13, 2017 at 5:27 AM, nancy henry 
wrote:

> Hi All,
>
> I am getting the below error while I am trying to join 3 tables, which are in
> ORC format in Hive, from 5 10 GB tables through the Hive context in Spark
>
> Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB
> physical memory used. Consider boosting spark.yarn.executor.
> memoryOverhead.
> 17/02/13 02:21:19 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB
> physical memory used
>
>
> I am using the below memory parameters to launch the shell. What else could I
> increase from these parameters, or do I need to change any configuration
> settings? Please let me know
>
> spark-shell --master yarn --deploy-mode client --driver-memory 16G
> --num-executors 500 executor-cores 7 --executor-memory 10G
>
>


Does Spark support heavy duty third party libraries?

2017-02-13 Thread bhayes
I have a rather heavy metal shared library which among other options can also
be accessed via a Java/JNI wrapper JAR (the library itself is written in
C++). This library needs up to 1000 external files which in total can be
larger than 50 GBytes in size. At init time all this data needs to be read
by the library, so it may take some time to initialize. The memory size
needed is also in this range, say 64 GBytes. The library itself would then
be accessed by some mapping function which is passed to Spark. My question
now is whether Spark (or if not, Hadoop) supports such kind of libraries,
specifically:

1. Can it be configured to init this lib only once at startup time of the
cluster or job?

2. Can it be configured to have the library available on only a few nodes
(non uniform cluster)?

3. Can it distribute the library, its connector JAR, its config files and
its data files throughout the cluster? (its main data files could be held in
a cluster file system otherwise)

4. The library can in fact also use multi-threading internally to process an
array of inputs, so the questions here are:
4a. Does Spark support passing an array of inputs (partitioning?)?
4b. Can Spark be made aware of the library-internal multi-threading?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-support-heavy-duty-third-party-libraries-tp28384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to measure IO time in Spark over S3

2017-02-13 Thread Steve Loughran
Hadoop 2.8's s3a does a lot more metrics here, most of which you can find on 
HDP-2.5 if you can grab those JARs. Everything comes out as hadoop JMX metrics, 
also readable & aggregatable through a call to FileSystem.getStorageStatistics
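A sketch of reading those counters (assumes Hadoop 2.8+ client jars on the
classpath; note they are per-JVM, so on the driver you only see the driver's own
S3 calls):

import java.net.URI
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

val fs = FileSystem.get(new URI("s3a://my-bucket/"),        // placeholder bucket
  spark.sparkContext.hadoopConfiguration)
fs.getStorageStatistics.getLongStatistics.asScala.foreach { s =>
  println(s"${s.getName} = ${s.getValue}")
}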


Measuring IO time isn't something picked up, because it's actually hard to
measure in a multithreaded world: you can't just count up the seconds spent
talking to S3, because that can actually make things seem unduly negative. I know,
as we did try adding up upload times & bytes uploaded to give a metric of
bandwidth, but it turns out to be fairly misleading. If 10 threads each took
60s to upload a megabyte of data you could conclude that B/W is 1 MB per
minute... if they were running in parallel it's a B/W of 10 MB per minute: 10x
as fast.


FWIW the main bottlenecks in s3a perf are


1. time for metadata operations. This is shockingly bad: 
http://steveloughran.blogspot.co.uk/2016/12/how-long-does-filesystemexists-take.html
  As well as code improvements up the stack, you can help here by not having 
deep directory structures for partitioning; prefer wider trees.

2. cost of re-opening the HTTPS connection after a forward/backward seek. Hadoop 2.8
s3a does a lot of work here to improve things, through forward seeks of many KB
before abort/restart of the connection, and an fadvise=random option for max perf
on columnar table storage (ORC, Parquet)

3. how s3a waits until close() before uploading data. The 
fs.s3a.fast.output.enabled=true option boosts this, but it's pretty brittle in 
Hadoop 2.7.x as it uses lots of on-heap storage if code is generating faster 
than upload B/W; 2.8 can use HDD as buffering.

4.  time to commit data. This is an O(data) copy server side, at 6-10 MB/s. 
Needs a committer which doesn't do renames. The 1.6 DirectOutputCommitter did, 
but it couldn't handle failure & retry. Future ones will.


-Steve



From: Gili Nachum 
Sent: 13 February 2017 06:55
To: user@spark.apache.org
Subject: How to measure IO time in Spark over S3

Hi!

How can I tell the IO duration for a Spark application doing R/W from S3 (using S3
as a filesystem, sc.textFile("s3a://..."))?
I would like to know the % of the overall app execution time spent doing IO.

Gili.


Re: Order of rows not preserved after cache + count + coalesce

2017-02-13 Thread Nicholas Chammas
RDDs and DataFrames do not guarantee any specific ordering of data. They
are like tables in a SQL database. The only way to get a guaranteed
ordering of rows is to explicitly specify an orderBy() clause in your
statement. Any ordering you see otherwise is incidental.
​

On Mon, Feb 13, 2017 at 7:52 AM David Haglund (external) <
david.hagl...@husqvarnagroup.com> wrote:

> Hi,
>
>
>
> I found something that surprised me, I expected the order of the rows to
> be preserved, so I suspect this might be a bug. The problem is illustrated
> with the Python example below:
>
>
>
> In [1]:
>
> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>
> df.cache()
>
> df.count()
>
> df.coalesce(2).rdd.glom().collect()
>
> Out[1]:
>
> [[Row(n=1)], [Row(n=0), Row(n=2)]]
>
>
>
> Note how n=1 comes before n=0, above.
>
>
>
>
>
> If I remove the cache line I get the rows in the correct order and the
> same if I use df.rdd.count() instead of df.count(), see examples below:
>
>
>
> In [2]:
>
> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>
> df.count()
>
> df.coalesce(2).rdd.glom().collect()
>
> Out[2]:
>
> [[Row(n=0)], [Row(n=1), Row(n=2)]]
>
>
>
> In [3]:
>
> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>
> df.cache()
>
> df.rdd.count()
>
> df.coalesce(2).rdd.glom().collect()
>
> Out[3]:
>
> [[Row(n=0)], [Row(n=1), Row(n=2)]]
>
>
>
>
>
> I use spark 2.1.0 and pyspark.
>
>
>
> Regards,
>
> /David
>
> The information in this email may be confidential and/or legally
> privileged. It has been sent for the sole use of the intended recipient(s).
> If you are not an intended recipient, you are strictly prohibited from
> reading, disclosing, distributing, copying or using this email or any of
> its contents, in any way whatsoever. If you have received this email in
> error, please contact the sender by reply email and destroy all copies of
> the original message. Please also be advised that emails are not a secure
> form for communication, and may contain errors.
>


Order of rows not preserved after cache + count + coalesce

2017-02-13 Thread David Haglund (external)
Hi,

I found something that surprised me, I expected the order of the rows to be 
preserved, so I suspect this might be a bug. The problem is illustrated with 
the Python example below:

In [1]:
df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
df.cache()
df.count()
df.coalesce(2).rdd.glom().collect()
Out[1]:
[[Row(n=1)], [Row(n=0), Row(n=2)]]

Note how n=1 comes before n=0, above.


If I remove the cache line I get the rows in the correct order and the same if 
I use df.rdd.count() instead of df.count(), see examples below:

In [2]:
df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
df.count()
df.coalesce(2).rdd.glom().collect()
Out[2]:
[[Row(n=0)], [Row(n=1), Row(n=2)]]

In [3]:
df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
df.cache()
df.rdd.count()
df.coalesce(2).rdd.glom().collect()
Out[3]:
[[Row(n=0)], [Row(n=1), Row(n=2)]]


I use spark 2.1.0 and pyspark.

Regards,
/David

The information in this email may be confidential and/or legally privileged. It 
has been sent for the sole use of the intended recipient(s). If you are not an 
intended recipient, you are strictly prohibited from reading, disclosing, 
distributing, copying or using this email or any of its contents, in any way 
whatsoever. If you have received this email in error, please contact the sender 
by reply email and destroy all copies of the original message. Please also be 
advised that emails are not a secure form for communication, and may contain 
errors.

Lost executor 4 Container killed by YARN for exceeding memory limits.

2017-02-13 Thread nancy henry
Hi All,

I am getting the below error while I am trying to join 3 tables, which are in
ORC format in Hive, from 5 10 GB tables through the Hive context in Spark

Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
17/02/13 02:21:19 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB
physical memory used


I am using the below memory parameters to launch the shell. What else could I
increase from these parameters, or do I need to change any configuration
settings? Please let me know

spark-shell --master yarn --deploy-mode client --driver-memory 16G
--num-executors 500 executor-cores 7 --executor-memory 10G


Re: Remove dependence on HDFS

2017-02-13 Thread Calvin Jia
Hi Ben,

You can replace HDFS with a number of storage systems since Spark is
compatible with other storage like S3. This would allow you to scale your
compute nodes solely for the purpose of adding compute power and not disk
space. You can deploy Alluxio on your compute nodes to offset the
performance impact of decoupling your compute and storage, as well as unify
multiple storage spaces if you would like to still use HDFS, S3, and/or
other storage solutions in tandem. Here is an article which describes a similar
architecture.

Hope this helps,
Calvin

On Mon, Feb 13, 2017 at 12:46 AM, Saisai Shao 
wrote:

> IIUC Spark doesn't strongly bind to HDFS; it uses a common FileSystem
> layer which supports different FS implementations, and HDFS is just one option.
> You could also use S3 as a backend FS; from Spark's point of view it is transparent
> to different FS implementations.
>
>
>
> On Sun, Feb 12, 2017 at 5:32 PM, ayan guha  wrote:
>
>> How about adding more NFS storage?
>>
>> On Sun, 12 Feb 2017 at 8:14 pm, Sean Owen  wrote:
>>
>>> Data has to live somewhere -- how do you not add storage but store more
>>> data?  Alluxio is not persistent storage, and S3 isn't on your premises.
>>>
>>> On Sun, Feb 12, 2017 at 4:29 AM Benjamin Kim  wrote:
>>>
>>> Has anyone got some advice on how to remove the reliance on HDFS for
>>> storing persistent data. We have an on-premise Spark cluster. It seems like
>>> a waste of resources to keep adding nodes because of a lack of storage
>>> space only. I would rather add more powerful nodes due to the lack of
>>> processing power at a less frequent rate, than add less powerful nodes at a
>>> more frequent rate just to handle the ever growing data. Can anyone point
>>> me in the right direction? Is Alluxio a good solution? S3? I would like to
>>> hear your thoughts.
>>>
>>> Cheers,
>>> Ben
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Remove dependence on HDFS

2017-02-13 Thread Saisai Shao
IIUC Spark doesn't strongly bind to HDFS; it uses a common FileSystem layer
which supports different FS implementations, and HDFS is just one option. You
could also use S3 as a backend FS; from Spark's point of view it is transparent to
different FS implementations.
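In practice that just means swapping the URI scheme; a sketch with a placeholder
bucket and credentials (on EMR with IAM roles the key settings are unnecessary):

// Spark reads/writes S3 through the Hadoop s3a FileSystem implementation.
val hconf = spark.sparkContext.hadoopConfiguration
hconf.set("fs.s3a.access.key", "...")          // placeholder; prefer instance roles
hconf.set("fs.s3a.secret.key", "...")

val events = spark.read.parquet("s3a://my-bucket/events/")
events.write.parquet("s3a://my-bucket/output/")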



On Sun, Feb 12, 2017 at 5:32 PM, ayan guha  wrote:

> How about adding more NFS storage?
>
> On Sun, 12 Feb 2017 at 8:14 pm, Sean Owen  wrote:
>
>> Data has to live somewhere -- how do you not add storage but store more
>> data?  Alluxio is not persistent storage, and S3 isn't on your premises.
>>
>> On Sun, Feb 12, 2017 at 4:29 AM Benjamin Kim  wrote:
>>
>> Has anyone got some advice on how to remove the reliance on HDFS for
>> storing persistent data. We have an on-premise Spark cluster. It seems like
>> a waste of resources to keep adding nodes because of a lack of storage
>> space only. I would rather add more powerful nodes due to the lack of
>> processing power at a less frequent rate, than add less powerful nodes at a
>> more frequent rate just to handle the ever growing data. Can anyone point
>> me in the right direction? Is Alluxio a good solution? S3? I would like to
>> hear your thoughts.
>>
>> Cheers,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: is dataframe thread safe?

2017-02-13 Thread Reynold Xin
Yes your use case should be fine. Multiple threads can transform the same
data frame in parallel since they create different data frames.


On Sun, Feb 12, 2017 at 9:07 AM Mendelson, Assaf 
wrote:

> Hi,
>
> I was wondering if dataframe is considered thread safe. I know the spark
> session and spark context are thread safe (and actually have tools to
> manage jobs from different threads) but the question is, can I use the same
> dataframe in both threads.
>
> The idea would be to create a dataframe in the main thread and then in two
> sub threads do different transformations and actions on it.
>
> I understand that some things might not be thread safe (e.g. if I
> unpersist in one thread it would affect the other. Checkpointing would
> cause similar issues), however, I can’t find any documentation as to what
> operations (if any) are thread safe.
>
>
>
> Thanks,
>
> Assaf.
>


Re: is dataframe thread safe?

2017-02-13 Thread 任弘迪
To my understanding, all transformations are thread-safe because a dataframe
is just a description of the calculation and it's immutable, so the case
above is all right. Just be careful with the actions.

On Sun, Feb 12, 2017 at 4:06 PM, Mendelson, Assaf 
wrote:

> Hi,
>
> I was wondering if dataframe is considered thread safe. I know the spark
> session and spark context are thread safe (and actually have tools to
> manage jobs from different threads) but the question is, can I use the same
> dataframe in both threads.
>
> The idea would be to create a dataframe in the main thread and then in two
> sub threads do different transformations and actions on it.
>
> I understand that some things might not be thread safe (e.g. if I
> unpersist in one thread it would affect the other. Checkpointing would
> cause similar issues), however, I can’t find any documentation as to what
> operations (if any) are thread safe.
>
>
>
> Thanks,
>
> Assaf.
>