Accelerating Spark SQL / Dataframe using GPUs & Alluxio

2021-04-23 Thread Bin Fan
Hi Spark users,

We have been working on GPU acceleration for Apache Spark SQL / Dataframe
using the RAPIDS Accelerator for Apache Spark
<https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/apache-spark-3/>
and open source project Alluxio <https://github.com/Alluxio/alluxio>
without any code changes.
Our preliminary results suggest a 2x improvement in performance and a 70%
improvement in ROI compared to a CPU-based cluster.

Feel free to read the developer blog <https://bit.ly/2QkXjxo> for more
details of the benchmark. If you are interested in discussing further with
the authors, join our free online meetup
<https://go.alluxio.io/community-alluxio-day-2021> next Tuesday morning
(April 27), Pacific time.

Best,

- Bin Fan


Evaluating Apache Spark with Data Orchestration using TPC-DS

2021-04-08 Thread Bin Fan
Dear Spark Users,

I am sharing a whitepaper on “Evaluating Apache Spark and Alluxio for Data
Analytics <https://bit.ly/2Pg2jms>”,
which describes in detail how to benchmark Spark on Alluxio to accelerate
TPC-DS results. Hope this helps. If you have any questions,
feel free to reach out to me.

Best regards

- Bin Fan


Bursting Your On-Premises Data Lake Analytics and AI Workloads on AWS

2021-02-18 Thread Bin Fan
Hi everyone!

I am sharing this article about running Spark / Presto workloads on
AWS: Bursting On-Premises Data Lake Analytics and AI Workloads on AWS
<https://bit.ly/3qA1Tom>, published on the AWS blog. Hope you enjoy it. Feel
free to discuss with me here <https://alluxio.io/slack>.

- Bin Fan
Powered by Alluxio <https://www.alluxio.io/powered-by-alluxio/> | Alluxio
Slack Channel <https://alluxio.io/slack> | Data Orchestration Summit 2020
<https://www.alluxio.io/data-orchestration-summit-2020/>


Spark in hybrid cloud in AWS & GCP

2020-12-07 Thread Bin Fan
Dear Spark users,

Are you interested in running Spark in a hybrid cloud? Check out talks from
AWS & GCP at the virtual Data Orchestration Summit
<https://www.alluxio.io/data-orchestration-summit-2020/> on Dec. 8-9, 2020;
register for free <https://www.alluxio.io/data-orchestration-summit-2020/>.

The summit's speaker lineup spans creators and committers of Alluxio,
Spark, Presto, Tensorflow, and K8s, as well as data engineers and software
engineers building cloud-native data and AI platforms at Amazon, Alibaba,
Comcast, Facebook, Google, ING Bank, Microsoft, Tencent, and more!


- Bin Fan


Building High-performance Lake for Spark using OSS, Hudi, Alluxio

2020-11-23 Thread Bin Fan
 Hi Spark Users,

Check out this blog on Building a High-performance Data Lake using Apache
Hudi, Spark and Alluxio at T3Go <https://bit.ly/373RYPi>.

Cheers

- Bin Fan


Re: Spark dataframe hdfs vs s3

2020-05-29 Thread Bin Fan
Try deploying Alluxio as a caching layer on top of S3, which gives Spark an
HDFS-like interface to the same data.
Like in this article:
https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/
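
A rough sketch of what this looks like from the Spark side, assuming the
Alluxio client is on the Spark classpath, an Alluxio master at
alluxio-master:19998, and the S3 bucket mounted into the Alluxio namespace
under /s3 (hostnames, bucket and paths are placeholders, not from this thread):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("s3-via-alluxio").getOrCreate()

// Baseline: read Parquet directly from S3.
val direct = spark.read.parquet("s3a://my-bucket/events/")

// Same data read through Alluxio, which keeps hot files cached close to the
// executors while exposing them behind an HDFS-like URI.
val cached = spark.read.parquet("alluxio://alluxio-master:19998/s3/events/")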


On Wed, May 27, 2020 at 6:52 PM Dark Crusader 
wrote:

> Hi Randy,
>
> Yes, I'm using parquet on both S3 and hdfs.
>
> On Thu, 28 May, 2020, 2:38 am randy clinton, 
> wrote:
>
>> Is the file Parquet on S3 or is it some other file format?
>>
>> In general I would assume that HDFS read/writes are more performant for
>> spark jobs.
>>
>> For instance, consider how well partitioned your HDFS file is vs the S3
>> file.
>>
>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi Jörn,
>>>
>>> Thanks for the reply. I will try to create an easier example to reproduce
>>> the issue.
>>>
>>> I will also try your suggestion to look into the UI. Can you guide on
>>> what I should be looking for?
>>>
>>> I was already using the s3a protocol to compare the times.
>>>
>>> My hunch is that multiple reads from S3 are required because of improper
>>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>>> Does this make sense?
>>>
>>> I would also like to add that we built an extra layer on S3 which might
>>> be adding to even slower times.
>>>
>>> Thanks for your help.
>>>
>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
>>> wrote:
>>>
 Have you looked in Spark UI why this is the case ?
 S3 Reading can take more time - it depends also what s3 url you are
 using : s3a vs s3n vs S3.

 It could help after some calculation to persist in-memory or on HDFS.
 You can also initially load from S3 and store on HDFS and work from there .

 HDFS offers data locality for the tasks, i.e., the tasks start on the
 nodes where the data is. Depending on which s3 "protocol" you are using, you
 may also pay a further performance penalty.

 Try s3a as a protocol (replace all s3n with s3a).

 You can also use the s3 url but this requires a special bucket
 configuration, a dedicated empty bucket, and it lacks some interoperability
 with other AWS services.

 Nevertheless, it could be also something else with the code. Can you
 post an example reproducing the issue?

 > On 27.05.2020 at 18:18, Dark Crusader <
 relinquisheddra...@gmail.com> wrote:
 >
 > 
 > Hi all,
 >
 > I am reading data from hdfs in the form of parquet files (around 3
 GB) and running an algorithm from the spark ml library.
 >
 > If I create the same spark dataframe by reading data from S3, the
 same algorithm takes considerably more time.
 >
 > I don't understand why this is happening. Is this a chance occurrence
 or are the spark dataframes created differently?
 >
 > I don't understand how the data store would affect the algorithm
 performance.
 >
 > Any help would be appreciated. Thanks a lot.

>>>
>>
>> --
>> I appreciate your time,
>>
>> ~Randy
>>
>


Re: What is directory "/path/_spark_metadata" for?

2019-11-11 Thread Bin Fan
Hey Mark,

I believe this is the name of the subdirectory that is used to store
metadata about which files are valid; see the comment in the code:
https://github.com/apache/spark/blob/v2.3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L33
Do you see the exception as warnings or as errors in the Alluxio master log?
It would be helpful to post the stack trace if it is available.
My hypothesis is that Spark in your case was testing whether it could create
such a directory.

-Bin

On Wed, Aug 28, 2019 at 1:59 AM Mark Zhao  wrote:

> Hey,
>
>  When running Spark on Alluxio-1.8.2, I encounter the following exception:
> “alluxio.exception.FileDoesNotExistException: Path
> “/test-data/_spark_metadata” does not exist” in Alluxio master.log. What
> exactly is the directory "_spark_metadata" used for? And how can I fix this
> problem?
>
> Thanks.
>
> Mark
>


How to avoid spark executor fetching jars in Local mode

2019-10-11 Thread Bin Chen
Hello everyone,

In Spark local mode, I noticed the executor fetching jars even though they are
already on the local filesystem. Is there a way to avoid this step? Fetching
all the jars takes several seconds, and we want to reduce the time spent in the
bootstrap phase.

We are using spark-submit with `--conf spark.jars` to define these libraries.

[2019-09-26 06:19:59,651] [INFO ] o.a.s.e.Executor - Fetching 
spark://10.2.1.5:43745/jars/xxx.jar with timestamp 1569478797007
[2019-09-26 06:19:59,729] [INFO ] o.a.s.n.c.TransportClientFactory - 
Successfully created connection to /10.2.1.5:43745 after 55 ms (0 ms spent in 
bootstraps)
[2019-09-26 06:19:59,754] [INFO ] o.a.s.u.Utils - Fetching 
spark://10.2.1.5:43745/jars/xxx.jar to 
/tmp/spark-0365e48c-1747-4370-978f-7cd142ef0375/userFiles-3309dc5e-b6d0-4b76-a9aa-8e0a226ddab9/fetchFileTemp6346264674899227929.tmp
[2019-09-26 06:19:59,801] [INFO ] o.a.s.e.Executor - Adding 
file:/tmp/spark-0365e48c-1747-4370-978f-7cd142ef0375/userFiles-3309dc5e-b6d0-4b76-a9aa-8e0a226ddab9/xxx.jar
 to class loader

Thanks

Chen Bin







Re: Low cache hit ratio when running Spark on Alluxio

2019-09-19 Thread Bin Fan
Depending on the Alluxio version you are running (e.g., 2.0), the
metrics for local short-circuit reads are not turned on by default.
So I would suggest first turning on collection of local short-circuit read
metrics by setting
alluxio.user.metrics.collection.enabled=true
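
A minimal sketch of passing that property to a Spark job, following the same
-D pattern used elsewhere in these threads (the property name is from the
Alluxio docs; everything else is illustrative):

import org.apache.spark.SparkConf

// Sketch only: forward the Alluxio client property to both driver and executors.
val conf = new SparkConf()
  .set("spark.driver.extraJavaOptions",
    "-Dalluxio.user.metrics.collection.enabled=true")
  .set("spark.executor.extraJavaOptions",
    "-Dalluxio.user.metrics.collection.enabled=true")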

Regarding the generic question of achieving high data locality when running
Spark on Alluxio, you can read this article
https://www.alluxio.io/blog/top-10-tips-for-making-the-spark-alluxio-stack-blazing-fast/
and follow the suggestions there. E.g., locality can behave unexpectedly when
running Spark on YARN in this setup.

If you need more detailed instructions, feel free to join Alluxio community
channel https://slackin.alluxio.io <https://www.alluxio.io/slack>

- Bin Fan
alluxio.io <http://bit.ly/2JctWrJ> | powered by <http://bit.ly/2JdD0N2> | Data
Orchestration Summit 2019
<https://www.alluxio.io/data-orchestration-summit-2019/>

On Wed, Aug 28, 2019 at 1:49 AM Jerry Yan  wrote:

> Hi,
>
> We are running Spark jobs on an Alluxio Cluster which is serving 13
> gigabytes of data with 99% of the data is in memory. I was hoping to speed
> up the Spark jobs by reading the in-memory data in Alluxio, but found
> Alluxio local hit rate is only 1.68%, while Alluxio remote hit rate is
> 98.32%. By monitoring the network IO across all worker nodes through the
> "dstat" command, I found that only two nodes had about 1GB of recv or send
> in the whole process, and they were sending or receiving 1GB during the
> Spark shuffle stage. Are there any metrics I could check or configuration
> to tune?
>
>
> Best,
>
> Jerry
>


Re: Can I set the Alluxio WriteType in Spark applications?

2019-09-19 Thread Bin Fan
Hi Mark,

You can follow the instructions here:
https://docs.alluxio.io/os/user/stable/en/compute/Spark.html#customize-alluxio-user-properties-for-individual-spark-jobs

Something like this:

$ spark-submit \
  --conf 'spark.driver.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH' \
  --conf 'spark.executor.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH' \
  ...


Hope it helps

- Bin

On Tue, Sep 17, 2019 at 7:53 AM Mark Zhao  wrote:

> Hi,
>
> If Spark applications write data into alluxio, can WriteType be configured?
>
> Thanks,
> Mark
>
>


Re: How to fix ClosedChannelException

2019-05-16 Thread Bin Fan
Hi

This *java.nio.channels.ClosedChannelException* is often caused by a
connection timeout
between your Spark executors and Alluxio workers.
One simple and quick fix is to increase the value of
alluxio.user.network.netty.timeout
<https://docs.alluxio.io/os/user/stable/en/reference/Properties-List.html#alluxio.user.network.netty.timeout>
in your Spark jobs.

Check out how to run Spark with customized Alluxio properties
<https://docs.alluxio.io/os/user/stable/en/compute/Spark.html?utm_source=spark&utm_medium=mailinglist>.
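
A minimal sketch of setting that timeout for a Spark job (the 10-minute value
is only an illustration, not a recommendation from this thread):

import org.apache.spark.SparkConf

// Sketch only: raise the Alluxio client network timeout on driver and executors.
val conf = new SparkConf()
  .set("spark.driver.extraJavaOptions",
    "-Dalluxio.user.network.netty.timeout=10min")
  .set("spark.executor.extraJavaOptions",
    "-Dalluxio.user.network.netty.timeout=10min")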

- Bin


On Thu, May 9, 2019 at 4:39 AM u9g  wrote:

> Hey,
>
> When I run Spark on Alluxio, I encounter the following error. How can I
> fix this? Thanks
>
> Lost task 63.0 in stage 0.0 (TID 63, 172.28.172.165, executor 7):
> java.io.IOException: java.util.concurrent.ExecutionException:
> java.nio.channels.ClosedChannelException
>
> Best,
> Andy Li
>
>
>
>


Re: How to configure alluxio cluster with spark in yarn

2019-05-16 Thread Bin Fan
Hi Andy,

Assuming you are running Spark with YARN, I would recommend deploying
Alluxio in the same YARN cluster if you are looking for the best performance.
Alluxio can also be deployed separately as a standalone service, but in that
case, you may need to transfer data from the Alluxio cluster to your Spark/YARN
cluster.

Here is the documentation
<https://docs.alluxio.io/os/user/1.8/en/deploy/Running-Alluxio-On-Yarn.html?utm_source=spark>
about
deploying Alluxio with YARN.

- Bin

On Thu, May 9, 2019 at 4:19 AM u9g  wrote:

> Hey,
>
> I want to speed up the Spark task running in the Yarn cluster through
> Alluxio. Is Alluxio recommended to run in the same yarn cluster on the yarn
> mode? Should I deploy Alluxio independently on the nodes of the yarn
> cluster? Or deploy a cluster separately?
> Best,
> Andy Li
>
>
>
>


Re: cache table vs. parquet table performance

2019-04-17 Thread Bin Fan
Hi Tomas,

One option is to cache your table as Parquet files into Alluxio (which can
serve as an in-memory distributed caching layer for Spark in your case).

The code on Spark will be like

> df.write.parquet("alluxio://master:19998/data.parquet")
> df = sqlContext.read.parquet("alluxio://master:19998/data.parquet")

(See more details at the documentation
http://www.alluxio.org/docs/1.8/en/compute/Spark.html
<http://www.alluxio.org/docs/1.8/en/compute/Spark.html#cache-dataframe-into-alluxio?utm_source=spark>
)

This would require running Alluxio as a separate service (ideally colocated
with Spark servers), of course, but it also enables data sharing across Spark
jobs.

- Bin




On Tue, Jan 15, 2019 at 10:29 AM Tomas Bartalos 
wrote:

> Hello,
>
> I'm using spark-thrift server and I'm searching for best performing
> solution to query hot set of data. I'm processing records with nested
> structure, containing subtypes and arrays. 1 record takes up several KB.
>
> I tried to make some improvement with cache table:
>
> cache table event_jan_01 as select * from events where day_registered =
> 20190102;
>
>
> If I understood correctly, the data should be stored in *in-memory
> columnar* format with storage level MEMORY_AND_DISK, so data which
> doesn't fit in memory will be spilled to disk (I assume also in columnar
> format?).
> I cached 1 day of data (1 M records) and according to spark UI storage tab
> none of the data was cached to memory and everything was spilled to disk.
> The size of the data was *5.7 GB.*
> Typical queries took ~ 20 sec.
>
> Then I tried to store the data to parquet format:
>
> CREATE TABLE event_jan_01_par USING parquet location "/tmp/events/jan/02"
> as
>
> select * from event_jan_01;
>
>
> The whole parquet took up only *178MB.*
> And typical queries took 5-10 sec.
>
> Is it possible to tune spark to spill the cached data in parquet format?
> Why was the whole cached table spilled to disk with nothing staying in
> memory?
>
> Spark version: 2.4.0
>
> Best regards,
> Tomas
>
>


Re: How shall I configure the Spark executor memory size and the Alluxio worker memory size on a machine?

2019-04-04 Thread Bin Fan
Oops, sorry for the confusion. I meant "20% of the size of your input data
set" allocated to Alluxio as the memory resource as the starting point.
After that, you can check the cache hit ratio of the Alluxio space based
on the metrics collected in the Alluxio web UI
<http://www.alluxio.org/docs/1.8/en/basic/Web-Interface.html#master-metrics>.
If you see a low hit ratio, increase the Alluxio storage size, and vice versa.

Hope this helps,

- Bin

On Thu, Apr 4, 2019 at 9:29 PM Bin Fan  wrote:

> Hi Andy,
>
> It really depends on your workloads. I would suggest to allocate 20% of
> the size of your input data set
> as the starting point and see how it works.
>
> Also depending on your data source as the under store of Alluxio, if it is
> remote (e.g., cloud storage like S3 or GCS),
> you can perhaps use Alluxio to manage local disk or SSD storage resource
> rather than memory resource.
> In this case, the "local Alluxio storage" is still much faster compared to
> reading from remote storage.
> Check out the documentation on tiered storage configuration here (
> http://www.alluxio.org/docs/1.8/en/advanced/Alluxio-Storage-Management.html#configuring-alluxio-storage
> )
>
> - Bin
>
> On Thu, Mar 21, 2019 at 8:26 AM u9g  wrote:
>
>> Hey,
>>
>> We have a cluster of 10 nodes each of which consists 128GB memory. We are
>> about to running Spark and Alluxio on the cluster.  We wonder how shall
>> allocate the memory to the Spark executor and the Alluxio worker on a
>> machine? Are there some recommendations? Thanks!
>>
>> Best,
>> Andy Li
>>
>>
>>
>>
>


Re: How shall I configure the Spark executor memory size and the Alluxio worker memory size on a machine?

2019-04-04 Thread Bin Fan
Hi Andy,

It really depends on your workloads. I would suggest allocating 20% of the
size of your input data set
as the starting point and seeing how it works.

Also, depending on the data source serving as the under store of Alluxio: if it
is remote (e.g., cloud storage like S3 or GCS),
you can perhaps use Alluxio to manage local disk or SSD storage resources
rather than memory.
In this case, the "local Alluxio storage" is still much faster compared to
reading from remote storage.
Check out the documentation on tiered storage configuration here (
http://www.alluxio.org/docs/1.8/en/advanced/Alluxio-Storage-Management.html#configuring-alluxio-storage
)

- Bin

On Thu, Mar 21, 2019 at 8:26 AM u9g  wrote:

> Hey,
>
> We have a cluster of 10 nodes each of which consists 128GB memory. We are
> about to running Spark and Alluxio on the cluster.  We wonder how shall
> allocate the memory to the Spark executor and the Alluxio worker on a
> machine? Are there some recommendations? Thanks!
>
> Best,
> Andy Li
>
>
>
>


Re: Questions about caching

2018-12-24 Thread Bin Fan
Hi Andrew,

Since you mentioned the alternative solution with Alluxio
<http://alluxio.org>, here is a more comprehensive
tutorial on caching Spark dataframes on Alluxio:
https://www.alluxio.com/blog/effective-spark-dataframes-with-alluxio

Namely, caching your dataframe is simply running
df.write.parquet(alluxioFilePath),
and your dataframes are stored in Alluxio as parquet files that you can
share with more users.
One advantage of Alluxio here is that you can manually free the cached data
from the memory tier or
set a TTL on the cached data if you'd like more control over the data.
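
A small sketch of that pattern with assumed hostnames and paths (one job
materializes the dataframe into Alluxio; another job or user reads it back):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cache-df-on-alluxio").getOrCreate()
val alluxioPath = "alluxio://alluxio-master:19998/cache/events.parquet"

// Job A: write the dataframe once into Alluxio as Parquet.
val df = spark.read.parquet("hdfs:///raw/events")
df.write.mode("overwrite").parquet(alluxioPath)

// Job B (possibly another user or session): read the shared copy instead of recomputing it.
val shared = spark.read.parquet(alluxioPath)
println(shared.count())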


- Bin

On Tue, Dec 11, 2018 at 9:13 AM Andrew Melo  wrote:

> Greetings, Spark Aficionados-
>
> I'm working on a project to (ab-)use PySpark to do particle physics
> analysis, which involves iterating with a lot of transformations (to
> apply weights and select candidate events) and reductions (to produce
> histograms of relevant physics objects). We have a basic version
> working, but I'm looking to exploit some of Spark's caching behavior
> to speed up the interactive computation portion of the analysis,
> probably by writing a thin convenience wrapper. I have a couple
> questions I've been unable to find definitive answers to, which would
> help me design this wrapper an efficient way:
>
> 1) When cache()-ing a dataframe where only a subset of the columns are
> used, is the entire dataframe placed into the cache, or only the used
> columns. E.G. does "df2" end up caching only "a", or all three
> columns?
>
> df1 = sc.read.load('test.parquet') # Has columns a, b, c
> df2 = df1.cache()
> df2.select('a').collect()
>
> 2) Are caches reference-based, or is there some sort of de-duplication
> based on the logical/physical plans. So, for instance, does spark take
> advantage of the fact that these two dataframes should have the same
> content:
>
> df1 = sc.read.load('test.parquet').cache()
> df2 = sc.read.load('test.parquet').cache()
>
> ...or are df1 and df2 totally independent WRT caching behavior?
>
> 2a) If the cache is reference-based, is it sufficient to hold a
> weakref to the python object to keep the cache in-scope?
>
> 3) Finally, the spark.externalBlockStore.blockManager is intriguing in
> our environment where we have multiple users concurrently analyzing
> mostly the same input datasets. We have enough RAM in our clusters to
> cache a high percentage of the very common datasets, but only if users
> could somehow share their caches (which, conveniently, are the larger
> datasets), We also have very large edge SSD cache servers we use to
> cache trans-oceanic I/O we could throw at this as well.
>
> It looks, however, like that API was removed in 2.0.0 and there wasn't
> a replacement. There are products like Alluxio, but they aren't
> transparent, requiring the user to manually cache their dataframes by
> doing save/loads to external files using "alluxio://" URIs. Is there
> no way around this behavior now?
>
> Sorry for the long email, and thanks!
> Andrew
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: off heap to alluxio/tachyon in Spark 2

2016-09-19 Thread Bin Fan
Hi,

If you are looking for how to run Spark on Alluxio (formerly Tachyon),
here is the documentation from the Alluxio doc site:
http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
It still works for Spark 2.x.

The Alluxio team has also published articles on when and why running Spark (2.x)
with Alluxio may benefit performance:
http://www.alluxio.com/2016/08/effective-spark-rdds-with-alluxio/

- Bin


On Mon, Sep 19, 2016 at 7:56 AM, aka.fe2s  wrote:

> Hi folks,
>
> What has happened with Tachyon / Alluxio in Spark 2? Doc doesn't mention
> it no longer.
>
> --
> Oleksiy Dyagilev
>


PySpark read from HBase

2016-08-12 Thread Bin Wang
Hi there,

I have lots of raw data in several Hive tables where we built a workflow to
"join" those records together and restructured into HBase. It was done
using plain MapReduce to generate HFile, and then load incremental from
HFile into HBase to guarantee the best performance.

However, we need to do some time series analysis for each of the record in
HBase, but the implementation was done in Python (pandas, scikit learn)
which is pretty time-consuming to reproduce in Java, Scala.

I am thinking PySpark is probably the best approach if it works.
Can pyspark read from HFile directory? or can it read from HBase in
parallel?
I don't see that many examples out there so any help or guidance will be
appreciated.

Also, we are using Cloudera Hadoop so there might be a slight delay with
the latest Spark release.

Best regards,

Bin


Re: Question About OFF_HEAP Caching

2016-07-18 Thread Bin Fan
Here is one blog illustrating how to use Spark on Alluxio for this purpose.
Hope it will help:

http://www.alluxio.com/2016/04/getting-started-with-alluxio-and-spark/
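
A minimal sketch of the file-based pattern Gene describes below, assuming the
Alluxio client is on the Spark classpath (host, port and path are
placeholders): save an RDD to an Alluxio path from one job and read it back
from another.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("alluxio-rdd"))

val rdd = sc.parallelize(1 to 1000)
// Persist the RDD as an object file on Alluxio; any later job can reload it.
rdd.saveAsObjectFile("alluxio://alluxio-master:19998/tmp/numbers")

val reloaded = sc.objectFile[Int]("alluxio://alluxio-master:19998/tmp/numbers")
println(reloaded.count())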

On Mon, Jul 18, 2016 at 6:36 AM, Gene Pang  wrote:

> Hi,
>
> If you want to use Alluxio with Spark 2.x, it is recommended to write to
> and read from Alluxio with files. You can save an RDD with saveAsObjectFile
> with an Alluxio path (alluxio://host:port/path/to/file), and you can read
> that file from any other Spark job. Here is additional information on how
> to run Spark with Alluxio:
> http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
>
> Hope that helps,
> Gene
>
> On Mon, Jul 18, 2016 at 12:11 AM, condor join 
> wrote:
>
>> Hi All,
>>
>> I have some questions about OFF_HEAP Caching. In Spark 1.X when we use
>> *rdd.persist(StorageLevel.OFF_HEAP)*,that means rdd caching in
>> Tachyon(Alluxio). However,in Spark 2.X,we can directly use OFF_HEAP  For
>> Caching
>>
>> (
>> https://issues.apache.org/jira/browse/SPARK-13992?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22off-heap%20caching%22).
>> I am confuse about this and I have follow questions:
>>
>> 1.In Spark 2.X, how should we use Tachyon for caching?
>>
>> 2.Is there any reason that must change in this way(I mean use off_heap
>> directly instead of using Tachyon)
>>
>> Thanks a lot!
>>
>>
>>
>>
>


Re: Possible to broadcast a function?

2016-06-29 Thread Bin Fan
Following this suggestion, Aaron, you may take a look at Alluxio as
off-heap in-memory data storage for Spark job input/output, if that
works for you.

See more on how to run Spark with Alluxio as data input / output:

http://www.alluxio.org/documentation/en/Running-Spark-on-Alluxio.html

- Bin

On Wed, Jun 29, 2016 at 8:40 AM, Sonal Goyal  wrote:

> Have you looked at Alluxio? (earlier tachyon)
>
> Best Regards,
> Sonal
> Founder, Nube Technologies <http://www.nubetech.co>
> Reifier at Strata Hadoop World
> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
> Reifier at Spark Summit 2015
> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Wed, Jun 29, 2016 at 7:30 PM, Aaron Perrin 
> wrote:
>
>> The user guide describes a broadcast as a way to move a large dataset to
>> each node:
>>
>> "Broadcast variables allow the programmer to keep a read-only variable
>> cached on each machine rather than shipping a copy of it with tasks. They
>> can be used, for example, to give every node a copy of a large input
>> dataset in an efficient manner."
>>
>> And the broadcast example shows it being used with a variable.
>>
>> But, is it somehow possible to instead broadcast a function that can be
>> executed once, per node?
>>
>> My use case is the following:
>>
>> I have a large data structure that I currently create on each executor.
>> The way that I create it is a hack.  That is, when the RDD function is
>> executed on the executor, I block, load a bunch of data (~250 GiB) from an
>> external data source, create the data structure as a static object in the
>> JVM, and then resume execution.  This works, but it ends up costing me a
>> lot of extra memory (i.e. a few TiB when I have a lot of executors).
>>
>> What I'd like to do is use the broadcast mechanism to load the data
>> structure once, per node.  But, I can't serialize the data structure from
>> the driver.
>>
>> Any ideas?
>>
>> Thanks!
>>
>> Aaron
>>
>>
>


Re: How to close connection in mapPartitions?

2015-10-22 Thread Bin Wang
BTW, "lines" is a DStream.

On Fri, Oct 23, 2015 at 2:16 PM, Bin Wang wrote:

> I use mapPartitions to open connections to Redis, I write it like this:
>
> val seqs = lines.mapPartitions { lines =>
>   val cache = new RedisCache(redisUrl, redisPort)
>   val result = lines.map(line => Parser.parseBody(line, cache))
>   cache.redisPool.close
>   result
> }
>
> But it seems the pool is closed before I use it. Am I doing anything
> wrong? Here is the error:
>
> java.lang.IllegalStateException: Pool not open
>   at 
> org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>   at 
> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>   at com.redis.RedisClientPool.withClient(Pool.scala:34)
>   at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>   at 
> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>   at 
> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>   at 
> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>   at 
> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>
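
A minimal sketch of one common fix, reusing the RedisCache/Parser names from
this thread: map on an iterator is lazy, so the partition has to be fully
materialized (or the close deferred) before the pool is closed.

val seqs = lines.mapPartitions { iter =>
  val cache = new RedisCache(redisUrl, redisPort)
  // Force evaluation here; otherwise the parsing runs lazily, after close.
  val parsed = iter.map(line => Parser.parseBody(line, cache)).toList
  cache.redisPool.close
  parsed.iterator
}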


How to close connection in mapPartitions?

2015-10-22 Thread Bin Wang
I use mapPartitions to open connections to Redis, I write it like this:

val seqs = lines.mapPartitions { lines =>
  val cache = new RedisCache(redisUrl, redisPort)
  val result = lines.map(line => Parser.parseBody(line, cache))
  cache.redisPool.close
  result
}

But it seems the pool is closed before I use it. Am I doing anything wrong?
Here is the error:

java.lang.IllegalStateException: Pool not open
at 
org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
at 
org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
at com.redis.RedisClientPool.withClient(Pool.scala:34)
at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
at 
com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
at 
com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
at 
com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
at 
com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Re: Does spark auto invoke StreamingContext.stop while receiving a kill signal?

2015-09-23 Thread Bin Wang
Thanks for the explain. I've opened a PR at
https://github.com/apache/spark/pull/8898

On Thu, Sep 24, 2015 at 2:44 AM, Tathagata Das wrote:

> Yes, since 1.4.0 it shuts down the StreamingContext, though not gracefully,
> from a shutdown hook.
> You can make it shut down gracefully in that hook by setting the SparkConf
> "spark.streaming.stopGracefullyOnShutdown" to "true".
>
> Note to self: document this in the programming guide.
>
> On Wed, Sep 23, 2015 at 3:33 AM, Bin Wang  wrote:
>
>> I'd like the spark application to be stopped gracefully while received
>> kill signal, so I add these code:
>>
>> sys.ShutdownHookThread {
>>   println("Gracefully stopping Spark Streaming Application")
>>   ssc.stop(stopSparkContext = true, stopGracefully = true)
>>   println("Application stopped")
>> }
>>
>> But the application is not stopped gracefully:
>>
>> 15/09/23 17:44:38 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15:
>> SIGTERM
>> ...
>> 15/09/23 17:44:38 INFO streaming.StreamingContext: Invoking
>> stop(stopGracefully=false) from shutdown hook
>>
>> Dose spark auto invoke StreamingContext.stop for me?
>>
>
>
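
A minimal sketch of the setting described above, assuming Spark 1.4+:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("graceful-shutdown")
  // Let Spark's own shutdown hook stop the StreamingContext gracefully on SIGTERM.
  .set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(conf, Seconds(300))
// ... build the streaming job here ...
ssc.start()
ssc.awaitTermination()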


Does spark auto invoke StreamingContext.stop while receiving a kill signal?

2015-09-23 Thread Bin Wang
I'd like the spark application to be stopped gracefully when it receives a kill
signal, so I added this code:

sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  println("Application stopped")
}

But the application is not stopped gracefully:

15/09/23 17:44:38 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
...
15/09/23 17:44:38 INFO streaming.StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook

Does spark auto invoke StreamingContext.stop for me?


Is it possible to merge delayed batches in streaming?

2015-09-23 Thread Bin Wang
I'm using Spark Streaming and there may be some delays between batches. I'd
like to know whether it is possible to merge delayed batches into one batch for
processing.

For example, the interval is set to 5 min but the first batch takes 1 hour,
so many batches are delayed. At the end of processing each batch,
I save the data into a database. So if all the delayed batches were merged
into one big batch, it would save a lot of resources. I'd like to know if this
is possible. Thanks.


Re: How to recover DStream from checkpoint directory?

2015-09-17 Thread Bin Wang
Thanks Adrian, the hint about using updateStateByKey with an initialRDD helps a lot!
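
A minimal sketch of that pattern with made-up key/value types; loadStateFromDb
is a hypothetical helper standing in for the database preload mentioned below:

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: reload previously persisted per-key counts from the database.
def loadStateFromDb(ssc: StreamingContext): RDD[(String, Long)] =
  ssc.sparkContext.parallelize(Seq.empty[(String, Long)]) // placeholder for the real DB read

def countWithSeededState(ssc: StreamingContext,
                         events: DStream[(String, Long)]): DStream[(String, Long)] = {
  val initialState = loadStateFromDb(ssc)
  val updateFunc = (values: Seq[Long], state: Option[Long]) =>
    Option(values.sum + state.getOrElse(0L))
  // The overload taking an initial RDD seeds the state on a fresh start,
  // so the old checkpoint directory can be discarded between deployments.
  events.updateStateByKey[Long](
    updateFunc,
    new HashPartitioner(ssc.sparkContext.defaultParallelism),
    initialState)
}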

On Thu, Sep 17, 2015 at 4:50 PM, Adrian Tanase wrote:

> This section in the streaming guide makes your options pretty clear
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>
>
>    1. Use 2 versions in parallel, drain the queue up to a point and start
>    fresh in the new version, only processing events from that point forward
>       1. Note that “up to a point” is specific to your state management
>       logic; it might mean “user sessions started after 4 am”, NOT “events
>       received after 4 am”
>    2. Graceful shutdown and saving data to DB, followed by checkpoint
>    cleanup / new checkpoint dir
>       1. On restart, you need to use the updateStateByKey that takes an
>       initialRdd with the values preloaded from DB
>       2. By cleaning the checkpoint in between upgrades, data is loaded
>       only once
>
> Hope this helps,
> -adrian
>
> From: Bin Wang
> Date: Thursday, September 17, 2015 at 11:27 AM
> To: Akhil Das
> Cc: user
> Subject: Re: How to recovery DStream from checkpoint directory?
>
> In my understand, here I have only three options to keep the DStream state
> between redeploys (yes, I'm using updateStateByKey):
>
> 1. Use checkpoint.
> 2. Use my own database.
> 3. Use both.
>
> But none of  these options are great:
>
> 1. Use checkpoint: I cannot load it after code change. Or I need to keep
> the structure of the classes, which seems to be impossible in a developing
> project.
> 2. Use my own database: there may be failure between the program read data
> from Kafka and save the DStream to database. So there may have data lose.
> 3. Use both: Will the data load two times? How can I know in which
> situation I should use the which one?
>
> The power of checkpoint seems to be very limited. Is there any plan to
> support checkpoint while class is changed, like the discussion you gave me
> pointed out?
>
>
>
> On Thu, Sep 17, 2015 at 3:26 PM, Akhil Das wrote:
>
>> Any kind of changes to the jvm classes will make it fail. By
>> checkpointing the data you mean using checkpoint with updateStateByKey?
>> Here's a similar discussion happened earlier which will clear your doubts i
>> guess
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang  wrote:
>>
>>> And here is another question. If I load the DStream from database every
>>> time I start the job, will the data be loaded when the job is failed and
>>> auto restart? If so, both the checkpoint data and database data are loaded,
>>> won't this a problem?
>>>
>>>
>>>
>>> On Wed, Sep 16, 2015 at 8:40 PM, Bin Wang wrote:
>>>
>>>> Will StreamingContex.getOrCreate do this work?What kind of code change
>>>> will make it cannot load?
>>>>
>>>> Akhil Das 于2015年9月16日周三 20:20写道:
>>>>
>>>>> You can't really recover from checkpoint if you alter the code. A
>>>>> better approach would be to use some sort of external storage (like a db 
>>>>> or
>>>>> zookeeper etc) to keep the state (the indexes etc) and then when you 
>>>>> deploy
>>>>> new code they can be easily recovered.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:
>>>>>
>>>>>> I'd like to know if there is a way to recovery dstream from
>>>>>> checkpoint.
>>>>>>
>>>>>> Because I stores state in DStream, I'd like the state to be recovered
>>>>>> when I restart the application and deploy new code.
>>>>>>
>>>>>
>>>>>
>>


Re: How to recover DStream from checkpoint directory?

2015-09-17 Thread Bin Wang
In my understanding, I have only three options to keep the DStream state
between redeploys (yes, I'm using updateStateByKey):

1. Use checkpoint.
2. Use my own database.
3. Use both.

But none of these options are great:

1. Use checkpoint: I cannot load it after a code change, unless I keep
the structure of the classes unchanged, which seems impossible in a project
under development.
2. Use my own database: there may be a failure between reading the data
from Kafka and saving the DStream to the database, so data may be lost.
3. Use both: will the data be loaded twice? How can I know in which
situation I should use which one?

The power of checkpoints seems to be very limited. Is there any plan to
support checkpoints across class changes, as the discussion you pointed me to
mentions?



On Thu, Sep 17, 2015 at 3:26 PM, Akhil Das wrote:

> Any kind of changes to the jvm classes will make it fail. By checkpointing
> the data you mean using checkpoint with updateStateByKey? Here's a similar
> discussion happened earlier which will clear your doubts i guess
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E
>
> Thanks
> Best Regards
>
> On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang  wrote:
>
>> And here is another question. If I load the DStream from database every
>> time I start the job, will the data be loaded when the job is failed and
>> auto restart? If so, both the checkpoint data and database data are loaded,
>> won't this a problem?
>>
>>
>>
>> On Wed, Sep 16, 2015 at 8:40 PM, Bin Wang wrote:
>>
>>> Will StreamingContex.getOrCreate do this work?What kind of code change
>>> will make it cannot load?
>>>
>>> On Wed, Sep 16, 2015 at 8:20 PM, Akhil Das wrote:
>>>
>>>> You can't really recover from checkpoint if you alter the code. A
>>>> better approach would be to use some sort of external storage (like a db or
>>>> zookeeper etc) to keep the state (the indexes etc) and then when you deploy
>>>> new code they can be easily recovered.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:
>>>>
>>>>> I'd like to know if there is a way to recovery dstream from checkpoint.
>>>>>
>>>>> Because I stores state in DStream, I'd like the state to be recovered
>>>>> when I restart the application and deploy new code.
>>>>>
>>>>
>>>>
>


Re: How to recover DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
And here is another question: if I load the DStream from the database every
time I start the job, will the data also be loaded when the job fails and
auto-restarts? If so, both the checkpoint data and the database data are
loaded; won't this be a problem?



On Wed, Sep 16, 2015 at 8:40 PM, Bin Wang wrote:

> Will StreamingContex.getOrCreate do this work?What kind of code change
> will make it cannot load?
>
> On Wed, Sep 16, 2015 at 8:20 PM, Akhil Das wrote:
>
>> You can't really recover from checkpoint if you alter the code. A better
>> approach would be to use some sort of external storage (like a db or
>> zookeeper etc) to keep the state (the indexes etc) and then when you deploy
>> new code they can be easily recovered.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:
>>
>>> I'd like to know if there is a way to recovery dstream from checkpoint.
>>>
>>> Because I stores state in DStream, I'd like the state to be recovered
>>> when I restart the application and deploy new code.
>>>
>>
>>


Re: How to recover DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
Will StreamingContext.getOrCreate do this work? What kind of code change will
make it unable to load?

On Wed, Sep 16, 2015 at 8:20 PM, Akhil Das wrote:

> You can't really recover from checkpoint if you alter the code. A better
> approach would be to use some sort of external storage (like a db or
> zookeeper etc) to keep the state (the indexes etc) and then when you deploy
> new code they can be easily recovered.
>
> Thanks
> Best Regards
>
> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:
>
>> I'd like to know if there is a way to recovery dstream from checkpoint.
>>
>> Because I stores state in DStream, I'd like the state to be recovered
>> when I restart the application and deploy new code.
>>
>
>


How to recover DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
I'd like to know if there is a way to recover a DStream from a checkpoint.

Because I store state in the DStream, I'd like the state to be recovered when
I restart the application and deploy new code.


Re: How to clear Kafka offset in Spark streaming?

2015-09-14 Thread Bin Wang
I think I've found the reason. It seems that the smallest offset is not
0 and I should not set the offset to 0.

On Mon, Sep 14, 2015 at 2:46 PM, Bin Wang wrote:

> Hi,
>
> I'm using spark streaming with kafka and I need to clear the offset and
> re-compute all things. I deleted checkpoint directory in HDFS and reset
> kafka offset with "kafka-run-class kafka.tools.ImportZkOffsets". I can
> confirm the offset is set to 0 in kafka:
>
> ~ > kafka-run-class kafka.tools.ConsumerOffsetChecker --group
> adhoc_data_spark --topic adhoc_data --zookeeper szq1.appadhoc.com:2181
> Group            Topic       Pid  Offset  logSize  Lag      Owner
> adhoc_data_spark adhoc_data  0    0       5280743  5280743  none
>
> But when I restart spark streaming, the offset is reset to logSize, I
> cannot figure out why is that, can anybody help? Thanks.
>


How to clear Kafka offset in Spark streaming?

2015-09-13 Thread Bin Wang
Hi,

I'm using spark streaming with kafka and I need to clear the offset and
re-compute all things. I deleted checkpoint directory in HDFS and reset
kafka offset with "kafka-run-class kafka.tools.ImportZkOffsets". I can
confirm the offset is set to 0 in kafka:

~ > kafka-run-class kafka.tools.ConsumerOffsetChecker --group
adhoc_data_spark --topic adhoc_data --zookeeper szq1.appadhoc.com:2181
Group            Topic       Pid  Offset  logSize  Lag      Owner
adhoc_data_spark adhoc_data  0    0       5280743  5280743  none

But when I restart spark streaming, the offset is reset to logSize. I
cannot figure out why that is; can anybody help? Thanks.


Re: Data lost in spark streaming

2015-09-13 Thread Bin Wang
There are some error logs in the executor and I don't know if they are related:

15/09/11 10:54:05 WARN ipc.Client: Exception encountered while connecting
to the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Invalid AMRMToken from appattempt_1440495451668_0258_01
15/09/11 10:54:05 WARN yarn.ApplicationMaster: Reporter thread fails 4
time(s) in a row.
org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
AMRMToken from appattempt_1440495451668_0258_01
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at
org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy22.allocate(Unknown Source)
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:278)
at
org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:174)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:323)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Invalid AMRMToken from appattempt_1440495451668_0258_01
at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy21.allocate(Unknown Source)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
... 9 more

...

15/09/11 10:54:10 WARN ipc.Client: Exception encountered while connecting
to the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Invalid AMRMToken from appattempt_1440495451668_0258_01
15/09/11 10:54:10 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 12, (reason: Exception was thrown 5 time(s) from Reporter thread.)
15/09/11 10:54:10 INFO streaming.StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook
15/09/11 10:54:10 INFO scheduler.ReceiverTracker: Sent stop signal to all 1
receivers
15/09/11 10:54:12 ERROR scheduler.ReceiverTracker: Deregistered receiver
for stream 0: Stopped by driver

On Sun, Sep 13, 2015 at 4:05 PM, Tathagata Das wrote:

> Maybe the driver got restarted. See the log4j logs of the driver before it
> restarted.
>
> On Thu, Sep 10, 2015 at 11:32 PM, Bin Wang  wrote:
>
>> I'm using spark streaming 1.4.0 and have a DStream that have all the data
>> it received. But today the history data in the DStream seems to be lost
>> suddenly. And the application UI also lost the streaming process time and
>> all the related data. Could any give some hint to debug this? Thanks.
>>
>>
>>
>


Data lost in spark streaming

2015-09-10 Thread Bin Wang
I'm using spark streaming 1.4.0 and have a DStream that holds all the data
it received. But today the historical data in the DStream seems to have been
lost suddenly, and the application UI also lost the streaming processing times
and all the related data. Could anyone give some hints on debugging this? Thanks.


Re: Will multiple filters on the same RDD optimized to one filter?

2015-07-16 Thread Bin Wang
What if I use both rdd1 and rdd2 later?

On Thu, Jul 16, 2015 at 4:08 PM, Raghavendra Pandey wrote:

> If you cache rdd it will save some operations. But anyway filter is a lazy
> operation. And it runs based on what you will do later on with rdd1 and
> rdd2...
>
> Raghavendra
> On Jul 16, 2015 1:33 PM, "Bin Wang"  wrote:
>
>> If I write code like this:
>>
>> val rdd = input.map(_.value)
>> val f1 = rdd.filter(_ == 1)
>> val f2 = rdd.filter(_ == 2)
>> ...
>>
>> Then the DAG of the execution may be this:
>>
>>  -> Filter -> ...
>> Map
>>  -> Filter -> ...
>>
>> But the two filters is operated on the same RDD, which means it could be
>> done by just scan the RDD once. Does spark have this kind optimization for
>> now?
>>
>
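
A small sketch of the caching approach suggested above (the sample data and
identity map are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("filter-twice"))
val input = sc.parallelize(Seq(1, 1, 2, 3, 2))

// cache() materializes the mapped RDD once; both filters then read the cached
// blocks instead of rescanning and re-mapping the source.
val rdd = input.map(identity).cache()
val f1 = rdd.filter(_ == 1)
val f2 = rdd.filter(_ == 2)
println((f1.count(), f2.count()))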


Will multiple filters on the same RDD optimized to one filter?

2015-07-16 Thread Bin Wang
If I write code like this:

val rdd = input.map(_.value)
val f1 = rdd.filter(_ == 1)
val f2 = rdd.filter(_ == 2)
...

Then the DAG of the execution may be this:

 -> Filter -> ...
Map
 -> Filter -> ...

But the two filters operate on the same RDD, which means both could be
computed with a single scan of the RDD. Does Spark have this kind of
optimization for now?


Re: Spark Streaming Hangs on Start

2015-07-09 Thread Bin Wang
Thanks for the help. I set --executor-cores and it works now. I had been using
--total-executor-cores and didn't realize it changed anything.

On Fri, Jul 10, 2015 at 3:11 AM, Tathagata Das wrote:

> 1. There will be a long running job with description "start()" as that is
> the job that is running the receivers. It will never end.
>
> 2. You need to set the number of cores given to the Spark executors by the
> YARN container. That is SparkConf spark.executor.cores, i.e. --executor-cores
> in spark-submit. Since it is by default 1, your only container has one core
> which is occupied by the receiver, leaving no cores to run the map tasks.
> So the map stage is blocked.
>
> 3. Note these log lines, especially "15/07/09 18:29:00 INFO
> receiver.ReceiverSupervisorImpl: Received stop signal". I think somehow
> your streaming context is being shut down too early, which is causing the
> KafkaReceiver to stop. Something you should debug.
>
>
> 15/07/09 18:27:13 INFO consumer.ConsumerFetcherThread: 
> [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
>  Starting
> 15/07/09 18:27:13 INFO consumer.ConsumerFetcherManager: 
> [ConsumerFetcherManager-1436437633199] Added fetcher for partitions 
> ArrayBuffer([[adhoc_data,0], initOffset 53 to broker 
> id:42,host:szq1.appadhoc.com,port:9092] )
> 15/07/09 18:27:13 INFO storage.MemoryStore: ensureFreeSpace(1680) called with 
> curMem=96628, maxMem=16669841817
> 15/07/09 18:27:13 INFO storage.MemoryStore: Block input-0-1436437633600 
> stored as bytes in memory (estimated size 1680.0 B, free 15.5 GB)
> 15/07/09 18:27:13 WARN storage.BlockManager: Block input-0-1436437633600 
> replicated to only 0 peer(s) instead of 1 peers
> 15/07/09 18:27:14 INFO receiver.BlockGenerator: Pushed block input-0-1436437633600
> 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal
> 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Stopping receiver 
> with message: Stopped by driver:
> 15/07/09 18:29:00 INFO consumer.ZookeeperConsumerConnector: 
> [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], 
> ZKConsumerConnector shutting down
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: 
> [ConsumerFetcherManager-1436437633199] Stopping leader finder thread
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
> [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
>  Shutting down
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
> [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
>  Stopped
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
> [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
>  Shutdown completed
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: 
> [ConsumerFetcherManager-1436437633199] Stopping all fetchers
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherThread: 
> [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
>  Shutting down
> 15/07/09 18:29:01 INFO consumer.SimpleConsumer: Reconnect due to socket 
> error: java.nio.channels.ClosedByInterruptException
> 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: 
> [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
>  Stopped
> 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: 
> [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
>  Shutdown completed
> 15/07/09 18:29:01 INFO consumer.ConsumerFetcherManager: 
> [ConsumerFetcherManager-1436437633199] All connections stopped
> 15/07/09 18:29:01 INFO zkclient.ZkEventThread: Terminate ZkClient event 
> thread.
> 15/07/09 18:29:01 INFO zookeeper.ZooKeeper: Session: 0x14e70eedca00315 closed
> 15/07/09 18:29:01 INFO zookeeper.ClientCnxn: EventThread shut down
> 15/07/09 18:29:01 INFO consumer.ZookeeperConsumerConnector: 
> [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], 
> ZKConsumerConnector shutdown completed in 74 ms
> 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
> 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Deregistering 
> receiver 0
>
>
>
>


Spark Streaming Hangs on Start

2015-07-09 Thread Bin Wang
I'm using spark streaming with Kafka and submit it to a YARN cluster in
"yarn-cluster" mode, but it hangs at StreamingContext.start(). The Kafka config
is right since some events show up in the "Streaming" tab of the web UI.

The attached file is a screenshot of the "Jobs" tab of the web UI. The code
in the main class is:

object StatCounter {

  val config = ConfigFactory.load()
  val redisUrl = config.getString("redis.url")
  val redisPort = config.getInt("redis.port")
  val zkQuorum = config.getString("kafka.zkQuorum")
  val group = config.getString("kafka.group")
  val topic = config.getString("kafka.topic")
  val threadNum = config.getInt("kafka.threadNum")

  val cache = new RedisCache(redisUrl, redisPort)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(config.getString("spark.name"))
      .set("spark.cassandra.connection.host", config.getString("cassandra.host"))

    val ssc = new StreamingContext(conf, Seconds(config.getInt("spark.interval")))
    ssc.checkpoint(config.getString("spark.checkpoint"))
    val storage = new CassandraStorage("adhoc_data", ssc)

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> threadNum)).map(_._2)

    val logs = lines.flatMap(line => Parser.parseBody(line, cache))
    Counter.count(logs, storage)

    sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


Re: How to submit streaming application and exit

2015-07-08 Thread Bin Wang
Thanks. Actually, I've found the way. I'm using spark-submit to submit the
job to a YARN cluster with --master yarn-cluster (so the spark-submit
process is not the driver). So I can set
"spark.yarn.submit.waitAppCompletion" to "false" so that the process will
exit after the job is submitted.

On Wed, Jul 8, 2015 at 12:26 PM, ayan guha wrote:

> spark-submit is nothing but a process in your OS, so you should be able to
> submit it in background and exit. However, your spark-submit process itself
> is the driver for your spark streaming application, so it will not exit for
> the lifetime of the streaming app.
>
> On Wed, Jul 8, 2015 at 1:13 PM, Bin Wang  wrote:
>
>> I'm writing a streaming application and want to use spark-submit to
>> submit it to a YARN cluster. I'd like to submit it in a client node and
>> exit spark-submit after the application is running. Is it possible?
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


How to submit streaming application and exit

2015-07-07 Thread Bin Wang
I'm writing a streaming application and want to use spark-submit to submit
it to a YARN cluster. I'd like to submit it in a client node and exit
spark-submit after the application is running. Is it possible?


Problem Run Spark Example HBase Code Using Spark-Submit

2015-06-25 Thread Bin Wang
I am trying to run the Spark example code HBaseTest from the command line
using spark-submit instead of run-example; that way, I can learn more about how
to run Spark code in general.

However, it reported a class-not-found error for htrace since I am using
CDH 5.4. I successfully located the htrace jar file but I am having a hard time
adding it to the classpath.

This is the final spark-submit command I have, but it still fails with the
class-not-found error. Can anyone help me with this?

#!/bin/bash
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark

/bin/bash $SPARK_HOME/bin/spark-submit \
--master yarn-client \
--class org.apache.spark.examples.HBaseTest \
--driver-class-path
/etc/hbase/conf:$SPARK_HOME/examples/lib/*.jar:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/*.jar
\
--jars
$SPARK_HOME/examples/lib/*.jar:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/*.jar
\
$SPARK_HOME/examples/lib/*.jar \
myhbasetablename

Note:
htrace-core-3.0.4.jar, htrace-core-3.1.0-incubating.jar, htrace-core.jar
are all located under '
/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/'.


Re: Specify Python interpreter

2015-05-12 Thread Bin Wang
Hi Felix and Tomas,

Thanks a lot for your information. I figured out that the environment variable
PYSPARK_PYTHON is the key.

My current approach is to start iPython notebook on the namenode,

export PYSPARK_PYTHON=/opt/local/anaconda/bin/ipython
/opt/local/anaconda/bin/ipython notebook --profile=mypysparkprofile

In my iPython notebook, I have the flexibility to manually start my
SparkContext in a way like this:

os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/jre-1.7.0-openjdk.x86_64/"
sys.path.append("/opt/cloudera/parcels/CDH/lib/spark/python")

  
sys.path.append("/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip")
from pyspark import SparkContext, SparkConf
sconf = SparkConf()
conf = (SparkConf().setMaster("spark://datafireball1:7077")
.setAppName("SparkApplication")
.set("spark.executor.memory", "16g")
.set("spark.ui.killEnabled", "true"))
sc = SparkContext(conf=conf)

This really works out for me, and I am using the latest iPython notebook to
interactively write Spark applications.

If you have a better Python solution that offers a nicer workflow for
interactive Spark development, please share.

Bin


On Tue, May 12, 2015 at 1:20 AM, Tomas Olsson  wrote:

> Hi,
> You can try
>
> PYSPARK_DRIVER_PYTHON=/path/to/ipython
> PYSPARK_DRIVER_PYTHON_OPTS="notebook" /path/to/pyspark
>
>
> /Tomas
>
> > On 11 May 2015, at 22:17, Bin Wang  wrote:
> >
> > Hey there,
> >
> > I have installed a python interpreter in certain location, say
> "/opt/local/anaconda".
> >
> > Is there a way to specify the Python interpreter while
> developing in an iPython notebook? Maybe a property to set while creating the
> SparkContext?
> >
> >
> > I know that I can put "#!/opt/local/anaconda" at the top of my Python
> code and use spark-submit to distribute it to the cluster. However, since I
> am using iPython notebook, this is not available as an option.
> >
> > Best,
> >
> > Bin
>
>


Specify Python interpreter

2015-05-11 Thread Bin Wang
Hey there,

I have installed a python interpreter in certain location, say
"/opt/local/anaconda".

Is there a way to specify the Python interpreter while
developing in an iPython notebook? Maybe a property to set while creating the
SparkContext?


I know that I can put "#!/opt/local/anaconda" at the top of my Python code
and use spark-submit to distribute it to the cluster. However, since I am
using iPython notebook, this is not available as an option.

Best,

Bin


Spark on top of YARN Compression in iPython notebook

2015-05-10 Thread Bin Wang
I started an AWS cluster (1 master + 3 core nodes) and downloaded the prebuilt
Spark binary. I also downloaded the latest Anaconda Python and started an iPython
notebook server by running the command below:

ipython notebook --port  --profile nbserver --no-browser

Then, I try to develop a Spark application running on top of YARN
interactively in the iPython notebook:

Here is the code that I have written:

import sys
import os
from pyspark import SparkContext, SparkConf
sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python')
sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip')
os.environ["YARN_CONF_DIR"] = "/home/hadoop/conf"
os.environ["SPARK_HOME"] = "/home/hadoop/bwang/spark-1.3.1-bin-hadoop2.4"
conf = (SparkConf()
.setMaster("yarn-client")
.setAppName("Spark ML")
.set("spark.executor.memory", "2g")
   )
sc = SparkContext(conf=conf)
data = sc.textFile("hdfs://ec2-xx.xx.xx..compute-1.amazonaws.com:8020/data/*")
data.count()

The code works all the way until the count, which fails with
"com.hadoop.compression.lzo.LzoCodec not found".
Here <http://www.wepaste.com/sparkcompression/> is the full log.

I did some searching, and the problem is basically that Spark cannot access the
LzoCodec library.

I have tried to use os.environ to set SPARK_CLASSPATH and
SPARK_LIBRARY_PATH to include hadoop-lzo.jar, which is located at
"/home/hadoop/.versions/2.4.0-amzn-4/share/hadoop/common/lib/hadoop-lzo.jar"
on the AWS Hadoop nodes. However, it is still not working.

Can anyone show me how to solve this problem?
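
One thing to try (a sketch only; the jar path is the EMR location mentioned above,
and for the driver the classpath generally has to be in place before its JVM
starts, so setting it through SparkConf mainly helps the executors):

import os
import sys
sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python')
sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip')
os.environ["YARN_CONF_DIR"] = "/home/hadoop/conf"

from pyspark import SparkContext, SparkConf

# point both sides of the job at the hadoop-lzo jar; the executor setting is the
# one that reliably takes effect when set through SparkConf like this
lzo_jar = "/home/hadoop/.versions/2.4.0-amzn-4/share/hadoop/common/lib/hadoop-lzo.jar"
conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("Spark ML")
        .set("spark.executor.memory", "2g")
        .set("spark.executor.extraClassPath", lzo_jar)
        .set("spark.driver.extraClassPath", lzo_jar))
sc = SparkContext(conf=conf)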


Using Pandas/Scikit Learning in Pyspark

2015-05-08 Thread Bin Wang
Hey there,

I have a CDH cluster where the default Python installed on those Redhat
Linux machines is Python 2.6.

I am thinking about developing a Spark application using pyspark, and I want
to be able to use the Pandas and scikit-learn packages. The Anaconda Python
interpreter has the most functionality out of the box; however, when I try to
use Anaconda Python 2.7, the Spark job won't run properly and fails because
the Python interpreter is not consistent across the
cluster.
Here are my questions:

(1) I took a quick look at the pyspark source code; it looks like, in the
end, it uses spark-submit. Doesn't that mean all the work is eventually
translated into Scala code and the workload distributed to the whole
cluster? In that case, I should not need to worry about the Python interpreter
beyond the master node, right?

(2) If the Spark job needs consistent Python libraries installed on
every node, should I install Anaconda Python on all of them? If so, what is
the modern way of managing the Python ecosystem on the cluster?

I am a big fan of Python so please guide me.

Best regards,

Bin
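
A sketch of one common setup (it assumes Anaconda is installed under the same
prefix, e.g. /opt/local/anaconda, on every node, and my_job.py is just a
placeholder script name):

# install Anaconda under the same path on every node, then point pyspark at it;
# PYSPARK_PYTHON can also be set once in conf/spark-env.sh on each node
export PYSPARK_PYTHON=/opt/local/anaconda/bin/python
$SPARK_HOME/bin/spark-submit my_job.py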


Anaconda iPython notebook working with CDH Spark

2014-12-28 Thread Bin Wang
Hi there,

I have a cluster with CDH 5.1 running on top of Redhat 6.5, where the default
Python version is 2.6. I am trying to set up a proper iPython notebook
environment to develop Spark applications using pyspark.

Here
<http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/>
is a tutorial that I have been following. However, it turned out that the
author was using iPython 1, whereas we have the latest Anaconda Python 2.7
installed on our name node. After finishing the tutorial, I can connect to
the Spark cluster, but whenever I try to distribute the work, it errors out,
and Google tells me the cause is the difference in Python versions across
the cluster.

Here are a few thoughts that I am planning to try.
(1) remove the Anaconda Python from the namenode and install the iPython
version that is compatible with Python2.6.
(2) or install Anaconda Python on every node and make it the
default Python version across the whole cluster (however, I am not sure if
this plan will totally screw up the existing environment, since some running
services are built on Python 2.6...)

Let me know which would be the proper way to set up an iPython notebook
environment.

Best regards,

Bin


Running time bottleneck on a few worker

2014-08-12 Thread Bin
Hi All,


I have a problem where, for each stage, most workers finish fast (around 1 min),
but a few workers take around 7 min to finish, which significantly slows down
the whole process.


The running time is very unevenly distributed over the workers.


I wonder whether this is normal. Is it related to the partitioning strategy? For 
now, I am using the default partitioning strategy.


Looking for advice!


Thanks very much!


Best,
Bin



[GraphX] Is it normal to shuffle write 15GB while the data is only 30MB?

2014-08-08 Thread Bin
Hi All,


I am running a customized label propagation using Pregel. After a few 
iterations, the program becomes slow and spends a lot of time in mapPartitions 
(at GraphImpl.scala:184, VertexRDD.scala:318, or VertexRDD.scala:323), and the 
amount of shuffle write reaches 15 GB, while the size of the raw 
(srcid, dstid, weight) data is only 30 MB.


I wonder whether this is normal?


Below please find my customized label propagation implementation. I have 
changed "map" into "mapValues" in line 183 to reduce shuffling, but it makes 
no difference:


"
154 def adsorption(sc : SparkContext, graph : Graph[(Int, Map[VertexId, Double], String), Double], tol: Double)
155 : Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] =
156 {
157     val adsorptionGraph: Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = graph
158         .outerJoinVertices(graph.inDegrees){
159             case (vid, u, inDegOpt) => (u._1, u._2, 1.0, inDegOpt.getOrElse(0), u._3)
160         }
162         .cache()
167 
168     def sendMessage(edge : EdgeTriplet[(Int, Map[VertexId, Double], Double, Int, String), Double])={
175         val dstAttr = edge.dstAttr
176 
177         if (dstAttr._3 >= tol && (dstAttr._1 == 1 || dstAttr._1 == 3))
178         {
181             val indegree = dstAttr._4.toDouble
182 
183             val mapToSend = edge.srcAttr._2.mapValues{_/indegree}.map(identity)
187             Iterator((edge.dstId, mapToSend))
188         }
189         else
190         {
192             Iterator.empty
193         }
194     }
195 
196     def mergeMessage(label1:Map[VertexId, Double], label2:Map[VertexId, Double]): Map[VertexId, Double] =
197     {
202         val mm = (label1.keySet ++ label2.keySet).map{i=>
203             val count1Val = label1.getOrElse(i, 0.0)
204             val count2Val = label2.getOrElse(i, 0.0)
205             i->(count1Val + count2Val)
206         }.toMap
211     }
212 
213     def vertexProgram(vid: VertexId, attr: (Int, Map[VertexId, Double], Double, Int, String), message: Map[VertexId, Double])={
218 
219         if (message.isEmpty) attr
220         else
221         {
223 
224             val oldlabel = attr._2
227 
228             var accum = 0.0
229 
230             message.foreach(x=> (accum = accum + x._2))
233 
234             val newlabel = message.map(x=>(x._1->x._2/accum))
235 
236             val diff = (newlabel.keySet--oldlabel.keySet).toSet.size.toDouble / oldlabel.keySet.size.toDouble
239 
240             (attr._1, newlabel, diff, attr._4, attr._5)
241         }
242     }
243 
244     // empty initial message
245     val initialMessage = Map[VertexId, Double]()
246 
247     Pregel(adsorptionGraph, initialMessage, maxIterations = 3, activeDirection = EdgeDirection.In)(
248         vprog = vertexProgram,
249         sendMsg = sendMessage,
250         mergeMsg = mergeMessage
251     )
252 }
"

Re:[GraphX] Can't zip RDDs with unequal numbers of partitions

2014-08-07 Thread Bin
OK, I think I've figured it out.


It seems to be a bug which has been reported at: 
https://issues.apache.org/jira/browse/SPARK-2823 and  
https://github.com/apache/spark/pull/1763.


As it says: "If the users set “spark.default.parallelism” and the value is 
different with the EdgeRDD partition number, GraphX jobs will throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of 
partitions"


So my quick fix is to repartition the EdgeRDD to exactly the parallelism value. 
But I think this will lead to a lot of extra network communication.


So is there any better solution?
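
A sketch of a lighter-weight variant of that workaround (nameHash and path_edges
are from the code posted earlier; whether this is enough depends on how the graph
is then assembled, since the real fix is the patch linked above):

import org.apache.spark.graphx._

// ask for the edge input in spark.default.parallelism partitions up front, so the
// EdgeRDD built from it already matches the setting and nothing has to be
// re-shuffled; textFile treats this as a minimum, so it is worth checking
// edges.partitions.length afterwards
val numParts = sc.defaultParallelism
val edges = sc.textFile(path_edges, numParts)
  .map(_.split(","))
  .map(a => Edge(nameHash(a(0)), nameHash(a(1)), a(2).toDouble))
val graph = Graph.fromEdges(edges, 0)   // real vertex attributes joined in afterwards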


Thanks a lot!


Best,
Bin

On 2014-08-06 04:54:39, "Bin" wrote:

Hi All,


Finally I found that the problem occured when I called the graphx lib:


"
Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:703)
at adsorption$.adsorption(adsorption.scala:138)
at adsorption$.main(adsorption.scala:64)
at adsorption.main(adsorption.scala)
"


The source codes:


"
129     val adsorptionGraph: Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = graph
130         .outerJoinVertices(graph.inDegrees){
131             case (vid, u, inDegOpt) => (u._1, u._2, 1.0, inDegOpt.getOrElse(0), u._3)
132         }
133         //.mapVertices((vid, v_attr) => (v_attr._1, v_attr._2, 1.0, 0))
134         .cache()
135 
136     println("After transforming into adsorption graph ")
137 
138     adsorptionGraph.triplets.foreach(tri=>println())
"


Any advice?


Thanks a lot!


Best,
Bin




[GraphX] Can't zip RDDs with unequal numbers of partitions

2014-08-06 Thread Bin
Hi All,


Finally I found that the problem occured when I called the graphx lib:


"
Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:703)
at adsorption$.adsorption(adsorption.scala:138)
at adsorption$.main(adsorption.scala:64)
at adsorption.main(adsorption.scala)
"


The source codes:


"
129     val adsorptionGraph: Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = graph
130         .outerJoinVertices(graph.inDegrees){
131             case (vid, u, inDegOpt) => (u._1, u._2, 1.0, inDegOpt.getOrElse(0), u._3)
132         }
133         //.mapVertices((vid, v_attr) => (v_attr._1, v_attr._2, 1.0, 0))
134         .cache()
135 
136     println("After transforming into adsorption graph ")
137 
138     adsorptionGraph.triplets.foreach(tri=>println())
"


Any advice?


Thanks a lot!


Best,
Bin

Can't zip RDDs with unequal numbers of partitions

2014-08-05 Thread Bin
Hi All,


I met the titled error. The exception occurred at line 223, as shown below:


212 // read files
213 val lines = sc.textFile(path_edges).map(line=>line.split(",")).map(line=>((line(0), line(1)), line(2).toDouble)).reduceByKey(_+_).cache
214 
215 val lines_vertices = lines.map{line=>(line._1._1, Map(nameHash(line._1._2)->line._2))}.reduceByKey(_++_).cache
216 
217 val name_shadow = "_shadow"
218 
219 val nodes =
220     lines_vertices
221     .map{line=>(nameHash(line._1), (1, Map[VertexId,Double](), line._1))} ++
222     lines_vertices
223     .map{line=>(nameHash(line._1 + name_shadow), (2,line._2, line._1 + name_shadow))} ++
224     lines
225     .map{line=>(nameHash(line._1._2), (3, Map[VertexId,Double](), line._1._2))}


Sorry for posting the source code, but I couldn't think of a better way.


I am confused about why the partitions end up unequal and how I can control the 
number of partitions of these RDDs. Can someone give me some advice on this 
problem?


Thanks very much!


Best,
Bin
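
In case it helps with the debugging, the partition counts involved can be
inspected and pinned down explicitly; a sketch (path_edges is the same input file
as above, and 100 is just an arbitrary choice):

// pick one partition count and use it everywhere
val numParts = 100
val lines = sc.textFile(path_edges, numParts)          // minimum number of input partitions
  .map(line => line.split(","))
  .map(line => ((line(0), line(1)), line(2).toDouble))
  .reduceByKey(_ + _, numParts)                        // explicit number of output partitions
  .cache
// verify what was actually produced
println(lines.partitions.length)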

[GraphX] How spark parameters relate to Pregel implementation

2014-08-04 Thread Bin
Hi all,


I wonder how Spark parameters, e.g., the degree of parallelism, affect Pregel 
performance? Specifically, sendMessage, mergeMessage, and vertexProgram?


I have tried label propagation on a graph with 300,000 edges, and I found that 
leaving the parallelism unset is much faster than setting it to 5 or 500.


Looking for advice!


Thanks a lot!


Best,
Bin
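
One related knob that is easy to experiment with is the edge partitioning itself;
a sketch (graph stands for whatever graph the label propagation runs on):

import org.apache.spark.graphx._

// repartition the edges with an explicit strategy before running Pregel; this
// changes how many partitions each vertex's edges are spread over, and therefore
// how much traffic sendMessage / mergeMessage generate between partitions
val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D).cache()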

Re:Re: Re:Re: [GraphX] The best way to construct a graph

2014-07-31 Thread Bin
Thanks for the advice. But since I am not the administrator of our Spark 
cluster, I can't do this. Is there any better solution based on the current 
Spark release?


At 2014-08-01 02:38:15, "shijiaxin"  wrote:
>Have you tried to write another similar function like edgeListFile in the
>same file, and then compile the project again?
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-The-best-way-to-construct-a-graph-tp11122p11138.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re:Re: [GraphX] The best way to construct a graph

2014-07-31 Thread Bin
It seems that I cannot specify the weights. I have also tried to imitate 
GraphLoader.edgeListFile, but I can't call the methods and classes used in 
GraphLoader.edgeListFile.


Have you successfully done this?


At 2014-08-01 12:47:08, "shijiaxin"  wrote:
>I think you can try GraphLoader.edgeListFile, and then use join to associate
>the attributes with each vertex
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-The-best-way-to-construct-a-graph-tp11122p11127.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re:Re: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

2014-07-31 Thread Bin
Hi Haiyang,


Thanks, that was indeed the reason.


Best,
Bin



On 2014-07-31 08:05:34, "Haiyang Fu" wrote:

Have you tried to increase the driver memory?



On Thu, Jul 31, 2014 at 3:54 PM, Bin  wrote:

Hi All,


The data size of my task is about 30mb. It runs smoothly in local mode. 
However, when I submit it to the cluster, it throws the titled error (Please 
see below for the complete output).


Actually, my output is almost the same as 
http://stackoverflow.com/questions/24080891/spark-program-hangs-at-job-finished-toarray-workers-throw-java-util-concurren.
 I also call toArray on my data, which was the cause in that case.


However, how come it runs OK in local but not in the cluster? The memory of 
each worker is over 60g, and my run command is:


"$SPARK_HOME/bin/spark-class org.apache.spark.deploy.Client launch 
spark://10.196.135.101:7077 $jar_path $programname 
-Dspark.master=spark://10.196.135.101:7077 -Dspark.cores.max=300 
-Dspark.executor.memory=20g -spark.jars=$jar_path 
-Dspark.default.parallelism=100  
-Dspark.hadoop.hadoop.job.ugi=$username,$groupname  -Dspark.app.name=$appname 
$in_path $scala_out_path"


Looking for help and thanks a lot!


Below please find the complete output:


14/07/31 15:06:53 WARN Configuration: DEPRECATED: hadoop-site.xml found in the 
classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, 
mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, 
mapred-default.xml and hdfs-default.xml respectively
14/07/31 15:06:53 INFO SecurityManager: Changing view acls to: spark
14/07/31 15:06:53 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(spark)
14/07/31 15:06:53 INFO Slf4jLogger: Slf4jLogger started
14/07/31 15:06:53 INFO Remoting: Starting remoting
14/07/31 15:06:54 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkExecutor@tdw-10-215-140-22:39446]
14/07/31 15:06:54 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://sparkExecutor@tdw-10-215-140-22:39446]
14/07/31 15:06:54 INFO CoarseGrainedExecutorBackend: Connecting to driver: 
akka.tcp://spark@tdw-10-196-135-106:38502/user/CoarseGrainedScheduler
14/07/31 15:06:54 INFO WorkerWatcher: Connecting to worker 
akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker
14/07/31 15:06:54 INFO WorkerWatcher: Successfully connected to 
akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker
14/07/31 15:06:56 INFO CoarseGrainedExecutorBackend: Successfully registered 
with driver
14/07/31 15:06:56 INFO SecurityManager: Changing view acls to: spark
14/07/31 15:06:56 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(spark)
14/07/31 15:06:56 INFO Slf4jLogger: Slf4jLogger started
14/07/31 15:06:56 INFO Remoting: Starting remoting
14/07/31 15:06:56 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://spark@tdw-10-215-140-22:56708]
14/07/31 15:06:56 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://spark@tdw-10-215-140-22:56708]
14/07/31 15:06:56 INFO SparkEnv: Connecting to MapOutputTracker: 
akka.tcp://spark@tdw-10-196-135-106:38502/user/MapOutputTracker
14/07/31 15:06:58 INFO SparkEnv: Connecting to BlockManagerMaster: 
akka.tcp://spark@tdw-10-196-135-106:38502/user/BlockManagerMaster
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data1/sparkenv/local/spark-local-20140731150659-3f12
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data2/sparkenv/local/spark-local-20140731150659-1602
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data3/sparkenv/local/spark-local-20140731150659-d213
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data4/sparkenv/local/spark-local-20140731150659-f42e
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data5/sparkenv/local/spark-local-20140731150659-63d0
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data6/sparkenv/local/spark-local-20140731150659-9003
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data7/sparkenv/local/spark-local-20140731150659-f260
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data8/sparkenv/local/spark-local-20140731150659-6334
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data9/sparkenv/local/spark-local-20140731150659-3af4
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data10/sparkenv/local/spark-local-20140731150659-133d
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data11/sparkenv/local/spark-local-20140731150659-ed08
14/07/31 15:06:59 INFO MemoryStore: MemoryStore started with capacity 11.5 GB.
14/07/31 15:06:59 INFO ConnectionManager: Bound socket to port 35127 with id = 
ConnectionManagerId(tdw-10-215-140-22,35127)
14/07/31 15:06:59 INFO BlockManagerMaster: Trying to register B

[GraphX] The best way to construct a graph

2014-07-31 Thread Bin
Hi All,


I am wondering what is the best way to construct a graph?


Say I have some attributes for each user, and a specific weight for each user 
pair. The way I am currently doing this is to first read the user information and 
edge triples into two arrays, then use sc.parallelize to create the vertex RDD and 
edge RDD, respectively, and then create the graph using Graph(vertices, edges).


I wonder whether there is a better way to do this?


Looking for advice!


Thanks very much!


Best,
Bin
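
For a file-based input, a sketch of an alternative that skips the driver-side
arrays entirely and builds both RDDs straight from the files (the
"srcId,dstId,weight" and "id,attribute" layouts and the HDFS paths here are
assumptions):

import org.apache.spark.graphx._

// edges: one "srcId,dstId,weight" triple per line
val edges = sc.textFile("hdfs:///path/to/edges")        // placeholder path
  .map(_.split(","))
  .map(a => Edge(a(0).toLong, a(1).toLong, a(2).toDouble))

// vertices: one "id,attribute" pair per line
val vertices = sc.textFile("hdfs:///path/to/users")     // placeholder path
  .map(_.split(","))
  .map(a => (a(0).toLong, a(1)))

// vertices that appear only in the edge file get the default attribute ""
val graph = Graph(vertices, edges, "")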

java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

2014-07-31 Thread Bin
Hi All,


The data size of my task is about 30mb. It runs smoothly in local mode. 
However, when I submit it to the cluster, it throws the titled error (Please 
see below for the complete output).


Actually, my output is almost the same as 
http://stackoverflow.com/questions/24080891/spark-program-hangs-at-job-finished-toarray-workers-throw-java-util-concurren.
 I also call toArray on my data, which was the cause in that case.


However, how come it runs OK in local but not in the cluster? The memory of 
each worker is over 60g, and my run command is:


"$SPARK_HOME/bin/spark-class org.apache.spark.deploy.Client launch 
spark://10.196.135.101:7077 $jar_path $programname 
-Dspark.master=spark://10.196.135.101:7077 -Dspark.cores.max=300 
-Dspark.executor.memory=20g -spark.jars=$jar_path 
-Dspark.default.parallelism=100  
-Dspark.hadoop.hadoop.job.ugi=$username,$groupname  -Dspark.app.name=$appname 
$in_path $scala_out_path"


Looking for help and thanks a lot!


Below please find the complete output:


14/07/31 15:06:53 WARN Configuration: DEPRECATED: hadoop-site.xml found in the 
classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, 
mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, 
mapred-default.xml and hdfs-default.xml respectively
14/07/31 15:06:53 INFO SecurityManager: Changing view acls to: spark
14/07/31 15:06:53 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(spark)
14/07/31 15:06:53 INFO Slf4jLogger: Slf4jLogger started
14/07/31 15:06:53 INFO Remoting: Starting remoting
14/07/31 15:06:54 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkExecutor@tdw-10-215-140-22:39446]
14/07/31 15:06:54 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://sparkExecutor@tdw-10-215-140-22:39446]
14/07/31 15:06:54 INFO CoarseGrainedExecutorBackend: Connecting to driver: 
akka.tcp://spark@tdw-10-196-135-106:38502/user/CoarseGrainedScheduler
14/07/31 15:06:54 INFO WorkerWatcher: Connecting to worker 
akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker
14/07/31 15:06:54 INFO WorkerWatcher: Successfully connected to 
akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker
14/07/31 15:06:56 INFO CoarseGrainedExecutorBackend: Successfully registered 
with driver
14/07/31 15:06:56 INFO SecurityManager: Changing view acls to: spark
14/07/31 15:06:56 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(spark)
14/07/31 15:06:56 INFO Slf4jLogger: Slf4jLogger started
14/07/31 15:06:56 INFO Remoting: Starting remoting
14/07/31 15:06:56 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://spark@tdw-10-215-140-22:56708]
14/07/31 15:06:56 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://spark@tdw-10-215-140-22:56708]
14/07/31 15:06:56 INFO SparkEnv: Connecting to MapOutputTracker: 
akka.tcp://spark@tdw-10-196-135-106:38502/user/MapOutputTracker
14/07/31 15:06:58 INFO SparkEnv: Connecting to BlockManagerMaster: 
akka.tcp://spark@tdw-10-196-135-106:38502/user/BlockManagerMaster
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data1/sparkenv/local/spark-local-20140731150659-3f12
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data2/sparkenv/local/spark-local-20140731150659-1602
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data3/sparkenv/local/spark-local-20140731150659-d213
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data4/sparkenv/local/spark-local-20140731150659-f42e
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data5/sparkenv/local/spark-local-20140731150659-63d0
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data6/sparkenv/local/spark-local-20140731150659-9003
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data7/sparkenv/local/spark-local-20140731150659-f260
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data8/sparkenv/local/spark-local-20140731150659-6334
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data9/sparkenv/local/spark-local-20140731150659-3af4
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data10/sparkenv/local/spark-local-20140731150659-133d
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data11/sparkenv/local/spark-local-20140731150659-ed08
14/07/31 15:06:59 INFO MemoryStore: MemoryStore started with capacity 11.5 GB.
14/07/31 15:06:59 INFO ConnectionManager: Bound socket to port 35127 with id = 
ConnectionManagerId(tdw-10-215-140-22,35127)
14/07/31 15:06:59 INFO BlockManagerMaster: Trying to register BlockManager
14/07/31 15:07:00 INFO BlockManagerMaster: Registered BlockManager
14/07/31 15:07:00 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-0914d215-dd22-4d5e-9ec0-724937dbfd8b
14/07/31 15:

[GraphX] How to access a vertex via vertexId?

2014-07-29 Thread Bin
Hi All,


I wonder how to access a vertex via its vertexId? I need to get a vertex's 
attributes after running a graph algorithm.


Thanks very much!


Best,
Bin
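
Since graph.vertices is just an RDD of (VertexId, attribute) pairs, a plain
filter does the job; a sketch (targetId is a placeholder for the id of interest):

val targetId: org.apache.spark.graphx.VertexId = 42L   // placeholder id
// collect() returns an Array with zero or one (id, attr) element;
// graph.vertices.lookup(targetId) works as well, since this is a pair RDD
val result = graph.vertices.filter { case (id, _) => id == targetId }.collect()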

Re: Scala vs Python performance differences

2014-04-14 Thread Bin Wang
At least, Spark Streaming doesn't support Python at this moment, right?


On Mon, Apr 14, 2014 at 6:48 PM, Andrew Ash  wrote:

> Hi Spark users,
>
> I've always done all my Spark work in Scala, but occasionally people ask
> about Python and its performance impact vs the same algorithm
> implementation in Scala.
>
> Has anyone done tests to measure the difference?
>
> Anecdotally I've heard Python is a 40% slowdown but that's entirely
> hearsay.
>
> Cheers,
> Andrew
>


Re: Spark and HBase

2014-04-08 Thread Bin Wang
First, I have not tried it myself. However, from what I have heard it has some
basic SQL features, so you can query your HBase table much like querying content
on HDFS using Hive.
So it is not just "query a simple column"; I believe you can do joins and other
SQL queries. Maybe you can spin up an EMR cluster with HBase preconfigured
and give it a try.

Sorry I cannot provide a more detailed explanation.



On Tue, Apr 8, 2014 at 10:17 AM, Flavio Pompermaier wrote:

> Thanks for the quick reply Bin. Phoenix is something I'm going to try for
> sure, but it seems somewhat pointless if I can use Spark.
> Probably, as you said, since Phoenix uses a dedicated data structure within
> each HBase table, it has more effective memory usage; but if I need to
> deserialize data stored in an HBase cell, I still have to read that object
> into memory, and thus I need Spark. From what I understood, Phoenix is good if I
> have to query a simple column of HBase, but things get really complicated if
> I have to add an index for each column in my table and I store complex
> objects within the cells. Is that correct?
>
> Best,
> Flavio
>
>
>
>
> On Tue, Apr 8, 2014 at 6:05 PM, Bin Wang  wrote:
>
>> Hi Flavio,
>>
>> I am actually attending the 2014 Apache Conf, and I heard about a
>> project called "Apache Phoenix", which fully leverages HBase and is supposed to
>> be 1000x faster than Hive. And it is not memory bound, whereas memory does set a
>> limit for Spark. It is still in the incubator, and the "stats"
>> functions Spark has already implemented are still on its roadmap. I am not
>> sure whether it will be good, but it might be something interesting to check
>> out.
>>
>> /usr/bin
>>
>>
>> On Tue, Apr 8, 2014 at 9:57 AM, Flavio Pompermaier 
>> wrote:
>>
>>> Hi to everybody,
>>>
>>>  in these days I looked a bit at the recent evolution of the big data
>>> stacks and it seems that HBase is somehow fading away in favour of
>>> Spark+HDFS. Am I correct?
>>> Do you think that Spark and HBase should work together or not?
>>>
>>> Best regards,
>>> Flavio
>>>
>>


Re: Spark and HBase

2014-04-08 Thread Bin Wang
Hi Flavio,

I am actually attending the 2014 Apache Conf, and I heard about a
project called "Apache Phoenix", which fully leverages HBase and is supposed to
be 1000x faster than Hive. And it is not memory bound, whereas memory does set a
limit for Spark. It is still in the incubator, and the "stats"
functions Spark has already implemented are still on its roadmap. I am not
sure whether it will be good, but it might be something interesting to check
out.

/usr/bin


On Tue, Apr 8, 2014 at 9:57 AM, Flavio Pompermaier wrote:

> Hi to everybody,
>
> in these days I looked a bit at the recent evolution of the big data
> stacks and it seems that HBase is somehow fading away in favour of
> Spark+HDFS. Am I correct?
> Do you think that Spark and HBase should work together or not?
>
> Best regards,
> Flavio
>


Spark Streaming Maven Build

2014-03-04 Thread Bin Wang
Hi there,

I tried the Kafka WordCount example; it works perfectly and the code is
pretty straightforward to understand.

Can anyone show me how to start my own Maven project with the
KafkaWordCount example with minimum effort?

1. What should the pom file look like (including the jar plugin?
assembly plugin? etc.)
2. mvn install, mvn clean install, or mvn install compile assembly:single?
3. After you have a jar file, how do you execute it instead
of using bin/run-example?


To answer those who might ask what I have already done:
(Here is a derivative of the KafkaWordCount example that I have written
to do an IP-count example, where the input data from Kafka is actually a JSON
string:
https://github.com/biwa7636/binwangREPO
I had such bad luck getting it to work. So if anyone can copy the code of the
WordCount example, build it using Maven, and get it working, please
share your pom and those Maven commands; I would really appreciate it!)

Best regards and let me know if you need further info.

Bin


Re: Missing Spark URL after staring the master

2014-03-04 Thread Bin Wang
Hi Mayur,

I am using CDH4.6.0p0.26, and the latest Cloudera Spark parcel is Spark
0.9.0 CDH4.6.0p0.50.
As I mentioned, the Cloudera Spark version somehow doesn't contain the
run-example shell scripts. However, it is automatically configured and it
is pretty easy to set up across the cluster.

Thanks,
Bin


On Tue, Mar 4, 2014 at 10:59 AM, Mayur Rustagi wrote:

> I have it set up on the Cloudera VM:
> http://docs.sigmoidanalytics.com/index.php/How_to_Install_Spark_on_Cloudera_VM
> Which version are you trying to set up on Cloudera? Also, which Cloudera
> version are you using?
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Mon, Mar 3, 2014 at 4:29 PM, Bin Wang  wrote:
>
>> Hi Ognen/Mayur,
>>
>> Thanks for the reply and it is good to know how easy it is to setup Spark
>> on AWS cluster.
>>
>> My situation is a bit different from yours, our company already have a
>> cluster and it really doesn't make that much sense not to use them. That is
>> why I have been "going through" this. I really wish there are some
>> tutorials teaching you how to set up Spark Cluster on baremetal CDH cluster
>> or .. some way to tweak the CDH Spark distribution, so it is up to date.
>>
>> Ognen, of course it will be very helpful if you can 'history | grep
>> spark... ' and document the work that you have done since you've already
>> made it!
>>
>> Bin
>>
>>
>>
>> On Mon, Mar 3, 2014 at 2:06 PM, Ognen Duzlevski <
>> og...@plainvanillagames.com> wrote:
>>
>>>  I should add that in this setup you really do not need to look for the
>>> printout of the master node's IP - you set it yourself a priori. If anyone
>>> is interested, let me know, I can write it all up so that people can follow
>>> some set of instructions. Who knows, maybe I can come up with a set of
>>> scripts to automate it all...
>>>
>>> Ognen
>>>
>>>
>>>
>>> On 3/3/14, 3:02 PM, Ognen Duzlevski wrote:
>>>
>>> I have a Standalone spark cluster running in an Amazon VPC that I set up
>>> by hand. All I did was provision the machines from a common AMI image (my
>>> underlying distribution is Ubuntu), I created a "sparkuser" on each machine
>>> and I have a /home/sparkuser/spark folder where I downloaded spark. I did
>>> this on the master only, I did sbt/sbt assemble and I set up the
>>> conf/spark-env.sh to point to the master which is an IP address (in my case
>>> 10.10.0.200, the port is the default 7077). I also set up the slaves file
>>> in the same subdirectory to have all 16 ip addresses of the worker nodes
>>> (in my case 10.10.0.201-216). After sbt/sbt assembly was done on master, I
>>> then did cd ~/; tar -czf spark.tgz spark/ and I copied the resulting tgz
>>> file to each worker using the same "sparkuser" account and unpacked the
>>> .tgz on each slave (this will effectively replicate everything from master
>>> to all slaves - you can script this so you don't do it by hand).
>>>
>>> Your AMI should have the distribution's version of Java and git
>>> installed by the way.
>>>
>>> All you have to do then is sparkuser@spark-master>
>>> spark/sbin/start-all.sh (for 0.9, in 0.8.1 it is spark/bin/start-all.sh)
>>> and it will all automagically start :)
>>>
>>> All my Amazon nodes come with 4x400 Gb of ephemeral space which I have
>>> set up into a 1.6TB RAID0 array on each node and I am pooling this into an
>>> HDFS filesystem which is operated by a namenode outside the spark cluster
>>> while all the datanodes are the same nodes as the spark workers. This
>>> enables replication and extremely fast access since ephemeral is much
>>> faster than EBS or anything else on Amazon (you can do even better with SSD
>>> drives on this setup but it will cost ya).
>>>
>>> If anyone is interested I can document our pipeline set up - I came up
>>> with it myself and do not have a clue as to what the industry standards are
>>> since I could not find any written instructions anywhere online about how
>>> to set up a whole data analytics pipeline from the point of ingestion to
>>> the point of analytics (people don't want to share their secrets? or am I
>>> just in the dark and incapable of using Google properly?). My requirement
>>> was that I wanted this to run within a VPC for added security and
>>>

Re: Missing Spark URL after staring the master

2014-03-03 Thread Bin Wang
Hi Ognen/Mayur,

Thanks for the reply; it is good to know how easy it is to set up Spark
on an AWS cluster.

My situation is a bit different from yours: our company already has a
cluster, and it really doesn't make much sense not to use it. That is
why I have been "going through" this. I really wish there were some
tutorials teaching you how to set up a Spark cluster on a bare-metal CDH
cluster, or some way to tweak the CDH Spark distribution so it is up to date.

Ognen, it would of course be very helpful if you could 'history | grep
spark...' and document the work that you have done, since you've already
made it work!

Bin



On Mon, Mar 3, 2014 at 2:06 PM, Ognen Duzlevski  wrote:

>  I should add that in this setup you really do not need to look for the
> printout of the master node's IP - you set it yourself a priori. If anyone
> is interested, let me know, I can write it all up so that people can follow
> some set of instructions. Who knows, maybe I can come up with a set of
> scripts to automate it all...
>
> Ognen
>
>
>
> On 3/3/14, 3:02 PM, Ognen Duzlevski wrote:
>
> I have a Standalone spark cluster running in an Amazon VPC that I set up
> by hand. All I did was provision the machines from a common AMI image (my
> underlying distribution is Ubuntu), I created a "sparkuser" on each machine
> and I have a /home/sparkuser/spark folder where I downloaded spark. I did
> this on the master only, I did sbt/sbt assemble and I set up the
> conf/spark-env.sh to point to the master which is an IP address (in my case
> 10.10.0.200, the port is the default 7077). I also set up the slaves file
> in the same subdirectory to have all 16 ip addresses of the worker nodes
> (in my case 10.10.0.201-216). After sbt/sbt assembly was done on master, I
> then did cd ~/; tar -czf spark.tgz spark/ and I copied the resulting tgz
> file to each worker using the same "sparkuser" account and unpacked the
> .tgz on each slave (this will effectively replicate everything from master
> to all slaves - you can script this so you don't do it by hand).
>
> Your AMI should have the distribution's version of Java and git installed
> by the way.
>
> All you have to do then is sparkuser@spark-master>
> spark/sbin/start-all.sh (for 0.9, in 0.8.1 it is spark/bin/start-all.sh)
> and it will all automagically start :)
>
> All my Amazon nodes come with 4x400 Gb of ephemeral space which I have set
> up into a 1.6TB RAID0 array on each node and I am pooling this into an HDFS
> filesystem which is operated by a namenode outside the spark cluster while
> all the datanodes are the same nodes as the spark workers. This enables
> replication and extremely fast access since ephemeral is much faster than
> EBS or anything else on Amazon (you can do even better with SSD drives on
> this setup but it will cost ya).
>
> If anyone is interested I can document our pipeline set up - I came up
> with it myself and do not have a clue as to what the industry standards are
> since I could not find any written instructions anywhere online about how
> to set up a whole data analytics pipeline from the point of ingestion to
> the point of analytics (people don't want to share their secrets? or am I
> just in the dark and incapable of using Google properly?). My requirement
> was that I wanted this to run within a VPC for added security and
> simplicity, the Amazon security groups get really old quickly. Added bonus
> is that you can use a VPN as an entry into the whole system and your
> cluster instantly becomes "local" to you in terms of IPs etc. I use OpenVPN
> since I don't like Cisco nor Juniper (the only two options Amazon provides
> for their VPN gateways).
>
> Ognen
>
>
> On 3/3/14, 1:00 PM, Bin Wang wrote:
>
> Hi there,
>
>  I have a CDH cluster set up, and I tried using the Spark parcel come
> with Cloudera Manager, but it turned out they even don't have the
> run-example shell command in the bin folder. Then I removed it from the
> cluster and cloned the incubator-spark into the name node of my cluster,
> and built from source there successfully with everything as default.
>
>  I ran a few examples and everything seems work fine in the local mode.
> Then I am thinking about scale it to my cluster, which is what the
> "DISTRIBUTE + ACTIVATE" command does in Cloudera Manager. I want to add all
> the datanodes to the slaves and think I should run Spark in the standalone
> mode.
>
>  Say I am trying to set up Spark in the standalone mode following this
> instruction:
> https://spark.incubator.apache.org/docs/latest/spark-standalone.html
> However, it says "Once started, the master will print out a
> spark://HOST:PORT URL for its

Missing Spark URL after staring the master

2014-03-03 Thread Bin Wang
Hi there,

I have a CDH cluster set up, and I tried using the Spark parcel that comes with
Cloudera Manager, but it turned out it doesn't even have the run-example
shell command in the bin folder. Then I removed it from the cluster, cloned
incubator-spark onto the name node of my cluster, and built it from source
there successfully with everything at the defaults.

I ran a few examples and everything seems to work fine in local mode. Now I am
thinking about scaling it to my cluster, which is what the "DISTRIBUTE +
ACTIVATE" command does in Cloudera Manager. I want to add all the datanodes
as slaves, and I think I should run Spark in standalone mode.

Say I am trying to set up Spark in the standalone mode following this
instruction:
https://spark.incubator.apache.org/docs/latest/spark-standalone.html
However, it says "Once started, the master will print out a
spark://HOST:PORT URL for itself, which you can use to connect workers to
it, or pass as the "master" argument to SparkContext. You can also find
this URL on the master's web UI, which is http://localhost:8080 by default."

After I start the master, no URL is printed on the screen, and the web UI is
not running either.
Here is the output:
[root@box incubator-spark]# ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to
/root/bwang_spark_new/incubator-spark/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-box.out

First question: am I even in the ballpark running Spark in standalone mode
if I want to fully utilize my cluster? I saw there are four ways to launch
Spark on a cluster: AWS EC2, Spark standalone, Apache Mesos, and Hadoop
YARN... I guess standalone mode is the way to go?

Second question: how do I get the Spark URL of the cluster, and why is the
output not what the instructions say?

Best regards,

Bin
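
For reference, the spark:// URL usually does exist in this situation; it just goes
to the log file named in that output rather than to the console, and the web UI
runs on port 8080 of the master host once the master is actually up. A sketch of
how to dig it out (using the log path printed above):

grep "spark://" /root/bwang_spark_new/incubator-spark/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-box.out
# if the master started, a line like "Starting Spark master at spark://<host>:7077"
# carries the URL to pass to the workers / SparkContext; if there is no such line,
# the same log should explain why the master failed to come up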