spark core api vs. google cloud dataflow

2016-02-23 Thread lonely Feb
Google Cloud Dataflow provides a distributed dataset called PCollection,
and syntactic sugar based on PCollection is provided in the form of
"apply". Note that "apply" is different from the Spark API "map", which
passes each element of the source through a function func. I wonder whether
Spark can support this kind of syntactic sugar, and if not, why?


Re: spark core api vs. google cloud dataflow

2016-02-23 Thread Reynold Xin
That's just the transform function in DataFrame:

  /**
   * Concise syntax for chaining custom transformations.
   * {{{
   *   def featurize(ds: DataFrame) = ...
   *
   *   df
   *     .transform(featurize)
   *     .transform(...)
   * }}}
   * @since 1.6.0
   */
  def transform[U](t: DataFrame => DataFrame): DataFrame = t(this)


Note that while this is great for chaining, having *only* this leads to
pretty bad user experience, especially in interactive analysis when it is
not obvious what operations are available.
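
For what it's worth, here is a minimal usage sketch (the helper functions and
the sample DataFrame are hypothetical, and it assumes a 1.6 spark-shell where
sqlContext is predefined) of how transform chains custom transformations, much
like PCollection's "apply":

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions._

  // Hypothetical input: a small DataFrame with "name" and "age" columns.
  val people = sqlContext
    .createDataFrame(Seq(("alice", 22), ("bob", 15)))
    .toDF("name", "age")

  // User-defined transformations, each a DataFrame => DataFrame function.
  def upperCaseNames(df: DataFrame): DataFrame =
    df.withColumn("name", upper(col("name")))
  def onlyAdults(df: DataFrame): DataFrame =
    df.filter(col("age") >= 18)

  // transform chains them without breaking the fluent style.
  val result = people
    .transform(upperCaseNames)
    .transform(onlyAdults)

  result.show()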



On Tue, Feb 23, 2016 at 12:16 AM, lonely Feb  wrote:

> Google Cloud Dataflow provides a distributed dataset called PCollection,
> and syntactic sugar based on PCollection is provided in the form of "apply".
> Note that "apply" is different from the Spark API "map", which passes each
> element of the source through a function func. I wonder whether Spark can
> support this kind of syntactic sugar, and if not, why?
>


Re: Accessing Web UI

2016-02-23 Thread Vasanth Bhat
Hi,
>
>    Is there a way to provide minThreads and maxThreads for the
> thread pool through jetty.xml for the Jetty server that is used by the Spark
> Web UI?
>
>  I am hitting an issue very similar to the one described in
> http://lifelongprogrammer.blogspot.com/2014/10/jetty-insufficient-threads-configured.html
>
>

I am unable to connect to the web UI after I bring up the master daemon
using sbin/start-master.sh.  It looks like this is an issue due to the
thread pool settings used by the Jetty server embedded in the master daemon.

>
> I am unable to figure out the location where I can provide jetty.xml, so
> that it is picked up by the Spark master daemon.
>
> Thanks
> Vasanth
>
>
>
> On Mon, Feb 22, 2016 at 7:09 PM, Vasanth Bhat  wrote:
>
>> Thanks a lot Robin.
>>
>> A search on Google seems to indicate that I need to control the
>> minThreads and maxThreads of the thread pool through jetty.xml.
>>
>> But I am not able to find jetty.xml in the Spark installation.
>>
>> Thanks
>> Vasanth
>>
>>
>> On Mon, Feb 22, 2016 at 5:43 PM, Robin East 
>> wrote:
>>
>>> I suspect that it is related to the warning in your master startup log:
>>>
>>> WARN AbstractConnector: insufficient threads configured
>>>
>>> I don’t know much about that area of Spark - maybe someone else on the
>>> mailing list can comment - but it looks like the embedded Jetty web server
>>> doesn’t have enough resources to do its job
>>>
>>>
>>>
>>>
>>>
>>> On 22 Feb 2016, at 12:04, Vasanth Bhat  wrote:
>>>
>>> The port 4040 is *not* used.  No process is listening on 4040.
>>>
>>> As per the  logs,  8080 is used for WebUI.  The log mentions the below
>>>
>>> 16/02/19 03:07:32 INFO Utils: Successfully started service 'MasterUI' on
>>> port 8080.
>>> 16/02/19 03:07:32 INFO MasterWebUI: Started MasterWebUI at
>>> http://127.0.0.1:8080
>>>
>>> However, I am not able to see any UI when I point the browser to
>>> http://127.0.0.1:8080
>>>
>>>
>>> The browser just  hangs.
>>>
>>> Thanks
>>> Vasanth
>>>
>>> On Mon, Feb 22, 2016 at 4:53 PM, Robin East 
>>> wrote:
>>>
 port 4040 is the application UI, but I think the OP is looking for the
 UI presented by the Spark master; usually this would be 8080





 On 22 Feb 2016, at 11:00, Kayode Odeyemi  wrote:

 Try http://localhost:4040

 On Mon, Feb 22, 2016 at 8:23 AM, Vasanth Bhat 
 wrote:

> Thanks Gourav, Eduardo
>
> I tried  http://localhost:8080  and   http://OAhtvJ5MCA:8080/
>   .  In both cases Firefox just hangs.
>
> I also tried with the lynx text-based browser.  I get the message  "HTTP
> request sent; waiting for response."  and it hangs as well.
>
> Is there a way to enable debug logs in the Spark master service, to
> understand what's going wrong?
>
>
> Thanks
> Vasanth
>
>
>
> On Fri, Feb 19, 2016 at 5:46 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> can you please try localhost:8080?
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Feb 19, 2016 at 11:18 AM, vasbhat  wrote:
>>
>>> Hi,
>>>
>>>I have installed Spark 1.6 and am trying to start the master
>>> (start-master.sh) and access the web UI.
>>>
>>> I get the following logs on running the start-master.sh
>>>
>>> Spark Command: /usr/jdk/instances/jdk1.8.0/jre/bin/java -cp
>>>
>>> /usr/local/spark-1.6.0-bin-hadoop2.6/conf/:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
>>> -Xms4g -Xmx4g org.apache.spark.deploy.master.Master --ip OAhtvJ5MCA
>>> --port
>>> 7077 --webui-port 8080
>>> 
>>> Using Spark's default log4j profile:
>>> org/apache/spark/log4j-defaults.properties
>>> 16/02/19 03:07:30 INFO Master: Registered signal handlers for [TERM,
>>> HUP,
>>> INT]
>>> 16/02/19 03:07:30 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where
>>> applicable
>>> 16/02/19 03:07:31 INFO SecurityManager: Changing view acls to: sluser
>>> 16/02/19 03:07:31 INFO SecurityManager: Changing modify acls to:
>>> sluser
>>> 16/02/19 03:07:31 INFO SecurityManager: SecurityManager:
>>> authentication
>>> disabled; ui acls disabled; users with view permissions:
>>> Set(sluser); users
>>> with modify permissions: Set(sluser)
>>> 16/02/19 03:07:32 INFO Utils: Successfully started service
>>> 'sparkMaster' on
>>> port 7077.
>>> 16/02/19 03:07:32 INFO Master: Starting Spark master at
>>> spark://OAhtvJ5MCA:7077
>>> 16/02/19 03:07:32 INFO Master: Running Spark version 1.6.0
>>> 16/02/19 03:0

Re: Opening a JIRA for QuantileDiscretizer bug

2016-02-23 Thread Sean Owen
Good catch, though probably very slightly simpler to write

math.min(requiredSamples.toDouble ...

Make sure you're logged in to JIRA maybe. If you have any trouble I'll
open it for you. You can file it as a minor bug against ML.

This is how you open a PR and everything else
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
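
For reference, a minimal standalone Scala sketch (hypothetical values: 100
bins and a one-million-row count; not the actual QuantileDiscretizer code)
of why the integer division described below truncates the fraction to 0.0,
and how promoting it to Double fixes it:

  object QuantileFractionDemo {
    def main(args: Array[String]): Unit = {
      val numBins = 100
      val datasetCount = 1000000L  // stand-in for dataset.count()

      // Current behavior: numBins * numBins is an Int, so dividing by the
      // Long count truncates to 0 whenever requiredSamples < datasetCount.
      val requiredSamples = math.max(numBins * numBins, 1)
      val fractionBuggy = math.min(requiredSamples / datasetCount, 1.0)

      // Proposed fix: force floating-point arithmetic before the division.
      val fractionFixed = math.min(requiredSamples.toDouble / datasetCount, 1.0)

      println(s"buggy fraction = $fractionBuggy")  // prints 0.0
      println(s"fixed fraction = $fractionFixed")  // prints 0.01
    }
  }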

On Tue, Feb 23, 2016 at 2:45 AM, Pierson, Oliver C  wrote:
> Hello,
>
>   I've discovered a bug in the QuantileDiscretizer estimator.  Specifically,
> for large DataFrames QuantileDiscretizer will only create one split (i.e.
> two bins).
>
>
> The error happens in lines 113 and 114 of QuantileDiscretizer.scala:
>
>
> val requiredSamples = math.max(numBins * numBins, 1)
>
> val fraction = math.min(requiredSamples / dataset.count(), 1.0)
>
>
> After the first line, requiredSamples is an Int.  Therefore, if
> requiredSamples < dataset.count(), the integer division makes fraction 0.0.
>
>
> The problem can be simply fixed by replacing the first line with:
>
>
>   val requiredSamples = math.max(numBins * numBins, 1.0)
>
>
> I've implemented this change in my fork and all tests passed (except for
> docker integration, but I think that's another issue).  I'm happy to submit
> a PR if it will ease someone else's workload.  However, I'm unsure of how to
> create a JIRA.  I've created an account on the issue tracker
> (issues.apache.org) but when I try to create an issue it asks me to choose a
> "Service Desk".  Which one should I be choosing?
>
>
> Thanks much,
>
> Oliver Pierson
>
>
>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Accessing Web UI

2016-02-23 Thread Vasanth Bhat
Hi Gourav,

     The Spark version is spark-1.6.0-bin-hadoop2.6. The Java
version is JDK 8.  I have also tried with JDK 7, but the results are the
same.

Thanks
Vasanth



On Tue, Feb 23, 2016 at 2:57 PM, Gourav Sengupta 
wrote:

> Hi,
>
> This should really work out of the box. I have tried SPARK installations
> in cluster and standalone mode on MAC, Debian, and Ubuntu boxes without any
> issues.
>
> Can you please let me know which version of SPARK you are using?
>
>
> Regards,
> Gourav
>
> On Tue, Feb 23, 2016 at 9:02 AM, Vasanth Bhat  wrote:
>
>> Hi,
>>
>>    Is there a way to provide minThreads and maxThreads for the
>> thread pool through jetty.xml for the Jetty server that is used by the Spark
>> Web UI?
>>
>>  I am hitting an issue very similar to the one described in
>> http://lifelongprogrammer.blogspot.com/2014/10/jetty-insufficient-threads-configured.html
>>
>>
>> I am unable to figure out the location where I can provide jetty.xml, so
>> that it is picked up by the Spark master daemon.
>>
>> Thanks
>> Vasanth
>>
>>
>>
>> On Mon, Feb 22, 2016 at 7:09 PM, Vasanth Bhat  wrote:
>>
>>> Thanks a lot Robin.
>>>
>>> A search on Google seems to indicate that I need to control the
>>> minThreads and maxThreads of the thread pool through jetty.xml.
>>>
>>> But I am not able to find jetty.xml in the Spark installation.
>>>
>>> Thanks
>>> Vasanth
>>>
>>>
>>> On Mon, Feb 22, 2016 at 5:43 PM, Robin East 
>>> wrote:
>>>
 I suspect that it is related to the warning in your master startup log:

 WARN AbstractConnector: insufficient threads configured

 I don’t know much about that area of Spark - maybe someone else on the
 mailing list can comment - but it looks like the embedded Jetty web server
 doesn’t have enough resources to do its job





 On 22 Feb 2016, at 12:04, Vasanth Bhat  wrote:

 The port 4040 is *not* used.  No process is listening on 4040.

 As per the  logs,  8080 is used for WebUI.  The log mentions the below

 16/02/19 03:07:32 INFO Utils: Successfully started service 'MasterUI'
 on port 8080.
 16/02/19 03:07:32 INFO MasterWebUI: Started MasterWebUI at
 http://127.0.0.1:8080

 However, I am not able to see any UI when I point the browser to
 http://127.0.0.1:8080


 The browser just  hangs.

 Thanks
 Vasanth

 On Mon, Feb 22, 2016 at 4:53 PM, Robin East 
 wrote:

> port 4040 is the application UI, but I think the OP is looking for the
> UI presented by the Spark master; usually this would be 8080
>
>
>
>
>
> On 22 Feb 2016, at 11:00, Kayode Odeyemi  wrote:
>
> Try http://localhost:4040
>
> On Mon, Feb 22, 2016 at 8:23 AM, Vasanth Bhat 
> wrote:
>
>> Thanks Gourav, Eduardo
>>
>> I tried  http://localhost:8080  and   http://OAhtvJ5MCA:8080/
>>   .  In both cases Firefox just hangs.
>>
>> I also tried with the lynx text-based browser.  I get the message  "HTTP
>> request sent; waiting for response."  and it hangs as well.
>>
>> Is there a way to enable debug logs in the Spark master service, to
>> understand what's going wrong?
>>
>>
>> Thanks
>> Vasanth
>>
>>
>>
>> On Fri, Feb 19, 2016 at 5:46 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> can you please try localhost:8080?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Fri, Feb 19, 2016 at 11:18 AM, vasbhat  wrote:
>>>
 Hi,

I have installed Spark 1.6 and am trying to start the master
 (start-master.sh) and access the web UI.

 I get the following logs on running the start-master.sh

 Spark Command: /usr/jdk/instances/jdk1.8.0/jre/bin/java -cp

 /usr/local/spark-1.6.0-bin-hadoop2.6/conf/:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
 -Xms4g -Xmx4g org.apache.spark.deploy.master.Master --ip OAhtvJ5MCA
 --port
 7077 --webui-port 8080
 
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 16/02/19 03:07:30 INFO Master: Registered signal handlers for
 [TERM, HUP,
 INT]
 16/02/19 03:07:30 WARN NativeCodeLoader: Unable to load
 native-hadoop
 library for your platform... using builtin-java classes where
 applicable
 16/02/19 03:07:31 INFO SecurityManager: Changing view acls to:
 sluser
 16/02/19 03:07:31 INFO SecurityManager: Changing modify acls to:
 sluser
 16/02/19 0

ORC file writing hangs in pyspark

2016-02-23 Thread James Barney
I'm trying to write an ORC file after running the FPGrowth algorithm on a
dataset of around just 2GB in size. The algorithm performs well and can
display results if I take(n) the freqItemSets() of the result after
converting that to a DF.

I'm using Spark 1.5.2 on HDP 2.3.4 and Python 3.4.2 on Yarn.

I get the results from querying a Hive table, also ORC format, running a
number of maps, joins, and filters on the data.

When the program attempts to write the files:
result.write.orc('/data/staged/raw_result')
  size_1_buckets.write.orc('/data/staged/size_1_results')
  filter_size_2_buckets.write.orc('/data/staged/size_2_results')

The first path, /data/staged/raw_result, is created with a _temporary
folder, but the data is never written. The job hangs at this point,
apparently indefinitely.

Additionally, no logs are recorded or available for the jobs on the history
server.

What could be the problem?


Modify text in spark-packages

2016-02-23 Thread Sergio Ramírez

Hello,

I am having some problems modifying the description of some of my packages 
on spark-packages.com, and I haven't been able to change anything. 
I've written to the e-mail address in charge of managing this page, 
but I got no answer.


Any clue?

Thanks

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Using Encoding to reduce GraphX's static graph memory consumption

2016-02-23 Thread Adnan Haider
Hi
I have created a jira for this issue here.

As
for the pull request, my implementation is based on removing localSrcIds
and storing an array of offsets into localDstIds. I am running into issues
with this method when testing operations which create partitions from
existing edge partitions. The other method for implementing this would be
to use a hashmap from local id to offset/length pairs. This seems to work
fine but there is more storage overhead associated with this since each
source vertex requires space for three integers.

Thanks, Adnan Haider
B.S Candidate, Computer Science
Illinois Institute of Technology
www.adnanhaider.com

On Mon, Feb 22, 2016 at 8:16 AM, Adnan Haider  wrote:

> Yes, sounds good. I can submit the pull request.
> On 22 Feb 2016 00:35, "Reynold Xin"  wrote:
>
>> + Joey
>>
>> We think this is worth doing. Are you interested in submitting a pull
>> request?
>>
>>
>> On Sat, Feb 20, 2016 at 8:05 PM ahaider3  wrote:
>>
>>> Hi,
>>> I have been looking through the GraphX source code, dissecting the reason
>>> for its high memory consumption compared to the on-disk size of the
>>> graph. I
>>> have found that there may be room to reduce the memory footprint of the
>>> graph structures. I think the biggest savings can come from the
>>> localSrcIds
>>> and localDstIds in EdgePartitions.
>>>
>>> In particular, instead of storing both a source and destination local ID
>>> for
>>> each edge, we could store only the destination id. For example after
>>> sorting
>>> edges by global source id, we can map each of the source vertices first
>>> to
>>> local values followed by unmapped global destination ids. This would make
>>> localSrcIds sorted starting from 0 to n, where n is the number of
>>> distinct
>>> global source ids. Then instead of actually storing the local source id
>>> for
>>> each edge, we can store an array of size n, with each element storing an
>>> index into localDstIds.  From my understanding, this would also eliminate
>>> the need for storing an index for indexed scanning, since each element in
>>> localSrcIds would be the start of a cluster. From some extensive testing,
>>> this along with some delta encoding strategies on localDstIds and the
>>> mapping structures can reduce memory consumption of the graph by nearly
>>> half.
>>>
>>> However, I am not entirely sure if there is any reason for storing both
>>> localSrcIds and localDstIds for each edge in terms of integration of
>>> future
>>> functionalities, such as graph mutations. I noticed there was another
>>> post
>>> similar to this one as well, but it had no replies.
>>>
>>> The idea is quite similar to the Netflix graph library, and I would be
>>> happy to open a JIRA on this issue with partial improvements. But I may
>>> not be completely correct in my thinking!
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Using-Encoding-to-reduce-GraphX-s-static-graph-memory-consumption-tp16373.html
>>> Sent from the Apache Spark Developers List mailing list archive at
>>> Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>
>>>


Fwd: HANA data access from SPARK

2016-02-23 Thread Dushyant Rajput
Hi,

I am writting a python app to load data from SAP HANA.

dfr = DataFrameReader(sqlContext)
df = dfr.jdbc(url='jdbc:sap://ip_hana:30015/?user=&password=', table=table)
df.show()

It throws a serialization error:

py4j.protocol.Py4JJavaError: An error occurred while calling o59.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
not serializable: java.io.NotSerializableException:
com.sap.db.jdbc.topology.Host
Serialization stack:
- object not serializable (class: com.sap.db.jdbc.topology.Host, value:
:30015)
- writeObject data (class: java.util.ArrayList)
- object (class java.util.ArrayList, [])
- writeObject data (class: java.util.Hashtable)
- field (class:
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1,
name: properties$1, type: class java.util.Properties)
- object (class
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1,
)
- field (class: org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD,
name:
org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$getConnection,
type: interface scala.Function0)
- object (class org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD,
JDBCRDD[5] at showString at NativeMethodAccessorImpl.java:-2)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class
org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency,
org.apache.spark.OneToOneDependency@57931c92)
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon,
List(org.apache.spark.OneToOneDependency@57931c92))
- field (class: org.apache.spark.rdd.RDD, name:
org$apache$spark$rdd$RDD$$dependencies_, type: interface
scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[6]
at showString at NativeMethodAccessorImpl.java:-2)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (MapPartitionsRDD[6] at showString at
NativeMethodAccessorImpl.java:-2,))
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:865)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1466)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
at
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
at
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)

Rgds,
Dushyant.


Re: Using Encoding to reduce GraphX's static graph memory consumption

2016-02-23 Thread Joseph E. Gonzalez
Actually another improvement would be to use something like compressed sparse 
row encoding which can be used to store A and A^T relatively efficiently (I 
think using 5 arrays instead of 6).  There is an option to also be more cache 
aware using something like a block compressed sparse row encoding.
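
For illustration only, here is a toy CSR-style sketch in Scala (not GraphX's
actual EdgePartition layout): a per-source offsets array replaces the
per-edge local source ids, and the neighbors of local source i live in
dstIds between offsets(i) and offsets(i + 1).

  // Toy CSR sketch; assumes edges are pre-sorted by local source id and that
  // local source ids are dense in [0, numSrcVertices).
  class CsrEdgePartition(numSrcVertices: Int, edges: Array[(Int, Int)]) {

    // Destination ids laid out contiguously, grouped by local source id.
    val dstIds: Array[Int] = edges.map(_._2)

    // offsets(i) is where source i's neighbors begin in dstIds and
    // offsets(i + 1) is where they end: count per source, then prefix-sum.
    val offsets: Array[Int] = {
      val off = new Array[Int](numSrcVertices + 1)
      edges.foreach { case (src, _) => off(src + 1) += 1 }
      for (i <- 1 to numSrcVertices) off(i) += off(i - 1)
      off
    }

    // Neighbors of a local source vertex, with no per-edge source id stored.
    def neighbors(src: Int): Array[Int] =
      dstIds.slice(offsets(src), offsets(src + 1))
  }

For example, new CsrEdgePartition(3, Array((0, 5), (0, 7), (2, 1))).neighbors(0)
returns Array(5, 7); storing the transpose the same way gives the A / A^T pair
mentioned above.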

Ankur also at some point looked at renumbering the vertex ids on each edge to 
be consecutive with respect to the edge partition.  This is a pretty common 
technique in most graph processing systems but in our early experiments we 
didn’t see much of a gain.   In theory, however this should allow for lower bit 
precision encoding and a more dense representation and eliminate the need to 
use a hash lookup when joining vertices with edges. 

Joey





> On Feb 23, 2016, at 1:13 PM, Adnan Haider  wrote:
> 
> Hi
> I have created a jira for this issue here. 
> 
>  As for the pull request, my implementation is based on removing localSrcIds 
> and storing an array of offsets into localDstIds. I am running into issues 
> with this method when testing operations which create partitions from 
> existing edge partitions. The other method for implementing this would be to 
> use a hashmap from local id to offset/length pairs. This seems to work fine 
> but there is more storage overhead associated with this since each source 
> vertex requires space for three integers. 
> 
> Thanks, Adnan Haider
> B.S Candidate, Computer Science
> Illinois Institute of Technology 
> www.adnanhaider.com 
> 
> On Mon, Feb 22, 2016 at 8:16 AM, Adnan Haider  wrote:
> Yes, sounds good. I can submit the pull request.
> 
> On 22 Feb 2016 00:35, "Reynold Xin"  wrote:
> + Joey
> 
> We think this is worth doing. Are you interested in submitting a pull request?
> 
> 
> On Sat, Feb 20, 2016 at 8:05 PM ahaider3  wrote:
> Hi,
> I have been looking through the GraphX source code, dissecting the reason
> for its high memory consumption compared to the on-disk size of the graph. I
> have found that there may be room to reduce the memory footprint of the
> graph structures. I think the biggest savings can come from the localSrcIds
> and localDstIds in EdgePartitions.
> 
> In particular, instead of storing both a source and destination local ID for
> each edge, we could store only the destination id. For example after sorting
> edges by global source id, we can map each of the source vertices first to
> local values followed by unmapped global destination ids. This would make
> localSrcIds sorted starting from 0 to n, where n is the number of distinct
> global source ids. Then instead of actually storing the local source id for
> each edge, we can store an array of size n, with each element storing an
> index into localDstIds.  From my understanding, this would also eliminate
> the need for storing an index for indexed scanning, since each element in
> localSrcIds would be the start of a cluster. From some extensive testing,
> this along with some delta encoding strategies on localDstIds and the
> mapping structures can reduce memory consumption of the graph by nearly
> half.
> 
> However, I am not entirely sure if there is any reason for storing both
> localSrcIds and localDstIds for each edge in terms of integration of future
> functionalities, such as graph mutations. I noticed there was another post
> similar to this one as well, but it had no replies.
> 
> The idea is quite similar to the Netflix graph library, and I would be happy
> to open a JIRA on this issue with partial improvements. But I may not be
> completely correct in my thinking!
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Using-Encoding-to-reduce-GraphX-s-static-graph-memory-consumption-tp16373.html
>  
> 
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: dev-h...@spark.apache.org 
> 
> 
> 



Spark Job on YARN Hogging the entire Cluster resource

2016-02-23 Thread Prabhu Joseph
Hi All,

     We have a YARN cluster with 352 nodes (10 TB, 3000 cores) running the
Fair Scheduler, with the root queue having 230 queues.

Each queue is configured with maxResources equal to the total cluster
resource. When a Spark job is submitted into a queue A, it is given
10 TB and 3000 cores according to its instantaneous fair share, and it holds
the entire resource without releasing it. After some time, when another job is
submitted into another queue B, it will get the fair share of 45 GB and 13 cores,
i.e. (10 TB, 3000 cores)/230, using preemption. Now if some more jobs are
submitted into queue B, all the jobs in B have to share the 45 GB and 13
cores, whereas the job in queue A holds the entire cluster
resource, affecting the other jobs.
     This kind of issue often happens when a Spark job is submitted first
and holds the entire cluster resource. What is the best way to fix this
issue? Can we make preemption happen based on the instantaneous fair share
instead of the fair share, and will that help?

Note:

1. We do not want to give a weight to any particular queue, because all 230
queues are critical.
2. Changing the queues into nested queues does not solve the issue.
3. Adding maxResources to each queue won't allow the first job to grab the
entire cluster resource, but configuring an optimal maxResources for 230
queues is difficult, and then the first job can't use the entire cluster
resource when the cluster is idle.
4. We do not want to handle it in the Spark ApplicationMaster, since then we
would need to do the same for every other YARN application type with similar
behavior. We want YARN to control this behavior by preempting resources that
the first job has held for a long period.


Thanks,
Prabhu Joseph


Re: ORC file writing hangs in pyspark

2016-02-23 Thread Jeff Zhang
Have you checked the live Spark UI and the YARN app logs?

On Tue, Feb 23, 2016 at 10:05 PM, James Barney 
wrote:

> I'm trying to write an ORC file after running the FPGrowth algorithm on a
> dataset of around just 2GB in size. The algorithm performs well and can
> display results if I take(n) the freqItemSets() of the result after
> converting that to a DF.
>
> I'm using Spark 1.5.2 on HDP 2.3.4 and Python 3.4.2 on Yarn.
>
> I get the results from querying a Hive table, also ORC format, running a
> number of maps, joins, and filters on the data.
>
> When the program attempts to write the files:
> result.write.orc('/data/staged/raw_result')
>   size_1_buckets.write.orc('/data/staged/size_1_results')
>   filter_size_2_buckets.write.orc('/data/staged/size_2_results')
>
> The first path, /data/staged/raw_result, is created with a _temporary
> folder, but the data is never written. The job hangs at this point,
> apparently indefinitely.
>
> Additionally, no logs are recorded or available for the jobs on the
> history server.
>
> What could be the problem?
>



-- 
Best Regards

Jeff Zhang


Re: ORC file writing hangs in pyspark

2016-02-23 Thread Zhan Zhang
Hi James,

You can try to write with another format, e.g., Parquet, to see whether it is an
ORC-specific issue or a more generic issue.

Thanks.

Zhan Zhang

On Feb 23, 2016, at 6:05 AM, James Barney  wrote:

I'm trying to write an ORC file after running the FPGrowth algorithm on a 
dataset of around just 2GB in size. The algorithm performs well and can display 
results if I take(n) the freqItemSets() of the result after converting that to 
a DF.

I'm using Spark 1.5.2 on HDP 2.3.4 and Python 3.4.2 on Yarn.

I get the results from querying a Hive table, also ORC format, running a number 
of maps, joins, and filters on the data.

When the program attempts to write the files:
result.write.orc('/data/staged/raw_result')
  size_1_buckets.write.orc('/data/staged/size_1_results')
  filter_size_2_buckets.write.orc('/data/staged/size_2_results')

The first path, /data/staged/raw_result, is created with a _temporary folder, 
but the data is never written. The job hangs at this point, apparently 
indefinitely.

Additionally, no logs are recorded or available for the jobs on the history 
server.

What could be the problem?