Re: OS killing Executor due to high (possibly off heap) memory usage

2016-12-08 Thread Aniket Bhatnagar
I did some instrumentation to trace where DirectByteBuffers are being
created, and it turns out that the following system properties need to be
set in addition to spark.shuffle.io.preferDirectBufs=false in the Spark
config:

io.netty.noUnsafe=true
io.netty.threadLocalDirectBufferSize=0

This should force netty to mostly use on-heap buffers and thus increase
the stability of Spark jobs that perform a lot of shuffle. I have created
the defect SPARK-18787 to either force these settings when
spark.shuffle.io.preferDirectBufs=false is set in the Spark config, or to
document them.
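For reference, a minimal sketch of wiring these settings into a job's
SparkConf (they can equally be set in spark-defaults.conf or via
spark-submit --conf flags):

import org.apache.spark.SparkConf

// Sketch only: prefer heap buffers for shuffle I/O and pass the netty system
// properties down to the executor JVMs.
val conf = new SparkConf()
  .set("spark.shuffle.io.preferDirectBufs", "false")
  .set("spark.executor.extraJavaOptions",
       "-Dio.netty.noUnsafe=true -Dio.netty.threadLocalDirectBufferSize=0")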

Hope it will be helpful for other users as well.

Thanks,
Aniket

On Sat, Nov 26, 2016 at 3:31 PM Koert Kuipers <ko...@tresata.com> wrote:

> i agree that offheap memory usage is unpredictable.
>
> when we used rdds the memory was mostly on heap and total usage
> predictable, and we almost never had yarn killing executors.
>
> now with dataframes the memory usage is both on and off heap, and we have
> no way of limiting the off heap memory usage by spark, yet yarn requires a
> maximum total memory usage and if you go over it yarn kills the executor.
>
> On Fri, Nov 25, 2016 at 12:14 PM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
> Thanks Rohit, Roddick and Shreya. I tried
> changing spark.yarn.executor.memoryOverhead to 10 GB and lowering
> executor memory to 30 GB, and neither of these worked. I finally had to
> reduce the number of cores per executor to 18 (from 36) in addition to
> setting a higher spark.yarn.executor.memoryOverhead and a lower executor
> memory size. I had to trade off performance for reliability.
>
> Unfortunately, spark does a poor job reporting off heap memory usage. From
> the profiler, it seems that the job's heap usage is pretty static but the
> off heap memory fluctuates quite a lot. It looks like the bulk of off heap
> memory is used by io.netty.buffer.UnpooledUnsafeDirectByteBuf while the
> shuffle client is trying to read blocks from the shuffle service. It looks
> like org.apache.spark.network.util.TransportFrameDecoder retains them
> in its buffers field while decoding responses from the shuffle service. So
> far, it's not clear why it needs to hold multiple GBs in these buffers.
> Perhaps increasing the number of partitions may help with this.
>
> Thanks,
> Aniket
>
> On Fri, Nov 25, 2016 at 1:09 AM Shreya Agarwal <shrey...@microsoft.com>
> wrote:
>
> I don’t think it’s just memory overhead. It might be better to use an
> executor with less heap space (30 GB?). 46 GB would mean more data loaded
> into memory and more GC, which can cause issues.
>
>
>
> Also, have you tried to persist data in any way? If so, then that might be
> causing an issue.
>
>
>
> Lastly, I am not sure if your data has a skew and if that is forcing a lot
> of data to be on one executor node.
>
>
>
> Sent from my Windows 10 phone
>
>
>
> *From: *Rodrick Brown <rodr...@orchardplatform.com>
> *Sent: *Friday, November 25, 2016 12:25 AM
> *To: *Aniket Bhatnagar <aniket.bhatna...@gmail.com>
> *Cc: *user <user@spark.apache.org>
> *Subject: *Re: OS killing Executor due to high (possibly off heap) memory
> usage
>
>
> Try setting spark.yarn.executor.memoryOverhead 1
>
> On Thu, Nov 24, 2016 at 11:16 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
> Hi Spark users
>
> I am running a job that does a join of a huge dataset (7 TB+) and the
> executors keep crashing randomly, eventually causing the job to crash.
> There are no out of memory exceptions in the log and, looking at the dmesg
> output, it seems like the OS killed the JVM because of high memory usage.
> My suspicion is that off-heap memory usage by the executor is causing this,
> as I am limiting the on-heap usage of the executor to 46 GB and each host
> running the executor has 60 GB of RAM. After the executor crashes, I can see that
> the external shuffle manager
> (org.apache.spark.network.server.TransportRequestHandler) logs a lot of
> channel closed exceptions in yarn node manager logs. This leads me to
> believe that something triggers out of memory during shuffle read. Is there
> a configuration to completely disable usage of off heap memory? I have
> tried setting spark.shuffle.io.preferDirectBufs=false but the executor is
> still getting killed by the same error.
>
> Cluster details:
> 10 AWS c4.8xlarge hosts
> RAM on each host - 60 GB
> Number of cores on each host - 36
> Additional hard disk on each host - 8 TB
>
> Spark configuration:
> dynamic allocation enabled
> external shuffle service enabled
> spark.driver.memory 1024M
> spark.executor.memory 47127M
> Spark master yarn-cluster
>
> Sample error in yarn node manager:
> 2016-11

Re: OS killing Executor due to high (possibly off heap) memory usage

2016-11-25 Thread Aniket Bhatnagar
Thanks Rohit, Roddick and Shreya. I tried
changing spark.yarn.executor.memoryOverhead to 10 GB and lowering
executor memory to 30 GB, and neither of these worked. I finally had to
reduce the number of cores per executor to 18 (from 36) in addition to
setting a higher spark.yarn.executor.memoryOverhead and a lower executor
memory size. I had to trade off performance for reliability.
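For illustration, the settings described above could look roughly like this
(values taken from this thread; sketch only, tune for your own cluster):

import org.apache.spark.SparkConf

// Fewer cores per executor, a smaller heap and a larger YARN memory overhead,
// trading some throughput for stability.
val conf = new SparkConf()
  .set("spark.executor.cores", "18")
  .set("spark.executor.memory", "30g")
  .set("spark.yarn.executor.memoryOverhead", "10240") // MB, i.e. ~10 GB of off-heap headroom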

Unfortunately, spark does a poor job reporting off heap memory usage. From
the profiler, it seems that the job's heap usage is pretty static but the
off heap memory fluctuates quite a lot. It looks like the bulk of off heap
memory is used by io.netty.buffer.UnpooledUnsafeDirectByteBuf while the
shuffle client is trying to read blocks from the shuffle service. It looks
like org.apache.spark.network.util.TransportFrameDecoder retains them
in its buffers field while decoding responses from the shuffle service. So
far, it's not clear why it needs to hold multiple GBs in these buffers.
Perhaps increasing the number of partitions may help with this.

Thanks,
Aniket

On Fri, Nov 25, 2016 at 1:09 AM Shreya Agarwal <shrey...@microsoft.com>
wrote:

I don’t think it’s just memory overhead. It might be better to use an
executor with less heap space (30 GB?). 46 GB would mean more data loaded
into memory and more GC, which can cause issues.



Also, have you tried to persist data in any way? If so, then that might be
causing an issue.



Lastly, I am not sure if your data has a skew and if that is forcing a lot
of data to be on one executor node.



Sent from my Windows 10 phone



*From: *Rodrick Brown <rodr...@orchardplatform.com>
*Sent: *Friday, November 25, 2016 12:25 AM
*To: *Aniket Bhatnagar <aniket.bhatna...@gmail.com>
*Cc: *user <user@spark.apache.org>
*Subject: *Re: OS killing Executor due to high (possibly off heap) memory
usage


Try setting spark.yarn.executor.memoryOverhead 1

On Thu, Nov 24, 2016 at 11:16 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

Hi Spark users

I am running a job that does a join of a huge dataset (7 TB+) and the
executors keep crashing randomly, eventually causing the job to crash.
There are no out of memory exceptions in the log and, looking at the dmesg
output, it seems like the OS killed the JVM because of high memory usage.
My suspicion is that off-heap memory usage by the executor is causing this,
as I am limiting the on-heap usage of the executor to 46 GB and each host
running the executor has 60 GB of RAM. After the executor crashes, I can see that
the external shuffle manager
(org.apache.spark.network.server.TransportRequestHandler) logs a lot of
channel closed exceptions in yarn node manager logs. This leads me to
believe that something triggers out of memory during shuffle read. Is there
a configuration to completely disable usage of off heap memory? I have
tried setting spark.shuffle.io.preferDirectBufs=false but the executor is
still getting killed by the same error.

Cluster details:
10 AWS c4.8xlarge hosts
RAM on each host - 60 GB
Number of cores on each host - 36
Additional hard disk on each host - 8 TB

Spark configuration:
dynamic allocation enabled
external shuffle service enabled
spark.driver.memory 1024M
spark.executor.memory 47127M
Spark master yarn-cluster

Sample error in yarn node manager:
2016-11-24 10:34:06,507 ERROR
org.apache.spark.network.server.TransportRequestHandler
(shuffle-server-50): Error sending result
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=919299554123,
chunkIndex=0},
buffer=FileSegmentManagedBuffer{file=/mnt3/yarn/usercache/hadoop/appcache/application_1479898345621_0006/blockmgr-ad5301a9-e1e9-4723-a8c4-9276971b2259/2c/shuffle_3_963_0.data,
offset=0, length=669014456}} to /10.192.108.170:52782; closing connection
java.nio.channels.ClosedChannelException

Error in dmesg:
[799873.309897] Out of memory: Kill process 50001 (java) score 927 or
sacrifice child
[799873.314439] Killed process 50001 (java) total-vm:65652448kB,
anon-rss:57246528kB, file-rss:0kB

Thanks,
Aniket





OS killing Executor due to high (possibly off heap) memory usage

2016-11-24 Thread Aniket Bhatnagar
Hi Spark users

I am running a job that does a join of a huge dataset (7 TB+) and the
executors keep crashing randomly, eventually causing the job to crash.
There are no out of memory exceptions in the log and, looking at the dmesg
output, it seems like the OS killed the JVM because of high memory usage.
My suspicion is that off-heap memory usage by the executor is causing this,
as I am limiting the on-heap usage of the executor to 46 GB and each host
running the executor has 60 GB of RAM. After the executor crashes, I can see that
the external shuffle manager
(org.apache.spark.network.server.TransportRequestHandler) logs a lot of
channel closed exceptions in yarn node manager logs. This leads me to
believe that something triggers out of memory during shuffle read. Is there
a configuration to completely disable usage of off heap memory? I have
tried setting spark.shuffle.io.preferDirectBufs=false but the executor is
still getting killed by the same error.

Cluster details:
10 AWS c4.8xlarge hosts
RAM on each host - 60 GB
Number of cores on each host - 36
Additional hard disk on each host - 8 TB

Spark configuration:
dynamic allocation enabled
external shuffle service enabled
spark.driver.memory 1024M
spark.executor.memory 47127M
Spark master yarn-cluster

Sample error in yarn node manager:
2016-11-24 10:34:06,507 ERROR
org.apache.spark.network.server.TransportRequestHandler
(shuffle-server-50): Error sending result
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=919299554123,
chunkIndex=0},
buffer=FileSegmentManagedBuffer{file=/mnt3/yarn/usercache/hadoop/appcache/application_1479898345621_0006/blockmgr-ad5301a9-e1e9-4723-a8c4-9276971b2259/2c/shuffle_3_963_0.data,
offset=0, length=669014456}} to /10.192.108.170:52782; closing connection
java.nio.channels.ClosedChannelException

Error in dmesg:
[799873.309897] Out of memory: Kill process 50001 (java) score 927 or
sacrifice child
[799873.314439] Killed process 50001 (java) total-vm:65652448kB,
anon-rss:57246528kB, file-rss:0kB

Thanks,
Aniket


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread Aniket Bhatnagar
Try changing the compression to bzip2 or lzo. Snappy-compressed text files
are not splittable, so the whole file ends up being read as a single split.
For reference - http://comphadoop.weebly.com

Thanks,
Aniket

On Mon, Nov 21, 2016, 10:18 PM yeshwanth kumar 
wrote:

> Hi,
>
> we are running Hive on Spark, we have an external table over snappy
> compressed csv file of size 917.4 M
> HDFS block size is set to 256 MB
>
> as per my understanding, if I run a query over that external table, it
> should launch 4 tasks, one for each block,
> but I am seeing one executor and one task processing the whole file.
>
> trying to understand the reason behind this,
>
> i went one step further to understand the block locality
> when i get the block locations for that file, i found
>
> [DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
>  DatanodeInfoWithStorage[10.11.0.227:50010
> ,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
>  DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010
> ,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
> DatanodeInfoWithStorage[10.11.0.227:50010
> ,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010
> ,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]
>
> and in the Spark UI I see the Locality Level is RACK_LOCAL for that task.
>
> if it is RACK_LOCAL then it should run on either node 10.11.0.226 or
> 10.11.0.228, because these 2 nodes have all four blocks needed for the
> computation, but the executor is running on 10.11.0.225.
>
> my theory is not holding up anywhere.
>
> please help me in understanding how spark/yarn calculates the number of
> executors/tasks.
>
> Thanks,
> -Yeshwanth
>


Re: Very long pause/hang at end of execution

2016-11-16 Thread Aniket Bhatnagar
Also, how are you launching the application? Through spark-submit or by
creating a spark context in your app?

Thanks,
Aniket

On Wed, Nov 16, 2016 at 10:44 AM Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Thanks for sharing the thread dump. I had a look at them and couldn't find
> anything unusual. Is there anything in the logs (driver + executor) that
> suggests what's going on? Also, what does the spark job do and what is the
> version of spark and hadoop you are using?
>
> Thanks,
> Aniket
>
>
> On Wed, Nov 16, 2016 at 2:07 AM Michael Johnson <mjjohnson@yahoo.com>
> wrote:
>
> The extremely long hang/pause has started happening again. I've been
> running on a small remote cluster, so I used the UI to grab thread dumps
> rather than doing it from the command line. There seems to be one executor
> still alive, along with the driver; I grabbed 4 thread dumps from each, a
> couple of seconds apart. I'd greatly appreciate any help tracking down
> what's going on! (I've attached them, but I can paste them somewhere if
> that's more convenient.)
>
> Thanks,
> Michael
>
>
>
>
> On Sunday, November 6, 2016 10:49 PM, Michael Johnson
> <mjjohnson@yahoo.com.INVALID> wrote:
>
>
> Hm. Something must have changed, as it was happening quite consistently
> and now I can't get it to reproduce. Thank you for the offer, and if it
> happens again I will try grabbing thread dumps and I will see if I can
> figure out what is going on.
>
>
> On Sunday, November 6, 2016 10:02 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> I doubt it's GC as you mentioned that the pause is several minutes. Since
> it's reproducible in local mode, can you run the spark application locally
> and once your job is complete (and application appears paused), can you
> take 5 thread dumps (using jstack or jcmd on the local spark JVM process)
> with 1 second delay between each dump and attach them? I can take a look.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 2:21 PM Michael Johnson <mjjohnson@yahoo.com>
> wrote:
>
> Thanks; I tried looking at the thread dumps for the driver and the one
> executor that had that option in the UI, but I'm afraid I don't know how to
> interpret what I saw...  I don't think it could be my code directly, since
> at this point my code has all completed? Could GC be taking that long?
>
> (I could also try grabbing the thread dumps and pasting them here, if that
> would help?)
>
> On Sunday, November 6, 2016 8:36 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> In order to know what's going on, you can study the thread dumps either
> from spark UI or from any other thread dump analysis tool.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 1:31 PM Michael Johnson
> <mjjohnson@yahoo.com.invalid> wrote:
>
> I'm doing some processing and then clustering of a small dataset (~150
> MB). Everything seems to work fine, until the end; the last few lines of my
> program are log statements, but after printing those, nothing seems to
> happen for a long time...many minutes; I'm not usually patient enough to
> let it go, but I think one time when I did just wait, it took over an hour
> (and did eventually exit on its own). Any ideas on what's happening, or how
> to troubleshoot?
>
> (This happens both when running locally, using the localhost mode, as well
> as on a small cluster with four 4-processor nodes each with 15GB of RAM; in
> both cases the executors have 2GB+ of RAM, and none of the inputs/outputs
> on any of the stages is more than 75 MB...)
>
> Thanks,
> Michael
>
>
>
>
>
>
>
>


Re: Very long pause/hang at end of execution

2016-11-16 Thread Aniket Bhatnagar
Thanks for sharing the thread dump. I had a look at them and couldn't find
anything unusual. Is there anything in the logs (driver + executor) that
suggests what's going on? Also, what does the spark job do and what is the
version of spark and hadoop you are using?

Thanks,
Aniket

On Wed, Nov 16, 2016 at 2:07 AM Michael Johnson <mjjohnson@yahoo.com>
wrote:

> The extremely long hang/pause has started happening again. I've been
> running on a small remote cluster, so I used the UI to grab thread dumps
> rather than doing it from the command line. There seems to be one executor
> still alive, along with the driver; I grabbed 4 thread dumps from each, a
> couple of seconds apart. I'd greatly appreciate any help tracking down
> what's going on! (I've attached them, but I can paste them somewhere if
> that's more convenient.)
>
> Thanks,
> Michael
>
>
>
>
> On Sunday, November 6, 2016 10:49 PM, Michael Johnson
> <mjjohnson@yahoo.com.INVALID> wrote:
>
>
> Hm. Something must have changed, as it was happening quite consistently
> and now I can't get it to reproduce. Thank you for the offer, and if it
> happens again I will try grabbing thread dumps and I will see if I can
> figure out what is going on.
>
>
> On Sunday, November 6, 2016 10:02 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> I doubt it's GC as you mentioned that the pause is several minutes. Since
> it's reproducible in local mode, can you run the spark application locally
> and once your job is complete (and application appears paused), can you
> take 5 thread dumps (using jstack or jcmd on the local spark JVM process)
> with 1 second delay between each dump and attach them? I can take a look.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 2:21 PM Michael Johnson <mjjohnson@yahoo.com>
> wrote:
>
> Thanks; I tried looking at the thread dumps for the driver and the one
> executor that had that option in the UI, but I'm afraid I don't know how to
> interpret what I saw...  I don't think it could be my code directly, since
> at this point my code has all completed? Could GC be taking that long?
>
> (I could also try grabbing the thread dumps and pasting them here, if that
> would help?)
>
> On Sunday, November 6, 2016 8:36 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> In order to know what's going on, you can study the thread dumps either
> from spark UI or from any other thread dump analysis tool.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 1:31 PM Michael Johnson
> <mjjohnson@yahoo.com.invalid> wrote:
>
> I'm doing some processing and then clustering of a small dataset (~150
> MB). Everything seems to work fine, until the end; the last few lines of my
> program are log statements, but after printing those, nothing seems to
> happen for a long time...many minutes; I'm not usually patient enough to
> let it go, but I think one time when I did just wait, it took over an hour
> (and did eventually exit on its own). Any ideas on what's happening, or how
> to troubleshoot?
>
> (This happens both when running locally, using the localhost mode, as well
> as on a small cluster with four 4-processor nodes each with 15GB of RAM; in
> both cases the executors have 2GB+ of RAM, and none of the inputs/outputs
> on any of the stages is more than 75 MB...)
>
> Thanks,
> Michael
>
>
>
>
>
>
>
>


Re: Dataset API | Setting number of partitions during join/groupBy

2016-11-11 Thread Aniket Bhatnagar
Hi Shreya

Initial partitions in the Datasets were more than 1000, and after a group by
operation the resultant Dataset had only 200 partitions (because the default
number of shuffle partitions is 200). Any further operations on the resultant
Dataset will have a maximum parallelism of 200, resulting in inefficient use
of the cluster.

I am performing multiple join & group by operations on Datasets that are
huge (8TB+) and low parallelism severely affects the time it takes to run
the data pipeline. The workaround of calling
sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, ) works, but it would
be ideal to set partitions on a per join/group by operation basis, like we
could using the RDD API.
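To illustrate, the workaround boils down to something like this (sketch only;
`spark`, `left` and `right` are assumed names):

// SQLConf.SHUFFLE_PARTITIONS.key is "spark.sql.shuffle.partitions"; setting it
// affects every subsequent shuffle in the session, which is the limitation above.
spark.conf.set("spark.sql.shuffle.partitions", "2000")
val joined = left.join(right, "id")

// The only per-operation alternative is to repartition the result afterwards,
// at the cost of an extra shuffle.
val rebalanced = joined.repartition(2000)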

Thanks,
Aniket

On Fri, Nov 11, 2016 at 6:27 PM Shreya Agarwal <shrey...@microsoft.com>
wrote:

> Curious – why do you want to repartition? Is there a subsequent step which
> fails because the number of partitions is less? Or you want to do it for a
> perf gain?
>
>
>
> Also, what were your initial Dataset partitions and how many did you have
> for the result of join?
>
>
>
> *From:* Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com]
> *Sent:* Friday, November 11, 2016 9:22 AM
> *To:* user <user@spark.apache.org>
> *Subject:* Dataset API | Setting number of partitions during join/groupBy
>
>
>
> Hi
>
>
>
> I can't seem to find a way to pass the number of partitions while joining 2
> Datasets or doing a groupBy operation on the Dataset. There is an option of
> repartitioning the resultant Dataset, but it's inefficient to repartition
> after the Dataset has been joined/grouped into the default number of
> partitions. With the RDD API, this was easy to do as the functions accepted
> a numPartitions parameter. The only way to do this seems to be
> sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, ) but
> this means that all join/groupBy operations going forward will have the
> same number of partitions.
>
>
>
> Thanks,
>
> Aniket
>


Dataset API | Setting number of partitions during join/groupBy

2016-11-11 Thread Aniket Bhatnagar
Hi

I can't seem to find a way to pass the number of partitions while joining 2
Datasets or doing a groupBy operation on the Dataset. There is an option of
repartitioning the resultant Dataset, but it's inefficient to repartition
after the Dataset has been joined/grouped into the default number of
partitions. With the RDD API, this was easy to do as the functions accepted
a numPartitions parameter. The only way to do this seems to be
sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, ) but
this means that all join/groupBy operations going forward will have the
same number of partitions.

Thanks,
Aniket


Re: Very long pause/hang at end of execution

2016-11-06 Thread Aniket Bhatnagar
I doubt it's GC as you mentioned that the pause is several minutes. Since
it's reproducible in local mode, can you run the spark application locally
and once your job is complete (and application appears paused), can you
take 5 thread dumps (using jstack or jcmd on the local spark JVM process)
with 1 second delay between each dump and attach them? I can take a look.

Thanks,
Aniket

On Sun, Nov 6, 2016 at 2:21 PM Michael Johnson <mjjohnson@yahoo.com>
wrote:

> Thanks; I tried looking at the thread dumps for the driver and the one
> executor that had that option in the UI, but I'm afraid I don't know how to
> interpret what I saw...  I don't think it could be my code directly, since
> at this point my code has all completed? Could GC be taking that long?
>
> (I could also try grabbing the thread dumps and pasting them here, if that
> would help?)
>
> On Sunday, November 6, 2016 8:36 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> In order to know what's going on, you can study the thread dumps either
> from spark UI or from any other thread dump analysis tool.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 1:31 PM Michael Johnson
> <mjjohnson@yahoo.com.invalid> wrote:
>
> I'm doing some processing and then clustering of a small dataset (~150
> MB). Everything seems to work fine, until the end; the last few lines of my
> program are log statements, but after printing those, nothing seems to
> happen for a long time...many minutes; I'm not usually patient enough to
> let it go, but I think one time when I did just wait, it took over an hour
> (and did eventually exit on its own). Any ideas on what's happening, or how
> to troubleshoot?
>
> (This happens both when running locally, using the localhost mode, as well
> as on a small cluster with four 4-processor nodes each with 15GB of RAM; in
> both cases the executors have 2GB+ of RAM, and none of the inputs/outputs
> on any of the stages is more than 75 MB...)
>
> Thanks,
> Michael
>
>
>
>


Re: Very long pause/hang at end of execution

2016-11-06 Thread Aniket Bhatnagar
In order to know what's going on, you can study the thread dumps either
from spark UI or from any other thread dump analysis tool.

Thanks,
Aniket

On Sun, Nov 6, 2016 at 1:31 PM Michael Johnson
 wrote:

> I'm doing some processing and then clustering of a small dataset (~150
> MB). Everything seems to work fine, until the end; the last few lines of my
> program are log statements, but after printing those, nothing seems to
> happen for a long time...many minutes; I'm not usually patient enough to
> let it go, but I think one time when I did just wait, it took over an hour
> (and did eventually exit on its own). Any ideas on what's happening, or how
> to troubleshoot?
>
> (This happens both when running locally, using the localhost mode, as well
> as on a small cluster with four 4-processor nodes each with 15GB of RAM; in
> both cases the executors have 2GB+ of RAM, and none of the inputs/outputs
> on any of the stages is more than 75 MB...)
>
> Thanks,
> Michael
>


Re: Improvement proposal | Dynamic disk allocation

2016-11-06 Thread Aniket Bhatnagar
If people agree that is desired, I am willing to submit a SIP for this and
find time to work on it.

Thanks,
Aniket

On Sun, Nov 6, 2016 at 1:06 PM Aniket Bhatnagar <aniket.bhatna...@gmail.com>
wrote:

> Hello
>
> The dynamic allocation feature allows you to add executors and scale
> computation power. This is great; however, I feel like we also need a way
> to dynamically scale storage. Currently, if the disk is not able to hold
> the spilled/shuffle data, the job is aborted, causing frustration and loss
> of time. In deployments like AWS EMR, it is possible to run an agent that
> adds disks on the fly if it sees that the disks are running out of space,
> and it would be great if Spark could immediately start using the added
> disks just as it does when new executors are added.
>
> Thanks,
> Aniket
>


Improvement proposal | Dynamic disk allocation

2016-11-06 Thread Aniket Bhatnagar
Hello

The dynamic allocation feature allows you to add executors and scale
computation power. This is great; however, I feel like we also need a way
to dynamically scale storage. Currently, if the disk is not able to hold
the spilled/shuffle data, the job is aborted, causing frustration and loss
of time. In deployments like AWS EMR, it is possible to run an agent that
adds disks on the fly if it sees that the disks are running out of space,
and it would be great if Spark could immediately start using the added
disks just as it does when new executors are added.

Thanks,
Aniket


Re: RuntimeException: Null value appeared in non-nullable field when holding Optional Case Class

2016-11-03 Thread Aniket Bhatnagar
Issue raised - SPARK-18251

On Wed, Nov 2, 2016, 9:12 PM Aniket Bhatnagar <aniket.bhatna...@gmail.com>
wrote:

> Hi all
>
> I am running into a runtime exception when a DataSet holds an empty (None)
> instance of an Option type whose value class has a non-nullable field. For
> instance, if we have the following case class:
>
> case class DataRow(id: Int, value: String)
>
> Then, DataSet[Option[DataRow]] can only hold Some(DataRow) objects and
> cannot hold None. If it does, the following exception is thrown:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 6 in stage 0.0 failed 1 times, most recent
> failure: Lost task 6.0 in stage 0.0 (TID 6, localhost):
> java.lang.RuntimeException: Null value appeared in non-nullable field:
> - field (class: "scala.Int", name: "id")
> - option value class:
> "com.aol.advertising.dmp.audscale.uts.DataSetOptBug.DataRow"
> - root class: "scala.Option"
> If the schema is inferred from a Scala tuple/case class, or a Java bean,
> please try to use scala.Option[_] or other nullable types (e.g.
> java.lang.Integer instead of int/scala.Int).
>
>
> I am attaching a sample program to reproduce this. Is this a known
> limitation or a bug?
>
> Thanks,
> Aniket
>
> Full stack trace:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 6 in stage 0.0 failed 1 times, most recent
> failure: Lost task 6.0 in stage 0.0 (TID 6, localhost):
> java.lang.RuntimeException: Null value appeared in non-nullable field:
> - field (class: "scala.Int", name: "id")
> - option value class: "DataSetOptBug.DataRow"
> - root class: "scala.Option"
> If the schema is inferred from a Scala tuple/case class, or a Java bean,
> please try to use scala.Option[_] or other nullable types (e.g.
> java.lang.Integer instead of int/scala.Int).
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>


RuntimeException: Null value appeared in non-nullable field when holding Optional Case Class

2016-11-02 Thread Aniket Bhatnagar
Hi all

I am running into a runtime exception when a DataSet holds an empty (None)
instance of an Option type whose value class has a non-nullable field. For
instance, if we have the following case class:

case class DataRow(id: Int, value: String)

Then, DataSet[Option[DataRow]] can only hold Some(DataRow) objects and
cannot hold None. If it does, the following exception is thrown:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 6 in stage 0.0 failed 1 times, most recent failure:
Lost task 6.0 in stage 0.0 (TID 6, localhost): java.lang.RuntimeException:
Null value appeared in non-nullable field:
- field (class: "scala.Int", name: "id")
- option value class:
"com.aol.advertising.dmp.audscale.uts.DataSetOptBug.DataRow"
- root class: "scala.Option"
If the schema is inferred from a Scala tuple/case class, or a Java bean,
please try to use scala.Option[_] or other nullable types (e.g.
java.lang.Integer instead of int/scala.Int).


I am attaching a sample program to reproduce this. Is this a known
limitation or a bug?
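
For readers of the archive (the attached DataSetOptBug.scala only survives
below as binary data), a minimal sketch of the scenario described might look
like this (assumes a SparkSession named `spark`; the exact operation that
trips the error can vary by Spark version):

import spark.implicits._

case class DataRow(id: Int, value: String)

// Some(...) values encode fine; a None in a Dataset[Option[DataRow]] trips the error.
val rows: Seq[Option[DataRow]] = Seq(Some(DataRow(1, "a")), None)
val ds = spark.createDataset(spark.sparkContext.parallelize(rows))
ds.count() // java.lang.RuntimeException: Null value appeared in non-nullable field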

Thanks,
Aniket

Full stack trace:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 6 in stage 0.0 failed 1 times, most recent failure:
Lost task 6.0 in stage 0.0 (TID 6, localhost): java.lang.RuntimeException:
Null value appeared in non-nullable field:
- field (class: "scala.Int", name: "id")
- option value class: "DataSetOptBug.DataRow"
- root class: "scala.Option"
If the schema is inferred from a Scala tuple/case class, or a Java bean,
please try to use scala.Option[_] or other nullable types (e.g.
java.lang.Integer instead of int/scala.Int).
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


DataSetOptBug.scala
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Execute function once on each node

2016-07-19 Thread Aniket Bhatnagar
Thanks for the explanation. Try creating a custom RDD whose getPartitions
returns an array of custom partition objects of size n (= number of nodes).
In a custom partition object, you can have the file path and ip/hostname
where the partition needs to be computed. Then, have getPreferredLocations
return the ip/hostname from the partition object and, in the compute
function, assert that you are on the right ip/hostname (or fail) and read
the content of the file.

Not 100% sure it will work though.
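
A rough sketch of such a custom RDD (all names below, e.g. NodeLocalPartition,
are made up for illustration; error handling and the file format are up to you):

import java.net.InetAddress
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per node, carrying the host that should compute it and the local path to read.
case class NodeLocalPartition(index: Int, host: String, path: String) extends Partition

class NodeLocalFileRDD(sc: SparkContext, assignments: Seq[(String, String)])
  extends RDD[String](sc, Nil) {

  override def getPartitions: Array[Partition] =
    assignments.zipWithIndex.map { case ((host, path), i) =>
      NodeLocalPartition(i, host, path): Partition
    }.toArray

  // Hint to the scheduler where each partition should run.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[NodeLocalPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[NodeLocalPartition]
    val here = InetAddress.getLocalHost.getHostName
    // Fail rather than silently read the wrong node's data if locality wasn't honoured.
    require(here == p.host, s"Partition ${p.index} expected host ${p.host} but ran on $here")
    scala.io.Source.fromFile(p.path).getLines()
  }
}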

On Tue, Jul 19, 2016, 2:54 AM Josh Asplund <joshaspl...@gmail.com> wrote:

> The spark workers are running side-by-side with scientific simulation
> code. The code writes output to local SSDs to keep latency low. Due to the
> volume of data being moved (10's of terabytes +), it isn't really feasible
> to copy the data to a global filesystem. Executing a function on each node
> would allow us to read the data in situ without a copy.
>
> I understand that manually assigning tasks to nodes reduces fault
> tolerance, but the simulation codes already explicitly assign tasks, so a
> failure of any one node is already a full-job failure.
>
> On Mon, Jul 18, 2016 at 3:43 PM Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> You can't assume that the number of nodes will be constant as some may
>> fail, hence you can't guarantee that a function will execute at most once
>> or at least once on a node. Can you explain your use case in a bit more
>> detail?
>>
>> On Mon, Jul 18, 2016, 10:57 PM joshuata <joshaspl...@gmail.com> wrote:
>>
>>> I am working on a spark application that requires the ability to run a
>>> function on each node in the cluster. This is used to read data from a
>>> directory that is not globally accessible to the cluster. I have tried
>>> creating an RDD with n elements and n partitions so that it is evenly
>>> distributed among the n nodes, and then mapping a function over the RDD.
>>> However, the runtime makes no guarantees that each partition will be
>>> stored
>>> on a separate node. This means that the code will run multiple times on
>>> the
>>> same node while never running on another.
>>>
>>> I have looked through the documentation and source code for both RDDs and
>>> the scheduler, but I haven't found anything that will do what I need.
>>> Does
>>> anybody know of a solution I could use?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: Execute function once on each node

2016-07-18 Thread Aniket Bhatnagar
You can't assume that the number of nodes will be constant as some may
fail, hence you can't guarantee that a function will execute at most once
or at least once on a node. Can you explain your use case in a bit more
detail?

On Mon, Jul 18, 2016, 10:57 PM joshuata  wrote:

> I am working on a spark application that requires the ability to run a
> function on each node in the cluster. This is used to read data from a
> directory that is not globally accessible to the cluster. I have tried
> creating an RDD with n elements and n partitions so that it is evenly
> distributed among the n nodes, and then mapping a function over the RDD.
> However, the runtime makes no guarantees that each partition will be stored
> on a separate node. This means that the code will run multiple times on the
> same node while never running on another.
>
> I have looked through the documentation and source code for both RDDs and
> the scheduler, but I haven't found anything that will do what I need. Does
> anybody know of a solution I could use?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Help me! Spark WebUI is corrupted!

2015-12-31 Thread Aniket Bhatnagar
Are you running on YARN or standalone?

On Thu, Dec 31, 2015, 3:35 PM LinChen  wrote:

> *Screenshot1(Normal WebUI)*
>
>
>
> *Screenshot2(Corrupted WebUI)*
>
>
>
> As screenshot2 shows, the format of my Spark WebUI looks strange and I
> cannot click the description of active jobs. It seems there is something
> missing in my operating system. I googled it but found nothing. Could
> anybody help me?
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org


Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Aniket Bhatnagar
Can you try storing the state (word count) in an external key value store?

On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê  wrote:

> Hi all,
>
> Could anyone help me on this? It's a bit urgent for me.
> I'm very confused and curious about Spark data checkpoint performance. Is
> there any detailed implementation of checkpointing I can look into?
> Spark Streaming only takes sub-second to process 20K messages/sec, however
> it takes 25 seconds for the checkpoint. Now my application has an average
> latency of 30 seconds and it keeps increasing.
>
>
> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê :
>
>> Thanks all, it would be great to have this feature soon.
>> Do you know what the release plan for 1.6 is?
>>
>> In addition to this, I still have checkpoint performance problem
>>
>> My code is just simple like this:
>> JavaStreamingContext jssc = new
>> JavaStreamingContext(sparkConf,Durations.seconds(2));
>> jssc.checkpoint("spark-data/checkpoint");
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream(...);
>> JavaPairDStream stats =
>> messages.mapToPair(parseJson)
>> .reduceByKey(REDUCE_STATS)
>> .updateStateByKey(RUNNING_STATS);
>>
>> stats.print()
>>
>>   Now I need to maintain about 800k keys; the stats here only count the
>> number of occurrences per key.
>>   While running, the cache dir is very small (about 50M). My questions are:
>>
>>   1/ A regular micro-batch takes about 800ms to finish, but every 10
>> seconds, when the data checkpoint is running,
>>   it takes 5 seconds to finish the same size micro-batch. Why is it so
>> high? What kind of job runs during the checkpoint?
>>   Why does it keep increasing?
>>
>>   2/ When I change the data checkpoint interval, e.g. using:
>>   stats.checkpoint(Durations.seconds(100)); // change to 100, default
>> is 10
>>
>>   the checkpoint time keeps increasing significantly: the first checkpoint
>> is 10s, the second is 30s, the third is 70s ... and it keeps increasing :)
>>   Why is it so high when increasing the checkpoint interval?
>>
>> It seems that the default interval works more stably.
>>
>> On Nov 4, 2015 9:08 PM, "Adrian Tanase"  wrote:
>>
>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>
>>> Left some comments on the JIRA and design doc.
>>>
>>> -adrian
>>>
>>> From: Shixiong Zhu
>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>> To: Thúy Hằng Lê
>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>> Subject: Re: Spark Streaming data checkpoint performance
>>>
>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>> performance issue of "updateStateByKey". You can take a look at
>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>> https://github.com/apache/spark/pull/9256
>>>
>>
>


Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Aniket Bhatnagar
It depends on the stats you are collecting. For example, if you are just
collecting counts, you can do away with updateStateByKey completely by
doing an insert-or-update operation on the data store after the reduce, i.e.:

For each (key, batchCount):
  if (key exists in dataStore)
    update count = count + batchCount for the key
  else
    insert (key, batchCount)
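
A minimal Scala sketch of that pattern (the KeyValueStore object is a purely
illustrative stand-in for a real Redis/HBase/Cassandra client, and
`reducedStream` stands for the DStream of (key, batchCount) pairs produced by
the reduceByKey step in the code quoted below):

import org.apache.spark.streaming.dstream.DStream
import scala.collection.concurrent.TrieMap

// Stand-in for an external store client; replace with your real client.
object KeyValueStore {
  private val data = TrieMap.empty[String, Long]
  def get(key: String): Option[Long] = data.get(key)
  def put(key: String, value: Long): Unit = data.put(key, value)
}

def upsertCounts(reducedStream: DStream[(String, Long)]): Unit =
  reducedStream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      partition.foreach { case (key, batchCount) =>
        val current = KeyValueStore.get(key).getOrElse(0L) // existing running count, if any
        KeyValueStore.put(key, current + batchCount)       // insert or update
      }
    }
  }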

Thanks,
Aniket

On Sat, Nov 7, 2015 at 11:38 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:

> Thanks Aniket,
>
> I want to store the state in an external storage but that should be in a
> later step, I think.
> Basically, I have to use the updateStateByKey function to maintain the
> running state (which requires checkpointing), and my bottleneck is now the
> data checkpoint.
>
> My pseudo code is like below:
>
> JavaStreamingContext jssc = new JavaStreamingContext(
> sparkConf,Durations.seconds(2));
> jssc.checkpoint("spark-data/checkpoint");
> JavaPairInputDStream<String, String> messages =
> KafkaUtils.createDirectStream(...);
> JavaPairDStream<String, List> stats =
> messages.mapToPair(parseJson)
> .reduceByKey(REDUCE_STATS)
> .updateStateByKey(RUNNING_STATS);
>
>JavaPairDStream<String, List> newData =
> stages.filter(NEW_STATS);
>
>newData.foreachRDD{
>  rdd.forEachPartition{
>//Store to external storage.
>  }
>   }
>
>   Without using updateStateByKey, I only have the stats of the last
> micro-batch.
>
> Any advise on this?
>
>
> 2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:
>
>> Can you try storing the state (word count) in an external key value store?
>>
>> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Could anyone help me on this? It's a bit urgent for me.
>>> I'm very confused and curious about Spark data checkpoint performance.
>>> Is there any detailed implementation of checkpointing I can look into?
>>> Spark Streaming only takes sub-second to process 20K messages/sec,
>>> however it takes 25 seconds for the checkpoint. Now my application has an
>>> average latency of 30 seconds and it keeps increasing.
>>>
>>>
>>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>>
>>>> Thanks all, it would be great to have this feature soon.
>>>> Do you know what the release plan for 1.6 is?
>>>>
>>>> In addition to this, I still have checkpoint performance problem
>>>>
>>>> My code is just simple like this:
>>>> JavaStreamingContext jssc = new
>>>> JavaStreamingContext(sparkConf,Durations.seconds(2));
>>>> jssc.checkpoint("spark-data/checkpoint");
>>>> JavaPairInputDStream<String, String> messages =
>>>> KafkaUtils.createDirectStream(...);
>>>> JavaPairDStream<String, List> stats =
>>>> messages.mapToPair(parseJson)
>>>> .reduceByKey(REDUCE_STATS)
>>>> .updateStateByKey(RUNNING_STATS);
>>>>
>>>> stats.print()
>>>>
>>>>   Now I need to maintain about 800k keys; the stats here only count
>>>> the number of occurrences per key.
>>>>   While running, the cache dir is very small (about 50M). My questions are:
>>>>
>>>>   1/ A regular micro-batch takes about 800ms to finish, but every
>>>> 10 seconds, when the data checkpoint is running,
>>>>   it takes 5 seconds to finish the same size micro-batch. Why is it
>>>> so high? What kind of job runs during the checkpoint?
>>>>   Why does it keep increasing?
>>>>
>>>>   2/ When I change the data checkpoint interval, e.g. using:
>>>>   stats.checkpoint(Durations.seconds(100)); // change to 100,
>>>> default is 10
>>>>
>>>>   the checkpoint time keeps increasing significantly: the first checkpoint
>>>> is 10s, the second is 30s, the third is 70s ... and it keeps increasing :)
>>>>   Why is it so high when increasing the checkpoint interval?
>>>>
>>>> It seems that the default interval works more stably.
>>>>
>>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>>>
>>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>>
>>>>> Left some comments on the JIRA and design doc.
>>>>>
>>>>> -adrian
>>>>>
>>>>> From: Shixiong Zhu
>>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>>> To: Thúy Hằng Lê
>>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>>
>>>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>>>> performance issue of "updateStateByKey". You can take a look at
>>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>>> https://github.com/apache/spark/pull/9256
>>>>>
>>>>
>>>
>


Re: How to close connection in mapPartitions?

2015-10-23 Thread Aniket Bhatnagar
Are you sure RedisClientPool is being initialized properly in the
constructor of RedisCache? Can you please copy paste the code that you use
to initialize RedisClientPool inside the constructor of RedisCache?

Thanks,
Aniket

On Fri, Oct 23, 2015 at 11:47 AM Bin Wang  wrote:

> BTW, "lines" is a DStream.
>
> On Fri, Oct 23, 2015 at 2:16 PM, Bin Wang wrote:
>
>> I use mapPartitions to open connections to Redis, I write it like this:
>>
>> val seqs = lines.mapPartitions { lines =>
>>   val cache = new RedisCache(redisUrl, redisPort)
>>   val result = lines.map(line => Parser.parseBody(line, cache))
>>   cache.redisPool.close
>>   result
>> }
>>
>> But it seems the pool is closed before I use it. Am I doing anything
>> wrong? Here is the error:
>>
>> java.lang.IllegalStateException: Pool not open
>>  at 
>> org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>>  at 
>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>>  at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>  at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>>  at 
>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>>  at 
>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at scala.collection.immutable.List.foreach(List.scala:318)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>  at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>>  at 
>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>  at 
>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at 
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>  at 
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>


Re: HBase Spark Streaming giving error after restore

2015-10-17 Thread Aniket Bhatnagar
Can you try changing classOf[OutputFormat[String,
BoxedUnit]] to classOf[OutputFormat[String,
Put]] while configuring hconf?
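
For illustration, a typical way to wire this up so that the values handed to
TableOutputFormat are Puts rather than Unit could look like this (sketch only;
the table name, column family and RDD shape are made up):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

def writeToHBase(rows: RDD[(String, String)]): Unit = {
  val hconf = HBaseConfiguration.create()
  hconf.set(TableOutputFormat.OUTPUT_TABLE, "tweets") // hypothetical table name
  rows.map { case (rowKey, value) =>
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
    // The value written must be a Mutation (e.g. Put), not Unit, or the
    // ClassCastException in the stack trace below is thrown at write time.
    (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
  }.saveAsNewAPIHadoopFile(
    "/tmp/ignored-by-tableoutputformat",
    classOf[ImmutableBytesWritable],
    classOf[Put],
    classOf[TableOutputFormat[ImmutableBytesWritable]],
    hconf)
}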

On Sat, Oct 17, 2015, 11:44 AM Amit Hora  wrote:

> Hi,
>
> Regrets for the delayed response.
> Please find the full stack trace below:
>
> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/10/16 18:50:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 1, localhost, ANY, 1185 bytes)
> 15/10/16 18:50:03 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 15/10/16 18:50:03 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> localhost): java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be
> cast to org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/10/16 18:50:03 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1
> times; aborting job
> 15/10/16 18:50:03 INFO TaskSchedulerImpl: Cancelling stage 0
> 15/10/16 18:50:03 INFO Executor: Executor is trying to kill task 1.0 in
> stage 0.0 (TID 1)
> 15/10/16 18:50:03 INFO TaskSchedulerImpl: Stage 0 was cancelled
> 15/10/16 18:50:03 INFO DAGScheduler: Job 0 failed: foreachRDD at
> TwitterStream.scala:150, took 5.956054 s
> 15/10/16 18:50:03 INFO JobScheduler: Starting job streaming job
> 144500141 ms.0 from job set of time 144500141 ms
> 15/10/16 18:50:03 ERROR JobScheduler: Error running job streaming job
> 144500140 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.ClassCastException:
> scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> at
> 

Re: Example of updateStateByKey with initial RDD?

2015-10-08 Thread Aniket Bhatnagar
Ohh I see. You would have to add an underscore
after ProbabilityCalculator.updateCountsOfProcessGivenRole (to turn the
method into a partially applied function). Try:

dstream.map(x => (x.keyWithTime, x))
.updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole _,
new HashPartitioner(3), initialProcessGivenRoleRdd)

Here is an example:
def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long] =
{
  val prevCount = prevStateOpt.getOrElse(0L)
  val newCount = prevCount + events.size
  Some(newCount)
}
val interval = 60 * 1000
val initialRDD = sparkContext.makeRDD(Array(1L, 2L, 3L, 4L, 5L)).map(_ *
interval).map(n => (n % interval, n / interval))
val counts = eventsStream.map(event => {
  (event.timestamp - event.timestamp % interval, event)
}).updateStateByKey[Long](PrintEventCountsByInterval.counter _, new
HashPartitioner(3), initialRDD = initialRDD)
counts.print()

Thanks,
Aniket


On Thu, Oct 8, 2015 at 5:48 PM Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Aniket,
>
> Thank you for the example - but that's not quite what I'm looking for.
> I've got a call to updateStateByKey that looks like the following:
>
> dstream.map(x => (x.keyWithTime, x))
> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole)
>
> def updateCountsOfProcessGivenRole(a : Seq[CountsOfProcessGivenRole], b:
> Option[CountsOfProcessGivenRole]) : Option[CountsOfProcessGivenRole] = {
> val currentTime = DateTime.now(DateTimeZone.UTC)
> if(a.isEmpty) {
>   if (b.get.eventhourbin.plusDays(3).getMillis <
> currentTime.getMillis) {
> None
>   } else {
> b
>   }
> } else { // a is not empty - b may or may not be defined
>   val population = if(b.isDefined) b.get.Population else 0 + a.map(x
> => x.Population).sum
>   val subpopulation = if(b.isDefined) b.get.Subpopulation else 0 +
> a.map(x => x.Subpopulation).sum
>   Some(CountsOfProcessGivenRole(a(0), population, subpopulation))
> }
>   }
>
> This works fine, however when I go to add an initial RDD, modifying the
> 'updateStateByKey' call to look like the following:
>
> val initialProcessGivenRoleRdd: RDD[((String, String, DateTime),
> CountsOfProcessGivenRole)] = iPrProcessGivenRole.map(x => (x.keyWithTime,
> x))
> dstream.map(x => (x.keyWithTime, x))
> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole,
> new HashPartitioner(3), initialProcessGivenRoleRdd)
>
> I am getting an error -- 'missing arguments for method
> updateCountsOfProcessGivenRole'. Looking at the method calls, the function
> that is called for appears to be the same.  I was hoping an example might
> shed some light on the issue.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
>
>
>
>
> On Thu, Oct 8, 2015 at 7:04 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Here is an example:
>>
>> val interval = 60 * 1000
>> val counts = eventsStream.map(event => {
>>   (event.timestamp - event.timestamp % interval, event)
>> }).updateStateByKey[Long](updateFunc = (events: Seq[Event], prevStateOpt:
>> Option[Long]) => {
>>   val prevCount = prevStateOpt.getOrElse(0L)
>>   val newCount = prevCount + events.size
>>   Some(newCount)
>> })
>> counts.print()
>>
>> Hope it helps!
>>
>> Thanks,
>> Aniket
>>
>> On Thu, Oct 8, 2015 at 4:29 PM Bryan <bryan.jeff...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Can anyone point me to a good example of updateStateByKey with an
>>> initial RDD? I am seeing a compile time error when following the API.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>
>


Re: Launching EC2 instances with Spark compiled for Scala 2.11

2015-10-08 Thread Aniket Bhatnagar
Is it possible for you to use EMR instead of EC2? If so, you may be able to
tweak EMR bootstrap scripts to install your custom spark build.

Thanks,
Aniket

On Thu, Oct 8, 2015 at 5:58 PM Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Hello,
>
> I was wondering if there is an easy way launch EC2 instances which have a
> Spark built for Scala 2.11.
>
> The only way I can think of is to prepare the sources for 2.11 as shown in
> the Spark build instructions (
> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211),
> upload the changed sources as a Github repo, and use the "--spark-git-repo"
> option to specify the repo as the one to build from.
>
> Is there a recommended way to launch EC2 instances if you need Scala 2.11
> support?
>
> Regards,
> Theodore
>
> --
> View this message in context: Launching EC2 instances with Spark compiled
> for Scala 2.11
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: spark-submit classloader issue...

2015-09-28 Thread Aniket Bhatnagar
Hi Rachna

Can you just use the HTTP client provided via Spark's transitive dependencies
instead of excluding them?

The reason user-classpath-first is failing could be that you have Spark
artifacts in your assembly jar that don't match what is deployed
(a version mismatch, or you built the version yourself, etc.).

Thanks,
Aniket

On Tue, Sep 29, 2015, 7:31 AM Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Hello all,
>
> *Goal:*  I want to use APIs from the HttpClient library 4.4.1.  I am using
> the Maven Shade plugin to generate the JAR.
>
>
>
> *Findings:* When I run my program as a *java application within eclipse
> everything works fine*.  But when I am running the program using
> *spark-submit* I am getting following error:
>
> URL content Could not initialize class
> org.apache.http.conn.ssl.SSLConnectionSocketFactory
>
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.http.conn.ssl.SSLConnectionSocketFactory
>
>
>
> When I tried to get the referred JAR it is pointing to some Hadoop JAR,  I
> am assuming this is something set in spark-submit.
>
>
>
> ClassLoader classLoader = HttpEndPointClient.class.getClassLoader();
>
> URL resource =
> classLoader.getResource("org/apache/http/message/BasicLineFormatter.class");
>
> Prints following jar:
>
>
> jar:file:/usr/lib/hadoop/lib/httpcore-4.2.5.jar!/org/apache/http/message/BasicLineFormatter.class
>
>
>
> After research I found that I can override *--conf
> spark.files.userClassPathFirst=true --conf
> spark.yarn.user.classpath.first=true*
>
>
>
> But when I do that I am getting following error:
>
> ERROR: org.apache.spark.executor.Executor - Exception in task 0.0 in stage
> 0.0 (TID 0)
>
> java.io.InvalidClassException: org.apache.spark.scheduler.Task; local
> class incompatible: stream classdesc serialVersionUID =
> -4703555755588060120, local class serialVersionUID = -1589734467697262504
>
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
>
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
>
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
>
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>
> at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
>
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> I am running on CDH 5.4  Here is my complete pom file.
>
>
>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
>
>   <modelVersion>4.0.0</modelVersion>
>   <groupId>test</groupId>
>   <artifactId>test</artifactId>
>   <version>0.0.1-SNAPSHOT</version>
>
>   <dependencies>
>
>     <dependency>
>       <groupId>org.apache.httpcomponents</groupId>
>       <artifactId>httpcore</artifactId>
>       <version>4.4.1</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.httpcomponents</groupId>
>       <artifactId>httpclient</artifactId>
>       <version>4.4.1</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming-kafka_2.10</artifactId>
>       <version>1.5.0</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>httpcore</artifactId>
>           <groupId>org.apache.httpcomponents</groupId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming_2.10</artifactId>
>       <version>1.5.0</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>httpcore</artifactId>
>           <groupId>org.apache.httpcomponents</groupId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_2.10</artifactId>
>       <version>1.5.0</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>httpcore</artifactId>
>           <groupId>org.apache.httpcomponents</groupId>

Re: word count (group by users) in spark

2015-09-21 Thread Aniket Bhatnagar
Unless I am mistaken, a group by operation spills to disk if the values for a
key don't fit in memory.

Thanks,
Aniket

On Mon, Sep 21, 2015 at 10:43 AM Huy Banh <huy.b...@gmail.com> wrote:

> Hi,
>
> If your input format is user -> comment, then you could:
>
> val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three
> four three")))
> val wordCounts = comments.
>flatMap({case (user, comment) =>
> for (word <- comment.split(" ")) yield(((user, word), 1)) }).
>reduceByKey(_ + _)
>
> val output = wordCounts.
>map({case ((user, word), count) => (user, (word, count))}).
>groupByKey()
>
> By the way, Aniket, if we group by user first, it could run out of memory when
> Spark tries to put all words in a single sequence, couldn't it?
>
> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Using the Scala API, you can first group by user and then use combineByKey.
>>
>> Thanks,
>> Aniket
>>
>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>> I would like to achieve the below output using Spark. I managed to write it
>>> in Hive and call it from Spark, but not in just Spark (Scala). How do I group
>>> word counts for a particular user (column), for example?
>>> Imagine users and their tweets; I want to do a word count based on user name.
>>>
>>> Input:-
>>> kali A,B,A,B,B
>>> james B,A,A,A,B
>>>
>>> Output:-
>>> kali A [Count] B [Count]
>>> James A [Count] B [Count]
>>>
>>> My Hive Answer:-
>>> CREATE EXTERNAL TABLE  TEST
>>> (
>>>  user_name string   ,
>>>  COMMENTS  STRING
>>>
>>> )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
>>> LOCATION '/data/kali/test';  -- HDFS FOLDER (create hdfs folder and
>>> create a text file with data mentioned in the email)
>>>
>>> use default;select user_name,COLLECT_SET(text) from (select
>>> user_name,concat(sub,' ',count(comments)) as text  from test LATERAL VIEW
>>> explode(split(comments,',')) subView AS sub group by user_name,sub)w
>>> group
>>> by user_name;
>>>
>>> Spark With Hive:-
>>> package com.examples
>>>
>>> /**
>>>  * Created by kalit_000 on 17/09/2015.
>>>  */
>>> import org.apache.log4j.Logger
>>> import org.apache.log4j.Level
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.{SparkContext, SparkConf}
>>> import org.apache.spark.SparkContext._
>>>
>>>
>>> object HiveWordCount {
>>>
>>>   def main(args: Array[String]): Unit =
>>>   {
>>> Logger.getLogger("org").setLevel(Level.WARN)
>>> Logger.getLogger("akka").setLevel(Level.WARN)
>>>
>>> val conf = new
>>>
>>> SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory",
>>> "1g")
>>> val sc = new SparkContext(conf)
>>> val sqlContext= new SQLContext(sc)
>>>
>>> val hc=new HiveContext(sc)
>>>
>>> hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST  (user_name
>>> string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
>>> STORED AS TEXTFILE LOCATION '/data/kali/test' ")
>>>
>>> val op=hc.sql("select user_name,COLLECT_SET(text) from (select
>>> user_name,concat(sub,' ',count(comments)) as text  from default.test
>>> LATERAL
>>> VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w
>>> group by user_name")
>>>
>>> op.collect.foreach(println)
>>>
>>>
>>>   }
>>>
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>


Re: question building spark in a virtual machine

2015-09-19 Thread Aniket Bhatnagar
Hi Eval

Can you check if your Ubuntu VM has enough RAM allocated to run a JVM of size
3 GB?

Thanks,
Aniket

On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler 
wrote:

> Hi,
>
> I had configured the MAVEN_OPTS environment variable the same as you wrote.
> My java version is 1.7.0_75.
> I didn't customize the JVM heap size specifically. Is there an additional
> configuration I have to set besides the MAVEN_OPTS configuration?
>
> Thanks,
> Eyal
>
> On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu  wrote:
>
>> Can you tell us how you configured the JVM heap size ?
>> Which version of Java are you using ?
>>
>> When I build Spark, I do the following:
>>
>> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
>> -XX:ReservedCodeCacheSize=512m"
>>
>> Cheers
>>
>> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler > > wrote:
>>
>>> Hi,
>>> Trying to build spark in my ubuntu virtual machine, I am getting the
>>> following error:
>>>
>>> "Error occurred during initialization of VM
>>> Could not reserve enough space for object heap
>>> Error: could not create the Java Virtual Machine.
>>> Error: A fatal exception has occurred. Program will exit".
>>>
>>> I have configured the JVM heap size correctly.
>>>
>>> How can I fix it?
>>>
>>> Thanks,
>>> Eyal
>>>
>>
>>
>


Re: word count (group by users) in spark

2015-09-19 Thread Aniket Bhatnagar
Using the Scala API, you can first group by user and then use combineByKey.
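
A minimal sketch of one way to apply that (not the exact code from this thread;
keying by (user, word) before combining is an assumption, and the sample data
mirrors the question):

import org.apache.spark.{SparkConf, SparkContext}

object UserWordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("UserWordCount").setMaster("local[*]"))

    // Hypothetical input: (user, comma-separated tokens)
    val comments = sc.parallelize(Seq(("kali", "A,B,A,B,B"), ("james", "B,A,A,A,B")))

    // Key by (user, word) and combine counts per key
    val counts = comments
      .flatMap { case (user, text) => text.split(",").map(word => ((user, word), 1)) }
      .combineByKey(
        (v: Int) => v,                   // createCombiner
        (acc: Int, v: Int) => acc + v,   // mergeValue
        (a: Int, b: Int) => a + b        // mergeCombiners
      )

    // Regroup per user to get "kali A [2] B [3]" style output
    val perUser = counts
      .map { case ((user, word), count) => (user, s"$word [$count]") }
      .groupByKey()
      .mapValues(_.mkString(" "))

    perUser.collect().foreach { case (user, line) => println(s"$user $line") }
    sc.stop()
  }
}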

Thanks,
Aniket

On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com 
wrote:

> Hi All,
> I would like to achieve the below output using Spark. I managed to write it
> in Hive and call it from Spark, but not in just Spark (Scala). How do I group
> word counts for a particular user (column), for example?
> Imagine users and their tweets; I want to do a word count based on user name.
>
> Input:-
> kali A,B,A,B,B
> james B,A,A,A,B
>
> Output:-
> kali A [Count] B [Count]
> James A [Count] B [Count]
>
> My Hive Answer:-
> CREATE EXTERNAL TABLE  TEST
> (
>  user_name string   ,
>  COMMENTS  STRING
>
> )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
> LOCATION '/data/kali/test';  -- HDFS FOLDER (create hdfs folder and
> create a text file with data mentioned in the email)
>
> use default;select user_name,COLLECT_SET(text) from (select
> user_name,concat(sub,' ',count(comments)) as text  from test LATERAL VIEW
> explode(split(comments,',')) subView AS sub group by user_name,sub)w group
> by user_name;
>
> Spark With Hive:-
> package com.examples
>
> /**
>  * Created by kalit_000 on 17/09/2015.
>  */
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.SparkContext._
>
>
> object HiveWordCount {
>
>   def main(args: Array[String]): Unit =
>   {
> Logger.getLogger("org").setLevel(Level.WARN)
> Logger.getLogger("akka").setLevel(Level.WARN)
>
> val conf = new
>
> SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory",
> "1g")
> val sc = new SparkContext(conf)
> val sqlContext= new SQLContext(sc)
>
> val hc=new HiveContext(sc)
>
> hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST  (user_name
> string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
> STORED AS TEXTFILE LOCATION '/data/kali/test' ")
>
> val op=hc.sql("select user_name,COLLECT_SET(text) from (select
> user_name,concat(sub,' ',count(comments)) as text  from default.test
> LATERAL
> VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w
> group by user_name")
>
> op.collect.foreach(println)
>
>
>   }
>
>
>
>
> Thanks
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submi

2015-09-18 Thread Aniket Bhatnagar
I don't think yarn-cluster mode is currently supported. You may want to ask
the Zeppelin community for confirmation though.

On Fri, Sep 18, 2015, 5:41 PM shahab <shahab.mok...@gmail.com> wrote:

> It works using yarn-client but I want to make it running on cluster. Is
> there any way to do so?
>
> best,
> /Shahab
>
> On Fri, Sep 18, 2015 at 12:54 PM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Can you try yarn-client mode?
>>
>> On Fri, Sep 18, 2015, 3:38 PM shahab <shahab.mok...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Probably I have wrong zeppelin  configuration, because I get the
>>> following error when I execute spark statements in Zeppelin:
>>>
>>> org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
>>> running on a cluster. Deployment to YARN is not supported directly by
>>> SparkContext. Please use spark-submit.
>>>
>>>
>>> Anyone knows What's the solution to this?
>>>
>>> best,
>>> /Shahab
>>>
>>
>


Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submi

2015-09-18 Thread Aniket Bhatnagar
Can you try yarn-client mode?

On Fri, Sep 18, 2015, 3:38 PM shahab  wrote:

> Hi,
>
> Probably I have wrong zeppelin  configuration, because I get the following
> error when I execute spark statements in Zeppelin:
>
> org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
> running on a cluster. Deployment to YARN is not supported directly by
> SparkContext. Please use spark-submit.
>
>
> Anyone knows What's the solution to this?
>
> best,
> /Shahab
>


Re: Checkpointing with Kinesis

2015-09-17 Thread Aniket Bhatnagar
You can perhaps set up a WAL (write-ahead log) that logs to S3. The new
cluster should then pick up the records that weren't processed due to the
previous cluster's termination.
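
A minimal sketch of that setup (the checkpoint path and batch interval are
placeholders; the property name is the standard Spark Streaming WAL switch):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KinesisWithWAL")
  // Persist received blocks to the write-ahead log before they are processed
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(900))
// WAL segments and streaming metadata live under the checkpoint directory; pointing
// it at S3 lets a replacement cluster recover records that were received but not yet processed.
ssc.checkpoint("s3n://my-bucket/spark/checkpoints")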

Thanks,
Aniket

On Thu, Sep 17, 2015, 9:19 PM Alan Dipert  wrote:

> Hello,
> We are using Spark Streaming 1.4.1 in AWS EMR to process records from
> Kinesis.  Our Spark program saves RDDs to S3, after which the records are
> picked up by a Lambda function that loads them into Redshift.  That no data
> is lost during processing is important to us.
>
> We have set our Kinesis checkpoint interval to 15 minutes, which is also
> our window size.
>
> Unfortunately, checkpointing happens after receiving data from Kinesis,
> not after we have successfully written to S3.  If batches back up in Spark,
> and the cluster is terminated, whatever data was in-memory will be lost
> because it was checkpointed but not actually saved to S3.
>
> We are considering forking and modifying the kinesis-asl library with
> changes that would allow us to perform the checkpoint manually and at the
> right time.  We'd rather not do this.
>
> Are we overlooking an easier way to deal with this problem?  Thank you in
> advance for your insight!
>
> Alan
>


Re: java.lang.NoSuchMethodError and yarn-client mode

2015-09-09 Thread Aniket Bhatnagar
Hi Tom

There has to be a difference in classpaths between yarn-client and yarn-cluster
mode. Perhaps a good starting point would be to print the classpath as the
first thing in SimpleApp.main. It should give clues as to why it works in
yarn-cluster mode.
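
Something along these lines (a sketch; SimpleApp is the class name from your
mail, everything else is illustrative):

import java.net.URLClassLoader

object SimpleApp {
  def main(args: Array[String]): Unit = {
    // Print the JVM classpath and the context classloader's URLs before anything else,
    // so the yarn-client and yarn-cluster runs can be diffed.
    println(System.getProperty("java.class.path"))
    Thread.currentThread().getContextClassLoader match {
      case url: URLClassLoader => url.getURLs.foreach(println)
      case other               => println(s"Context classloader: $other")
    }
    // ... rest of the application
  }
}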

Thanks,
Aniket

On Wed, Sep 9, 2015, 2:11 PM Tom Seddon  wrote:

> Hi,
>
> I have a problem trying to get a fairly simple app working which makes use
> of native avro libraries.  The app runs fine on my local machine and in
> yarn-cluster mode, but when I try to run it on EMR yarn-client mode I get
> the error below.  I'm aware this is a version problem, as EMR runs an
> earlier version of avro, and I am trying to use avro-1.7.7.
>
> What's confusing me a great deal is the fact that this runs fine in
> yarn-cluster mode.
>
> What is it about yarn-cluster mode that means the application has access
> to the correct version of the avro library?  I need to run in yarn-client
> mode as I will be caching data to the driver machine in between batches.  I
> think in yarn-cluster mode the driver can run on any machine in the cluster
> so this would not work.
>
> Grateful for any advice as I'm really stuck on this.  AWS support are
> trying but they don't seem to know why this is happening either!
>
> Just to note, I'm aware of Databricks spark-avro project and have used
> it.  This is an investigation to see if I can use RDDs instead of
> dataframes.
>
> java.lang.NoSuchMethodError:
> org.apache.avro.Schema$Parser.parse(Ljava/lang/String;[Ljava/lang/String;)Lorg/apache/avro/Schema;
> at ophan.thrift.event.Event.(Event.java:10)
> at SimpleApp$.main(SimpleApp.scala:25)
> at SimpleApp.main(SimpleApp.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks,
>
> Tom
>
>
>


Re: [Streaming] Non-blocking recommendation in custom receiver documentation and KinesisReceiver's worker.run blocking calll

2015-05-23 Thread Aniket Bhatnagar
Hi TD

Unfortunately, I am off for a week so I won't be able to test this until
next week. Will keep you posted.

Aniket

On Sat, May 23, 2015, 6:16 AM Tathagata Das t...@databricks.com wrote:

 Hey Aniket, I just checked in the fix in Spark master and branch-1.4.
 Could you download Spark and test it out?



 On Thu, May 21, 2015 at 1:43 AM, Tathagata Das t...@databricks.com
 wrote:

 Thanks for the JIRA. I will look into this issue.

 TD

 On Thu, May 21, 2015 at 1:31 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I ran into one of the issues that are potentially caused because of this
 and have logged a JIRA bug -
 https://issues.apache.org/jira/browse/SPARK-7788

 Thanks,
 Aniket

 On Wed, Sep 24, 2014 at 12:59 PM Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi all

 Reading through Spark streaming's custom receiver documentation, it is
 recommended that onStart and onStop methods should not block indefinitely.
 However, looking at the source code of KinesisReceiver, the onStart method
 calls worker.run that blocks until worker is shutdown (via a call to
 onStop).

 So, my question is what are the ramifications of making a blocking call
 in onStart and whether this is something that should be addressed
 in KinesisReceiver implementation.

 Thanks,
 Aniket






OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am aggregating a dataset using the combineByKey method, and for a certain
input size, the job fails with the following error. I have enabled heap
dumps to better analyze the issue and will report back if I have any
findings. Meanwhile, if you guys have any idea of what could possibly
result in this error or how to better debug this, please let me know.

java.lang.OutOfMemoryError: Java heap space
at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
at
org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
at
org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
at
org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at
org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)


Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am setting spark.executor.memory to 1024m on a 3-node cluster with each
node having 4 cores and 7 GB RAM. The combiner functions take Scala case
classes as input and generate a mutable.ListBuffer of Scala case classes.
Therefore, I am guessing hashCode and equals should be taken care of.

Thanks,
Aniket

On Wed, Apr 15, 2015 at 1:00 PM Xianjin YE advance...@gmail.com wrote:

 What are your JVM heap size settings?  The OOM in SizeEstimator is caused
  by a lot of entries in IdentityHashMap.
  A quick guess is that the objects in your dataset are of a custom class and you
  didn't implement the hashCode and equals methods correctly.



 On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:

   I am aggregating a dataset using the combineByKey method, and for a certain
  input size, the job fails with the following error. I have enabled heap
  dumps to better analyze the issue and will report back if I have any
 findings. Meanwhile, if you guys have any idea of what could possibly
 result in this error or how to better debug this, please let me know.
 
  java.lang.OutOfMemoryError: Java heap space
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
  at
 org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
  at
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
  at
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at
 org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
  at
 org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
  at
 org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at
 org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
  at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
  at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
  at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
  at
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)






Saprk 1.2.0 | Spark job fails with MetadataFetchFailedException

2015-03-19 Thread Aniket Bhatnagar
I have a job that sorts data and runs a combineByKey operation, and it
sometimes fails with the following error. The job is running on a Spark 1.2.0
cluster in yarn-client deployment mode. Any clues on how to debug the
error?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
at
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Aniket Bhatnagar
This is tricky to debug. Check the logs of the YARN node manager and resource
manager to see if you can trace the error. In the past, I have had to look
closely at the arguments getting passed to the YARN container (they get logged
before containers are launched). If I still didn't get a clue, I had to check
the script generated by YARN to execute the container and even run it manually
to trace at what line the error occurred.

BTW, are you submitting the job from Windows?

On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as well?
 No strange log message during startup, and can't see any other log messages
 since no executer gets launched. Does not seems to work in yarn-client mode
 either, failing with the exception below.

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

 Basic symptoms are that the jobs fires up the AM, but after examining
 the executors page in the web ui, only the driver is listed, no
 executors are ever received, and the driver keep waiting forever. Has
 anyone seemed similar problems?

 Thanks for any insights,
 Anders






Re: Programmatic Spark 1.2.0 on EMR | S3 filesystem is not working when using

2015-02-02 Thread Aniket Bhatnagar
Alright, I found the issue. I wasn't setting the fs.s3.buffer.dir
configuration. Here is the final Spark conf snippet that works:


spark.hadoop.fs.s3n.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem,
spark.hadoop.fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem,
spark.hadoop.fs.s3bfs.impl: org.apache.hadoop.fs.s3.S3FileSystem,
spark.hadoop.fs.s3.buffer.dir:
/mnt/var/lib/hadoop/s3,/mnt1/var/lib/hadoop/s3,
spark.hadoop.fs.s3n.endpoint: s3.amazonaws.com,
spark.hadoop.fs.emr.configuration.version: 1.0,
spark.hadoop.fs.s3n.multipart.uploads.enabled: true,
spark.hadoop.fs.s3.enableServerSideEncryption: false,
spark.hadoop.fs.s3.serverSideEncryptionAlgorithm: AES256,
spark.hadoop.fs.s3.consistent: true,
spark.hadoop.fs.s3.consistent.retryPolicyType: exponential,
spark.hadoop.fs.s3.consistent.retryPeriodSeconds: 10,
spark.hadoop.fs.s3.consistent.retryCount: 5,
spark.hadoop.fs.s3.maxRetries: 4,
spark.hadoop.fs.s3.sleepTimeSeconds: 10,
spark.hadoop.fs.s3.consistent.throwExceptionOnInconsistency: true,
spark.hadoop.fs.s3.consistent.metadata.autoCreate: true,
spark.hadoop.fs.s3.consistent.metadata.tableName: EmrFSMetadata,
spark.hadoop.fs.s3.consistent.metadata.read.capacity: 500,
spark.hadoop.fs.s3.consistent.metadata.write.capacity: 100,
spark.hadoop.fs.s3.consistent.fastList: true,
spark.hadoop.fs.s3.consistent.fastList.prefetchMetadata: false,
spark.hadoop.fs.s3.consistent.notification.CloudWatch: false,
spark.hadoop.fs.s3.consistent.notification.SQS: false
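
For reference, a sketch of how these spark.hadoop.* keys can be applied
programmatically via SparkConf (only the first few keys are shown; the
remaining ones from the list above follow the same pattern):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.hadoop.fs.s3n.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem")
  .set("spark.hadoop.fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem")
  .set("spark.hadoop.fs.s3bfs.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
  .set("spark.hadoop.fs.s3.buffer.dir",
    "/mnt/var/lib/hadoop/s3,/mnt1/var/lib/hadoop/s3")
  // ... remaining spark.hadoop.* keys from the list above
val sc = new SparkContext(conf)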

Thanks,
Aniket


On Fri Jan 30 2015 at 23:29:25 Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

 Right, which makes me believe that the directory is perhaps configured
  somewhere and I have missed configuring it. The process that is
  submitting jobs (which basically becomes the driver) is running in sudo mode,
  and the executors are executed by YARN. The Hadoop username is configured as
  'hadoop' (the default user in EMR).

 On Fri, Jan 30, 2015, 11:25 PM Sven Krasser kras...@gmail.com wrote:

 From your stacktrace it appears that the S3 writer tries to write the
 data to a temp file on the local file system first. Taking a guess, that
 local directory doesn't exist or you don't have permissions for it.
 -Sven

 On Fri, Jan 30, 2015 at 6:44 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

  I am programmatically submitting Spark jobs in yarn-client mode on EMR.
  Whenever a job tries to save a file to S3, it gives the below-mentioned
  exception. I think the issue might be that EMR is not set up properly, as I
  have to set all Hadoop configurations manually in SparkContext. However, I
  am not sure which configuration I am missing (if any).

 Configurations that I am using in SparkContext to setup EMRFS:
 spark.hadoop.fs.s3n.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem,
 spark.hadoop.fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem,
 spark.hadoop.fs.emr.configuration.version: 1.0,
 spark.hadoop.fs.s3n.multipart.uploads.enabled: true,
 spark.hadoop.fs.s3.enableServerSideEncryption: false,
 spark.hadoop.fs.s3.serverSideEncryptionAlgorithm: AES256,
 spark.hadoop.fs.s3.consistent: true,
 spark.hadoop.fs.s3.consistent.retryPolicyType: exponential,
 spark.hadoop.fs.s3.consistent.retryPeriodSeconds: 10,
 spark.hadoop.fs.s3.consistent.retryCount: 5,
 spark.hadoop.fs.s3.maxRetries: 4,
 spark.hadoop.fs.s3.sleepTimeSeconds: 10,
 spark.hadoop.fs.s3.consistent.throwExceptionOnInconsistency: true,
 spark.hadoop.fs.s3.consistent.metadata.autoCreate: true,
 spark.hadoop.fs.s3.consistent.metadata.tableName: EmrFSMetadata,
 spark.hadoop.fs.s3.consistent.metadata.read.capacity: 500,
 spark.hadoop.fs.s3.consistent.metadata.write.capacity: 100,
 spark.hadoop.fs.s3.consistent.fastList: true,
 spark.hadoop.fs.s3.consistent.fastList.prefetchMetadata: false,
 spark.hadoop.fs.s3.consistent.notification.CloudWatch: false,
 spark.hadoop.fs.s3.consistent.notification.SQS: false,

 Exception:
 java.io.IOException: No such file or directory
 at java.io.UnixFileSystem.createFileExclusively(Native Method)
 at java.io.File.createNewFile(File.java:1006)
 at java.io.File.createTempFile(File.java:1989)
 at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.startNewTempFile(
 S3FSOutputStream.java:269)
 at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.writeInternal(
 S3FSOutputStream.java:205)
 at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.flush(
 S3FSOutputStream.java:136)
 at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(
 S3FSOutputStream.java:156)
 at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(
 FSDataOutputStream.java:72)
 at org.apache.hadoop.fs.FSDataOutputStream.close(
 FSDataOutputStream.java:105)
 at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.close(
 TextOutputFormat.java:109)
 at org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.close(
 MultipleOutputFormat.java:116)
 at org.apache.spark.SparkHadoopWriter.close(SparkHadoopWriter.scala:102)
 at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.
 apply(PairRDDFunctions.scala:1068

Re: Programmatic Spark 1.2.0 on EMR | S3 filesystem is not working when using

2015-01-30 Thread Aniket Bhatnagar
Right, which makes me believe that the directory is perhaps configured
somewhere and I have missed configuring it. The process that is
submitting jobs (which basically becomes the driver) is running in sudo mode,
and the executors are executed by YARN. The Hadoop username is configured as
'hadoop' (the default user in EMR).

On Fri, Jan 30, 2015, 11:25 PM Sven Krasser kras...@gmail.com wrote:

 From your stacktrace it appears that the S3 writer tries to write the data
 to a temp file on the local file system first. Taking a guess, that local
 directory doesn't exist or you don't have permissions for it.
 -Sven

 On Fri, Jan 30, 2015 at 6:44 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

  I am programmatically submitting Spark jobs in yarn-client mode on EMR.
  Whenever a job tries to save a file to S3, it gives the below-mentioned
  exception. I think the issue might be that EMR is not set up properly, as I
  have to set all Hadoop configurations manually in SparkContext. However, I
  am not sure which configuration I am missing (if any).

 Configurations that I am using in SparkContext to setup EMRFS:
 spark.hadoop.fs.s3n.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem,
 spark.hadoop.fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem,
 spark.hadoop.fs.emr.configuration.version: 1.0,
 spark.hadoop.fs.s3n.multipart.uploads.enabled: true,
 spark.hadoop.fs.s3.enableServerSideEncryption: false,
 spark.hadoop.fs.s3.serverSideEncryptionAlgorithm: AES256,
 spark.hadoop.fs.s3.consistent: true,
 spark.hadoop.fs.s3.consistent.retryPolicyType: exponential,
 spark.hadoop.fs.s3.consistent.retryPeriodSeconds: 10,
 spark.hadoop.fs.s3.consistent.retryCount: 5,
 spark.hadoop.fs.s3.maxRetries: 4,
 spark.hadoop.fs.s3.sleepTimeSeconds: 10,
 spark.hadoop.fs.s3.consistent.throwExceptionOnInconsistency: true,
 spark.hadoop.fs.s3.consistent.metadata.autoCreate: true,
 spark.hadoop.fs.s3.consistent.metadata.tableName: EmrFSMetadata,
 spark.hadoop.fs.s3.consistent.metadata.read.capacity: 500,
 spark.hadoop.fs.s3.consistent.metadata.write.capacity: 100,
 spark.hadoop.fs.s3.consistent.fastList: true,
 spark.hadoop.fs.s3.consistent.fastList.prefetchMetadata: false,
 spark.hadoop.fs.s3.consistent.notification.CloudWatch: false,
 spark.hadoop.fs.s3.consistent.notification.SQS: false,

 Exception:
 java.io.IOException: No such file or directory
 at java.io.UnixFileSystem.createFileExclusively(Native Method)
 at java.io.File.createNewFile(File.java:1006)
 at java.io.File.createTempFile(File.java:1989)
 at
 com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.startNewTempFile(S3FSOutputStream.java:269)
 at
 com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.writeInternal(S3FSOutputStream.java:205)
 at
 com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.flush(S3FSOutputStream.java:136)
 at
 com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:156)
 at
 org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 at
 org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:105)
 at
 org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:109)
 at
 org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.close(MultipleOutputFormat.java:116)
 at org.apache.spark.SparkHadoopWriter.close(SparkHadoopWriter.scala:102)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1047)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Hints? Suggestions?




 --
 http://sites.google.com/site/krasser/?utm_source=sig



Re: saving rdd to multiple files named by the key

2015-01-26 Thread Aniket Bhatnagar
This might be helpful:
http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
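
A minimal sketch of the approach from that answer (output path and sample data
are placeholders): subclass MultipleTextOutputFormat so that the key becomes
the file name and is dropped from the file contents.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Name each output file after its key
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()   // keep only the value in the file body
}

object SaveByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SaveByKey").setMaster("local[*]"))
    val pairs = sc.parallelize(Seq(("k1", "v1"), ("k1", "v2"), ("k2", "v3")))
    // Produces one file per key under /tmp/by-key (e.g. /tmp/by-key/k1, /tmp/by-key/k2)
    pairs.saveAsHadoopFile("/tmp/by-key", classOf[String], classOf[String], classOf[KeyBasedOutput])
    sc.stop()
  }
}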

On Tue Jan 27 2015 at 07:45:18 Sharon Rapoport sha...@plaid.com wrote:

 Hi,

 I have an rdd of [k,v] pairs. I want to save each [v] to a file named [k].
 I got them by combining many [k,v] by [k]. I could then save to file by
 partitions, but that still doesn't allow me to choose the name, and leaves
 me stuck with foo/part-...

 Any tips?

 Thanks,
 Sharon



Re: ClosureCleaner should use ClassLoader created by SparkContext

2015-01-21 Thread Aniket Bhatnagar
Here is the stack trace for reference. Notice that this happens when the
job spawns a new thread.

java.lang.ClassNotFoundException: com.myclass$$anonfun$8$$anonfun$9
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
~[na:1.7.0_71]
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
~[na:1.7.0_71]
at java.security.AccessController.doPrivileged(Native Method)
~[na:1.7.0_71]
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
~[na:1.7.0_71]
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
~[na:1.7.0_71]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
~[na:1.7.0_71]
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
~[na:1.7.0_71]
at java.lang.Class.forName0(Native Method) ~[na:1.7.0_71]
at java.lang.Class.forName(Class.java:274) ~[na:1.7.0_71]
at
org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:260)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
Source) ~[com.esotericsoftware.reflectasm.reflectasm-1.07-shaded.jar:na]
at
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
Source) ~[com.esotericsoftware.reflectasm.reflectasm-1.07-shaded.jar:na]
at
org.apache.spark.util.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:87)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:107)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at org.apache.spark.rdd.RDD.map(RDD.scala:271)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at com.myclass.com$myclass$$load(myclass.scala:375) ~[na:na]
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
~[org.scala-lang.scala-library-2.11.5.jar:na]
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
~[org.scala-lang.scala-library-2.11.5.jar:na]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_71]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]


On Wed Jan 21 2015 at 17:34:34 Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

  While implementing a Spark server, I realized that the thread's context
  classloader must be set to any dynamically loaded classloader so that
  ClosureCleaner can do its thing. Should ClosureCleaner not use the classloader
  created by SparkContext (which has all the jars dynamically added via
  SparkContext.addJar) instead of Thread.currentThread.getContextClassLoader
  while looking up the class in InnerClosureFinder?

 Thanks,
 Aniket



ClosureCleaner should use ClassLoader created by SparkContext

2015-01-21 Thread Aniket Bhatnagar
While implementing a Spark server, I realized that the thread's context
classloader must be set to any dynamically loaded classloader so that
ClosureCleaner can do its thing. Should ClosureCleaner not use the classloader
created by SparkContext (which has all the jars dynamically added via
SparkContext.addJar) instead of Thread.currentThread.getContextClassLoader
while looking up the class in InnerClosureFinder?
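
For illustration, a minimal sketch of the workaround this implies (the jar path
is hypothetical): set the context classloader of any spawned thread to the
loader that knows about the dynamically added jars before defining RDD
operations in it.

import java.net.{URL, URLClassLoader}

object ContextLoaderWorkaround {
  def main(args: Array[String]): Unit = {
    // Hypothetical loader holding the same jars that were passed to SparkContext.addJar
    val dynamicLoader = new URLClassLoader(Array(new URL("file:///tmp/extra-job.jar")),
      getClass.getClassLoader)
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        // Without this line, ClosureCleaner resolves inner closure classes against the
        // default context classloader and fails with ClassNotFoundException.
        Thread.currentThread().setContextClassLoader(dynamicLoader)
        // ... define and run RDD transformations here
      }
    })
    worker.start()
    worker.join()
  }
}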

Thanks,
Aniket


Re: How to output to S3 and keep the order

2015-01-19 Thread Aniket Bhatnagar
When you repartition, ordering can get lost. You would need to sort after
repartitioning.
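
A minimal sketch, assuming result is the RDD from your snippet, its elements
have an Ordering (e.g. RDD[String]), and the bucket/path is a placeholder:

result
  .repartition(1)
  .sortBy(identity)                        // re-sort within the single partition
  .saveAsTextFile("s3n://my-bucket/output")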

Aniket

On Tue, Jan 20, 2015, 7:08 AM anny9699 anny9...@gmail.com wrote:

 Hi,

 I am using Spark on AWS and want to write the output to S3. It is a
 relatively small file and I don't want them to output as multiple parts. So
 I use

 result.repartition(1).saveAsTextFile(s3://...)

 However as long as I am using the saveAsTextFile method, the output doesn't
 keep the original order. But if I use BufferedWriter in Java to write the
 output, I could only write to the master machine instead of S3 directly. Is
 there a way that I could write to S3 and the same time keep the order?

 Thanks a lot!
 Anny



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/How-to-output-to-S3-and-keep-the-order-tp21246.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: kinesis multiple records adding into stream

2015-01-16 Thread Aniket Bhatnagar
Sorry, I couldn't understand the issue. Are you trying to send data to
Kinesis from a Spark batch/real-time job?

- Aniket

On Fri, Jan 16, 2015, 9:40 PM Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 Hi Experts!

 I am using kinesis dependency as follow
 groupId = org.apache.spark
  artifactId = spark-streaming-kinesis-asl_2.10
  version = 1.2.0

 in this aws sdk version 1.8.3 is being used. in this sdk multiple records
 can not be put in a single request. is it possible to put multiple records
 in a single request ?


 thanks



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/kinesis-multiple-records-adding-into-
 stream-tp21191.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: kinesis creating stream scala code exception

2015-01-15 Thread Aniket Bhatnagar
Are you using Spark in standalone mode, on YARN, or on Mesos? If it's YARN,
please mention the Hadoop distribution and version. What Spark distribution
are you using (it seems to be 1.2.0, but compiled with which Hadoop version)?

Thanks, Aniket

On Thu, Jan 15, 2015, 4:59 PM Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

  Hi experts, I want to consume data from a Kinesis stream using Spark
  Streaming.
  I am trying to create the Kinesis stream using Scala code. Here is my code:

  def main(args: Array[String]) {
    println("Stream creation started")
    if (create(2))
      println("Stream is created successfully")
  }

  def create(shardCount: Int): Boolean = {
    val credentials = new BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID,
      KinesisProperties.AWS_SECRET_KEY)

    var kinesisClient: AmazonKinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL,
      KinesisProperties.KINESIS_SERVICE_NAME,
      KinesisProperties.KINESIS_REGION_ID)
    val createStreamRequest = new CreateStreamRequest()
    createStreamRequest.setStreamName(KinesisProperties.myStreamName);
    createStreamRequest.setShardCount(shardCount)
    val describeStreamRequest = new DescribeStreamRequest()
    describeStreamRequest.setStreamName(KinesisProperties.myStreamName)
    try {
      Thread.sleep(12)
    } catch {
      case e: Exception =>
    }
    var streamStatus = "not active"
    while (!streamStatus.equalsIgnoreCase("ACTIVE")) {
      try {
        Thread.sleep(1000)
      } catch {
        case e: Exception => e.printStackTrace()
      }
      try {
        val describeStreamResponse = kinesisClient.describeStream(describeStreamRequest)
        streamStatus = describeStreamResponse.getStreamDescription.getStreamStatus
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
    if (streamStatus.equalsIgnoreCase("ACTIVE"))
      true
    else
      false
  }


  When I run this code I get the following exception:

 Exception in thread main java.lang.NoSuchMethodError:
 org.joda.time.format.DateTimeFormatter.withZoneUTC()Lorg/joda/time/format/
 DateTimeFormatter;
 at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(
 NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
 DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)
 at
 com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
 at
 com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(
 SignerFactory.java:105)
 at com.amazonaws.auth.SignerFactory.getSigner(
 SignerFactory.java:78)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(
 AmazonWebServiceClient.java:307)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByURI(
 AmazonWebServiceClient.java:280)
 at
 com.amazonaws.AmazonWebServiceClient.setEndpoint(
 AmazonWebServiceClient.java:160)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(
 AmazonKinesisClient.java:2102)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.
 init(AmazonKinesisClient.java:216)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.
 init(AmazonKinesisClient.java:139)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.
 init(AmazonKinesisClient.java:116)
 at
 com.platalytics.platform.connectors.Kinesis.App$.create(App.scala:32)
 at
 com.platalytics.platform.connectors.Kinesis.App$.main(App.scala:26)
 at com.platalytics.platform.connectors.Kinesis.App.main(App.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
 57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(
 DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)



 I have following maven dependency
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>


 Any suggestion?



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/kinesis-creating-stream-scala-code-
 exception-tp21154.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: 

Re: Inserting an element in RDD[String]

2015-01-15 Thread Aniket Bhatnagar
Sure there is. Create a new RDD just containing the schema line (hint: use
sc.parallelize) and then union both the RDDs (the header RDD and data RDD)
to get a final desired RDD.
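
A minimal sketch, assuming an existing SparkContext sc; the schema line, data,
and output path are placeholders:

val dataRdd = sc.parallelize(Seq("1,foo", "2,bar"))   // your existing RDD[String]
val header  = sc.parallelize(Seq("id,name"))          // one-element RDD with the schema line
val withHeader = header.union(dataRdd)                // the header partition comes first
withHeader.saveAsTextFile("/tmp/with-header")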

On Thu Jan 15 2015 at 19:48:52 Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 Hi experts!

 I have an RDD[String] and I want to add a schema line at the beginning of
 this RDD.
 I know an RDD is immutable. So is there any way to get a new RDD with one
 schema line plus the contents of the previous RDD?


 Thanks



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Inserting-an-element-in-RDD-String-tp21161.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Issue with Parquet on Spark 1.2 and Amazon EMR

2015-01-12 Thread Aniket Bhatnagar
Meanwhile, I have submitted a pull request (
https://github.com/awslabs/emr-bootstrap-actions/pull/37) that allows users
to place their jars ahead of all other jars in the Spark classpath. This should
serve as a temporary workaround for all class conflicts.

Thanks,
Aniket

On Mon Jan 05 2015 at 22:13:47 Kelly, Jonathan jonat...@amazon.com wrote:

   I've noticed the same thing recently and will contact the appropriate
 owner soon.  (I work for Amazon, so I'll go through internal channels and
 report back to this list.)

  In the meantime, I've found that editing spark-env.sh and putting the
 Spark assembly first in the classpath fixes the issue.  I expect that the
 version of Parquet that's being included in the EMR libs just needs to be
 upgraded.


  ~ Jonathan Kelly

   From: Aniket Bhatnagar aniket.bhatna...@gmail.com
 Date: Sunday, January 4, 2015 at 10:51 PM
 To: Adam Gilmore dragoncu...@gmail.com, user@spark.apache.org 
 user@spark.apache.org
 Subject: Re: Issue with Parquet on Spark 1.2 and Amazon EMR

   Can you confirm your EMR version? Could it be because of the classpath
  entries for EMRFS? You might face issues using S3 without them.

 Thanks,
 Aniket

 On Mon, Jan 5, 2015, 11:16 AM Adam Gilmore dragoncu...@gmail.com wrote:

  Just an update on this - I found that the script by Amazon was the
 culprit - not exactly sure why.  When I installed Spark manually onto the
 EMR (and did the manual configuration of all the EMR stuff), it worked fine.

 On Mon, Dec 22, 2014 at 11:37 AM, Adam Gilmore dragoncu...@gmail.com
 wrote:

  Hi all,

  I've just launched a new Amazon EMR cluster and used the script at:

  s3://support.elasticmapreduce/spark/install-spark

  to install Spark (this script was upgraded to support 1.2).

  I know there are tools to launch a Spark cluster in EC2, but I want to
 use EMR.

  Everything installs fine; however, when I go to read from a Parquet
 file, I end up with (the main exception):

  Caused by: java.lang.NoSuchMethodError:
 parquet.hadoop.ParquetInputSplit.init(Lorg/apache/hadoop/fs/Path;JJJ[Ljava/lang/String;[JLjava/lang/String;Ljava/util/Map;)V
 at
 parquet.hadoop.TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(ParquetInputFormat.java:578)
 ... 55 more

  It seems to me like a version mismatch somewhere.  Where is the
 parquet-hadoop jar coming from?  Is it built into a fat jar for Spark?

  Any help would be appreciated.  Note that 1.1.1 worked fine with
 Parquet files.





Re: spark-network-yarn 2.11 depends on spark-network-shuffle 2.10

2015-01-08 Thread Aniket Bhatnagar
Actually, it does cause builds with SBT 0.13.7 to fail with the error
"Conflicting cross-version suffixes". I have raised a defect, SPARK-5143, for
this.

On Wed Jan 07 2015 at 23:44:21 Marcelo Vanzin van...@cloudera.com wrote:

 This particular case shouldn't cause problems since both of those
 libraries are java-only (the scala version appended there is just for
 helping the build scripts).

 But it does look weird, so it would be nice to fix it.

 On Wed, Jan 7, 2015 at 12:25 AM, Aniket Bhatnagar
 aniket.bhatna...@gmail.com wrote:
  It seems that spark-network-yarn compiled for scala 2.11 depends on
  spark-network-shuffle compiled for scala 2.10. This causes cross version
  dependencies conflicts in sbt. Seems like a publishing error?
 
  http://www.uploady.com/#!/download/6Yn95UZA0DR/3taAJFjCJjrsSXOR



 --
 Marcelo



spark-network-yarn 2.11 depends on spark-network-shuffle 2.10

2015-01-07 Thread Aniket Bhatnagar
It seems that spark-network-yarn compiled for scala 2.11 depends on
spark-network-shuffle compiled for scala 2.10. This causes cross-version
dependency conflicts in sbt. Seems like a publishing error?

http://www.uploady.com/#!/download/6Yn95UZA0DR/3taAJFjCJjrsSXOR


Re: A spark newbie question

2015-01-04 Thread Aniket Bhatnagar
Go through spark API documentation. Basically you have to do group by
(date, message_type) and then do a count.
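
A rough sketch in Scala (the Java API is analogous); the rows RDD, the date
format and the write back to Cassandra are placeholders here:

val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
val counts = rows                                  // RDD[(java.util.Date, String)]
  .map { case (ts, msgType) => ((fmt.format(ts), msgType), 1L) }
  .reduceByKey(_ + _)                              // count per (date, message_type)
  .map { case ((date, msgType), count) => (date, msgType, count) }
// counts can then be written to the new 3-column table, e.g. via the
// spark-cassandra-connector's saveToCassandra.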

On Sun, Jan 4, 2015, 9:58 PM Dinesh Vallabhdas dines...@yahoo.com.invalid
wrote:

 A spark cassandra newbie question. Thanks in advance for the help.
 I have a cassandra table with 2 columns message_timestamp(timestamp) and
 message_type(text). The data is of the form

 2014-06-25 12:01:39 START
 2014-06-25 12:02:39 START
 2014-06-25 12:02:39 PAUSE
 2014-06-25 14:02:39 STOP
 2014-06-25 15:02:39 START
 2014-06-27 12:01:39 START
 2014-06-27 11:03:39 STOP
 2014-06-27 12:03:39 REWIND
 2014-06-27 12:04:39 RESTART
 2014-06-27 12:05:39 PAUSE
 2014-06-27 13:03:39 REWIND
 2014-06-27 14:03:39 START

 I want to use spark(using java) to calculate counts of a message_type on a
 per day basis and store it back in cassandra in a new table with 3 columns (
 date,message_type,count).
 The result table should look like this

 2014-06-25 START 3
 2014-06-25 STOP 1
 2014-06-25 PAUSE 1
 2014-06-27 START 2
 2014-06-27 STOP 1
 2014-06-27 PAUSE 1
 2014-06-27 REWIND 2
 2014-06-27 RESTART 1

 I'm not proficient in scala and would like to use java.





Re: Issue with Parquet on Spark 1.2 and Amazon EMR

2015-01-04 Thread Aniket Bhatnagar
Can you confirm your emr version? Could it be because of the classpath
entries for emrfs? You might face issues with using S3 without them.

Thanks,
Aniket

On Mon, Jan 5, 2015, 11:16 AM Adam Gilmore dragoncu...@gmail.com wrote:

 Just an update on this - I found that the script by Amazon was the culprit
 - not exactly sure why.  When I installed Spark manually onto the EMR (and
 did the manual configuration of all the EMR stuff), it worked fine.

 On Mon, Dec 22, 2014 at 11:37 AM, Adam Gilmore dragoncu...@gmail.com
 wrote:

 Hi all,

 I've just launched a new Amazon EMR cluster and used the script at:

 s3://support.elasticmapreduce/spark/install-spark

 to install Spark (this script was upgraded to support 1.2).

 I know there are tools to launch a Spark cluster in EC2, but I want to
 use EMR.

 Everything installs fine; however, when I go to read from a Parquet file,
 I end up with (the main exception):

 Caused by: java.lang.NoSuchMethodError:
 parquet.hadoop.ParquetInputSplit.init(Lorg/apache/hadoop/fs/Path;JJJ[Ljava/lang/String;[JLjava/lang/String;Ljava/util/Map;)V
 at
 parquet.hadoop.TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(ParquetInputFormat.java:578)
 ... 55 more

 It seems to me like a version mismatch somewhere.  Where is the
 parquet-hadoop jar coming from?  Is it built into a fat jar for Spark?

 Any help would be appreciated.  Note that 1.1.1 worked fine with Parquet
 files.





Re: sparkContext.textFile does not honour the minPartitions argument

2015-01-02 Thread Aniket Bhatnagar
Thanks everyone. I studied the source code and realized minPartitions is
passed over to Hadoop's InputFormat and it's up to the InputFormat
implementation to treat the parameter as a hint.
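
In case it helps others, a minimal sketch of the workaround (the path is a
placeholder):

// minPartitions is only a hint to the underlying InputFormat...
val rdd = sc.textFile("hdfs:///path/to/file", minPartitions = 1)
// ...so force a single partition explicitly if that is what is needed:
val singlePartition = rdd.coalesce(1)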

Thanks,
Aniket

On Fri, Jan 2, 2015, 7:13 AM Rishi Yadav ri...@infoobjects.com wrote:

 Hi Ankit,

 Optional number of partitions value is to increase number of partitions
 not reduce it from default value.

 On Thu, Jan 1, 2015 at 10:43 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am trying to read a file into a single partition but it seems like
 sparkContext.textFile ignores the passed minPartitions value. I know I can
 repartition the RDD but I was curious to know if this is expected or if
 this is a bug that needs to be further investigated?





sparkContext.textFile does not honour the minPartitions argument

2015-01-01 Thread Aniket Bhatnagar
I am trying to read a file into a single partition but it seems like
sparkContext.textFile ignores the passed minPartitions value. I know I can
repartition the RDD but I was curious to know if this is expected or if
this is a bug that needs to be further investigated?


Re: Host Error on EC2 while accessing hdfs from stadalone

2014-12-30 Thread Aniket Bhatnagar
Did you check firewall rules in security groups?

On Tue, Dec 30, 2014, 9:34 PM Laeeq Ahmed laeeqsp...@yahoo.com.invalid
wrote:

 Hi,

 I am using spark standalone on EC2. I can access ephemeral hdfs from
 spark-shell interface but I can't access hdfs in standalone application. I
 am using spark 1.2.0 with hadoop 2.4.0 and launched cluster from ec2 folder
 from my local machine. In my pom file I have given hadoop client as 2.4.0.
 <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.4.0</version>
 </dependency>

 The error is as fallows:

 *java.io.IOException: Failed on local exception: java.io.EOFException;
 Host Details : local host is: ip-10-40-121-200/10.40.121.200
 http://10.40.121.200; destination host is:
 ec2-23-21-113-136.compute-1.amazonaws.com
 http://ec2-23-21-113-136.compute-1.amazonaws.com:9000;*

 Regards,
 Laeeq




Re: Spark 1.2.0 Yarn not published

2014-12-29 Thread Aniket Bhatnagar
Thanks Ted. Adding a dependency on spark-network-yarn would allow resolution
of YarnShuffleService, which, per the docs, runs on the Node Manager and so
perhaps isn't useful while submitting jobs programmatically. I think what I
need is a dependency on the spark-yarn module so that classes like
YarnRMClient, Client, etc. resolve. These are used while submitting jobs to
YARN, at least in yarn-client mode. Any particular reason why the spark-yarn
client module isn't published anymore?
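
(For reference, the artifact coordinates I had been depending on with
earlier releases looked roughly like this sbt line; the version is just
illustrative:)

libraryDependencies += "org.apache.spark" %% "spark-yarn" % "1.1.0"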


On Mon Dec 29 2014 at 13:00:26 Ted Yu yuzhih...@gmail.com wrote:

 See this thread:
 http://search-hadoop.com/m/JW1q5vd61V1/Spark-yarn+1.2.0subj=Re+spark+yarn_2+10+1+2+0+artifacts

 Cheers

 On Dec 28, 2014, at 11:13 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Hi all

 I just realized that spark-yarn artifact hasn't been published for 1.2.0
 release. Any particular reason for that? I was using it in my yet another
 spark-job-server project to submit jobs to a YARN cluster through
 convenient REST APIs (with some success). The job server was creating
 SparkContext with master as 'yarn-client' and hence needed to be dependent
 on spark-yarn jar.

 Thanks,
 Aniket





Re: action progress in ipython notebook?

2014-12-29 Thread Aniket Bhatnagar
Hi Josh

Is there documentation available for status API? I would like to use it.

Thanks,
Aniket

On Sun Dec 28 2014 at 02:37:32 Josh Rosen rosenvi...@gmail.com wrote:

 The console progress bars are implemented on top of a new stable status
 API that was added in Spark 1.2.  It's possible to query job progress
 using this interface (in older versions of Spark, you could implement a
 custom SparkListener and maintain the counts of completed / running /
 failed tasks / stages yourself).

 There are actually several subtleties involved in implementing job-level
 progress bars which behave in an intuitive way; there's a pretty extensive
 discussion of the challenges at https://github.com/apache/spark/pull/3009.
 Also, check out the pull request for the console progress bars for an
 interesting design discussion around how they handle parallel stages:
 https://github.com/apache/spark/pull/3029.

 I'm not sure about the plumbing that would be necessary to display live
 progress updates in the IPython notebook UI, though.  The general pattern
 would probably involve a mapping to relate notebook cells to Spark jobs
 (you can do this with job groups, I think), plus some periodic timer that
 polls the driver for the status of the current job in order to update the
 progress bar.

 For Spark 1.3, I'm working on designing a REST interface to accesses this
 type of job / stage / task progress information, as well as expanding the
 types of information exposed through the stable status API interface.

 - Josh

 On Thu, Dec 25, 2014 at 10:01 AM, Eric Friedman eric.d.fried...@gmail.com
  wrote:

 Spark 1.2.0 is SO much more usable than previous releases -- many thanks
 to the team for this release.

 A question about progress of actions.  I can see how things are
 progressing using the Spark UI.  I can also see the nice ASCII art
 animation on the spark driver console.

 Has anyone come up with a way to accomplish something similar in an
 iPython notebook using pyspark?

 Thanks
 Eric





Re: action progress in ipython notebook?

2014-12-29 Thread Aniket Bhatnagar
Thanks Josh. Looks promising. I will give it a try.
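
For reference, here is roughly how I plan to poll it (a sketch; method names
taken from the docs Josh linked and to be double-checked):

sc.setJobGroup("my-group", "demo")  // hypothetical job group id
// ... trigger an action asynchronously, then poll:
for (jobId <- sc.statusTracker.getJobIdsForGroup("my-group");
     jobInfo <- sc.statusTracker.getJobInfo(jobId);
     stageId <- jobInfo.stageIds();
     stage <- sc.statusTracker.getStageInfo(stageId)) {
  println(s"job $jobId stage $stageId: " +
    s"${stage.numCompletedTasks()} / ${stage.numTasks()} tasks done")
}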

Thanks,
Aniket

On Mon, Dec 29, 2014, 9:55 PM Josh Rosen rosenvi...@gmail.com wrote:

 It's accessed through the `statusTracker` field on SparkContext.

 *Scala*:


 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkStatusTracker

 *Java*:


 https://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkStatusTracker.html

 Don't create new instances of this yourself; instead, use sc.statusTracker
 to obtain the current instance.

 This API is missing a bunch of stuff that's available in the web UI, but
 it was designed so that we can add new methods without breaking binary
 compatibility. Although it would technically be a new feature, I'd hope
 that we can backport some additions to 1.2.1 since it's just adding a
 facade / stable interface in front of JobProgressListener and thus has
 little to no risk to introduce new bugs elsewhere in Spark.



 On Mon, Dec 29, 2014 at 3:08 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi Josh

 Is there documentation available for status API? I would like to use it.

 Thanks,
 Aniket


 On Sun Dec 28 2014 at 02:37:32 Josh Rosen rosenvi...@gmail.com wrote:

 The console progress bars are implemented on top of a new stable status
 API that was added in Spark 1.2.  It's possible to query job progress
 using this interface (in older versions of Spark, you could implement a
 custom SparkListener and maintain the counts of completed / running /
 failed tasks / stages yourself).

 There are actually several subtleties involved in implementing
 job-level progress bars which behave in an intuitive way; there's a
 pretty extensive discussion of the challenges at
 https://github.com/apache/spark/pull/3009.  Also, check out the pull
 request for the console progress bars for an interesting design discussion
 around how they handle parallel stages:
 https://github.com/apache/spark/pull/3029.

 I'm not sure about the plumbing that would be necessary to display live
 progress updates in the IPython notebook UI, though.  The general pattern
 would probably involve a mapping to relate notebook cells to Spark jobs
 (you can do this with job groups, I think), plus some periodic timer that
 polls the driver for the status of the current job in order to update the
 progress bar.

 For Spark 1.3, I'm working on designing a REST interface to accesses
 this type of job / stage / task progress information, as well as expanding
 the types of information exposed through the stable status API interface.

 - Josh

 On Thu, Dec 25, 2014 at 10:01 AM, Eric Friedman 
 eric.d.fried...@gmail.com wrote:

 Spark 1.2.0 is SO much more usable than previous releases -- many
 thanks to the team for this release.

 A question about progress of actions.  I can see how things are
 progressing using the Spark UI.  I can also see the nice ASCII art
 animation on the spark driver console.

 Has anyone come up with a way to accomplish something similar in an
 iPython notebook using pyspark?

 Thanks
 Eric






Spark 1.2.0 Yarn not published

2014-12-28 Thread Aniket Bhatnagar
Hi all

I just realized that spark-yarn artifact hasn't been published for 1.2.0
release. Any particular reason for that? I was using it in my yet another
spark-job-server project to submit jobs to a YARN cluster through
convenient REST APIs (with some success). The job server was creating
SparkContext with master as 'yarn-client' and hence needed to be dependent
on spark-yarn jar.

Thanks,
Aniket


Re: Are lazy values created once per node or once per partition?

2014-12-17 Thread Aniket Bhatnagar
I would think that it has to be per worker.

On Wed, Dec 17, 2014, 6:32 PM Ashic Mahtab as...@live.com wrote:

 Hello,
 Say, I have the following code:

 let something = Something()

 someRdd.foreachRdd(something.someMethod)

 And in something, I have a lazy member variable that gets created in
 something.someMethod.

 Would that lazy be created once per node, or once per partition?

 Thanks,
 Ashic.



Re: Streaming | Partition count mismatch exception while saving data in RDD

2014-12-16 Thread Aniket Bhatnagar
It turns out that this happens when checkpoint is set to a local directory
path. I have opened a JIRA SPARK-4862 for Spark streaming to output better
error message.
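
To spell out the gotcha (paths below are placeholders), the streaming
checkpoint should point at a fault-tolerant filesystem rather than a local
path when running on a cluster:

// Works: a reliable filesystem such as HDFS or S3
ssc.checkpoint("hdfs:///checkpoints/my-streaming-app")
// Triggers the confusing partition-count mismatch above on a cluster,
// because executors cannot read each other's local disks:
// ssc.checkpoint("/tmp/checkpoints")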

Thanks,
Aniket

On Tue Dec 16 2014 at 20:08:13 Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

 I am using spark 1.1.0 running a streaming job that uses updateStateByKey
 and then (after a bunch of maps/flatMaps) does a foreachRDD to save data in
 each RDD by making HTTP calls. The issue is that each time I attempt to
 save the RDD (using foreach on RDD), it gives me the following exception:

 org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[467] at
 apply at List.scala:318(0) has different number of partitions than original
 RDD MapPartitionsRDD[461] at mapPartitions at StateDStream.scala:71(56)

 I read through the code but couldn't understand why this exception is
 happening. Any help would be appreciated!

 Thanks,
 Aniket



Re: Spark 1.1.0 does not spawn more than 6 executors in yarn-client mode and ignores --num-executors

2014-12-16 Thread Aniket Bhatnagar
Hi guys

I am hoping someone might have a clue on why this is happening. Otherwise I
will have to delve into the YARN module's source code to better understand the
issue.

On Wed, Dec 10, 2014, 11:54 PM Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

 I am running spark 1.1.0 on AWS EMR and I am running a batch job that
 seems to be highly parallelizable in yarn-client mode. But Spark stops
 spawning any more executors after spawning 6 executors even though
 YARN cluster has 15 healthy m1.large nodes. I even tried providing
 '--num-executors 60' argument during spark-submit but even that doesn't
 help. A quick look at spark admin UI suggests there are active stages whose
 tasks have not been started yet and even then spark doesn't start more
 executors. I am not sure why. Any help on this would be greatly appreciated.

 Here is link to screen shots that I took of spark admin and yarn admin -
 http://imgur.com/a/ztjr7

 Thanks,
 Aniket




Re: Run Spark job on Playframework + Spark Master/Worker in one Mac

2014-12-15 Thread Aniket Bhatnagar
Try the workaround (addClassPathJars(sparkContext,
this.getClass.getClassLoader) discussed in
http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E

Thanks,
Aniket

On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi 
tomoya.igarashi.0...@gmail.com wrote:

 Hi all,

 I am trying to run Spark job on Playframework + Spark Master/Worker in one
 Mac.
 When job ran, I encountered java.lang.ClassNotFoundException.
 Would you teach me how to solve it?

 Here is my code in Github.
 https://github.com/TomoyaIgarashi/spark_cluster_sample

 * Envrionments:
 Mac 10.9.5
 Java 1.7.0_71
 Playframework 2.2.3
 Spark 1.1.1

 * Setup history:
  cd ~
  git clone g...@github.com:apache/spark.git
  cd spark
  git checkout -b v1.1.1 v1.1.1
  sbt/sbt assembly
  vi ~/.bashrc
 export SPARK_HOME=/Users/tomoya/spark
  . ~/.bashrc
  hostname
 Tomoya-Igarashis-MacBook-Air.local
  vi $SPARK_HOME/conf/slaves
 Tomoya-Igarashis-MacBook-Air.local
  play new spark_cluster_sample
 default name
 type - scala

 * Run history:
  $SPARK_HOME/sbin/start-all.sh
  jps
  which play
 /Users/tomoya/play/play
  git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
  cd spark_cluster_sample
  play run

 * Error trace:
 Here is error trace in Gist.
 https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511

 Regards



Re: Serialization issue when using HBase with Spark

2014-12-15 Thread Aniket Bhatnagar
The reason not using sc.newAPIHadoopRDD is it only support one scan each
time.

I am not sure if that's true. You can use multiple scans as follows:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*)

where convertScanToString is implemented as:

/**
 * Serializes a HBase scan into string.
 * @param scan Scan to serialize.
 * @return Base64 encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
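
Putting it together, a rough end-to-end sketch (assuming HBase 0.98-era
classes and that each Scan carries its table name attribute,
Scan.SCAN_ATTRIBUTES_TABLE_NAME):

val conf = HBaseConfiguration.create()
// scans: Seq[Scan], one per rowkey range
conf.setStrings(MultiTableInputFormat.SCANS,
  scans.map(convertScanToString): _*)
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])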

Thanks,
Aniket

On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu zsxw...@gmail.com wrote:

 Just pointing out a bug in your code: you should not use `mapPartitions`
 like that. For details, I recommend the setup() and cleanup() section in Sean
 Owen's post:
 http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

 Best Regards,
 Shixiong Zhu

 2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com:

 In #1, the HTable class is not serializable.
 You also need to check your self-defined function getUserActions and make
 sure it is a member function of a class that implements the Serializable
 interface.

 Sent from my iPad

  On Dec 12, 2014, at 4:35 PM, yangliuyu yangli...@163.com wrote:
 
  The scenario is using HTable instance to scan multiple rowkey range in
 Spark
  tasks look likes below:
  Option 1:
  val users = input
    .map { case (deviceId, uid) => uid }
    .distinct()
    .sortBy(x => x)
    .mapPartitions(iterator => {
      val conf = HBaseConfiguration.create()
      val table = new HTable(conf, "actions")
      val result = iterator.map { userId =>
        (userId, getUserActions(table, userId, timeStart, timeStop))
      }
      table.close()
      result
    })
 
  But got the exception:
  org.apache.spark.SparkException: Task not serializable
 at
 
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
  ...
  Caused by: java.io.NotSerializableException:
  org.apache.hadoop.conf.Configuration
 at
  java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
  java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 
  The reason not using sc.newAPIHadoopRDD is it only support one scan each
  time.
  val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
  And if using MultiTableInputFormat, driver is not possible put all
 rowkeys
  into HBaseConfiguration
  Option 2:
  sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
  It may divide all rowkey ranges into several parts then use option 2,
 but I
  prefer option 1. So is there any solution for option 1?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark with HBase

2014-12-15 Thread Aniket Bhatnagar
In case you are still looking for help, there has been multiple discussions
in this mailing list that you can try searching for. Or you can simply use
https://github.com/unicredit/hbase-rdd :-)

Thanks,
Aniket

On Wed Dec 03 2014 at 16:11:47 Ted Yu yuzhih...@gmail.com wrote:

 Which hbase release are you running ?
 If it is 0.98, take a look at:

 https://issues.apache.org/jira/browse/SPARK-1297

 Thanks

 On Dec 2, 2014, at 10:21 PM, Jai jaidishhari...@gmail.com wrote:

 I am trying to use Apache Spark with a psuedo distributed Hadoop Hbase
 Cluster and I am looking for some links regarding the same. Can someone
 please guide me through the steps to accomplish this. Thanks a lot for
 Helping



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-HBase-tp20226.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Run Spark job on Playframework + Spark Master/Worker in one Mac

2014-12-15 Thread Aniket Bhatnagar
Seems you are using standalone mode. Can you check spark worker logs or
application logs in spark work directory to find any errors?

On Tue, Dec 16, 2014, 9:09 AM Tomoya Igarashi 
tomoya.igarashi.0...@gmail.com wrote:

 Hi Aniket,
 Thanks for your reply.

 I followed your advice to modified my code.
 Here is latest one.

 https://github.com/TomoyaIgarashi/spark_cluster_sample/commit/ce7613c42d3adbe6ae44e264c11f3829460f3c35

 As a result, It works correctly! Thank you very much.

 But, AssociationError Message appears line 397 in Playframework logs as
 follows.
 https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d

 Is there any problem?


 2014-12-15 18:48 GMT+09:00 Aniket Bhatnagar aniket.bhatna...@gmail.com:

 Try the workaround (addClassPathJars(sparkContext,
 this.getClass.getClassLoader) discussed in
 http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E

 Thanks,
 Aniket


 On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi 
 tomoya.igarashi.0...@gmail.com wrote:

 Hi all,

 I am trying to run Spark job on Playframework + Spark Master/Worker in
 one Mac.
 When job ran, I encountered java.lang.ClassNotFoundException.
 Would you teach me how to solve it?

 Here is my code in Github.
 https://github.com/TomoyaIgarashi/spark_cluster_sample

 * Envrionments:
 Mac 10.9.5
 Java 1.7.0_71
 Playframework 2.2.3
 Spark 1.1.1

 * Setup history:
  cd ~
  git clone g...@github.com:apache/spark.git
  cd spark
  git checkout -b v1.1.1 v1.1.1
  sbt/sbt assembly
  vi ~/.bashrc
 export SPARK_HOME=/Users/tomoya/spark
  . ~/.bashrc
  hostname
 Tomoya-Igarashis-MacBook-Air.local
  vi $SPARK_HOME/conf/slaves
 Tomoya-Igarashis-MacBook-Air.local
  play new spark_cluster_sample
 default name
 type - scala

 * Run history:
  $SPARK_HOME/sbin/start-all.sh
  jps
  which play
 /Users/tomoya/play/play
  git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
  cd spark_cluster_sample
  play run

 * Error trace:
 Here is error trace in Gist.
 https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511

 Regards




Re: Having problem with Spark streaming with Kinesis

2014-12-14 Thread Aniket Bhatnagar
The reason is because of the following code:

val numStreams = numShards
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
    InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}

In the above code, numStreams is set to numShards. This enforces the need
to have #shards + 1 workers. If you set numStreams to Math.min(numShards,
numAvailableWorkers - 1), you can have fewer workers than shards. Makes
sense?
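
i.e. something along these lines (a sketch; numAvailableWorkers is whatever
your deployment actually provides):

val numStreams = math.min(numShards, numAvailableWorkers - 1)
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
    InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}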

On Sun Dec 14 2014 at 10:06:36 A.K.M. Ashrafuzzaman 
ashrafuzzaman...@gmail.com wrote:

 Thanks Aniket,
 The trick is to have #workers = #shards + 1, but I don’t know why that
 is.
 http://spark.apache.org/docs/latest/streaming-kinesis-integration.html

 Here in the figure[spark streaming kinesis architecture], it seems like
 one node should be able to take on more than one shards.


 A.K.M. Ashrafuzzaman
 Lead Software Engineer
 NewsCred http://www.newscred.com/

 (M) 880-175-5592433
 Twitter https://twitter.com/ashrafuzzaman | Blog
 http://jitu-blog.blogspot.com/ | Facebook
 https://www.facebook.com/ashrafuzzaman.jitu

 Check out The Academy http://newscred.com/theacademy, your #1 source
 for free content marketing resources

 On Nov 26, 2014, at 6:23 PM, A.K.M. Ashrafuzzaman 
 ashrafuzzaman...@gmail.com wrote:

 Hi guys,
 When we are using Kinesis with 1 shard then it works fine. But when we use
 more than 1, it falls into an infinite loop and no data is processed by
 Spark streaming. In the Kinesis DynamoDB table, I can see that it keeps
 increasing the leaseCounter, but it does not start processing.

 I am using,
 scala: 2.10.4
 java version: 1.8.0_25
 Spark: 1.1.0
 spark-streaming-kinesis-asl: 1.1.0

 A.K.M. Ashrafuzzaman
 Lead Software Engineer
 NewsCred http://www.newscred.com/

 (M) 880-175-5592433
 Twitter https://twitter.com/ashrafuzzaman | Blog
 http://jitu-blog.blogspot.com/ | Facebook
 https://www.facebook.com/ashrafuzzaman.jitu

 Check out The Academy http://newscred.com/theacademy, your #1 source
 for free content marketing resources





Spark 1.1.0 does not spawn more than 6 executors in yarn-client mode and ignores --num-executors

2014-12-10 Thread Aniket Bhatnagar
I am running spark 1.1.0 on AWS EMR and I am running a batch job that
seems to be highly parallelizable in yarn-client mode. But Spark stops
spawning any more executors after spawning 6 executors even though
YARN cluster has 15 healthy m1.large nodes. I even tried providing
'--num-executors 60' argument during spark-submit but even that doesn't
help. A quick look at spark admin UI suggests there are active stages whose
tasks have not been started yet and even then spark doesn't start more
executors. I am not sure why. Any help on this would be greatly appreciated.

Here is link to screen shots that I took of spark admin and yarn admin -
http://imgur.com/a/ztjr7

Thanks,
Aniket


Re: Programmatically running spark jobs using yarn-client

2014-12-09 Thread Aniket Bhatnagar
Thanks Akhil. I was wondering why it isn't able to find the class even
though it exists in the same class loader as SparkContext. As a
workaround, I used the following code to add all dependent jars in a
Play Framework application to the spark context.

@tailrec
private def addClassPathJars(sparkContext: SparkContext, classLoader: ClassLoader): Unit = {
  classLoader match {
    case urlClassLoader: URLClassLoader => {
      urlClassLoader.getURLs.foreach(classPathUrl => {
        if (classPathUrl.toExternalForm.endsWith(".jar")) {
          LOGGER.debug(s"Added $classPathUrl to spark context $sparkContext")
          sparkContext.addJar(classPathUrl.toExternalForm)
        } else {
          LOGGER.debug(s"Ignored $classPathUrl while adding to spark context $sparkContext")
        }
      })
    }
    case _ => LOGGER.debug(s"Ignored class loader $classLoader as it does not subclasses URLClassLoader")
  }
  if (classLoader.getParent != null) {
    addClassPathJars(sparkContext, classLoader.getParent)
  }
}

On Mon Dec 08 2014 at 21:39:42 Akhil Das ak...@sigmoidanalytics.com wrote:

 How are you submitting the job? You need to create a jar of your code (sbt
 package will give you one inside target/scala-*/projectname-*.jar) and then
 use it while submitting. If you are not using spark-submit then you can
 simply add this jar to spark by
 sc.addJar(/path/to/target/scala*/projectname*jar)

 Thanks
 Best Regards

 On Mon, Dec 8, 2014 at 7:23 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am trying to create (yet another) spark as a service tool that lets you
 submit jobs via REST APIs. I think I have nearly gotten it to work baring a
 few issues. Some of which seem already fixed in 1.2.0 (like SPARK-2889) but
 I have hit the road block with the following issue.

 I have created a simple spark job as following:

 class StaticJob {
 import SparkContext._
 override def run(sc: SparkContext): Result = {
   val array = Range(1, 1000).toArray
   val rdd = sc.parallelize(array)
   val paired = rdd.map(i => (i % 1, i)).sortByKey()
   val sum = paired.countByKey()
   SimpleResult(sum)
 }
 }

 When I submit this job programmatically, it gives me a class not found
 error:

 2014-12-08 05:41:18,421 [Result resolver thread-0] [warn]
 o.a.s.s.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0,
 localhost.localdomain): java.lang.ClassNotFoundException:
 com.blah.server.examples.StaticJob$$anonfun$1
 java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 java.security.AccessController.doPrivileged(Native Method)
 java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 java.lang.Class.forName0(Native Method)
 java.lang.Class.forName(Class.java:270)

 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)

 I decompiled the StaticJob$$anonfun$1 class and it seems to point to
 closure 'rdd.map(i => (i % 1, i))'. I am not sure why this is happening.
 Any help will be greatly appreciated.





Programmatically running spark jobs using yarn-client

2014-12-08 Thread Aniket Bhatnagar
I am trying to create (yet another) spark as a service tool that lets you
submit jobs via REST APIs. I think I have nearly gotten it to work baring a
few issues. Some of which seem already fixed in 1.2.0 (like SPARK-2889) but
I have hit the road block with the following issue.

I have created a simple spark job as following:

class StaticJob {
import SparkContext._
override def run(sc: SparkContext): Result = {
  val array = Range(1, 1000).toArray
  val rdd = sc.parallelize(array)
  val paired = rdd.map(i => (i % 1, i)).sortByKey()
  val sum = paired.countByKey()
  SimpleResult(sum)
}
}

When I submit this job programmatically, it gives me a class not found
error:

2014-12-08 05:41:18,421 [Result resolver thread-0] [warn]
o.a.s.s.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0,
localhost.localdomain): java.lang.ClassNotFoundException:
com.blah.server.examples.StaticJob$$anonfun$1
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
java.lang.ClassLoader.loadClass(ClassLoader.java:425)
java.lang.ClassLoader.loadClass(ClassLoader.java:358)
java.lang.Class.forName0(Native Method)
java.lang.Class.forName(Class.java:270)

org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)

I decompiled the StaticJob$$anonfun$1 class and it seems to point to
closure 'rdd.map(i => (i % 1, i))'. I am not sure why this is happening.
Any help will be greatly appreciated.


Re: Having problem with Spark streaming with Kinesis

2014-11-26 Thread Aniket Bhatnagar
What's your cluster size? For streaming to work, it needs (number of shards
+ 1) executors.

On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman 
ashrafuzzaman...@gmail.com wrote:

 Hi guys,
 When we are using Kinesis with 1 shard then it works fine. But when we use
 more than 1, it falls into an infinite loop and no data is processed by
 Spark streaming. In the Kinesis DynamoDB table, I can see that it keeps
 increasing the leaseCounter, but it does not start processing.

 I am using,
 scala: 2.10.4
 java version: 1.8.0_25
 Spark: 1.1.0
 spark-streaming-kinesis-asl: 1.1.0

 A.K.M. Ashrafuzzaman
 Lead Software Engineer
 NewsCred http://www.newscred.com/

 (M) 880-175-5592433
 Twitter https://twitter.com/ashrafuzzaman | Blog
 http://jitu-blog.blogspot.com/ | Facebook
 https://www.facebook.com/ashrafuzzaman.jitu

 Check out The Academy http://newscred.com/theacademy, your #1 source
 for free content marketing resources




Re: Having problem with Spark streaming with Kinesis

2014-11-26 Thread Aniket Bhatnagar
Did you set the spark master as local[*]? If so, then it means that the number
of executors is equal to the number of cores of the machine. Perhaps your Mac
machine has more cores (certainly more than the number of kinesis shards + 1).

Try explicitly setting the master as local[N] where N is the number of kinesis
shards + 1. It should then work on both machines.
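
For example, something like this when building the conf (a sketch; numShards
comes from the Kinesis client as in the sample code below):

val sparkConfig = new SparkConf()
  .setAppName("KinesisWordCount")
  .setMaster(s"local[${numShards + 1}]")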

On Thu, Nov 27, 2014, 9:46 AM Ashrafuzzaman ashrafuzzaman...@gmail.com
wrote:

 I was trying in one machine with just sbt run.

 And it is working with my mac environment with the same configuration.

 I used the sample code from
 https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala


 val kinesisClient = new AmazonKinesisClient(new
 DefaultAWSCredentialsProviderChain())
 kinesisClient.setEndpoint(endpointUrl)
 val numShards =
 kinesisClient.describeStream(streamName).getStreamDescription().getShards()
   .size()

 /* In this example, we're going to create 1 Kinesis
 Worker/Receiver/DStream for each shard. */
 val numStreams = numShards

 /* Setup the and SparkConfig and StreamingContext */
 /* Spark Streaming batch interval */
 val batchInterval = Milliseconds(2000)
 val sparkConfig = new SparkConf().setAppName(KinesisWordCount)
 val ssc = new StreamingContext(sparkConfig, batchInterval)

 /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
 val kinesisCheckpointInterval = batchInterval

 /* Create the same number of Kinesis DStreams/Receivers as Kinesis
 stream's shards */
 val kinesisStreams = (0 until numStreams).map { i =>
   KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
     InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
 }

 /* Union all the streams */
 val unionStreams = ssc.union(kinesisStreams)

 /* Convert each line of Array[Byte] to String, split into words, and count
 them */
 val words = unionStreams.flatMap(byteArray => new String(byteArray)
   .split(" "))

 /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
 val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

 /* Print the first 10 wordCounts */
 wordCounts.print()

 /* Start the streaming context and await termination */
 ssc.start()
 ssc.awaitTermination()



 A.K.M. Ashrafuzzaman
 Lead Software Engineer
 NewsCred http://www.newscred.com

 (M) 880-175-5592433
 Twitter https://twitter.com/ashrafuzzaman | Blog
 http://jitu-blog.blogspot.com/ | Facebook
 https://www.facebook.com/ashrafuzzaman.jitu

 Check out The Academy http://newscred.com/theacademy, your #1 source
 for free content marketing resources

 On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 What's your cluster size? For streaming to work, it needs (number of shards
 + 1) executors.

 On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman 
 ashrafuzzaman...@gmail.com wrote:

 Hi guys,
 When we are using Kinesis with 1 shard then it works fine. But when we
 use more than 1, it falls into an infinite loop and no data is
 processed by Spark streaming. In the Kinesis DynamoDB table, I can see that
 it keeps increasing the leaseCounter, but it does not start processing.

 I am using,
 scala: 2.10.4
 java version: 1.8.0_25
 Spark: 1.1.0
 spark-streaming-kinesis-asl: 1.1.0

 A.K.M. Ashrafuzzaman
 Lead Software Engineer
 NewsCred http://www.newscred.com/

 (M) 880-175-5592433
 Twitter https://twitter.com/ashrafuzzaman | Blog
 http://jitu-blog.blogspot.com/ | Facebook
 https://www.facebook.com/ashrafuzzaman.jitu

 Check out The Academy http://newscred.com/theacademy, your #1 source
 for free content marketing resources





Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Aniket Bhatnagar
Thanks Daniel. I can understand that the keys will not be in sorted order
but what I am trying to understand is whether the functions are passed
values in sorted order in a given partition.

For example:

sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a,
b) => b).collect
res0: Array[(Int, Int)] = Array((1,8))

The fold always gives me the last value as 8, which suggests values preserve
the sorting defined earlier in the stage's DAG?

On Wed Nov 19 2014 at 18:10:11 Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 Akhil, I think Aniket uses the word persisted in a different way than
 what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
 combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
 is preserved.)

 I think the answer is no. combineByKey uses AppendOnlyMap, which is a
 hashmap. That will shuffle your keys. You can quickly verify it in
 spark-shell:

 scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
 res0: Array[(Int, Int)] = Array((8,1), (7,1))

 (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
 number that demonstrates this.)

 On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 If something is persisted you can easily see them under the Storage tab
 in the web ui.

 Thanks
 Best Regards

 On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am trying to figure out if sorting is persisted after applying Pair
 RDD transformations and I am not able to decisively tell after reading the
 documentation.

 For example:
 val numbers = .. // RDD of numbers
 val pairedNumbers = numbers.map(number = (number % 100, number))
 val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =
 pairedNumber._2) // Sort by values in the pair
 val aggregates = sortedPairedNumbers.combineByKey(..)

 In this example, will the combine functions see values in sorted order?
 What if I had done groupByKey and then combineByKey? What transformations
 can unsort an already sorted data?






Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Aniket Bhatnagar
Thanks Daniel :-). It seems to make sense and something I was hoping for. I
will proceed with this assumption and report back if I see any anomalies.
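
A quick way to convince oneself in spark-shell (a throwaway check, not an
API guarantee):

sc.parallelize(1 to 100, 4).sortBy(identity)
  .mapPartitions(it => Iterator(it.toList))
  .collect()
  .foreach(p => assert(p == p.sorted)) // each partition's iterator comes back in sorted order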

On Wed Nov 19 2014 at 19:30:02 Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 Ah, so I misunderstood you too :).

 My reading of org/ apache/spark/Aggregator.scala is that your function
 will always see the items in the order that they are in the input RDD. An
 RDD partition is always accessed as an iterator, so it will not be read out
 of order.

 On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Thanks Daniel. I can understand that the keys will not be in sorted order
 but what I am trying to understand is whether the functions are passed
 values in sorted order in a given partition.

 For example:

 sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t =>
 t._2).foldByKey(0)((a, b) => b).collect
 res0: Array[(Int, Int)] = Array((1,8))

 The fold always gives me the last value as 8, which suggests values preserve
 the sorting defined earlier in the stage's DAG?

 On Wed Nov 19 2014 at 18:10:11 Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 Akhil, I think Aniket uses the word persisted in a different way than
 what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
 combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
 is preserved.)

 I think the answer is no. combineByKey uses AppendOnlyMap, which is a
 hashmap. That will shuffle your keys. You can quickly verify it in
 spark-shell:

 scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
 res0: Array[(Int, Int)] = Array((8,1), (7,1))

 (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
 number that demonstrates this.)

 On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 If something is persisted you can easily see them under the Storage tab
 in the web ui.

 Thanks
 Best Regards

 On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am trying to figure out if sorting is persisted after applying Pair
 RDD transformations and I am not able to decisively tell after reading the
 documentation.

 For example:
 val numbers = .. // RDD of numbers
 val pairedNumbers = numbers.map(number = (number % 100, number))
 val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =
 pairedNumber._2) // Sort by values in the pair
 val aggregates = sortedPairedNumbers.combineByKey(..)

 In this example, will the combine functions see values in sorted
 order? What if I had done groupByKey and then combineByKey? What
 transformations can unsort an already sorted data?







Re: Getting spark job progress programmatically

2014-11-19 Thread Aniket Bhatnagar
I have for now submitted a JIRA ticket @
https://issues.apache.org/jira/browse/SPARK-4473. I will collate all my
experiences ( hacks) and submit them as a feature request for public API.
On Tue Nov 18 2014 at 20:35:00 andy petrella andy.petre...@gmail.com
wrote:

 yep, we should also propose to add this stuffs in the public API.

 Any other ideas?

 On Tue Nov 18 2014 at 4:03:35 PM Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Thanks Andy. This is very useful. This gives me all active stages  their
 percentage completion but I am unable to tie stages to job group (or
 specific job). I looked at Spark's code and to me, it
 seems org.apache.spark.scheduler.ActiveJob's group ID should get propagated
 to StageInfo (possibly in the StageInfo.fromStage method). For now, I will
 have to write my own version JobProgressListener that stores stageId to
 group Id mapping.

 I will submit a JIRA ticket and seek spark dev's opinion on this. Many
 thanks for your prompt help Andy.

 Thanks,
 Aniket


 On Tue Nov 18 2014 at 19:40:06 andy petrella andy.petre...@gmail.com
 wrote:

 I started some quick hack for that in the notebook, you can head to:
 https://github.com/andypetrella/spark-notebook/
 blob/master/common/src/main/scala/notebook/front/widgets/SparkInfo.scala

 On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am writing yet another Spark job server and have been able to submit
 jobs and return/save results. I let multiple jobs use the same spark
 context but I set job group while firing each job so that I can in future
 cancel jobs. Further, what I desire to do is provide some kind of status
 update/progress on running jobs (a % completion would be awesome) but I am
 unable to figure out the appropriate spark API to use. I do however see status
 reporting in spark UI so there must be a way to get status of various
 stages per job group. Any hints on what APIs should I look at?




Re: Getting spark job progress programmatically

2014-11-19 Thread Aniket Bhatnagar
Thanks for pointing this out Mark. Had totally missed the existing JIRA
items

On Wed Nov 19 2014 at 21:42:19 Mark Hamstra m...@clearstorydata.com wrote:

 This is already being covered by SPARK-2321 and SPARK-4145.  There are
 pull requests that are already merged or already very far along -- e.g.,
 https://github.com/apache/spark/pull/3009

 If there is anything that needs to be added, please add it to those issues
 or PRs.

 On Wed, Nov 19, 2014 at 7:55 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I have for now submitted a JIRA ticket @
 https://issues.apache.org/jira/browse/SPARK-4473. I will collate all my
 experiences ( hacks) and submit them as a feature request for public API.

 On Tue Nov 18 2014 at 20:35:00 andy petrella andy.petre...@gmail.com
 wrote:

 yep, we should also propose to add this stuffs in the public API.

 Any other ideas?

 On Tue Nov 18 2014 at 4:03:35 PM Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Thanks Andy. This is very useful. This gives me all active stages 
 their percentage completion but I am unable to tie stages to job group (or
 specific job). I looked at Spark's code and to me, it
 seems org.apache.spark.scheduler.ActiveJob's group ID should get propagated
 to StageInfo (possibly in the StageInfo.fromStage method). For now, I will
 have to write my own version JobProgressListener that stores stageId to
 group Id mapping.

 I will submit a JIRA ticket and seek spark dev's opinion on this. Many
 thanks for your prompt help Andy.

 Thanks,
 Aniket


 On Tue Nov 18 2014 at 19:40:06 andy petrella andy.petre...@gmail.com
 wrote:

 I started some quick hack for that in the notebook, you can head to:
 https://github.com/andypetrella/spark-notebook/
 blob/master/common/src/main/scala/notebook/front/widgets/
 SparkInfo.scala

 On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am writing yet another Spark job server and have been able to
 submit jobs and return/save results. I let multiple jobs use the same 
 spark
 context but I set job group while firing each job so that I can in future
 cancel jobs. Further, what I desire to do is provide some kind of status
 update/progress on running jobs (a % completion would be awesome) but I am
 unable to figure out the appropriate spark API to use. I do however see
 status
 reporting in spark UI so there must be a way to get status of various
 stages per job group. Any hints on what APIs should I look at?





Getting spark job progress programmatically

2014-11-18 Thread Aniket Bhatnagar
I am writing yet another Spark job server and have been able to submit jobs
and return/save results. I let multiple jobs use the same spark context but
I set job group while firing each job so that I can in future cancel jobs.
Further, what I desire to do is provide some kind of status
update/progress on running jobs (a % completion would be awesome) but I am
unable to figure out the appropriate spark API to use. I do however see status
reporting in spark UI so there must be a way to get status of various
stages per job group. Any hints on what APIs should I look at?


Is sorting persisted after pair rdd transformations?

2014-11-18 Thread Aniket Bhatnagar
I am trying to figure out if sorting is persisted after applying Pair RDD
transformations and I am not able to decisively tell after reading the
documentation.

For example:
val numbers = .. // RDD of numbers
val pairedNumbers = numbers.map(number = (number % 100, number))
val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =
pairedNumber._2) // Sort by values in the pair
val aggregates = sortedPairedNumbers.combineByKey(..)

In this example, will the combine functions see values in sorted order?
What if I had done groupByKey and then combineByKey? What transformations
can unsort an already sorted data?


Re: Getting spark job progress programmatically

2014-11-18 Thread Aniket Bhatnagar
Thanks Andy. This is very useful. This gives me all active stages  their
percentage completion but I am unable to tie stages to job group (or
specific job). I looked at Spark's code and to me, it
seems org.apache.spark.scheduler.ActiveJob's group ID should get propagated
to StageInfo (possibly in the StageInfo.fromStage method). For now, I will
have to write my own version JobProgressListener that stores stageId to
group Id mapping.
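
A rough sketch of what I have in mind (the exact fields on
SparkListenerJobStart vary a little between Spark versions, so this needs
checking against the version in use):

import java.util.Properties
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Keeps a stageId -> jobGroup mapping so that stage-level progress can be
// rolled up per job group.
class JobGroupListener extends SparkListener {
  val stageIdToGroup = mutable.Map[Int, String]()
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val props = Option(jobStart.properties).getOrElse(new Properties())
    val group = props.getProperty("spark.jobGroup.id", "default")
    jobStart.stageIds.foreach(stageId => stageIdToGroup(stageId) = group)
  }
}
// registered via sc.addSparkListener(new JobGroupListener())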

I will submit a JIRA ticket and seek spark dev's opinion on this. Many
thanks for your prompt help Andy.

Thanks,
Aniket

On Tue Nov 18 2014 at 19:40:06 andy petrella andy.petre...@gmail.com
wrote:

 I started some quick hack for that in the notebook, you can head to:
 https://github.com/andypetrella/spark-notebook/
 blob/master/common/src/main/scala/notebook/front/widgets/SparkInfo.scala

 On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am writing yet another Spark job server and have been able to submit
 jobs and return/save results. I let multiple jobs use the same spark
 context but I set job group while firing each job so that I can in future
 cancel jobs. Further, what I desire to do is provide some kind of status
 update/progress on running jobs (a % completion would be awesome) but I am
 unable to figure out the appropriate spark API to use. I do however see status
 reporting in spark UI so there must be a way to get status of various
 stages per job group. Any hints on what APIs should I look at?




Re: Out of memory with Spark Streaming

2014-10-31 Thread Aniket Bhatnagar
Thanks Chris for looking at this. I was putting data at roughly the same 50
records per batch max. This issue was purely because of a bug in my
persistence logic that was leaking memory.

Overall, I haven't seen a lot of lag with the Kinesis + Spark setup and I am
able to process records at roughly the same rate as data is fed into
Kinesis, with acceptable latency.

Thanks,
Aniket
On Oct 31, 2014 1:15 AM, Chris Fregly ch...@fregly.com wrote:

 curious about why you're only seeing 50 records max per batch.

 how many receivers are you running?  what is the rate that you're putting
 data onto the stream?

 per the default AWS kinesis configuration, the producer can do 1000 PUTs
 per second with max 50k bytes per PUT and max 1mb per second per shard.

 on the consumer side, you can only do 5 GETs per second and 2mb per second
 per shard.

 my hunch is that the 5 GETs per second is what's limiting your consumption
 rate.

 can you verify that these numbers match what you're seeing?  if so, you
 may want to increase your shards and therefore the number of kinesis
 receivers.

 otherwise, this may require some further investigation on my part.  i
 wanna stay on top of this if it's an issue.

 thanks for posting this, aniket!

 -chris

 On Fri, Sep 12, 2014 at 5:34 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi all

 Sorry but this was totally my mistake. In my persistence logic, I was
 creating async http client instance in RDD foreach but was never closing it
 leading to memory leaks.

 Apologies for wasting everyone's time.

 Thanks,
 Aniket

 On 12 September 2014 02:20, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Which version of spark are you running?

 If you are running the latest one, then could try running not a window
 but a simple event count on every 2 second batch, and see if you are still
 running out of memory?

 TD


 On Thu, Sep 11, 2014 at 10:34 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I did change it to be 1 gb. It still ran out of memory but a little
 later.

 The streaming job isn't handling a lot of data. In every 2 seconds, it
 doesn't get more than 50 records. Each record size is not more than 500
 bytes.
  On Sep 11, 2014 10:54 PM, Bharat Venkat bvenkat.sp...@gmail.com
 wrote:

 You could set spark.executor.memory to something bigger than the
 default (512mb)


 On Thu, Sep 11, 2014 at 8:31 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am running a simple Spark Streaming program that pulls in data from
 Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, 
 maps
 data and persists to a store.

 The program is running in local mode right now and runs out of memory
 after a while. I am yet to investigate heap dumps but I think Spark isn't
 releasing memory after processing is complete. I have even tried changing
 storage level to disk only.

 Help!

 Thanks,
 Aniket








Spark doesn't retry task while writing to HDFS

2014-10-24 Thread Aniket Bhatnagar
Hi all

I have written a job that reads data from HBASE and writes to HDFS (fairly
simple). While running the job, I noticed that a few of the tasks failed
with the following error. Quick googling on the error suggests that its an
unexplained error and is perhaps intermittent. What I am curious to know is
why didn't Spark retry writing file to HDFS? It just shows it as failed job
in Spark UI.

Error:
java.io.IOException: All datanodes x.x.x.x: are bad. Aborting...

org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1128)

org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:924)

org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:486)


Thanks,
Aniket


Re: Is SparkSQL + JDBC server a good approach for caching?

2014-10-24 Thread Aniket Bhatnagar
Just curious... why would you not store the processed results in a regular
relational database? Not sure what you meant by "persist the appropriate
RDDs". Did you mean the output of your job will be RDDs?

On 24 October 2014 13:35, ankits ankitso...@gmail.com wrote:

 I want to set up spark SQL to allow ad hoc querying over the last X days of
 processed data, where the data is processed through spark. This would also
 have to cache data (in memory only), so the approach I was thinking of was
 to build a layer that persists the appropriate RDDs and stores them in
 memory.

 I see spark sql allows ad hoc querying through JDBC though I have never
 used
 that before. Will using JDBC offer any advantages (e.g does it have built
 in
 support for caching?) over rolling my own solution for this use case?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-SparkSQL-JDBC-server-a-good-approach-for-caching-tp17196.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark streaming stops computing while the receiver keeps running without any errors reported

2014-09-24 Thread Aniket Bhatnagar
Hi all

I was finally able to get this working by setting
SPARK_EXECUTOR_INSTANCES to a high number. However, I am wondering if
this is a bug, because the app gets submitted but ceases to run since it
can't acquire the desired number of workers. Shouldn't the app be rejected if
it can't be run on the cluster?

Thanks,
Aniket

On 22 September 2014 18:14, Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

 Hi all

 I was finally able to figure out why this streaming appeared stuck. The
 reason was that I was running out of workers in my standalone deployment of
 Spark. There was no feedback in the logs which is why it took a little time
 for me to figure out.

 However, now I am trying to run the same in yarn-client mode and this is
 again giving the same problem. Is it possible to run out of workers in YARN
 mode as well? If so, how can I figure that out?

 Thanks,
 Aniket

 On 19 September 2014 18:07, Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Apologies for the delay in getting back on this. It seems the Kinesis example
 does not run on Spark 1.1.0 even when it is built using the kinesis-asl profile
 because of a dependency conflict in http client (same issue as
 http://mail-archives.apache.org/mod_mbox/spark-dev/201409.mbox/%3ccajob8btdxks-7-spjj5jmnw0xsnrjwdpcqqtjht1hun6j4z...@mail.gmail.com%3E).
 I had to add a later version of http client in the kinesis-asl profile to make
 it run. Then, the Kinesis example sets master as local, so it does not
 honour the MASTER environment variable as other examples do. Once I was
 able to resolve these issues, I was finally able to reproduce the issue.
 The example works fine in local mode but does not do anything when receiver
 runs in remote workers.

 Spark streaming does not report any blocks received from the receivers
 even though I can see the following lines in the app logs (I modified the
 debug line to print size as well):

 14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Pushed block
 input-0-1411129664668 in 15 ms
 14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Reported block
 input-0-1411129664668 of size 1

 Here are the screenshots of Spark admin: http://imgur.com/a/gWKYm

 I also ran other examples (custom receiver, etc) in both local and
 distributed mode and they seem to be working fine.

 Any ideas?

 Thanks,
 Aniket

 On 12 September 2014 02:49, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 This is very puzzling, given that this works in the local mode.

 Does running the kinesis example work with your spark-submit?


 https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

 The instructions are present in the streaming guide.
 https://github.com/apache/spark/blob/master/docs/streaming-kinesis-integration.md

  If that does not work on the cluster, then I would check the streaming UI for
  the number of records being received, and the stages page for whether
  jobs are being executed for every batch or not. That can tell us whether
  things are working well.

  Also ccing Chris Fregly, who wrote the Kinesis integration.

 TD




 On Thu, Sep 11, 2014 at 4:51 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi all

  I am trying to run a Kinesis Spark Streaming application on a standalone
  Spark cluster. The job works fine in local mode, but when I submit it (using
  spark-submit), it doesn't do anything. I enabled logs
 for org.apache.spark.streaming.kinesis package and I regularly get the
 following in worker logs:

 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
 x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId
 shardId-
 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
 x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId
 shardId-0001

 But the job does not perform any operations defined on DStream. To
 investigate this further, I changed the kinesis-asl's KinesisUtils to
 perform the following computation on the DStream created
 using ssc.receiverStream(new KinesisReceiver...):

  stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted " + tuple)))

  Even the above line does not result in any corresponding log entries
 both in driver and worker logs. The only relevant logs that I could find in
 driver logs are:
 14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at
 KinesisUtils.scala:68) finished in 0.398 s
 14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at
 KinesisUtils.scala:68, took 4.926449985 s
 14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job
 1410435653000 ms.0 from job set of time 1410435653000 ms
 14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job
 1410435653000 ms.1 from job set of time 1410435653000 ms
 14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at
 KinesisUtils.scala:68
 14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at
 DStream.scala:489)
 14/09/11 11:40:58 INFO

[Streaming] Non-blocking recommendation in custom receiver documentation and KinesisReceiver's worker.run blocking call

2014-09-24 Thread Aniket Bhatnagar
Hi all

Reading through Spark streaming's custom receiver documentation, it is
recommended that onStart and onStop methods should not block indefinitely.
However, looking at the source code of KinesisReceiver, the onStart method
calls worker.run that blocks until worker is shutdown (via a call to
onStop).

So, my question is: what are the ramifications of making a blocking call in
onStart, and is this something that should be addressed
in the KinesisReceiver implementation?
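
For context, a minimal sketch of the non-blocking pattern the custom receiver
documentation recommends (the class name, thread name and placeholder record
are illustrative):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class PollingReceiver(endpoint: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Return immediately; do the actual receiving on a separate thread
    new Thread("polling-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val record = "record"   // placeholder for data fetched from `endpoint`
          store(record)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do here: the receiving thread checks isStopped() and exits on its own
  }
}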

Thanks,
Aniket


Re: Spark streaming stops computing while the receiver keeps running without any errors reported

2014-09-19 Thread Aniket Bhatnagar
Apologies for the delay in getting back on this. It seems the Kinesis example
does not run on Spark 1.1.0 even when it is built using the kinesis-asl profile
because of a dependency conflict in http client (same issue as
http://mail-archives.apache.org/mod_mbox/spark-dev/201409.mbox/%3ccajob8btdxks-7-spjj5jmnw0xsnrjwdpcqqtjht1hun6j4z...@mail.gmail.com%3E).
I had to add a later version of http client in the kinesis-asl profile to make
it run. Then, the Kinesis example sets master as local, so it does not
honour the MASTER environment variable as other examples do. Once I was
able to resolve these issues, I was finally able to reproduce the issue.
The example works fine in local mode but does not do anything when receiver
runs in remote workers.

Spark streaming does not report any blocks received from the receivers even
though I can see the following lines in the app logs (I modified the debug
line to print size as well):

14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Pushed block
input-0-1411129664668 in 15 ms
14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Reported block
input-0-1411129664668 of size 1

Here are the screenshots of Spark admin: http://imgur.com/a/gWKYm

I also ran other examples (custom receiver, etc) in both local and
distributed mode and they seem to be working fine.

Any ideas?

Thanks,
Aniket

On 12 September 2014 02:49, Tathagata Das tathagata.das1...@gmail.com
wrote:

 This is very puzzling, given that this works in the local mode.

 Does running the kinesis example work with your spark-submit?


 https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

 The instructions are present in the streaming guide.
 https://github.com/apache/spark/blob/master/docs/streaming-kinesis-integration.md

  If that does not work on the cluster, then I would check the streaming UI for
  the number of records being received, and the stages page for whether
  jobs are being executed for every batch or not. That can tell us whether
  things are working well.

  Also ccing Chris Fregly, who wrote the Kinesis integration.

 TD




 On Thu, Sep 11, 2014 at 4:51 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi all

  I am trying to run a Kinesis Spark Streaming application on a standalone
  Spark cluster. The job works fine in local mode, but when I submit it (using
  spark-submit), it doesn't do anything. I enabled logs
 for org.apache.spark.streaming.kinesis package and I regularly get the
 following in worker logs:

 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
 x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId
 shardId-
 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
 x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId
 shardId-0001

 But the job does not perform any operations defined on DStream. To
 investigate this further, I changed the kinesis-asl's KinesisUtils to
 perform the following computation on the DStream created
 using ssc.receiverStream(new KinesisReceiver...):

  stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted " + tuple)))

  Even the above line does not result in any corresponding log entries
 both in driver and worker logs. The only relevant logs that I could find in
 driver logs are:
 14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at
 KinesisUtils.scala:68) finished in 0.398 s
 14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at
 KinesisUtils.scala:68, took 4.926449985 s
 14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job
 1410435653000 ms.0 from job set of time 1410435653000 ms
 14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job
 1410435653000 ms.1 from job set of time 1410435653000 ms
 14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at
 KinesisUtils.scala:68
 14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at
 DStream.scala:489)
 14/09/11 11:40:58 INFO DAGScheduler: Got job 3 (foreach at
 KinesisUtils.scala:68) with 2 output partitions (allowLocal=false)
 14/09/11 11:40:58 INFO DAGScheduler: Final stage: Stage 5(foreach at
 KinesisUtils.scala:68)

  After the above logs, nothing shows up corresponding to KinesisUtils. I
  am out of ideas on this one and any help would be greatly appreciated.

 Thanks,
 Aniket





Re: Bulk-load to HBase

2014-09-19 Thread Aniket Bhatnagar
Agreed that the bulk import would be faster. In my case, I wasn't expecting
a lot of data to be uploaded to HBase, and I also didn't want to take the
pain of importing generated HFiles into HBase. Is there a way to invoke
HBase's HFile import batch script programmatically?
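
For reference, a rough sketch of driving that import step from code via HBase's
LoadIncrementalHFiles (HBase 0.96-era API; the table name and HFile path are
illustrative):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val loader = new LoadIncrementalHFiles(conf)
val table = new HTable(conf, "my_table")
loader.doBulkLoad(new Path("hdfs:///tmp/hfiles"), table)   // moves the generated HFiles into the regions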

On 19 September 2014 17:58, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

 In fact, it seems that Put can be used by HFileOutputFormat, so the Put object
 itself may not be the problem.

 The problem is that TableOutputFormat uses the Put object in the normal
 way (it goes through the normal write path), while HFileOutputFormat uses it to
 directly build the HFile.



 *From:* innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr]
 *Sent:* Friday, September 19, 2014 9:20 PM

 *To:* user@spark.apache.org
 *Subject:* RE: Bulk-load to HBase



 Thank you for the example code.



 Currently I use foreachPartition() + Put(), but your example code can be
 used to clean up my code.



 BTW, since the data uploaded by Put() goes through normal HBase write
 path, it can be slow.

 So, it would be nice if bulk-load could be used, since it bypasses the
 write path.



 Thanks.



 *From:* Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com
 aniket.bhatna...@gmail.com]
 *Sent:* Friday, September 19, 2014 9:01 PM
 *To:* innowireless TaeYun Kim
 *Cc:* user
 *Subject:* Re: Bulk-load to HBase



  I have been using saveAsNewAPIHadoopDataset, but with TableOutputFormat
  instead of HFileOutputFormat. Hopefully this helps you:



  val hbaseZookeeperQuorum = s"$zookeeperHost:$zookeeperPort:$zookeeperHbasePath"

  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum)
  conf.set(TableOutputFormat.QUORUM_ADDRESS, hbaseZookeeperQuorum)
  conf.set(TableOutputFormat.QUORUM_PORT, zookeeperPort.toString)
  conf.setClass("mapreduce.outputformat.class",
    classOf[TableOutputFormat[Object]], classOf[OutputFormat[Object, Writable]])
  conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

  val rddToSave: RDD[(Array[Byte], Array[Byte], Array[Byte])] = ... // Some RDD that contains row key, column qualifier and data

  val putRDD = rddToSave.map(tuple => {
    val (rowKey, column, data) = tuple
    val put: Put = new Put(rowKey)
    put.add(COLUMN_FAMILY_RAW_DATA_BYTES, column, data)

    (new ImmutableBytesWritable(rowKey), put)
  })

  putRDD.saveAsNewAPIHadoopDataset(conf)





 On 19 September 2014 16:52, innowireless TaeYun Kim 
 taeyun@innowireless.co.kr wrote:

 Hi,



 Sorry, I just found saveAsNewAPIHadoopDataset.

  Then, can I use HFileOutputFormat with saveAsNewAPIHadoopDataset? Is there
 any example code for that?



 Thanks.



 *From:* innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr]
 *Sent:* Friday, September 19, 2014 8:18 PM
 *To:* user@spark.apache.org
 *Subject:* RE: Bulk-load to HBase



 Hi,



 After reading several documents, it seems that saveAsHadoopDataset cannot
 use HFileOutputFormat.

  It’s because the saveAsHadoopDataset method uses JobConf, so it belongs to
  the old Hadoop API, while HFileOutputFormat is a member of the mapreduce
  package, which is for the new Hadoop API.



 Am I right?

 If so, is there another method to bulk-load to HBase from RDD?



 Thanks.



 *From:* innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr
 taeyun@innowireless.co.kr]
 *Sent:* Friday, September 19, 2014 7:17 PM
 *To:* user@spark.apache.org
 *Subject:* Bulk-load to HBase



 Hi,



 Is there a way to bulk-load to HBase from RDD?

 HBase offers HFileOutputFormat class for bulk loading by MapReduce job,
 but I cannot figure out how to use it with saveAsHadoopDataset.



 Thanks.





Re: Re[2]: HBase 0.96+ with Spark 1.0+

2014-09-12 Thread Aniket Bhatnagar
Hi Reinis

Try the exclude suggestion from Sean and me and see if it works for you. If not,
can you turn on verbose class loading to see where
javax.servlet.ServletRegistration is loaded from? The class should load
from "org.mortbay.jetty" % "servlet-api" % jettyVersion. If it loads from some
other jar, you would have to exclude that jar from your build.
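
A minimal sketch of turning that on for a forked sbt test run (build.sbt
fragment; adjust to the build in question):

// -verbose:class is a JVM flag, so it only takes effect when sbt forks the test JVM
fork in Test := true
javaOptions in Test += "-verbose:class"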

Hope it helps.

Thanks,
Aniket

On 12 September 2014 02:21, sp...@orbit-x.de wrote:

 Thank you, Aniket for your hint!

 Alas, it seems I am facing a really hellish situation, because I have
 integration tests using BOTH Spark and HBase (minicluster). Thus I get
 either:

 class javax.servlet.ServletRegistration's signer information does not
 match signer information of other classes in the same package
 java.lang.SecurityException: class javax.servlet.ServletRegistration's
 signer information does not match signer information of other classes in
 the same package
 at java.lang.ClassLoader.checkCerts(ClassLoader.java:943)
 at java.lang.ClassLoader.preDefineClass(ClassLoader.java:657)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:785)

 or:

 [info]   Cause: java.lang.ClassNotFoundException:
 org.mortbay.jetty.servlet.Context
 [info]   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 [info]   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 [info]   at java.security.AccessController.doPrivileged(Native Method)
 [info]   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 [info]   at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
 [info]   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 [info]   at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
 [info]   at
 org.apache.hadoop.hdfs.server.namenode.NameNode.startHttpServer(NameNode.java:661)
 [info]   at
 org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:552)
 [info]   at
 org.apache.hadoop.hdfs.server.namenode.NameNode.init(NameNode.java:720)

 I have been searching the web for a week already, trying to figure out how to
 make this work :-/

 all the help or hints are greatly appreciated
 reinis


 --
 -Original-Nachricht-
 Von: Aniket Bhatnagar aniket.bhatna...@gmail.com
 An: sp...@orbit-x.de
 Cc: user user@spark.apache.org
 Datum: 11-09-2014 20:00
 Betreff: Re: Re[2]: HBase 0.96+ with Spark 1.0+


 Dependency hell... My fav problem :).

 I had run into a similar issue with HBase and Jetty. I can't remember the
 exact fix, but here are excerpts from my dependencies that may be relevant:

 val hadoop2Common = "org.apache.hadoop" % "hadoop-common" % hadoop2Version excludeAll(
   ExclusionRule(organization = "javax.servlet"),
   ExclusionRule(organization = "javax.servlet.jsp"),
   ExclusionRule(organization = "org.mortbay.jetty")
 )

 val hadoop2MapRedClient = "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoop2Version

 val hbase = "org.apache.hbase" % "hbase" % hbaseVersion excludeAll(
   ExclusionRule(organization = "org.apache.maven.wagon"),
   ExclusionRule(organization = "org.jboss.netty"),
   ExclusionRule(organization = "org.mortbay.jetty"),
   ExclusionRule(organization = "org.jruby") // Don't need HBASE's jruby. It pulls in whole lot of other dependencies like joda-time.
 )

 val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion

 val sparkStreaming = "org.apache.spark" %% "spark-streaming" % sparkVersion

 val sparkSQL = "org.apache.spark" %% "spark-sql" % sparkVersion

 val sparkHive = "org.apache.spark" %% "spark-hive" % sparkVersion

 val sparkRepl = "org.apache.spark" %% "spark-repl" % sparkVersion

 val sparkAll = Seq(
   sparkCore excludeAll(
     ExclusionRule(organization = "org.apache.hadoop")), // We assume hadoop 2 and hence omit hadoop 1 dependencies
   sparkSQL,
   sparkStreaming,
   hadoop2MapRedClient,
   hadoop2Common,
   "org.mortbay.jetty" % "servlet-api" % "3.0.20100224"
 )

 On Sep 11, 2014 8:05 PM, sp...@orbit-x.de wrote:

 Hi guys,

 any luck with this issue, anyone?

 I as well tried all the possible exclusion combos, to no avail.

 thanks for your ideas
 reinis

 -Original-Nachricht-
  Von: Stephen Boesch java...@gmail.com
  An: user user@spark.apache.org
  Datum: 28-06-2014 15:12
  Betreff: Re: HBase 0.96+ with Spark 1.0+
 
  Hi Siyuan,
  Thanks for the input. We prefer to use SparkBuild.scala
 instead of Maven. I did not see any protobuf.version-related settings in
 that file. But - as noted by Sean Owen - in any case the issue we are
 facing presently is the duplicate, incompatible javax.servlet entries,
 apparently from the org.mortbay artifacts.


 
  2014-06-28 6:01 GMT-07:00 Siyuan he hsy...@gmail.com:
  Hi Stephen,
 
 I am using spark1.0+ HBase0.96.2. This is what I did:
 1) rebuild spark using: mvn -Dhadoop.version=2.3.0
 -Dprotobuf.version=2.5.0 -DskipTests clean package
 2) In spark-env.sh, set SPARK_CLASSPATH =
 /path-to/hbase-protocol-0.96.2-hadoop2.jar

 
 Hopefully it can help.
 Siyuan


 
  On Sat, Jun 28, 2014 at 8:52

Re: Out of memory with Spark Streaming

2014-09-12 Thread Aniket Bhatnagar
Hi all

Sorry, but this was totally my mistake. In my persistence logic, I was
creating an async HTTP client instance in RDD foreach but was never closing it,
leading to memory leaks.
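
For reference, a minimal sketch of the fix described above: create the client
once per partition and close it deterministically (the client class and
request-building call are illustrative stand-ins):

rdd.foreachPartition { records =>
  val client = new AsyncHttpClient()   // hypothetical client; any closeable resource follows the same pattern
  try {
    records.foreach(record => client.executeRequest(buildRequest(record)))   // buildRequest is a placeholder
  } finally {
    client.close()   // the missing close() was the source of the leak
  }
}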

Apologies for wasting everyone's time.

Thanks,
Aniket

On 12 September 2014 02:20, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Which version of spark are you running?

 If you are running the latest one, then could try running not a window but
 a simple event count on every 2 second batch, and see if you are still
 running out of memory?

 TD


 On Thu, Sep 11, 2014 at 10:34 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

  I did change it to 1 GB. It still ran out of memory, just a little later.

  The streaming job isn't handling a lot of data. In every 2 seconds, it
  doesn't get more than 50 records. Each record is not more than 500
  bytes.
  On Sep 11, 2014 10:54 PM, Bharat Venkat bvenkat.sp...@gmail.com
 wrote:

 You could set spark.executor.memory to something bigger than the
 default (512mb)


 On Thu, Sep 11, 2014 at 8:31 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am running a simple Spark Streaming program that pulls in data from
 Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, maps
 data and persists to a store.

 The program is running in local mode right now and runs out of memory
 after a while. I am yet to investigate heap dumps but I think Spark isn't
 releasing memory after processing is complete. I have even tried changing
 storage level to disk only.

 Help!

 Thanks,
 Aniket






Spark streaming stops computing while the receiver keeps running without any errors reported

2014-09-11 Thread Aniket Bhatnagar
Hi all

I am trying to run a Kinesis Spark Streaming application on a standalone
Spark cluster. The job works fine in local mode, but when I submit it (using
spark-submit), it doesn't do anything. I enabled logs
for org.apache.spark.streaming.kinesis package and I regularly get the
following in worker logs:

14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId
shardId-
14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId
shardId-0001

But the job does not perform any operations defined on DStream. To
investigate this further, I changed the kinesis-asl's KinesisUtils to
perform the following computation on the DStream created
using ssc.receiverStream(new KinesisReceiver...):

stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted " + tuple)))

Even the above line does not result in any corresponding log entries, both
in driver and worker logs. The only relevant logs that I could find in
driver logs are:
14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at
KinesisUtils.scala:68) finished in 0.398 s
14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at
KinesisUtils.scala:68, took 4.926449985 s
14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job
1410435653000 ms.0 from job set of time 1410435653000 ms
14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job
1410435653000 ms.1 from job set of time 1410435653000 ms
14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at
KinesisUtils.scala:68
14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at
DStream.scala:489)
14/09/11 11:40:58 INFO DAGScheduler: Got job 3 (foreach at
KinesisUtils.scala:68) with 2 output partitions (allowLocal=false)
14/09/11 11:40:58 INFO DAGScheduler: Final stage: Stage 5(foreach at
KinesisUtils.scala:68)

After the above logs, nothing shows up corresponding to KinesisUtils. I am
out of ideas on this one and any help would be greatly appreciated.

Thanks,
Aniket


Out of memory with Spark Streaming

2014-09-11 Thread Aniket Bhatnagar
I am running a simple Spark Streaming program that pulls in data from
Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, maps
data and persists to a store.

The program is running in local mode right now and runs out of memory after
a while. I am yet to investigate heap dumps but I think Spark isn't
releasing memory after processing is complete. I have even tried changing
storage level to disk only.

Help!

Thanks,
Aniket


Re: Out of memory with Spark Streaming

2014-09-11 Thread Aniket Bhatnagar
I did change it to 1 GB. It still ran out of memory, just a little later.

The streaming job isn't handling a lot of data. In every 2 seconds, it
doesn't get more than 50 records. Each record is not more than 500
bytes.
 On Sep 11, 2014 10:54 PM, Bharat Venkat bvenkat.sp...@gmail.com wrote:

 You could set spark.executor.memory to something bigger than the
 default (512mb)


 On Thu, Sep 11, 2014 at 8:31 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am running a simple Spark Streaming program that pulls in data from
 Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, maps
 data and persists to a store.

 The program is running in local mode right now and runs out of memory
 after a while. I am yet to investigate heap dumps but I think Spark isn't
 releasing memory after processing is complete. I have even tried changing
 storage level to disk only.

 Help!

 Thanks,
 Aniket





Re: Spark on Raspberry Pi?

2014-09-11 Thread Aniket Bhatnagar
Just curious... What's the use case you are looking to implement?
On Sep 11, 2014 10:50 PM, Daniil Osipov daniil.osi...@shazam.com wrote:

 Limited memory could also cause you some problems and limit usability. If
 you're looking for a local testing environment, vagrant boxes may serve you
 much better.

 On Thu, Sep 11, 2014 at 6:18 AM, Chen He airb...@gmail.com wrote:




 Pi's bus speed, memory size and access speed, and processing ability are
 limited. The only benefit could be the power consumption.

 On Thu, Sep 11, 2014 at 8:04 AM, Sandeep Singh sand...@techaddict.me
 wrote:

 Has anyone tried using Raspberry Pi for Spark? How efficient is it to use
 around 10 Pi's for local testing env ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Raspberry-Pi-tp13965.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Re[2]: HBase 0.96+ with Spark 1.0+

2014-09-11 Thread Aniket Bhatnagar
Dependency hell... My fav problem :).

I had run into a similar issue with HBase and Jetty. I can't remember the
exact fix, but here are excerpts from my dependencies that may be relevant:

val hadoop2Common = "org.apache.hadoop" % "hadoop-common" % hadoop2Version excludeAll(
  ExclusionRule(organization = "javax.servlet"),
  ExclusionRule(organization = "javax.servlet.jsp"),
  ExclusionRule(organization = "org.mortbay.jetty")
)

val hadoop2MapRedClient = "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoop2Version

val hbase = "org.apache.hbase" % "hbase" % hbaseVersion excludeAll(
  ExclusionRule(organization = "org.apache.maven.wagon"),
  ExclusionRule(organization = "org.jboss.netty"),
  ExclusionRule(organization = "org.mortbay.jetty"),
  ExclusionRule(organization = "org.jruby") // Don't need HBASE's jruby. It pulls in whole lot of other dependencies like joda-time.
)

val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion

val sparkStreaming = "org.apache.spark" %% "spark-streaming" % sparkVersion

val sparkSQL = "org.apache.spark" %% "spark-sql" % sparkVersion

val sparkHive = "org.apache.spark" %% "spark-hive" % sparkVersion

val sparkRepl = "org.apache.spark" %% "spark-repl" % sparkVersion

val sparkAll = Seq(
  sparkCore excludeAll(
    ExclusionRule(organization = "org.apache.hadoop")), // We assume hadoop 2 and hence omit hadoop 1 dependencies
  sparkSQL,
  sparkStreaming,
  hadoop2MapRedClient,
  hadoop2Common,
  "org.mortbay.jetty" % "servlet-api" % "3.0.20100224"
)

On Sep 11, 2014 8:05 PM, sp...@orbit-x.de wrote:

 Hi guys,

 any luck with this issue, anyone?

 I as well tried all the possible exclusion combos, to no avail.

 thanks for your ideas
 reinis

 -Original-Nachricht-
  Von: Stephen Boesch java...@gmail.com
  An: user user@spark.apache.org
  Datum: 28-06-2014 15:12
  Betreff: Re: HBase 0.96+ with Spark 1.0+
 
  Hi Siyuan,
  Thanks for the input. We prefer to use SparkBuild.scala
 instead of Maven. I did not see any protobuf.version-related settings in
 that file. But - as noted by Sean Owen - in any case the issue we are
 facing presently is the duplicate, incompatible javax.servlet entries,
 apparently from the org.mortbay artifacts.


 
  2014-06-28 6:01 GMT-07:00 Siyuan he hsy...@gmail.com:
  Hi Stephen,
 
 I am using spark1.0+ HBase0.96.2. This is what I did:
 1) rebuild spark using: mvn -Dhadoop.version=2.3.0
 -Dprotobuf.version=2.5.0 -DskipTests clean package
 2) In spark-env.sh, set SPARK_CLASSPATH =
 /path-to/hbase-protocol-0.96.2-hadoop2.jar

 
 Hopefully it can help.
 Siyuan


 
  On Sat, Jun 28, 2014 at 8:52 AM, Stephen Boesch java...@gmail.com
 wrote:
 
 
  Thanks Sean. I had actually already added an exclusion rule for
  org.mortbay.jetty - and that had not resolved it.
 
 Just in case I used your precise formulation:

 
  val excludeMortbayJetty = ExclusionRule(organization = "org.mortbay.jetty")
 ..

    ,("org.apache.spark" % "spark-core_2.10" % sparkVersion withSources()).excludeAll(excludeMortbayJetty)
    ,("org.apache.spark" % "spark-sql_2.10" % sparkVersion withSources()).excludeAll(excludeMortbayJetty)

 
 However the same error still recurs:

 
 14/06/28 05:48:35 INFO HttpServer: Starting HTTP Server
 [error] (run-main-0) java.lang.SecurityException: class
 javax.servlet.FilterRegistration's signer information does not match
 signer information of other classes in the same package
 java.lang.SecurityException: class javax.servlet.FilterRegistration's
 signer information does not match signer information of other classes in
 the same package



 

 

 
  2014-06-28 4:22 GMT-07:00 Sean Owen so...@cloudera.com:

  This sounds like an instance of roughly the same item as in
   https://issues.apache.org/jira/browse/SPARK-1949. Have a look at
  adding that exclude to see if it works.
 

  On Fri, Jun 27, 2014 at 10:21 PM, Stephen Boesch java...@gmail.com
 wrote:
   The present trunk is built and tested against HBase 0.94.
  
  
   I have tried various combinations of versions of HBase 0.96+ and Spark
 1.0+
   and all end up with
  
   14/06/27 20:11:15 INFO HttpServer: Starting HTTP Server
   [error] (run-main-0) java.lang.SecurityException: class
   javax.servlet.FilterRegistration's signer information does not match
   signer information of other classes in the same package
   java.lang.SecurityException: class javax.servlet.FilterRegistration's
   signer information does not match signer information of other classes
 in the
   same package
   at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
  
  
   I have tried a number of different ways to exclude javax.servlet
 related
   jars. But none have avoided this error.
  
   Anyone have a (small-ish) build.sbt that works with later versions of
 HBase?
  
  
 


 


 

 





Using Spark's ActorSystem for performing analytics using Akka

2014-09-02 Thread Aniket Bhatnagar
Sorry about the noob question, but I was just wondering: if we use Spark's
ActorSystem (SparkEnv.actorSystem), would it distribute actors across
worker nodes, or would the actors only run in the driver JVM?


[Stream] Checkpointing | chmod: cannot access `/cygdrive/d/tmp/spark/f8e594bf-d940-41cb-ab0e-0fd3710696cb/rdd-57/.part-00001-attempt-215': No such file or directory

2014-09-01 Thread Aniket Bhatnagar
On my local (Windows) dev environment, I have been trying to get Spark
Streaming running to test my real-time(ish) jobs. I have set the checkpoint
directory to /tmp/spark and have installed the latest Cygwin. I keep getting
the following error:

org.apache.hadoop.util.Shell$ExitCodeException: chmod: cannot access
`/cygdrive/d/tmp/spark/f8e594bf-d940-41cb-ab0e-0fd3710696cb/rdd-57/.part-1-attempt-215':
No such file or directory


Although nothing breaks, such errors are a bit annoying. Any clues on
how to fix the issue?


Re: [Stream] Checkpointing | chmod: cannot access `/cygdrive/d/tmp/spark/f8e594bf-d940-41cb-ab0e-0fd3710696cb/rdd-57/.part-00001-attempt-215': No such file or directory

2014-09-01 Thread Aniket Bhatnagar
Hi everyone

It turns out that I had Chef installed and its chmod takes precedence over
Cygwin's chmod in the PATH. I fixed the environment
variable and now it's working fine.


On 1 September 2014 11:48, Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

 On my local (Windows) dev environment, I have been trying to get Spark
 Streaming running to test my real-time(ish) jobs. I have set the checkpoint
 directory to /tmp/spark and have installed the latest Cygwin. I keep getting
 the following error:

 org.apache.hadoop.util.Shell$ExitCodeException: chmod: cannot access
 `/cygdrive/d/tmp/spark/f8e594bf-d940-41cb-ab0e-0fd3710696cb/rdd-57/.part-1-attempt-215':
 No such file or directory


 Although nothing breaks, such errors are a bit annoying. Any clues on
 how to fix the issue?



[Streaming] Triggering an action in absence of data

2014-09-01 Thread Aniket Bhatnagar
Hi all

I am struggling to implement a use case wherein I need to trigger an action
when no data has been received for X amount of time. I haven't been able
to figure out an easy way to do this. No state/foreach methods get called
when no data has arrived. I thought of generating a 'tick' DStream that
emits an arbitrary object and unioning/grouping the tick stream with the data
stream to detect that data hasn't arrived for X amount of time. However,
since my data DStream is paired (a key-value tuple) and I use the
updateStateByKey method for processing the data stream, I can't group/union
it with tick stream(s) without knowing all keys in advance.

My second idea was to push data from DStream to an actor and let actor (per
key) manage state and data absent use cases. However, there is no way to
run an actor continuously for all data belonging to a key or a partition.

I am stuck now and can't think of anything else to solve this use case.
Has anyone else run into a similar issue? Any thoughts on how the use case
could be implemented in Spark Streaming?
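
For reference, a rough sketch of one way to track per-key timeouts inside
updateStateByKey state, relying on the update function being invoked for keys
that already have state even on batches with no new records (the state type,
threshold and timeout action are illustrative; dataStream stands for the
paired DStream described above):

case class KeyState(lastSeen: Long, total: Long)

val updated = dataStream.updateStateByKey[KeyState] { (newValues: Seq[Long], state: Option[KeyState]) =>
  val now = System.currentTimeMillis()
  if (newValues.nonEmpty) {
    Some(KeyState(now, state.map(_.total).getOrElse(0L) + newValues.sum))
  } else if (state.exists(s => now - s.lastSeen > 10 * 60 * 1000L)) {
    // No data for this key for X minutes: trigger the timeout action here and drop the key
    None
  } else {
    state   // keep waiting
  }
}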

Thanks,
Aniket


Using unshaded akka in Spark driver

2014-08-28 Thread Aniket Bhatnagar
I am building (yet another) job server for Spark using the Play! framework, and
it seems that Play's akka dependency conflicts with Spark's shaded akka
dependency. Using SBT, I can force Play to use akka 2.2.3 (unshaded), but I
haven't been able to figure out how to exclude the com.typesafe.akka
dependencies altogether and introduce the shaded akka dependencies instead.
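
A rough sketch of what that swap might look like in sbt (the shaded artifact
coordinates are from memory of Spark 1.x builds and should be verified against
the Spark POM; the Play version is illustrative):

libraryDependencies ++= Seq(
  // keep Play but drop its transitive akka artifacts
  "com.typesafe.play" %% "play" % "2.2.3" excludeAll ExclusionRule(organization = "com.typesafe.akka"),
  // pull in the akka fork Spark itself depends on
  "org.spark-project.akka" %% "akka-actor"  % "2.2.3-shaded-protobuf",
  "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf"
)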

I was wondering what could potentially go wrong if I use the unshaded version in
the job server? The job server is responsible for creating the
SparkContext/StreamingContext that connects to the Spark cluster (using the
spark.master config), and these contexts are then provided to jobs that
create RDDs/DStreams and perform computations on them.

I am guessing in this case, the job server would act as a Spark driver.


Re: Block input-* already exists on this machine; not re-adding it warnings

2014-08-26 Thread Aniket Bhatnagar
Answering my own question, it seems that the warnings are expected as
explained by TD @
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-questions-td3281.html
.

Here is what he wrote:
Spark Streaming is designed to replicate the received data within the
machines in a Spark cluster for fault-tolerance. However, when you are
running in the local mode, since there is only one machine, the
blocks of data aren't able to replicate. This is expected and safe to
ignore in local mode.

I was indeed running it in local mode and hence it seems that I can safely
ignore such warnings.

Thanks,
Aniket


On 22 August 2014 15:54, Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

 Hi everyone

 I back ported kinesis-asl to spark 1.0.2 and ran a quick test on my local
 machine. It seems to be working fine but I keep getting the following
  warnings. I am not sure what they mean and whether they are something to worry
  about or not.

 2014-08-22 15:53:43,803 [pool-1-thread-7] WARN
  o.apache.spark.storage.BlockManager - Block input-0-1408703023600 already
 exists on this machine; not re-adding it

 Thoughts?

 Thanks,
 Aniket



Block input-* already exists on this machine; not re-adding it warnings

2014-08-22 Thread Aniket Bhatnagar
Hi everyone

I back ported kinesis-asl to spark 1.0.2 and ran a quick test on my local
machine. It seems to be working fine but I keep getting the following
warnings. I am not sure what they mean and whether they are something to worry
about or not.

2014-08-22 15:53:43,803 [pool-1-thread-7] WARN
 o.apache.spark.storage.BlockManager - Block input-0-1408703023600 already
exists on this machine; not re-adding it

Thoughts?

Thanks,
Aniket


Understanding how to create custom DStreams in Spark streaming

2014-08-22 Thread Aniket Bhatnagar
Hi everyone

Sorry about the noob question, but I am struggling to understand ways to
create DStreams in Spark. Here is my understanding based on what I could
gather from the documentation and from studying the Spark code (as well as some hunches).
Please correct me if I am wrong.

1. In most cases, one would either extend ReceiverInputDStream
or InputDStream to create a custom DStream that pulls data from an external
system (see the sketch after this list).
 - ReceiverInputDStream is used to distribute data receiving code (i.e. a
Receiver) to workers. N instances of ReceiverInputDStream result in
distribution to N workers. There is no control over which worker node executes
which instance of the receiving code.
 - InputDStream is used to run receiving code in the driver. The driver creates
RDDs which are distributed to worker nodes that run the processing logic. There
is no way to control how the RDDs get distributed to workers unless one
repartitions the generated RDDs.

2. DStreams or RDDs get no feedback on whether the processing was
successful or not. This means one can't implement a re-pull in case of
failures.
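
For reference, a minimal sketch of the driver-side variant from point 1 (class
name and placeholder data are illustrative; the InputDStream contract is as I
recall it from Spark 1.x and should be verified against the actual API):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

class PollingInputDStream(ssc: StreamingContext) extends InputDStream[String](ssc) {

  override def start(): Unit = {}   // open connections to the external system here
  override def stop(): Unit = {}    // and close them here

  override def compute(validTime: Time): Option[RDD[String]] = {
    // Data is pulled on the driver for each batch and turned into a normal RDD,
    // which is then distributed to the workers for processing.
    val batch = Seq("record-1", "record-2")   // placeholder for records pulled from the external system
    Some(ssc.sparkContext.parallelize(batch))
  }
}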

The above makes me realize that it is not trivial to implement a streaming
use case with an at-least-once processing guarantee. Any thoughts on building a
reliable real-time processing system using Spark would be appreciated.


Re: Spark shell creating a local SparkContext instead of connecting to connecting to Spark Master

2014-08-06 Thread Aniket Bhatnagar
Thanks. This worked :). I am thinking I should add this to spark-env.sh so
that spark-shell always connects to the master by default.
On Aug 6, 2014 12:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 ​You can always start your spark-shell by specifying the master as

 MASTER=spark://*whatever*:7077 $SPARK_HOME/bin/spark-shell​

 Then it will connect to that *whatever* master.


 Thanks
 Best Regards


 On Tue, Aug 5, 2014 at 8:51 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi

  Apologies if this is a noob question. I have set up Spark 1.0.1 on EMR
  using a slightly modified version of the script
  at s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark-yarn.rb. It
  seems to be running fine, with the master logs stating:

 14/08/05 14:36:56 INFO Master: I have been elected leader! New state:
 ALIVE
 14/08/05 14:37:21 INFO Master: Registering worker
 ip-10-0-2-80.ec2.internal:52029 with 2 cores, 6.3 GB RAM

 The script has also created spark-env.sh under conf which has the
 following content:

 export SPARK_MASTER_IP=x.x.x.x
 export SCALA_HOME=/home/hadoop/.versions/scala-2.10.3
 export SPARK_LOCAL_DIRS=/mnt/spark/
 export
 SPARK_CLASSPATH=/usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
 export SPARK_DAEMON_JAVA_OPTS=-verbose:gc -XX:+PrintGCDetails
 -XX:+PrintGCTimeStamps
 export
 SPARK_ASSEMBLY_JAR=/home/hadoop/spark/lib/spark-assembly-1.0.1-hadoop2.4.0.jar

  However, when I run the spark-shell, sc.isLocal returns true. Also, no
  matter how many RDDs I cache, the used memory in the master UI
  (x.x.x.x:7077) shows 0B used. This leads me to believe that the spark-shell
  isn't connecting to the Spark master and has started a local instance of Spark.
  Is there something I am missing in my setup that would allow spark-shell to
  connect to the master?

 Thanks,
 Aniket




