Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-19 Thread vijay.bvp
Apologies for the long answer.

Understanding partitioning at each stage of the RDD graph/lineage is important
for efficient parallelism and balanced load. This applies to working with any
source, streaming or static.
You have a tricky situation here: one Kafka source with 9 partitions and a
static data set with 90 partitions.

Before joining the two, try to make the number of partitions equal for both
RDDs: either repartition the Kafka source to 90 partitions, coalesce the
flat-file RDD to 9 partitions, or pick something midway between 9 and 90.
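
As a minimal, self-contained Scala sketch of these options (the partition
counts 9 and 90 mirror this thread; the data, keys and app name are made up):

import org.apache.spark.{SparkConf, SparkContext}

object PartitionAlignmentSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partition-alignment-sketch").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Stand-ins for the real sources: a "Kafka-like" pair RDD with 9 partitions
    // and a "static" pair RDD with 90 partitions.
    val kafkaLike  = sc.parallelize((1 to 1000).map(i => (i % 100, s"event-$i")), 9)
    val staticLike = sc.parallelize((1 to 100).map(i => (i, s"dim-$i")), 90)

    // Option 1: widen the streaming side to 90 partitions (full shuffle).
    val widened = kafkaLike.repartition(90)

    // Option 2: shrink the static side to 9 partitions. coalesce(9) merges
    // partitions without a shuffle; coalesce(9, shuffle = true) redistributes
    // the data evenly at the cost of a shuffle.
    val narrowed = staticLike.coalesce(9)

    // Option 3: let the join decide by passing numPartitions explicitly.
    val joined = kafkaLike.join(staticLike, numPartitions = 90)

    println("joined partitions: " + joined.getNumPartitions)
    sc.stop()
  }
}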

In general, the number of tasks that can run in parallel equals the total
number of cores the Spark job has (number of executors * cores per executor).

As an example, if the flat file has 90 partitions and you set 4 executors with
5 cores each, for a total of 20 cores, the tasks are scheduled in waves of
20+20+20+20+10. As you can see, the last wave runs only 10 tasks even though
you have 20 cores.

Compare this with 6 executors with 5 cores each, for a total of 30 cores; then
it would be 30+30+30.

Ideally, the number of partitions of each RDD (in the graph lineage) should be
a multiple of the total number of cores available to the Spark job.

In terms of data locality, prefer process-local over node-local over
rack-local.
As an example, 5 executors with 4 cores and 4 executors with 5 cores both give
20 cores in total, but with 4 executors there is less shuffling and more
process-local/node-local work.
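
As a minimal sketch of those two 20-core layouts (the values are illustrative;
on YARN these are usually passed via spark-submit rather than set in code):

import org.apache.spark.SparkConf

object ExecutorLayoutSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("executor-layout-sketch")
      // Layout A: 4 executors x 5 cores = 20 cores; fewer, larger JVMs tend to
      // keep more tasks process-local and shuffle less between executors.
      .set("spark.executor.instances", "4")
      .set("spark.executor.cores", "5")
    // Layout B would instead be:
    //   .set("spark.executor.instances", "5")
    //   .set("spark.executor.cores", "4")
    println(conf.toDebugString)
  }
}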

You need to look at the RDD graph between
val df = sqlContext.read.parquet(...)
and
val rdd = df.as[T].rdd


On your final question: you should be able to tune the static RDD without an
external store by carefully looking at each batch's RDD lineage during the 30
minutes before the RDD gets refreshed again.

If you would like to use an external system, Apache Ignite is something you
can use as a cache.

thanks
Vijay

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



sqoop import job not working when spark thrift server is running.

2018-02-19 Thread akshay naidu
Hello ,

I was trying to optimize my Spark cluster, and I managed to do so to some
extent by making changes in the yarn-site.xml and spark-defaults.conf files.
Before the changes, the MapReduce import job was running fine alongside a slow
Thrift server; after the changes, I have to kill the Thrift server to execute
my Sqoop import job.

The configurations are as follows:

*yarn-site.xml*

<property>
  <name>yarn.nodemanager.resource.pcores-vcores-multiplier</name>
  <value>1.0</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>5</value>
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>4</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>4</value>
</property>


*spark-defaults.conf*

spark.master                        yarn
spark.driver.memory                 9g
spark.executor.memory               8570m
spark.yarn.executor.memoryOverhead  646m

spark.executor.instances            11
spark.executor.cores                3
spark.default.parallelism           30

SPARK_WORKER_MEMORY 10g
SPARK_WORKER_INSTANCES 1
SPARK_WORKER_CORES 4

SPARK_DRIVER_MEMORY 9g
SPARK_DRIVER_CORES 3

SPARK_MASTER_PORT 7077

SPARK_EXECUTOR_INSTANCES 11
SPARK_EXECUTOR_CORES 3
SPARK_EXECUTOR_MEMORY 8570m


*Resources in the cluster of 9 nodes:*
12 GB RAM and 6 cores on each node.
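
As a rough back-of-the-envelope check using only the numbers quoted above (a
sketch; real YARN headroom depends on yarn.nodemanager.resource.memory-mb,
which is typically below the physical 12 GB):

object ClusterCapacityCheck {
  def main(args: Array[String]): Unit = {
    val executorMemMb      = 8570        // spark.executor.memory
    val executorOverheadMb = 646         // spark.yarn.executor.memoryOverhead
    val executorInstances  = 11          // spark.executor.instances
    val nodes              = 9
    val nodeMemMb          = 12 * 1024   // 12 GB per node

    val sparkDemandMb  = executorInstances * (executorMemMb + executorOverheadMb)
    val clusterTotalMb = nodes * nodeMemMb

    // ~99 GB of executor containers requested out of ~108 GB of raw cluster RAM,
    // leaving little headroom for a concurrent Sqoop MapReduce job.
    println("Spark executors: ~" + (sparkDemandMb / 1024) + " GB of ~" +
      (clusterTotalMb / 1024) + " GB cluster RAM")
  }
}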


Thanks for your time.


Re: Does Pyspark Support Graphx?

2018-02-19 Thread xiaobo
When using the --jars option, we have to include it every time we submit a job.
It seems that adding the jars to the classpath on every slave node is the only
way to "install" Spark packages.




-- Original --
From: Nicholas Hakobian 
Date: Tue,Feb 20,2018 3:37 AM
To: xiaobo 
Cc: Denny Lee , user@spark.apache.org 

Subject: Re: Does Pyspark Support Graphx?



If you copy the Jar file and all of the dependencies to the machines, you can 
manually add them to the classpath. If you are using Yarn and HDFS you can 
alternatively use --jars and point it to the hdfs locations of the jar files 
and it will (in most cases) distribute them to the worker nodes at job 
submission time.

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Sun, Feb 18, 2018 at 7:24 PM, xiaobo  wrote:
Another question is how to install graphframes permanently when the spark nodes 
can not connect to the internet.




-- Original --
From: Denny Lee 
Date: Mon,Feb 19,2018 10:23 AM
To: xiaobo 
Cc: user@spark.apache.org 
Subject: Re: Does Pyspark Support Graphx?



Note the --packages option works for both PySpark and Spark (Scala).  For the 
SparkLauncher class, you should be able to include packages ala:

spark.addSparkArg("--packages", "graphframes:0.5.0-spark2.0-s_2.11")
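
For reference, a minimal Scala sketch of doing that with SparkLauncher (the app
jar, main class and master are hypothetical placeholders; the --packages
coordinate is the one quoted above):

import org.apache.spark.launcher.SparkLauncher

object LaunchWithGraphFrames {
  def main(args: Array[String]): Unit = {
    // Build and launch a spark-submit invocation programmatically,
    // forwarding --packages exactly as you would on the command line.
    val process = new SparkLauncher()
      .setMaster("yarn")
      .setAppResource("/path/to/my-graph-job.jar")
      .setMainClass("com.example.MyGraphJob")
      .addSparkArg("--packages", "graphframes:0.5.0-spark2.0-s_2.11")
      .launch()
    process.waitFor()
  }
}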


On Sun, Feb 18, 2018 at 3:30 PM xiaobo  wrote:

Hi Denny,
The pyspark script uses the --packages option to load graphframe library, what 
about the SparkLauncher class? 




-- Original --
From: Denny Lee 
Date: Sun,Feb 18,2018 11:07 AM
To: 94035420 
Cc: user@spark.apache.org 



Subject: Re: Does Pyspark Support Graphx?



That's correct - you can use GraphFrames though as it does support PySpark.
On Sat, Feb 17, 2018 at 17:36 94035420  wrote:

I cannot find anything for the graphx module in the Python API documentation;
does that mean it is not supported yet?

Re: [graphframes]how Graphframes Deal With BidirectionalRelationships

2018-02-19 Thread xiaobo
So the question becomes: does GraphFrames natively support a bidirectional
relationship with only one edge?
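
For concreteness, a minimal sketch of the two-directed-edges approach under
discussion (it assumes the graphframes package is on the classpath; the vertex
and edge data are made up):

import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

object BidirectionalEdgesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("bidirectional-edges").getOrCreate()
    import spark.implicits._

    val vertices = Seq(("a", "Alice"), ("b", "Bob")).toDF("id", "name")

    // One logical "friend" relationship stored as a pair of directed edges;
    // this is what doubles the edge count mentioned below.
    val edges = Seq(
      ("a", "b", "friend"),
      ("b", "a", "friend")
    ).toDF("src", "dst", "relationship")

    val g = GraphFrame(vertices, edges)
    g.inDegrees.show()   // each vertex now has in-degree 1

    spark.stop()
  }
}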




-- Original --
From: Felix Cheung 
Date: Tue,Feb 20,2018 10:01 AM
To: xiaobo , user@spark.apache.org 
Subject: Re: [graphframes]how Graphframes Deal With BidirectionalRelationships



Generally that would be the approach.
But since you have effectively doubled the number of edges, this will likely
affect the scale at which your job runs.

From: xiaobo 
Sent: Monday, February 19, 2018 3:22:02 AM
To: user@spark.apache.org
Subject: [graphframes]how Graphframes Deal With Bidirectional Relationships 

Hi,
To represent a bidirectional relationship, one solution is to insert two edges
for each vertex pair; my question is whether the GraphFrames algorithms still
work when we do this.


Thanks

Errors when running unit tests

2018-02-19 Thread karuppayya
Hi ,
I get errors like below when trying to run the spark unit tests

zipPartitions(test.org.apache.spark.Java8RDDAPISuite)  Time elapsed: 2.212
> sec  <<< ERROR!
> java.lang.IllegalStateException: failed to create a child event loop
> at test.org.apache.spark.Java8RDDAPISuite.setUp(Java8RDDAPISuite.java:54)
> Caused by: io.netty.channel.ChannelException: failed to open a new
> selector
> at test.org.apache.spark.Java8RDDAPISuite.setUp(Java8RDDAPISuite.java:54)
> Caused by: java.io.IOException: Too many open files
> at test.org.apache.spark.Java8RDDAPISuite.setUp(Java8RDDAPISuite.java:54)


I am running unit tests from version 2.2.1 with following system config

> $ sysctl fs.file-max
> fs.file-max = 10
> $ ulimit -u
> 65536


Command line used:

>  build/mvn -Phadoop-2.6 -Pyarn -Phive -Phive-thriftserver -Pkinesis-asl
> -Pmesos --fail-at-end test


 Any pointers on this will be helpful

Thanks
Karuppayya


Re: [graphframes]how Graphframes Deal With Bidirectional Relationships

2018-02-19 Thread Felix Cheung
Generally that would be the approach.
But since you have effectively doubled the number of edges, this will likely
affect the scale at which your job runs.


From: xiaobo 
Sent: Monday, February 19, 2018 3:22:02 AM
To: user@spark.apache.org
Subject: [graphframes]how Graphframes Deal With Bidirectional Relationships

Hi,
To represent a bidirectional relationship, one solution is to insert two edges
for each vertex pair; my question is whether the GraphFrames algorithms still
work when we do this.

Thanks



Re: KafkaUtils.createStream(..) is removed for API

2018-02-19 Thread Cody Koeninger
I can't speak for committers, but my guess is it's more likely for
DStreams in general to stop being supported before that particular
integration is removed.
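
For anyone migrating, a minimal sketch of the kafka-0-10 direct stream that
replaces createStream() (broker address, group id and topic name are
placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-stream-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(15))

    // Standard consumer configuration for the kafka-0-10 integration.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // createDirectStream is the kafka-0-10 replacement for createStream.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("my-topic"), kafkaParams)
    )

    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}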

On Sun, Feb 18, 2018 at 9:34 PM, naresh Goud  wrote:
> Thanks Ted.
>
> I see createDirectStream is experimental, as it is annotated with
> "org.apache.spark.annotation.Experimental".
>
> Is it possible that this API will be removed in the future? We want to use
> this API in one of our production jobs and are afraid it will not be
> supported going forward.
>
> Thank you,
> Naresh
>
>
>
>
> On Sun, Feb 18, 2018 at 7:47 PM, Ted Yu  wrote:
>>
>> createStream() is still in
>> external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
>> But it is not in
>> external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>
>> FYI
>>
>> On Sun, Feb 18, 2018 at 5:17 PM, naresh Goud 
>> wrote:
>>>
>>> Hello Team,
>>>
>>> I see the "KafkaUtils.createStream()" method is not available in Spark 2.2.1.
>>>
>>> Can someone please confirm whether these methods have been removed?
>>>
>>> Below are my pom.xml entries:
>>>
>>> <properties>
>>>   <scala.version>2.11.8</scala.version>
>>>   <scala.tools.version>2.11</scala.tools.version>
>>> </properties>
>>>
>>> <dependencies>
>>>   <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-streaming_${scala.tools.version}</artifactId>
>>>     <version>2.2.1</version>
>>>     <scope>provided</scope>
>>>   </dependency>
>>>   <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
>>>     <version>2.2.1</version>
>>>     <scope>provided</scope>
>>>   </dependency>
>>>   <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-core_2.11</artifactId>
>>>     <version>2.2.1</version>
>>>     <scope>provided</scope>
>>>   </dependency>
>>> </dependencies>
>>>
>>>
>>>
>>>
>>>
>>> Thank you,
>>> Naresh
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Does Pyspark Support Graphx?

2018-02-19 Thread Nicholas Hakobian
If you copy the Jar file and all of the dependencies to the machines, you
can manually add them to the classpath. If you are using Yarn and HDFS you
can alternatively use --jars and point it to the hdfs locations of the jar
files and it will (in most cases) distribute them to the worker nodes at
job submission time.


Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Sun, Feb 18, 2018 at 7:24 PM, xiaobo  wrote:

> Another question is how to install graphframes permanently when the spark
> nodes can not connect to the internet.
>
>
>
> -- Original --
> *From:* Denny Lee 
> *Date:* Mon,Feb 19,2018 10:23 AM
> *To:* xiaobo 
> *Cc:* user@spark.apache.org 
> *Subject:* Re: Does Pyspark Support Graphx?
>
> Note the --packages option works for both PySpark and Spark (Scala).  For
> the SparkLauncher class, you should be able to include packages ala:
>
> spark.addSparkArg("--packages", "graphframes:0.5.0-spark2.0-s_2.11")
>
>
> On Sun, Feb 18, 2018 at 3:30 PM xiaobo  wrote:
>
>> Hi Denny,
>> The pyspark script uses the --packages option to load graphframe library,
>> what about the SparkLauncher class?
>>
>>
>>
>> -- Original --
>> *From:* Denny Lee 
>> *Date:* Sun,Feb 18,2018 11:07 AM
>> *To:* 94035420 
>> *Cc:* user@spark.apache.org 
>> *Subject:* Re: Does Pyspark Support Graphx?
>> That’s correct - you can use GraphFrames though as it does support
>> PySpark.
>> On Sat, Feb 17, 2018 at 17:36 94035420  wrote:
>>
>>> I can not find anything for graphx module in the python API document,
>>> does it mean it is not supported yet?
>>>
>>


Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-19 Thread Aleksandar Vitorovic
 Hi Vijay,

Thank you very much for your reply. Setting the number of partitions
explicitly in the join, and the influence of memory pressure on partitioning,
were definitely very good insights.

In the end, we avoided the uneven load balancing issue completely by doing the
following two things:
a) reducing the number of executors while increasing the number of cores and
the executor memory, and
b) increasing the batch interval from 15s to 30s.

Here is a nice blog post that explains how to improve performance for Spark
jobs in general:
https://mapr.com/blog/performance-tuning-apache-kafkaspark-streaming-system/
.

@Vijay: And here are the responses to your questions:
1) Correct.

2) This is exactly what confuses us: there is nothing between the following
lines:
val df = sqlContext.read.parquet(...)
and
val rdd = df.as[T].rdd

We saw that a separate query plan is executed when converting the DataFrame to
an RDD (the .rdd method). Is it equivalent to repartition, coalesce or
something else?

3) Exactly.

4) We are caching the static RDD for 30 minutes. That is, we have a trait with
a readLast method that returns the last read RDD, and once the RDD is more
than 30 minutes old, we reload its content from disk using
df = sqlContext.read.parquet(...).
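
For concreteness, a minimal sketch of that refresh pattern (the trait and
readLast come from the description above; the SparkSession, path handling and
Row element type are illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

trait RefreshableRdd[T] {
  def readLast(): RDD[T]
}

class ParquetBackedRdd(spark: SparkSession, path: String, ttlMs: Long = 30L * 60 * 1000)
    extends RefreshableRdd[Row] {

  @volatile private var cached: RDD[Row] = _
  @volatile private var loadedAt: Long = 0L

  override def readLast(): RDD[Row] = synchronized {
    val now = System.currentTimeMillis()
    if (cached == null || now - loadedAt > ttlMs) {
      if (cached != null) cached.unpersist()
      // Reload from disk and cache; the DataFrame-to-RDD conversion (.rdd) is
      // where the separate query plan mentioned in point 2 shows up.
      cached = spark.read.parquet(path).rdd.cache()
      loadedAt = now
    }
    cached
  }
}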

---

My final question is the following: what would be the most efficient way
(possibly including an external key-value store) to store, update and retrieve
final_rdd? The state may grow beyond 3 GB, and we want to maintain our
scalability and latency. In fact, we have many Spark jobs that join the same
RDD with different Kafka streams.

Thank you very much!

On Wed, Jan 31, 2018 at 11:24 AM, vijay.bvp  wrote:

> Summarizing
>
> 1) The static data set, read from Parquet files in HDFS as a DataFrame, has
> an initial parallelism of 90 (based on the number of input files).
>
> 2) The static data set DataFrame is converted to an RDD, and the RDD has a
> parallelism of 18, which was not expected.
> dataframe.rdd is lazily evaluated; there must be some operation you were doing
> that triggered the conversion from 90 to 18. This would be some operation that
> breaks the stage/requires shuffling, such as groupBy, reduceBy, repartition or
> coalesce. If you are using coalesce, its second parameter, shuffle, is false
> by default, which means upstream parallelism is not preserved.
>
> 3) You have a DStream from a Kafka source with 9 partitions, and this is
> joined with the above static data set? When joining, have you tried setting
> numPartitions, an optional parameter to specify the number of partitions
> required?
>
> 4) Your batch interval is 15 seconds but you are caching the static data set
> for 30 minutes - what exactly do you mean by caching for 30 minutes?
>
> Note that when you cache data, depending on memory pressure, there is a
> chance that partitioning is not preserved.
>
> It would be useful to provide Spark UI screenshots for one complete batch,
> the DAG and other details.
>
> thanks
> Vijay
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Understand task timing

2018-02-19 Thread Thomas Decaux
Using Spark 1.6.2, I want to understand what "Duration" really means (and why
it is slow).

Running a simple SELECT COUNT against a Parquet file stored in HDFS:

NODE_LOCAL 1 / DATA02 2018/02/19 09:54:27 5 s 30 ms 8.8 MB (hadoop) / 3010830 8
ms 77.2 KB / 1666

Does this mean it took 5 seconds to read 8.8 MB from HDFS?

Thomas Decaux

Unsubscribe

2018-02-19 Thread Ryan Myer

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org