Yarn resource utilization with Spark pipe()

2016-11-23 Thread Sameer Choudhary
Hi,

I am working on a Spark 1.6.2 application on a YARN-managed EMR cluster
that uses the RDD pipe method to process my data. I start a lightweight
daemon process that starts processes for each task via pipes. This is
to ensure that I don't run into
https://issues.apache.org/jira/browse/SPARK-671.
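For context, the pattern in question is roughly the following (a minimal
sketch, not the actual application; it assumes a SparkContext `sc`, and the
paths and the external command "my_processor" are placeholders):

    val lines = sc.textFile("hdfs:///input/data.txt")          // placeholder path
    // Each element of a partition is written to the child process's stdin;
    // every line the process writes to stdout becomes an element of the result.
    val processed = lines.pipe("my_processor --mode stream")    // placeholder command
    processed.saveAsTextFile("hdfs:///output/processed")        // placeholder path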

I'm running into Spark job failure due to task failures across the
cluster. Following are the questions that I think would help in
understanding the issue:

- How does resource allocation in PySpark work? How do YARN and
Spark track the memory consumed by the Python processes launched on the
worker nodes?

- As an example, let's say Spark started n tasks on a worker node.
These n tasks start n processes via pipe. Memory for the executors is
already reserved during application launch. As the processes run, their
memory footprint grows and eventually there is not enough memory on
the box. In this case, how will YARN and Spark behave? Will the
executors be killed, or will my processes be killed, eventually killing the
task? I think this could lead to cascading task failures across the
cluster as retry attempts also fail, eventually leading to termination
of the Spark job. Is there a way to avoid this?

- When we define the number of executors in SparkConf, are they
distributed evenly across the nodes? One approach to get around this
problem would be to limit the number of executors that YARN can launch
on each host, so that we manage the memory for the piped processes
outside of YARN. Is there a way to do this?

Thanks,
Sameer

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Invalid log directory running pyspark job

2016-11-23 Thread Stephen Boesch
This problem appears to be a regression on HEAD/master: when running
against 2.0.2 the pyspark job completes successfully, including running
predictions.

2016-11-23 19:36 GMT-08:00 Stephen Boesch :

>
> For a pyspark job with 54 executors all of the task outputs have a single
> line in both the stderr and stdout similar to:
>
> Error: invalid log directory 
> /shared/sparkmaven/work/app-20161119222540-/0/
>
>
> Note: the directory /shared/sparkmaven/work exists and is owned by the
> same user running the job. There are plenty of other app-*** subdirectories
> that do have contents in the stdout/stderr files.
>
>
> $ls -lrta  /shared/sparkmaven/work
> total 0
> drwxr-xr-x  59 steve  staff  2006 Nov 23 05:01 ..
> drwxr-xr-x  41 steve  staff  1394 Nov 23 18:22 app-20161123050122-0002
> drwxr-xr-x   6 steve  staff   204 Nov 23 18:22 app-20161123182031-0005
> drwxr-xr-x   6 steve  staff   204 Nov 23 18:44 app-20161123184349-0006
> drwxr-xr-x   6 steve  staff   204 Nov 23 18:46 app-20161123184613-0007
> drwxr-xr-x   3 steve  staff   102 Nov 23 19:20 app-20161123192048-0008
>
>
>
> Here is a sample of the contents
>
> /shared/sparkmaven/work/app-20161123184613-0007/2:
> total 16
> -rw-r--r--  1 steve  staff 0 Nov 23 18:46 stdout
> drwxr-xr-x  4 steve  staff   136 Nov 23 18:46 .
> -rw-r--r--  1 steve  staff  4830 Nov 23 18:46 stderr
> drwxr-xr-x  6 steve  staff   204 Nov 23 18:46 ..
>
> /shared/sparkmaven/work/app-20161123184613-0007/3:
> total 16
> -rw-r--r--  1 steve  staff 0 Nov 23 18:46 stdout
> drwxr-xr-x  6 steve  staff   204 Nov 23 18:46 ..
> drwxr-xr-x  4 steve  staff   136 Nov 23 18:46 .
> -rw-r--r--  1 steve  staff  4830 Nov 23 18:46 stderr
>
>
> Note also: the *SparkPi* program does run successfully, which validates
> the basic Spark installation/functionality.
>
>


Invalid log directory running pyspark job

2016-11-23 Thread Stephen Boesch
For a pyspark job with 54 executors all of the task outputs have a single
line in both the stderr and stdout similar to:

Error: invalid log directory /shared/sparkmaven/work/app-20161119222540-/0/


Note: the directory /shared/sparkmaven/work exists and is owned by the same
user running the job. There are plenty of other app-*** subdirectories that
do have contents in the stdout/stderr files.


$ls -lrta  /shared/sparkmaven/work
total 0
drwxr-xr-x  59 steve  staff  2006 Nov 23 05:01 ..
drwxr-xr-x  41 steve  staff  1394 Nov 23 18:22 app-20161123050122-0002
drwxr-xr-x   6 steve  staff   204 Nov 23 18:22 app-20161123182031-0005
drwxr-xr-x   6 steve  staff   204 Nov 23 18:44 app-20161123184349-0006
drwxr-xr-x   6 steve  staff   204 Nov 23 18:46 app-20161123184613-0007
drwxr-xr-x   3 steve  staff   102 Nov 23 19:20 app-20161123192048-0008



Here is a sample of the contents

/shared/sparkmaven/work/app-20161123184613-0007/2:
total 16
-rw-r--r--  1 steve  staff 0 Nov 23 18:46 stdout
drwxr-xr-x  4 steve  staff   136 Nov 23 18:46 .
-rw-r--r--  1 steve  staff  4830 Nov 23 18:46 stderr
drwxr-xr-x  6 steve  staff   204 Nov 23 18:46 ..

/shared/sparkmaven/work/app-20161123184613-0007/3:
total 16
-rw-r--r--  1 steve  staff 0 Nov 23 18:46 stdout
drwxr-xr-x  6 steve  staff   204 Nov 23 18:46 ..
drwxr-xr-x  4 steve  staff   136 Nov 23 18:46 .
-rw-r--r--  1 steve  staff  4830 Nov 23 18:46 stderr


Note also: the *SparkPi* program does run successfully, which validates
the basic Spark installation/functionality.


Re: GraphX Pregel not update vertex state properly, cause messages loss

2016-11-23 Thread Dale Wang
The problem comes from the inconsistency between the graph's triplet view and
its vertex view. The message may not be lost; it is simply not sent in the
sendMsg function, because sendMsg gets the wrong value of srcAttr!

It is not a new bug. I met a similar bug that appeared in version 1.2.1
(see JIRA-6378) before. I can reproduce that inconsistency bug with a small
and simple program (see that JIRA issue for more details). It seems that in
some situations the triplet view of a Graph object does not update
consistently with the vertex view. The GraphX Pregel API heavily relies on
the mapReduceTriplets (old) / aggregateMessages (new) API, which in turn
relies on the correct behavior of the triplet view of a graph. Thus this bug
affects the behavior of the Pregel API.

Though I cannot figure out why the bug appears, I suspect that it has some
connection with the data type of the vertex property. If you use *primitive*
types such as Double and Long, it is OK. But if you use a self-defined type
with mutable fields, such as a mutable Map or a mutable ArrayBuffer, the bug
appears. In your case I notice that you use JSONObject as your vertex's data
type. Looking up the definition of JSONObject, it has a Java map as the field
that stores its data, which is mutable. To temporarily avoid the bug, you can
change the data type of your vertex property to avoid any mutable data:
replace mutable data collections with the immutable collections provided by
Scala, and replace var fields with val fields. At least, that suggestion
works for me.

Zhaokang Wang
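
For illustration, a minimal sketch of that workaround (not code from this
thread; it assumes a spark-shell with `sc` available and swaps the mutable
JSONObject attribute for an immutable case class):

    import org.apache.spark.graphx._

    // Immutable vertex property replacing the mutable "LENGTH" map entry
    case class VProp(length: Int)

    val vertices = sc.parallelize(Seq(
      (0L, VProp(0)), (1L, VProp(Int.MaxValue)), (2L, VProp(Int.MaxValue))))
    val edges = sc.parallelize(Seq(Edge(0L, 1L, 1), Edge(1L, 2L, 1)))
    val graph = Graph(vertices, edges)

    val sssp = Pregel(graph, Int.MaxValue, activeDirection = EdgeDirection.Out)(
      // vprog: return a NEW VProp instead of mutating the old attribute
      (id, attr, msg) => if (msg < attr.length) VProp(msg) else attr,
      // sendMsg: propagate length + 1 while the source vertex is reachable
      triplet =>
        if (triplet.srcAttr.length != Int.MaxValue &&
            triplet.srcAttr.length + 1 < triplet.dstAttr.length)
          Iterator((triplet.dstId, triplet.srcAttr.length + 1))
        else Iterator.empty,
      // mergeMsg: keep the smaller distance
      (a: Int, b: Int) => math.min(a, b))

    sssp.vertices.collect().foreach(println)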

2016-11-18 11:47 GMT+08:00 fuz_woo :

> Hi everyone, I encountered a strange problem these days while attempting
> to use the GraphX Pregel interface to implement a simple
> single-source shortest-path algorithm.
> Below is my code:
>
> import com.alibaba.fastjson.JSONObject
> import org.apache.spark.graphx._
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> object PregelTest {
>
>   def run(graph: Graph[JSONObject, JSONObject]): Graph[JSONObject,
> JSONObject] = {
>
> def vProg(v: VertexId, attr: JSONObject, msg: Integer): JSONObject = {
>   if ( msg < 0 ) {
> // init message received
> if ( v.equals(0.asInstanceOf[VertexId]) ) attr.put("LENGTH", 0)
> else attr.put("LENGTH", Integer.MAX_VALUE)
>   } else {
> attr.put("LENGTH", msg+1)
>   }
>   attr
> }
>
> def sendMsg(triplet: EdgeTriplet[JSONObject, JSONObject]):
> Iterator[(VertexId, Integer)] = {
>   val len = triplet.srcAttr.getInteger("LENGTH")
>   // send a msg if last hub is reachable
>   if ( len < Integer.MAX_VALUE ) Iterator((triplet.dstId, len)) else Iterator.empty
> }
>
> def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
>   if ( msg1 < msg2 ) msg1 else msg2
> }
>
> Pregel(graph, new Integer(-1), 3, EdgeDirection.Out)(vProg, sendMsg,
> mergeMsg)
>   }
>
>   def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("Pregel Test")
> conf.set("spark.master", "local")
> val sc = new SparkContext(conf)
>
> // create a simplest test graph with 3 nodes and 2 edges
> val vertexList = Array(
>   (0.asInstanceOf[VertexId], new JSONObject()),
>   (1.asInstanceOf[VertexId], new JSONObject()),
>   (2.asInstanceOf[VertexId], new JSONObject()))
> val edgeList = Array(
>   Edge(0.asInstanceOf[VertexId], 1.asInstanceOf[VertexId], new
> JSONObject()),
>   Edge(1.asInstanceOf[VertexId], 2.asInstanceOf[VertexId], new
> JSONObject()))
>
> val vertexRdd = sc.parallelize(vertexList)
> val edgeRdd = sc.parallelize(edgeList)
> val g = Graph[JSONObject, JSONObject](vertexRdd, edgeRdd)
>
> // run test code
> val lpa = run(g)
> lpa
>   }
> }
>
> and after I run the code, I get an incorrect result in which vertex 2
> has a LENGTH label valued Integer.MAX_VALUE; it seems that the
> messages sent to vertex 2 were lost unexpectedly. I then followed the
> debugger
> into the file Pregel.scala, where I saw the code:
>
> [screenshot of the Pregel.scala code attached in the original post]
>
> In the first iteration (0), the variable messages in line 138 is
> reconstructed, and then recomputed in line 143, where activeMessages gets
> the value 0, which means the messages are lost.
> I then set a breakpoint at line 138, and before its execution I evaluated
> the expression " g.triplets().collect() ", which just collects the updated
> graph data. After I did this and executed the rest of the code, the messages
> were no longer empty and activeMessages got the value 1 as expected.
>
> I have tested the code with both Spark 1.4 and 1.6 on Scala 2.10,
> and got the same result.
>
> I must say this problem makes me really confused. I've spent almost 2 weeks
> trying to resolve it and I have no idea how to do it now. If this is not a
> bug, I totally can't understand why just executing a non-disturb 

Is there any api for categorical column statistic ?

2016-11-23 Thread canan chen
Dataset.describe only calculates statistics for numerical columns, not for
categorical columns. R's summary method can also summarize categorical
columns, which is very useful for exploratory data analysis. Just wondering:
is there any API for categorical column statistics as well, or is there any
JIRA for it? Thanks
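
For reference, a minimal workaround sketch using plain DataFrame operations
(not a dedicated API; `df` and the column name "category" are placeholders):

    import org.apache.spark.sql.functions.{countDistinct, desc}

    // frequency table for one categorical column
    df.groupBy("category").count().orderBy(desc("count")).show()

    // number of distinct levels in that column
    df.agg(countDistinct("category")).show()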


Re: spark.yarn.executor.memoryOverhead

2016-11-23 Thread Saisai Shao
From my understanding, this memory overhead should include
"spark.memory.offHeap.size", which means the off-heap memory size should not
be larger than the overhead memory size when running on YARN.

On Thu, Nov 24, 2016 at 3:01 AM, Koert Kuipers  wrote:

> in YarnAllocator i see that memoryOverhead is by default set to
> math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt,
> MEMORY_OVERHEAD_MIN)
>
> this does not take into account spark.memory.offHeap.size i think. should
> it?
>
> something like:
>
> math.max((MEMORY_OVERHEAD_FACTOR * executorMemory + offHeapMemory).toInt,
> MEMORY_OVERHEAD_MIN)
>


RE: How to expose Spark-Shell in the production?

2016-11-23 Thread Shreya Agarwal
Use Livy or a job server to execute spark-shell commands remotely.

Sent from my Windows 10 phone
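
For context, a rough sketch of going through Livy's REST API instead of giving
users shell access to cluster machines (not from this thread; the host is a
placeholder, 8998 is Livy's usual default port, and session-state polling and
session-id parsing are omitted):

    import java.net.{HttpURLConnection, URL}
    import java.nio.charset.StandardCharsets

    def post(url: String, json: String): String = {
      val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("POST")
      conn.setRequestProperty("Content-Type", "application/json")
      conn.setDoOutput(true)
      val out = conn.getOutputStream
      out.write(json.getBytes(StandardCharsets.UTF_8))
      out.close()
      scala.io.Source.fromInputStream(conn.getInputStream).mkString
    }

    val livy = "http://livy-host:8998"                       // placeholder host
    // create an interactive Spark (Scala) session ...
    println(post(s"$livy/sessions", """{"kind": "spark"}"""))
    // ... then, once the session is idle, run code in it (session id 0 assumed)
    println(post(s"$livy/sessions/0/statements",
      """{"code": "sc.parallelize(1 to 100).sum()"}"""))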

From: kant kodali
Sent: Saturday, November 19, 2016 12:57 AM
To: user @spark
Subject: How to expose Spark-Shell in the production?

How to expose Spark-Shell in the production?

1) Should we expose it on master nodes or executor nodes?
2) Should we simply give access to those machines and the spark-shell binary?
What is the recommended way?

Thanks!


Re: Spark driver not reusing HConnection

2016-11-23 Thread Mukesh Jha
Corrosponding HBase bug: https://issues.apache.org/jira/browse/HBASE-12629

On Wed, Nov 23, 2016 at 1:55 PM, Mukesh Jha  wrote:

> The solution is to disable the region size calculation check.
>
> hbase.regionsizecalculator.enable: false
>
> On Sun, Nov 20, 2016 at 9:29 PM, Mukesh Jha 
> wrote:
>
>> Any ideas folks?
>>
>> On Fri, Nov 18, 2016 at 3:37 PM, Mukesh Jha 
>> wrote:
>>
>>> Hi
>>>
>>> I'm accessing multiple regions (~5k) of an HBase table using Spark's
>>> newAPIHadoopRDD, but the driver is trying to calculate the region size
>>> of all the regions.
>>> It is not even reusing the HConnection and is creating a new connection for
>>> every request (see below), which is taking a lot of time.
>>>
>>> Is there a better approach to do this?
>>>
>>>
>>> 8 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process
>>> identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=
>>> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
>>> [18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client
>>> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
>>> hbase28.cloud.com:2181 sessionTimeout=6
>>> watcher=hconnection-0x1e7824af0x0, quorum=hbase19.cloud.com:2181,
>>> hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
>>> [18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.clou
>>> d.com:2181)] ClientCnxn: Opening socket connection to server
>>> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
>>> using SASL (unknown error)
>>> [18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.clou
>>> d.com:2181)] ClientCnxn: Socket connection established, initiating
>>> session, client: /10.193.138.145:47891, server:
>>> hbase24.cloud.com/10.193.150.217:2181
>>> [18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.clou
>>> d.com:2181)] ClientCnxn: Session establishment complete on server
>>> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95,
>>> negotiated timeout = 6
>>> [18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator:
>>> Calculating region sizes for table "message".
>>> [18 Nov 2016 22:25:27,867] [INFO Driver] 
>>> ConnectionManager$HConnectionImplementation:
>>> Closing master protocol: MasterService
>>> [18 Nov 2016 22:25:27,868] [INFO Driver] 
>>> ConnectionManager$HConnectionImplementation:
>>> Closing zookeeper sessionid=0x2564f6f013e0e95
>>> [18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session:
>>> 0x2564f6f013e0e95 closed
>>> [18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn:
>>> EventThread shut down
>>> [18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process
>>> identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=
>>> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
>>> [18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client
>>> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
>>> hbase28.cloud.com:2181 sessionTimeout=6
>>> watcher=hconnection-0x6a8a1efa0x0, quorum=hbase19.cloud.com:2181,
>>> hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
>>> [18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.clou
>>> d.com:2181)] ClientCnxn: Opening socket connection to server
>>> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
>>> using SASL (unknown error)
>>> [18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.clou
>>> d.com:2181)] ClientCnxn: Socket connection established, initiating
>>> session, client: /10.193.138.145:47894, server:
>>> hbase24.cloud.com/10.193.150.217:2181
>>> [18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.clou
>>> d.com:2181)] ClientCnxn: Session establishment complete on server
>>> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97,
>>> negotiated timeout = 6
>>> [18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator:
>>> Calculating region sizes for table "message".
>>> 
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> *Mukesh Jha *
>>>
>>
>>
>>
>> --
>>
>>
>> Thanks & Regards,
>>
>> *Mukesh Jha *
>>
>
>
>
> --
>
>
> Thanks & Regards,
>
> *Mukesh Jha *
>



-- 


Thanks & Regards,

*Mukesh Jha *


Re: Spark Shell doesnt seem to use spark workers but Spark Submit does.

2016-11-23 Thread kant kodali
Sorry, please ignore this if you like. Looks like the network throughput is
very low, but every worker/executor machine is indeed working.

The current incoming network throughput on each worker machine is about
2.5 KB/s (kilobytes per second), whereas it needs to be somewhere around
5-6 MB/s, which means the table scan for the count over a billion rows in
Cassandra is somehow not being done in parallel.

On Wed, Nov 23, 2016 at 12:45 PM, kant kodali  wrote:

> Hi All,
>
>
> Spark Shell doesn't seem to use the Spark workers, but Spark Submit does. I
> have the worker IPs listed in the conf/slaves file.
>
> I am trying to count the number of rows in Cassandra using spark-shell, so I
> do the following on the Spark master:
>
> val df = spark.sql("SELECT test from hello") // This has about billion rows
>
> scala> df.count
>
> [Stage 0:=>  (686 + 2) / 24686] // What are these numbers precisely?
>
>  This is taking forever, so I checked the I/O, CPU, and network usage using
> dstat, iostat, and so on. It looks like nothing is going on on the worker
> machines, but on the master I can see activity.
>
> I am using spark 2.0.2
>
> Any ideas on what is going on? and how to fix this?
>
> Thanks,
>
> kant
>
>
>


Re: Spark driver not reusing HConnection

2016-11-23 Thread Mukesh Jha
The solution is to disable the region size calculation check.

hbase.regionsizecalculator.enable: false
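
For reference, a minimal sketch of where that setting can go when building the
configuration for newAPIHadoopRDD (not code from this thread; the table name
and a spark-shell `sc` are assumed):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "message")
    // skip the region size calculation described above
    hbaseConf.set("hbase.regionsizecalculator.enable", "false")

    val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])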

On Sun, Nov 20, 2016 at 9:29 PM, Mukesh Jha  wrote:

> Any ideas folks?
>
> On Fri, Nov 18, 2016 at 3:37 PM, Mukesh Jha 
> wrote:
>
>> Hi
>>
>> I'm accessing multiple regions (~5k) of an HBase table using Spark's
>> newAPIHadoopRDD, but the driver is trying to calculate the region size
>> of all the regions.
>> It is not even reusing the HConnection and is creating a new connection for
>> every request (see below), which is taking a lot of time.
>>
>> Is there a better approach to do this?
>>
>>
>> 8 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process
>> identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=
>> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
>> [18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client
>> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
>> hbase28.cloud.com:2181 sessionTimeout=6
>> watcher=hconnection-0x1e7824af0x0, quorum=hbase19.cloud.com:2181,
>> hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
>> [18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Opening socket connection to server
>> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
>> using SASL (unknown error)
>> [18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Socket connection established, initiating session, client: /
>> 10.193.138.145:47891, server: hbase24.cloud.com/10.193.150.217:2181
>> [18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Session establishment complete on server
>> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95,
>> negotiated timeout = 6
>> [18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator:
>> Calculating region sizes for table "message".
>> [18 Nov 2016 22:25:27,867] [INFO Driver] 
>> ConnectionManager$HConnectionImplementation:
>> Closing master protocol: MasterService
>> [18 Nov 2016 22:25:27,868] [INFO Driver] 
>> ConnectionManager$HConnectionImplementation:
>> Closing zookeeper sessionid=0x2564f6f013e0e95
>> [18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session:
>> 0x2564f6f013e0e95 closed
>> [18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn:
>> EventThread shut down
>> [18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process
>> identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=
>> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
>> [18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client
>> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
>> hbase28.cloud.com:2181 sessionTimeout=6
>> watcher=hconnection-0x6a8a1efa0x0, quorum=hbase19.cloud.com:2181,
>> hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
>> [18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Opening socket connection to server
>> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
>> using SASL (unknown error)
>> [18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Socket connection established, initiating session, client: /
>> 10.193.138.145:47894, server: hbase24.cloud.com/10.193.150.217:2181
>> [18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Session establishment complete on server
>> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97,
>> negotiated timeout = 6
>> [18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator:
>> Calculating region sizes for table "message".
>> 
>>
>> --
>> Thanks & Regards,
>>
>> *Mukesh Jha *
>>
>
>
>
> --
>
>
> Thanks & Regards,
>
> *Mukesh Jha *
>



-- 


Thanks & Regards,

*Mukesh Jha *


Spark Shell doesnt seem to use spark workers but Spark Submit does.

2016-11-23 Thread kant kodali
Hi All,


Spark Shell doesn't seem to use the Spark workers, but Spark Submit does. I
have the worker IPs listed in the conf/slaves file.

I am trying to count the number of rows in Cassandra using spark-shell, so I
do the following on the Spark master:

val df = spark.sql("SELECT test from hello") // This has about billion rows

scala> df.count

[Stage 0:=>  (686 + 2) / 24686] // What are these numbers precisely?

 This is taking forever, so I checked the I/O, CPU, and network usage using
dstat, iostat, and so on. It looks like nothing is going on on the worker
machines, but on the master I can see activity.

I am using spark 2.0.2

Any ideas on what is going on? and how to fix this?

Thanks,

kant


Mapping KMeans trained data to respective records

2016-11-23 Thread Reth RM
I am using the wholeTextFiles API to load a bunch of text files, caching this
object, mapping the text content to TF-IDF vectors, and then applying KMeans
on these vectors. After training, the KMeans model predicts the clusterId of
the training data from the list of training vectors; the question is how to
map those predictions back to the wholeTextFiles object.

Use case
 Input: a set of text files present in a directory; process the text files
and cluster them through KMeans.
 Output: get the cluster membership of each text file, read its file content
that is in wholeTextFiles, and write it to the respective clusterId directory.
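
One hedged way to do this (a sketch, not a tested solution; it assumes the
RDD-based MLlib API and a spark-shell `sc`, keeps the file name attached to
each vector, and uses plain term frequencies instead of full TF-IDF for
brevity; the path, k, and iteration count are placeholders):

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.feature.HashingTF

    val files = sc.wholeTextFiles("/path/to/docs").cache()   // (fileName, content)
    val tf = new HashingTF()
    val vectors = files.mapValues(text => tf.transform(text.split("\\s+").toSeq)).cache()

    val model = KMeans.train(vectors.values, 10, 20)

    // cluster membership per file name; group by cluster id to write per-cluster output
    val fileToCluster = vectors.mapValues(v => model.predict(v))
    fileToCluster.take(5).foreach(println)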


Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-23 Thread Shixiong(Ryan) Zhu
Scala has not yet resolved this issue. Once they fix and release a new
version, you can just upgrade the Scala version by yourself.

On Tue, Nov 22, 2016 at 10:58 PM, Denis Bolshakov  wrote:

> Hello Zhu,
>
> Thank you very much for such detailed explanation and providing
> workaround, it works fine.
>
> But since the problem is related to a Scala issue, can we expect the fix in
> Spark 2.0? Or is it not a good idea to update such an important dependency
> as Scala in a minor maintenance release?
>
> Kind regards,
> Denis
>
> On 22 November 2016 at 22:13, Shixiong(Ryan) Zhu 
> wrote:
>
>> The workaround is defining the imports and class together using ":paste".
>>
>> On Tue, Nov 22, 2016 at 11:12 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> This relates to a known issue: https://issues.apache.org/jira/browse/SPARK-14146
>>> and https://issues.scala-lang.org/browse/SI-9799
>>>
>>> On Tue, Nov 22, 2016 at 6:37 AM, dbolshak 
>>> wrote:
>>>
 Hello,

 We have the same issue,

 We use latest release 2.0.2.

 Setup with 1.6.1 works fine.

 Could somebody provide a workaround how to fix that?

 Kind regards,
 Denis



 --
 View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-for-Databricks-example-tp28113p28116.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>
>
>
> --
> //with Best Regards
> --Denis Bolshakov
> e-mail: bolshakov.de...@gmail.com
>


spark.yarn.executor.memoryOverhead

2016-11-23 Thread Koert Kuipers
in YarnAllocator i see that memoryOverhead is by default set to
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt,
MEMORY_OVERHEAD_MIN)

this does not take into account spark.memory.offHeap.size i think. should
it?

something like:

math.max((MEMORY_OVERHEAD_FACTOR * executorMemory + offHeapMemory).toInt,
MEMORY_OVERHEAD_MIN)
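
In the meantime, a hedged sketch of compensating by hand (not from this
thread; it assumes the current defaults MEMORY_OVERHEAD_FACTOR = 0.10 and
MEMORY_OVERHEAD_MIN = 384 MB, and simply folds the off-heap size into the
overhead setting):

    val conf = new org.apache.spark.SparkConf()
      .set("spark.executor.memory", "4g")
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "2g")
      // max(0.10 * 4096, 384) = 409 MB default overhead, plus 2048 MB off-heap
      .set("spark.yarn.executor.memoryOverhead", "2457")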


spark sql jobs heap memory

2016-11-23 Thread Koert Kuipers
we are testing Dataset/Dataframe jobs instead of RDD jobs. one thing we
keep running into is containers getting killed by yarn. i realize this has
to do with off-heap memory, and the suggestion is to increase
spark.yarn.executor.memoryOverhead.

at times our memoryOverhead is as large as the executor memory (say 4G and
4G).

why is Dataset/Dataframe using so much off heap memory?

we haven't changed spark.memory.offHeap.enabled, which defaults to false.
should we enable that to get a better handle on this?


Re: how to see Pipeline model information

2016-11-23 Thread Xiaomeng Wan
You can use pipelineModel.stages(0).asInstanceOf[RandomForestModel]. The stage
index (0 in this example) depends on the order in which you call setStages.

Shawn
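
For the spark.ml pipeline case, a minimal sketch (not from this thread;
`pipeline` and `trainingDf` are assumed, and the fitted stage is a
RandomForestClassificationModel rather than the RDD-based RandomForestModel):

    import org.apache.spark.ml.classification.RandomForestClassificationModel

    val pipelineModel = pipeline.fit(trainingDf)
    // pick the stage that corresponds to the random forest in setStages order
    val rf = pipelineModel.stages.last.asInstanceOf[RandomForestClassificationModel]
    println(rf.toDebugString)        // full tree structure
    println(rf.featureImportances)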

On 23 November 2016 at 10:21, Zhiliang Zhu 
wrote:

>
> Dear All,
>
> I am building a model with a Spark pipeline, and in the pipeline I used the
> Random Forest algorithm as one of its stages.
> If I just use Random Forest directly, not by way of a pipeline, I can see
> information about the forest via APIs such as
> rfModel.toDebugString() and rfModel.toString().
>
> However, when it comes to a pipeline, how do I check the algorithm
> information, such as the trees, or the threshold selected by lr, etc.?
>
> Thanks in advance~~
>
> zhiliang
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: GraphX Pregel not update vertex state properly, cause messages loss

2016-11-23 Thread rohit13k
Created a JIRA for the same

https://issues.apache.org/jira/browse/SPARK-18568



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100p28124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: GraphX Pregel not update vertex state properly, cause messages loss

2016-11-23 Thread rohit13k
Hi 

I am facing a similar issue. It's not that the message is getting lost;
vertex 1's attribute changes in superstep 1, but when sendMsg gets the vertex
attribute from the edge triplet in the 2nd superstep it still has the old
value of vertex 1 and not the latest value. So, as per your code, no new
message will be generated in that superstep. I think the bug is in the
replicatedVertexView, where the srcAttr and dstAttr of the edge triplet
should be updated from the latest version of the vertex after each
superstep.

How can this bug get raised? I am struggling to find an exact solution for
it, except for recreating the graph after every superstep to force the edge
triplets to have the latest vertex values, but that is not a good solution
performance-wise.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100p28123.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



how to see Pipeline model information

2016-11-23 Thread Zhiliang Zhu

Dear All,

I am building a model with a Spark pipeline, and in the pipeline I used the
Random Forest algorithm as one of its stages.
If I just use Random Forest directly, not by way of a pipeline, I can see
information about the forest via APIs such as
rfModel.toDebugString() and rfModel.toString().

However, when it comes to a pipeline, how do I check the algorithm
information, such as the trees, or the threshold selected by lr, etc.?

Thanks in advance~~

zhiliang


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



subtractByKey modifies values in the source RDD

2016-11-23 Thread Dmitry Dzhus
I'm experiencing a problem with subtractByKey using Spark 2.0.2 with
Scala 2.11.x:

Relevant code:

object Types {
   type ContentId = Int
   type ContentKey = Tuple2[Int, ContentId]
   type InternalContentId = Int
}

val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
  itemIDMap.map(_.swap).cache()
logger.info(s"Built an inverse map of ${inverseItemIDMap.count()} item IDs")
logger.info(inverseItemIDMap.collect().mkString("I->E ", "\nI->E ", ""))

val superfluousItems: RDD[(InternalContentId, Int)] = .. .cache()
logger.info(superfluousItems.collect().mkString("SI ", "\nSI ", ""))

val filteredInverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
  inverseItemIDMap.subtractByKey(superfluousItems).cache() // <<===!!!
logger.info(s"${filteredInverseItemIDMap.count()} items in the filtered inverse ID mapping")
logger.info(filteredInverseItemIDMap.collect().mkString("F I->E ", "\nF I->E ", ""))

The operation in question is .subtractByKey. Both RDDs involved are
cached and forced via count() prior to calling subtractByKey, so I
would expect the result to be unaffected by how exactly
superfluousItems is built.

I added debugging output and filtered the resulting logs by relevant
InternalContentId values (829911, 830071). Output:

Built an inverse map of 827354 item IDs
.
.
I->E (829911,(2,1135081))
I->E (830071,(1,2295102))
.
.
748190 items in the training set had less than 28 ratings
SI (829911,3)
.
.
79164 items in the filtered inverse ID mapping
F I->E (830071,(2,1135081))

There's no element with key 830071 in superfluousItems (SI), so it's
not removed from the source RDD. However, its value is for some reason
replaced with the one from key 829911. How could this be? I cannot
reproduce it locally - only when running on a multi-machine cluster.
Is this a bug, or am I missing something?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/subtractByKey-modifes-values-in-the-source-RDD-tp28121.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: CSV to parquet preserving partitioning

2016-11-23 Thread benoitdr
Best solution I've found so far (no shuffling, and as many threads as input
dirs):

Create an RDD of input dirs, with as many partitions as input dirs
Transform it to an RDD of input files (preserving the partitioning by dir)
Flat-map it with a custom CSV parser
Convert the RDD to a DataFrame
Write the DataFrame to a Parquet table partitioned by dir

It requires writing your own parser. I could not find a solution that
preserves the partitioning using sc.textFile or the Databricks CSV parser.
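
A rough sketch of those steps (hedged: the directory names, column names, and
the toy line parser are placeholders, file handles are not closed for brevity,
and a spark-shell with `sc` and `spark` is assumed):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import spark.implicits._

    val inputDirs = Seq("/data/in/dir1", "/data/in/dir2")        // placeholder dirs
    // one partition per input dir, so each dir stays within a single task
    val dirRdd = sc.parallelize(inputDirs, inputDirs.size)

    val rows = dirRdd.flatMap { dir =>
      val fs = FileSystem.get(new java.net.URI(dir), new Configuration())
      fs.listStatus(new Path(dir)).iterator.flatMap { status =>
        scala.io.Source.fromInputStream(fs.open(status.getPath)).getLines().map { line =>
          val cols = line.split(",")                              // naive CSV parsing
          (dir, cols(0), cols(1))
        }
      }
    }

    rows.toDF("dir", "colA", "colB")
      .write.partitionBy("dir").parquet("/data/out/table")        // placeholder output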



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28120.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-23 Thread kant kodali
Hi Michael,

Looks like all of the from_json functions will require me to pass a schema,
and that can be a little tricky for us, but the code below doesn't require me
to pass a schema at all.

import org.apache.spark.sql._
val rdd = df2.rdd.map { case Row(j: String) => j }
spark.read.json(rdd).show()
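
For reference, a hedged middle ground once 2.1 is available (not from this
thread): infer the schema once from a sample and reuse it with from_json, so
the per-batch RDD conversion is avoided. It assumes the JSON string is the
only column of df2:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, from_json}

    // one-off schema inference on a sample of the data
    val schema = spark.read.json(df2.limit(1000).rdd.map { case Row(j: String) => j }).schema

    val jsonCol = df2.columns.head   // assumes the JSON string is the only column
    df2.select(from_json(col(jsonCol), schema).alias("parsed"))
       .select("parsed.*")
       .show()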


On Tue, Nov 22, 2016 at 2:42 PM, Michael Armbrust 
wrote:

> The first release candidate should be coming out this week. You can
> subscribe to the dev list if you want to follow the release schedule.
>
> On Mon, Nov 21, 2016 at 9:34 PM, kant kodali  wrote:
>
>> Hi Michael,
>>
>> I only see spark 2.0.2 which is what I am using currently. Any idea on
>> when 2.1 will be released?
>>
>> Thanks,
>> kant
>>
>> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust > > wrote:
>>
>>> In Spark 2.1 we've added a from_json
>>> 
>>> function that I think will do what you want.
>>>
>>> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali  wrote:
>>>
 This seems to work

 import org.apache.spark.sql._
 val rdd = df2.rdd.map { case Row(j: String) => j }
 spark.read.json(rdd).show()

 However, I wonder if there is any inefficiency here, since I have to apply
 this function to a billion rows.


>>>
>>
>