RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target release day for Spark3.3?

2022-01-21 Thread Bode, Meikel, NM-X-DS
Hello Juan Liu,

The release process is well documented (see last step on announcement):
https://spark.apache.org/release-process.html

To (un)subcribe to the mailing lists see:
https://spark.apache.org/community.html

Best,
Meikel

Meikel Bode, MSc
Senior Manager | Head of SAP Data Platforms & Analytics
-
Postal address:
Arvato Systems GmbH
Reinhard-Mohn-Straße 200
3 Gütersloh
Germany

Visitor address:
Arvato Systems GmbH
Fuggerstraße 11
33689 Bielefeld
Germany

Phone: +49(5241)80-89734
Mobile: +49(151)14774185
E-Mail: meikel.b...@bertelsmann.de
https://www.arvato-systems.de/



From: Juan Liu 
Sent: Thursday, 20 January 2022 09:44
To: Bode, Meikel, NM-X-DS 
Cc: sro...@gmail.com; Theodore J Griesenbrock ; 
user@spark.apache.org
Subject: RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?

Hi Meikel, would you please help to add both of us (t...@ibm.com,
liuj...@cn.ibm.com) to the mailing list user@spark.apache.org? Thanks!
Juan Liu (刘娟) PMP®
Release Manager, Watson Health, China Development Lab
Email: liuj...@cn.ibm.com
Mobile: 86-13521258532





From:"Bode, Meikel, NM-X-DS" 
mailto:meikel.b...@bertelsmann.de>>
To:"Theodore J Griesenbrock" mailto:t...@ibm.com>>, 
"sro...@gmail.com<mailto:sro...@gmail.com>" 
mailto:sro...@gmail.com>>
Cc:"Juan Liu" mailto:liuj...@cn.ibm.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
mailto:user@spark.apache.org>>
Date:2022/01/20 03:05 PM
Subject:[EXTERNAL] RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and 
how? your target release day for Spark3.3?





Hi,



New releases are announced via mailing lists 
user@spark.apache.org & d...@spark.apache.org.



Best,

Meikel



From: Theodore J Griesenbrock <t...@ibm.com>
Sent: Wednesday, 19 January 2022 18:50
To: sro...@gmail.com
Cc: Juan Liu <liuj...@cn.ibm.com>; user@spark.apache.org
Subject: RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?




Again, sorry to bother you.



What is the best option available to ensure we get notified when a new version 
is released for Apache Spark?  I do not see any RSS feeds, nor do I see any 
e-mail subscription option for this page:  
https://spark.apache.org/news/index.html



Please let me know what we can do to ensure we stay up to date with the news.



Thanks!



-T.J.





T.J. Griesenbrock

Technical Release Manager

Watson Health

He/Him/His



+1 (602) 377-7673 (Text only)
t...@ibm.com

IBM





- Original message -
From: "Sean Owen" mailto:sro...@gmail.com>>
To: "Juan Liu" mailto:liuj...@cn.ibm.com>>
Cc: "Theodore J Griesenbrock" mailto:t...@ibm.com>>, "User" 
mailto:user@spark.apache.org>>
Subject: [EXTERNAL] Re: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? 
your target release day for Spark3.3?
Date: Thu, Jan 13, 2022 08:05

Yes; however, Spark does not use the SocketServer mentioned in CVE-2019-17571,
so it is not affected.

3.3.0 would probably be out in a couple months.



On Thu, Jan 13, 2022 at 3:14 AM Juan Liu <liuj...@cn.ibm.com> wrote:

We are informed that CVE-2021-4104 is not the only problem with Log4j 1.x. There is
one more, CVE-2019-17571, and as Apache announced its EOL in 2015, Spark 3.3.0
is much expected. Do you think mid-2022 is a reasonable time for the Spark 3.3.0
release?

RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target release day for Spark3.3?

2022-01-19 Thread Bode, Meikel, NM-X-DS
Hi,

New releases are announced via mailing lists 
user@spark.apache.org & d...@spark.apache.org.

Best,
Meikel

From: Theodore J Griesenbrock 
Sent: Wednesday, 19 January 2022 18:50
To: sro...@gmail.com
Cc: Juan Liu ; user@spark.apache.org
Subject: RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?

Again, sorry to bother you.

What is the best option available to ensure we get notified when a new version 
is released for Apache Spark?  I do not see any RSS feeds, nor do I see any 
e-mail subscription option for this page:  
https://spark.apache.org/news/index.html

Please let me know what we can do to ensure we stay up to date with the news.

Thanks!

-T.J.


T.J. Griesenbrock
Technical Release Manager
Watson Health
He/Him/His

+1 (602) 377-7673 (Text only)
t...@ibm.com

IBM


- Original message -
From: "Sean Owen" mailto:sro...@gmail.com>>
To: "Juan Liu" mailto:liuj...@cn.ibm.com>>
Cc: "Theodore J Griesenbrock" mailto:t...@ibm.com>>, "User" 
mailto:user@spark.apache.org>>
Subject: [EXTERNAL] Re: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? 
your target release day for Spark3.3?
Date: Thu, Jan 13, 2022 08:05

Yes; however, Spark does not use the SocketServer mentioned in CVE-2019-17571,
so it is not affected.
3.3.0 would probably be out in a couple months.

On Thu, Jan 13, 2022 at 3:14 AM Juan Liu <liuj...@cn.ibm.com> wrote:
We are informed that CVE-2021-4104 is not the only problem with Log4j 1.x. There is
one more, CVE-2019-17571, and as Apache announced its EOL in 2015, Spark 3.3.0
is much expected. Do you think mid-2022 is a reasonable time for the Spark 3.3.0
release?

Juan Liu (刘娟) PMP®




Release Management, Watson Health, China Development Lab
Email: liuj...@cn.ibm.com
Phone: 86-10-82452506













-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Spark on Mesos: Spark issuing hundreds of SUBSCRIBE requests / second and crashing Mesos

2018-07-23 Thread Susan X. Huynh
Hi Nimi,

This sounds similar to a bug I have come across before. See:
https://jira.apache.org/jira/browse/SPARK-22342?focusedCommentId=16429950=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16429950

It turned out to be a bug in libmesos (the client library used to
communicate with Mesos): "using a failoverTimeout of 0 with Mesos native
scheduler client can result in infinite subscribe loop" (
https://issues.apache.org/jira/browse/MESOS-8171). It can be fixed by
upgrading to a version of libmesos that has the fix.

Susan


On Fri, Jul 13, 2018 at 3:39 PM, Nimi W  wrote:

> I've come across an issue with Mesos 1.4.1 and Spark 2.2.1. We launch
> Spark tasks using the MesosClusterDispatcher in cluster mode. On a couple
> of occasions, we have noticed that when the Spark Driver crashes (due to
> various causes - human error, network error), sometimes, when the Driver is
> restarted, it issues hundreds of SUBSCRIBE requests to Mesos per second
> until the Mesos master node gets overwhelmed and crashes. It does this
> again to the next master node, over and over until it takes down all the
> master nodes. Usually the only thing that fixes it is manually stopping the
> driver and restarting it.
>
> Here is a snippet of the log of the mesos master, which just logs the
> repeated SUBSCRIBE command:
> https://gist.github.com/nemosupremo/28ef4acfd7ec5bdcccee9789c021a97f
>
> Here is the output of the spark framework:
> https://gist.github.com/nemosupremo/d098ef4def28ebf96c14d8f87aecd133
> which also just repeats 'Transport endpoint is not connected' over and over.
>
> Thanks for any insights
>
>
>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: Interest in adding ability to request GPU's to the spark client?

2018-07-23 Thread Susan X. Huynh
There's some discussion and proposal of supporting GPUs in this Spark JIRA:
https://jira.apache.org/jira/browse/SPARK-24615 "Accelerator-aware task
scheduling for Spark"

Susan

On Thu, Jul 12, 2018 at 11:17 AM, Mich Talebzadeh  wrote:

> I agree.
>
> Adding GPU capability to Spark in my opinion is a must for Advanced
> Analytics.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 12 Jul 2018 at 19:14, Maximiliano Felice <
> maximilianofel...@gmail.com> wrote:
>
>> Hi,
>>
>> I've been meaning to reply to this email for a while now, sorry for
>> taking so much time.
>>
>> I personally think that adding GPU resource management will allow us to
>> boost some ETL performance a lot. For the last year, I've worked in
>> transforming some Machine Learning pipelines from Python in Numpy/Pandas to
>> Spark. Adding GPU capabilities to Spark would:
>>
>>
>>- Accelerate many matrix and batch computations we currently have in
>>Tensorflow
>>- Allow us to use spark for the whole pipeline (combined with
>>possibly better online serving)
>>- Let us trigger better Hyperparameter selection directly from Spark
>>
>>
>> There will be many more aspects of this that we could explore. What do
>> the rest of the list think?
>>
>> See you
>>
>> On Wed, 16 May 2018 at 2:58, Daniel Galvez ()
>> wrote:
>>
>>> Hi all,
>>>
>>> Is anyone here interested in adding the ability to request GPUs to
>>> Spark's client (i.e, spark-submit)? As of now, Yarn 3.0's resource manager
>>> server has the ability to schedule GPUs as resources via cgroups, but the
>>> Spark client lacks an ability to request these.
>>>
>>> The ability to guarantee GPU resources would be practically useful for
>>> my organization. Right now, the only way to do that is to request the
>>> entire memory (or all CPU's) on a node, which is very kludgey and wastes
>>> resources, especially if your node has more than 1 GPU and your code was
>>> written such that an executor can use only one GPU at a time.
>>>
>>> I'm just not sure of a good way to make use of libraries like
>>> Databricks' Deep Learning pipelines
>>> <https://github.com/databricks/spark-deep-learning> for GPU-heavy
>>> computation otherwise, unless you are luckily in an organization which is
>>> able to virtualize computer nodes such that each node will have only one
>>> GPU. Of course, I realize that many Databricks customers are using Azure or
>>> AWS, which allow you to do this facilely. Is this what people normally do
>>> in industry?
>>>
>>> This is something I am interested in working on, unless others out there
>>> have advice on why this is a bad idea.
>>>
>>> Unfortunately, I am not familiar enough with Mesos and Kubernetes right
>>> now to know how they schedule gpu resources and whether adding support for
>>> requesting GPU's from them to the spark-submit client would be simple.
>>>
>>> Daniel
>>>
>>> --
>>> Daniel Galvez
>>> http://danielgalvez.me
>>> https://github.com/galv
>>>
>>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: Spark on Mesos - Weird behavior

2018-07-23 Thread Susan X. Huynh
Hi Thodoris,

Maybe setting "spark.scheduler.minRegisteredResourcesRatio" to > 0 would
help? Default value is 0 with Mesos.

"The minimum ratio of registered resources (registered resources / total
expected resources) (resources are executors in yarn mode and Kubernetes
mode, CPU cores in standalone mode and Mesos coarsed-grained mode
['spark.cores.max' value is total expected resources for Mesos
coarse-grained mode] ) to wait for before scheduling begins. Specified as a
double between 0.0 and 1.0. Regardless of whether the minimum ratio of
resources has been reached, the maximum amount of time it will wait before
scheduling begins is controlled by config
spark.scheduler.maxRegisteredResourcesWaitingTime." -
https://spark.apache.org/docs/latest/configuration.html
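
For reference, a minimal sketch of setting it from PySpark (the values are
illustrative, not from this thread; the same properties can equally be passed
to spark-submit with --conf):

from pyspark import SparkConf, SparkContext

# Illustrative values only: wait until all of spark.cores.max is registered
# (CPU cores are the "resources" in Mesos coarse-grained mode) before scheduling begins.
conf = (SparkConf()
        .set("spark.cores.max", "30")
        .set("spark.executor.cores", "10")
        .set("spark.scheduler.minRegisteredResourcesRatio", "1.0"))
sc = SparkContext(conf=conf)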

Susan

On Wed, Jul 11, 2018 at 7:22 AM, Pavel Plotnikov <
pavel.plotni...@team.wrike.com> wrote:

> Oh, sorry, I missed that you use Spark without dynamic allocation. Anyway,
> I don't know whether these parameters work without dynamic allocation.
>
> On Wed, Jul 11, 2018 at 5:11 PM Thodoris Zois  wrote:
>
>> Hello,
>>
>> Yeah you are right, but I think that works only if you use Spark dynamic
>> allocation. Am I wrong?
>>
>> -Thodoris
>>
>> On 11 Jul 2018, at 17:09, Pavel Plotnikov 
>> wrote:
>>
>> Hi, Thodoris
>> You can configure resources per executor and manipulate the number of
>> executors instead, using spark.cores.max. I think
>> spark.dynamicAllocation.minExecutors
>> and spark.dynamicAllocation.maxExecutors configuration values can help
>> you.
>>
>> On Tue, Jul 10, 2018 at 5:07 PM Thodoris Zois  wrote:
>>
>>> Actually, after some experiments we figured out that spark.cores.max /
>>> spark.executor.cores is the upper bound for the number of executors. Spark apps will
>>> run even if only one executor can be launched.
>>>
>>> Is there any way to also specify the lower bound? It is a bit annoying
>>> that it seems we can't control the resource usage of an application. By
>>> the way, we are not using dynamic allocation.
>>>
>>> - Thodoris
>>>
>>>
>>> On 10 Jul 2018, at 14:35, Pavel Plotnikov <pavel.plotni...@team.wrike.com> wrote:
>>>
>>> Hello Thodoris!
>>> Have you checked this:
>>>  - does the Mesos cluster have available resources?
>>>  - does Spark have tasks waiting in the queue for longer than the
>>> spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
>>>  - and have you checked that Mesos sends offers to the Spark app's Mesos
>>> framework with at least 10 cores and 2GB RAM?
>>>
>>> If Mesos has no available offers with 10 cores, but does have offers with
>>> 8 or 9, you can use smaller executors (for example with 4 cores and 1GB
>>> RAM) to better fit the resources available on the nodes.
>>>
>>> Cheers,
>>> Pavel
>>>
>>> On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois  wrote:
>>>
>>>> Hello list,
>>>>
>>>> We are running Apache Spark on a Mesos cluster and we face a weird
>>>> behavior of executors. When we submit an app with e.g 10 cores and 2GB of
>>>> memory and max cores 30, we expect to see 3 executors running on the
>>>> cluster. However, sometimes there are only 2... Spark applications are not
>>>> the only one that run on the cluster. I guess that Spark starts executors
>>>> on the available offers even if it does not satisfy our needs. Is there any
>>>> configuration that we can use in order to prevent Spark from starting when
>>>> there are no resource offers for the total number of executors?
>>>>
>>>> Thank you
>>>> - Thodoris
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: Advice on multiple streaming job

2018-05-06 Thread Susan X. Huynh
Hi Dhaval,

Not sure if you have considered this: the port 4040 sounds like a driver UI
port. By default it will try up to 4056, but you can increase that number
with "spark.port.maxRetries". (
https://spark.apache.org/docs/latest/configuration.html) Try setting it to
"32". This would help if the only conflict is among the driver UI ports
(like if you have > 16 drivers running on the same host).
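
For example, a rough sketch of setting it when building the context (32 is just
the value suggested above; it can equally go on spark-submit via --conf):

from pyspark import SparkConf, SparkContext

# Let the driver UI retry ports 4040..4072 instead of giving up at 4056.
conf = SparkConf().set("spark.port.maxRetries", "32")
sc = SparkContext(conf=conf)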

Susan

On Sun, May 6, 2018 at 12:32 AM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Use a scheduler that abstracts the network away with a CNI, for instance, or
> other mechanisms (Mesos, Kubernetes, YARN). The CNI will allow you to always
> bind on the same ports because each container will have its own IP. Some
> other solutions like Mesos and Marathon can work without CNI, with host IP
> binding, but will manage the ports for you, ensuring there isn't any
> conflict.
>
> On Sat, 5 May 2018 at 17:10, Dhaval Modi <dhavalmod...@gmail.com> wrote:
>
>> Hi All,
>>
>> Need advice on executing multiple streaming jobs.
>>
>> Problem: We have hundreds of streaming jobs. Every streaming job uses a new
>> port. Also, Spark automatically checks ports from 4040 to 4056, and after
>> that it fails. One workaround is to provide the port explicitly.
>>
>> Is there a way to tackle this situation? Or am I missing anything?
>>
>> Thanking you in advance.
>>
>> Regards,
>> Dhaval Modi
>> dhavalmod...@gmail.com
>>
>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: [Mesos] How to Disable Blacklisting on Mesos?

2018-04-09 Thread Susan X. Huynh
Hi Han,

You may be seeing the same issue I described here:
https://issues.apache.org/jira/browse/SPARK-22342?focusedCommentId=16411780=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16411780
Do you see "TASK_LOST" in your driver logs? I got past that issue by
updating my version of libmesos (see my second comment in the ticket).

There's also this PR that is in progress:
https://github.com/apache/spark/pull/20640

Susan

On Sun, Apr 8, 2018 at 4:06 PM, hantuzun <m...@hantuzun.com> wrote:

> Hi all,
>
> Spark currently has blacklisting enabled on Mesos, no matter what:
> [SPARK-19755][Mesos] Blacklist is always active for
> MesosCoarseGrainedSchedulerBackend
>
> Blacklisting also prevents new drivers from running on our nodes where
> previous drivers had failed tasks.
>
> We've tried restarting Spark dispatcher before sending new tasks. Even
> creating new machines (with the same hostname) does not help.
>
> Looking at  TaskSetBlacklist
> <https://github.com/apache/spark/blob/e18d6f5326e0d9ea03d31de5ce04cb
> 84d3b8ab37/core/src/main/scala/org/apache/spark/
> scheduler/TaskSetBlacklist.scala#L66>
> , I don't understand how a fresh Spark job submitted from a fresh Spark
> Dispatcher starts saying all the nodes are blacklisted right away. How does
> Spark know previous task failures?
>
> This issue severely interrupts us. How could we disable blacklisting on
> Spark 2.3.0? Creative ideas are welcome :)
>
> Best,
> Han
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: external shuffle service in mesos

2018-01-22 Thread Susan X. Huynh
Hi Igor,

You made a good point about the tradeoffs. I think the main thing you would
get with Marathon is the accounting for resources (the memory and cpus
specified in the config file). That allows Mesos to manage the resources
properly. I don't think the other tools mentioned would reserve resources
from Mesos.

If you want more information about production ops for Mesos, you might want
to ask in the Mesos mailing list. Or, you can check out the
https://dcos.io/community/ project.

Susan

On Sat, Jan 20, 2018 at 11:59 PM, igor.berman <igor.ber...@gmail.com> wrote:

> Hi Susan
>
> In general I can get what I need without Marathon, by configuring the
> external shuffle service with puppet/ansible/chef plus maybe some alerts for
> health checks.
>
> I mean, in companies that don't have strong DevOps teams and want to install
> services as simply as possible just by config, Marathon might be useful;
> however, if a company already has strong puppet/ansible/chef infra, adding
> and managing Marathon (an additional component) is less clearly worth it.
>
> WDYT?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: external shuffle service in mesos

2018-01-20 Thread Susan X. Huynh
Hi Igor,

The best way I know of is with Marathon.
* Placement constraint: you could combine constraints in Marathon. Like:
"constraints": [
["hostname", "UNIQUE"],
["hostname", "LIKE", "host1|host2|host3"]
]
https://groups.google.com/forum/#!topic/marathon-framework/hfLUw3TIw2I

* You would have to use a workaround to deal with a dynamically sized
cluster: set the number of instances to be greater than the expected
cluster size.
https://jira.mesosphere.com/browse/MARATHON-3791?focusedCommentId=79976=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-79976
As the commenter notes, it's not ideal, it's just a workaround.

Susan

On Sat, Jan 20, 2018 at 8:33 AM, igor.berman <igor.ber...@gmail.com> wrote:

> Hi,
> wanted to get some advice regarding managing external shuffle service in
> mesos environments
>
> In the Spark documentation Marathon is mentioned, however there is very
> limited documentation.
> I've tried to search for some documentation and it seems not too difficult
> to configure it under Marathon (e.g.
> https://github.com/NBCUAS/dcos-spark-shuffle-service/blob/master/marathon/mesos-shuffle-service.json),
> however I see a few problems:
>
> * There is no clear way to deploy an application in Mesos on every node,
> see https://jira.mesosphere.com/browse/MARATHON-3791
> * It's not possible to guarantee on which nodes the shuffle service
> will be placed (it is possible to guarantee, with the Mesos unique constraint,
> that only one shuffle service instance will be placed on a given node).
> * In a cluster that has dynamic nodes joining/leaving, the config of the
> shuffle service must be adjusted (specifically the number-of-instances config).
>
> So any production ops advice will be welcome.
> Igor
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: Anyone know where to find independent contractors in New York?

2017-12-21 Thread x x
Try https://www.fiverr.com/ 


> On Dec 21, 2017, at 12:31 PM, Stephen Boesch  wrote:
> 
> Hi Richard, this is not a jobs board: please only discuss spark application 
> development issues. 
> 
> 2017-12-21 8:34 GMT-08:00 Richard L. Burton III  >:
> I'm trying to locate four independent contractors who have experience with 
> Spark. I'm not sure where I can go to find experienced Spark consultants.
> 
> Please, no recruiters.
> -- 
> -Richard L. Burton III
> 
> 



Re: Spark job only starts tasks on a single node

2017-12-07 Thread Susan X. Huynh
Sounds strange. Maybe it has to do with the job itself? What kind of job is
it? Have you gotten it to run on more than one node before? What's in the
spark-submit command?

Susan

On Wed, Dec 6, 2017 at 11:21 AM, Ji Yan <ji...@drive.ai> wrote:

> I am sure that the other agents have plentiful enough resources, but I
> don't know why Spark only scheduled executors on one single node, up to
> that node's capacity (it is a different node every time I run, btw).
>
> I checked the DEBUG log from the Spark Driver and didn't see any mention of
> declines. But from the log, it looks like it has only accepted one offer
> from Mesos.
>
> Also, it looks like there is no special role required on the Spark side!
>
> On Wed, Dec 6, 2017 at 5:57 AM, Art Rand <art.r...@gmail.com> wrote:
>
>> Hello Ji,
>>
>> Spark will launch Executors round-robin on offers, so when the resources
>> on an agent get broken into multiple resource offers it's possible that
>> many Executors get placed on a single agent. However, from your
>> description, it's not clear why your other agents do not get Executors
>> scheduled on them. It's possible that the offers from your other agents are
>> insufficient in some way. The Mesos MASTER log should show offers being
>> declined by your Spark Driver, do you see that?  If you have DEBUG level
>> logging in your Spark driver you should also see offers being declined
>> <https://github.com/apache/spark/blob/193555f79cc73873613674a09a7c371688b6dbc7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L576>
>> there. Finally if your Spark framework isn't receiving any resource offers,
>> it could be because of the roles you have established on your agents or
>> quota set on other frameworks, have you set up any of that? Hope this helps!
>>
>> Art
>>
>> On Tue, Dec 5, 2017 at 10:45 PM, Ji Yan <ji...@drive.ai> wrote:
>>
>>> Hi all,
>>>
>>> I am running Spark 2.0 on Mesos 1.1. I was trying to split up my job
>>> onto several nodes. I try to set the number of executors by the formula
>>> (spark.cores.max / spark.executor.cores). The behavior I saw was that Spark
>>> will try to fill up on one mesos node as many executors as it can, then it
>>> stops going to other mesos nodes despite that it has not done scheduling
>>> all the executors I have asked it to yet! This is super weird!
>>>
>>> Did anyone notice this behavior before? Any help appreciated!
>>>
>>> Ji
>>>
>>> The information in this email is confidential and may be legally
>>> privileged. It is intended solely for the addressee. Access to this email
>>> by anyone else is unauthorized. If you are not the intended recipient, any
>>> disclosure, copying, distribution or any action taken or omitted to be
>>> taken in reliance on it, is prohibited and may be unlawful.
>>>
>>
>>
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>



-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Please remove me!

2017-11-07 Thread x x

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SparkSQL not able to read a empty table location

2017-05-21 Thread Bajpai, Amit X. -ND
Setting spark.sql.hive.verifyPartitionPath=true didn't help. Still getting the same
error.

I tried to copy a file with a _ prefix and I am no longer getting the error, and the
file is also ignored by SparkSQL. But when scheduling the job in prod, if during one
execution there is no data to be processed, the query will again fail. How do I deal
with this scenario?


From: Sea <261810...@qq.com>
Date: Sunday, May 21, 2017 at 8:04 AM
To: Steve Loughran <ste...@hortonworks.com>, "Bajpai, Amit X. -ND" 
<amit.x.bajpai@disney.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL not able to read a empty table location


please try spark.sql.hive.verifyPartitionPath true

-- Original --
From:  "Steve Loughran";<ste...@hortonworks.com>;
Date:  Sat, May 20, 2017 09:19 PM
To:  "Bajpai, Amit X. -ND"<amit.x.bajpai@disney.com>;
Cc:  "user@spark.apache.org"<user@spark.apache.org>;
Subject:  Re: SparkSQL not able to read a empty table location


On 20 May 2017, at 01:44, Bajpai, Amit X. -ND
<amit.x.bajpai@disney.com> wrote:

Hi,

I have a hive external table with the S3 location having no files (but the S3 
location directory does exists). When I am trying to use Spark SQL to count the 
number of records in the table it is throwing error saying “File s3n://data/xyz 
does not exist. null/0”.

select * from tablex limit 10

Can someone let me know how we can fix this issue.

Thanks


There isn't really a "directory" in S3, just a set of objects whose paths begin 
with a string. Try creating an empty file with an _ prefix in the directory; it 
should be ignored by Spark SQL but will cause the "directory" to come into being
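
For example, a rough sketch of creating such a marker object with boto3 (the bucket
and key below are placeholders, and boto3 is just one way to do it; a zero-byte
object whose name starts with "_" is ignored by Spark SQL but makes the prefix exist):

import boto3

# Zero-byte object under the table location; the "_" prefix keeps Spark SQL from reading it.
s3 = boto3.client("s3")
s3.put_object(Bucket="my-bucket", Key="data/xyz/_placeholder", Body=b"")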


SparkSQL not able to read a empty table location

2017-05-19 Thread Bajpai, Amit X. -ND
Hi,

I have a hive external table with the S3 location having no files (but the S3 
location directory does exists). When I am trying to use Spark SQL to count the 
number of records in the table it is throwing error saying “File s3n://data/xyz 
does not exist. null/0”.

select * from tablex limit 10

Can someone let me know how we can fix this issue.

Thanks


Re: Is Spark 2.0 master node compatible with Spark 1.5 work node?

2016-09-26 Thread Rex X
Yes, I have a Cloudera cluster with YARN. Any more details on how to make this
work with an uber jar?

Thank you.


On Sun, Sep 18, 2016 at 2:13 PM, Felix Cheung <felixcheun...@hotmail.com>
wrote:

> Well, uber jar works in YARN, but not with standalone ;)
>
>
>
>
>
> On Sun, Sep 18, 2016 at 12:44 PM -0700, "Chris Fregly" <ch...@fregly.com>
> wrote:
>
> you'll see errors like this...
>
> "java.lang.RuntimeException: java.io.InvalidClassException:
> org.apache.spark.rpc.netty.RequestMessage; local class incompatible:
> stream classdesc serialVersionUID = -2221986757032131007, local class
> serialVersionUID = -5447855329526097695"
>
> ...when mixing versions of spark.
>
> i'm actually seeing this right now while testing across Spark 1.6.1 and
> Spark 2.0.1 for my all-in-one, hybrid cloud/on-premise Spark + Zeppelin +
> Kafka + Kubernetes + Docker + One-Click Spark ML Model Production
> Deployments initiative documented here:
>
> https://github.com/fluxcapacitor/pipeline/wiki/Kubernetes-Docker-Spark-ML
>
> and check out my upcoming meetup on this effort either in-person or
> online:
>
> http://www.meetup.com/Advanced-Spark-and-TensorFlow-Meetup/events/233978839/
>
> we're throwing in some GPU/CUDA just to sweeten the offering!  :)
>
> On Sat, Sep 10, 2016 at 2:57 PM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> I don't think a 2.0 uber jar will play nicely on a 1.5 standalone
>> cluster.
>>
>>
>> On Saturday, September 10, 2016, Felix Cheung <felixcheun...@hotmail.com>
>> wrote:
>>
>>> You should be able to get it to work with 2.0 as uber jar.
>>>
>>> What type cluster you are running on? YARN? And what distribution?
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Sep 4, 2016 at 8:48 PM -0700, "Holden Karau" <
>>> hol...@pigscanfly.ca> wrote:
>>>
>>> You really shouldn't mix different versions of Spark between the master
>>> and worker nodes; if you're going to upgrade, upgrade all of them. Otherwise
>>> you may get very confusing failures.
>>>
>>> On Monday, September 5, 2016, Rex X <dnsr...@gmail.com> wrote:
>>>
>>>> Wish to use the Pivot Table feature of data frame which is available
>>>> since Spark 1.6. But the spark of current cluster is version 1.5. Can we
>>>> install Spark 2.0 on the master node to work around this?
>>>>
>>>> Thanks!
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>
>
>
> --
> *Chris Fregly*
> Research Scientist @ *PipelineIO* <http://pipeline.io>
> *Advanced Spark and TensorFlow Meetup*
> <http://www.meetup.com/Advanced-Spark-and-TensorFlow-Meetup/>
> *San Francisco* | *Chicago* | *Washington DC*
>
>
>
>
>


Is Spark 2.0 master node compatible with Spark 1.5 work node?

2016-09-04 Thread Rex X
We wish to use the Pivot Table feature of the data frame API, which has been
available since Spark 1.6. But the current cluster runs Spark 1.5. Can we install
Spark 2.0 on the master node to work around this?

Thanks!


Re: How to make new composite columns by combining rows in the same group?

2016-08-26 Thread Rex X
The data.csv needs to be corrected:


1. Given following CSV file
$cat data.csv

ID,City,Zip,Price,Rating
1,A,95123,100,1
1,B,95124,102,2
1,A,95126,100,2
2,B,95123,200,1
2,B,95124,201,2
2,C,95124,203,1
3,A,95126,300,2
3,C,95124,280,1
4,C,95124,400,2


On Fri, Aug 26, 2016 at 4:54 AM, Rex X <dnsr...@gmail.com> wrote:

> 1. Given following CSV file
>
> $cat data.csv
>
> ID,City,Zip,Price,Rating
> 1,A,95123,100,0
> 1,B,95124,102,1
> 1,A,95126,100,1
> 2,B,95123,200,0
> 2,B,95124,201,1
> 2,C,95124,203,0
> 3,A,95126,300,1
> 3,C,95124,280,0
> 4,C,95124,400,1
>
>
> We want to group by ID, and make new composite columns of Price and Rating
> based on the value of $City-$Zip.
>
>
> 2. The Expected Result:
>
> ID | A_95123_Price | A_95123_Rating | A_95126_Price | A_95126_Rating | B_95123_Price | B_95123_Rating | B_95124_Price | B_95124_Rating | C_95124_Price | C_95124_Rating
> 1  |           100 |              1 |           100 |              2 |             0 |              0 |           102 |              2 |             0 |              0
> 2  |             0 |              0 |             0 |              0 |           200 |              1 |           201 |              2 |           203 |              1
> 3  |             0 |              0 |           300 |              2 |             0 |              0 |             0 |              0 |           280 |              1
> 4  |             0 |              0 |             0 |              0 |             0 |              0 |             0 |              0 |           400 |              2
>
> Any tips would be greatly appreciated!
>
> Thank you.
>
> Regards,
> Rex
>
>
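
A minimal PySpark sketch of this kind of pivot, for reference (assuming Spark 2.x
with the built-in csv reader; pivot itself exists since 1.6, and with two aggregates
the output columns come out named like A_95123_Price / A_95123_Rating, as in the
expected result above):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)

result = (df
          .withColumn("key", F.concat_ws("_", "City", "Zip"))   # e.g. A_95123
          .groupBy("ID")
          .pivot("key")
          .agg(F.first("Price").alias("Price"),                 # -> A_95123_Price
               F.first("Rating").alias("Rating"))               # -> A_95123_Rating
          .na.fill(0)                                           # missing combinations become 0
          .orderBy("ID"))
result.show()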


How to make new composite columns by combining rows in the same group?

2016-08-26 Thread Rex X
1. Given following CSV file

$cat data.csv

ID,City,Zip,Price,Rating
1,A,95123,100,0
1,B,95124,102,1
1,A,95126,100,1
2,B,95123,200,0
2,B,95124,201,1
2,C,95124,203,0
3,A,95126,300,1
3,C,95124,280,0
4,C,95124,400,1


We want to group by ID, and make new composite columns of Price and Rating
based on the value of $City-$Zip.


2. The Expected Result:

ID | A_95123_Price | A_95123_Rating | A_95126_Price | A_95126_Rating | B_95123_Price | B_95123_Rating | B_95124_Price | B_95124_Rating | C_95124_Price | C_95124_Rating
1  |           100 |              1 |           100 |              2 |             0 |              0 |           102 |              2 |             0 |              0
2  |             0 |              0 |             0 |              0 |           200 |              1 |           201 |              2 |           203 |              1
3  |             0 |              0 |           300 |              2 |             0 |              0 |             0 |              0 |           280 |              1
4  |             0 |              0 |             0 |              0 |             0 |              0 |             0 |              0 |           400 |              2

Any tips would be greatly appreciated!

Thank you.

Regards,
Rex


Re: How to do this pairing in Spark?

2016-08-26 Thread Rex X
Hi Ayan,

Yes, ID=3 can be paired with ID=1, and the same for ID=9 with ID=8. BUT we
want to keep only ONE pair for the ID with Flag=0.

Since ID=1 with Flag=0 already paired with ID=2, and ID=8 paired with ID=7,
we simply delete ID=3 and ID=9.

Thanks!

Regards,
Rex


On Fri, Aug 26, 2016 at 12:46 AM, ayan guha <guha.a...@gmail.com> wrote:

> Why 3 and 9 should be deleted? 3 can be paired with 1and 9 can be paired
> with 8.
> On 26 Aug 2016 11:00, "Rex X" <dnsr...@gmail.com> wrote:
>
>> 1. Given following CSV file
>>
>> > $cat data.csv
>> >
>> > ID,City,Zip,Flag
>> > 1,A,95126,0
>> > 2,A,95126,1
>> > 3,A,95126,1
>> > 4,B,95124,0
>> > 5,B,95124,1
>> > 6,C,95124,0
>> > 7,C,95127,1
>> > 8,C,95127,0
>> > 9,C,95127,1
>>
>>
>> (a) where "ID" above is a primary key (unique),
>>
>> (b) for each "City" and "Zip" combination, there is one ID in max with
>> Flag=0; while it can contain multiple IDs with Flag=1 for each "City" and
>> "Zip" combination.
>>
>> (c) Flag can be 0 or 1
>>
>>
>> 2. For each ID with Flag=0, we want to pair it with another ID with
>> Flag=1 but with the same City - Zip. If one cannot find another paired ID
>> with Flag=1 and matched City - Zip, we just delete that record.
>>
>> Here is the expected result:
>>
>> > ID,City,Zip,Flag
>> > 1,A,95126,0
>> > 2,A,95126,1
>> > 4,B,95124,0
>> > 5,B,95124,1
>> > 7,C,95127,1
>> > 8,C,95127,0
>>
>>
>> Any valuable tips how to do this pairing in Python or Scala?
>>
>> Great thanks!
>>
>> Rex
>>
>


How to do this pairing in Spark?

2016-08-25 Thread Rex X
1. Given following CSV file

> $cat data.csv
>
> ID,City,Zip,Flag
> 1,A,95126,0
> 2,A,95126,1
> 3,A,95126,1
> 4,B,95124,0
> 5,B,95124,1
> 6,C,95124,0
> 7,C,95127,1
> 8,C,95127,0
> 9,C,95127,1


(a) where "ID" above is a primary key (unique),

(b) for each "City" and "Zip" combination, there is one ID in max with
Flag=0; while it can contain multiple IDs with Flag=1 for each "City" and
"Zip" combination.

(c) Flag can be 0 or 1


2. For each ID with Flag=0, we want to pair it with another ID with Flag=1
but with the same City - Zip. If one cannot find another paired ID with
Flag=1 and matched City - Zip, we just delete that record.

Here is the expected result:

> ID,City,Zip,Flag
> 1,A,95126,0
> 2,A,95126,1
> 4,B,95124,0
> 5,B,95124,1
> 7,C,95127,1
> 8,C,95127,0


Any valuable tips how to do this pairing in Python or Scala?

Great thanks!

Rex
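
A minimal PySpark sketch of one way to implement this rule (assuming Spark 2.x
DataFrames; keeping the smallest Flag=1 ID is just one way to pick a single pair):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)

zeros = df.filter("Flag = 0").select(F.col("ID").alias("zero_ID"), "City", "Zip")
ones  = df.filter("Flag = 1").select(F.col("ID").alias("one_ID"), "City", "Zip")

# The inner join drops Flag=0 rows with no Flag=1 partner in the same City/Zip;
# min() keeps exactly one partner per Flag=0 row.
pairs = (zeros.join(ones, ["City", "Zip"])
              .groupBy("zero_ID")
              .agg(F.min("one_ID").alias("one_ID")))

keep_ids = (pairs.select(F.col("zero_ID").alias("ID"))
                 .union(pairs.select(F.col("one_ID").alias("ID"))))   # unionAll on 1.6

result = df.join(keep_ids, "ID").orderBy("ID")
result.show()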


What's the best way to find the Nearest Neighbor row of a matrix with 10billion rows x 300 columns?

2016-05-17 Thread Rex X
Each row of the given matrix is a Vector[Double]. We want to find the
nearest neighbor row for each row using cosine similarity.

The problem here is the complexity: O( 10^20 )

We need to do *blocking*, and do the row-wise comparison within each block.
Any tips for best practice?

In Spark, we have RowMatrix.columnSimilarities(), but I didn't find a
rowSimilarities method.


Thank you.


Regards
Rex
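
For reference, a rough sketch of the blocking idea, using random-hyperplane buckets
as blocks and plain NumPy dot products inside each block. Everything here (the
dimensions, the number of planes, the toy data) is illustrative only; at 10 billion
rows the bucketing and block sizes need real tuning:

import numpy as np
from pyspark import SparkContext

sc = SparkContext(appName="nn-blocking-sketch")

dim, n_planes = 300, 16
planes = np.random.RandomState(42).randn(n_planes, dim)

# Toy data: unit-norm rows, so a dot product equals cosine similarity.
rng = np.random.RandomState(0)
vecs = rng.randn(1000, dim)
vecs /= np.linalg.norm(vecs, axis=1, keepdims=True)
rows = sc.parallelize(list(enumerate(vecs)))

def block_key(vec):
    # Sign pattern of random projections; similar rows tend to share a key.
    return tuple(int(x) for x in (planes.dot(vec) > 0))

def nearest_within_block(block_rows):
    block_rows = list(block_rows)
    for i, (id_i, v_i) in enumerate(block_rows):
        best_id, best_sim = None, -2.0
        for j, (id_j, v_j) in enumerate(block_rows):
            if i != j and float(v_i.dot(v_j)) > best_sim:
                best_id, best_sim = id_j, float(v_i.dot(v_j))
        if best_id is not None:
            yield (id_i, (best_id, best_sim))

neighbors = (rows.map(lambda kv: (block_key(kv[1]), kv))
                 .groupByKey()
                 .flatMap(lambda kv: nearest_within_block(kv[1])))
print(neighbors.take(5))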


How to select from table name using IF(condition, tableA, tableB)?

2016-03-15 Thread Rex X
I want to choose between two tables in a query, based on a logical condition.

select *
from if(A>B, tableA, tableB)


But "if" function in Hive cannot work within FROM above. Any idea how?


What is the best way to JOIN two 10TB csv files and three 100kb files on Spark?

2016-02-05 Thread Rex X
Dear all,

The new DataFrame API of Spark is extremely fast. But our cluster has limited
RAM (~500GB).

What is the best way to do such a big table Join?

Any sample code is greatly welcome!


Best,
Rex
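
A minimal PySpark sketch of the usual approach: let the two big tables go through a
regular shuffle join, and broadcast the three tiny lookup tables so they never
shuffle. Paths, keys and formats below are placeholders; with Spark 1.x the csv
reader comes from the spark-csv package:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

big_a  = spark.read.csv("hdfs:///data/big_a/*.csv", header=True)
big_b  = spark.read.csv("hdfs:///data/big_b/*.csv", header=True)
small1 = spark.read.csv("hdfs:///data/lookup1.csv", header=True)

joined = (big_a.join(big_b, "id")                    # big-to-big: sort-merge join with a shuffle
                .join(F.broadcast(small1), "code"))  # ~100 KB table: broadcast, no shuffle
joined.write.mode("overwrite").parquet("hdfs:///data/joined")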


issue of tableau connect to spark sql 1.5

2015-10-16 Thread Wangfei (X)

  Hi all!

  I tested Tableau (9.1.0, 32-bit) reading tables from Spark SQL (built from
branch-1.5) using ODBC, and found the following issue:

#

# "[Simba][SQLEngine] (31740) Table or view not found: SPARK.default.src
# table "[default].[src]"  not exist"


And I found a very strange issue: if I run the SQL "select count(1) from src"
from Tableau, the Spark UI shows that it actually ran the SQL "SELECT 1".



Has anyone tested Tableau with Spark SQL 1.5 successfully?

Thanks Fei.









Can we do dataframe.query like Pandas dataframe in spark?

2015-09-17 Thread Rex X
With a Pandas dataframe
<http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.query.html>,
we can do a query:

>>> from numpy.random import randn
>>> from pandas import DataFrame
>>> df = DataFrame(randn(10, 2), columns=list('ab'))
>>> df.query('a > b')


This SQL-select-like query is very convenient. Can we do a similar thing with
the new dataframe of Spark?


Best,
Rex


Re: Can we do dataframe.query like Pandas dataframe in spark?

2015-09-17 Thread Rex X
very cool! Thank you, Michael.


On Thu, Sep 17, 2015 at 11:00 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> from pyspark.sql.functions import *
>
> ​
>
> df = sqlContext.range(10).select(rand().alias("a"), rand().alias("b"))
>
> df.where("a > b").show()
>
> +------------------+-------------------+
> |                 a|                  b|
> +------------------+-------------------+
> |0.6697439215581628|0.23420961030968923|
> |0.9248996796756386| 0.4146647917936366|
> +------------------+-------------------+
>
> On Thu, Sep 17, 2015 at 9:32 AM, Rex X <dnsr...@gmail.com> wrote:
>
>> With Pandas dataframe
>> <http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.query.html>,
>> we can do query:
>>
>> >>> from numpy.random import randn
>> >>> from pandas import DataFrame
>> >>> df = DataFrame(randn(10, 2), columns=list('ab'))
>> >>> df.query('a > b')
>>
>>
>> This SQL-select-like query is very convenient. Can we do similar thing
>> with the new dataframe of spark?
>>
>>
>> Best,
>> Rex
>>
>
>


Re: What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-12 Thread Rex X
Jorn and Nick,

Thanks for answering.

Nick, the sparkit-learn project looks interesting. Thanks for mentioning it.


Rex


On Sat, Sep 12, 2015 at 12:05 PM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> You might want to check out https://github.com/lensacom/sparkit-learn
> <https://github.com/lensacom/sparkit-learn/blob/master/README.rst>
>
> Though it's true for random
> Forests / trees you will need to use MLlib
>
> —
> Sent from Mailbox <https://www.dropbox.com/mailbox>
>
>
> On Sat, Sep 12, 2015 at 9:00 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> I fear you have to do the plumbing all yourself. This is the same for all
>> commercial and non-commercial libraries/analytics packages. It often also
>> depends on the functional requirements on how you distribute.
>>
>> On Sat, 12 Sept 2015 at 20:18, Rex X <dnsr...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> What is the best way to migrate existing scikit-learn code to PySpark
>>> cluster? Then we can bring together the full power of both scikit-learn and
>>> spark, to do scalable machine learning. (I know we have MLlib. But the
>>> existing code base is big, and some functions are not fully supported yet.)
>>>
>>> Currently I use multiprocessing module of Python to boost the speed. But
>>> this only works for one node, while the data set is small.
>>>
>>> For many real cases, we may need to deal with gigabytes or even
>>> terabytes of data, with thousands of raw categorical attributes, which can
>>> lead to millions of discrete features, using 1-of-k representation.
>>>
>>> For these cases, one solution is to use distributed memory. That's why I
>>> am considering spark. And spark support Python!
>>> With Pyspark, we can import scikit-learn.
>>>
>>> But the question is how to make the scikit-learn code, decisionTree
>>> classifier for example, running in distributed computing mode, to benefit
>>> the power of Spark?
>>>
>>>
>>> Best,
>>> Rex
>>>
>>
>


What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-12 Thread Rex X
Hi everyone,

What is the best way to migrate existing scikit-learn code to PySpark
cluster? Then we can bring together the full power of both scikit-learn and
spark, to do scalable machine learning. (I know we have MLlib. But the
existing code base is big, and some functions are not fully supported yet.)

Currently I use multiprocessing module of Python to boost the speed. But
this only works for one node, while the data set is small.

For many real cases, we may need to deal with gigabytes or even terabytes
of data, with thousands of raw categorical attributes, which can lead to
millions of discrete features, using 1-of-k representation.

For these cases, one solution is to use distributed memory. That's why I am
considering spark. And spark support Python!
With Pyspark, we can import scikit-learn.

But the question is how to make the scikit-learn code, decisionTree
classifier for example, running in distributed computing mode, to benefit
the power of Spark?


Best,
Rex
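
One common pattern, as a rough sketch: keep scikit-learn for the per-model work and
use Spark only to fan out independent pieces, for example a hyper-parameter grid,
with the (small enough) training data broadcast to the executors. The data and grid
below are placeholders:

import numpy as np
from sklearn.tree import DecisionTreeClassifier
from pyspark import SparkContext

sc = SparkContext(appName="sklearn-grid")

# Placeholder data; in practice it must fit in executor memory to be broadcast.
X = np.random.rand(5000, 20)
y = (X[:, 0] + X[:, 1] > 1.0).astype(int)
X_tr, y_tr, X_te, y_te = X[:4000], y[:4000], X[4000:], y[4000:]
b = sc.broadcast((X_tr, y_tr, X_te, y_te))

def fit_and_score(max_depth):
    X_tr, y_tr, X_te, y_te = b.value
    clf = DecisionTreeClassifier(max_depth=max_depth).fit(X_tr, y_tr)
    return max_depth, clf.score(X_te, y_te)

scores = sc.parallelize([2, 4, 6, 8, 10, None]).map(fit_and_score).collect()
print(max(scores, key=lambda t: t[1]))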


How to concatenate two csv files into one RDD?

2015-06-26 Thread Rex X
With Python Pandas, it is easy to do concatenation of dataframes
by combining  pandas.concat
http://pandas.pydata.org/pandas-docs/stable/generated/pandas.concat.html
and pandas.read_csv

pd.concat([pd.read_csv(os.path.join(Path_to_csv_files, f)) for f in
csvfiles])

where csvfiles is the list of csv files


How can we do this in Spark?
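
For what it's worth, Spark's readers already accept multiple paths, so the
concatenation can happen at read time. A small sketch, assuming the sc / spark
handles from a PySpark shell and placeholder paths (the DataFrame line needs
Spark 2.x, or the spark-csv package on 1.x):

# RDD API: a comma-separated list of paths, or a glob over the folder
rdd = sc.textFile("/data/csv/part1.csv,/data/csv/part2.csv")
rdd_all = sc.textFile("/data/csv/*.csv")

# DataFrame API: a list of paths works too
df = spark.read.csv(["/data/csv/part1.csv", "/data/csv/part2.csv"], header=True)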


Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?

2015-06-16 Thread Rex X
Hi Sujit,

That's a good point. But 1-hot encoding will make our data grow from
Terabytes to Petabytes, because we have tens of categorical attributes, and
some of them contain thousands of categorical values.

Is there any way to make a good balance of data size and right
representation of categories?


-Rex
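
One way to keep the size manageable is to keep the encodings sparse rather than
dense. A minimal sketch with the Spark ML pipeline API: StringIndexer plus
OneHotEncoder emit SparseVectors, so a column with thousands of levels still stores
only one non-zero per row. The tiny DataFrame is only for illustration; spark here
is a SparkSession (sqlContext on Spark 1.x), and on Spark 3.x OneHotEncoder takes
inputCols/outputCols lists instead:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

df = spark.createDataFrame(
    [("LA", "95123", 3, 100.0), ("SF", "95126", 4, 300.0)],
    ["city", "zip", "satisfaction_level", "price"])

stages = []
for c in ["city", "zip"]:
    stages.append(StringIndexer(inputCol=c, outputCol=c + "_idx"))
    stages.append(OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_vec"))
stages.append(VectorAssembler(
    inputCols=["city_vec", "zip_vec", "satisfaction_level", "price"],
    outputCol="features"))

features = Pipeline(stages=stages).fit(df).transform(df)
features.select("features").show(truncate=False)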


On Tue, Jun 16, 2015 at 1:27 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hi Rexx,

 In general (ie not Spark specific), its best to convert categorical data
 to 1-hot encoding rather than integers - that way the algorithm doesn't use
 the ordering implicit in the integer representation.

 -sujit


 On Tue, Jun 16, 2015 at 1:17 PM, Rex X dnsr...@gmail.com wrote:

 Is it necessary to convert categorical data into integers?

 Any tips would be greatly appreciated!

 -Rex

 On Sun, Jun 14, 2015 at 10:05 AM, Rex X dnsr...@gmail.com wrote:

 For clustering analysis, we need a way to measure distances.

 When the data contains different levels of measurement -
 *binary / categorical (nominal), counts (ordinal), and ratio (scale)*

 To be concrete, for example, working with attributes of
 *city, zip, satisfaction_level, price*

 In the meanwhile, the real data usually also contains string attributes,
 for example, book titles. The distance between two strings can be measured
 by minimum-edit-distance.


 In SPSS, it provides Two-Step Cluster, which can handle both ratio scale
 and ordinal numbers.


 What is right algorithm to do hierarchical clustering analysis with all
 these four-kind attributes above with *MLlib*?


 If we cannot find a right metric to measure the distance, an alternative
 solution is to do a topological data analysis (e.g. linkage, and etc).
 Can we do such kind of analysis with *GraphX*?


 -Rex






Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?

2015-06-16 Thread Rex X
 Is it necessary to convert categorical data into integers?

Any tips would be greatly appreciated!

-Rex

On Sun, Jun 14, 2015 at 10:05 AM, Rex X dnsr...@gmail.com wrote:

 For clustering analysis, we need a way to measure distances.

 When the data contains different levels of measurement -
 *binary / categorical (nominal), counts (ordinal), and ratio (scale)*

 To be concrete, for example, working with attributes of
 *city, zip, satisfaction_level, price*

 In the meanwhile, the real data usually also contains string attributes,
 for example, book titles. The distance between two strings can be measured
 by minimum-edit-distance.


 In SPSS, it provides Two-Step Cluster, which can handle both ratio scale
 and ordinal numbers.


 What is right algorithm to do hierarchical clustering analysis with all
 these four-kind attributes above with *MLlib*?


 If we cannot find a right metric to measure the distance, an alternative
 solution is to do a topological data analysis (e.g. linkage, and etc).
 Can we do such kind of analysis with *GraphX*?


 -Rex




What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?

2015-06-14 Thread Rex X
For clustering analysis, we need a way to measure distances.

When the data contains different levels of measurement -
*binary / categorical (nominal), counts (ordinal), and ratio (scale)*

To be concrete, for example, working with attributes of
*city, zip, satisfaction_level, price*

In the meanwhile, the real data usually also contains string attributes,
for example, book titles. The distance between two strings can be measured
by minimum-edit-distance.


In SPSS, it provides Two-Step Cluster, which can handle both ratio scale
and ordinal numbers.


What is the right algorithm to do hierarchical clustering analysis with all
these four kinds of attributes above with *MLlib*?


If we cannot find a right metric to measure the distance, an alternative
solution is to do a topological data analysis (e.g. linkage, etc.). Can
we do such kind of analysis with *GraphX*?


-Rex


Re: [Spark] What is the most efficient way to do such a join and column manipulation?

2015-06-13 Thread Rex X
Thanks, Don! Does the SQL implementation of Spark do parallel processing on
records by default?

-Rex


On Sat, Jun 13, 2015 at 10:13 AM, Don Drake dondr...@gmail.com wrote:

 Take a look at https://github.com/databricks/spark-csv to read in the
 tab-delimited file (change the default delimiter)

 and once you have that as a DataFrame, SQL can do the rest.

 https://spark.apache.org/docs/latest/sql-programming-guide.html

 -Don


 On Fri, Jun 12, 2015 at 8:46 PM, Rex X dnsr...@gmail.com wrote:

 Hi,

 I want to use spark to select N columns, top M rows of all csv files
 under a folder.

 To be concrete, say we have a folder with thousands of tab-delimited csv
 files with following attributes format (each csv file is about 10GB):

 id    name    address    city    ...
 1     Matt    add1       LA      ...
 2     Will    add2       LA      ...
 3     Lucy    add3       SF      ...
 ...

 And we have a lookup table based on name above

 name    gender
 Matt    M
 Lucy    F
 ...

 Now we are interested to output from top 100K rows of each csv file into
 following format:

 id    name    gender
 1     Matt    M
 ...

 Can we use pyspark to efficiently handle this?





 --
 Donald Drake
 Drake Consulting
 http://www.drakeconsulting.com/
 http://www.MailLaunder.com/
 800-733-2143



How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?

2015-06-12 Thread Rex X
To be concrete, say we have a folder with thousands of tab-delimited csv
files with following attributes format (each csv file is about 10GB):

id    name    address    city    ...
1     Matt    add1       LA      ...
2     Will    add2       LA      ...
3     Lucy    add3       SF      ...
...

And we have a lookup table based on name above

name    gender
Matt    M
Lucy    F
...

Now we are interested to output from top 1000 rows of each csv file into
following format:

id    name    gender
1     Matt    M
...

Can we use pyspark to efficiently handle this?


[Spark] What is the most efficient way to do such a join and column manipulation?

2015-06-12 Thread Rex X
Hi,

I want to use spark to select N columns, top M rows of all csv files under
a folder.

To be concrete, say we have a folder with thousands of tab-delimited csv
files with following attributes format (each csv file is about 10GB):

id    name    address    city    ...
1     Matt    add1       LA      ...
2     Will    add2       LA      ...
3     Lucy    add3       SF      ...
...

And we have a lookup table based on name above

name    gender
Matt    M
Lucy    F
...

Now we are interested to output from top 100K rows of each csv file into
following format:

id    name    gender
1     Matt    M
...

Can we use pyspark to efficiently handle this?
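
Following Don's pointer above, a rough sketch with the spark-csv package
(era-appropriate for Spark 1.x; paths and the LIMIT are placeholders, and taking the
top M rows per individual file would additionally need the source file name, e.g.
via input_file_name in newer versions):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="join-folder")
sqlContext = SQLContext(sc)

people = (sqlContext.read.format("com.databricks.spark.csv")
          .options(header="true", delimiter="\t")
          .load("/data/folder/*.csv"))
genders = (sqlContext.read.format("com.databricks.spark.csv")
           .options(header="true", delimiter="\t")
           .load("/data/lookup.csv"))

people.registerTempTable("people")
genders.registerTempTable("genders")

top = sqlContext.sql("""
    SELECT p.id, p.name, g.gender
    FROM people p
    JOIN genders g ON p.name = g.name
    LIMIT 100000
""")
top.show()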


[no subject]

2015-06-11 Thread Wangfei (X)
Hi, all

   We use Spark SQL to insert data from a text table into a partitioned table
and found that if we give more cores to the executors, the insert performance
gets worse.



executors num    total-executor-cores    average time for insert task

3                3                       1.7 min

3                6                       3 min

3                9                       4.8 min





Does anyone have an idea?



following is the detail info of out test.





cluster:
4 nodes, each node 300+ g memory and 24 cores.
hadoop: 1 namenode + 3 datanodes.
spark: standalone mode, 1 master + 3 workers.



spark version:

apache master branch,  the current commit is

commit a777eb04bf981312b640326607158f78dd4163cd

Author: Patrick Wendell <patr...@databricks.com>
Date:   Wed Jun 10 21:13:47 2015 -0700

[HOTFIX] Adding more contributor name bindings


sql:

1 create a external table like

CREATE EXTERNAL TABLE tableA (
   f1 string,
   f2 string,
   f3 bigint,
   f4 smallint,
   f5 smallint,
   f6 string,
   f7 smallint,
   f8 string,
   f9 string,
   f10 smallint,
   f11 string,
   f12 bigint,
   f13 bigint,
   f14 bigint,
   f15 bigint,
   f16 string,
   f17 string,
   f18 smallint,
   f19 string,
   f20 string,
   f21 string,
   f22 string,
   f23 string,
   f24 smallint,
   f25 smallint,
   f26 bigint,
   f27 smallint,
   f28 bigint,
   f29 string,
   f30 bigint,
   f31 bigint,
   f32 bigint,
   f33 smallint,
   f34 smallint,
   f35 smallint,
   f36 smallint,
   f37 smallint,
   f38 smallint,
   f39 string,
   f40 smallint,
   f41 string,
   f42 smallint,
   f43 string,
   f44 smallint,
   f45 smallint,
   f46 smallint,
   f47 string,
   f48 smallint,
   f49 smallint,
   f50 string,
   f51 string,
   f52 smallint,
   f53 int,
   f54 bigint,
   f55 bigint)
row format delimited fields terminated by '|'
STORED AS textfile
LOCATION '/data';



2 create a partition table



CREATE EXTERNAL TABLE tableB (
   f1 string,
   f2 string,
   f3 bigint,
   f4 smallint,
   f5 smallint,
   f6 string,
   f7 smallint,
   f8 string,
   f9 string,
   f10 smallint,
   f11 string,
   f12 bigint,
   f13 bigint,
   f14 bigint,
   f15 bigint,
   f16 string,
   f17 string,
   f18 smallint,
   f19 string,
   f20 string,
   f21 string,
   f22 string,
   f23 string,
   f24 smallint,
   f25 smallint,
   f26 bigint,
   f27 smallint,
   f28 bigint,
   f29 string,
   f30 bigint,
   f31 bigint,
   f32 bigint,
   f33 smallint,
   f34 smallint,
   f35 smallint,
   f36 smallint,
   f37 smallint,
   f38 smallint,
   f39 string,
   f40 smallint,
   f41 string,
   f42 smallint,
   f43 string,
   f44 smallint,
   f45 smallint,
   f46 smallint,
   f47 string,
   f48 smallint,
   f49 smallint,
   f50 string,
   f51 string,
   f52 smallint,
   f53 int,
   f54 bigint,
   f55 bigint) partitioned by (hour int, id string);



3 do insert



insert into table tableB partition (hour,last_msisdn) select *,
hour(from_unixtime(f3,'yyyy-MM-dd HH:mm:ss')),substr(f9,-1) from tableA;




spark sql insert into table performance issue

2015-06-11 Thread Wangfei (X)



From: Wangfei (X)
Sent: 11 June 2015 17:33
To: user@spark.apache.org
Subject:


Hi, all

   We use Spark SQL to insert data from a text table into a partitioned table
and found that if we give more cores to the executors, the insert performance
gets worse.



executors num    total-executor-cores    average time for insert task

3                3                       1.7 min

3                6                       3 min

3                9                       4.8 min





Does anyone have an idea?



following is the detail info of out test.





cluster:
4 nodes, each node 300+ g memory and 24 cores.
hadoop: 1 namenode + 3 datanodes.
spark: standalone mode, 1 master + 3 workers.



spark version:

apache master branch,  the current commit is

commit a777eb04bf981312b640326607158f78dd4163cd

Author: Patrick Wendell <patr...@databricks.com>
Date:   Wed Jun 10 21:13:47 2015 -0700

[HOTFIX] Adding more contributor name bindings


sql:

1 create a external table like

CREATE EXTERNAL TABLE tableA (
   f1 string,
   f2 string,
   f3 bigint,
   f4 smallint,
   f5 smallint,
   f6 string,
   f7 smallint,
   f8 string,
   f9 string,
   f10 smallint,
   f11 string,
   f12 bigint,
   f13 bigint,
   f14 bigint,
   f15 bigint,
   f16 string,
   f17 string,
   f18 smallint,
   f19 string,
   f20 string,
   f21 string,
   f22 string,
   f23 string,
   f24 smallint,
   f25 smallint,
   f26 bigint,
   f27 smallint,
   f28 bigint,
   f29 string,
   f30 bigint,
   f31 bigint,
   f32 bigint,
   f33 smallint,
   f34 smallint,
   f35 smallint,
   f36 smallint,
   f37 smallint,
   f38 smallint,
   f39 string,
   f40 smallint,
   f41 string,
   f42 smallint,
   f43 string,
   f44 smallint,
   f45 smallint,
   f46 smallint,
   f47 string,
   f48 smallint,
   f49 smallint,
   f50 string,
   f51 string,
   f52 smallint,
   f53 int,
   f54 bigint,
   f55 bigint)
row format delimited fields terminated by '|'
STORED AS textfile
LOCATION '/data';



2 create a partition table



CREATE EXTERNAL TABLE tableB (
   f1 string,
   f2 string,
   f3 bigint,
   f4 smallint,
   f5 smallint,
   f6 string,
   f7 smallint,
   f8 string,
   f9 string,
   f10 smallint,
   f11 string,
   f12 bigint,
   f13 bigint,
   f14 bigint,
   f15 bigint,
   f16 string,
   f17 string,
   f18 smallint,
   f19 string,
   f20 string,
   f21 string,
   f22 string,
   f23 string,
   f24 smallint,
   f25 smallint,
   f26 bigint,
   f27 smallint,
   f28 bigint,
   f29 string,
   f30 bigint,
   f31 bigint,
   f32 bigint,
   f33 smallint,
   f34 smallint,
   f35 smallint,
   f36 smallint,
   f37 smallint,
   f38 smallint,
   f39 string,
   f40 smallint,
   f41 string,
   f42 smallint,
   f43 string,
   f44 smallint,
   f45 smallint,
   f46 smallint,
   f47 string,
   f48 smallint,
   f49 smallint,
   f50 string,
   f51 string,
   f52 smallint,
   f53 int,
   f54 bigint,
   f55 bigint) partitioned by (hour int, id string);



3 do insert



insert into table tableB partition (hour,last_msisdn) select *,
hour(from_unixtime(f3,'yyyy-MM-dd HH:mm:ss')),substr(f9,-1) from tableA;




Security,authorization and governance

2015-05-29 Thread Phani Yadavilli -X (pyadavil)
Hi Team,

Is there any open-source framework/tool for providing security, authorization and
data governance for Spark?

Regards
Phani Kumar


Re: Spark SQL on large number of columns

2015-05-19 Thread Wangfei (X)
And which version are you using?

Sent from my iPhone

On 2015-05-19, at 18:29, ayan guha <guha.a...@gmail.com> wrote:

can you kindly share your code?

On Tue, May 19, 2015 at 8:04 PM, madhu phatak <phatak@gmail.com> wrote:
Hi,
I am trying to run a Spark SQL aggregation on a file with 26k columns. The number
of rows is very small. I am running into an issue where Spark takes a huge amount
of time to parse the SQL and create a logical plan. Even if I have just one row, it
takes more than 1 hour just to get past the parsing. Any idea how to optimize
in this kind of scenario?


Regards,
Madhukara Phatak
http://datamantra.io/



--
Best Regards,
Ayan Guha


Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed

2015-05-14 Thread Wangfei (X)
Yes, it fails repeatedly on my local Jenkins.

Sent from my iPhone

On 2015-05-14, at 18:30, Tathagata Das <t...@databricks.com> wrote:

Do you get this failure repeatedly?



On Thu, May 14, 2015 at 12:55 AM, kf <wangf...@huawei.com> wrote:
Hi all, I got the following error when I ran the Spark unit tests via dev/run-tests
on the latest branch-1.4.

the latest commit id:
commit d518c0369fa412567855980c3f0f426cde5c190d
Author: zsxwing <zsxw...@gmail.com>
Date:   Wed May 13 17:58:29 2015 -0700

error

[info] Test org.apache.spark.streaming.JavaAPISuite.testCount started
[error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed:
org.apache.spark.SparkException: Error communicating with MapOutputTracker
[error] at
org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
[error] at
org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119)
[error] at
org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
[error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
[error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577)
[error] at
org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626)
[error] at
org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597)
[error] at
org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403)
[error] at
org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102)
[error] at
org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344)
[error] at
org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
[error] at
org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74)
[error] at
org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
[error] at
org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala)
[error] at
org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103)
[error] ...
[error] Caused by: org.apache.spark.SparkException: Error sending message
[message = StopMapOutputTracker]
[error] at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
[error] at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
[error] at
org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109)
[error] ... 52 more
[error] Caused by: java.util.concurrent.TimeoutException: Futures timed out
after [120 seconds]
[error] at
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
[error] at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
[error] at
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
[error] at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error] at scala.concurrent.Await$.result(package.scala:107)
[error] at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
[error] ... 54 more



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-Failure-Test-org-apache-spark-streaming-JavaAPISuite-testCount-failed-tp22879.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: Is SQLContext thread-safe?

2015-04-30 Thread Wangfei (X)
Actually this is a SQL parse exception; are you sure your SQL is right?

Sent from my iPhone

 On 2015-04-30, at 18:50, Haopu Wang <hw...@qilinsoft.com> wrote:
 
 Hi, in a test on Spark SQL 1.3.0, multiple threads are doing a select on the
 same SQLContext instance, but the exception below is thrown, so it looks
 like SQLContext is NOT thread-safe? I think this is not the desired
 behavior.
 
 ==
 
 java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier select found
 
 select id ,ext.d from UNIT_TEST
 ^
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:40)
 at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:130)
 at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:130)
 at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
 at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
 at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:134)
 at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:134)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
 at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:915)
 
 -Original Message-
 From: Cheng, Hao [mailto:hao.ch...@intel.com] 
 Sent: Monday, March 02, 2015 9:05 PM
 To: Haopu Wang; user
 Subject: RE: Is SQLContext thread-safe?
 
 Yes it is thread safe, at least it's supposed to be.
 
 -Original Message-
 From: Haopu Wang [mailto:hw...@qilinsoft.com] 
 Sent: Monday, March 2, 2015 4:43 PM
 To: user
 Subject: Is SQLContext thread-safe?
 
 Hi, is it safe to use the same SQLContext to do Select operations in
 different threads at the same time? Thank you very much!
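
 For illustration, a minimal Scala sketch of the scenario being asked about: several
 threads issuing SELECTs against one shared SQLContext. The table contents, thread
 count and the object name ConcurrentSelects are made up, and this assumes the
 Spark 1.3-era SQLContext API; it is only a sketch of the usage pattern, not code
 from the original mails.

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext

 object ConcurrentSelects {
   def main(args: Array[String]): Unit = {
     val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("ConcurrentSelects"))
     val sqlContext = new SQLContext(sc)
     import sqlContext.implicits._

     // register a tiny table so the threads have something to query
     sc.parallelize(1 to 100).map(i => (i, "name" + i)).toDF("id", "name")
       .registerTempTable("UNIT_TEST")

     // several threads sharing the single SQLContext instance
     val threads = (1 to 4).map { _ =>
       new Thread {
         override def run(): Unit = {
           val n = sqlContext.sql("select id from UNIT_TEST").count()
           println("rows: " + n)
         }
       }
     }
     threads.foreach(_.start())
     threads.foreach(_.join())
     sc.stop()
   }
 }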
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org
 
 
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org
 


RE: Need help with ALS Recommendation code

2015-04-05 Thread Phani Yadavilli -X (pyadavil)
Hi Xiangrui,

Thank you for the response. I tried `sbt package` and `sbt compile`; both
commands give me a success result.

sbt compile
[info] Set current project to machine-learning (in build 
file:/opt/mapr/spark/spark-1.2.1/SparkTraining/machine-learning/)
[info] Updating 
{file:/opt/mapr/spark/spark-1.2.1/SparkTraining/machine-learning/}machine-learning...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[success] Total time: 1 s, completed Apr 6, 2015 5:14:43 AM

sbt package
[info] Set current project to machine-learning (in build 
file:/opt/mapr/spark/spark-1.2.1/SparkTraining/machine-learning/)
[success] Total time: 1 s, completed Apr 6, 2015 5:15:04 AM

How do I proceed from here.

Regards
Phani Kumar

-Original Message-
From: Xiangrui Meng [mailto:men...@gmail.com] 
Sent: Monday, April 06, 2015 9:50 AM
To: Phani Yadavilli -X (pyadavil)
Cc: user@spark.apache.org
Subject: Re: Need help with ALS Recommendation code

Could you try `sbt package` or `sbt compile` and see whether there are errors? 
It seems that you haven't reached the ALS code yet. -Xiangrui

On Sat, Apr 4, 2015 at 5:06 AM, Phani Yadavilli -X (pyadavil) 
pyada...@cisco.com wrote:
 Hi ,



 I am trying to run the following command in the Movie Recommendation 
 example provided by the ampcamp tutorial



 Command:   sbt package run /movielens/medium



 Exception: sbt.TrapExitSecurityException thrown from the 
 UncaughtExceptionHandler in thread run-main-0

 java.lang.RuntimeException: Nonzero exit code: 1

 at scala.sys.package$.error(package.scala:27)

 [trace] Stack trace suppressed: run last compile:run for the full output.

 [error] (compile:run) Nonzero exit code: 1

 [error] Total time: 0 s, completed Apr 4, 2015 12:00:18 PM



 I am unable to identify the error code. Can someone help me with this?



 Regards

 Phani Kumar

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Need help with ALS Recommendation code

2015-04-04 Thread Phani Yadavilli -X (pyadavil)
Hi ,

I am trying to run the following command in the Movie Recommendation example 
provided by the ampcamp tutorial

Command:   sbt package run /movielens/medium

Exception: sbt.TrapExitSecurityException thrown from the 
UncaughtExceptionHandler in thread run-main-0
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 0 s, completed Apr 4, 2015 12:00:18 PM

I am unable to identify the error code. Can someone help me with this?

Regards
Phani Kumar


Need a spark mllib tutorial

2015-04-02 Thread Phani Yadavilli -X (pyadavil)
Hi,

I am new to Spark MLlib and I was browsing the internet for good tutorials more
advanced than the Spark documentation examples, but I did not find any. Need help.

Regards
Phani Kumar


Re: Processing order in Spark

2014-10-09 Thread x
I doubt Spark has such an ability to arrange the order of task execution.
You could try these approaches (a sketch follows the list below).

1. Write your own partitioner to group your data.
2. Sort the elements within each partition, e.g. by a row index.
3. Control the order of the outcome obtained from Spark in your
application.
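
A minimal sketch of points 1-2 (the object name, the data and the someFun stand-in
are invented for illustration), assuming the plain RDD API: keep each element's
original position with zipWithIndex and sort by that index, so downstream
processing sees elements in their original order within (and across) the
resulting partitions.

import org.apache.spark.{SparkConf, SparkContext}

object OrderedProcessingSketch {
  // stand-in for the user's someFun
  def someFun(x: Int): Int = x * 10

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("OrderedProcessingSketch"))

    val items = sc.parallelize(1 to 16, 4)

    val processed = items
      .zipWithIndex()                     // (item, original position)
      .map(_.swap)                        // (original position, item)
      .sortByKey()                        // each partition now holds a contiguous, ordered slice
      .map { case (_, item) => someFun(item) }

    processed.collect().foreach(println)
    sc.stop()
  }
}

Note this only fixes the order of elements inside the resulting RDD; Spark still
schedules the partition tasks in parallel, so there is no guarantee about which
partition is processed first.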

xj @ Tokyo life

On Fri, Oct 10, 2014 at 10:29 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 I am planning an application where the order of items is somehow
 important. In particular it is an online machine learning application where
 learning in a different order will lead to a different model.

 I was wondering about ordering guarantees for Spark applications. So if I
 say myRdd.map(someFun), then someFun will be executed on many cluster
 nodes, but do I know anything about the order of the execution?

 Say, for example, if data is distributed like

  node1 | node2 | node3 | node4
1   |   2   |   3   |   4
5   |   6   |   7   |   8
9   |  10   |  11   |  12
   13   |  14   |  15   |  16

 Then I guess that - more or less - first, items 1-4 will be processed,
 then 5-8, then 9-12; about the best I could hope for in a distributed
 context. However, if the distribution is like

  node1 | node2 | node3 | node4
1   |   5   |   9   |  13
2   |   6   |  10   |  14
3   |   7   |  11   |  15
4   |   8   |  12   |  16

 then items are processed in an order that is completely unrelated to the
 original order of items in my dataset. So is there some way to ensure/steer
 the order in which someFun is processed from a global point of view?

 Thanks
 Tobias




Re: combineByKey throws ClassCastException

2014-09-15 Thread x
How about this.

scala> val rdd2 = rdd.combineByKey(
     |   (v: Int) => v.toLong,
     |   (c: Long, v: Int) => c + v,
     |   (c1: Long, c2: Long) => c1 + c2)
rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at combineByKey at <console>:14

xj @ Tokyo
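
For completeness, a self-contained sketch of the per-key average with all three
combineByKey functions explicitly typed (the object name PerKeyAverage and the
local master setting are made up for illustration, not from the original mails):

import org.apache.spark.{SparkConf, SparkContext}

object PerKeyAverage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("PerKeyAverage"))
    val rdd = sc.parallelize(List(
      ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
      ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
      ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)

    val avg = rdd.combineByKey(
        (v: Int) => (v, 1),                                           // create combiner: (sum, count)
        (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // merge a value into a combiner
        (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // merge two combiners
      .mapValues { case (sum, count) => sum / count.toFloat }

    avg.collect().foreach { case (k, v) => println(k + " - " + v) }
    sc.stop()
  }
}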

On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

 I followed an example presented in the tutorial Learning Spark
 http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
 to compute the per-key average as follows:


 val Array(appName) = args
 val sparkConf = new SparkConf()
 .setAppName(appName)
 val sc = new SparkContext(sparkConf)
 /*
  * compute the per-key average of values
  * results should be:
  *A : 5.8
  *B : 14
  *C : 60.6
  */
 val rdd = sc.parallelize(List(
   ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
   ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
   ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
 val avg = rdd.combineByKey(
   (x: Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer
   (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
   .map { case (s, t) => (s, t._1 / t._2.toFloat) }
 avg.collect.foreach(t => println(t._1 + " - " + t._2))



 When I submitted the application, an exception of 
 *java.lang.ClassCastException:
 scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer* was thrown
 out. The tutorial said that the first function of *combineByKey*, *(x: Int) => (x, 1)*,
 should take a single element in the source RDD and return an
 element of the desired type in the resulting RDD. In my application, we
 take a single element of type *Int *from the source RDD and return a
 tuple of type (*Int*, *Int*), which meets the requirements quite well.
 But why would such an exception be thrown?

 I'm using CDH 5.0 and Spark 0.9

 Thanks.





Re: Matrix multiplication in spark

2014-08-21 Thread x
You could create a distributed matrix with RowMatrix.

val rmat = new RowMatrix(rows)

And then make a local DenseMatrix.

val localMat = Matrices.dense(m, n, mat)

Then multiply them.

rmat.multiply(localMat)


xj @ Tokyo
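
For reference, a self-contained sketch of the three steps above (the sizes, the
values and the object name MatMulSketch are made up for illustration; this assumes
MLlib 1.x, where Matrices.dense expects its values in column-major order):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object MatMulSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("MatMulSketch"))

    // distributed 3 x 2 matrix, one Vector per row
    val rows = sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0),
      Vectors.dense(3.0, 4.0),
      Vectors.dense(5.0, 6.0)))
    val rmat = new RowMatrix(rows)

    // local 2 x 2 dense matrix; values are given in column-major order
    val localMat = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))

    // (3 x 2) * (2 x 2) = 3 x 2 distributed result
    val result = rmat.multiply(localMat)
    result.rows.collect().foreach(println)

    sc.stop()
  }
}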

On Thu, Aug 21, 2014 at 6:37 PM, Sean Owen so...@cloudera.com wrote:

 Are you trying to multiply dense or sparse matrices? if sparse, are
 they very large -- meaning, are you looking for distributed
 operations?

 On Thu, Aug 21, 2014 at 10:07 AM, phoenix bai mingzhi...@gmail.com
 wrote:
  there is RowMatrix implemented in spark.
  and I checked for a while but failed to find any matrix operations (like
  multiplication etc) are defined in the class yet.
 
  so, my question is, if I want to do matrix multiplication, (to do vector
 x
  matrix multiplication to be precise), need to convert the vector/matrix
 to
  the matrix type defined in the breeze package, right?
 
  thanks

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Matrix multiplication in spark

2014-08-21 Thread x
Yes.
The Spark API doesn't currently provide a transpose function; you have to define it
yourself, like below.

def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
  // column c of m becomes row c of the result
  (for {
    c <- m(0).indices
  } yield m.map(_(c))).toArray
}

xj @ Tokyo
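
For what it's worth, a small usage sketch of the helper above (the values and the
names m/t/tAsMatrix are made up for illustration; MLlib's Matrices.dense takes its
values in column-major order, which is why the row-major flatten of m already
describes m's transpose):

// a 2 x 3 matrix held as rows of arrays
val m = Array(Array(1.0, 2.0, 3.0),
              Array(4.0, 5.0, 6.0))
val t = transpose(m)                      // 3 x 2, with t(c)(r) == m(r)(c)

// if the goal is a local MLlib Matrix of the transpose, the row-major
// flatten of m is exactly the column-major layout of its transpose
import org.apache.spark.mllib.linalg.Matrices
val tAsMatrix = Matrices.dense(m(0).length, m.length, m.flatten)   // 3 x 2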


On Thu, Aug 21, 2014 at 10:12 PM, phoenix bai mingzhi...@gmail.com wrote:

 this is exactly what I was looking for. thank you.

 one thing though: it doesn't have a transpose() function defined, so I have
 to do the transpose myself for the localMat in your case.
 hoping it will be supported in the future :-)



 On Thu, Aug 21, 2014 at 7:30 PM, x wasedax...@gmail.com wrote:

 You could create a distributed matrix with RowMatrix.

 val rmat = new RowMatrix(rows)

 And then make a local DenseMatrix.

 val localMat = Matrices.dense(m, n, mat)

 Then multiply them.

 rmat.multiply(localMat)


 xj @ Tokyo

 On Thu, Aug 21, 2014 at 6:37 PM, Sean Owen so...@cloudera.com wrote:

 Are you trying to multiply dense or sparse matrices? if sparse, are
 they very large -- meaning, are you looking for distributed
 operations?

 On Thu, Aug 21, 2014 at 10:07 AM, phoenix bai mingzhi...@gmail.com
 wrote:
  there is RowMatrix implemented in spark.
  and I checked for a while but failed to find any matrix operations (like
  multiplication etc) are defined in the class yet.
 
  so, my question is, if I want to do matrix multiplication, (to do
 vector x
  matrix multiplication to be precise), need to convert the
 vector/matrix to
  the matrix type defined in the breeze package, right?
 
  thanks

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: How can I implement eigenvalue decomposition in Spark?

2014-08-08 Thread x
Generally the adjacency matrix is undirected (symmetric) for a social network, so
you can get eigenvectors from the computed SVD result.

A = UDV^t

The first column of U is the eigenvector corresponding to the first (largest)
value of D.

xj @ Tokyo
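
A minimal sketch of the idea (the data and the object name TopEigenSketch are
invented for illustration, and it assumes the matrix is symmetric positive
semi-definite, so that the singular values coincide with the eigenvalues and the
first column of V is the corresponding eigenvector):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object TopEigenSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("TopEigenSketch"))

    // a small symmetric 3 x 3 matrix, one Vector per row
    val rows = sc.parallelize(Seq(
      Vectors.dense(2.0, 1.0, 0.0),
      Vectors.dense(1.0, 2.0, 1.0),
      Vectors.dense(0.0, 1.0, 2.0)))
    val mat = new RowMatrix(rows)

    // only the top singular value / vector is requested
    val svd = mat.computeSVD(1, computeU = false)
    val topEigenvalue = svd.s(0)
    val v = svd.V                                       // local Matrix, n x 1 here
    val topEigenvector = (0 until v.numRows).map(i => v(i, 0))

    println("eigenvalue  = " + topEigenvalue)
    println("eigenvector = " + topEigenvector.mkString(","))
    sc.stop()
  }
}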


On Sat, Aug 9, 2014 at 4:08 AM, Li Pu l...@twitter.com.invalid wrote:

 @Miles, eigen-decomposition with asymmetric matrix doesn't always give
 real-value solutions, and it doesn't have the nice properties that
 symmetric matrix holds. Usually you want to symmetrize your asymmetric
 matrix in some way, e.g. see
 http://machinelearning.wustl.edu/mlpapers/paper_files/icml2005_ZhouHS05.pdf
 but as Sean mentioned, you can always compute the largest eigenpair with
 power method or some variations like pagerank, which is already implemented
 in graphx.


 On Fri, Aug 8, 2014 at 2:50 AM, Sean Owen so...@cloudera.com wrote:

 The SVD does not in general give you eigenvalues of its input.

 Are you just trying to access the U and V matrices? they are also
 returned in the API.  But they are not the eigenvectors of M, as you
 note.

 I don't think MLlib has anything to help with the general eigenvector
 problem.
 Maybe you can implement a sort of power iteration algorithm using
 GraphX to find the largest eigenvector?

 On Fri, Aug 8, 2014 at 4:07 AM, Chunnan Yao yaochun...@gmail.com wrote:
  Hi there, what you've suggested are all meaningful. But to make myself
  clearer, my essential problems are:
  1. My matrix is asymmetric, and it is a probabilistic adjacency matrix,
  whose entries(a_ij) represents the likelihood that user j will
 broadcast the
  information generated by user i. Apparently, a_ij and a_ji is different,
  caus I love you doesn't necessarily mean you love me(What a sad
 story~). All
  entries are real.
  2. I know I can get eigenvalues through SVD. My problem is I can't get
 the
  corresponding eigenvectors, which requires solving equations, and I also
  need eigenvectors in my calculation.In my simulation of this paper, I
 only
  need the biggest eigenvalues and corresponding eigenvectors.
  The paper posted by Shivaram Venkataraman is also concerned about
 symmetric
  matrix. Could any one help me out?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Li
 @vrilleup



Re: How can I implement eigenvalue decomposition in Spark?

2014-08-07 Thread x
 The computed SVD result already contains the singular values in descending
order, so you can get the biggest eigenvalue.

---

  val svd = matrix.computeSVD(matrix.numCols().toInt, computeU = true)
  val U: RowMatrix = svd.U
  val s: Vector = svd.s
  val V: Matrix = svd.V

  U.rows.toArray.take(1).foreach(println)

  println(s.toArray(0)*s.toArray(0))

  V.toArray.take(s.size).foreach(println)

---

xj @ Tokyo

On Fri, Aug 8, 2014 at 3:06 AM, Shivaram Venkataraman 
shiva...@eecs.berkeley.edu wrote:

 If you just want to find the top eigenvalue / eigenvector you can do
 something like the Lanczos method. There is a description of a MapReduce
 based algorithm in Section 4.2 of [1]

 [1] http://www.cs.cmu.edu/~ukang/papers/HeigenPAKDD2011.pdf


 On Thu, Aug 7, 2014 at 10:54 AM, Li Pu l...@twitter.com.invalid wrote:

 @Miles, the latest SVD implementation in mllib is partially distributed.
 Matrix-vector multiplication is computed among all workers, but the right
 singular vectors are all stored in the driver. If your symmetric matrix is
 n x n and you want the first k eigenvalues, you will need to fit n x k
 doubles in driver's memory. Behind the scene, it calls ARPACK to compute
 eigen-decomposition of A^T A. You can look into the source code for the
 details.

 @Sean, the SVD++ implementation in graphx is not the canonical definition
 of SVD. It doesn't have the orthogonality that SVD holds. But we might want
 to use graphx as the underlying matrix representation for mllib.SVD to
 address the problem of skewed entry distribution.


 On Thu, Aug 7, 2014 at 10:51 AM, Evan R. Sparks evan.spa...@gmail.com
 wrote:

 Reza Zadeh has contributed the distributed implementation of
 (Tall/Skinny) SVD (
 http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html),
 which is in MLlib (Spark 1.0) and a distributed sparse SVD coming in Spark
 1.1. (https://issues.apache.org/jira/browse/SPARK-1782). If your data
 is sparse (which it often is in social networks), you may have better luck
 with this.

 I haven't tried the GraphX implementation, but those algorithms are
 often well-suited for power-law distributed graphs as you might see in
 social networks.

 FWIW, I believe you need to square elements of the sigma matrix from the
 SVD to get the eigenvalues.




 On Thu, Aug 7, 2014 at 10:20 AM, Sean Owen so...@cloudera.com wrote:

 (-incubator, +user)

 If your matrix is symmetric (and real I presume), and if my linear
 algebra isn't too rusty, then its SVD is its eigendecomposition. The
 SingularValueDecomposition object you get back has U and V, both of
 which have columns that are the eigenvectors.

 There are a few SVDs in the Spark code. The one in mllib is not
 distributed (right?) and is probably not an efficient means of
 computing eigenvectors if you really just want a decomposition of a
 symmetric matrix.

 The one I see in graphx is distributed? I haven't used it though.
 Maybe it could be part of a solution.



 On Thu, Aug 7, 2014 at 2:21 PM, yaochunnan yaochun...@gmail.com
 wrote:
  Our lab needs to do some simulation on online social networks. We need
 to
  handle a 5000*5000 adjacency matrix, namely, to get its largest
 eigenvalue
  and corresponding eigenvector. Matlab can be used but it is
 time-consuming.
  Is Spark effective in linear algebra calculations and
 transformations? Later
  we would have 500*500 matrix processed. It seems emergent
 that we
  should find some distributed computation platform.
 
  I see SVD has been implemented and I can get eigenvalues of a matrix
 through
  this API.  But when I want to get both eigenvalues and eigenvectors
 or at
  least the biggest eigenvalue and the corresponding eigenvector, it
 seems
  that current Spark doesn't have such API. Is it possible that I write
  eigenvalue decomposition from scratch? What should I do? Thanks a lot!
 
 
  Miles Yao
 
  
  View this message in context: How can I implement eigenvalue
 decomposition
  in Spark?
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Li
 @vrilleup





Re: How can I implement eigenvalue decomposition in Spark?

2014-08-07 Thread x
Are U.rows.toArray.take(1).foreach(println) and V.toArray.take(s.size).foreach(println)
not the eigenvectors corresponding to the biggest eigenvalue s.toArray(0)*s.toArray(0)?

xj @ Tokyo

On Fri, Aug 8, 2014 at 12:07 PM, Chunnan Yao yaochun...@gmail.com wrote:

 Hi there, what you've suggested are all meaningful. But to make myself
 clearer, my essential problems are:
 1. My matrix is asymmetric, and it is a probabilistic adjacency matrix,
 whose entries(a_ij) represents the likelihood that user j will broadcast
 the information generated by user i. Apparently, a_ij and a_ji is
 different, caus I love you doesn't necessarily mean you love me(What a sad
 story~). All entries are real.
 2. I know I can get eigenvalues through SVD. My problem is I can't get the
 corresponding eigenvectors, which requires solving equations, and I also
 need eigenvectors in my calculation.In my simulation of this paper, I only
 need the biggest eigenvalues and corresponding eigenvectors.
 The paper posted by Shivaram Venkataraman is also concerned about
 symmetric matrix. Could any one help me out?


 2014-08-08 9:41 GMT+08:00 x wasedax...@gmail.com:

  The computed SVD result already contains the singular values in descending
 order, so you can get the biggest eigenvalue.

 ---

   val svd = matrix.computeSVD(matrix.numCols().toInt, computeU = true)
   val U: RowMatrix = svd.U
   val s: Vector = svd.s
   val V: Matrix = svd.V

   U.rows.toArray.take(1).foreach(println)

   println(s.toArray(0)*s.toArray(0))

   V.toArray.take(s.size).foreach(println)

 ---

 xj @ Tokyo


 On Fri, Aug 8, 2014 at 3:06 AM, Shivaram Venkataraman 
 shiva...@eecs.berkeley.edu wrote:

 If you just want to find the top eigenvalue / eigenvector you can do
 something like the Lanczos method. There is a description of a MapReduce
 based algorithm in Section 4.2 of [1]

 [1] http://www.cs.cmu.edu/~ukang/papers/HeigenPAKDD2011.pdf


 On Thu, Aug 7, 2014 at 10:54 AM, Li Pu l...@twitter.com.invalid wrote:

 @Miles, the latest SVD implementation in mllib is partially
 distributed. Matrix-vector multiplication is computed among all workers,
 but the right singular vectors are all stored in the driver. If your
 symmetric matrix is n x n and you want the first k eigenvalues, you will
 need to fit n x k doubles in driver's memory. Behind the scene, it calls
 ARPACK to compute eigen-decomposition of A^T A. You can look into the
 source code for the details.

 @Sean, the SVD++ implementation in graphx is not the canonical
 definition of SVD. It doesn't have the orthogonality that SVD holds. But we
 might want to use graphx as the underlying matrix representation for
 mllib.SVD to address the problem of skewed entry distribution.


 On Thu, Aug 7, 2014 at 10:51 AM, Evan R. Sparks evan.spa...@gmail.com
 wrote:

 Reza Zadeh has contributed the distributed implementation of
 (Tall/Skinny) SVD (
 http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html),
 which is in MLlib (Spark 1.0) and a distributed sparse SVD coming in Spark
 1.1. (https://issues.apache.org/jira/browse/SPARK-1782). If your data
 is sparse (which it often is in social networks), you may have better luck
 with this.

 I haven't tried the GraphX implementation, but those algorithms are
 often well-suited for power-law distributed graphs as you might see in
 social networks.

 FWIW, I believe you need to square elements of the sigma matrix from
 the SVD to get the eigenvalues.




 On Thu, Aug 7, 2014 at 10:20 AM, Sean Owen so...@cloudera.com wrote:

 (-incubator, +user)

 If your matrix is symmetric (and real I presume), and if my linear
 algebra isn't too rusty, then its SVD is its eigendecomposition. The
 SingularValueDecomposition object you get back has U and V, both of
 which have columns that are the eigenvectors.

 There are a few SVDs in the Spark code. The one in mllib is not
 distributed (right?) and is probably not an efficient means of
 computing eigenvectors if you really just want a decomposition of a
 symmetric matrix.

 The one I see in graphx is distributed? I haven't used it though.
 Maybe it could be part of a solution.



 On Thu, Aug 7, 2014 at 2:21 PM, yaochunnan yaochun...@gmail.com
 wrote:
  Our lab needs to do some simulation on online social networks. We
 need to
  handle a 5000*5000 adjacency matrix, namely, to get its largest
 eigenvalue
  and corresponding eigenvector. Matlab can be used but it is
 time-consuming.
  Is Spark effective in linear algebra calculations and
 transformations? Later
  we would have 500*500 matrix processed. It seems emergent
 that we
  should find some distributed computation platform.
 
  I see SVD has been implemented and I can get eigenvalues of a
 matrix through
  this API.  But when I want to get both eigenvalues and eigenvectors
 or at
  least the biggest eigenvalue and the corresponding eigenvector, it
 seems
  that current Spark doesn't have such API. Is it possible that I
 write
  eigenvalue decomposition from

Tranforming flume events using Spark transformation functions

2014-07-22 Thread Sundaram, Muthu X.
Hi All,
  I am getting events from Flume using the following line.

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I would like to use some of the transformation
functions like map and reduce on this. Do I need to convert the
JavaDStream<SparkFlumeEvent> to JavaDStream<String>, or can I apply these
functions directly on it?

I need to do following kind of operations

 AA
YDelta
TAA
 Southwest
 AA

Unique tickets are  , Y, , , .
Count is  2,  1, T 1 and so on...
AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket, Southwest - 1 
ticket.

I have to do transformations like this. Right now I am able to receive
records, but I am struggling to transform them using Spark transformation
functions since they are not of type JavaRDD<String>.

Can I create a new JavaRDD<String>? How do I create a new JavaRDD?

I loop through the events like below

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user level data is carried as payload in the Flume Event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println("LOG RECORD = " + logRecord);
        }

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I
create this JavaRDD<String>?
In the loop I am able to get every record and I am able to print them.

I appreciate any help here.

Thanks,
Muthu
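
For comparison, a minimal Scala sketch of the same pipeline (the host, port and
HDFS path prefix are placeholders, and it uses the Scala streaming API rather than
the Java API of the code above): map each SparkFlumeEvent to the String payload it
carries and write each batch out as text files.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeToText {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeToText")
    val ssc = new StreamingContext(conf, Seconds(2))
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 4141)

    // turn each SparkFlumeEvent into the String payload it carries
    val lines = flumeStream.map(e => new String(e.event.getBody.array()))

    // one directory of text files per batch under the given prefix
    lines.saveAsTextFiles("hdfs:///tmp/flume-out/batch", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}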




RE: Tranforming flume events using Spark transformation functions

2014-07-22 Thread Sundaram, Muthu X.
I tried to map the SparkFlumeEvents to an RDD of Strings like below, but that map
and its call are not executed at all. I might be doing this in a wrong way. Any
help would be appreciated.

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        System.out.println("Inside for each...call");

        JavaRDD<String> records = eventsData.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flume) throws Exception {
                    String logRecord = null;
                    AvroFlumeEvent avroEvent = null;
                    ByteBuffer bytePayload = null;

                    System.out.println("Inside Map..call");
                    /* List<SparkFlumeEvent> events = flume.collect();
                       Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

                       SparkFlumeEvent flumeEvent = batchedEvents.next(); */
                    avroEvent = flume.event();
                    bytePayload = avroEvent.getBody();
                    logRecord = new String(bytePayload.array());

                    System.out.println("Record is " + logRecord);

                    return logRecord;
                }
            });
        return null;
    }

-Original Message-
From: Sundaram, Muthu X. [mailto:muthu.x.sundaram@sabre.com] 
Sent: Tuesday, July 22, 2014 10:24 AM
To: user@spark.apache.org; d...@spark.incubator.apache.org
Subject: Tranforming flume events using Spark transformation functions

Hi All,
  I am getting events from Flume using the following line.

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I would like to use some of the transformation
functions like map and reduce on this. Do I need to convert the
JavaDStream<SparkFlumeEvent> to JavaDStream<String>, or can I apply these
functions directly on it?

I need to do following kind of operations

 AA
YDelta
TAA
 Southwest
 AA

Unique tickets are  , Y, , , .
Count is  2,  1, T 1 and so on...
AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket, Southwest - 1 
ticket.

I have to do transformations like this. Right now I am able to receive
records, but I am struggling to transform them using Spark transformation
functions since they are not of type JavaRDD<String>.

Can I create a new JavaRDD<String>? How do I create a new JavaRDD?

I loop through the events like below

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user level data is carried as payload in the Flume Event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println("LOG RECORD = " + logRecord);
        }

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I
create this JavaRDD<String>?
In the loop I am able to get every record and I am able to print them.

I appreciate any help here.

Thanks,
Muthu




RE: writing FLume data to HDFS

2014-07-14 Thread Sundaram, Muthu X.
I am not sure how to write it. I tried writing to the local file system using
FileWriter and PrintWriter, inside the while loop. I am able to get the text and
print it, but it fails when I use regular Java classes. Shouldn't I use regular
Java classes here? Can I write only to HDFS? Do I have to create the file in HDFS
using the HDFS classes? I thought of using Spark's saveAsTextFile(), but I have a
JavaRDD<SparkFlumeEvent> here, not a JavaRDD<AvroEvent>, so I am not sure whether
saveAsTextFile() will work. I appreciate any guidance here. How do I get more code
examples? Books, URLs?


  flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
          String logRecord = null;
          List<SparkFlumeEvent> events = eventsData.collect();
          Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
          long t1 = System.currentTimeMillis();
          AvroFlumeEvent avroEvent = null;
          ByteBuffer bytePayload = null;
          // All the user level data is carried as payload in the Flume Event
          while (batchedEvents.hasNext()) {
              SparkFlumeEvent flumeEvent = batchedEvents.next();
              avroEvent = flumeEvent.event();
              bytePayload = avroEvent.getBody();
              logRecord = new String(bytePayload.array());

              System.out.println("LOG RECORD = " + logRecord);

              ??I was trying to write the data to hdfs..but it fails…


From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, July 11, 2014 1:43 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: writing FLume data to HDFS

What is the error you are getting when you say "I was trying to write the
data to hdfs..but it fails…"?

TD

On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X.
<muthu.x.sundaram@sabre.com> wrote:
I am new to Spark. I am trying to do the following:
Netcat -> Flume -> Spark Streaming (process Flume data) -> HDFS.

My Flume config file has the following setup.

Source = netcat
Sink = avrosink.

Spark Streaming code:
I am able to print data from Flume to the monitor, but I am struggling to
create a file. In order to get the real data I need to convert the SparkEvent to
an avroEvent.
JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection of
SparkEvent. Do I need to convert this into a collection of JavaRDD<AvroEvent>?
Please share any code examples. Thanks.

Code:

 Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

flumeStream.count();
flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> events1, JavaRDD<SparkFlumeEvent> events2) throws Exception {
        events1.saveAsTextFile("output.txt");
        return null;
    }
});

/* flumeStream.count().map(new Function<Long, String>() {
       @Override
       public String call(Long in) {
           return "Received " + in + " flume events.";
       }
   }).print(); */

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;

        // All the user level data is carried as payload in the Flume Event

        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println("LOG RECORD = " + logRecord);

            ??I was trying to write the data to hdfs..but it fails…

        }
        System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1) / 1000 + " seconds");
        return null;
    }
});




Re: One question about RDD.zip function when trying Naive Bayes

2014-07-11 Thread x
I tried my test case with Spark 1.0.1 and saw the same result (27 pairs
become 25 pairs after zip).

Could someone please check it?

Regards,
xj

On Thu, Jul 3, 2014 at 2:31 PM, Xiangrui Meng men...@gmail.com wrote:

 This is due to a bug in sampling, which was fixed in 1.0.1 and latest
 master. See https://github.com/apache/spark/pull/1234 . -Xiangrui

 On Wed, Jul 2, 2014 at 8:23 PM, x wasedax...@gmail.com wrote:
  Hello,
 
  I am a newbie to Spark MLlib and ran into a curious case when following the
  instructions at the page below.
 
  http://spark.apache.org/docs/latest/mllib-naive-bayes.html
 
  I ran a test program on my local machine using some data.
 
  val spConfig = (new SparkConf).setMaster("local").setAppName("SparkNaiveBayes")
  val sc = new SparkContext(spConfig)
 
  The test data was as follows and there were three labeled categories I
  wanted to predict.
 
   1  LabeledPoint(0.0, [4.9,3.0,1.4,0.2])
   2  LabeledPoint(0.0, [4.6,3.4,1.4,0.3])
   3  LabeledPoint(0.0, [5.7,4.4,1.5,0.4])
   4  LabeledPoint(0.0, [5.2,3.4,1.4,0.2])
   5  LabeledPoint(0.0, [4.7,3.2,1.6,0.2])
   6  LabeledPoint(0.0, [4.8,3.1,1.6,0.2])
   7  LabeledPoint(0.0, [5.1,3.8,1.9,0.4])
   8  LabeledPoint(0.0, [4.8,3.0,1.4,0.3])
   9  LabeledPoint(0.0, [5.0,3.3,1.4,0.2])
  10  LabeledPoint(1.0, [6.6,2.9,4.6,1.3])
  11  LabeledPoint(1.0, [5.2,2.7,3.9,1.4])
  12  LabeledPoint(1.0, [5.6,2.5,3.9,1.1])
  13  LabeledPoint(1.0, [6.4,2.9,4.3,1.3])
  14  LabeledPoint(1.0, [6.6,3.0,4.4,1.4])
  15  LabeledPoint(1.0, [6.0,2.7,5.1,1.6])
  16  LabeledPoint(1.0, [5.5,2.6,4.4,1.2])
  17  LabeledPoint(1.0, [5.8,2.6,4.0,1.2])
  18  LabeledPoint(1.0, [5.7,2.9,4.2,1.3])
  19  LabeledPoint(1.0, [5.7,2.8,4.1,1.3])
  20  LabeledPoint(2.0, [6.3,2.9,5.6,1.8])
  21  LabeledPoint(2.0, [6.5,3.0,5.8,2.2])
  22  LabeledPoint(2.0, [6.5,3.0,5.5,1.8])
  23  LabeledPoint(2.0, [6.7,3.3,5.7,2.1])
  24  LabeledPoint(2.0, [7.4,2.8,6.1,1.9])
  25  LabeledPoint(2.0, [6.3,3.4,5.6,2.4])
  26  LabeledPoint(2.0, [6.0,3.0,4.8,1.8])
  27  LabeledPoint(2.0, [6.8,3.2,5.9,2.3])
 
  The predicted result via NaiveBayes is below. Comparing to test data,
 only
  two predicted results(#11 and #15) were different.
 
   1  0.0
   2  0.0
   3  0.0
   4  0.0
   5  0.0
   6  0.0
   7  0.0
   8  0.0
   9  0.0
  10  1.0
  11  2.0
  12  1.0
  13  1.0
  14  1.0
  15  2.0
  16  1.0
  17  1.0
  18  1.0
  19  1.0
  20  2.0
  21  2.0
  22  2.0
  23  2.0
  24  2.0
  25  2.0
  26  2.0
  27  2.0
 
  After grouping test RDD and predicted RDD via zip I got this.
 
   1  (0.0,0.0)
   2  (0.0,0.0)
   3  (0.0,0.0)
   4  (0.0,0.0)
   5  (0.0,0.0)
   6  (0.0,0.0)
   7  (0.0,0.0)
   8  (0.0,0.0)
   9  (0.0,1.0)
  10  (0.0,1.0)
  11  (0.0,1.0)
  12  (1.0,1.0)
  13  (1.0,1.0)
  14  (2.0,1.0)
  15  (1.0,1.0)
  16  (1.0,2.0)
  17  (1.0,2.0)
  18  (1.0,2.0)
  19  (1.0,2.0)
  20  (2.0,2.0)
  21  (2.0,2.0)
  22  (2.0,2.0)
  23  (2.0,2.0)
  24  (2.0,2.0)
  25  (2.0,2.0)
 
  I expected there were 27 pairs but I saw two results were lost.
  Could someone please point out what I have missed here?
 
  Regards,
  xj



writing FLume data to HDFS

2014-07-10 Thread Sundaram, Muthu X.
I am new to Spark. I am trying to do the following:
Netcat -> Flume -> Spark Streaming (process Flume data) -> HDFS.

My Flume config file has the following setup.

Source = netcat
Sink = avrosink.

Spark Streaming code:
I am able to print data from Flume to the monitor, but I am struggling to
create a file. In order to get the real data I need to convert the SparkEvent to
an avroEvent.
JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection of
SparkEvent. Do I need to convert this into a collection of JavaRDD<AvroEvent>?
Please share any code examples... Thanks.

Code:

 Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

flumeStream.count();
flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> events1, JavaRDD<SparkFlumeEvent> events2) throws Exception {
        events1.saveAsTextFile("output.txt");
        return null;
    }
});

/* flumeStream.count().map(new Function<Long, String>() {
       @Override
       public String call(Long in) {
           return "Received " + in + " flume events.";
       }
   }).print(); */

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;

        // All the user level data is carried as payload in the Flume Event

        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println("LOG RECORD = " + logRecord);

            ??I was trying to write the data to hdfs..but it fails...

        }
        System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1) / 1000 + " seconds");
        return null;
    }
});



One question about RDD.zip function when trying Naive Bayes

2014-07-02 Thread x
Hello,

I am a newbie to Spark MLlib and ran into a curious case when following the
instructions at the page below.

http://spark.apache.org/docs/latest/mllib-naive-bayes.html

I ran a test program on my local machine using some data.

val spConfig = (new SparkConf).setMaster("local").setAppName("SparkNaiveBayes")
val sc = new SparkContext(spConfig)

The test data was as follows and there were three labeled categories I
wanted to predict.

 1  LabeledPoint(0.0, [4.9,3.0,1.4,0.2])
 2  LabeledPoint(0.0, [4.6,3.4,1.4,0.3])
 3  LabeledPoint(0.0, [5.7,4.4,1.5,0.4])
 4  LabeledPoint(0.0, [5.2,3.4,1.4,0.2])
 5  LabeledPoint(0.0, [4.7,3.2,1.6,0.2])
 6  LabeledPoint(0.0, [4.8,3.1,1.6,0.2])
 7  LabeledPoint(0.0, [5.1,3.8,1.9,0.4])
 8  LabeledPoint(0.0, [4.8,3.0,1.4,0.3])
 9  LabeledPoint(0.0, [5.0,3.3,1.4,0.2])
10  LabeledPoint(1.0, [6.6,2.9,4.6,1.3])
11  LabeledPoint(1.0, [5.2,2.7,3.9,1.4])
12  LabeledPoint(1.0, [5.6,2.5,3.9,1.1])
13  LabeledPoint(1.0, [6.4,2.9,4.3,1.3])
14  LabeledPoint(1.0, [6.6,3.0,4.4,1.4])
15  LabeledPoint(1.0, [6.0,2.7,5.1,1.6])
16  LabeledPoint(1.0, [5.5,2.6,4.4,1.2])
17  LabeledPoint(1.0, [5.8,2.6,4.0,1.2])
18  LabeledPoint(1.0, [5.7,2.9,4.2,1.3])
19  LabeledPoint(1.0, [5.7,2.8,4.1,1.3])
20  LabeledPoint(2.0, [6.3,2.9,5.6,1.8])
21  LabeledPoint(2.0, [6.5,3.0,5.8,2.2])
22  LabeledPoint(2.0, [6.5,3.0,5.5,1.8])
23  LabeledPoint(2.0, [6.7,3.3,5.7,2.1])
24  LabeledPoint(2.0, [7.4,2.8,6.1,1.9])
25  LabeledPoint(2.0, [6.3,3.4,5.6,2.4])
26  LabeledPoint(2.0, [6.0,3.0,4.8,1.8])
27  LabeledPoint(2.0, [6.8,3.2,5.9,2.3])

The predicted result via NaiveBayes is below. Comparing to test data, only
two predicted results(#11 and #15) were different.

 1  0.0
 2  0.0
 3  0.0
 4  0.0
 5  0.0
 6  0.0
 7  0.0
 8  0.0
 9  0.0
10  1.0
11  2.0
12  1.0
13  1.0
14  1.0
15  2.0
16  1.0
17  1.0
18  1.0
19  1.0
20  2.0
21  2.0
22  2.0
23  2.0
24  2.0
25  2.0
26  2.0
27  2.0

After grouping test RDD and predicted RDD via zip I got this.

 1  (0.0,0.0)
 2  (0.0,0.0)
 3  (0.0,0.0)
 4  (0.0,0.0)
 5  (0.0,0.0)
 6  (0.0,0.0)
 7  (0.0,0.0)
 8  (0.0,0.0)
 9  (0.0,1.0)
10  (0.0,1.0)
11  (0.0,1.0)
12  (1.0,1.0)
13  (1.0,1.0)
14  (2.0,1.0)
15  (1.0,1.0)
16  (1.0,2.0)
17  (1.0,2.0)
18  (1.0,2.0)
19  (1.0,2.0)
20  (2.0,2.0)
21  (2.0,2.0)
22  (2.0,2.0)
23  (2.0,2.0)
24  (2.0,2.0)
25  (2.0,2.0)

I expected there were 27 pairs but I saw two results were lost.
Could someone please point out what I have missed here?

Regards,
xj
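
For reference, a minimal sketch of the procedure described above (the object name
and the tiny three-point data set are invented for illustration, not the exact
program from this mail): train NaiveBayes on LabeledPoints, predict, and zip the
true labels with the predictions. zip assumes both RDDs have the same partitioning
and the same number of elements per partition, which is presumably why the sampling
bug mentioned in this thread made pairs disappear.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object NaiveBayesZipSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("SparkNaiveBayes"))

    val data = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(4.9, 3.0, 1.4, 0.2)),
      LabeledPoint(1.0, Vectors.dense(6.6, 2.9, 4.6, 1.3)),
      LabeledPoint(2.0, Vectors.dense(6.3, 2.9, 5.6, 1.8))))

    val model = NaiveBayes.train(data, lambda = 1.0)
    val predicted = data.map(p => model.predict(p.features))

    // pair the true label with the predicted label, element by element
    val pairs = data.map(_.label).zip(predicted)
    pairs.collect().foreach(println)
    sc.stop()
  }
}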


Re: One question about RDD.zip function when trying Naive Bayes

2014-07-02 Thread x
Thanks for the confirmation.
I will check it.

Regards,
xj


On Thu, Jul 3, 2014 at 2:31 PM, Xiangrui Meng men...@gmail.com wrote:

 This is due to a bug in sampling, which was fixed in 1.0.1 and latest
 master. See https://github.com/apache/spark/pull/1234 . -Xiangrui

 On Wed, Jul 2, 2014 at 8:23 PM, x wasedax...@gmail.com wrote:
  Hello,
 
  I am a newbie to Spark MLlib and ran into a curious case when following the
  instructions at the page below.
 
  http://spark.apache.org/docs/latest/mllib-naive-bayes.html
 
  I ran a test program on my local machine using some data.
 
  val spConfig = (new SparkConf).setMaster("local").setAppName("SparkNaiveBayes")
  val sc = new SparkContext(spConfig)
 
  The test data was as follows and there were three labeled categories I
  wanted to predict.
 
   1  LabeledPoint(0.0, [4.9,3.0,1.4,0.2])
   2  LabeledPoint(0.0, [4.6,3.4,1.4,0.3])
   3  LabeledPoint(0.0, [5.7,4.4,1.5,0.4])
   4  LabeledPoint(0.0, [5.2,3.4,1.4,0.2])
   5  LabeledPoint(0.0, [4.7,3.2,1.6,0.2])
   6  LabeledPoint(0.0, [4.8,3.1,1.6,0.2])
   7  LabeledPoint(0.0, [5.1,3.8,1.9,0.4])
   8  LabeledPoint(0.0, [4.8,3.0,1.4,0.3])
   9  LabeledPoint(0.0, [5.0,3.3,1.4,0.2])
  10  LabeledPoint(1.0, [6.6,2.9,4.6,1.3])
  11  LabeledPoint(1.0, [5.2,2.7,3.9,1.4])
  12  LabeledPoint(1.0, [5.6,2.5,3.9,1.1])
  13  LabeledPoint(1.0, [6.4,2.9,4.3,1.3])
  14  LabeledPoint(1.0, [6.6,3.0,4.4,1.4])
  15  LabeledPoint(1.0, [6.0,2.7,5.1,1.6])
  16  LabeledPoint(1.0, [5.5,2.6,4.4,1.2])
  17  LabeledPoint(1.0, [5.8,2.6,4.0,1.2])
  18  LabeledPoint(1.0, [5.7,2.9,4.2,1.3])
  19  LabeledPoint(1.0, [5.7,2.8,4.1,1.3])
  20  LabeledPoint(2.0, [6.3,2.9,5.6,1.8])
  21  LabeledPoint(2.0, [6.5,3.0,5.8,2.2])
  22  LabeledPoint(2.0, [6.5,3.0,5.5,1.8])
  23  LabeledPoint(2.0, [6.7,3.3,5.7,2.1])
  24  LabeledPoint(2.0, [7.4,2.8,6.1,1.9])
  25  LabeledPoint(2.0, [6.3,3.4,5.6,2.4])
  26  LabeledPoint(2.0, [6.0,3.0,4.8,1.8])
  27  LabeledPoint(2.0, [6.8,3.2,5.9,2.3])
 
  The predicted result via NaiveBayes is below. Comparing to test data,
 only
  two predicted results(#11 and #15) were different.
 
   1  0.0
   2  0.0
   3  0.0
   4  0.0
   5  0.0
   6  0.0
   7  0.0
   8  0.0
   9  0.0
  10  1.0
  11  2.0
  12  1.0
  13  1.0
  14  1.0
  15  2.0
  16  1.0
  17  1.0
  18  1.0
  19  1.0
  20  2.0
  21  2.0
  22  2.0
  23  2.0
  24  2.0
  25  2.0
  26  2.0
  27  2.0
 
  After grouping test RDD and predicted RDD via zip I got this.
 
   1  (0.0,0.0)
   2  (0.0,0.0)
   3  (0.0,0.0)
   4  (0.0,0.0)
   5  (0.0,0.0)
   6  (0.0,0.0)
   7  (0.0,0.0)
   8  (0.0,0.0)
   9  (0.0,1.0)
  10  (0.0,1.0)
  11  (0.0,1.0)
  12  (1.0,1.0)
  13  (1.0,1.0)
  14  (2.0,1.0)
  15  (1.0,1.0)
  16  (1.0,2.0)
  17  (1.0,2.0)
  18  (1.0,2.0)
  19  (1.0,2.0)
  20  (2.0,2.0)
  21  (2.0,2.0)
  22  (2.0,2.0)
  23  (2.0,2.0)
  24  (2.0,2.0)
  25  (2.0,2.0)
 
  I expected there were 27 pairs but I saw two results were lost.
  Could someone please point out what I have missed here?
 
  Regards,
  xj