Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread kant kodali
I have the following code

I invoke spark-shell as follows

./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
--executor-memory 15G --executor-cores 12 --conf
spark.cassandra.input.split.size_in_mb=67108864
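A side note on that configuration: the flag name suggests spark.cassandra.input.split.size_in_mb is interpreted in megabytes, so a value of 67108864 (which looks like 64 MB written in bytes) would request splits of roughly 64 TB each and collapse the scan into a handful of enormous partitions, which matches the very small task counts below. A back-of-the-envelope sketch, assuming the connector treats the value as megabytes:

```python
import math

# Hypothetical sanity check: how many input splits a split-size setting
# would produce for a table of a given size, if the setting is in MB.
def estimated_splits(table_size_bytes, split_size_mb):
    split_bytes = split_size_mb * 1024 * 1024
    return max(1, math.ceil(table_size_bytes / split_bytes))

one_tb = 10**12  # roughly a billion 1 KB rows

print(estimated_splits(one_tb, 64))        # intended 64 MB splits: thousands of tasks
print(estimated_splits(one_tb, 67108864))  # 67108864 "MB" = 64 TB splits: 1 task
```

If 64 MB splits were intended, the fix would be to pass 64 rather than 67108864.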

code

scala> val df = spark.sql("SELECT test from hello") // a billion rows in hello; the test column is ~1KB

df: org.apache.spark.sql.DataFrame = [test: binary]

scala> df.count

[Stage 0:>   (0 + 2) / 13] // I don't know what these numbers mean precisely.
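For reference, the console progress bar reads as "(completed + active) / total" tasks for the stage, so "(0 + 2) / 13" means 0 tasks finished, 2 running, 13 total. A small parsing sketch (the exact format string is an assumption based on the lines shown in this thread):

```python
import re

# Parse a Spark console progress line such as "[Stage 0:>   (0 + 2) / 13]"
# into its stage id, completed task count, active task count, and total tasks.
def parse_progress(line):
    m = re.search(r"\[Stage (\d+):[=>\s]*\((\d+) \+ (\d+)\) / (\d+)\]", line)
    if m is None:
        return None
    stage, completed, active, total = map(int, m.groups())
    return {"stage": stage, "completed": completed, "active": active, "total": total}

print(parse_progress("[Stage 0:>   (0 + 2) / 13]"))
# 0 of 13 tasks completed, 2 currently running
```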

If I invoke spark-shell as follows

./spark-shell --conf spark.cassandra.connection.host=170.99.99.134

code


val df = spark.sql("SELECT test from hello") // this table has about a billion rows

scala> df.count


[Stage 0:=>  (686 + 2) / 24686] // What are these numbers precisely?


Neither of these versions worked: Spark keeps running forever, and I have
been waiting for more than 15 minutes with no response. Any ideas on what
could be wrong and how to fix this?

I am using Spark 2.0.2
and spark-cassandra-connector_2.11-2.0.0-M3.jar


Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread Anastasios Zouzias
How fast is Cassandra without Spark on the count operation?

cqlsh> SELECT COUNT(*) FROM hello;

(this is not equivalent to what you are doing, but it might help you find
the root cause)

On Thu, Nov 24, 2016 at 9:03 AM, kant kodali  wrote:



-- 
-- Anastasios Zouzias



Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread kant kodali
I would be glad if SELECT COUNT(*) FROM hello could return any value for a
table that size :) I can say for sure it didn't return anything within 30
minutes, and I'd probably need to build more patience to sit for a few more
hours after that! Cassandra recommends using ColumnFamilyStats via nodetool
cfstats, which gives a pretty good estimate but not an accurate value.

On Thu, Nov 24, 2016 at 12:48 AM, Anastasios Zouzias 
wrote:



Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread kant kodali
According to this link:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md

I tried the following, but it still looks like it is taking forever:

sc.cassandraTable(keyspace, table).cassandraCount


On Thu, Nov 24, 2016 at 12:56 AM, kant kodali  wrote:



PySpark TaskContext

2016-11-24 Thread ofer
Hi,
Is there a way in PySpark to get something like TaskContext from code
running on an executor, as there is in Scala Spark?

If not, how can I find my task ID from inside the executors?

Thanks!
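Until TaskContext is exposed, one common workaround is to recover the partition index with RDD.mapPartitionsWithIndex, which passes the partition number into your function. That is not the full task ID, but it is often enough to correlate executor-side logs. A minimal sketch of the tagging function, demonstrated here on plain Python iterables (with a real RDD you would call rdd.mapPartitionsWithIndex(tag_with_partition)):

```python
# A function with the signature mapPartitionsWithIndex expects:
# f(partition_index, iterator) -> iterator.
def tag_with_partition(index, iterator):
    for record in iterator:
        yield (index, record)

# Simulate two partitions locally just to show the shape of the output.
partitions = [["a", "b"], ["c"]]
tagged = [pair
          for i, part in enumerate(partitions)
          for pair in tag_with_partition(i, iter(part))]
print(tagged)  # [(0, 'a'), (0, 'b'), (1, 'c')]
```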



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-TaskContext-tp28125.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: PySpark TaskContext

2016-11-24 Thread Holden Karau
Hi,

The TaskContext isn't currently exposed in PySpark, but I've been meaning to
look at exposing at least some of TaskContext for parity in PySpark. Is
there a particular use case you want this for? It would help with crafting
the JIRA :)

Cheers,

Holden :)

On Thu, Nov 24, 2016 at 1:39 AM, ofer  wrote:



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Yarn resource utilization with Spark pipe()

2016-11-24 Thread Holden Karau
YARN will kill your processes if the child processes you start via pipe()
consume too much memory. You can configure the amount of memory Spark
leaves aside for processes other than the JVM in the YARN containers
with spark.yarn.executor.memoryOverhead.

On Wed, Nov 23, 2016 at 10:38 PM, Sameer Choudhary 
wrote:

> Hi,
>
> I am working on a Spark 1.6.2 application on a YARN-managed EMR cluster
> that uses RDD's pipe() method to process my data. I start a lightweight
> daemon process that starts a process for each task via pipes. This is
> to ensure that I don't run into
> https://issues.apache.org/jira/browse/SPARK-671.
>
> I'm running into Spark job failure due to task failures across the
> cluster. Following are the questions that I think would help in
> understanding the issue:
>
> - How does resource allocation in PySpark work? How do YARN and
> Spark track the memory consumed by Python processes launched on the
> worker nodes?
>
> - As an example, let's say SPARK started n tasks on a worker node.
> These n tasks start n processes via pipe. Memory for executors is
> already reserved during application launch. As the processes run their
> memory footprint grows and eventually there is not enough memory on
> the box. In this case how will YARN and SPARK behave? Will the
> executors be killed or my processes will kill, eventually killing the
> task? I think this could lead to cascading failures of tasks across
> cluster as retry attempts also fail, eventually leading to termination
> of SPARK job. Is there a way to avoid this?
>
> - When we define number of executors in my SparkConf, are they
> distributed evenly across my nodes? One approach to get around this
> problem would be to limit the number of executors on each host that
> YARN can launch. So we will manage the memory for piped processes
> outside of YARN. Is there a way to avoid this?
>
> Thanks,
> Sameer
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>




Re: PySpark TaskContext

2016-11-24 Thread Ofer Eliassaf
Since we can't use log4j in PySpark executors, we built our own
logging infrastructure (based on Logstash/Elasticsearch/Kibana).
It would help to have the TID in the logs, so we can drill down accordingly.


On Thu, Nov 24, 2016 at 11:48 AM, Holden Karau  wrote:




-- 
Regards,
Ofer Eliassaf


Re: PySpark TaskContext

2016-11-24 Thread Holden Karau
Cool - thanks. I'll circle back with the JIRA number once I've got it
created. It will probably take a while before it lands in a Spark release
(since 2.1 has already branched), but better debugging information for
Python users is certainly important/useful.

On Thu, Nov 24, 2016 at 2:03 AM, Ofer Eliassaf 
wrote:






Re: PySpark TaskContext

2016-11-24 Thread Holden Karau
https://issues.apache.org/jira/browse/SPARK-18576

On Thu, Nov 24, 2016 at 2:05 AM, Holden Karau  wrote:






Re: PySpark TaskContext

2016-11-24 Thread Ofer Eliassaf
Thank you so much for this! Great to see that you listen to the community.

On Thu, Nov 24, 2016 at 12:10 PM, Holden Karau  wrote:






Re: PySpark TaskContext

2016-11-24 Thread Holden Karau
I love working with the Python community & I've heard similar requests in
the past few months, so it's good to have a solid reason to try and add this
functionality :)

Just to be clear though, I'm not a Spark committer, so when I work on stuff,
getting it in is very much dependent on me finding a committer who shares my
view, but I have over a hundred commits, so it happens more often than not :)

On Thu, Nov 24, 2016 at 3:15 AM, Ofer Eliassaf 
wrote:






io.netty.handler.codec.EncoderException: java.lang.NoSuchMethodError:

2016-11-24 Thread Karthik Shyamsunder
Greetings,

I am using Spark 2.0.2 with Scala 2.11.7 and Hadoop 2.7.3. When I run
spark-submit in local mode, I get a netty exception like the following. The
code runs fine with Spark 1.6.3, Scala 2.10.x, and Hadoop 2.7.3.

16/11/24 08:18:24 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/simple-project_2.11-1.0.jar, byteCount=3662, body=FileSegmentManagedBuffer{file=/home/hdadmin/Examples/spark/wordcount/target/scala-2.11/simple-project_2.11-1.0.jar, offset=0, length=3662}} to /10.0.2.15:33926; closing connection
io.netty.handler.codec.EncoderException: java.lang.NoSuchMethodError: io.netty.channel.DefaultFileRegion.<init>(Ljava/io/File;JJ)V
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:658)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:716)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:651)
    at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:658)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:716)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:194)
    at org.apache.spark.network.server.TransportRequestHandler.processStreamRequest(TransportRequestHandler.java:150)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:119)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: io.netty.channel.DefaultFileRegion.<init>(Ljava/io/File;JJ)V
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.convertToNetty(FileSegmentManagedBuffer.java:133)
    at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:54)
    at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:33)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
    ... 35 more
16/11/24 08:18:24 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /10.0.2.15:54561 is closed


PLEASE ADVISE.

Sincerely,

Karthik
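A NoSuchMethodError on io.netty.channel.DefaultFileRegion.<init>(File, long, long) usually means an older Netty jar, one that predates that constructor, is shadowing the Netty version Spark 2.0.2 ships with; Hadoop distributions and application uber-jars are frequent sources. A first diagnostic step is to list every Netty jar visible to the driver; a small sketch (the directory paths in the usage comment are hypothetical):

```python
import os

# Walk a directory tree and collect any Netty jars, so version conflicts
# on the classpath are easy to spot at a glance.
def find_netty_jars(root):
    hits = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.startswith("netty") and name.endswith(".jar"):
                hits.append(os.path.join(dirpath, name))
    return sorted(hits)

# Usage (hypothetical paths): point it at your Spark and Hadoop lib
# directories and compare the versions reported.
# print(find_netty_jars("/usr/lib/spark/jars"))
# print(find_netty_jars("/usr/lib/hadoop/lib"))
```

If more than one Netty 4.x version (or a stray Netty 3.x netty-all jar) shows up, aligning them is the likely fix.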



Re: Fwd: Spark SQL: ArrayIndexOutofBoundsException

2016-11-24 Thread cossy
The drop() function is a Scala method on Array, not a Spark operation.






multiple Spark Thrift Servers running in the same machine throws org.apache.hadoop.security.AccessControlException

2016-11-24 Thread 谭 成灶
I have two users (etl , dev ) start Spark Thrift Server in the same machine . I 
connected by beeline etl STS to execute a command,and throwed  
org.apache.hadoop.security.AccessControlException.I don't know why is dev user 
perform,not etl.
It is a spark bug?  I am using spark2.0.2

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=dev, access=EXECUTE, inode="/user/hive/warehouse/tb_spark_sts/etl_cycle_id=20161122":etl:supergroup:drwxr-x---,group:etl:rwx,group:oth_dev:rwx,default:user:data_mining:r-x,default:group::rwx,default:group:etl:rwx,default:group:oth_dev:rwx,default:mask::rwx,default:other::---
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkAccessAcl(DefaultAuthorizationProvider.java:335)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:231)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkTraverse(DefaultAuthorizationProvider.java:178)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:137)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6250)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3942)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:811)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getFileInfo(AuthorizationProviderProxyClientProtocol.java:502)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:815)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)


Regards.
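Not an answer to why the query runs as dev, but a minimal sketch of why the base permission bits in the exception deny user=dev the EXECUTE (directory traverse) access. This deliberately ignores the extended ACL entries, which grant groups etl and oth_dev access but never name user dev:

```python
# Permission string from the exception: drwxr-x--- owned by etl:supergroup.
# Drop the leading "d" (directory flag) and split into the three triplets.
perms = "rwxr-x---"
owner, group, other = perms[0:3], perms[3:6], perms[6:9]

def can_execute(triplet: str) -> bool:
    # the third character of a triplet is the execute (traverse) bit
    return triplet[2] == "x"

# "dev" is neither the owner (etl) nor in the owning group (supergroup),
# so the "other" triplet applies -- and it has no execute bit, hence
# access=EXECUTE is denied while traversing the partition directory.
dev_can_traverse = can_execute(other)
```

Whether the *intent* was for dev to be covered by one of the named ACL groups is a separate question; the sketch only shows why the fallback "other" bits reject the request.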





Re: GraphX Pregel not update vertex state properly, cause messages loss

2016-11-24 Thread 吴 郎
Thank you, Dale, I've realized in what situation this bug is triggered.
Actually, it seems that any user-defined class with mutable fields (such as Map,
List...) cannot be used as a message, or it will be lost in the next supersteps. To
work around this, I tried deep-copying a new message object every time the
vertex program runs, and it has worked so far, though it's obviously not an
elegant way.
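A minimal sketch of the deep-copy workaround described above, with illustrative names (plain Python, not the GraphX API): copy the message object before the vertex program mutates it, so no two supersteps ever share a mutable object.

```python
import copy

class Msg:
    """A message type with a mutable field, like the JSONObject in this thread."""
    def __init__(self, fields):
        self.fields = fields  # mutable dict

def vprog(attr: Msg, msg: Msg) -> Msg:
    # defensive deep copy before any mutation, so the sender's copy is untouched
    msg = copy.deepcopy(msg)
    msg.fields["LENGTH"] = msg.fields["LENGTH"] + 1
    return msg

original = Msg({"LENGTH": 1})
updated = vprog(Msg({}), original)
# the original message object is unchanged; only the copy was mutated
```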

fuz woo
 
--
Best regards,
Wu Lang

---
College of Computer, National University of Defense Technology
Kaifu District, Changsha, Hunan, 410073, China
Email: fuz@qq.com





 




-- Original --
From: "Dale Wang"; 
Date: Thursday, November 24, 2016, 11:10 AM
To: "吴 郎"; 
Cc: "user"; 
Subject: Re: GraphX Pregel not update vertex state properly, cause messages loss




The problem comes from the inconsistency between the graph's triplet view and
vertex view. The message may not be lost; rather, it is simply not sent in the
sendMsg function, because sendMsg gets a wrong value of srcAttr!

It is not a new bug. I met a similar bug that appeared in version 1.2.1,
according to JIRA-6378. I can reproduce that inconsistency bug with a small and
simple program (see that JIRA issue for more details). It seems that in some
situations the triplet view of a Graph object does not update consistently with
the vertex view. The GraphX Pregel API heavily relies on the
mapReduceTriplets (old) / aggregateMessages (new) API, which in turn relies on
the correct behavior of the triplet view of a graph. Thus this bug influences
the behavior of the Pregel API.

Though I cannot figure out why the bug appears either, I suspect that it has
some connection with the data type of the vertex property. If you use
primitive types such as Double and Long, it is OK. But if you use a
self-defined type with mutable fields, such as a mutable Map or a mutable
ArrayBuffer, the bug appears. In your case I notice that you use JSONObject as
your vertex's data type. Looking up the definition of JSONObject, it has a
Java map as its field to store data, which is mutable. To temporarily avoid
the bug, you can change the data type of your vertex property to avoid any
mutable data type: replace mutable data collections with the immutable
collections provided by Scala, and replace var fields with val fields.
At least, that suggestion works for me.
 
 Zhaokang Wang
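Zhaokang's suggestion (immutable collections and val fields) can be sketched in miniature; illustrative types, not GraphX:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class VertexAttr:
    """Immutable vertex property: a plain field instead of a mutable map.
    Every update produces a new object, so the triplet view can never
    observe an in-place mutation of a shared reference."""
    length: int

    def with_length(self, n: int) -> "VertexAttr":
        return VertexAttr(length=n)

a = VertexAttr(length=2**31 - 1)   # "infinity" initial distance
b = a.with_length(3)               # an update creates a distinct object
```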



2016-11-18 11:47 GMT+08:00 fuz_woo :
hi,everyone, I encountered a strange problem these days when i'm attempting
 to use the GraphX Pregel interface to implement a simple
 single-source-shortest-path algorithm.
 below is my code:
 
 import com.alibaba.fastjson.JSONObject
 import org.apache.spark.graphx._
 
 import org.apache.spark.{SparkConf, SparkContext}
 
 object PregelTest {
 
   def run(graph: Graph[JSONObject, JSONObject]): Graph[JSONObject,
 JSONObject] = {
 
 def vProg(v: VertexId, attr: JSONObject, msg: Integer): JSONObject = {
   if ( msg < 0 ) {
 // init message received
 if ( v.equals(0.asInstanceOf[VertexId]) ) attr.put("LENGTH", 0)
 else attr.put("LENGTH", Integer.MAX_VALUE)
   } else {
 attr.put("LENGTH", msg+1)
   }
   attr
 }
 
 def sendMsg(triplet: EdgeTriplet[JSONObject, JSONObject]):
 Iterator[(VertexId, Integer)] = {
   val len = triplet.srcAttr.getInteger("LENGTH")
   // send a msg if last hub is reachable
   if ( len …

…it seems that the messages sent to vertex 2 were lost unexpectedly. I then
 tracked the debugger to the file Pregel.scala, where I saw the code:
 
 

 
 In the first iteration (0), the variable messages in line 138 is reconstructed,
 and then recomputed in line 143, where activeMessages gets the value 0,
 which means the messages are lost.
 Then I set a breakpoint at line 138, and before its execution I evaluated the
 expression " g.triplets().collect() ", which just collects the updated graph
 data. After doing this and executing the rest of the code, messages was no
 longer empty and activeMessages got the value 1, as expected.
 
 I have tested the code with both Spark/GraphX 1.4 and 1.6 on Scala 2.10,
 and got the same result.
 
 I must say this problem leaves me really confused. I've spent almost 2 weeks
 trying to resolve it and have no idea what to do now. If this is not a bug, I
 totally can't understand why merely executing a non-disturbing expression
 ( g.triplets().collect(), which just collects the data and computes nothing )
 could change the outcome; it's really ridiculous.
 
 
 
 --
 View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Hive on Spark is not populating correct records

2016-11-24 Thread Vikash Pareek
Hi,

Not sure whether it is right place to discuss this issue.

I am running following Hive query multiple times with execution engine as
Hive on Spark and Hive on MapReduce.

With Hive on Spark: the result (count) was different on every execution.
With Hive on MapReduce: the result (count) was the same on every execution.

It seems like Hive on Spark behaves differently on each execution and does not
produce the correct result.

Volume of data as follow:
my_table1 (left): 30 million records
my_table2 (right): 85 million records

-- Thanks
Vikash




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hive-on-Spark-is-not-populating-correct-records-tp28128.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




OS killing Executor due to high (possibly off heap) memory usage

2016-11-24 Thread Aniket Bhatnagar
Hi Spark users

I am running a job that does join of a huge dataset (7 TB+) and the
executors keep crashing randomly, eventually causing the job to crash.
There are no out of memory exceptions in the log and looking at the dmesg
output, it seems like the OS killed the JVM because of high memory usage.
My suspicion is towards off heap usage of executor is causing this as I am
limiting the on heap usage of executor to be 46 GB and each host running
the executor has 60 GB of RAM. After the executor crashes, I can see that
the external shuffle manager
(org.apache.spark.network.server.TransportRequestHandler) logs a lot of
channel closed exceptions in yarn node manager logs. This leads me to
believe that something triggers out of memory during shuffle read. Is there
a configuration to completely disable usage of off heap memory? I have
tried setting spark.shuffle.io.preferDirectBufs=false but the executor is
still getting killed by the same error.

Cluster details:
10 AWS c4.8xlarge hosts
RAM on each host - 60 GB
Number of cores on each host - 36
Additional hard disk on each host - 8 TB

Spark configuration:
dynamic allocation enabled
external shuffle service enabled
spark.driver.memory 1024M
spark.executor.memory 47127M
Spark master yarn-cluster

Sample error in yarn node manager:
2016-11-24 10:34:06,507 ERROR
org.apache.spark.network.server.TransportRequestHandler
(shuffle-server-50): Error sending result
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=919299554123,
chunkIndex=0},
buffer=FileSegmentManagedBuffer{file=/mnt3/yarn/usercache/hadoop/appcache/application_1479898345621_0006/blockmgr-ad5301a9-e1e9-4723-a8c4-9276971b2259/2c/shuffle_3_963_0.data,
offset=0, length=669014456}} to /10.192.108.170:52782; closing connection
java.nio.channels.ClosedChannelException

Error in dmesg:
[799873.309897] Out of memory: Kill process 50001 (java) score 927 or
sacrifice child
[799873.314439] Killed process 50001 (java) total-vm:65652448kB,
anon-rss:57246528kB, file-rss:0kB

Thanks,
Aniket
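To put numbers on the suspicion above, a back-of-envelope sketch using the figures from this post and Spark's documented default overhead of max(384 MB, 10% of executor memory):

```python
# Host and executor sizing from the post: 60 GB host RAM, 47127 MB heap.
host_ram_mb = 60 * 1024                 # 61440
executor_memory_mb = 47127

# YARN overhead default in Spark 1.x/2.x: max(384 MB, 10% of executor memory)
default_overhead_mb = max(384, int(executor_memory_mb * 0.10))   # 4712
container_mb = executor_memory_mb + default_overhead_mb          # 51839
headroom_mb = host_ram_mb - container_mb                         # 9601

# Netty direct buffers for shuffle reads, the external shuffle service, the
# node manager, and the OS page cache all compete for that ~9.6 GB headroom,
# so a 7 TB join can plausibly push the host past physical RAM and trigger
# the OOM killer even though the JVM heap itself never overflows.
```

Raising spark.yarn.executor.memoryOverhead (and correspondingly lowering spark.executor.memory so heap + overhead stays under the host limit) gives YARN an honest picture of the container's true footprint.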


Re: how to see Pipeline model information

2016-11-24 Thread Zhiliang Zhu
Hi Xiaomeng,
Thanks very much for your comment, which is helpful for me.
However, it seems that here I met more issues about XXXClassifier and
XXXClassificationModel, as in the code below:

...
GBTClassifier gbtModel = new GBTClassifier();
ParamMap[] grid = new ParamGridBuilder()
    .addGrid(gbtModel.maxIter(), new int[] {5})
    .addGrid(gbtModel.maxDepth(), new int[] {5})
    .build();

CrossValidator crossValidator = new CrossValidator()
    .setEstimator(gbtModel) //rfModel
    .setEstimatorParamMaps(grid)
    .setEvaluator(new BinaryClassificationEvaluator())
    .setNumFolds(6);

Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, vectorSlicer, crossValidator});

PipelineModel plModel = pipeline.fit(data);
ArrayList m = new ArrayList();
m.add(plModel);
JAVA_SPARK_CONTEXT.parallelize(m, 1).saveAsObjectFile(this.outputPath + POST_MODEL_PATH);

Transformer[] stages = plModel.stages();
Transformer cvStage = stages[2];
CrossValidator crossV = new TR2CVConversion(cvStage).getInstanceOfCrossValidator(); // call self-defined scala class
Estimator estimator = crossV.getEstimator();

GBTClassifier gbt = (GBTClassifier)estimator;

// all the above is okay to compile, but the next line fails to compile
// however, GBTClassifier does not seem to expose much detailed model description
// but via GBTClassificationModel.toString() we may get the specific trees, which are just what I want
GBTClassificationModel model = (GBTClassificationModel)get;  // wrong to compile

Then how to get the specific trees or forest from the model? Thanks in advance~
Zhiliang
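A language-agnostic sketch (illustrative Python, not the Spark API) of the distinction behind the compile error above: an estimator such as GBTClassifier carries only parameters, while the trees live on the fitted model object produced by training, so casting an estimator to the model type cannot work.

```python
class GBTClassifier:
    """Estimator: holds parameters only, never trees."""
    def __init__(self, max_iter):
        self.max_iter = max_iter

    def fit(self, data):
        # training produces a *separate* fitted-model object
        return GBTClassificationModel(trees=[f"tree_{i}" for i in range(self.max_iter)])

class GBTClassificationModel:
    """Fitted model: this is where the trees actually live."""
    def __init__(self, trees):
        self.trees = trees

    def to_debug_string(self):
        return "\n".join(self.trees)

est = GBTClassifier(max_iter=3)
model = est.fit(data=None)
# the trees are reachable only from the fitted model, never from the estimator
```

So the thing to extract from the fitted pipeline is the stage's fitted model, not the estimator recovered from it.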







 

On Thursday, November 24, 2016 2:15 AM, Xiaomeng Wan  
wrote:
 

 You can use pipelinemodel.stages(0).asInstanceOf[RandomForestModel]. The 
number (0 in example) for stages depends on the order you call setStages.
Shawn
On 23 November 2016 at 10:21, Zhiliang Zhu  wrote:


Dear All,

I am building model by spark pipeline, and in the pipeline I used Random Forest 
Alg as its stage.
If I just use Random Forest but not make it by way of pipeline, I could see the 
information about the forest by API as
rfModel.toDebugString() and rfModel.toString() .

However, while it comes to pipeline, how to check the alg information, such as 
the tree, or the threshold selected by lr etc ...

Thanks in advance~~

zhiliang







   

get specific tree or forest structure from pipeline model

2016-11-24 Thread Zhiliang Zhu
Hi All,
Here I want to print the specific tree or forest structure from a pipeline model.
However, it seems that here I met more issues about XXXClassifier and
XXXClassificationModel, as in the code below:

...
GBTClassifier gbtModel = new GBTClassifier();
ParamMap[] grid = new ParamGridBuilder()
    .addGrid(gbtModel.maxIter(), new int[] {5})
    .addGrid(gbtModel.maxDepth(), new int[] {5})
    .build();

CrossValidator crossValidator = new CrossValidator()
    .setEstimator(gbtModel) //rfModel
    .setEstimatorParamMaps(grid)
    .setEvaluator(new BinaryClassificationEvaluator())
    .setNumFolds(6);

Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, vectorSlicer, crossValidator});

PipelineModel plModel = pipeline.fit(data);
ArrayList m = new ArrayList();
m.add(plModel);
JAVA_SPARK_CONTEXT.parallelize(m, 1).saveAsObjectFile(this.outputPath + POST_MODEL_PATH);

Transformer[] stages = plModel.stages();
Transformer cvStage = stages[2];
CrossValidator crossV = new TR2CVConversion(cvStage).getInstanceOfCrossValidator(); // call self-defined scala class
Estimator estimator = crossV.getEstimator();

GBTClassifier gbt = (GBTClassifier)estimator;

// all the above is okay to compile, but the next line fails to compile
// however, GBTClassifier does not seem to expose much detailed model description
// but via GBTClassificationModel.toString() we may get the specific trees, which are just what I want
GBTClassificationModel model = (GBTClassificationModel)get;  // wrong to compile

Then how to get the specific trees or forest from the model? Thanks in advance~
Zhiliang

Re: get specific tree or forest structure from pipeline model

2016-11-24 Thread Zhiliang Zhu
Scala code would also be fine for me, if there is some solution.

 


Re: how to see Pipeline model information

2016-11-24 Thread Xiaomeng Wan
Here is the Scala code I use to get the best model (I never used Java):

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)

val cvModel = cv.fit(data)

val plmodel = cvModel.bestModel.asInstanceOf[PipelineModel]

val lrModel = plmodel.stages(0).asInstanceOf[LinearRegressionModel]

On 24 November 2016 at 10:23, Zhiliang Zhu  wrote:

> Hi Xiaomeng,
>
> Thanks very much for your comment, which is helpful for me.
>
> However, it seems that here met more issue about XXXClassifier and
> XXXClassificationModel,
> as the codes below:
>
> ...
> GBTClassifier gbtModel = new GBTClassifier();
> ParamMap[] grid = new ParamGridBuilder()
> .addGrid(gbtModel.maxIter(), new int[] {5})
> .addGrid(gbtModel.maxDepth(), new int[] {5})
> .build();
>
> CrossValidator crossValidator = new CrossValidator()
> .setEstimator(gbtModel) //rfModel
> .setEstimatorParamMaps(grid)
> .setEvaluator(new BinaryClassificationEvaluator())
> .setNumFolds(6);
>
> Pipeline pipeline = new Pipeline()
> .setStages(new PipelineStage[] {labelIndexer, vectorSlicer,
> crossValidator});
>
> PipelineModel plModel = pipeline.fit(data);
> ArrayList m = new ArrayList ();
> m.add(plModel);
> JAVA_SPARK_CONTEXT.parallelize(m, 1).saveAsObjectFile(this.outputPath
> + POST_MODEL_PATH);
>
> Transformer[] stages = plModel.stages();
> Transformer cvStage = stages[2];
> CrossValidator crossV = new 
> TR2CVConversion(cvStage).getInstanceOfCrossValidator();
> //call self defined scala class
> Estimator estimator = crossV.getEstimator();
>
> GBTClassifier gbt = (GBTClassifier)estimator;
>
> //all the above is okay to compile, but it is wrong to compile for next
> line
> //however, in GBTClassifier seems not much detailed model description to
> get
> //but by GBTClassificationModel.toString(), we may get the specific trees
> which are just I want
>
> GBTClassificationModel model = (GBTClassificationModel)get;  //wrong
> to compile
>
>
> Then how to get the specific trees or forest from the model?
> Thanks in advance~
>
> Zhiliang
>
>
>
>
>
>
>
>
>
>
> On Thursday, November 24, 2016 2:15 AM, Xiaomeng Wan 
> wrote:
>
>
> You can use pipelinemodel.stages(0).asInstanceOf[RandomForestModel]. The
> number (0 in example) for stages depends on the order you call setStages.
>
> Shawn
>
> On 23 November 2016 at 10:21, Zhiliang Zhu 
> wrote:
>
>
> Dear All,
>
> I am building model by spark pipeline, and in the pipeline I used Random
> Forest Alg as its stage.
> If I just use Random Forest but not make it by way of pipeline, I could
> see the information about the forest by API as
> rfModel.toDebugString() and rfModel.toString() .
>
> However, while it comes to pipeline, how to check the alg information,
> such as the tree, or the threshold selected by lr etc ...
>
> Thanks in advance~~
>
> zhiliang
>
>
> 
>
>
>
>
>


Re: covert local tsv file to orc file on distributed cloud storage(openstack).

2016-11-24 Thread vr spark
Hi, the source file I have is on a local machine and it's pretty huge, around 150
GB. How should I go about it?

On Sun, Nov 20, 2016 at 8:52 AM, Steve Loughran 
wrote:

>
> On 19 Nov 2016, at 17:21, vr spark  wrote:
>
> Hi,
> I am looking for scala or python code samples to covert local tsv file to
> orc file and store on distributed cloud storage(openstack).
>
> So, need these 3 samples. Please suggest.
>
> 1. read tsv
> 2. convert to orc
> 3. store on distributed cloud storage
>
>
> thanks
> VR
>
>
> all options, 9 lines of code, assuming a spark context has already been
> setup with the permissions to write to AWS, and the relevant JARs for S3A
> to work on the CP. The read operation is inefficient as to determine the
> schema it scans the (here, remote) file twice; that may be OK for an
> example, but I wouldn't do that in production. The source is a real file
> belonging to amazon; dest a bucket of mine.
>
> More details at: http://www.slideshare.net/steve_l/apache-spark-and-
> object-stores
>
>
> val csvdata = spark.read.options(Map(
>   "header" -> "true",
>   "ignoreLeadingWhiteSpace" -> "true",
>   "ignoreTrailingWhiteSpace" -> "true",
>   "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
>   "inferSchema" -> "true",
>   "mode" -> "FAILFAST"))
> .csv("s3a://landsat-pds/scene_list.gz")
> csvdata.write.mode("overwrite").orc("s3a://hwdev-stevel-demo2/landsatOrc")
>


Re: spark sql jobs heap memory

2016-11-24 Thread Rohit Karlupia
Datasets/DataFrames will use direct/raw/off-heap memory in the most
efficient columnar fashion. Trying to fit the same amount of data in heap
memory would likely increase your memory requirement and decrease the
speed.

So, in short, don't worry about it and increase the overhead. You can also set
a bound on off-heap memory via some options.

thanks,
rohitk
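For reference, the options hinted at above, as a hedged spark-defaults sketch (property names from the Spark 2.x configuration documentation; the values are placeholders to tune, not recommendations):

```
# spark-defaults.conf sketch -- illustrative values only
spark.yarn.executor.memoryOverhead   6144
spark.memory.offHeap.enabled         true
spark.memory.offHeap.size            4g
```

With spark.memory.offHeap.enabled=true, Tungsten's off-heap allocations are bounded by spark.memory.offHeap.size; note this bounds Spark-managed off-heap memory only, not Netty's direct buffers, which is why memoryOverhead still needs headroom.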

On Thu, Nov 24, 2016 at 12:23 AM, Koert Kuipers  wrote:

> we are testing Dataset/Dataframe jobs instead of RDD jobs. one thing we
> keep running into is containers getting killed by yarn. i realize this has
> to do with off-heap memory, and the suggestion is to increase
> spark.yarn.executor.memoryOverhead.
>
> at times our memoryOverhead is as large as the executor memory (say 4G and
> 4G).
>
> why is Dataset/Dataframe using so much off heap memory?
>
> we havent changed spark.memory.offHeap.enabled which defaults to false.
> should we enable that to get a better handle on this?
>


Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread kant kodali
Take a look at this https://github.com/brianmhess/cassandra-count

Now it is just a matter of incorporating it into spark-cassandra-connector, I
guess.

On Thu, Nov 24, 2016 at 1:01 AM, kant kodali  wrote:

> According to this link https://github.com/datastax/
> spark-cassandra-connector/blob/master/doc/3_selection.md
>
> I tried the following but it still looks like it is taking forever
>
> sc.cassandraTable(keyspace, table).cassandraCount
>
>
> On Thu, Nov 24, 2016 at 12:56 AM, kant kodali  wrote:
>
>> I would be glad if SELECT COUNT(*) FROM hello can return any value for
>> that size :) I can say for sure it didn't return anything for 30 mins and I
>> probably need to build more patience to sit for few more hours after that!
>> Cassandra recommends to use ColumnFamilyStats using nodetool cfstats which
>> will give a pretty good estimate but not an accurate value.
>>
>> On Thu, Nov 24, 2016 at 12:48 AM, Anastasios Zouzias 
>> wrote:
>>
>>> How fast is Cassandra without Spark on the count operation?
>>>
>>> cqsh> SELECT COUNT(*) FROM hello
>>>
>>> (this is not equivalent with what you are doing but might help you find
>>> the root of the cause)
>>>
>>> On Thu, Nov 24, 2016 at 9:03 AM, kant kodali  wrote:
>>>
 I have the following code

 I invoke spark-shell as follows

 ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
 --executor-memory 15G --executor-cores 12 --conf
 spark.cassandra.input.split.size_in_mb=67108864

 code

 scala> val df = spark.sql("SELECT test from hello") // Billion rows
 in hello and test column is 1KB

 df: org.apache.spark.sql.DataFrame = [test: binary]

 scala> df.count

 [Stage 0:>   (0 + 2) / 13] // I dont know what these numbers mean
 precisely.

 If I invoke spark-shell as follows

 ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134

 code


 val df = spark.sql("SELECT test from hello") // This has about
 billion rows

 scala> df.count


 [Stage 0:=>  (686 + 2) / 24686] // What are these numbers precisely?


 Both of these versions didn't work Spark keeps running forever and I
 have been waiting for more than 15 mins and no response. Any ideas on what
 could be wrong and how to fix this?

 I am using Spark 2.0.2
 and spark-cassandra-connector_2.11-2.0.0-M3.jar


>>>
>>>
>>> --
>>> -- Anastasios Zouzias
>>> 
>>>
>>
>>
>


Re: OS killing Executor due to high (possibly off heap) memory usage

2016-11-24 Thread Rodrick Brown
Try setting spark.yarn.executor.memoryOverhead 1

On Thu, Nov 24, 2016 at 11:16 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Hi Spark users
>
> I am running a job that does join of a huge dataset (7 TB+) and the
> executors keep crashing randomly, eventually causing the job to crash.
> There are no out of memory exceptions in the log and looking at the dmesg
> output, it seems like the OS killed the JVM because of high memory usage.
> My suspicion is towards off heap usage of executor is causing this as I am
> limiting the on heap usage of executor to be 46 GB and each host running
> the executor has 60 GB of RAM. After the executor crashes, I can see that
> the external shuffle manager 
> (org.apache.spark.network.server.TransportRequestHandler)
> logs a lot of channel closed exceptions in yarn node manager logs. This
> leads me to believe that something triggers out of memory during shuffle
> read. Is there a configuration to completely disable usage of off heap
> memory? I have tried setting spark.shuffle.io.preferDirectBufs=false but
> the executor is still getting killed by the same error.
>
> Cluster details:
> 10 AWS c4.8xlarge hosts
> RAM on each host - 60 GB
> Number of cores on each host - 36
> Additional hard disk on each host - 8 TB
>
> Spark configuration:
> dynamic allocation enabled
> external shuffle service enabled
> spark.driver.memory 1024M
> spark.executor.memory 47127M
> Spark master yarn-cluster
>
> Sample error in yarn node manager:
> 2016-11-24 10:34:06,507 ERROR 
> org.apache.spark.network.server.TransportRequestHandler
> (shuffle-server-50): Error sending result ChunkFetchSuccess{
> streamChunkId=StreamChunkId{streamId=919299554123, chunkIndex=0}, buffer=
> FileSegmentManagedBuffer{file=/mnt3/yarn/usercache/hadoop/
> appcache/application_1479898345621_0006/blockmgr-ad5301a9-e1e9-4723-a8c4-
> 9276971b2259/2c/shuffle_3_963_0.data, offset=0, length=669014456}} to /
> 10.192.108.170:52782; closing connection
> java.nio.channels.ClosedChannelException
>
> Error in dmesg:
> [799873.309897] Out of memory: Kill process 50001 (java) score 927 or
> sacrifice child
> [799873.314439] Killed process 50001 (java) total-vm:65652448kB,
> anon-rss:57246528kB, file-rss:0kB
>
> Thanks,
> Aniket
>



-- 

[image: Orchard Platform] 

*Rodrick Brown */ *DevOPs*

9174456839 / rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY

-- 
*NOTICE TO RECIPIENTS*: This communication is confidential and intended for 
the use of the addressee only. If you are not an intended recipient of this 
communication, please delete it immediately and notify the sender by return 
email. Unauthorized reading, dissemination, distribution or copying of this 
communication is prohibited. This communication does not constitute an 
offer to sell or a solicitation of an indication of interest to purchase 
any loan, security or any other financial product or instrument, nor is it 
an offer to sell or a solicitation of an indication of interest to purchase 
any products or services to any persons who are prohibited from receiving 
such information under applicable law. The contents of this communication 
may not be accurate or complete and are subject to change without notice. 
As such, Orchard App, Inc. (including its subsidiaries and affiliates, 
"Orchard") makes no representation regarding the accuracy or completeness 
of the information contained herein. The intended recipient is advised to 
consult its own professional advisors, including those specializing in 
legal, tax and accounting matters. Orchard does not provide legal, tax or 
accounting advice.


Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread kant kodali
Some accurate numbers here: it took me 1 hr 30 min to count 698,705,723
rows (~700 million),

and my code is just this

sc.cassandraTable("cuneiform", "blocks").cassandraCount
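Two asides that may help with the open questions in this thread: the spark-shell progress bar reads as (completed tasks + running tasks) / total tasks for the stage, and spark.cassandra.input.split.size_in_mb is denominated in megabytes, so the value 67108864 used earlier requests splits of roughly 64 TB rather than the connector's documented 64 MB default. A quick sketch of the arithmetic:

```python
# Progress bar "[Stage 0:=> (686 + 2) / 24686]":
# (tasks completed + tasks currently running) / total tasks in the stage.
completed, running, total = 686, 2, 24686
fraction_done = (completed + running) / total    # a little under 3%

# split.size_in_mb is in megabytes; 67108864 happens to be 64 * 1024**2,
# i.e. 64 MB expressed in *bytes* -- interpreted as MB it means ~64 TB splits.
split_size_mb = 67108864
split_size_tb = split_size_mb // (1024 ** 2)     # 64
```

If the intent was the connector's default-sized splits, dropping the option (or setting it to 64) should give far more, far smaller partitions.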



On Thu, Nov 24, 2016 at 10:48 AM, kant kodali  wrote:

> Take a look at this https://github.com/brianmhess/cassandra-count
>
> Now It is just matter of incorporating it into spark-cassandra-connector I
> guess.
>
> On Thu, Nov 24, 2016 at 1:01 AM, kant kodali  wrote:
>
>> According to this link https://github.com/datastax/sp
>> ark-cassandra-connector/blob/master/doc/3_selection.md
>>
>> I tried the following but it still looks like it is taking forever
>>
>> sc.cassandraTable(keyspace, table).cassandraCount
>>
>>
>> On Thu, Nov 24, 2016 at 12:56 AM, kant kodali  wrote:
>>
>>> I would be glad if SELECT COUNT(*) FROM hello can return any value for
>>> that size :) I can say for sure it didn't return anything for 30 mins and I
>>> probably need to build more patience to sit for few more hours after that!
>>> Cassandra recommends to use ColumnFamilyStats using nodetool cfstats which
>>> will give a pretty good estimate but not an accurate value.
>>>
>>> On Thu, Nov 24, 2016 at 12:48 AM, Anastasios Zouzias 
>>> wrote:
>>>
 How fast is Cassandra without Spark on the count operation?

 cqsh> SELECT COUNT(*) FROM hello

 (this is not equivalent with what you are doing but might help you find
 the root of the cause)

 On Thu, Nov 24, 2016 at 9:03 AM, kant kodali 
 wrote:

> I have the following code
>
> I invoke spark-shell as follows
>
> ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
> --executor-memory 15G --executor-cores 12 --conf
> spark.cassandra.input.split.size_in_mb=67108864
>
> code
>
> scala> val df = spark.sql("SELECT test from hello") // Billion
> rows in hello and test column is 1KB
>
> df: org.apache.spark.sql.DataFrame = [test: binary]
>
> scala> df.count
>
> [Stage 0:>   (0 + 2) / 13] // I dont know what these numbers mean
> precisely.
>
> If I invoke spark-shell as follows
>
> ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
>
> code
>
>
> val df = spark.sql("SELECT test from hello") // This has about
> billion rows
>
> scala> df.count
>
>
> [Stage 0:=>  (686 + 2) / 24686] // What are these numbers
> precisely?
>
>
> Both of these versions didn't work Spark keeps running forever and I
> have been waiting for more than 15 mins and no response. Any ideas on what
> could be wrong and how to fix this?
>
> I am using Spark 2.0.2
> and spark-cassandra-connector_2.11-2.0.0-M3.jar
>
>


 --
 -- Anastasios Zouzias
 

>>>
>>>
>>
>


[no subject]

2016-11-24 Thread Rostyslav Sotnychenko


Kryo Exception: NegativeArraySizeException

2016-11-24 Thread Pedro Tuero
Hi, I'm trying to broadcast a map of 2.6GB but I'm getting a weird Kryo
exception.

I tried to set -XX:hashCode=0 on the executor and driver, following this
comment:
https://github.com/broadinstitute/gatk/issues/1524#issuecomment-189368808
But it didn't change anything.

Are you aware of this problem?
Is there a workaround?

Thanks for your comments,
Pedro.

Map info:
 INFO 2016-11-24 15:29:34,230 [main] (Logging.scala:54) - Block broadcast_3
stored as values in memory (estimated size 2.6 GB, free 5.7 GB)

Error Trace:
ERROR ApplicationMaster: User class threw exception:
com.esotericsoftware.kryo.KryoException:
java.lang.NegativeArraySizeException
Serialization trace:
...
com.esotericsoftware.kryo.KryoException:
java.lang.NegativeArraySizeException
Serialization trace:
...
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
at
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:86)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
at
org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:646)
at com.personal.sparkJob.main(sparkJob..java:81)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Caused by: java.lang.NegativeArraySizeException
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:447)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:245)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:246)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:41)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:658)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:623)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
... 22 more


Re: Yarn resource utilization with Spark pipe()

2016-11-24 Thread Sameer Choudhary
Ok, that makes sense for processes directly launched via fork or exec from
the task.

However, in my case the docker daemon starts the new process. This process
runs in a docker container. Will the container use memory from the YARN
executor memory overhead as well? How will YARN know that the container
launched by the docker daemon is linked to an executor?

Best,
Sameer

On Thu, Nov 24, 2016 at 1:59 AM Holden Karau  wrote:

> YARN will kill your processes if the child processes you start via PIPE
> consume too much memory; you can configure the amount of memory Spark
> leaves aside for other processes besides the JVM in the YARN containers
> with spark.yarn.executor.memoryOverhead.
>
> On Wed, Nov 23, 2016 at 10:38 PM, Sameer Choudhary 
> wrote:
>
> Hi,
>
> I am working on a Spark 1.6.2 application on a YARN-managed EMR cluster
> that uses RDD's pipe method to process my data. I start a lightweight
> daemon process that starts processes for each task via pipes. This is
> to ensure that I don't run into
> https://issues.apache.org/jira/browse/SPARK-671.
>
> I'm running into Spark job failure due to task failures across the
> cluster. Following are the questions that I think would help in
> understanding the issue:
>
> - How does resource allocation in PySpark work? How do YARN and
> Spark track the memory consumed by Python processes launched on the
> worker nodes?
>
> - As an example, let's say Spark started n tasks on a worker node.
> These n tasks start n processes via pipe. Memory for executors is
> already reserved during application launch. As the processes run, their
> memory footprint grows and eventually there is not enough memory on
> the box. In this case how will YARN and Spark behave? Will the
> executors be killed, or will my processes be killed, eventually killing the
> task? I think this could lead to cascading failures of tasks across the
> cluster as retry attempts also fail, eventually leading to termination
> of the Spark job. Is there a way to avoid this?
>
> - When we define the number of executors in SparkConf, are they
> distributed evenly across the nodes? One approach to get around this
> problem would be to limit the number of executors on each host that
> YARN can launch. So we would manage the memory for piped processes
> outside of YARN. Is there a way to avoid this?
>
> Thanks,
> Sameer
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
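For reference, the container sizing implied by spark.yarn.executor.memoryOverhead can be sketched in a few lines. This is a rough sketch, assuming the documented Spark 1.6/2.x default overhead of max(384 MB, 10% of executor memory); the function name is ours, not a Spark API:

```python
# Sketch: how the YARN container request is derived from executor memory
# plus spark.yarn.executor.memoryOverhead (Spark 1.6/2.x default:
# max(384 MB, 10% of executor memory)). All values in MB.

def yarn_container_request_mb(executor_memory_mb, memory_overhead_mb=None):
    """Return the total memory YARN reserves for one executor container."""
    if memory_overhead_mb is None:
        # Default overhead when spark.yarn.executor.memoryOverhead is unset.
        memory_overhead_mb = max(384, int(executor_memory_mb * 0.10))
    return executor_memory_mb + memory_overhead_mb

# A 15 GB executor with the default overhead asks YARN for 15360 + 1536 MB.
print(yarn_container_request_mb(15 * 1024))        # -> 16896
# Raising the overhead leaves more room for piped child processes.
print(yarn_container_request_mb(15 * 1024, 4096))  # -> 19456
```

Anything the piped children consume beyond the overhead slice has to fit in whatever the host has left outside YARN's budget.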


Re: Yarn resource utilization with Spark pipe()

2016-11-24 Thread Holden Karau
So if the process you're communicating with from Spark isn't launched inside
of its YARN container then it shouldn't be an issue - although it sounds
like you may have multiple resource managers on the same machine, which
can sometimes lead to interesting/difficult states.

On Thu, Nov 24, 2016 at 1:27 PM, Sameer Choudhary 
wrote:

> Ok, that makes sense for processes directly launched via fork or exec from
> the task.
>
> However, in my case the docker daemon starts the new process. This process
> runs in a docker container. Will the container use memory from the YARN
> executor memory overhead as well? How will YARN know that the container
> launched by the docker daemon is linked to an executor?
>
> Best,
> Sameer
>
> On Thu, Nov 24, 2016 at 1:59 AM Holden Karau  wrote:
>
>> YARN will kill your processes if the child processes you start via PIPE
>> consume too much memory; you can configure the amount of memory Spark
>> leaves aside for other processes besides the JVM in the YARN containers
>> with spark.yarn.executor.memoryOverhead.
>>
>> On Wed, Nov 23, 2016 at 10:38 PM, Sameer Choudhary 
>> wrote:
>>
>> Hi,
>>
>> I am working on a Spark 1.6.2 application on a YARN-managed EMR cluster
>> that uses RDD's pipe method to process my data. I start a lightweight
>> daemon process that starts processes for each task via pipes. This is
>> to ensure that I don't run into
>> https://issues.apache.org/jira/browse/SPARK-671.
>>
>> I'm running into Spark job failure due to task failures across the
>> cluster. Following are the questions that I think would help in
>> understanding the issue:
>>
>> - How does resource allocation in PySpark work? How do YARN and
>> Spark track the memory consumed by Python processes launched on the
>> worker nodes?
>>
>> - As an example, let's say Spark started n tasks on a worker node.
>> These n tasks start n processes via pipe. Memory for executors is
>> already reserved during application launch. As the processes run, their
>> memory footprint grows and eventually there is not enough memory on
>> the box. In this case how will YARN and Spark behave? Will the
>> executors be killed, or will my processes be killed, eventually killing the
>> task? I think this could lead to cascading failures of tasks across the
>> cluster as retry attempts also fail, eventually leading to termination
>> of the Spark job. Is there a way to avoid this?
>>
>> - When we define the number of executors in SparkConf, are they
>> distributed evenly across the nodes? One approach to get around this
>> problem would be to limit the number of executors on each host that
>> YARN can launch. So we would manage the memory for piped processes
>> outside of YARN. Is there a way to avoid this?
>>
>> Thanks,
>> Sameer
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Yarn resource utilization with Spark pipe()

2016-11-24 Thread Sameer Choudhary
In the above setup my executors start one docker container per task. Some of
these containers grow in memory as data is piped. Eventually there is not
enough memory on the machine for docker containers to run (since YARN
already started its containers), and everything starts failing.

The way I'm planning to solve this is by reducing the memory available for
YARN to manage by overriding EMR's default configuration. So, if my machine
has 264 GB of memory, I'll give 150GB to YARN to run Spark, and rest will
be for the docker containers. By default, YARN manages about 220GB of
memory for my instance type.
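The budget described above is simple arithmetic; a sketch using the numbers from this message (the variable names are ours):

```python
# Sketch of the memory split described above: cap the memory YARN manages
# (via EMR's yarn.nodemanager.resource.memory-mb override) so the
# remainder is left for the docker containers running outside YARN.
# All values in GB.

total_ram_gb = 264          # RAM on the instance
yarn_managed_gb = 150       # what we hand to YARN for Spark executors
emr_default_yarn_gb = 220   # roughly what EMR gives YARN by default

left_for_docker_gb = total_ram_gb - yarn_managed_gb
print(left_for_docker_gb)    # -> 114

# The cost of the workaround: memory YARN can no longer schedule.
wasted_vs_default_gb = emr_default_yarn_gb - yarn_managed_gb
print(wasted_vs_default_gb)  # -> 70
```

This makes the waste concrete: about 70 GB per node is reserved whether or not any docker container needs it, which is why a resolution of the YARN issue mentioned below would help.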

The only problem is this is very wasteful. Especially, if I want to have a
long running cluster where many users can run Spark jobs simultaneously. I
am eagerly waiting for YARN-3611 issue to be resolved.

Best,
Sameer

On Thu, Nov 24, 2016 at 1:30 PM Holden Karau  wrote:

> So if the process you're communicating with from Spark isn't launched inside
> of its YARN container then it shouldn't be an issue - although it sounds
> like you may have multiple resource managers on the same machine, which
> can sometimes lead to interesting/difficult states.
>
> On Thu, Nov 24, 2016 at 1:27 PM, Sameer Choudhary 
> wrote:
>
> Ok, that makes sense for processes directly launched via fork or exec from
> the task.
>
> However, in my case the docker daemon starts the new process. This process
> runs in a docker container. Will the container use memory from the YARN
> executor memory overhead as well? How will YARN know that the container
> launched by the docker daemon is linked to an executor?
>
> Best,
> Sameer
>
> On Thu, Nov 24, 2016 at 1:59 AM Holden Karau  wrote:
>
> YARN will kill your processes if the child processes you start via PIPE
> consume too much memory; you can configure the amount of memory Spark
> leaves aside for other processes besides the JVM in the YARN containers
> with spark.yarn.executor.memoryOverhead.
>
> On Wed, Nov 23, 2016 at 10:38 PM, Sameer Choudhary 
> wrote:
>
> Hi,
>
> I am working on a Spark 1.6.2 application on a YARN-managed EMR cluster
> that uses RDD's pipe method to process my data. I start a lightweight
> daemon process that starts processes for each task via pipes. This is
> to ensure that I don't run into
> https://issues.apache.org/jira/browse/SPARK-671.
>
> I'm running into Spark job failure due to task failures across the
> cluster. Following are the questions that I think would help in
> understanding the issue:
>
> - How does resource allocation in PySpark work? How do YARN and
> Spark track the memory consumed by Python processes launched on the
> worker nodes?
>
> - As an example, let's say Spark started n tasks on a worker node.
> These n tasks start n processes via pipe. Memory for executors is
> already reserved during application launch. As the processes run, their
> memory footprint grows and eventually there is not enough memory on
> the box. In this case how will YARN and Spark behave? Will the
> executors be killed, or will my processes be killed, eventually killing the
> task? I think this could lead to cascading failures of tasks across the
> cluster as retry attempts also fail, eventually leading to termination
> of the Spark job. Is there a way to avoid this?
>
> - When we define the number of executors in SparkConf, are they
> distributed evenly across the nodes? One approach to get around this
> problem would be to limit the number of executors on each host that
> YARN can launch. So we would manage the memory for piped processes
> outside of YARN. Is there a way to avoid this?
>
> Thanks,
> Sameer
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread Jörn Franke
I am not sure what use case you want to demonstrate with SELECT COUNT in
general. Maybe you can elaborate more on what your use case is.

Aside from this: this is a Cassandra issue. What is the setup of Cassandra? 
Dedicated nodes? How many? Replication strategy? Consistency configuration? How 
is the data spread on nodes?
Cassandra is more for use cases where you have a lot of data, but select only a 
subset from it or where you have a lot of single writes. 

If you want to analyze it you have to export it once to Parquet, ORC etc. and
then run queries on it. Depending on your use case you may want to go for that
on Hive2+Tez+LLAP or Spark.

> On 24 Nov 2016, at 20:52, kant kodali  wrote:
> 
> some accurate numbers here. so it took me 1hr:30 mins to count  698705723 
> rows (~700 Million)
> 
> and my code is just this 
> 
> sc.cassandraTable("cuneiform", "blocks").cassandraCount
> 
> 
> 
>> On Thu, Nov 24, 2016 at 10:48 AM, kant kodali  wrote:
>> Take a look at this https://github.com/brianmhess/cassandra-count
>> 
>> Now it is just a matter of incorporating it into spark-cassandra-connector, I
>> guess.
>> 
>>> On Thu, Nov 24, 2016 at 1:01 AM, kant kodali  wrote:
>>> According to this link 
>>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
>>> 
>>> I tried the following but it still looks like it is taking forever
>>> 
>>> sc.cassandraTable(keyspace, table).cassandraCount
>>> 
 On Thu, Nov 24, 2016 at 12:56 AM, kant kodali  wrote:
 I would be glad if SELECT COUNT(*) FROM hello can return any value for 
 that size :) I can say for sure it didn't return anything for 30 mins and 
 I probably need to build more patience to sit for few more hours after 
that! Cassandra recommends using ColumnFamilyStats via nodetool cfstats,
 which will give a pretty good estimate but not an accurate value.
 
> On Thu, Nov 24, 2016 at 12:48 AM, Anastasios Zouzias  
> wrote:
> How fast is Cassandra without Spark on the count operation?
> 
> cqlsh> SELECT COUNT(*) FROM hello
> 
> (this is not equivalent with what you are doing but might help you find 
> the root of the cause)
> 
>> On Thu, Nov 24, 2016 at 9:03 AM, kant kodali  wrote:
>> I have the following code
>> 
>> I invoke spark-shell as follows
>> 
>> ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134 
>> --executor-memory 15G --executor-cores 12 --conf 
>> spark.cassandra.input.split.size_in_mb=67108864
>> 
>> code
>> 
>> scala> val df = spark.sql("SELECT test from hello") // Billion rows 
>> in hello and test column is 1KB
>> 
>> df: org.apache.spark.sql.DataFrame = [test: binary]
>> 
>> scala> df.count
>> 
>> [Stage 0:>   (0 + 2) / 13] // I don't know what these numbers mean
>> precisely.
>> 
>> If I invoke spark-shell as follows
>> 
>> ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
>> 
>> code
>> 
>> 
>> val df = spark.sql("SELECT test from hello") // This has about 
>> billion rows
>> 
>> scala> df.count
>> 
>> 
>> [Stage 0:=>  (686 + 2) / 24686] // What are these numbers precisely?
>> 
>> 
>> Both of these versions didn't work; Spark keeps running forever and I
>> have been waiting for more than 15 mins and no response. Any ideas on 
>> what could be wrong and how to fix this?
>> 
>> I am using Spark 2.0.2
>> and spark-cassandra-connector_2.11-2.0.0-M3.jar
>> 
> 
> 
> 
> -- 
> -- Anastasios Zouzias
 
>>> 
>> 
> 


Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread kant kodali
We have an 8-node Cassandra cluster. Replication factor: 3, consistency
level: QUORUM. Data spread: I can let you know once I get access to our
production cluster.

The use case for a simple count is more for internal use than, say, end
clients/customers; however, there are many use cases from customers which
require a table scan.

I believe the count can be faster, although my Spark nodes and Cassandra nodes
are not co-located, and it can be done in many ways. Here are a couple of
ways I can think of:

1) Since a Cassandra cluster is all about token ranges, there should be a way
to break a billion records into smaller subsets/token ranges, do a count
on each individual subset in parallel, and finally sum them up.

2) I do think ideas like whole-stage code generation can be implemented on
the Cassandra server side (or in general for most databases). Underneath,
this is nothing but reading lines from a file, so if my file has 1B rows and
each line is 1KB, that means reading about 1TB, and I don't expect
it to take 1hr 30 mins. This is only if you read all columns, but since
Cassandra is a columnar database you could simply read one column, in which
case the data size would be about 250 GB, so it should be a lot faster.
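Idea (1) above can be sketched without Cassandra at all: split a key space into disjoint ranges, count each range in parallel, and sum the partial counts. In this sketch the "table" is just an in-memory list and the subsets are index slices; against Cassandra each subset would instead be a CQL token range queried independently:

```python
from concurrent.futures import ThreadPoolExecutor

def count_range(table, start, end):
    # Stand-in for counting one token range on its replica.
    return sum(1 for _ in table[start:end])

def parallel_count(table, num_splits=4):
    # Break the full index range into disjoint, contiguous subsets.
    n = len(table)
    bounds = [(i * n // num_splits, (i + 1) * n // num_splits)
              for i in range(num_splits)]
    # Count each subset concurrently, then sum the partial counts.
    with ThreadPoolExecutor(max_workers=num_splits) as pool:
        partials = pool.map(lambda b: count_range(table, *b), bounds)
    return sum(partials)

rows = list(range(1000))
print(parallel_count(rows))  # -> 1000
```

This is essentially what the connector's cassandraCount aims to do; the sketch only illustrates the split-count-sum structure, not the Cassandra driver calls.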


We have several use cases where we need real-time ad hoc querying, and
this would require table scans/going through all the records, and so on.

Thanks







On Thu, Nov 24, 2016 at 1:44 PM, Jörn Franke  wrote:

> I am not sure what use case you want to demonstrate with select count in
> general. Maybe you can elaborate more on what your use case is.
>
> Aside from this: this is a Cassandra issue. What is the setup of
> Cassandra? Dedicated nodes? How many? Replication strategy? Consistency
> configuration? How is the data spread on nodes?
> Cassandra is more for use cases where you have a lot of data, but select
> only a subset from it or where you have a lot of single writes.
>
> If you want to analyze it you have to export it once to parquet, orc etc
> and then run queries on it. Depending on your use case you may want to go
> for that on Hive2+Tez+LLAP or Spark.
>
> On 24 Nov 2016, at 20:52, kant kodali  wrote:
>
> some accurate numbers here. so it took me 1hr:30 mins to count  698705723
> rows (~700 Million)
>
> and my code is just this
>
> sc.cassandraTable("cuneiform", "blocks").cassandraCount
>
>
>
> On Thu, Nov 24, 2016 at 10:48 AM, kant kodali  wrote:
>
>> Take a look at this https://github.com/brianmhess/cassandra-count
>>
>> Now it is just a matter of incorporating it into spark-cassandra-connector,
>> I guess.
>>
>> On Thu, Nov 24, 2016 at 1:01 AM, kant kodali  wrote:
>>
>>> According to this link https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
>>>
>>> I tried the following but it still looks like it is taking forever
>>>
>>> sc.cassandraTable(keyspace, table).cassandraCount
>>>
>>>
>>> On Thu, Nov 24, 2016 at 12:56 AM, kant kodali 
>>> wrote:
>>>
 I would be glad if SELECT COUNT(*) FROM hello can return any value for
 that size :) I can say for sure it didn't return anything for 30 mins and I
 probably need to build more patience to sit for few more hours after that!
Cassandra recommends using ColumnFamilyStats via nodetool cfstats, which
 will give a pretty good estimate but not an accurate value.

On Thu, Nov 24, 2016 at 12:48 AM, Anastasios Zouzias wrote:

> How fast is Cassandra without Spark on the count operation?
>
> cqlsh> SELECT COUNT(*) FROM hello
>
> (this is not equivalent with what you are doing but might help you
> find the root of the cause)
>
> On Thu, Nov 24, 2016 at 9:03 AM, kant kodali 
> wrote:
>
>> I have the following code
>>
>> I invoke spark-shell as follows
>>
>> ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
>> --executor-memory 15G --executor-cores 12 --conf
>> spark.cassandra.input.split.size_in_mb=67108864
>>
>> code
>>
>> scala> val df = spark.sql("SELECT test from hello") // Billion
>> rows in hello and test column is 1KB
>>
>> df: org.apache.spark.sql.DataFrame = [test: binary]
>>
>> scala> df.count
>>
>> [Stage 0:>   (0 + 2) / 13] // I don't know what these numbers mean
>> precisely.
>>
>> If I invoke spark-shell as follows
>>
>> ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
>>
>> code
>>
>>
>> val df = spark.sql("SELECT test from hello") // This has about
>> billion rows
>>
>> scala> df.count
>>
>>
>> [Stage 0:=>  (686 + 2) / 24686] // What are these numbers
>> precisely?
>>
>>
> Both of these versions didn't work; Spark keeps running forever and I
>> have been waiting for more than 15 mins and no response. Any ideas on 
>> what
>> could be wrong and how to fix this?
>>
>> I am using Spark 2.

New Contributor

2016-11-24 Thread Manolis Gemeliaris
Hi all ,

My name is Manolis Gemeliaris, I'm a software engineering student and I'm
willing to contribute to the Apache Spark project. I don't have any prior
experience with contributing to open source.
I have prior experience with Java, R (just a little) and Python (just a
little), and I'm currently studying Scala on Coursera.

Following the contributing page of the project, I would like to ask for a
recommendation on where to start - a possible issue I could begin with,
maybe?

Thanks in advance and please tell me if I'm doing something wrong here
(maybe I'm in the wrong place after all) - just a bit anxious as a
first-timer.


RE: OS killing Executor due to high (possibly off heap) memory usage

2016-11-24 Thread Shreya Agarwal
I don't think it's just memory overhead. It might be better to use an executor
with less heap space (30GB?). 46 GB would mean more data loaded into memory and
more GC, which can cause issues.

Also, have you tried to persist data in any way? If so, then that might be 
causing an issue.

Lastly, I am not sure if your data has a skew and if that is forcing a lot of 
data to be on one executor node.

Sent from my Windows 10 phone

From: Rodrick Brown
Sent: Friday, November 25, 2016 12:25 AM
To: Aniket Bhatnagar
Cc: user
Subject: Re: OS killing Executor due to high (possibly off heap) memory usage

Try setting spark.yarn.executor.memoryOverhead 1

On Thu, Nov 24, 2016 at 11:16 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
Hi Spark users

I am running a job that does a join of a huge dataset (7 TB+), and the executors
keep crashing randomly, eventually causing the job to crash. There are no
out-of-memory exceptions in the log, and looking at the dmesg output, it seems
like the OS killed the JVM because of high memory usage. My suspicion is that
off-heap usage by the executor is causing this, as I am limiting the on-heap
usage of the executor to 46 GB and each host running the executor has 60 GB of
RAM. After the executor crashes, I can see that the external shuffle manager
(org.apache.spark.network.server.TransportRequestHandler) logs a lot of channel 
closed exceptions in yarn node manager logs. This leads me to believe that 
something triggers out of memory during shuffle read. Is there a configuration 
to completely disable usage of off heap memory? I have tried setting 
spark.shuffle.io.preferDirectBufs=false but the 
executor is still getting killed by the same error.

Cluster details:
10 AWS c4.8xlarge hosts
RAM on each host - 60 GB
Number of cores on each host - 36
Additional hard disk on each host - 8 TB

Spark configuration:
dynamic allocation enabled
external shuffle service enabled
spark.driver.memory 1024M
spark.executor.memory 47127M
Spark master yarn-cluster

Sample error in yarn node manager:
2016-11-24 10:34:06,507 ERROR 
org.apache.spark.network.server.TransportRequestHandler (shuffle-server-50): 
Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=919299554123, 
chunkIndex=0}, 
buffer=FileSegmentManagedBuffer{file=/mnt3/yarn/usercache/hadoop/appcache/application_1479898345621_0006/blockmgr-ad5301a9-e1e9-4723-a8c4-9276971b2259/2c/shuffle_3_963_0.data,
 offset=0, length=669014456}} to 
/10.192.108.170:52782; closing connection
java.nio.channels.ClosedChannelException

Error in dmesg:
[799873.309897] Out of memory: Kill process 50001 (java) score 927 or sacrifice 
child
[799873.314439] Killed process 50001 (java) total-vm:65652448kB, 
anon-rss:57246528kB, file-rss:0kB

Thanks,
Aniket
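A quick sanity check of the numbers in this message (a sketch; the variable names are ours, and the dmesg value is the anon-rss from the log above, reported in kB):

```python
# The executor heap is 47127 MB on a 60 GB host, and dmesg reports the
# killed JVM at ~57 GB resident. The gap between resident size and heap
# is the off-heap footprint (direct buffers, metaspace, thread stacks,
# shuffle) that pushed the process past what the host could hold.

heap_mb = 47127              # spark.executor.memory from the config above
host_gb = 60                 # RAM on each c4.8xlarge host
rss_kb = 57246528            # anon-rss from the dmesg line, in kB

heap_gb = heap_mb / 1024
rss_gb = rss_kb / 1024 / 1024

print(round(heap_gb, 1))           # -> 46.0
print(round(rss_gb, 1))            # -> 54.6
print(round(rss_gb - heap_gb, 1))  # off-heap resident on top of the heap
```

So roughly 8-9 GB of off-heap usage on top of a 46 GB heap leaves essentially nothing of the 60 GB host for the OS, which is consistent with the OOM killer firing.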



--


Rodrick Brown / DevOPs

9174456839 / rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY



Re: New Contributor

2016-11-24 Thread K. Omair Muhi
Hello Manolis,

I'm a new subscriber to this mailing list as well, and I read on the Apache
web page that one can begin by following these mailing lists and helping out
other new users by pointing them to the right documentation, or maybe going
through some documentation yourself in order to answer someone's question
(a good learning experience).

This will help build your reputation before you start contributing code.
Another thing you can do is sign up to be added to code reviews (as an
observer); this will also help expose you to some code, and perhaps let you
provide some feedback during a code review.

Just my $0.02.

Hope that helps.

Regards,
Omair

On Thu, Nov 24, 2016 at 7:38 PM, Manolis Gemeliaris <
gemeliarismano...@gmail.com> wrote:

> Hi all ,
>
> My name is Manolis Gemeliaris , I'm a software engineering student and I'm
> willing to contribute to the Apache Spark Project. I don't have any prior
> experience with contributing to open source.
> I have prior experience with Java , R (just a little) and Python (just a
> little)  and currently studying Scala in coursera
> .
>
> Following the contributing page of the project
>  , I would like to ask for a
> reccomendation on where to start , a possible issue I could begin with
> maybe ?
>
> Thanks in advance and please tell me if I'm doing something wrong here
> (maybe I'm in the wrong place after all) - just a bit anxious as a
> first-timer.
>
>