Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-02-29 Thread John Zhuge
Excellent work, congratulations!

On Wed, Feb 28, 2024 at 10:12 PM Dongjoon Hyun 
wrote:

> Congratulations!
>
> Bests,
> Dongjoon.
>
> On Wed, Feb 28, 2024 at 11:43 AM beliefer  wrote:
>
>> Congratulations!
>>
>>
>>
>> At 2024-02-28 17:43:25, "Jungtaek Lim" 
>> wrote:
>>
>> Hi everyone,
>>
>> We are happy to announce the availability of Spark 3.5.1!
>>
>> Spark 3.5.1 is a maintenance release containing stability fixes. This
>> release is based on the branch-3.5 maintenance branch of Spark. We
>> strongly
>> recommend all 3.5 users to upgrade to this stable release.
>>
>> To download Spark 3.5.1, head over to the download page:
>> https://spark.apache.org/downloads.html
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-3-5-1.html
>>
>> We would like to acknowledge all community members for contributing to
>> this
>> release. This release would not have been possible without you.
>>
>> Jungtaek Lim
>>
>> ps. Yikun is helping us through releasing the official docker image for
>> Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally available.
>>
>>

-- 
John Zhuge


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread John Zhuge
Congratulations! Excellent work!

On Tue, Feb 13, 2024 at 8:04 PM Yufei Gu  wrote:

> Absolutely thrilled to see the project going open-source! Huge congrats to
> Chao and the entire team on this milestone!
>
> Yufei
>
>
> On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:
>
>> Hi all,
>>
>> We are very happy to announce that Project Comet, a plugin to
>> accelerate Spark query execution via leveraging DataFusion and Arrow,
>> has now been open sourced under the Apache Arrow umbrella. Please
>> check the project repo
>> https://github.com/apache/arrow-datafusion-comet for more details if
>> you are interested. We'd love to collaborate with people from the open
>> source community who share similar goals.
>>
>> Thanks,
>> Chao
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>

-- 
John Zhuge


Rename columns without manually setting them all

2023-06-21 Thread John Paul Jayme
Hi,

This is currently my column definition:

Employee ID | Name    | Client  | Project | Team   | 01/01/2022 | 02/01/2022 | 03/01/2022 | 04/01/2022 | 05/01/2022
12345       | Dummy x | Dummy a | abc     | team a | OFF        | WO         | WH         | WH         | WH

As you can see, the outer columns are just daily attendance dates. My goal is 
to count the employees who were OFF / WO / WH on each of those dates. I need to 
transpose them so it would look like this:

[inline image: the desired transposed output]

I am still new to pandas. Can you guide me on how to produce this? I am reading 
about melt() and set_index() but I am not sure if they are the correct 
functions to use.
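
A minimal pandas sketch of one way to do this with melt() (the DataFrame below is
a stand-in built from the sample row above, so the column names are assumptions):

import pandas as pd

# Stand-in for the real attendance data, using the columns from the sample above.
df = pd.DataFrame({
    "Employee ID": [12345],
    "Name": ["Dummy x"], "Client": ["Dummy a"], "Project": ["abc"], "Team": ["team a"],
    "01/01/2022": ["OFF"], "02/01/2022": ["WO"], "03/01/2022": ["WH"],
    "04/01/2022": ["WH"], "05/01/2022": ["WH"],
})

id_cols = ["Employee ID", "Name", "Client", "Project", "Team"]

# melt() turns the per-date columns into long-form (Date, Status) rows.
long_df = df.melt(id_vars=id_cols, var_name="Date", value_name="Status")

# Count employees per date and status, i.e. how many were OFF / WO / WH on each day.
counts = (long_df.groupby(["Date", "Status"])["Employee ID"]
          .nunique()
          .unstack(fill_value=0))
print(counts)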



How to read excel file in PySpark

2023-06-20 Thread John Paul Jayme
Good day,

I have a task to read Excel files in Databricks but I cannot seem to proceed. I 
am referencing the API documentation - 
read_excel<https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_excel.html>
 - but there is an error: the SparkSession object has no attribute 'read_excel'. Can 
you advise?
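
A minimal sketch, assuming Spark 3.2+ where read_excel is part of the pandas API on
Spark (pyspark.pandas) rather than a SparkSession method, and assuming the openpyxl
package is installed on the cluster; the file path is hypothetical:

import pyspark.pandas as ps

# read_excel lives in pyspark.pandas, so it is called on the module, not on spark.
psdf = ps.read_excel("/mnt/data/attendance.xlsx", sheet_name=0)

# Convert to a regular Spark DataFrame if the rest of the pipeline expects one.
sdf = psdf.to_spark()
sdf.show()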

JOHN PAUL JAYME
Data Engineer
m. +639055716384  w. www.tdcx.com<http://www.tdcx.com/>


This is a confidential email that may be privileged or legally protected. You 
are not authorized to copy or disclose the contents of this email. If you are 
not the intended addressee, please inform the sender and delete this email.




RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target release day for Spark3.3?

2022-01-12 Thread Crowe, John
I get that Sean, I really do,  but customers being customers, they see Log4j, 
and they panic.. I’ve been telling them since this began that Version 1x is not 
affected, but.. but..

Letting them know that 2.17.1 is on the way, IS helpful, but of course they ask 
us when is it coming?  Just trying to reduce the madness.. 

Regards;
John Crowe
TDi Technologies, Inc.
1600 10th Street Suite B
Plano, TX  75074
(800) 695-1258
supp...@tditechnologies.com<mailto:supp...@tditechnologies.com>

From: Sean Owen 
Sent: Wednesday, January 12, 2022 10:23 AM
To: Crowe, John 
Cc: user@spark.apache.org
Subject: Re: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?

Again: the CVE has no known effect on released Spark versions. Spark 3.3 will 
have log4j 2.x anyway.

On Wed, Jan 12, 2022 at 10:21 AM Crowe, John 
mailto:john.cr...@tditechnologies.com>> wrote:
I too would like to know when you anticipate Spark 3.3.0 to be released due to 
the Log4j CVE’s.
Our customers are all quite concerned.


Regards;
John Crowe
TDi Technologies, Inc.
1600 10th Street Suite B
Plano, TX  75074
(800) 695-1258
supp...@tditechnologies.com<mailto:supp...@tditechnologies.com>

From: Juan Liu mailto:liuj...@cn.ibm.com>>
Sent: Wednesday, January 12, 2022 8:50 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Cc: Theodore J Griesenbrock mailto:t...@ibm.com>>
Subject: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?

Dear Spark support,

Due to the known log4j security issue, we are required to upgrade the log4j 
version to 2.17.1. Currently, we use Spark 3.1.2 with the default log4j 1.2.17. We 
also found the logging configuration document here:
https://spark.apache.org/docs/3.2.0/configuration.html#configuring-logging

Our questions:

  *   Does Spark 3.1.2 support log4j v2.17.1? How do we upgrade log4j from 1.x to 
2.17.1 in Spark? Could you please provide guidance?
  *   If Spark 3.1.2 doesn't support log4j v2.17.1, then what about Spark 3.2? 
Please advise for that case as well, thanks!
  *   We found that Spark 3.3 will migrate log4j from 1 to 2 in this ticket: 
https://issues.apache.org/jira/browse/SPARK-37814, and I noticed all sub-tasks 
are done except one. That's awesome! Could you please advise your target 
release date? If it is in the very near future, like January, maybe we can wait for 3.3.

BTW, since the log4j issue is a very high-profile security issue, it would be better 
if the Spark team could post the solution directly on the security page 
(https://spark.apache.org/security.html) to benefit end users.

Anyway, thank you so much for providing such a powerful tool, and thanks 
for your patience in reading and replying to this mail. Have a good day!
Juan Liu (刘娟) PMP®
Release Management, Watson Health, China Development Lab
Email: liuj...@cn.ibm.com<mailto:liuj...@cn.ibm.com>
Phone: 86-10-82452506



RE: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target release day for Spark3.3?

2022-01-12 Thread Crowe, John
I too would like to know when you anticipate Spark 3.3.0 to be released due to 
the Log4j CVE’s.
Our customers are all quite concerned.


Regards;
John Crowe
TDi Technologies, Inc.
1600 10th Street Suite B
Plano, TX  75074
(800) 695-1258
supp...@tditechnologies.com<mailto:supp...@tditechnologies.com>

From: Juan Liu 
Sent: Wednesday, January 12, 2022 8:50 AM
To: user@spark.apache.org
Cc: Theodore J Griesenbrock 
Subject: Does Spark 3.1.2/3.2 support log4j 2.17.1+, and how? your target 
release day for Spark3.3?

Dear Spark support,

Due to the known log4j security issue, we are required to upgrade the log4j 
version to 2.17.1. Currently, we use Spark 3.1.2 with the default log4j 1.2.17. We 
also found the logging configuration document here:
https://spark.apache.org/docs/3.2.0/configuration.html#configuring-logging

Our questions:

  *   Does Spark 3.1.2 support log4j v2.17.1? How do we upgrade log4j from 1.x to 
2.17.1 in Spark? Could you please provide guidance?
  *   If Spark 3.1.2 doesn't support log4j v2.17.1, then what about Spark 3.2? 
Please advise for that case as well, thanks!
  *   We found that Spark 3.3 will migrate log4j from 1 to 2 in this ticket: 
https://issues.apache.org/jira/browse/SPARK-37814, and I noticed all sub-tasks 
are done except one. That's awesome! Could you please advise your target 
release date? If it is in the very near future, like January, maybe we can wait for 3.3.

BTW, since the log4j issue is a very high-profile security issue, it would be better 
if the Spark team could post the solution directly on the security page 
(https://spark.apache.org/security.html) to benefit end users.

Anyway, thank you so much for providing such a powerful tool, and thanks 
for your patience in reading and replying to this mail. Have a good day!
Juan Liu (刘娟) PMP®
Release Management, Watson Health, China Development Lab
Email: liuj...@cn.ibm.com<mailto:liuj...@cn.ibm.com>
Phone: 86-10-82452506




Re: Spark on Kubernetes scheduler variety

2021-06-24 Thread John Zhuge
tch Scheduling.
>>>> <https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/volcano-integration.md>
>>>>
>>>>
>>>>
>>>> What is not very clear is the degree of progress of these projects. You
>>>> may be kind enough to elaborate on KPI for each of these projects and where
>>>> you think your contributions is going to be.
>>>>
>>>>
>>>> HTH,
>>>>
>>>>
>>>> Mich
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 18 Jun 2021 at 00:44, Holden Karau 
>>>> wrote:
>>>>
>>>>> Hi Folks,
>>>>>
>>>>> I'm continuing my adventures to make Spark on containers party and I
>>>>> was wondering if folks have experience with the different batch
>>>>> scheduler options that they prefer? I was thinking so that we can
>>>>> better support dynamic allocation it might make sense for us to
>>>>> support using different schedulers and I wanted to see if there are
>>>>> any that the community is more interested in?
>>>>>
>>>>> I know that one of the Spark on Kube operators supports
>>>>> volcano/kube-batch so I was thinking that might be a place I start
>>>>> exploring but also want to be open to other schedulers that folks
>>>>> might be interested in.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Holden :)
>>>>>
>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>> https://amzn.to/2MaRAG9
>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>
>>>>> -
>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>
>>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>> Books (Learning Spark, High Performance Spark, etc.):
>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>
>> --
John Zhuge


Column-level encryption in Spark SQL

2020-12-18 Thread john washington
Dear Spark team members,

Can you please advise if Column-level encryption is available in Spark SQL?
I am aware that HIVE supports column level encryption.

Appreciate your response.

Thanks,
John


Re: Timestamp Difference/operations

2018-10-12 Thread John Zhuge
Yeah, operator "-" does not seem to be supported, however, you can use
"datediff" function:

In [9]: select datediff(CAST('2000-02-01 12:34:34' AS TIMESTAMP), CAST('2000-01-01 00:00:00' AS TIMESTAMP))
Out[9]:
+-----------------------------------------------------------------------------------------------------------------------+
| datediff(CAST(CAST(2000-02-01 12:34:34 AS TIMESTAMP) AS DATE), CAST(CAST(2000-01-01 00:00:00 AS TIMESTAMP) AS DATE))   |
+-----------------------------------------------------------------------------------------------------------------------+
| 31                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------+

In [10]: select datediff('2000-02-01 12:34:34', '2000-01-01 00:00:00')
Out[10]:
+---------------------------------------------------------------------------------+
| datediff(CAST(2000-02-01 12:34:34 AS DATE), CAST(2000-01-01 00:00:00 AS DATE))   |
+---------------------------------------------------------------------------------+
| 31                                                                                |
+---------------------------------------------------------------------------------+

In [11]: select datediff(timestamp '2000-02-01 12:34:34', timestamp '2000-01-01 00:00:00')
Out[11]:
+---------------------------------------------------------------------------------------------------------------+
| datediff(CAST(TIMESTAMP('2000-02-01 12:34:34.0') AS DATE), CAST(TIMESTAMP('2000-01-01 00:00:00.0') AS DATE))   |
+---------------------------------------------------------------------------------------------------------------+
| 31                                                                                                              |
+---------------------------------------------------------------------------------------------------------------+

On Fri, Oct 12, 2018 at 7:01 AM Paras Agarwal 
wrote:

> Hello Spark Community,
>
> Currently in hive we can do operations on Timestamp Like :
> CAST('2000-01-01 12:34:34' AS TIMESTAMP) - CAST('2000-01-01 00:00:00' AS
> TIMESTAMP)
>
> Seems its not supporting in spark.
> Is there any way available.
>
> Kindly provide some insight on this.
>
>
> Paras
> 9130006036
>


-- 
John


Re: Handle BlockMissingException in pyspark

2018-08-06 Thread John Zhuge
BlockMissingException typically indicates the HDFS file is corrupted. Might
be an HDFS issue, Hadoop mailing list is a better bet:
u...@hadoop.apache.org.

Capture the full stack trace from the executor log.
If the file still exists, run `hdfs fsck -blockId blk_1233169822_159765693`
to determine whether it is corrupted.
If not corrupted, could there be excessive (thousands of) concurrent reads on the
block?
Which Hadoop version? Which Spark version?
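
If the block turns out not to be corrupted and the failures look transient, one
option on the PySpark side is to catch the error and retry the action. A rough
sketch only -- the RDD name and retry policy are illustrative, not from the
original post:

from py4j.protocol import Py4JJavaError

def run_with_retry(action, attempts=3):
    # Retry a Spark action if it fails with a BlockMissingException;
    # any other error (or the final failure) is re-raised.
    for i in range(attempts):
        try:
            return action()
        except Py4JJavaError as e:
            if "BlockMissingException" in str(e) and i < attempts - 1:
                continue
            raise

# 'rdd' is assumed to be an RDD already defined in the application.
count = run_with_retry(lambda: rdd.count())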



On Mon, Aug 6, 2018 at 2:21 AM Divay Jindal 
wrote:

> Hi ,
>
> I am running pyspark in dockerized jupyter environment , I am constantly
> getting this error :
>
> ```
>
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 
> in stage 25.0 failed 1 times, most recent failure: Lost task 33.0 in stage 
> 25.0 (TID 35067, localhost, executor driver)
> : org.apache.hadoop.hdfs.BlockMissingException
> : Could not obtain block: 
> BP-1742911633-10.225.201.50-1479296658503:blk_1233169822_159765693
>
> ```
>
> Please can anyone help me with how to handle such exception in pyspark.
>
> --
> Best Regards
> *Divay Jindal*
>
>
>

-- 
John


Where can I read the Kafka offsets in SparkSQL application

2018-07-24 Thread John, Vishal (Agoda)

Hello all,


I have to read data from Kafka topic at regular intervals. I create the 
dataframe as shown below.  I don’t want to start reading from the beginning on 
each run. At the same time, I don’t want to miss the messages between run 
intervals.

val queryDf = sqlContext
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", hosts)
  .option("enable.auto.commit", true)
  .option("subscribe", topicName)
  .option("auto.commit.interval.ms", 1000)
  .option("startingOffsets", " latest")  //??  earliest OR latest
  .load()
  .selectExpr("CAST(value AS STRING) as message")

I would like to understand where the offsets will be stored, so that I can 
supply them each time the application starts. Or is there a way to supply a 
custom location in which to store the offsets?
This is not a Streaming application, so I am not sure whether a checkpoint directory 
is valid in this case.

Any pointers would be highly helpful.
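
One common pattern for this batch (non-streaming) case is to manage the offsets
yourself: the batch Kafka source does not track or commit offsets (options such as
enable.auto.commit are not used), so the application records the last offsets it
read and passes them back via startingOffsets / endingOffsets on the next run. A
rough PySpark sketch, where the storage path, servers and topic are illustrative
(the offsets could equally be kept in HDFS, S3 or a database) and 'spark' is the
SparkSession:

import json

offsets_path = "/tmp/my_app_kafka_offsets.json"   # hypothetical location

try:
    with open(offsets_path) as f:
        starting = f.read()            # JSON like {"mytopic": {"0": 42, "1": 17}}
except FileNotFoundError:
    starting = "earliest"              # first run: read from the beginning

df = (spark.read.format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "mytopic")
      .option("startingOffsets", starting)
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("topic", "partition", "offset", "CAST(value AS STRING) AS message"))

# ... process df ...

# Record where to start next time: one past the highest offset read per partition.
next_offsets = {}
for row in df.groupBy("topic", "partition").agg({"offset": "max"}).collect():
    next_offsets.setdefault(row["topic"], {})[str(row["partition"])] = row["max(offset)"] + 1

with open(offsets_path, "w") as f:
    json.dump(next_offsets, f)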


thanks,
Vishal


This message is confidential and is for the sole use of the intended 
recipient(s). It may also be privileged or otherwise protected by copyright or 
other legal rules. If you have received it by mistake please let us know by 
reply email and delete it from your system. It is prohibited to copy this 
message or disclose its content to anyone. Any confidentiality or privilege is 
not waived or lost by any mistaken delivery or unauthorized disclosure of the 
message. All messages sent to and from Agoda may be monitored to ensure 
compliance with company policies, to protect the company's interests and to 
remove potential malware. Electronic messages may be intercepted, amended, lost 
or deleted, or contain viruses.


Re: Is spark-env.sh sourced by Application Master and Executor for Spark on YARN?

2018-01-03 Thread John Zhuge
Sounds good.

Should we add another paragraph after this paragraph in configuration.md to
explain executor env as well? I will be happy to upload a simple patch.

Note: When running Spark on YARN in cluster mode, environment variables
> need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName]
>  property in your conf/spark-defaults.conf file. Environment variables
> that are set in spark-env.sh will not be reflected in the YARN
> Application Master process in clustermode. See the YARN-related Spark
> Properties
> <https://github.com/apache/spark/blob/master/docs/running-on-yarn.html#spark-properties>
>  for
> more information.


Something like:

Note: When running Spark on YARN, environment variables for the executors
need to be set using the spark.yarn.executorEnv.[EnvironmentVariableName]
property in your conf/spark-defaults.conf file or on the command line.
Environment variables that are set in spark-env.sh will not be reflected in
the executor process.
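
For illustration, a small PySpark sketch that sets both properties programmatically
(the variable name and value are made up; the same keys can equally go in
conf/spark-defaults.conf or be passed with --conf on spark-submit):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        # Environment for the YARN Application Master (the driver in cluster mode).
        .set("spark.yarn.appMasterEnv.MY_ENV_VAR", "some-value")
        # Environment for every executor container.
        .set("spark.yarn.executorEnv.MY_ENV_VAR", "some-value"))

spark = SparkSession.builder.config(conf=conf).getOrCreate()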



On Wed, Jan 3, 2018 at 7:53 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> Because spark-env.sh is something that makes sense only on the gateway
> machine (where the app is being submitted from).
>
> On Wed, Jan 3, 2018 at 6:46 PM, John Zhuge <john.zh...@gmail.com> wrote:
> > Thanks Jacek and Marcelo!
> >
> > Any reason it is not sourced? Any security consideration?
> >
> >
> > On Wed, Jan 3, 2018 at 9:59 AM, Marcelo Vanzin <van...@cloudera.com>
> wrote:
> >>
> >> On Tue, Jan 2, 2018 at 10:57 PM, John Zhuge <jzh...@apache.org> wrote:
> >> > I am running Spark 2.0.0 and 2.1.1 on YARN in a Hadoop 2.7.3 cluster.
> Is
> >> > spark-env.sh sourced when starting the Spark AM container or the
> >> > executor
> >> > container?
> >>
> >> No, it's not.
> >>
> >> --
> >> Marcelo
> >
> >
> >
> >
> > --
> > John
>
>
>
> --
> Marcelo
>



-- 
John


Re: Is spark-env.sh sourced by Application Master and Executor for Spark on YARN?

2018-01-03 Thread John Zhuge
Thanks Jacek and Marcelo!

Any reason it is not sourced? Any security consideration?


On Wed, Jan 3, 2018 at 9:59 AM, Marcelo Vanzin <van...@cloudera.com> wrote:

> On Tue, Jan 2, 2018 at 10:57 PM, John Zhuge <jzh...@apache.org> wrote:
> > I am running Spark 2.0.0 and 2.1.1 on YARN in a Hadoop 2.7.3 cluster. Is
> > spark-env.sh sourced when starting the Spark AM container or the executor
> > container?
>
> No, it's not.
>
> --
> Marcelo
>



-- 
John


Is spark-env.sh sourced by Application Master and Executor for Spark on YARN?

2018-01-02 Thread John Zhuge
Hi,

I am running Spark 2.0.0 and 2.1.1 on YARN in a Hadoop 2.7.3 cluster. Is
spark-env.sh sourced when starting the Spark AM container or the executor
container?

Saw this paragraph on
https://github.com/apache/spark/blob/master/docs/configuration.md:

Note: When running Spark on YARN in cluster mode, environment variables
> need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName] 
> property
> in your conf/spark-defaults.conf file. Environment variables that are set
> in spark-env.sh will not be reflected in the YARN Application Master
> process in clustermode. See the YARN-related Spark Properties
> <https://github.com/apache/spark/blob/master/docs/running-on-yarn.html#spark-properties>
>  for
> more information.


Does it mean spark-env.sh will not be sourced when starting AM in cluster
mode?
Does this paragraph appy to executor as well?

Thanks,
-- 
John Zhuge


Cases when to clear the checkpoint directories.

2017-10-07 Thread John, Vishal (Agoda)


Hello TD,

You had replied to one of the questions about checkpointing –

This is an unfortunate design on my part when I was building DStreams :)

Fortunately, we learnt from our mistakes and built Structured Streaming the 
correct way. Checkpointing in Structured Streaming stores only the progress 
information (offsets, etc.), and the user can change their application code 
(within certain constraints, of course) and still restart from checkpoints 
(unlike DStreams). If you are just building out your streaming applications, 
then I highly recommend you to try out Structured Streaming instead of DStreams 
(which is effectively in maintenance mode).

Can you please elaborate on what you mean by application code change in DStream 
applications?

If I add a couple of println statements in my application code, will that count as 
an application code change? Or do you mean changing method signatures, adding new 
methods, etc.?
Could you please point to the relevant source code in Spark which does this type 
of code validation/de-serialisation in the case of DStreams?

We are using mapWithState in our application and it builds its state from 
checkpointed RDDs.  I would like to understand the cases where we can avoid 
clearing the checkpoint directories.


thanks in advance,
Vishal



This message is confidential and is for the sole use of the intended 
recipient(s). It may also be privileged or otherwise protected by copyright or 
other legal rules. If you have received it by mistake please let us know by 
reply email and delete it from your system. It is prohibited to copy this 
message or disclose its content to anyone. Any confidentiality or privilege is 
not waived or lost by any mistaken delivery or unauthorized disclosure of the 
message. All messages sent to and from Agoda may be monitored to ensure 
compliance with company policies, to protect the company's interests and to 
remove potential malware. Electronic messages may be intercepted, amended, lost 
or deleted, or contain viruses.


Re: Logging in RDD mapToPair of Java Spark application

2017-07-30 Thread John Zeng
Hi, Ayan,


Thanks for the suggestion.  I did that and got the following weird message even 
though I enabled log aggregation:


[root@john1 conf]# yarn logs -applicationId application_1501197841826_0013
17/07/30 16:45:06 INFO client.RMProxy: Connecting to ResourceManager at 
john1.dg/192.168.6.90:8032
/tmp/logs/root/logs/application_1501197841826_0013 does not exist.
Log aggregation has not completed or is not enabled.

Any other way to see my logs?

Thanks

John





From: ayan guha <guha.a...@gmail.com>
Sent: Sunday, July 30, 2017 10:34 PM
To: John Zeng; Riccardo Ferrari
Cc: User
Subject: Re: Logging in RDD mapToPair of Java Spark application

Hi

As you are using yarn log aggregation, yarn moves all the logs to hdfs after 
the application completes.

You can use following command to get the logs:
yarn logs -applicationId 



On Mon, 31 Jul 2017 at 3:17 am, John Zeng 
<johnz...@hotmail.com<mailto:johnz...@hotmail.com>> wrote:

Thanks Riccardo for the valuable info.


Following your guidance, I looked at the Spark UI and figured out the default 
logs location for executors is 'yarn/container-logs'.  I ran my Spark app again 
and I can see a new folder was created for it:


[root@john2 application_1501197841826_0013]# ls -l
total 24
drwx--x--- 2 yarn yarn 4096 Jul 30 10:07 container_1501197841826_0013_01_01
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_01_02
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_01_03
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_02_01
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_02_02
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_02_03

But when I tried to look into each one's content, it was gone and there were no 
files at all in the same place:

[root@john2 application_1501197841826_0013]# vi container_1501197841826_0013_*
[root@john2 application_1501197841826_0013]# ls -l
total 0
[root@john2 application_1501197841826_0013]# pwd
/yarn/container-logs/application_1501197841826_0013

I believe Spark moves these logs to a different place.  But where are they?

Thanks

John





From: Riccardo Ferrari <ferra...@gmail.com<mailto:ferra...@gmail.com>>
Sent: Saturday, July 29, 2017 8:18 PM
To: johnzengspark
Cc: User
Subject: Re: Logging in RDD mapToPair of Java Spark application

Hi John,

The reason you don't see the second sysout line is because it is executed on a 
different JVM (i.e. Driver vs Executor). The second sysout line should be 
available through the executor logs. Check the Executors tab.

There are alternative approaches to managing log centralization, however it really 
depends on what your requirements are.

Hope it helps,

On Sat, Jul 29, 2017 at 8:09 PM, johnzengspark 
<johnz...@hotmail.com<mailto:johnz...@hotmail.com>> wrote:
Hi, All,

Although there are lots of discussions related to logging in this news
group, I did not find an answer to my specific question so I am posting mine
with the hope that this will not cause a duplicated question.

Here is my simplified Java testing Spark app:

public class SparkJobEntry {
public static void main(String[] args) {
// Following line is in stdout from JobTracker UI
System.out.println("argc=" + args.length);

SparkConf conf = new SparkConf().setAppName("TestSparkApp");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD fileRDD = sc.textFile(args[0]);

fileRDD.mapToPair(new PairFunction<String, String, String>() {

private static final long serialVersionUID = 1L;

@Override
public Tuple2<String, String> call(String input) throws 
Exception {
// Following line is not in stdout from 
JobTracker UI
System.out.println("This line should be printed 
in stdout");
// Other code removed from here to make things 
simple
return new Tuple2<String, String>("1", "Testing 
data");
}}).saveAsTextFile(args[0] + ".results");
}
}

What I expected from JobTracker UI is to see both stdout lines: first line
is "argc=2" and second line is "This line should be printed in stdout".  But
I only see the first line which is outside of the 'mapToPair'.  I actually
have verified my 'mapToPair' is called and the statements after the second
logging line were executed.  The only issue for me is why the second logging
is not in JobTracker UI.

Appreciate your help.

Thanks

John



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble

Re: Logging in RDD mapToPair of Java Spark application

2017-07-30 Thread John Zeng
Thanks Riccardo for the valuable info.


Following your guidance, I looked at the Spark UI and figured out the default 
logs location for executors is 'yarn/container-logs'.  I ran my Spark app again 
and I can see a new folder was created for it:


[root@john2 application_1501197841826_0013]# ls -l
total 24
drwx--x--- 2 yarn yarn 4096 Jul 30 10:07 container_1501197841826_0013_01_01
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_01_02
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_01_03
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_02_01
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_02_02
drwx--x--- 2 yarn yarn 4096 Jul 30 10:08 container_1501197841826_0013_02_03

But when I tried to look into each one's content, it was gone and there were no 
files at all in the same place:

[root@john2 application_1501197841826_0013]# vi container_1501197841826_0013_*
[root@john2 application_1501197841826_0013]# ls -l
total 0
[root@john2 application_1501197841826_0013]# pwd
/yarn/container-logs/application_1501197841826_0013

I believe Spark moves these logs to a different place.  But where are they?

Thanks

John





From: Riccardo Ferrari <ferra...@gmail.com>
Sent: Saturday, July 29, 2017 8:18 PM
To: johnzengspark
Cc: User
Subject: Re: Logging in RDD mapToPair of Java Spark application

Hi John,

The reason you don't see the second sysout line is because it is executed on a 
different JVM (i.e. Driver vs Executor). The second sysout line should be 
available through the executor logs. Check the Executors tab.

There are alternative approaches to managing log centralization, however it really 
depends on what your requirements are.

Hope it helps,

On Sat, Jul 29, 2017 at 8:09 PM, johnzengspark 
<johnz...@hotmail.com<mailto:johnz...@hotmail.com>> wrote:
Hi, All,

Although there are lots of discussions related to logging in this news
group, I did not find an answer to my specific question so I am posting mine
with the hope that this will not cause a duplicated question.

Here is my simplified Java testing Spark app:

public class SparkJobEntry {
public static void main(String[] args) {
// Following line is in stdout from JobTracker UI
System.out.println("argc=" + args.length);

SparkConf conf = new SparkConf().setAppName("TestSparkApp");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD fileRDD = sc.textFile(args[0]);

fileRDD.mapToPair(new PairFunction<String, String, String>() {

private static final long serialVersionUID = 1L;

@Override
public Tuple2<String, String> call(String input) throws 
Exception {
// Following line is not in stdout from 
JobTracker UI
System.out.println("This line should be printed 
in stdout");
// Other code removed from here to make things 
simple
return new Tuple2<String, String>("1", "Testing 
data");
}}).saveAsTextFile(args[0] + ".results");
}
}

What I expected from JobTracker UI is to see both stdout lines: first line
is "argc=2" and second line is "This line should be printed in stdout".  But
I only see the first line which is outside of the 'mapToPair'.  I actually
have verified my 'mapToPair' is called and the statements after the second
logging line were executed.  The only issue for me is why the second logging
is not in JobTracker UI.

Appreciate your help.

Thanks

John



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Logging-in-RDD-mapToPair-of-Java-Spark-application-tp29007.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>




Spark on yarn logging

2017-06-29 Thread John Vines
I followed the instructions for configuring a custom logger per
https://spark.apache.org/docs/2.0.2/running-on-yarn.html (because we have
long running spark jobs, sometimes occasionally get stuck and without a
rolling file appender will fill up disk). This seems to work well for us,
but it breaks the web-ui because it only has links for stderr/stdout.

I can take that url and manually change it, but I'm wondering if there's a
way to configure the spark-ui to look for files of a specific format so
that way no manual url manipulation is necessary to view the logs.

Thanks


PySpark 2.1.1 Can't Save Model - Permission Denied

2017-06-27 Thread John Omernik
Hello all, I am running PySpark 2.1.1 as a user, jomernik. I am working
through some documentation here:

https://spark.apache.org/docs/latest/mllib-ensembles.html#random-forests

And was working on the Random Forest Classification, and found it to be
working!  That said, when I try to save the model to my hdfs (MaprFS in my
case)  I got a weird error:

I tried to save here:

model.save(sc,
"maprfs:///user/jomernik/tmp/myRandomForestClassificationModel")

/user/jomernik is my user directory and I have full access to the
directory.



All the directories down to

/user/jomernik/tmp/myRandomForestClassificationModel/metadata/_temporary/0
are owned by my with full permissions, but when I get to this directory,
here is the ls

$ ls -ls

total 1

1 drwxr-xr-x 2 root root 1 Jun 27 07:38 task_20170627123834_0019_m_00

0 drwxr-xr-x 2 root root 0 Jun 27 07:38 _temporary

Am I doing something wrong here? Why is the temp stuff owned by root? Is
there a bug in saving things due to this ownership?

John






Exception:
Py4JJavaError: An error occurred while calling o338.save.
: org.apache.hadoop.security.AccessControlException: User jomernik(user id
101) does has been denied access to rename
 
/user/jomernik/tmp/myRandomForestClassificationModel/metadata/_temporary/0/task_20170627123834_0019_m_00/part-0
to /user/jomernik/tmp/myRandomForestClassificationModel/metadata/part-0
at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:1112)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:461)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:475)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:392)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:364)
at
org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:111)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1227)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1168)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1071)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1037)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:963)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
at
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468)
at
org.apache.spark.mllib.tree.model.TreeEnsembleModel$SaveLoadV1_0$.save(treeEnsembleModels.scala:440)
at
org.apache.spark.mllib.tree.model.RandomForestModel.save(treeEnsembleModels.scala:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoke

Re: [MLLib]: Executor OutOfMemory in BlockMatrix Multiplication

2017-06-14 Thread John Compitello
No problem. It was a big headache for my team as well. One of us already 
reimplemented it from scratch, as seen in this pending PR for our project. 
https://github.com/hail-is/hail/pull/1895

Hopefully you find that useful. We'll hopefully try to PR that into Spark at 
some point. 
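
For anyone curious about the coordinate ((i, j), value) approach Anthony asks about
below, here is a bare-bones PySpark sketch of the join/reduce structure of such a
multiply -- purely illustrative (no blocking or optimisation, and not how the PR
above does it); 'sc' is the SparkContext:

def coordinate_multiply(A, B):
    # A and B are RDDs of ((i, j), value) entries; computes C = A * B.
    a_by_j = A.map(lambda e: (e[0][1], (e[0][0], e[1])))      # key by A's column j -> (i, a_ij)
    b_by_j = B.map(lambda e: (e[0][0], (e[0][1], e[1])))      # key by B's row j    -> (k, b_jk)
    products = (a_by_j.join(b_by_j)                           # all (i, k) pairs sharing a j
                .map(lambda kv: ((kv[1][0][0], kv[1][1][0]),  # ((i, k), a_ij * b_jk)
                                 kv[1][0][1] * kv[1][1][1])))
    return products.reduceByKey(lambda x, y: x + y)           # sum over j

# Tiny usage example: a 2x2 matrix multiplied by itself.
A = sc.parallelize([((0, 0), 1.0), ((0, 1), 2.0), ((1, 0), 3.0), ((1, 1), 4.0)])
print(sorted(coordinate_multiply(A, A).collect()))
# [((0, 0), 7.0), ((0, 1), 10.0), ((1, 0), 15.0), ((1, 1), 22.0)]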

Best,

John

Sent from my iPhone

> On Jun 14, 2017, at 8:28 PM, Anthony Thomas <ahtho...@eng.ucsd.edu> wrote:
> 
> Interesting, thanks! That probably also explains why there seems to be a ton 
> of shuffle for this operation. So what's the best option for truly scalable 
> matrix multiplication on Spark then - implementing from scratch using the 
> coordinate matrix ((i,j), k) format?
> 
>> On Wed, Jun 14, 2017 at 4:29 PM, John Compitello <jo...@broadinstitute.org> 
>> wrote:
>> Hey Anthony,
>> 
>> You're the first person besides myself I've seen mention this. BlockMatrix 
>> multiply is not the best method. As far as me and my team can tell, the 
>> memory problem stems from the fact that when Spark tries to compute block 
>> (i, j) of the matrix, it tries to manifest all of row i from matrix 1 and 
>> all of column j from matrix 2 in memory at once on one executor. Then after 
>> doing that, it proceeds to combine them with a functional reduce, creating 
>> one additional block for each pair. So you end up manifesting 3n + logn 
>> matrix blocks in memory at once, which is why it sucks so much. 
>> 
>> Sent from my iPhone
>> 
>>> On Jun 14, 2017, at 7:07 PM, Anthony Thomas <ahtho...@eng.ucsd.edu> wrote:
>>> 
>>> I've been experimenting with MlLib's BlockMatrix for distributed matrix 
>>> multiplication but consistently run into problems with executors being 
>>> killed due to memory constrains. The linked gist (here) has a short example 
>>> of multiplying a 25,000 x 25,000 square matrix taking approximately 5G of 
>>> disk with a vector (also stored as a BlockMatrix). I am running this on a 3 
>>> node (1 master + 2 workers) cluster on Amazon EMR using the m4.xlarge 
>>> instance type. Each instance has 16GB of RAM and 4 CPU. The gist has 
>>> detailed information about the Spark environment.
>>> 
>>> I have tried reducing the block size of the matrix, increasing the number 
>>> of partitions in the underlying RDD, increasing defaultParallelism and 
>>> increasing spark.yarn.executor.memoryOverhead (up to 3GB) - all without 
>>> success. The input matrix should fit comfortably in distributed memory and 
>>> the resulting matrix should be quite small (25,000 x 1) so I'm confused as 
>>> to why Spark seems to want so much memory for this operation, and why Spark 
>>> isn't spilling to disk here if it wants more memory. The job does 
>>> eventually complete successfully, but for larger matrices stages have to be 
>>> repeated several times which leads to long run times. I don't encounter any 
>>> issues if I reduce the matrix size down to about 3GB. Can anyone with 
>>> experience using MLLib's matrix operators provide any suggestions about 
>>> what settings to look at, or what the hard constraints on memory for 
>>> BlockMatrix multiplication are?
>>> 
>>> Thanks,
>>> 
>>> Anthony
> 


Re: [MLLib]: Executor OutOfMemory in BlockMatrix Multiplication

2017-06-14 Thread John Compitello
Hey Anthony,

You're the first person besides myself I've seen mention this. BlockMatrix 
multiply is not the best method. As far as me and my team can tell, the memory 
problem stems from the fact that when Spark tries to compute block (i, j) of 
the matrix, it tries to manifest all of row i from matrix 1 and all of column j 
from matrix 2 in memory at once on one executor. Then after doing that, it 
proceeds to combine them with a functional reduce, creating one additional 
block for each pair. So you end up manifesting 3n + logn matrix blocks in 
memory at once, which is why it sucks so much. 

Sent from my iPhone

> On Jun 14, 2017, at 7:07 PM, Anthony Thomas  wrote:
> 
> I've been experimenting with MlLib's BlockMatrix for distributed matrix 
> multiplication but consistently run into problems with executors being killed 
> due to memory constrains. The linked gist (here) has a short example of 
> multiplying a 25,000 x 25,000 square matrix taking approximately 5G of disk 
> with a vector (also stored as a BlockMatrix). I am running this on a 3 node 
> (1 master + 2 workers) cluster on Amazon EMR using the m4.xlarge instance 
> type. Each instance has 16GB of RAM and 4 CPU. The gist has detailed 
> information about the Spark environment.
> 
> I have tried reducing the block size of the matrix, increasing the number of 
> partitions in the underlying RDD, increasing defaultParallelism and 
> increasing spark.yarn.executor.memoryOverhead (up to 3GB) - all without 
> success. The input matrix should fit comfortably in distributed memory and 
> the resulting matrix should be quite small (25,000 x 1) so I'm confused as to 
> why Spark seems to want so much memory for this operation, and why Spark 
> isn't spilling to disk here if it wants more memory. The job does eventually 
> complete successfully, but for larger matrices stages have to be repeated 
> several times which leads to long run times. I don't encounter any issues if 
> I reduce the matrix size down to about 3GB. Can anyone with experience using 
> MLLib's matrix operators provide any suggestions about what settings to look 
> at, or what the hard constraints on memory for BlockMatrix multiplication are?
> 
> Thanks,
> 
> Anthony


Re: Performance issue when running Spark-1.6.1 in yarn-client mode with Hadoop 2.6.0

2017-06-08 Thread Satish John Bosco
I have tried the configuration calculator sheet provided by Cloudera as
well, but with no improvement. However, let's ignore the 17 million record
operation for now and consider a simple sort on YARN vs. Spark standalone,
which shows a tremendous difference.

The operation is simple: a selected numeric column sorted in ascending order,
with the results below.

> 136 seconds - Yarn-client mode
> 40 seconds  - Spark Standalone mode

Can you guide me toward a simple yarn-site.xml configuration that would be
the bare minimum for the hardware below, so that I can see whether I have
missed or overlooked any key configurations? Also, for Spark standalone mode,
how should spark-env.sh and spark-defaults be configured in terms of the
number of instances, memory and cores?

32GB RAM, 8 Cores (16), and 1 TB HDD x 3 (1 Master and 2 Workers)

Finally, it is still not understood why this key makes such a performance
difference in Spark 1.6.1: spark.sql.autoBroadcastJoinThreshold::-1.





On Wed, Jun 7, 2017 at 11:16 AM, Jörn Franke  wrote:

> What does your Spark job do? Have you tried standard configurations and
> changing them gradually?
>
> Have you checked the logfiles/ui which tasks  take long?
>
> 17 Mio records does not sound much, but it depends what you do with it.
>
> I do not think that for such a small "cluster" it makes sense to have a
> special scheduling configuration.
>
> > On 6. Jun 2017, at 18:02, satishjohn  wrote:
> >
> > Performance issue / time taken to complete spark job in yarn is 4 x
> slower,
> > when considered spark standalone mode. However, in spark standalone mode
> > jobs often fails with executor lost issue.
> >
> > Hardware configuration
> >
> >
> > 32GB RAM 8 Cores (16) and 1 TB HDD  3 (1 Master and 2 Workers)
> >
> > Spark configuration:
> >
> >
> > spark.executor.memory 7g
> > Spark cores Max 96
> > Spark driver 5GB
> > spark.sql.autoBroadcastJoinThreshold::-1 (Without this key the job
> fails or
> > job takes 50x times more time)
> > spark.driver.maxResultSize::2g
> > spark.driver.memory::5g
> > No of Instances 4 per machine.
> >
> > With the above spark configuration the spark job for the business flow
> of 17
> > million records completes in 8 Minutes.
> >
> > Problem Area:
> >
> >
> > When run in yarn client mode with the below configuration which takes 33
> to
> > 42 minutes to run the same flow. Below is the yarn-site.xml configuration
> > data.
> >
> > <configuration>
> >   <property><name>yarn.label.enabled</name><value>true</value></property>
> >   <property><name>yarn.log-aggregation.enable-local-cleanup</name><value>false</value></property>
> >   <property><name>yarn.resourcemanager.scheduler.client.thread-count</name><value>64</value></property>
> >   <property><name>yarn.resourcemanager.resource-tracker.address</name><value>satish-NS1:8031</value></property>
> >   <property><name>yarn.resourcemanager.scheduler.address</name><value>satish-NS1:8030</value></property>
> >   <property><name>yarn.dispatcher.exit-on-error</name><value>true</value></property>
> >   <property><name>yarn.nodemanager.container-manager.thread-count</name><value>64</value></property>
> >   <property><name>yarn.nodemanager.local-dirs</name><value>/home/satish/yarn</value></property>
> >   <property><name>yarn.nodemanager.localizer.fetch.thread-count</name><value>20</value></property>
> >   <property><name>yarn.resourcemanager.address</name><value>satish-NS1:8032</value></property>
> >   <property><name>yarn.scheduler.increment-allocation-mb</name><value>512</value></property>
> >   <property><name>yarn.log.server.url</name><value>http://satish-NS1:19888/jobhistory/logs</value></property>
> >   <property><name>yarn.nodemanager.resource.memory-mb</name><value>28000</value></property>
> >   <property><name>yarn.nodemanager.labels</name><value>MASTER</value></property>
> >   <property><name>yarn.nodemanager.resource.cpu-vcores</name><value>48</value></property>
> >   <property><name>yarn.scheduler.minimum-allocation-mb</name><value>1024</value></property>
> >   <property><name>yarn.log-aggregation-enable</name><value>true</value></property>
> >   <property><name>yarn.nodemanager.localizer.client.thread-count</name><value>20</value></property>
> >   <property><name>yarn.app.mapreduce.am.labels</name><value>CORE</value></property>
> >   <property><name>yarn.log-aggregation.retain-seconds</name><value>172800</value></property>
> >   <property><name>yarn.nodemanager.address</name><value>${yarn.nodemanager.hostname}:8041</value></property>
> >   <property><name>yarn.resourcemanager.hostname</name><value>satish-NS1</value></property>
> >   <property><name>yarn.scheduler.maximum-allocation-mb</name><value>8192</value></property>
> >   <property><name>yarn.nodemanager.remote-app-log-dir</name><value>/home/satish/satish/hadoop-yarn/apps</value></property>
> >   <property><name>yarn.resourcemanager.resource-tracker.client.thread-count</name><value>64</value></property>
> >   <property><name>yarn.scheduler.maximum-allocation-vcores</name><value>1</value></property>
> >   <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
> >   <property><name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property>
> >   <property><name>yarn.resourcemanager.client.thread-count</name><value>64</value></property>
> >   <property><name>yarn.nodemanager.container-metrics.enable</name><value>true</value></property>
> >   <property><name>yarn.nodemanager.log-dirs</name><value>/home/satish/hadoop-yarn/containers</value></property>
> >   <property><name>yarn.nodemanager.aux-services</name><value>spark_shuffle,mapreduce_shuffle</value></property>
> >   <property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property>
> >   <property><name>yarn.nodemanager.aux-services.spark_shuffle.class</name><value>org.apache.spark.network.yarn.YarnShuffleService</value></property>
> >   <property><name>yarn.scheduler.minimum-allocation-vcores</name><value>1</value></property>
> >   <property><name>yarn.scheduler.increment-allocation-vcores</name><value>1</value></property>
> >   <property><name>yarn.resourcemanager.scheduler.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value></property>
> >   <property><name>yarn.scheduler.fair.preemption</name><value>true</value></property>
> > </configuration>
> >
> > Also in capacity scheduler I am using Dominant resource calculator. 

Re: An Architecture question on the use of virtualised clusters

2017-06-05 Thread John Leach
Mich,

Yes, Isilon is in production...

Isilon is a serious product and has been around for quite a while.  For 
on-premise external storage, we see it quite a bit.  Separating the compute 
from the storage actually helps.  It is also a nice transition to the cloud 
providers.  

Have you looked at MapR?  Usually the system guys target snapshots, volumes, 
and posix compliance if they are bought into Isilon.  

Good luck Mich.

Regards,
John Leach




> On Jun 5, 2017, at 9:27 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> Hi John,
> 
> Thanks. Did you end up in production or in other words besides PoC did you 
> use it in anger?
> 
> The intention is to build Isilon on top of the whole HDFS cluster!. If we go 
> that way we also need to adopt it for DR as well.
> 
> Cheers
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 5 June 2017 at 15:19, John Leach <jle...@splicemachine.com 
> <mailto:jle...@splicemachine.com>> wrote:
> Mich,
> 
> We used Isilon for a POC of Splice Machine (Spark for Analytics, HBase for 
> real-time).  We were concerned initially and the initial setup took a bit 
> longer than excepted, but it performed well on both low latency and high 
> throughput use cases at scale (our POC ~ 100 TB).  
> 
> Just a data point.
> 
> Regards,
> John Leach
> 
>> On Jun 5, 2017, at 9:11 AM, Mich Talebzadeh <mich.talebza...@gmail.com 
>> <mailto:mich.talebza...@gmail.com>> wrote:
>> 
>> I am concerned about the use case of tools like Isilon or Panasas to create 
>> a layer on top of HDFS, essentially a HCFS on top of HDFS with the usual 3x 
>> replication gone into the tool itself.
>> 
>> There is interest to push Isilon  as a the solution forward but my caution 
>> is about scalability and future proof of such tools. So I was wondering if 
>> anyone else has tried such solution.
>> 
>> Thanks
>>  
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>>  
>> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> On 2 June 2017 at 19:09, Gene Pang <gene.p...@gmail.com 
>> <mailto:gene.p...@gmail.com>> wrote:
>> As Vincent mentioned earlier, I think Alluxio can work for this. You can 
>> mount your (potentially remote) storage systems to Alluxio 
>> <http://www.alluxio.org/docs/master/en/Unified-and-Transparent-Namespace.html>,
>>  and deploy Alluxio co-located to the compute cluster. The computation 
>> framework will still achieve data locality since Alluxio workers are 
>> co-located, even though the existing storage systems may be remote. You can 
>> also use tiered storage 
>> <http://www.alluxio.org/docs/master/en/Tiered-Storage-on-Alluxio.html> to 
>> deploy using only memory, and/or other physical media.
>> 
>> Here are some blogs (Alluxio with Minio 
>> <https://www.alluxio.com/blog/scalable-genomics-data-processing-pipeline-with-alluxio-mesos-and-minio>,
>>  Alluxio with HDFS 
>> <https://www.alluxio.com/blog/qunar-performs-real-time-data-analytics-up-to-300x-faster-with-alluxio>,
>>  Alluxio with S3 
>> <https://www.alluxio.com/blog/accelerating-on-demand-data-analytics-with-alluxio>)
>>  which use similar architecture.
>> 
>> Hope that helps,
>> Gene
>> 
>> On Thu, Jun 1, 2017 at 1:45 AM, Mich Talebzadeh <mich.talebza...@gmail.com 
>> <mailto:mich.talebza...@gmail.com>>

Re: An Architecture question on the use of virtualised clusters

2017-06-05 Thread John Leach
Mich,

We used Isilon for a POC of Splice Machine (Spark for Analytics, HBase for 
real-time).  We were concerned initially and the initial setup took a bit 
longer than excepted, but it performed well on both low latency and high 
throughput use cases at scale (our POC ~ 100 TB).  

Just a data point.

Regards,
John Leach

> On Jun 5, 2017, at 9:11 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> I am concerned about the use case of tools like Isilon or Panasas to create a 
> layer on top of HDFS, essentially a HCFS on top of HDFS with the usual 3x 
> replication gone into the tool itself.
> 
> There is interest to push Isilon  as a the solution forward but my caution is 
> about scalability and future proof of such tools. So I was wondering if 
> anyone else has tried such solution.
> 
> Thanks
>  
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 2 June 2017 at 19:09, Gene Pang <gene.p...@gmail.com 
> <mailto:gene.p...@gmail.com>> wrote:
> As Vincent mentioned earlier, I think Alluxio can work for this. You can 
> mount your (potentially remote) storage systems to Alluxio 
> <http://www.alluxio.org/docs/master/en/Unified-and-Transparent-Namespace.html>,
>  and deploy Alluxio co-located to the compute cluster. The computation 
> framework will still achieve data locality since Alluxio workers are 
> co-located, even though the existing storage systems may be remote. You can 
> also use tiered storage 
> <http://www.alluxio.org/docs/master/en/Tiered-Storage-on-Alluxio.html> to 
> deploy using only memory, and/or other physical media.
> 
> Here are some blogs (Alluxio with Minio 
> <https://www.alluxio.com/blog/scalable-genomics-data-processing-pipeline-with-alluxio-mesos-and-minio>,
>  Alluxio with HDFS 
> <https://www.alluxio.com/blog/qunar-performs-real-time-data-analytics-up-to-300x-faster-with-alluxio>,
>  Alluxio with S3 
> <https://www.alluxio.com/blog/accelerating-on-demand-data-analytics-with-alluxio>)
>  which use similar architecture.
> 
> Hope that helps,
> Gene
> 
> On Thu, Jun 1, 2017 at 1:45 AM, Mich Talebzadeh <mich.talebza...@gmail.com 
> <mailto:mich.talebza...@gmail.com>> wrote:
> As a matter of interest what is the best way of creating virtualised clusters 
> all pointing to the same physical data?
> 
> thanks
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 1 June 2017 at 09:27, vincent gromakowski <vincent.gromakow...@gmail.com 
> <mailto:vincent.gromakow...@gmail.com>> wrote:
> If mandatory, you can use a local cache like alluxio
> 
> Le 1 juin 2017 10:23 AM, "Mich Talebzadeh" <mich.talebza...@gmail.com 
> <mailto:mich.talebza...@gmail.com>> a écrit :
> Thanks Vincent. I assume by physical data locality you mean you are going 
> through Isilon and HCFS and not through direct HDFS.
> 
> Also I agree with you that shared network could be an issue as well. However, 
> it allows you to reduce data redundancy (you do not need R3 in HDFS anymore) 
> and also you can build virtual clusters on the same data. One cluster for 
> read/writes and another for Reads? That is what has been suggestes!.
> 
> regards
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://ta

Re: Impact of coalesce operation before writing dataframe

2017-05-23 Thread John Compitello
Spark is doing operations on each partition in parallel. If you decrease the number 
of partitions, you're potentially doing less work in parallel, depending on your 
cluster setup.
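
A small illustrative sketch of the difference being discussed (PySpark; 'result'
stands in for the joined DataFrame and the paths are hypothetical):

# coalesce(10): no shuffle, but the narrow dependency lets Spark run the preceding
# stage with only 10 tasks, so the expensive upstream work loses parallelism.
result.coalesce(10).write.orc("s3://bucket/output-coalesce")

# repartition(100): the upstream stage keeps its full parallelism (e.g. 200 tasks)
# and an extra shuffle then redistributes the data into 100 output files.
result.repartition(100).write.orc("s3://bucket/output-repartition")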

> On May 23, 2017, at 4:23 PM, Andrii Biletskyi 
>  wrote:
> 
>  
> No, I didn't try to use repartition, how exactly it impacts the parallelism?
> In my understanding coalesce simply "unions" multiple partitions located on 
> same executor "one on on top of the other", while repartition does hash-based 
> shuffle decreasing the number of output partitions. So how this exactly 
> affects the parallelism, which stage of the job?
> 
> Thanks,
> Andrii
> 
> 
> 
> On Tuesday, May 23, 2017 10:20 PM, Michael Armbrust  
> wrote:
> 
> 
> coalesce is nice because it does not shuffle, but the consequence of avoiding 
> a shuffle is it will also reduce parallelism of the preceding computation.  
> Have you tried using repartition instead?
> 
> On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi 
>  > wrote:
> Hi all,
> 
> I'm trying to understand the impact of coalesce operation on spark job 
> performance.
> 
> As a side note: were are using emrfs (i.e. aws s3) as source and a target for 
> the job.
> 
> Omitting unnecessary details job can be explained as: join 200M records 
> Dataframe stored in orc format on emrfs with another 200M records cached 
> Dataframe, the result of the join put back to emrfs. First DF is a set of 
> wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark 
> shows 20 GB).
> 
> I have enough resources in my cluster to perform the job but I don't like the 
> fact that the output datasource contains 200 part orc files (as 
> spark.sql.shuffle.partitions defaults to 200), so before saving orc to emrfs 
> I'm doing .coalesce(10). From the documentation coalesce looks like a quite 
> harmless operation: no repartitioning etc.
> 
> But with such setup my job fails to write dataset on the last stage. Right 
> now the error is OOM: GC overhead. When I change  .coalesce(10) to 
> .coalesce(100) the job runs much faster and finishes without errors.
> 
> So what's the impact of .coalesce in this case? And how can I do in-place 
> concatenation of files (not involving hive) to end up with a smaller number of 
> bigger files, given that with .coalesce(100) the job generates 100 orc snappy 
> encoded files of ~300MB each.
> 
> Thanks,
> Andrii
> 
> 
> 



Matrix multiplication and cluster / partition / blocks configuration

2017-05-11 Thread John Compitello
Hey all, 

I’ve found myself in a position where I need to do a relatively large matrix 
multiply (at least, compared to what I normally have to do). I’m looking to 
multiply a 100k by 500k dense matrix by its transpose to yield a 100k by 100k 
matrix. I’m trying to do this on Google Cloud, so I don’t have any real limits 
on cluster size or memory. However, I have no idea where to begin as far as 
number of cores / number of partitions / how big to make the block size for 
best performance. Is there anywhere where Spark users collect optimal 
configurations for methods relative to data input size? Does anyone have any 
suggestions? I’ve tried throwing 900 cores at a 100k by 100k matrix multiply 
with 1000 by 1000 sized blocks, and that seemed to hang forever and eventually 
fail. 

Thanks ,

John
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
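
A rough sketch (not from the original message) of the BlockMatrix route for
A * A-transpose, with made-up paths; the block size and partition count are
exactly the knobs being asked about, so treat the values here as starting
points only.

```
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Hypothetical input: one "i,j,value" entry per line of a text file.
val entries = sc.textFile("hdfs:///matrix/entries.txt").map { line =>
  val Array(i, j, v) = line.split(",")
  MatrixEntry(i.toLong, j.toLong, v.toDouble)
}

val a = new CoordinateMatrix(entries, 100000L, 500000L)
  .toBlockMatrix(1024, 1024)        // rowsPerBlock, colsPerBlock -- tune these
  .cache()

val gram = a.multiply(a.transpose)  // 100k x 100k result
println(gram.numRows() + " x " + gram.numCols())
```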



[ANNOUNCE] Apache Gora 0.7 Release

2017-03-23 Thread lewis john mcgibbney
Hi Folks,

The Apache Gora team are pleased to announce the immediate availability of
Apache Gora 0.7.
The Apache Gora open source framework provides an in-memory data model and
persistence for big data. Gora supports persisting to column stores, key
value stores, document stores and RDBMSs, and analyzing the data with
extensive Apache Hadoop™ MapReduce support.

The Gora DOAP can be found at http://gora.apache.org/current/doap_Gora.rdf

This release addresses 80 issues; for a breakdown please see the release
report. For information on any of the above, drop by our mailing lists and ask
questions.

Gora 0.7 provides support for the following projects

   - Apache Avro  1.8.1
   - Apache Hadoop  2.5.2
   - Apache HBase  1.2.3
   - Apache Cassandra  2.0.2
   - Apache Solr  5.5.1
   - MongoDB  (driver) 3.4.2
   - Apache Accumulo  1.7.1
   - Apache Spark  1.4.1
   - Apache CouchDB  1.4.2 (test containers
    1.1.0)
   - Amazon DynamoDB  (driver) 1.10.55
   - Infinispan  7.2.5.Final
   - JCache  1.0.0 with Hazelcast
    3.6.4 support.

Gora is released as both source code, downloads for which can be found at
our downloads page , as well as
Maven artifacts which can be found on Maven central
.
Thanks


-- 
http://home.apache.org/~lewismc/
@hectorMcSpector
http://www.linkedin.com/in/lmcgibbney


Re: DataFrame from in memory datasets in multiple JVMs

2017-02-28 Thread John Desuvio
Since the data is in multiple JVMs, only 1 of them can be the driver.   So
I can parallelize the data from 1 of the VMs but don't have a way to do the
same for the others.   Or am I missing something?

On Tue, Feb 28, 2017 at 3:53 PM, ayan guha <guha.a...@gmail.com> wrote:

> How about parallelize and then union all of them to one data frame?
>
> On Wed, 1 Mar 2017 at 3:07 am, Sean Owen <so...@cloudera.com> wrote:
>
>> Broadcasts let you send one copy of read only data to each executor.
>> That's not the same as a DataFrame and itseems nature means it doesnt make
>> sense to think of them as not distributed. But consider things like
>> broadcast hash joins which may be what you are looking for if you really
>> mean to join on a small DF efficiently.
>>
>> On Tue, Feb 28, 2017, 16:03 johndesuv <desu...@gmail.com> wrote:
>>
>> Hi,
>>
>> I have an application that runs on a series of JVMs that each contain a
>> subset of a large dataset in memory.  I'd like to use this data in spark
>> and
>> am looking at ways to use this as a data source in spark without writing
>> the
>> data to disk as a handoff.
>>
>> Parallelize doesn't work for me since I need to use the data across all
>> the
>> JVMs as one DataFrame.
>>
>> The only option I've come up with so far is to write a custom DataSource
>> that then transmits the data from each of the JVMs over the network.  This
>> seems like overkill though.
>>
>> Is there a simpler solution for getting this data into a DataFrame?
>>
>> Thanks,
>> John
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/DataFrame-from-in-memory-datasets-in-multiple-JVMs-
>> tp28438.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Driver hung and happened out of memory while writing to console progress bar

2017-02-09 Thread John Fang
the spark version is 2.1.0
--
From: 方孝健(玄弟) 
Sent: Friday, February 10, 2017 12:35
To: spark-dev ; spark-user 
Subject: Driver hung and happened out of memory while writing to console progress bar
[Stage 172:==> (10328 + 93) / 16144]
[Stage 172:==> (10329 + 93) / 16144]
[Stage 172:==> (10330 + 93) / 16144]
[... console progress bar updates repeated; the full sequence appears in the original message below ...]
[Stage 172:==> (10367 + 93) / 16144]
Exception in thread "JobGenerator" java.lang.OutOfMemoryError: 

Driver hung and happened out of memory while writing to console progress bar

2017-02-09 Thread John Fang
[Stage 172:==> (10328 + 93) / 16144]
[Stage 172:==> (10329 + 93) / 16144]
[Stage 172:==> (10330 + 93) / 16144]
[Stage 172:==> (10331 + 93) / 16144]
[Stage 172:==> (10333 + 92) / 16144]
[Stage 172:==> (10333 + 93) / 16144]
[Stage 172:==> (10333 + 94) / 16144]
[Stage 172:==> (10334 + 94) / 16144]
[Stage 172:==> (10338 + 93) / 16144]
[Stage 172:==> (10339 + 92) / 16144]
[Stage 172:==> (10340 + 93) / 16144]
[Stage 172:==> (10341 + 92) / 16144]
[Stage 172:==> (10341 + 93) / 16144]
[Stage 172:==> (10342 + 93) / 16144]
[Stage 172:==> (10343 + 93) / 16144]
[Stage 172:==> (10344 + 92) / 16144]
[Stage 172:==> (10345 + 92) / 16144]
[Stage 172:==> (10345 + 93) / 16144]
[Stage 172:==> (10346 + 93) / 16144]
[Stage 172:==> (10348 + 92) / 16144]
[Stage 172:==> (10348 + 93) / 16144]
[Stage 172:==> (10349 + 92) / 16144]
[Stage 172:==> (10349 + 93) / 16144]
[Stage 172:==> (10350 + 92) / 16144]
[Stage 172:==> (10352 + 92) / 16144]
[Stage 172:==> (10353 + 92) / 16144]
[Stage 172:==> (10354 + 92) / 16144]
[Stage 172:==> (10355 + 92) / 16144]
[Stage 172:==> (10356 + 92) / 16144]
[Stage 172:==> (10356 + 93) / 16144]
[Stage 172:==> (10357 + 92) / 16144]
[Stage 172:==> (10357 + 93) / 16144]
[Stage 172:==> (10358 + 92) / 16144]
[Stage 172:==> (10358 + 93) / 16144]
[Stage 172:==> (10359 + 92) / 16144]
[Stage 172:==> (10359 + 93) / 16144]
[Stage 172:==> (10359 + 94) / 16144]
[Stage 172:==> (10361 + 92) / 16144]
[Stage 172:==> (10361 + 93) / 16144]
[Stage 172:==> (10362 + 92) / 16144]
[Stage 172:==> (10362 + 93) / 16144]
[Stage 172:==> (10363 + 93) / 16144]
[Stage 172:==> (10364 + 92) / 16144]
[Stage 172:==> (10365 + 92) / 16144]
[Stage 172:==> (10365 + 93) / 16144]
[Stage 172:==> (10366 + 92) / 16144]
[Stage 172:==> (10366 + 93) / 16144]
[Stage 172:==> (10367 + 92) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
[Stage 172:==> (10367 + 93) / 16144]
Exception in thread "JobGenerator" java.lang.OutOfMemoryError: Java heap space
at com.fasterxml.jackson.core.util.BufferRecycler.calloc(BufferRecycler.java:156)
at com.fasterxml.jackson.core.util.BufferRecycler.allocCharBuffer(BufferRecycler.java:124)
at com.fasterxml.jackson.core.io.IOContext.allocTokenBuffer(IOContext.java:189)
at 
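
One hedged workaround (not something suggested in this thread): the console
progress bar itself can be switched off on the driver, which at least removes
that code path; whether it is the real cause of the heap exhaustion is a
separate question.

```
import org.apache.spark.SparkConf

// Disable the "[Stage ...]" console progress bar on the driver.
val conf = new SparkConf()
  .setAppName("my-streaming-app")                  // placeholder app name
  .set("spark.ui.showConsoleProgress", "false")
```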

Spark main thread quits, but the driver doesn't crash on a standalone cluster

2017-01-17 Thread John Fang
My Spark main thread creates some daemon threads (for example a timer thread). Then 
the Spark application throws an exception and the main thread quits, but the JVM of 
the driver doesn't crash on a standalone cluster. Of course the problem doesn't 
happen on a YARN cluster, because the application master monitors the main thread 
of the application, while the standalone cluster can't. For example:

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

//daemon thread
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })

scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
def run() {
  try {
System.out.println("runable")
  } catch {
case e: Exception => {
  System.out.println("ScheduledTask persistAllConsumerOffset 
exception", e)
}
  }
}
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

Thread.sleep(1005)


val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 
10)
wordCounts.foreachRDD{rdd =>
  rdd.collect().foreach(println)
  throw new RuntimeException  //exception
}
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception => {
System.out.println("end!")
throw e
  }
}
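
A minimal sketch (not from the original mail) of one way to force the driver JVM
down on a standalone cluster when the streaming job fails: stop the contexts and
exit explicitly instead of only letting the main thread return, so no remaining
threads keep the process alive.

```
try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    System.out.println("streaming job failed: " + e)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    System.exit(1)   // ensure the driver JVM actually terminates
}
```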




Spark main thread quits, but the JVM of the driver doesn't crash

2017-01-17 Thread John Fang
My Spark main thread creates some daemon threads. Then the Spark application 
throws an exception and the main thread quits, but the JVM of the driver 
doesn't crash. What can I do?
For example:
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

//daemon thread
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })

scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
def run() {
  try {
System.out.println("runable")
  } catch {
case e: Exception => {
  System.out.println("ScheduledTask persistAllConsumerOffset 
exception", e)
}
  }
}
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

Thread.sleep(1005)


val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 
10)
wordCounts.foreachRDD{rdd =>
  rdd.collect().foreach(println)
  throw new RuntimeException
}
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception => {
System.out.println("end!")
throw e
  }
}



send this email to unsubscribe

2016-12-29 Thread John



how can I get the application belonging to the driver?

2016-12-26 Thread John Fang
I hope I can get the application by its driverId, but I can't find such a REST API 
in Spark. How can I get the application that belongs to a given driver?

can we unite the UIs of different standalone clusters?

2016-12-14 Thread John Fang
As we know, each standalone cluster has its own UI, so we will have more than one 
UI if we have many standalone clusters. How can I have a single UI that can access 
the different standalone clusters?

how can I set the log configuration file for spark history server ?

2016-12-08 Thread John Fang
./start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to 
/home/admin/koala/data/versions/0/SPARK/2.0.2/spark-2.0.2-bin-hadoop2.6/logs/spark-admin-org.apache.spark.deploy.history.HistoryServer-1-v069166214.sqa.zmf.out
The history server then prints all of its logs to XXX.sqa.zmf.out, so I can't limit 
the maximum file size. I want to limit the size of the log file.

Question about the DirectKafkaInputDStream

2016-12-08 Thread John Fang
The source is DirectKafkaInputDStream, which can ensure exactly-once semantics on 
the consumer side. But I have a question based on the following code. As we know, 
"graph.generateJobs(time)" will create RDDs and generate jobs, and the source 
RDD is a KafkaRDD, which contains the offsetRange. The jobs are submitted 
successfully by "jobScheduler.submitJobSet", and the cluster starts running the 
jobs. After that, the driver crashes suddenly and the offsetRange is lost, 
because the driver has not yet run "eventLoop.post(DoCheckpoint(time, 
clearCheckpointDataLater = false))". 

```
  private def generateJobs(time: Time) {
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows 
(SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, 
"true")
Try {
  jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate 
received blocks to batch
  graph.generateJobs(time) // generate jobs using allocated block
} match {
  case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
  ```
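
Not an answer to the race itself, just a sketch of the usual mitigation: enable
checkpointing and recreate the context via getOrCreate, so a restarted driver
rebuilds its state from the last written checkpoint; the output operations still
need to be idempotent or transactional for end-to-end exactly-once. Paths and the
app name below are placeholders.

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("direct-kafka-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs:///checkpoints/direct-kafka-app")
  // build the DirectKafkaInputDStream and the rest of the DStream graph here
  ssc
}

val ssc = StreamingContext.getOrCreate(
  "hdfs:///checkpoints/direct-kafka-app", createContext _)
ssc.start()
ssc.awaitTermination()
```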

Can Spark support exactly-once based on Kafka, given the following questions?

2016-12-04 Thread John Fang
1. If a task completes its operation, it notifies the driver. The driver may not 
receive the message due to the network and may think the task is still running. 
Won't the child stage then never be scheduled?
2. How does Spark guarantee that the downstream task receives the shuffle data 
completely? In fact, I can't find a checksum for blocks in Spark. For example, the 
upstream task may shuffle 100 MB of data, but the downstream task may receive only 
99 MB due to the network. Can Spark verify that the data was received completely 
based on its size?

two spark-shells spark on mesos not working

2016-11-22 Thread John Yost
Hi Everyone,

There is probably an obvious answer to this, but not sure what it is. :)

I am attempting to launch 2..n spark shells using Mesos as the master (this
is to support 1..n researchers running pyspark stuff on our data). I can
launch two or more spark shells without any problem. But, when I attempt
any kind of operation that requires a Spark executor outside the driver
program such as:

val numbers = Range(1, 1000)
val pNumbers = sc.parallelize(numbers)
pNumbers.take(5)

I get the dreaded message:
TaskSchedulerImpl: Initial job has not accepted any resources; check your
cluster UI to ensure that workers are registered and sufficient resources

I confirmed that both spark shells are listed as separate, uniquely-named
Mesos frameworks and that there are plenty of CPU core and memory resources
on our cluster.

I am using Spark 2.0.1 on Mesos 0.28.1. Any ideas that y'all may have would
be very much appreciated.

Thanks! :)

--John
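
One guess worth checking (not confirmed on this cluster): in coarse-grained Mesos
mode a single spark-shell framework tries to grab every core it is offered, so a
second shell may never receive resources. Capping each shell leaves offers for the
others; the values below are only examples, and they can equally be passed as
--conf flags when starting spark-shell.

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cores.max", "8")         // per-framework CPU cap
  .set("spark.executor.memory", "4g")  // per-executor memory
```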


Spark Logging : log4j.properties or log4j.xml

2016-08-24 Thread John Jacobs
One can specify "-Dlog4j.configuration=<path to log4j.properties>" or
"-Dlog4j.configuration=<path to log4j.xml>".
Is there any preference to using one over other?

All the spark documentation talks about using "log4j.properties" only (
http://spark.apache.org/docs/latest/configuration.html#configuring-logging).
So is only "log4j.properties" officially supported?


Re: Using R code as part of a Spark Application

2016-06-29 Thread John Aherne
I don't think R server requires R on the executor nodes. I originally set
up a SparkR cluster for our Data Scientist on Azure which required that I
install R on each node, but for the R Server set up, there is an extra edge
node with R server that they connect to. From what little research I was
able to do, it seems that there are some special functions in R Server that
can distribute the work to the cluster.

Documentation is light, and hard to find but I found this helpful:
https://blogs.msdn.microsoft.com/uk_faculty_connection/2016/05/10/r-server-for-hdinsight-running-on-microsoft-azure-cloud-data-science-challenges/



On Wed, Jun 29, 2016 at 3:29 PM, Sean Owen <so...@cloudera.com> wrote:

> Oh, interesting: does this really mean the return of distributing R
> code from driver to executors and running it remotely, or do I
> misunderstand? this would require having R on the executor nodes like
> it used to?
>
> On Wed, Jun 29, 2016 at 5:53 PM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
> > There is some new SparkR functionality coming in Spark 2.0, such as
> > "dapply". You could use SparkR to load a Parquet file and then run
> "dapply"
> > to apply a function to each partition of a DataFrame.
> >
> > Info about loading Parquet file:
> >
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/sparkr.html#from-data-sources
> >
> > API doc for "dapply":
> >
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/api/R/index.html
> >
> > Xinh
> >
> > On Wed, Jun 29, 2016 at 6:54 AM, sujeet jog <sujeet@gmail.com>
> wrote:
> >>
> >> try Spark pipeRDD's , you can invoke the R script from pipe , push  the
> >> stuff you want to do on the Rscript stdin,  p
> >>
> >>
> >> On Wed, Jun 29, 2016 at 7:10 PM, Gilad Landau <
> gilad.lan...@clicktale.com>
> >> wrote:
> >>>
> >>> Hello,
> >>>
> >>>
> >>>
> >>> I want to use R code as part of spark application (the same way I would
> >>> do with Scala/Python).  I want to be able to run an R syntax as a map
> >>> function on a big Spark dataframe loaded from a parquet file.
> >>>
> >>> Is this even possible or the only way to use R is as part of RStudio
> >>> orchestration of our Spark  cluster?
> >>>
> >>>
> >>>
> >>> Thanks for the help!
> >>>
> >>>
> >>>
> >>> Gilad
> >>>
> >>>
> >>
> >>
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 

John Aherne
Big Data and SQL Developer

[image: JustEnough Logo]

Cell: +1 (303) 809-9718
Email: john.ahe...@justenough.com
Skype: john.aherne.je
Web: www.justenough.com


Confidentiality Note: The information contained in this email and
document(s) attached are for the exclusive use of the addressee and
may contain confidential, privileged and non-disclosable information.
If the recipient of this email is not the addressee, such recipient is
strictly prohibited from reading, photocopying, distribution or
otherwise using this email or its contents in any way.
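
A bare-bones sketch of the pipe-based route mentioned earlier in the thread (as
opposed to SparkR or R Server): each partition's rows are streamed through an
external Rscript over stdin/stdout. The script path and the CSV framing are
placeholders, and this approach does require R to be installed on the executor
nodes.

```
// df is an existing DataFrame; serialize its rows as CSV lines for the R script.
val input = df.rdd.map(_.mkString(","))

// The R script reads lines from stdin and writes one result line per input row.
val scored = input.pipe("Rscript /path/to/score.R")   // placeholder path

scored.take(5).foreach(println)
```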


Re: Using R code as part of a Spark Application

2016-06-29 Thread John Aherne
Microsoft Azure has an option to create a spark cluster with R Server. MS
bought RevoScale (I think that was the name) and just recently deployed it.

On Wed, Jun 29, 2016 at 10:53 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:

> There is some new SparkR functionality coming in Spark 2.0, such as
> "dapply". You could use SparkR to load a Parquet file and then run "dapply"
> to apply a function to each partition of a DataFrame.
>
> Info about loading Parquet file:
>
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/sparkr.html#from-data-sources
>
> API doc for "dapply":
>
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/api/R/index.html
>
> Xinh
>
> On Wed, Jun 29, 2016 at 6:54 AM, sujeet jog <sujeet@gmail.com> wrote:
>
>> try Spark pipeRDD's , you can invoke the R script from pipe , push  the
>> stuff you want to do on the Rscript stdin,  p
>>
>>
>> On Wed, Jun 29, 2016 at 7:10 PM, Gilad Landau <gilad.lan...@clicktale.com
>> > wrote:
>>
>>> Hello,
>>>
>>>
>>>
>>> I want to use R code as part of spark application (the same way I would
>>> do with Scala/Python).  I want to be able to run an R syntax as a map
>>> function on a big Spark dataframe loaded from a parquet file.
>>>
>>> Is this even possible or the only way to use R is as part of RStudio
>>> orchestration of our Spark  cluster?
>>>
>>>
>>>
>>> Thanks for the help!
>>>
>>>
>>>
>>> Gilad
>>>
>>>
>>>
>>
>>
>


-- 

John Aherne
Big Data and SQL Developer

[image: JustEnough Logo]

Cell: +1 (303) 809-9718
Email: john.ahe...@justenough.com
Skype: john.aherne.je
Web: www.justenough.com


Confidentiality Note: The information contained in this email and
document(s) attached are for the exclusive use of the addressee and
may contain confidential, privileged and non-disclosable information.
If the recipient of this email is not the addressee, such recipient is
strictly prohibited from reading, photocopying, distribution or
otherwise using this email or its contents in any way.


Re: Explode row with start and end dates into row for each date

2016-06-22 Thread John Aherne
Thanks Saurabh!

That explode function looks like it is exactly what I need.

We will be using MLlib quite a lot - Do I have to worry about python
versions for that?

John
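
A rough sketch of the explode-based approach from the quoted reply below, written
in Scala (the PySpark version is analogous). It assumes Java 8 for java.time and
the sqlContext of a 1.6 shell; the column names are made up.

```
import java.time.LocalDate
import org.apache.spark.sql.functions.{col, explode, udf}

val df = sqlContext.createDataFrame(Seq(
  ("row1", "2015-01-01", "2015-01-03")
)).toDF("id", "start", "end")

// Expand a start/end pair into every date in between, inclusive.
val dateRange = udf { (start: String, end: String) =>
  val (s, e) = (LocalDate.parse(start), LocalDate.parse(end))
  Iterator.iterate(s)(_.plusDays(1)).takeWhile(d => !d.isAfter(e)).map(_.toString).toVector
}

df.select(col("id"), explode(dateRange(col("start"), col("end"))).as("date")).show()
```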

On Wed, Jun 22, 2016 at 4:34 PM, Saurabh Sardeshpande <saurabh...@gmail.com>
wrote:

> Hi John,
>
> If you can do it in Hive, you should be able to do it in Spark. Just make
> sure you import HiveContext instead of SQLContext.
>
> If your intent is to explore rather than get stuff done, I've not aware of
> any RDD operations that do this for you, but there is a DataFrame operation
> called 'explode' which does this -
> https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.explode.
> You'll just have to generate the array of dates using something like this -
> http://stackoverflow.com/questions/7274267/print-all-day-dates-between-two-dates
> .
>
> It's generally recommended to use Python 3 if you're starting a new
> project and don't have old dependencies. But remember that there is still
> quite a lot of stuff that is not yet ported to Python 3.
>
> Regards,
> Saurabh
>
> On Wed, Jun 22, 2016 at 3:20 PM, John Aherne <john.ahe...@justenough.com>
> wrote:
>
>> Hi Everyone,
>>
>> I am pretty new to Spark (and the mailing list), so forgive me if the
>> answer is obvious.
>>
>> I have a dataset, and each row contains a start date and end date.
>>
>> I would like to explode each row so that each day between the start and
>> end dates becomes its own row.
>> e.g.
>> row1  2015-01-01  2015-01-03
>> becomes
>> row1   2015-01-01
>> row1   2015-01-02
>> row1   2015-01-03
>>
>> So, my questions are:
>> Is Spark a good place to do that?
>> I can do it in Hive, but it's a bit messy, and this seems like a good
>> problem to use for learning Spark (and Python).
>>
>> If so, any pointers on what methods I should use? Particularly how to
>> split one row into multiples.
>>
>> Lastly, I am a bit hesitant to ask but is there a recommendation on which
>> version of python to use? Not interested in which is better, just want to
>> know if they are both supported equally.
>>
>> I am using Spark 1.6.1 (Hortonworks distro).
>>
>> Thanks!
>> John
>>
>> --
>>
>> John Aherne
>> Big Data and SQL Developer
>>
>> [image: JustEnough Logo]
>>
>> Cell: +1 (303) 809-9718
>> Email: john.ahe...@justenough.com
>> Skype: john.aherne.je
>> Web: www.justenough.com
>>
>>
>> Confidentiality Note: The information contained in this email and 
>> document(s) attached are for the exclusive use of the addressee and may 
>> contain confidential, privileged and non-disclosable information. If the 
>> recipient of this email is not the addressee, such recipient is strictly 
>> prohibited from reading, photocopying, distribution or otherwise using this 
>> email or its contents in any way.
>>
>>
>


-- 

John Aherne
Big Data and SQL Developer

[image: JustEnough Logo]

Cell: +1 (303) 809-9718
Email: john.ahe...@justenough.com
Skype: john.aherne.je
Web: www.justenough.com


Confidentiality Note: The information contained in this email and
document(s) attached are for the exclusive use of the addressee and
may contain confidential, privileged and non-disclosable information.
If the recipient of this email is not the addressee, such recipient is
strictly prohibited from reading, photocopying, distribution or
otherwise using this email or its contents in any way.


Explode row with start and end dates into row for each date

2016-06-22 Thread John Aherne
Hi Everyone,

I am pretty new to Spark (and the mailing list), so forgive me if the
answer is obvious.

I have a dataset, and each row contains a start date and end date.

I would like to explode each row so that each day between the start and end
dates becomes its own row.
e.g.
row1  2015-01-01  2015-01-03
becomes
row1   2015-01-01
row1   2015-01-02
row1   2015-01-03

So, my questions are:
Is Spark a good place to do that?
I can do it in Hive, but it's a bit messy, and this seems like a good
problem to use for learning Spark (and Python).

If so, any pointers on what methods I should use? Particularly how to split
one row into multiples.

Lastly, I am a bit hesitant to ask but is there a recommendation on which
version of python to use? Not interested in which is better, just want to
know if they are both supported equally.

I am using Spark 1.6.1 (Hortonworks distro).

Thanks!
John

-- 

John Aherne
Big Data and SQL Developer

[image: JustEnough Logo]

Cell: +1 (303) 809-9718
Email: john.ahe...@justenough.com
Skype: john.aherne.je
Web: www.justenough.com


Confidentiality Note: The information contained in this email and
document(s) attached are for the exclusive use of the addressee and
may contain confidential, privileged and non-disclosable information.
If the recipient of this email is not the addressee, such recipient is
strictly prohibited from reading, photocopying, distribution or
otherwise using this email or its contents in any way.


Re: Long Running Spark Streaming getting slower

2016-06-10 Thread John Simon
Hi Mich,

batch interval is 10 seconds, and I don't use sliding window.
Typical message count per batch is ~100k.


--
John Simon

On Fri, Jun 10, 2016 at 10:31 AM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> Hi John,
>
> I did not notice anything unusual in your env variables.
>
> However, what are the batch interval, the windowLength and the slideInterval?
>
> Also how many messages are sent by Kafka in a typical batch interval?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 10 June 2016 at 18:21, john.simon <john.si...@tapjoy.com> wrote:
>
>> Hi all,
>>
>> I'm running Spark Streaming with Kafka Direct Stream, but after
>> running a couple of days, the batch processing time almost doubles.
>> I didn't find any slowdown on JVM GC logs, but I did find that Spark
>> broadcast variable reading time increasing.
>> Initially it takes less than 10ms, but after 3 days it takes more than
>> 60ms. It's really puzzling since I don't use broadcast variables at
>> all.
>>
>> My application needs to run 24/7, so I hope there's something I'm
>> missing to correct this behavior.
>>
>> FYI, we're running on AWS EMR with Spark version 1.6.1, in YARN client
>> mode.
>> Attached spark application environment settings file.
>>
>> --
>> John Simon
>>
>> *environment.txt* (7K) Download Attachment
>> <http://apache-spark-user-list.1001560.n3.nabble.com/attachment/27138/0/environment.txt>
>>
>> --
>> View this message in context: Long Running Spark Streaming getting slower
>> <http://apache-spark-user-list.1001560.n3.nabble.com/Long-Running-Spark-Streaming-getting-slower-tp27138.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>
>
>


Re: Spark Streaming getting slower

2016-06-09 Thread John Simon
Sorry, forgot to mention that I don't use broadcast variables. That's
why I'm puzzled here.
--
John Simon


On Thu, Jun 9, 2016 at 11:09 AM, John Simon <john.si...@tapjoy.com> wrote:
> Hi,
>
> I'm running Spark Streaming with Kafka Direct Stream, batch interval
> is 10 seconds.
> After running about 72 hours, the batch processing time almost doubles.
> I didn't find anything wrong on JVM GC logs, but I did find that
> broadcast variable reading time increasing, like this:
>
> initially:
>
> ```
> 16/06/08 18:17:02 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 0 took 223 ms
> 16/06/08 18:17:08 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 2 took 73 ms
> 16/06/08 18:17:10 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 3 took 13 ms
> 16/06/08 18:17:11 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 4 took 9 ms
> 16/06/08 18:17:12 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 5 took 8 ms
> 16/06/08 18:17:14 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 6 took 13 ms
> 16/06/08 18:17:15 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 8 took 7 ms
> 16/06/08 18:17:16 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 10 took 8 ms
> 16/06/08 18:17:20 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 12 took 6 ms
> 16/06/08 18:17:20 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 13 took 7 ms
> ```
>
> after 23 hours:
>
> ```
> 16/06/09 17:23:43 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282938 took 18 ms
> 16/06/09 17:23:43 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282939 took 20 ms
> 16/06/09 17:23:43 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282940 took 20 ms
> 16/06/09 17:23:43 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282941 took 14 ms
> 16/06/09 17:23:44 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282942 took 17 ms
> 16/06/09 17:23:45 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282943 took 18 ms
> 16/06/09 17:23:45 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282944 took 14 ms
> 16/06/09 17:23:45 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282945 took 18 ms
> 16/06/09 17:23:45 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282946 took 14 ms
> 16/06/09 17:23:45 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 282947 took 18 ms
> ```
>
> FYI, we're running on AWS EMR with Spark version 1.6.1, in YARN client mode.
>
> application environment follows:
>
> ```
> Java Home /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre
> Java Version 1.7.0_101 (Oracle Corporation)
> Scala Version version 2.10.5
>
> spark.app.id application_1463430559850_0056
> spark.cleaner.ttl 60
> spark.default.parallelism 96
> spark.driver.appUIAddress http://172.16.4.168:4041
> spark.driver.extraClassPath
> /etc/hadoop/conf:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
> spark.driver.extraJavaOptions
> -Dlog4j.configuration=file:///etc/spark/conf/log4j.properties
> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
> -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled
> -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'
> spark.driver.extraLibraryPath
> /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
> spark.driver.host 172.16.4.168
> spark.driver.port 42142
> spark.dynamicAllocation.enabled false
> spark.eventLog.compress true
> spark.eventLog.dir hdfs:///var/log/spark/apps
> spark.eventLog.enabled false
> spark.executor.cores 8
> spark.executor.extraClassPath
> /etc/hadoop/conf:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
> spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
> spark.executor.extraLibraryPath
> /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
> spark.executor.id driver
> spark.executor.instances 24
> spark.executor.memory 6G
> spark.externalBlockStore.folderName spark-9ede6685-a7f9-4d65-bfb4-0920e8c5ba25
> spark.history.fs.cleaner.enabled true
> spark.history.fs.cleaner.maxAge 2d
> spark.history.fs.logDirectory hdfs:///var/log/spark/apps
> spark.history.ui.port 18080
> spark.master yarn-client
> spark.org.apache.hadoop.yarn.server.webproxy.amfilte

Spark Streaming getting slower

2016-06-09 Thread John Simon
spark.yarn.historyServer.address ip-172-16-4-168.ec2.internal:18080

SPARK_SUBMIT true
SPARK_YARN_MODE true
awt.toolkit sun.awt.X11.XToolkit
file.encoding UTF-8
file.encoding.pkg sun.io
file.separator /
java.awt.graphicsenv sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version 51.0
java.endorsed.dirs
/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/endorsed
java.ext.dirs 
/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/ext:/usr/java/packages/lib/ext
java.home /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre
java.io.tmpdir /tmp
java.library.path
/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.runtime.name OpenJDK Runtime Environment
java.runtime.version 1.7.0_101-mockbuild_2016_04_26_18_10-b00
java.specification.name Java Platform API Specification
java.specification.vendor Oracle Corporation
java.specification.version 1.7
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version 1.7.0_101
java.vm.info mixed mode
java.vm.name OpenJDK 64-Bit Server VM
java.vm.specification.name Java Virtual Machine Specification
java.vm.specification.vendor Oracle Corporation
java.vm.specification.version 1.7
java.vm.vendor Oracle Corporation
java.vm.version 24.95-b01
line.separator
log4j.configuration file:///etc/spark/conf/log4j.properties
os.arch amd64
os.name Linux
os.version 4.4.5-15.26.amzn1.x86_64
path.separator :
sun.arch.data.model 64
sun.boot.class.path
/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/resources.jar:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/rt.jar:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/sunrsasign.jar:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/jce.jar:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/rhino.jar:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/jfr.jar:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/classes
sun.boot.library.path
/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/amd64
sun.cpu.endian little
sun.cpu.isalist
sun.io.unicode.encoding UnicodeLittle
sun.java.launcher SUN_STANDARD
sun.jnu.encoding UTF-8
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel
sun.os.patch.level unknown
user.country US
user.dir /home/hadoop
user.home /home/hadoop
user.language en
user.name hadoop
user.timezone UTC
```


--
John Simon

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: HBase / Spark Kerberos problem

2016-05-19 Thread John Trengrove
Have you had a look at this issue?

https://issues.apache.org/jira/browse/SPARK-12279

There is a comment by Y Bodnar on how they successfully got Kerberos and
HBase working.

2016-05-18 18:13 GMT+10:00 :

> Hi all,
>
> I have been puzzling over a Kerberos problem for a while now and wondered
> if anyone can help.
>
> For spark-submit, I specify --keytab x --principal y, which creates my
> SparkContext fine.
> Connections to Zookeeper Quorum to find the HBase master work well too.
> But when it comes to a .count() action on the RDD, I am always presented
> with the stack trace at the end of this mail.
>
> We are using CDH5.5.2 (spark 1.5.0), and
> com.cloudera.spark.hbase.HBaseContext is a wrapper around
> TableInputFormat/hadoopRDD (see
> https://github.com/cloudera-labs/SparkOnHBase), as you can see in the
> stack trace.
>
> Am I doing something obvious wrong here?
> A similar flow, inside test code, works well, only going via spark-submit
> exposes this issue.
>
> Code snippet (I have tried using the commented-out lines in various
> combinations, without success):
>
>val conf = new SparkConf().
>   set("spark.shuffle.consolidateFiles", "true").
>   set("spark.kryo.registrationRequired", "false").
>   set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer").
>   set("spark.kryoserializer.buffer", "30m")
> val sc = new SparkContext(conf)
> val cfg = sc.hadoopConfiguration
> //cfg.addResource(new
> org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"))
> //
> UserGroupInformation.getCurrentUser.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS)
> //cfg.set("hbase.security.authentication", "kerberos")
> val hc = new HBaseContext(sc, cfg)
> val scan = new Scan
> scan.setTimeRange(startMillis, endMillis)
> val matchesInRange = hc.hbaseRDD(MY_TABLE, scan, resultToMatch)
> val cnt = matchesInRange.count()
> log.info(s"matches in range $cnt")
>
> Stack trace / log:
>
> 16/05/17 17:04:47 INFO SparkContext: Starting job: count at
> Analysis.scala:93
> 16/05/17 17:04:47 INFO DAGScheduler: Got job 0 (count at
> Analysis.scala:93) with 1 output partitions
> 16/05/17 17:04:47 INFO DAGScheduler: Final stage: ResultStage 0(count at
> Analysis.scala:93)
> 16/05/17 17:04:47 INFO DAGScheduler: Parents of final stage: List()
> 16/05/17 17:04:47 INFO DAGScheduler: Missing parents: List()
> 16/05/17 17:04:47 INFO DAGScheduler: Submitting ResultStage 0
> (MapPartitionsRDD[1] at map at HBaseContext.scala:580), which has no
> missing parents
> 16/05/17 17:04:47 INFO MemoryStore: ensureFreeSpace(3248) called with
> curMem=428022, maxMem=244187136
> 16/05/17 17:04:47 INFO MemoryStore: Block broadcast_3 stored as values in
> memory (estimated size 3.2 KB, free 232.5 MB)
> 16/05/17 17:04:47 INFO MemoryStore: ensureFreeSpace(2022) called with
> curMem=431270, maxMem=244187136
> 16/05/17 17:04:47 INFO MemoryStore: Block broadcast_3_piece0 stored as
> bytes in memory (estimated size 2022.0 B, free 232.5 MB)
> 16/05/17 17:04:47 INFO BlockManagerInfo: Added broadcast_3_piece0 in
> memory on 10.6.164.40:33563 (size: 2022.0 B, free: 232.8 MB)
> 16/05/17 17:04:47 INFO SparkContext: Created broadcast 3 from broadcast at
> DAGScheduler.scala:861
> 16/05/17 17:04:47 INFO DAGScheduler: Submitting 1 missing tasks from
> ResultStage 0 (MapPartitionsRDD[1] at map at HBaseContext.scala:580)
> 16/05/17 17:04:47 INFO YarnScheduler: Adding task set 0.0 with 1 tasks
> 16/05/17 17:04:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 0, hpg-dev-vm, partition 0,PROCESS_LOCAL, 2208 bytes)
> 16/05/17 17:04:47 INFO BlockManagerInfo: Added broadcast_3_piece0 in
> memory on hpg-dev-vm:52698 (size: 2022.0 B, free: 388.4 MB)
> 16/05/17 17:04:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in
> memory on hpg-dev-vm:52698 (size: 26.0 KB, free: 388.4 MB)
> 16/05/17 17:04:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> hpg-dev-vm): org.apache.hadoop.hbase.client.RetriesExhaustedException:
> Can't get the location
> at
> org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
> at
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:155)
> at
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:63)
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
> at
> org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:314)
> at
> org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:289)
> at
> org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:161)
> at
> org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:156)
> at
> 

Re: How to get the batch information from Streaming UI

2016-05-16 Thread John Trengrove
You would want to add a listener to your Spark Streaming context. Have a
look at the StatsReportListener [1].

[1]
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.StatsReportListener
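
For reference, a small sketch (not from the thread) of a custom listener that logs
the batch time, record count and delays of each completed batch; register it on
the StreamingContext before start(). The class name is made up, and println stands
in for whatever log file or metrics sink you actually want to feed.

```
import org.apache.spark.streaming.scheduler._

class BatchInfoLogger extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch=${info.batchTime} records=${info.numRecords} " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)}ms " +
      s"processingDelay=${info.processingDelay.getOrElse(-1L)}ms")
  }
}

// ssc.addStreamingListener(new BatchInfoLogger)   // before ssc.start()
```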

2016-05-17 7:18 GMT+10:00 Samuel Zhou :

> Hi,
>
> Does anyone know how to get the batch information(like batch time, input
> size, processing time, status) from Streaming UI by using Scala/Java API ?
> Because I want to put the information in log files and the streaming jobs
> are managed by YARN.
>
> Thanks,
> Samuel
>


Re: Silly Question on my part...

2016-05-16 Thread John Trengrove
If you are wanting to share RDDs it might be a good idea to check out
Tachyon / Alluxio.

For the Thrift server, I believe the datasets are located in your Spark
cluster as RDDs and you just communicate with it via the Thrift
JDBC Distributed Query Engine connector.

2016-05-17 5:12 GMT+10:00 Michael Segel :

> For one use case.. we were considering using the thrift server as a way to
> allow multiple clients access shared RDDs.
>
> Within the Thrift Context, we create an RDD and expose it as a hive table.
>
> The question  is… where does the RDD exist. On the Thrift service node
> itself, or is that just a reference to the RDD which is contained with
> contexts on the cluster?
>
>
> Thx
>
> -Mike
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to use the spark submit script / capability

2016-05-15 Thread John Trengrove
Assuming you are referring to running SparkSubmit.main programmatically;
otherwise read this [1].

I can't find any scaladocs for org.apache.spark.deploy.* but Oozie's [2]
example of using SparkSubmit is pretty comprehensive.

[1] http://spark.apache.org/docs/latest/submitting-applications.html
[2]
https://github.com/apache/oozie/blob/master/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java

John
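
If I'm reading it right, that PR is the spark-launcher module (SPARK-4924), which
ships as org.apache.spark.launcher.SparkLauncher; a minimal sketch of its
programmatic use, with all paths, class names and settings below being
placeholders:

```
import org.apache.spark.launcher.SparkLauncher

val process = new SparkLauncher()
  .setSparkHome("/opt/spark")
  .setAppResource("/path/to/my-app.jar")
  .setMainClass("com.example.MyApp")
  .setMaster("yarn-client")
  .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
  .addAppArgs("arg1", "arg2")
  .launch()            // returns a java.lang.Process

process.waitFor()
```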

2016-05-16 2:33 GMT+10:00 Stephen Boesch <java...@gmail.com>:

>
> There is a committed PR from Marcelo Vanzin addressing that capability:
>
> https://github.com/apache/spark/pull/3916/files
>
> Is there any documentation on how to use this?  The PR itself has two
> comments asking for the docs that were not answered.
>


Re: VectorAssembler handling null values

2016-04-20 Thread John Trengrove
You could handle null values by using the DataFrame.na functions in a
preprocessing step like DataFrame.na.fill().

For reference:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions

John
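
A minimal sketch of that preprocessing step feeding into VectorAssembler. The
column names are taken from the sample rows quoted below, and filling with 0.0 is
just one possible imputation choice (a mean/median or an explicit missing-value
indicator column may be more appropriate).

```
import org.apache.spark.ml.feature.VectorAssembler

// Impute numeric nulls before assembling the feature vector.
val filled = df.na.fill(0.0, Seq("f2", "f3", "optF2"))

val assembler = new VectorAssembler()
  .setInputCols(Array("f2", "f3", "optF2"))
  .setOutputCol("features")

val withFeatures = assembler.transform(filled)
```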

On 21 April 2016 at 03:41, Andres Perez <and...@tresata.com> wrote:

> so the missing data could be on a one-off basis, or from fields that are
> in general optional, or from, say, a count that is only relevant for
> certain cases (very sparse):
>
> f1|f2|f3|optF1|optF2|sparseF1
> a|15|3.5|cat1|142L|
> b|13|2.4|cat2|64L|catA
> c|2|1.6|||
> d|27|5.1||0|
>
> -Andy
>
> On Wed, Apr 20, 2016 at 1:38 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Could you provide an example of what your input data looks like?
>> Supporting missing values in a sparse result vector makes sense.
>>
>> On Tue, 19 Apr 2016 at 23:55, Andres Perez <and...@tresata.com> wrote:
>>
>>> Hi everyone. org.apache.spark.ml.feature.VectorAssembler currently
>>> cannot handle null values. This presents a problem for us as we wish to run
>>> a decision tree classifier on sometimes sparse data. Is there a particular
>>> reason VectorAssembler is implemented in this way, and can anyone recommend
>>> the best path for enabling VectorAssembler to build vectors for data that
>>> will contain empty values?
>>>
>>> Thanks!
>>>
>>> -Andres
>>>
>>>
>


Docker Mesos Spark Port Mapping

2016-04-17 Thread John Omernik
The setting

spark.mesos.executor.docker.portmaps

is interesting to me. Without this setting, the docker executor uses
net=host and thus port mappings are not needed.

With this setting (and just adding some random mappings), my executors fail
with less than helpful messages.

I guess some questions here


1. If I specify port mappings is there an "implied" net=bridge that happens
with my executors? Since they are failing fast, I really can't see the
command to determine what the net setting is.

2. If 1 = true, then what port mappings do I need to run this in net=bridge
mode. This is actually preferred for me in that I am using MapR FS and it
doesn't seem to like running a mapr client (which Spark is using to access
the Filesystem) in a docker container on a MapR FS Node,  where net =
host.  I can do it where net=bridge and that works for MapR, but not
net=host, and thus, I am hoping I have some options for net=bridge.

Thoughts?


John


Access to Mesos Docker Cmd for Spark Executors

2016-04-17 Thread John Omernik
Hey all,

I was wondering if there is a way to access/edit the command on Spark
Executors while using Docker on Mesos.

The reason is this: I am using the MapR file client, and the Spark driver
is trying to execute things as my user "user1", but since the executors are
running as root inside the container, the MapR file client is trying to get user
information for my user, which doesn't exist in the docker container.

This is something I've handled on my cluster by having a user sync file and
a docker file that you can specify the user you are running as in the
command

So for example, when I run Zeppelin, I instead of
$ZEPPELIN_HOME/bin/zeppelin.sh, in Marathon I execute

/mapr/clustername/dockersync.sh user1 && su -c
$ZEPPELIN_HOME/bin/zeppelin.sh user1

Now my user I am running as, and all the groups that user is in is
installed into the docker container and when I start Zeppelin it's running
as that user (with the correct UIDs and GIDs)

so with the Mesos images, I'd like to be able to alter the command and
prepend my docker sync file to install the users so credentials etc can be
preserved.  There may be other uses as well, but since we can access the
image, the volumes, and the port maps, allowing us to edit the command
(have the default be "what works" and if we edit in such a way, it's our
own fault)  could give us the freedom to do things like this... does this
capability exist?

Thanks,

John


Re: Problem with pyspark on Docker talking to YARN cluster

2016-04-06 Thread John Omernik
Were there any other creative solutions for this? I am running into the same
issue with submitting to YARN from a Docker container, and the solutions
provided don't work. (1. The host setting doesn't work, even if I use the
hostname of the physical node, because when Spark tries to bind to the
hostname of the physical node in bridged mode, it doesn't see it and errors
out... as stated, we need a bind address and an advertise address if this is
to work. 2. Same restrictions. 3. Cluster mode doesn't work for the pyspark
shell.)

Any other thoughts?

John
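
For anyone hitting this later: if I recall correctly, Spark 2.1+ splits the two
addresses via spark.driver.bindAddress (SPARK-4563), which is exactly the
bind-vs-advertise distinction asked for above. A rough sketch with made-up
host/port values; the mapped ports still have to be published on the docker host.

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.bindAddress", "0.0.0.0")           // what the driver binds to in the container
  .set("spark.driver.host", "docker-host.example.com")  // what the AM/executors dial back to
  .set("spark.driver.port", "49460")
  .set("spark.blockManager.port", "49461")
```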

On Thu, Jun 11, 2015 at 12:09 AM, Ashwin Shankar <ashwinshanka...@gmail.com>
wrote:

> Hi Eron, Thanks for your reply, but none of these options works for us.
>>
>>
>>1. use 'spark.driver.host' and 'spark.driver.port' setting to
>>stabilize the driver-side endpoint.  (ref
>><https://spark.apache.org/docs/latest/configuration.html#networking>)
>>
>> This unfortunately won't help since if we set spark.driver.port to
> something, its going to be used to bind on the client
> side and the same will be passed to the AM. We need two variables,a) one
> to bind to on the client side, b)another port which is opened up on the
> docker host and will be used by the AM to talk back to the driver.
>
> 2. use host networking for your container, i.e. "docker run --net=host
>> ..."
>
> We run containers in shared environment, and this option makes host
> network stack accessible to all
> containers in it, which could leads to security issues.
>
> 3. use yarn-cluster mode
>
>  Pyspark interactive shell(ipython) doesn't have cluster mode. SPARK-5162
> <https://issues.apache.org/jira/browse/SPARK-5162> is for spark-submit
> python in cluster mode.
>
> Thanks,
> Ashwin
>
>
> On Wed, Jun 10, 2015 at 3:55 PM, Eron Wright <ewri...@live.com> wrote:
>
>> Options include:
>>
>>1. use 'spark.driver.host' and 'spark.driver.port' setting to
>>stabilize the driver-side endpoint.  (ref
>><https://spark.apache.org/docs/latest/configuration.html#networking>)
>>2. use host networking for your container, i.e. "docker run
>>--net=host ..."
>>3. use yarn-cluster mode (see SPARK-5162
>><https://issues.apache.org/jira/browse/SPARK-5162>)
>>
>>
>> Hope this helps,
>> Eron
>>
>>
>> --
>> Date: Wed, 10 Jun 2015 13:43:04 -0700
>> Subject: Problem with pyspark on Docker talking to YARN cluster
>> From: ashwinshanka...@gmail.com
>> To: d...@spark.apache.org; user@spark.apache.org
>>
>>
>> All,
>> I was wondering if any of you have solved this problem :
>>
>> I have pyspark(ipython mode) running on docker talking to
>> a yarn cluster(AM/executors are NOT running on docker).
>>
>> When I start pyspark in the docker container, it binds to port *49460.*
>>
>> Once the app is submitted to YARN, the app(AM) on the cluster side fails
>> with the following error message :
>> *ERROR yarn.ApplicationMaster: Failed to connect to driver at :49460*
>>
>> This makes sense because AM is trying to talk to container directly and
>> it cannot, it should be talking to the docker host instead.
>>
>> *Question* :
>> How do we make Spark AM talk to host1:port1 of the docker host(not the
>> container), which would then
>> route it to container which is running pyspark on host2:port2 ?
>>
>> One solution I could think of is : after starting the driver(say on
>> hostA:portA), and before submitting the app to yarn, we could
>> reset driver's host/port to hostmachine's ip/port. So the AM can then
>> talk hostmachine's ip/port, which would be mapped
>> to the container.
>>
>> Thoughts ?
>> --
>> Thanks,
>> Ashwin
>>
>>
>>
>
>
> --
> Thanks,
> Ashwin
>
>
>


Spark - Mesos HTTP API

2016-04-03 Thread John Omernik
Hey all, are there any plans to implement the Mesos HTTP API rather than the
native libs? The reason I ask is that I am trying to run an application in a
docker container (Zeppelin) that would use Spark connecting to Mesos, but I am
finding that using the NATIVE_LIB from docker is difficult or would result in
really big/heavy docker images in order to achieve this. So that got me thinking
about the HTTP API, and I was wondering if there is a JIRA to track this or if
this is something Spark is planning.

Thanks!

John


Apache Spark-Get All Field Names From Nested Arbitrary JSON Files

2016-03-31 Thread John Radin
Hello All-

I have run into a somewhat perplexing issue that has plagued me for several
months now (with haphazard workarounds). I am trying to create an Avro
Schema (schema-enforced format for serializing arbitrary data, basically,
as I understand it) to convert some complex JSON files (arbitrary and
nested) eventually to Parquet in a pipeline.

I am wondering if there is a way to get the superset of field names I need
for this use case while staying in Apache Spark instead of Hadoop MR, in a
reasonable fashion?

I think Apache Arrow, which is under development, might eventually be able to
help avoid this by treating JSON as a first class citizen, but it is still a
ways off yet.

Any guidance would be sincerely appreciated!

Thanks!

John
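
One possible angle, sketched here with placeholder paths: Spark's JSON reader
already infers a schema that is the union of the fields it sees across records,
so the superset of (nested) field names can be read straight off df.schema. The
recursive walk below flattens struct fields into dotted names; arrays of structs
are not descended in this sketch.

```
import org.apache.spark.sql.types.{StructField, StructType}

val df = sqlContext.read.json("/path/to/json/*.json")   // placeholder path
df.printSchema()

// Flatten nested structs into dotted field names.
def fieldNames(schema: StructType, prefix: String = ""): Seq[String] =
  schema.fields.toSeq.flatMap {
    case StructField(name, nested: StructType, _, _) => fieldNames(nested, prefix + name + ".")
    case StructField(name, _, _, _)                  => Seq(prefix + name)
  }

fieldNames(df.schema).foreach(println)
```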


RE: Graphx

2016-03-11 Thread John Lilley
We have almost zero node info – just an identifying integer.
John Lilley

From: Alexis Roos [mailto:alexis.r...@gmail.com]
Sent: Friday, March 11, 2016 11:24 AM
To: Alexander Pivovarov <apivova...@gmail.com>
Cc: John Lilley <john.lil...@redpoint.net>; Ovidiu-Cristian MARCU 
<ovidiu-cristian.ma...@inria.fr>; lihu <lihu...@gmail.com>; Andrew A 
<andrew.a...@gmail.com>; u...@spark.incubator.apache.org; Geoff Thompson 
<geoff.thomp...@redpoint.net>
Subject: Re: Graphx

Also we keep the Node info minimal as needed for connected components and 
rejoin later.

Alexis

On Fri, Mar 11, 2016 at 10:12 AM, Alexander Pivovarov 
<apivova...@gmail.com<mailto:apivova...@gmail.com>> wrote:
we use it in prod

70 boxes, 61GB RAM each

GraphX Connected Components works fine on 250M Vertices and 1B Edges (takes 
about 5-10 min)

Spark likes memory, so use r3.2xlarge boxes (61GB)
For example 10 x r3.2xlarge (61GB) work much faster than 20 x r3.xlarge (30.5 
GB) (especially if you have skewed data)

Also, use checkpoints before and after Connected Components to reduce DAG delays

You can also try to enable Kryo and register classes used in RDD


On Fri, Mar 11, 2016 at 8:07 AM, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:
I suppose for a 2.6bn case we’d need Long:

public class GenCCInput {
  public static void main(String[] args) {
if (args.length != 2) {
  System.err.println("Usage: \njava GenCCInput  ");
  System.exit(-1);
}
long edges = Long.parseLong(args[0]);
long groupSize = Long.parseLong(args[1]);
long currentEdge = 1;
long currentGroupSize = 0;
for (long i = 0; i < edges; i++) {
  System.out.println(currentEdge + " " + (currentEdge + 1));
  if (currentGroupSize == 0) {
currentGroupSize = 2;
  } else {
currentGroupSize++;
  }
  if (currentGroupSize >= groupSize) {
currentGroupSize = 0;
    currentEdge += 2;
  } else {
currentEdge++;
  }
}
  }
}

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516<tel:%2B1%C2%A0303%C2%A0541%201516>  | M: +1 720 938 
5761<tel:%2B1%20720%20938%205761> | F: +1 781-705-2077<tel:%2B1%20781-705-2077>
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: John Lilley 
[mailto:john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>]
Sent: Friday, March 11, 2016 8:46 AM
To: Ovidiu-Cristian MARCU 
<ovidiu-cristian.ma...@inria.fr<mailto:ovidiu-cristian.ma...@inria.fr>>
Cc: lihu <lihu...@gmail.com<mailto:lihu...@gmail.com>>; Andrew A 
<andrew.a...@gmail.com<mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org<mailto:u...@spark.incubator.apache.org>; Geoff 
Thompson <geoff.thomp...@redpoint.net<mailto:geoff.thomp...@redpoint.net>>
Subject: RE: Graphx

Ovidiu,

IMHO, this is one of the biggest issues facing GraphX and Spark.  There are a 
lot of knobs and levers to pull to affect performance, with very little 
guidance about which settings work in general.  We cannot ship software that 
requires end-user tuning; it just has to work.  Unfortunately GraphX seems very 
sensitive to working set size relative to available RAM and fails 
catastrophically as opposed to gracefully when working set is too large.  It is 
also very sensitive to the nature of the data.  For example, if we build a test 
file with input-edge representation like:
1 2
2 3
3 4
5 6
6 7
7 8
…
this represents a graph with connected components in groups of four.  We found 
experimentally that when this data is input in clustered order, the required 
memory is lower and runtime is much faster than when data is input in random 
order.  This makes intuitive sense because of the additional communication 
required for the random order.

Our 1bn-edge test case was of this same form, input in clustered order, with 
groups of 10 vertices per component.  It failed at 8 x 60GB.  This is the kind 
of data that our application processes, so it is a realistic test for us.  I’ve 
found that social media test data sets tend to follow power-law distributions, 
and that GraphX has much less problem with them.

A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges in 
10-vertex components using the synthetic test input I describe above.  I would 
be curious to know if this works and what settings you use to succeed, and if 
it continues to succeed for random input order.
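
For reference, a sketch (in Scala) of how one might randomize the order of the
generated edge file for that second test; the paths are illustrative, and the
edge format is the "src dst" lines that GenCCInput prints:

  import scala.util.Random
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("shuffle-edges"))
  sc.textFile("hdfs:///data/cc_edges_clustered.txt")
    .map(line => (Random.nextLong(), line))   // attach a random sort key
    .sortByKey()
    .values
    .saveAsTextFile("hdfs:///data/cc_edges_random")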

As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2) behavior 
for large data sets, but it processes the 1bn-edge case on a single 60GB node 
in about 20 minutes.  It degrades gracefully along the O(N^2) curve and 
additional memory reduces time.

John Lilley

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]

RE: Graphx

2016-03-11 Thread John Lilley
Thanks Alexander, this is really good information.  However it reinforces that 
we cannot use GraphX, because our customers typically have on-prem clusters in 
the 10-node range.  Very few have the kind of horsepower you are talking about. 
 We can’t just tell them to quadruple their cluster size to run our software on 
1bn edges.

John Lilley

From: Alexander Pivovarov [mailto:apivova...@gmail.com]
Sent: Friday, March 11, 2016 11:13 AM
To: John Lilley <john.lil...@redpoint.net>
Cc: Ovidiu-Cristian MARCU <ovidiu-cristian.ma...@inria.fr>; lihu 
<lihu...@gmail.com>; Andrew A <andrew.a...@gmail.com>; 
u...@spark.incubator.apache.org; Geoff Thompson <geoff.thomp...@redpoint.net>
Subject: Re: Graphx

we use it in prod

70 boxes, 61GB RAM each

GraphX Connected Components works fine on 250M Vertices and 1B Edges (takes 
about 5-10 min)

Spark likes memory, so use r3.2xlarge boxes (61GB)
For example 10 x r3.2xlarge (61GB) work much faster than 20 x r3.xlarge (30.5 
GB) (especially if you have skewed data)

Also, use checkpoints before and after Connected Components to reduce DAG delays

You can also try to enable Kryo and register classes used in RDD


On Fri, Mar 11, 2016 at 8:07 AM, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:
I suppose for a 2.6bn case we’d need Long:

public class GenCCInput {
  public static void main(String[] args) {
    if (args.length != 2) {
      System.err.println("Usage: \njava GenCCInput <edges> <groupSize>");
      System.exit(-1);
    }
    long edges = Long.parseLong(args[0]);
    long groupSize = Long.parseLong(args[1]);
    long currentEdge = 1;
    long currentGroupSize = 0;
    for (long i = 0; i < edges; i++) {
      System.out.println(currentEdge + " " + (currentEdge + 1));
      if (currentGroupSize == 0) {
        currentGroupSize = 2;
      } else {
        currentGroupSize++;
      }
      if (currentGroupSize >= groupSize) {
        currentGroupSize = 0;
        currentEdge += 2;
      } else {
        currentEdge++;
      }
    }
  }
}

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516 | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: John Lilley 
[mailto:john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>]
Sent: Friday, March 11, 2016 8:46 AM
To: Ovidiu-Cristian MARCU 
<ovidiu-cristian.ma...@inria.fr<mailto:ovidiu-cristian.ma...@inria.fr>>
Cc: lihu <lihu...@gmail.com<mailto:lihu...@gmail.com>>; Andrew A 
<andrew.a...@gmail.com<mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org<mailto:u...@spark.incubator.apache.org>; Geoff 
Thompson <geoff.thomp...@redpoint.net<mailto:geoff.thomp...@redpoint.net>>
Subject: RE: Graphx

Ovidiu,

IMHO, this is one of the biggest issues facing GraphX and Spark.  There are a 
lot of knobs and levers to pull to affect performance, with very little 
guidance about which settings work in general.  We cannot ship software that 
requires end-user tuning; it just has to work.  Unfortunately GraphX seems very 
sensitive to working set size relative to available RAM and fails 
catastrophically as opposed to gracefully when working set is too large.  It is 
also very sensitive to the nature of the data.  For example, if we build a test 
file with input-edge representation like:
1 2
2 3
3 4
5 6
6 7
7 8
…
this represents a graph with connected components in groups of four.  We found 
experimentally that when this data is input in clustered order, the required 
memory is lower and runtime is much faster than when data is input in random 
order.  This makes intuitive sense because of the additional communication 
required for the random order.

Our 1bn-edge test case was of this same form, input in clustered order, with 
groups of 10 vertices per component.  It failed at 8 x 60GB.  This is the kind 
of data that our application processes, so it is a realistic test for us.  I’ve 
found that social media test data sets tend to follow power-law distributions, 
and that GraphX has much less problem with them.

A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges in 
10-vertex components using the synthetic test input I describe above.  I would 
be curious to know if this works and what settings you use to succeed, and if 
it continues to succeed for random input order.

As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2) behavior 
for large data sets, but it processes the 1bn-edge case on a single 60GB node 
in about 20 minutes.  It degrades gracefully along the O(N^2) curve and 
additional memory reduces time.

John Lilley

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inri

RE: Graphx

2016-03-11 Thread John Lilley
I suppose for a 2.6bn case we’d need Long:

public class GenCCInput {
  public static void main(String[] args) {
    if (args.length != 2) {
      System.err.println("Usage: \njava GenCCInput <edges> <groupSize>");
      System.exit(-1);
    }
    long edges = Long.parseLong(args[0]);
    long groupSize = Long.parseLong(args[1]);
    long currentEdge = 1;
    long currentGroupSize = 0;
    for (long i = 0; i < edges; i++) {
      System.out.println(currentEdge + " " + (currentEdge + 1));
      if (currentGroupSize == 0) {
        currentGroupSize = 2;
      } else {
        currentGroupSize++;
      }
      if (currentGroupSize >= groupSize) {
        currentGroupSize = 0;
        currentEdge += 2;
      } else {
        currentEdge++;
      }
    }
  }
}

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: John Lilley [mailto:john.lil...@redpoint.net]
Sent: Friday, March 11, 2016 8:46 AM
To: Ovidiu-Cristian MARCU <ovidiu-cristian.ma...@inria.fr>
Cc: lihu <lihu...@gmail.com>; Andrew A <andrew.a...@gmail.com>; 
u...@spark.incubator.apache.org; Geoff Thompson <geoff.thomp...@redpoint.net>
Subject: RE: Graphx

Ovidiu,

IMHO, this is one of the biggest issues facing GraphX and Spark.  There are a 
lot of knobs and levers to pull to affect performance, with very little 
guidance about which settings work in general.  We cannot ship software that 
requires end-user tuning; it just has to work.  Unfortunately GraphX seems very 
sensitive to working set size relative to available RAM and fails 
catastrophically as opposed to gracefully when working set is too large.  It is 
also very sensitive to the nature of the data.  For example, if we build a test 
file with input-edge representation like:
1 2
2 3
3 4
5 6
6 7
7 8
…
this represents a graph with connected components in groups of four.  We found 
experimentally that when this data is input in clustered order, the required 
memory is lower and runtime is much faster than when data is input in random 
order.  This makes intuitive sense because of the additional communication 
required for the random order.

Our 1bn-edge test case was of this same form, input in clustered order, with 
groups of 10 vertices per component.  It failed at 8 x 60GB.  This is the kind 
of data that our application processes, so it is a realistic test for us.  I’ve 
found that social media test data sets tend to follow power-law distributions, 
and that GraphX has much less problem with them.

A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges in 
10-vertex components using the synthetic test input I describe above.  I would 
be curious to know if this works and what settings you use to succeed, and if 
it continues to succeed for random input order.

As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2) behavior 
for large data sets, but it processes the 1bn-edge case on a single 60GB node 
in about 20 minutes.  It degrades gracefully along the O(N^2) curve and 
additional memory reduces time.

John Lilley

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, March 11, 2016 8:14 AM
To: John Lilley <john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>>
Cc: lihu <lihu...@gmail.com<mailto:lihu...@gmail.com>>; Andrew A 
<andrew.a...@gmail.com<mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org<mailto:u...@spark.incubator.apache.org>
Subject: Re: Graphx

Hi,

I wonder what version of Spark and what parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters).
John: I suppose your C++ app (algorithm) does not scale, if you used only one 
node.
I don’t understand how RDD serialization is taking excessive time, compared 
to the total time or some other expected time.

For the different RDD timings you have events and the UI console, and a bunch of 
papers describing how to measure different things. lihu: did you use some 
incomplete tool, or what are you looking for?

Best,
Ovidiu

On 11 Mar 2016, at 16:02, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:

A colleague did the experiments and I don’t know exactly how he observed that.  
I think it was indirect: from the Spark diagnostics indicating the amount of I/O, 
he deduced that this was RDD serialization.  Also, when he added light 
compression to RDD serialization this improved matters.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoin

RE: Graphx

2016-03-11 Thread John Lilley
PS: This is the code I use to generate clustered test data:

public class GenCCInput {
  public static void main(String[] args) {
    if (args.length != 2) {
      System.err.println("Usage: \njava GenCCInput <edges> <groupSize>");
      System.exit(-1);
    }
    int edges = Integer.parseInt(args[0]);
    int groupSize = Integer.parseInt(args[1]);
    int currentEdge = 1;
    int currentGroupSize = 0;
    for (int i = 0; i < edges; i++) {
      System.out.println(currentEdge + " " + (currentEdge + 1));
      if (currentGroupSize == 0) {
        currentGroupSize = 2;
      } else {
        currentGroupSize++;
      }
      if (currentGroupSize >= groupSize) {
        currentGroupSize = 0;
        currentEdge += 2;
      } else {
        currentEdge++;
      }
    }
  }
}

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, March 11, 2016 8:14 AM
To: John Lilley <john.lil...@redpoint.net>
Cc: lihu <lihu...@gmail.com>; Andrew A <andrew.a...@gmail.com>; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi,

I wonder what version of Spark and what parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters).
John: I suppose your C++ app (algorithm) does not scale, if you used only one 
node.
I don’t understand how RDD serialization is taking excessive time, compared 
to the total time or some other expected time.

For the different RDD timings you have events and the UI console, and a bunch of 
papers describing how to measure different things. lihu: did you use some 
incomplete tool, or what are you looking for?

Best,
Ovidiu

On 11 Mar 2016, at 16:02, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:

A colleague did the experiments and I don’t know exactly how he observed that.  
I think it was indirect: from the Spark diagnostics indicating the amount of I/O, 
he deduced that this was RDD serialization.  Also, when he added light 
compression to RDD serialization this improved matters.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: lihu [mailto:lihu...@gmail.com]
Sent: Friday, March 11, 2016 7:58 AM
To: John Lilley <john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>>
Cc: Andrew A <andrew.a...@gmail.com<mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org<mailto:u...@spark.incubator.apache.org>
Subject: Re: Graphx

Hi, John:
   I am very interested in your experiment. How did you find that RDD 
serialization cost lots of time, from the log or some other tools?

On Fri, Mar 11, 2016 at 8:46 PM, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:
Andrew,

We conducted some tests for using Graphx to solve the connected-components 
problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 
100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD 
serialization would take excessive time and then we would get failures.  By 
contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a 
single 16GB node in about an hour.  I think that a very large cluster will do 
better, but we did not explore that.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516 | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: Andrew A [mailto:andrew.a...@gmail.com<mailto:andrew.a...@gmail.com>]
Sent: Thursday, March 10, 2016 2:44 PM
To: u...@spark.incubator.apache.org<mailto:u...@spark.incubator.apache.org>
Subject: Graphx

Hi, is there anyone who uses graphx in production? What maximum size of graphs 
did you process with spark, and what cluster do you use for it?

I tried to calculate pagerank for the 1 GB LJ edges dataset (LiveJournalPageRank 
from the spark examples) and I faced large-volume shuffles produced by spark 
which failed my spark job.
Thank you,
Andrew



RE: Graphx

2016-03-11 Thread John Lilley
Ovidiu,

IMHO, this is one of the biggest issues facing GraphX and Spark.  There are a 
lot of knobs and levers to pull to affect performance, with very little 
guidance about which settings work in general.  We cannot ship software that 
requires end-user tuning; it just has to work.  Unfortunately GraphX seems very 
sensitive to working set size relative to available RAM and fails 
catastrophically as opposed to gracefully when working set is too large.  It is 
also very sensitive to the nature of the data.  For example, if we build a test 
file with input-edge representation like:
1 2
2 3
3 4
5 6
6 7
7 8
…
this represents a graph with connected components in groups of four.  We found 
experimentally that when this data is input in clustered order, the required 
memory is lower and runtime is much faster than when data is input in random 
order.  This makes intuitive sense because of the additional communication 
required for the random order.

Our 1bn-edge test case was of this same form, input in clustered order, with 
groups of 10 vertices per component.  It failed at 8 x 60GB.  This is the kind 
of data that our application processes, so it is a realistic test for us.  I’ve 
found that social media test data sets tend to follow power-law distributions, 
and that GraphX has much less problem with them.

A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges in 
10-vertex components using the synthetic test input I describe above.  I would 
be curious to know if this works and what settings you use to succeed, and if 
it continues to succeed for random input order.

As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2) behavior 
for large data sets, but it processes the 1bn-edge case on a single 60GB node 
in about 20 minutes.  It degrades gracefully along the O(N^2) curve and 
additional memory reduces time.

John Lilley

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, March 11, 2016 8:14 AM
To: John Lilley <john.lil...@redpoint.net>
Cc: lihu <lihu...@gmail.com>; Andrew A <andrew.a...@gmail.com>; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi,

I wonder what version of Spark and what parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters).
John: I suppose your C++ app (algorithm) does not scale, if you used only one 
node.
I don’t understand how RDD serialization is taking excessive time, compared 
to the total time or some other expected time.

For the different RDD timings you have events and the UI console, and a bunch of 
papers describing how to measure different things. lihu: did you use some 
incomplete tool, or what are you looking for?

Best,
Ovidiu

On 11 Mar 2016, at 16:02, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:

A colleague did the experiments and I don’t know exactly how he observed that.  
I think it was indirect: from the Spark diagnostics indicating the amount of I/O, 
he deduced that this was RDD serialization.  Also, when he added light 
compression to RDD serialization this improved matters.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: lihu [mailto:lihu...@gmail.com]
Sent: Friday, March 11, 2016 7:58 AM
To: John Lilley <john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>>
Cc: Andrew A <andrew.a...@gmail.com<mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org<mailto:u...@spark.incubator.apache.org>
Subject: Re: Graphx

Hi, John:
   I am very interested in your experiment. How did you find that RDD 
serialization cost lots of time, from the log or some other tools?

On Fri, Mar 11, 2016 at 8:46 PM, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:
Andrew,

We conducted some tests for using Graphx to solve the connected-components 
problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 
100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD 
serialization would take excessive time and then we would get failures.  By 
contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a 
single 16GB node in about an hour.  I think that a very large cluster will do 
better, but we did not explore that.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516 | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: 

RE: Graphx

2016-03-11 Thread John Lilley
A colleague did the experiments and I don’t know exactly how he observed that.  
I think it was indirect: from the Spark diagnostics indicating the amount of I/O, 
he deduced that this was RDD serialization.  Also, when he added light 
compression to RDD serialization this improved matters.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: lihu [mailto:lihu...@gmail.com]
Sent: Friday, March 11, 2016 7:58 AM
To: John Lilley <john.lil...@redpoint.net>
Cc: Andrew A <andrew.a...@gmail.com>; u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi, John:
   I am very interested in your experiment. How did you find that RDD 
serialization cost lots of time, from the log or some other tools?

On Fri, Mar 11, 2016 at 8:46 PM, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:
Andrew,

We conducted some tests for using Graphx to solve the connected-components 
problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 
100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD 
serialization would take excessive time and then we would get failures.  By 
contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a 
single 16GB node in about an hour.  I think that a very large cluster will do 
better, but we did not explore that.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516 | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: Andrew A [mailto:andrew.a...@gmail.com<mailto:andrew.a...@gmail.com>]
Sent: Thursday, March 10, 2016 2:44 PM
To: u...@spark.incubator.apache.org<mailto:u...@spark.incubator.apache.org>
Subject: Graphx

Hi, is there anyone who uses graphx in production? What maximum size of graphs 
did you process with spark, and what cluster do you use for it?

I tried to calculate pagerank for the 1 GB LJ edges dataset (LiveJournalPageRank 
from the spark examples) and I faced large-volume shuffles produced by spark 
which failed my spark job.
Thank you,
Andrew



RE: Graphx

2016-03-11 Thread John Lilley
Andrew,

We conducted some tests for using Graphx to solve the connected-components 
problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 
100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD 
serialization would take excessive time and then we would get failures.  By 
contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a 
single 16GB node in about an hour.  I think that a very large cluster will do 
better, but we did not explore that.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net<mailto:john.lil...@redpoint.net> | 
www.redpoint.net<http://www.redpoint.net/>

From: Andrew A [mailto:andrew.a...@gmail.com]
Sent: Thursday, March 10, 2016 2:44 PM
To: u...@spark.incubator.apache.org
Subject: Graphx

Hi, is there anyone who uses graphx in production? What maximum size of graphs 
did you process with spark, and what cluster do you use for it?

I tried to calculate pagerank for the 1 GB LJ edges dataset (LiveJournalPageRank 
from the spark examples) and I faced large-volume shuffles produced by spark 
which failed my spark job.

Thank you,
Andrew


Appropriate Apache Users List Uses

2016-02-09 Thread John Omernik
All, I received this today, is this appropriate list use? Note: This was
unsolicited.

Thanks
John



From: Pierce Lamb <pl...@snappydata.io>
11:57 AM (1 hour ago)
to me

Hi John,

I saw you on the Spark Mailing List and noticed you worked for * and
wanted to reach out. My company, SnappyData, just launched an open source
OLTP + OLAP Database built on Spark. Our lead investor is Pivotal, whose
largest owner is EMC which makes * like a father figure :)

SnappyData’s goal is two fold: Operationalize Spark and deliver truly
interactive queries. To do this, we first integrated Spark with an
in-memory database with a pedigree of production customer deployments:
GemFireXD (GemXD).

GemXD operationalized Spark via:

-- True high availability

-- A highly concurrent environment

-- An OLTP engine that can process transactions (mutable state)

With GemXD as a storage engine, we packaged SnappyData with Approximate
Query Processing (AQP) technology. AQP enables interactive response times
even when data volumes are huge because it allows the developer to trade
latency for accuracy. AQP queries (SQL queries with a specified error rate)
execute on sample tables -- tables that have taken a stratified sample of
the full dataset. As such, AQP queries enable much faster decisions when
100% accuracy isn’t needed and sample tables require far fewer resources to
manage.

If that sounds interesting to you, please check out our Github repo (our
release is hosted there under “releases”):

https://github.com/SnappyDataInc/snappydata

We also have a technical paper that dives into the architecture:
http://www.snappydata.io/snappy-industrial

Are you currently using Spark at ? I’d love to set up a call with you
and hear about how you’re using it and see if SnappyData could be a fit.

In addition to replying to this email, there are many ways to chat with us:
https://github.com/SnappyDataInc/snappydata#community-support

Hope to hear from you,

Pierce

pl...@snappydata.io

http://www.twitter.com/snappydata


[ANNOUNCE] Apache Nutch 2.3.1 Release

2016-01-21 Thread lewis john mcgibbney
Hi Folks,

!!Apologies for cross posting!!

The Apache Nutch PMC are pleased to announce the immediate release of
Apache Nutch v2.3.1, we advise all current users and developers of the 2.X
series to upgrade to this release.

Nutch is a well-matured, production-ready Web crawler. The Nutch 2.X branch is
becoming an emerging alternative, taking direct inspiration from the Nutch 1.X
series. 2.X differs in one key area: storage is abstracted away from any
specific underlying data store by using Apache Gora™
for handling object-to-persistent-data-store
mappings.

The recommended Gora backends for this Nutch release are

   - Apache Avro 1.7.6
   - Apache Hadoop 1.2.1 and 2.5.2
   - Apache HBase 0.98.8-hadoop2 (although also tested with 1.X)
   - Apache Cassandra 2.0.2
   - Apache Solr 4.10.3
   - MongoDB 2.6.X
   - Apache Accumulo 1.5.1
   - Apache Spark 1.4.1

This bug fix release contains around 40 issues addressed. For a complete
overview of these issues please see the release report.

As usual in the 2.X series, release artifacts are made available as source
only, and are also available within Maven Central
as a Maven dependency. The release is available from our DOWNLOADS PAGE.

Thank you to everyone that contributed towards this release.


Problem About Worker System.out

2015-12-28 Thread David John
I have used Spark 1.4 for 6 months. Thanks to all the members of this 
community for your great work. I have a question about a logging issue; I 
hope this question can be solved.

The program is running under this configuration: YARN cluster, yarn-client mode.

In Scala, writing code like:

  rdd.map( a => println(a) );

will get the output about the value of a in our console.

However, in Java (1.7), writing:

  rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
      System.out.println(a); return a;
    }
  });

won't get the output in our console. The configuration is the same.

I have tried this code, but it does not work either:

  rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
      org.apache.log4j.Logger log = Logger.getLogger(this.getClass());
      log.info(a); log.warn(a); log.error(a); log.fatal(a);
      return a;
    }
  });

No output either with:

  final org.apache.log4j.Logger log = Logger.getLogger(this.getClass());
  rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
      log.info(a); log.warn(a); log.error(a); log.fatal(a);
      return a;
    }
  });

It seems that the stdout output in the worker is not sent back to 
our driver. I wonder why it works in Scala but not in Java. Is there a simple 
way to make Java work like Scala?
Thanks. 
  

FW: Problem About Worker System.out

2015-12-28 Thread David John




Thanks.

Can we use an slf4j/log4j logger to transfer our messages from a worker to the 
driver? I saw some discussions saying that we can use this code to transfer the 
messages:

  object Holder extends Serializable {
    @transient lazy val log = Logger.getLogger(getClass.getName)
  }

  val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element =>
    Holder.log.info(element)
  }

ref: 
http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala

Is this a traditional way? Or does Spark have a SocketAppender for developers?

Date: Mon, 28 Dec 2015 17:52:17 +0800
Subject: Re: Problem About Worker System.out
From: sai.sai.s...@gmail.com
To: david_john_2...@outlook.com
CC: user@spark.apache.org

Stdout will not be sent back to the driver, no matter whether you use Scala or Java. You 
must be doing something wrong that makes you think it is expected behavior.
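
For reference, a minimal Scala sketch of the usual alternatives (the sample RDD is
illustrative; sc is the SparkContext already available in the shell):

  val rdd = sc.parallelize(1 to 10)

  // 1) Bring the values back to the driver with an action and print them there:
  rdd.collect().foreach(println)        // or rdd.take(20).foreach(println)

  // 2) println/log inside map() does run, but on the executors; in yarn-client
  //    mode it ends up in each executor's stdout log (visible in the YARN web UI
  //    or with: yarn logs -applicationId <appId>), not in the driver console.
  rdd.map { a => println(a); a }.count()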
On Mon, Dec 28, 2015 at 5:33 PM, David John <david_john_2...@outlook.com> wrote:



I have used Spark 1.4 for 6 months. Thanks to all the members of this 
community for your great work. I have a question about a logging issue; I 
hope this question can be solved.

The program is running under this configuration: YARN cluster, yarn-client mode.

In Scala, writing code like:

  rdd.map( a => println(a) );

will get the output about the value of a in our console.

However, in Java (1.7), writing:

  rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
      System.out.println(a); return a;
    }
  });

won't get the output in our console. The configuration is the same.

I have tried this code, but it does not work either:

  rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
      org.apache.log4j.Logger log = Logger.getLogger(this.getClass());
      log.info(a); log.warn(a); log.error(a); log.fatal(a);
      return a;
    }
  });

No output either with:

  final org.apache.log4j.Logger log = Logger.getLogger(this.getClass());
  rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
      log.info(a); log.warn(a); log.error(a); log.fatal(a);
      return a;
    }
  });

It seems that the stdout output in the worker is not sent back to 
our driver. I wonder why it works in Scala but not in Java. Is there a simple 
way to make Java work like Scala?
Thanks. 
  


  

Conf Settings in Mesos

2015-11-12 Thread John Omernik
Hey all,

I noticed today that if I take a tgz as my URI for Mesos, I have to
repackage it with my conf settings from where I execute, say, pyspark, for
the executors to have the right configuration settings.

That is...

If I take a "stock" tgz from make-distribution.sh, unpack it, set
the URI in spark-defaults to be that unmodified tgz, change other
settings in both spark-defaults.conf and spark-env.sh, and then run
./bin/pyspark from that unpacked directory, I would have thought
that when the executor spun up, some sort of magic was happening where
the conf directory or the conf settings would propagate out to the
executors (thus making configuration changes easier to manage).

For things to work, I had to unpack the tgz, change conf settings, then
repackage the tgz with all my conf settings for the tgz in the URI then run
it. Then it seemed to work.

I have a workaround, but I guess, from a usability point of view, it would
be nice to have a tgz that is just "binaries" and that, when it's run, picks up the
conf at run time. It would help with managing multiple configurations that
use the same binaries (different models/apps etc.). Instead of having
to repackage a tgz for each app, it would just propagate... am I looking at
this wrong?
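
For what it's worth, a sketch (in Scala) of how I understand the pieces fit
together; the master URL, URI and values are illustrative, and the comments are
my reading of the behavior rather than documented fact:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster("mesos://zk://zk1:2181/mesos")
    .setAppName("conf-propagation-test")
    // the "binaries" tgz each Mesos slave fetches and unpacks:
    .set("spark.executor.uri", "hdfs:///dist/spark-bin.tgz")
    // spark.* properties set on the driver side travel with the application,
    // so they don't appear to need repackaging into the tgz...
    .set("spark.executor.memory", "4g")
  // ...whereas conf/spark-env.sh is sourced from whatever the executor unpacked,
  // which seems to be why env-style settings required rebuilding the tgz.
  val sc = new SparkContext(conf)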

John


NPE is Spark Running on Mesos in Finegrained Mode

2015-11-12 Thread John Omernik
I have stumbled across an interesting (potential) bug.  I have an
environment that is MapR FS and Mesos.  I've posted a bit in the past
around getting this setup to work with Spark Mesos, and MapR and the Spark
community has been helpful.

In 1.4.1, I was able to get Spark working in this setup via setting
"spark.driver.extraClassPath"
and "spark.executor.extraClassPath" to include some specific MapR
libraries. That seemed to work, and all was well.

Fast forward to today, and I was trying to work with the "hadoop-provided"
download from the apache site, and things seemed to work with that setting,
i.e. in pyspark, when I ran this test:

tf = sc.textFile("/path/to/a/file/in/maprfs/file.json")
tf.count()

I didn't get the "No filesystem for scheme maprfs" error, but instead I got the
NPE on the tasks (NPE listed below), which was really similar to the NPE
issues I was getting on 1.4.1 when I was not setting the classpath.

So I was frustrated, and started playing with settings as I am apt to do,
and what I realized is when I changed the mesos resource allocation to
coarse mode, it worked!

So I did some other tests to isolate this, and sure enough, ONLY changing
the spark.mesos.coarse setting to true, while leaving all else the same,
fixes the issue. When I set it to false (fine-grained), it failed with the
NPE.

So I thought about this, and I did open a case with MapR, but this seems to
me to be odd in how Spark is handling things. Perhaps the classpath
information isn't being properly propagated to the tasks in fine mode, but
when I run in coarse grained mode, the executor is properly seeing the
spark.executor.extraClassPath?  Is there a spark.task.extraClassPath? (I
will be googling this as well).

I am curious about this behavior, and whether it's something that might point to a
bug or if it's just classic uninitiated user error :)
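
For reference, a sketch (in Scala) of the two configurations being compared; the
MapR jar path here is a placeholder, not the real one from my cluster:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster("mesos://zk://zk1:2181/mesos")
    .setAppName("maprfs-classpath-test")
    .set("spark.mesos.coarse", "true")    // works; "false" (fine-grained) hits the NPE
    .set("spark.driver.extraClassPath",   "/opt/mapr/lib/*")
    .set("spark.executor.extraClassPath", "/opt/mapr/lib/*")
  val sc = new SparkContext(conf)
  sc.textFile("/path/to/a/file/in/maprfs/file.json").count()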

John



NPE in Fine Grained Mode:

15/11/12 13:52:00 INFO storage.DiskBlockManager: Created local directory at
/tmp/blockmgr-94b6962b-2c28-4c10-946c-bd3b5c8c8069
15/11/12 13:52:00 INFO storage.MemoryStore: MemoryStore started with
capacity 1060.0 MB
java.lang.NullPointerException
at com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:109)
at com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:251)
at com.mapr.fs.ShimLoader.load(ShimLoader.java:213)
at
org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:61)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2147)
at
org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2362)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2579)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2531)
at
org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2444)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1156)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1128)
at
org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:107)
at
org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:52)
at
org.apache.spark.deploy.SparkHadoopUtil$.hadoop$lzycompute(SparkHadoopUtil.scala:383)
at
org.apache.spark.deploy.SparkHadoopUtil$.hadoop(SparkHadoopUtil.scala:383)
at
org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:403)
at
org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:2049)
at
org.apache.spark.storage.BlockManager.(BlockManager.scala:97)
at
org.apache.spark.storage.BlockManager.(BlockManager.scala:173)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:347)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:218)
at
org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:70)
java.lang.RuntimeException: Failure loading MapRClient.
at com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:295)
at com.mapr.fs.ShimLoader.load(ShimLoader.java:213)
at
org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:61)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2147)
at
org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2362)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2579)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2531)
at
org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2444)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1156)
at org.apach

Different classpath across stages?

2015-11-11 Thread John Meehan
I’ve been running into a strange class not found problem, but only when my job 
has more than one phase.  I have an RDD[ProtobufClass] which behaves as 
expected in a single-stage job (e.g. serialize to JSON and export).  But when I 
try to groupByKey, the first stage runs (essentially a keyBy), but eventually 
errors with the relatively common ‘unable to find protocol buffer class’ error 
for the first task of the second stage.  I’ve tried the userClassPathFirst 
options, but then the whole job fails.  So I’m wondering if there is some kind 
of configuration I can use to help Spark resolve the right protocol buffer 
class across stage boundaries?

-John
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Question about GraphX connected-components

2015-10-12 Thread John Lilley
Thanks Igor,
We are definitely thinking along these lines, but I am hoping to shortcut our 
search of the Spark/GraphX tuning parameter space to find a reasonable set of 
starting points.  There are simultaneous questions of “what should we expect 
from GraphX?" and "what are the best parameters to do that?".

What I’m asking is fairly specific:
-- What is a good set of tuning parameters (partitions, memory?) for a large 
data set that “should” fit into memory on an 8-node cluster with 8GB/node 
available to YARN?
-- Does anyone have or know of sample code that performs well on a real data 
set without adjusting lots of tuning knobs first?
-- How much available YARN memory is required to hold a given number of 
vertices+edges, with enough cushion to be comfortable?  You are giving some 
tantalizing hints (3x as much as I expected…), but no clear indication of how 
much memory should be needed.  Arriving at the answer through experimentation 
isn’t a good approach, because that assumes -- chicken-and-egg problem -- that 
we have already arrived at an optimal configuration.
-- Does GraphX connected-components performance degrade slowly or 
catastrophically when that memory limit is reached?  Are there tuning 
parameters that optimize for data all fitting in memory vs. data that must 
spill?

Thanks,
John Lilley

From: Igor Berman [mailto:igor.ber...@gmail.com]
Sent: Saturday, October 10, 2015 12:06 PM
To: John Lilley <john.lil...@redpoint.net>
Cc: user@spark.apache.org; Geoff Thompson <geoff.thomp...@redpoint.net>
Subject: Re: Question about GraphX connected-components

let's start from some basics: maybe you need to split your data into more 
partitions?
Spilling depends on your configuration when you create the graph (look for the 
storage level param) and your global configuration.
In addition, your assumption of 64GB/100M is probably wrong, since spark divides 
memory into 3 regions: for in-memory caching, for shuffling, and for the 
"workspace" of serialization/deserialization etc.; see the fraction parameters.

So depending on the number of your partitions, a worker might try to ingest 
too much data at once (#cores * memory pressure of one task per partition).

There is no such thing as a "right" configuration. It depends on your 
application. You can post your configuration and people will suggest some 
tuning; still, the best way is to try what is best for your case depending on what 
you see in the spark ui metrics (as a starting point).
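
For concreteness, a minimal Scala sketch of the knobs mentioned above (partition
count, storage level, memory fractions); the path and values are illustrative
starting points, not a recommended configuration:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.graphx.GraphLoader
  import org.apache.spark.storage.StorageLevel

  val conf = new SparkConf()
    .setAppName("cc-tuning-sketch")
    // Spark 1.x memory regions: shift space between caching and shuffle
    .set("spark.storage.memoryFraction", "0.5")
    .set("spark.shuffle.memoryFraction", "0.3")
  val sc = new SparkContext(conf)

  val graph = GraphLoader.edgeListFile(
    sc,
    "hdfs:///data/edges.txt",
    numEdgePartitions = 256,                               // more, smaller partitions
    edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,   // allow spilling to disk
    vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)

  val cc = graph.connectedComponents()
  println(cc.vertices.map(_._2).distinct().count() + " components")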

On 10 October 2015 at 00:13, John Lilley 
<john.lil...@redpoint.net<mailto:john.lil...@redpoint.net>> wrote:
Greetings,
We are looking into using the GraphX connected-components algorithm on Hadoop 
for grouping operations.  Our typical data is on the order of 50-200M vertices 
with an edge:vertex ratio between 2 and 30.  While there are pathological cases 
of very large groups, they tend to be small.  I am trying to get a handle on 
the level of performance and scaling we should expect, and how to best 
configure GraphX/Spark to get there.  After some trying, we cannot get to 100M 
vertices/edges without running out of memory on a small cluster (8 nodes with 4 
cores and 8GB available for YARN on each node).  This limit seems low, as 
64GB/100M is 640 bytes per vertex, which should be enough.  Is this within 
reason?  Does anyone have sample they can share that has the right 
configurations for succeeding with this size of data and cluster?  What level 
of performance should we expect?  What happens when the data set exceed memory, 
does it spill to disk “nicely” or degrade catastrophically?

Thanks,
John Lilley




Question about GraphX connected-components

2015-10-09 Thread John Lilley
Greetings,
We are looking into using the GraphX connected-components algorithm on Hadoop 
for grouping operations.  Our typical data is on the order of 50-200M vertices 
with an edge:vertex ratio between 2 and 30.  While there are pathological cases 
of very large groups, they tend to be small.  I am trying to get a handle on 
the level of performance and scaling we should expect, and how to best 
configure GraphX/Spark to get there.  After some trying, we cannot get to 100M 
vertices/edges without running out of memory on a small cluster (8 nodes with 4 
cores and 8GB available for YARN on each node).  This limit seems low, as 
64GB/100M is 640 bytes per vertex, which should be enough.  Is this within 
reason?  Does anyone have sample they can share that has the right 
configurations for succeeding with this size of data and cluster?  What level 
of performance should we expect?  What happens when the data set exceed memory, 
does it spill to disk "nicely" or degrade catastrophically?

Thanks,
John Lilley



Python Packages in Spark w/Mesos

2015-09-21 Thread John Omernik
Hey all -

Curious about the best way to include python packages (such as NLTK) in my Spark
installation. Basically I am running on Mesos, and would
like to find a way to include the package in the binary distribution, since
I don't want to install packages on all nodes.  We should be able to
include it in the distribution, right?

I thought of using the Docker Mesos integration, but I have been unable to
find information on this (see my other question on Docker/Mesos/Spark).
Any other thoughts on the best way to include packages in Spark WITHOUT
installing on each node would be appreciated!

John


Mesos Tasks only run on one node

2015-09-21 Thread John Omernik
I have a happy healthy Mesos cluster (0.24) running in my lab.  I've
compiled spark-1.5.0 and it seems to be working fine, except for one small
issue, my tasks all seem to run on one node. (I have 6 in the cluster).

Basically, I have a directory of compressed text files.  Compressed, these 25
files add up to 1.2 GB of data. In bin/pyspark I do:

txtfiles = sc.textFile("/path/to/my/data/*")
txtfiles.count()

This goes through and gives me the correct count, but all my tasks (25 of
them) run on one node, let's call it node4.

Interesting.

So I was running spark from node4, but I would have thought it would have
hit up more nodes.

So I ran it on node5.  In the executors tab of the spark UI, there is only one
registered, and it's node4, and once again all tasks ran on node4.

I am running in fine-grained mode... is there a setting somewhere to allow
for more executors? This seems weird. I've been away from Spark since 1.2.x
but I don't seem to remember this...


Docker/Mesos with Spark

2015-09-19 Thread John Omernik
I was searching in the 1.5.0 docs on the Docker on Mesos capabilities and
just found you CAN run it this way.  Are there any user posts, blog posts,
etc on why and how you'd do this?

Basically, at first I was questioning why you'd run spark in a docker
container, i.e., if you run with a tarballed executor, what are you really
gaining?  And in this setup, are you losing out on performance somehow? (I
am guessing smarter people than I have figured that out.)

Then I came across a situation where I wanted to use a python library with
spark, and it had to be installed on every node, and I realized one big
advantage of dockerized spark would be that spark apps that needed other
libraries could be contained and built well.

OK, that's huge, let's do that.  For my next question, there are a lot of
questions I have on how this actually works.  Does cluster mode/client mode
apply here? If so, how?  Is there a good walkthrough on getting this
set up? Limitations? Gotchas?  Should I just dive in and start working with
it? Has anyone written any stories/rough documentation? This seems like a
really helpful feature for scaling out spark, and letting developers truly
build what they need without tons of admin overhead, so I really want to
explore it.
Thanks!

John


[ANNOUNCE] Apache Gora 0.6.1 Release

2015-09-15 Thread lewis john mcgibbney
Hi All,

The Apache Gora team are pleased to announce the immediate availability of
Apache Gora 0.6.1.

What is Gora?
Gora is a framework which provides an in-memory data model and persistence
for big data. Gora supports persisting to column stores, key value stores,
document stores and RDBMSs, and analyzing the data with extensive Apache
Hadoop™ MapReduce support. This
release also offers input and output formats for Apache Spark.

What's in this release?

This release addresses a modest 21 issues, with
many improvements and bug fixes for the gora-mongodb module, resolution of a
major bug whilst flushing data to Apache Solr, a gora-gradle plugin, and our
Gora Spark backend support. Drop by our
mailing lists and ask questions for information on any of the above.

We provide Gora support for the following projects

   - Apache Avro 1.7.6
   - Apache Hadoop 1.2.1 and 2.5.2
   - Apache HBase 0.98.8-hadoop2 (although also tested with 1.X)
   - Apache Cassandra 2.0.2
   - Apache Solr 4.10.3
   - MongoDB 2.6.X
   - Apache Accumulo 1.5.1
   - Apache Spark 1.4.1

Gora is released both as source code, downloads for which can be found at
our downloads page, and as Maven
artifacts which can be found on Maven Central.
Thank you
Lewis
(On behalf of the Apache Gora PMC)

http://people.apache.org/~lewismc || @hectorMcSpector ||
http://www.linkedin.com/in/lmcgibbney

  Apache Gora V.P || Apache Nutch PMC || Apache Any23 V.P ||
Apache OODT PMC
   Apache Open Climate Workbench PMC || Apache Tika PMC || Apache TAC
Apache Usergrid || Apache HTrace (incubating) || Apache CommonsRDF
(incubating)


Re: insert overwrite table phonesall in spark-sql resulted in java.io.StreamCorruptedException

2015-08-20 Thread John Jay
The answer is that my table was not serialized with Kryo, but I started the
spark-sql shell with Kryo, so the data could not be deserialized.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/insert-overwrite-table-phonesall-in-spark-sql-resulted-in-java-io-StreamCorruptedException-tp23579p24354.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ERROR SparkUI: Failed to bind SparkUI java.net.BindException: Address already in use: Service 'SparkUI' failed after 16 retries!

2015-07-24 Thread Joji John
HI,

I am getting this error for some of my spark applications. I have multiple spark 
applications running in parallel. Is there a limit on the number of spark 
applications that I can run in parallel?



ERROR SparkUI: Failed to bind SparkUI

java.net.BindException: Address already in use: Service 'SparkUI' failed after 
16 retries!





Thanks

Joji john



Re: ERROR SparkUI: Failed to bind SparkUI java.net.BindException: Address already in use: Service 'SparkUI' failed after 16 retries!

2015-07-24 Thread Joji John
Thanks Ajay.


The way we wrote our spark application is that we have a generic python program, 
multiple instances of which can be called using different parameters. Does 
spark offer any function to bind it to an available port?


I guess the other option is to define a function to find an open port and use that.
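
For what it's worth, a minimal sketch of the configuration-only route (shown in
Scala; the same spark.* properties can be set on a PySpark SparkConf or passed
with --conf). The port values are illustrative, and the port-0 trick is from
memory, so treat it as an assumption to verify:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("instance-42")
    // either hand each instance its own UI port...
    .set("spark.ui.port", "4050")
    // ...or, if memory serves, ask Spark to pick an arbitrary free port with 0:
    // .set("spark.ui.port", "0")
    // and/or let Spark probe more than the default 16 ports before giving up:
    .set("spark.port.maxRetries", "64")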


Thanks

Joji John



From: Ajay Singal asinga...@gmail.com
Sent: Friday, July 24, 2015 6:59 AM
To: Joji John
Cc: user@spark.apache.org
Subject: Re: ERROR SparkUI: Failed to bind SparkUI java.net.BindException: 
Address already in use: Service 'SparkUI' failed after 16 retries!

Hi Joji,

I guess there is no hard limit on the number of Spark applications running in 
parallel.  However, you need to ensure that you do not use the same (e.g., 
default) port numbers for each application.

In your specific case, for example, if you try using the default SparkUI port 
4040 for more than one Spark application, the first application you start 
will bind to port 4040. So, this port becomes unavailable (at this moment).  
Therefore, all subsequent applications you start will get the SparkUI BindException.

To solve this issue, simply use non-competing port numbers, e.g., 4040, 4041, 
4042...

Thanks,
Ajay

On Fri, Jul 24, 2015 at 6:21 AM, Joji John 
jj...@ebates.commailto:jj...@ebates.com wrote:

HI,

I am getting this error for some of my spark applications. I have multiple spark 
applications running in parallel. Is there a limit on the number of spark 
applications that I can run in parallel?



ERROR SparkUI: Failed to bind SparkUI

java.net.BindException: Address already in use: Service 'SparkUI' failed after 
16 retries!





Thanks

Joji john




many-to-many join

2015-07-21 Thread John Berryman
Quick example problem that's stumping me:

* Users have 1 or more phone numbers and therefore one or more area codes.
* There are 100M users.
* States have one or more area codes.
* I would like to get the states for the users (as indicated by phone area
code).

I was thinking about something like this:

If area_code_user looks like (area_code,[user_id]) ex: (615,[1234567])
and area_code_state looks like (area_code,state) ex: (615, [Tennessee])
then we could do

states_and_users_mixed = area_code_user.join(area_code_state) \
.reduceByKey(lambda a,b: a+b) \
.values()

user_state_pairs = states_and_users_mixed.flatMap(
emit_cartesian_prod_of_userids_and_states )
user_to_states =   user_state_pairs.reduceByKey(lambda a,b: a+b)

user_to_states.first(1)

 (1234567,[Tennessee,Tennessee,California])

This would work, but the user_state_pairs is just a list of user_ids and
state names mixed together and emit_cartesian_prod_of_userids_and_states
has to correctly pair them. This is problematic because 1) it's weird and
sloppy and 2) there will be lots of users per state and having so many
users in a single row is going to make
emit_cartesian_prod_of_userids_and_states work extra hard to first locate
states and then emit all userid-state pairs.

How should I be doing this?
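
For reference, a minimal Scala sketch of the pair-based shape described above,
keeping (user_id, state) pairs instead of mixing ids and names in one list; the
sample data is made up, and the same RDD calls exist in PySpark:

  import org.apache.spark.{SparkConf, SparkContext}

  object AreaCodeStates {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("area-codes").setMaster("local[*]"))

      // (area_code, user_id): one row per phone number, so users can repeat
      val userAreaCodes = sc.parallelize(Seq((615, 1234567L), (615, 1234567L), (415, 1234567L)))
      // (area_code, state)
      val areaCodeState = sc.parallelize(Seq((615, "Tennessee"), (415, "California")))

      val userToStates = userAreaCodes
        .join(areaCodeState)                                   // (area_code, (user_id, state))
        .map { case (_, (user, state)) => (user, Set(state)) }
        .reduceByKey(_ ++ _)                                   // distinct states per user

      userToStates.collect().foreach(println)                  // (1234567,Set(Tennessee, California))
      sc.stop()
    }
  }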

Thanks,
-John


insert overwrite table phonesall in spark-sql resulted in java.io.StreamCorruptedException

2015-07-02 Thread John Jay
My spark-sql command:

spark-sql --driver-memory 2g --master spark://hadoop04.xx.xx.com:8241 --conf
spark.driver.cores=20 --conf spark.cores.max=20 --conf
spark.executor.memory=2g --conf spark.driver.memory=2g --conf
spark.akka.frameSize=500 --conf spark.eventLog.enabled=true --conf
spark.eventLog.dir=file:///newdisk/tmp/spark-events --conf
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC --driver-java-options
-XX:-UseGCOverheadLimit -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=dumpsparksql.log

spark-sql output:

...
15/07/02 16:59:53 ERROR SparkSQLDriver: Failed in [insert overwrite table
phonesall  select phoneNumber from phone]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in
stage 5.0 failed 4 times, most recent failure: Lost task 2.3 in stage 5.0
(TID 46, hadoop04.xx.xx.com): java.io.StreamCorruptedException: invalid type
code: 61
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1373)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in
stage 5.0 failed 4 times, most recent failure: Lost task 2.3 in stage 5.0
(TID 46, m1-ite-hadoop04.m1.baidu.com): java.io.StreamCorruptedException:
invalid type code: 61
at 

PySpark on YARN port out of range

2015-06-19 Thread John Meehan
Has anyone encountered this “port out of range” error when launching PySpark 
jobs on YARN?  It is sporadic (e.g. 2/3 jobs get this error).

LOG:

15/06/19 11:49:44 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 39.0 
(TID 211) on executor xxx.xxx.xxx.com: 
java.lang.IllegalArgumentException (port out of range:1315905645) [duplicate 7]
Traceback (most recent call last):
 File "<stdin>", line 1, in <module>
15/06/19 11:49:44 INFO cluster.YarnScheduler: Removed TaskSet 39.0, whose tasks 
have all completed, from pool
 File "/home/john/spark-1.4.0/python/pyspark/rdd.py", line 745, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
 File "/home/john/spark-1.4.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
 File "/home/john/spark-1.4.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError15/06/19 11:49:44 INFO storage.BlockManagerInfo: 
Removed broadcast_38_piece0 on 17.134.160.35:47455 in memory (size: 2.2 KB, 
free: 265.4 MB)
: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in 
stage 39.0 failed 4 times, most recent failure: Lost task 1.3 in stage 39.0 
(TID 210, xxx.xxx.xxx.com): 
java.lang.IllegalArgumentException: port out of range:1315905645
at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
at java.net.InetSocketAddress.init(InetSocketAddress.java:185)
at java.net.Socket.init(Socket.java:241)
at 
org.apache.spark.api.python.PythonWorkerFactory.createSocket$1(PythonWorkerFactory.scala:75)
at 
org.apache.spark.api.python.PythonWorkerFactory.liftedTree1$1(PythonWorkerFactory.scala:90)
at 
org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:130)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:73)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

SPARK 1.4.0 BUILD:

build/mvn -Pyarn -Phive -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.1.4 
-DskipTests clean package

LAUNCH CMD:

export HADOOP_CONF_DIR=/path/to/conf
export PYSPARK_PYTHON=/path/to/python-2.7.2/bin/python
~/spark-1.4.0/bin/pyspark \
--conf 
spark.yarn.jar=/home/john/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0-hadoop2.3.0-cdh5.1.4.jar
 \
--master yarn-client \
--num-executors 3 \
--executor-cores 18 \
--executor-memory 48g

TEST JOB IN REPL:

words = ['hi', 'there', 'yo', 'baby']
wordsRdd = sc.parallelize(words)
wordsRdd.map(lambda x: (x, 1)).collect()

Fwd: Spark/PySpark errors on mysterious missing /tmp file

2015-06-12 Thread John Berryman
$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 In case you're wondering

 b.take(10)

[(16744491, 1),
 (16203827, 1),
 (16695357, 1),
 (16958298, 1),
 (16400458, 1),
 (16810060, 1),
 (11452497, 1),
 (14803033, 1),
 (15630426, 1),
 (14917736, 1)]

So maybe (I thought) there's some weird number in there that overflows or
something, and collecting and re-parallelizing fixes the problem. This
next bit of code proves this assumption wrong.

 a=sc.parallelize([(16646160,1)])
 b=stuff.map(lambda x:(16646160,1))
 #b=sc.parallelize(b.collect())
 a.join(b).take(10)

It still breaks. (Here again, un-commenting that line fixes the problem.)

So I'm apparently looking at some sort of spark/pyspark bug. Spark 1.2.0.
Any idea?

-John


Reopen Jira or New Jira

2015-06-11 Thread John Omernik
Hey all, from my other post on Spark 1.3.1 issues, I think we found an
issue related to a previously closed Jira (
https://issues.apache.org/jira/browse/SPARK-1403).  Basically it looks like
the thread context class loader is NULL, which is causing the NPE in MapR,
and that's similar to the posted Jira. New comments have been added to that
Jira, but I am not sure how to trace back changes to determine why it was
NULL in 0.9, apparently fixed in 1.0, working in 1.2, and then broken from
1.2.2 onward.

Is it possible to open a closed Jira? Should I open another? I think MapR
is working to handle it in their code, but I think someone (with more
knowledge than I) should probably look into this on the Spark side as well,
due to it appearing to have changed behavior between versions.

Thoughts?

John


Previous Post

All -

I am facing an odd issue and I am not really sure where to go for support
at this point.  I am running MapR, which complicates things as it relates to
Mesos; however, this HAS worked in the past with no issues, so I am stumped
here.

So for starters, here is what I am trying to run. This is a simple show
tables using the Hive Context:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row, HiveContext
sparkhc = HiveContext(sc)
test = sparkhc.sql("show tables")
for r in test.collect():
  print r

When I run it on 1.3.1 using ./bin/pyspark --master local  This works with
no issues.

When I run it using Mesos with all the settings configured (as they had
worked in the past) I get lost tasks and when I zoom in them, the error
that is being reported is below.  Basically it's a NullPointerException on
the com.mapr.fs.ShimLoader.  What's weird to me is that I took each instance
and compared both together; the class path, everything is exactly the same.
Yet running in local mode works, and running in mesos fails.  Also of note,
when the task is scheduled to run on the same node as when I run locally,
that fails too! (Baffling).

Ok, for comparison, how I configured Mesos was to download the mapr4
package from spark.apache.org.  I used the exact same configuration file
(except for changing the executor tgz from 1.2.0 to 1.3.1) as with 1.2.0.
When I run this example with the mapr4 for 1.2.0 there is no issue in
Mesos, everything runs as intended. Using the same package for 1.3.1 then
it fails.

(Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as
well).

So basically, when I used 1.2.0 and followed a set of steps, it worked on
Mesos, and 1.3.1 fails.  Since this is a current version of Spark, MapR
supports 1.2.1 only.  (Still working on that).

I guess I am at a loss right now on why this would be happening, any
pointers on where I could look or what I could tweak would be greatly
appreciated. Additionally, if there is something I could specifically draw
to the attention of MapR on this problem please let me know, I am perplexed
on the change from 1.2.0 to 1.3.1.

Thank you,

John




Full Error on 1.3.1 on Mesos:
15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity
1060.3 MB java.lang.NullPointerException at
com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96) at
com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232) at
com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at
org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60)
at java.lang.Class.forName0(Native Method) at
java.lang.Class.forName(Class.java:274) at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847)
at
org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at
org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at
org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98)
at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:220) at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala) at
org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1959) at
org.apache.spark.storage.BlockManager.(BlockManager.scala:104) at
org.apache.spark.storage.BlockManager.(BlockManager.scala:179) at
org.apache.spark.SparkEnv$.create(SparkEnv.scala:310) at
org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:186) at
org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:70)
java.lang.RuntimeException: Failure loading MapRClient. at
com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:283) at
com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at
org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60)
at java.lang.Class.forName0(Native Method

Re: Spark 1.3.1 On Mesos Issues.

2015-06-08 Thread John Omernik
It appears this may be related.

https://issues.apache.org/jira/browse/SPARK-1403

Granted, the NPE is in MapR's code, but having Spark (seemingly, I am not an
expert here, just basing it off the comments) change its behavior (if
that's what it is doing) probably isn't good either. I guess the level that
this is happening at is way above my head.  :)



On Fri, Jun 5, 2015 at 4:38 PM, John Omernik j...@omernik.com wrote:

 Thanks all. The answers post is me too, I multi thread. That and Ted is
 aware to and Mapr is helping me with it.  I shall report the answer of that
 investigation when we have it.

 As to reproduction, I've installed mapr file system, tired both version
 4.0.2 and 4.1.0.  Have mesos running along side mapr, and then I use
 standard methods for submitting spark jobs to mesos. I don't have my
 configs now, on vacation :) but I can shar on Monday.

 I appreciate the support I am getting from every one, mesos community,
 spark community, and mapr.  Great to see folks solving problems and I will
 be sure report back findings as they arise.



 On Friday, June 5, 2015, Tim Chen t...@mesosphere.io wrote:

 It seems like there is another thread going on:


 http://answers.mapr.com/questions/163353/spark-from-apache-downloads-site-for-mapr.html

 I'm not particularly sure why, seems like the problem is that getting the
 current context class loader is returning null in this instance.

 Do you have some repro steps or config we can try this?

 Tim

 On Fri, Jun 5, 2015 at 3:40 AM, Steve Loughran ste...@hortonworks.com
 wrote:


  On 2 Jun 2015, at 00:14, Dean Wampler deanwamp...@gmail.com wrote:

  It would be nice to see the code for MapR FS Java API, but my google
 foo failed me (assuming it's open source)...


  I know that MapRFS is closed source, don't know about the java JAR.
 Why not ask Ted Dunning (cc'd)  nicely to see if he can track down the
 stack trace for you.

   So, shooting in the dark ;) there are a few things I would check, if
 you haven't already:

  1. Could there be 1.2 versions of some Spark jars that get picked up
 at run time (but apparently not in local mode) on one or more nodes? (Side
 question: Does your node experiment fail on all nodes?) Put another way,
 are the classpaths good for all JVM tasks?
 2. Can you use just MapR and Spark 1.3.1 successfully, bypassing Mesos?

  Incidentally, how are you combining Mesos and MapR? Are you running
 Spark in Mesos, but accessing data in MapR-FS?

  Perhaps the MapR shim library doesn't support Spark 1.3.1.

  HTH,

  dean

  Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com/
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Jun 1, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote:

 All -

  I am facing and odd issue and I am not really sure where to go for
 support at this point.  I am running MapR which complicates things as it
 relates to Mesos, however this HAS worked in the past with no issues so I
 am stumped here.

  So for starters, here is what I am trying to run. This is a simple
 show tables using the Hive Context:

  from pyspark import SparkContext, SparkConf
 from pyspark.sql import SQLContext, Row, HiveContext
 sparkhc = HiveContext(sc)
 test = sparkhc.sql("show tables")
 for r in test.collect():
   print r

  When I run it on 1.3.1 using ./bin/pyspark --master local  This works
 with no issues.

  When I run it using Mesos with all the settings configured (as they
 had worked in the past) I get lost tasks and when I zoom in them, the error
 that is being reported is below.  Basically it's a NullPointerException on
 the com.mapr.fs.ShimLoader.  What's weird to me is is I took each instance
 and compared both together, the class path, everything is exactly the same.
 Yet running in local mode works, and running in mesos fails.  Also of note,
 when the task is scheduled to run on the same node as when I run locally,
 that fails too! (Baffling).

  Ok, for comparison, how I configured Mesos was to download the mapr4
 package from spark.apache.org.  Using the exact same configuration
 file (except for changing the executor tgz from 1.2.0 to 1.3.1) from the
 1.2.0.  When I run this example with the mapr4 for 1.2.0 there is no issue
 in Mesos, everything runs as intended. Using the same package for 1.3.1
 then it fails.

  (Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails
 as well).

  So basically When I used 1.2.0 and followed a set of steps, it worked
 on Mesos and 1.3.1 fails.  Since this is a current version of Spark, MapR
 is supports 1.2.1 only.  (Still working on that).

  I guess I am at a loss right now on why this would be happening, any
 pointers on where I could look or what I could tweak would be greatly
 appreciated. Additionally, if there is something I could specifically draw
 to the attention of MapR on this problem please let me know, I am

Transform Functions and Python Modules

2015-06-08 Thread John Omernik
I am learning more about Spark (and in this case Spark Streaming) and am
getting that functions like dstream.map() take a function and apply it to
each element of the RDD, which in turn returns a new RDD based on the
original.

That's cool for the simple map functions in the examples where a lambda is
used to take x and do x * x, but what happens in Python (specifically)
with more complex functions? Especially those that use modules (that ARE
built in on all nodes).

For example, instead of a simple map, I want to take a line of data and
regex parse it into fields. It's still a map (not a flatMap) in that
it's a one-to-one return. (One record of the RDD, a line of text, would
return one parsed record as a Python dict.)

In my Spark Streaming job, I have import re in the main part of the file,
and this all seems to work, but I want to ensure I am not by default
forcing computations in the driver rather than distributed.

This is working, as in it's returning the expected data; however, I want to
ensure I am not doing something weird by having a transform function use
a module that's imported only at the driver.  (Should I be calling import
re IN the function?)

If there are any good docs on this, I'd love to understand it more.

Thanks!

John



Example

def parseLine(line):
    restr = r"^(\w\w\w  ?\d\d? \d\d:\d\d:\d\d) ([^ ]+) "
    logre = re.compile(restr)
    # Why does every record of the RDD have a None value in the first
    # position of the tuple?
    m = logre.search(line[1])
    rec = {}
    if m:
        rec['field1'] = m.group(1)
        rec['field2'] = m.group(2)
    return rec


fwlog_dstream = KafkaUtils.createStream(ssc, zkQuorum,
    "sparkstreaming-fwlog_parsed", {"kafka_src_topic": 1})

recs = fwlog_dstream.map(parseLine)
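
For what it's worth, a minimal sketch of the two import placements (plain
RDDs instead of the Kafka stream above; parse_top_level and
parse_local_import are just illustrative names).  The mapped function is
pickled and shipped to the executors, and the module it references is
imported there as well, so either placement keeps the regex work on the
workers, as long as the module is installed on every node:

import re

from pyspark import SparkContext

sc = SparkContext(appName="import-placement-sketch")

# Option 1: rely on the module-level import above.  The parse function is
# pickled and sent to the executors; 're' is imported again over there, so
# the regex matching runs on the workers, not the driver.
def parse_top_level(line):
    m = re.search(r"^(\w+) (\d+)$", line)
    return {"word": m.group(1), "num": m.group(2)} if m else None

# Option 2: import inside the function.  Functionally the same here; it just
# makes the dependency explicit and is resolved on each worker at call time.
def parse_local_import(line):
    import re
    m = re.search(r"^(\w+) (\d+)$", line)
    return {"word": m.group(1), "num": m.group(2)} if m else None

lines = sc.parallelize(["alpha 1", "beta 2", "no match here"])
print(lines.map(parse_top_level).collect())
print(lines.map(parse_local_import).collect())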


Re: Spark Streaming for Each RDD - Exception on Empty

2015-06-05 Thread John Omernik
I am using Spark 1.3.1, so I don't have the 1.4.0 isEmpty.  I guess I am
curious on the right approach here; like I said in my original post,
perhaps this isn't bad, but the exceptions bother me from a programmer
level... is that wrong? :)



On Fri, Jun 5, 2015 at 11:07 AM, Ted Yu yuzhih...@gmail.com wrote:

 John:
 Which Spark release are you using ?
 As of 1.4.0, RDD has this method:

   def isEmpty(): Boolean = withScope {

 FYI

 On Fri, Jun 5, 2015 at 9:01 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 The foreachPartition callback is provided with an Iterator by the Spark
 Framework – while iterator.hasNext() ……



 Also check whether this is not some sort of Python Spark API bug – Python
 seems to be the foster child here – Scala and Java are the darlings



 *From:* John Omernik [mailto:j...@omernik.com]
 *Sent:* Friday, June 5, 2015 4:08 PM
 *To:* user
 *Subject:* Spark Streaming for Each RDD - Exception on Empty



 Is there pythonic/sparkonic way to test for an empty RDD before using the
 foreachRDD?  Basically I am using the Python example
 https://spark.apache.org/docs/latest/streaming-programming-guide.html to
 put records somewhere  When I have data, it works fine, when I don't I
 get an exception. I am not sure about the performance implications of just
 throwing an exception every time there is no data, but can I just test
 before sending it?



 I did see one post mentioning look for take(1) from the stream to test
 for data, but I am not sure where I put that in this example... Is that in
 the lambda function? or somewhere else? Looking for pointers!

 Thanks!







 mydstream.foreachRDD(lambda rdd: rdd.foreachPartition(parseRDD))





 Using this example code from the link above:



 def sendPartition(iter):
     connection = createNewConnection()
     for record in iter:
         connection.send(record)
     connection.close()

 dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

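
Following up on the take(1) suggestion above, a minimal sketch of the guard
for releases without RDD.isEmpty() (process_partition and safe_foreach are
just placeholder names; mydstream is the DStream from the original post):

def process_partition(records):
    # stand-in for whatever per-partition work you do (e.g. send to a sink)
    for record in records:
        print(record)

def safe_foreach(rdd):
    # take(1) returns an empty list for an empty RDD, so skip the work
    # (and any exception it would raise) when a batch has no data
    if rdd.take(1):
        rdd.foreachPartition(process_partition)

mydstream.foreachRDD(safe_foreach)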




Re: Spark 1.3.1 On Mesos Issues.

2015-06-05 Thread John Omernik
Thanks all. The answers post is me too; I multi-thread. That, and Ted is
aware too, and MapR is helping me with it.  I shall report the answer of that
investigation when we have it.

As to reproduction, I've installed the MapR file system, tried both versions
4.0.2 and 4.1.0, and have Mesos running alongside MapR; I then use
standard methods for submitting Spark jobs to Mesos. I don't have my
configs now, on vacation :) but I can share on Monday.

I appreciate the support I am getting from everyone: the Mesos community, the
Spark community, and MapR.  Great to see folks solving problems, and I will
be sure to report back findings as they arise.



On Friday, June 5, 2015, Tim Chen t...@mesosphere.io wrote:

 It seems like there is another thread going on:


 http://answers.mapr.com/questions/163353/spark-from-apache-downloads-site-for-mapr.html

 I'm not particularly sure why, seems like the problem is that getting the
 current context class loader is returning null in this instance.

 Do you have some repro steps or config we can try this?

 Tim

 On Fri, Jun 5, 2015 at 3:40 AM, Steve Loughran ste...@hortonworks.com
 javascript:_e(%7B%7D,'cvml','ste...@hortonworks.com'); wrote:


  On 2 Jun 2015, at 00:14, Dean Wampler deanwamp...@gmail.com
 javascript:_e(%7B%7D,'cvml','deanwamp...@gmail.com'); wrote:

  It would be nice to see the code for MapR FS Java API, but my google
 foo failed me (assuming it's open source)...


  I know that MapRFS is closed source, don't know about the java JAR. Why
 not ask Ted Dunning (cc'd)  nicely to see if he can track down the stack
 trace for you.

   So, shooting in the dark ;) there are a few things I would check, if
 you haven't already:

  1. Could there be 1.2 versions of some Spark jars that get picked up at
 run time (but apparently not in local mode) on one or more nodes? (Side
 question: Does your node experiment fail on all nodes?) Put another way,
 are the classpaths good for all JVM tasks?
 2. Can you use just MapR and Spark 1.3.1 successfully, bypassing Mesos?

  Incidentally, how are you combining Mesos and MapR? Are you running
 Spark in Mesos, but accessing data in MapR-FS?

  Perhaps the MapR shim library doesn't support Spark 1.3.1.

  HTH,

  dean

  Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com/
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Jun 1, 2015 at 2:49 PM, John Omernik j...@omernik.com
 javascript:_e(%7B%7D,'cvml','j...@omernik.com'); wrote:

 All -

  I am facing and odd issue and I am not really sure where to go for
 support at this point.  I am running MapR which complicates things as it
 relates to Mesos, however this HAS worked in the past with no issues so I
 am stumped here.

  So for starters, here is what I am trying to run. This is a simple
 show tables using the Hive Context:

  from pyspark import SparkContext, SparkConf
 from pyspark.sql import SQLContext, Row, HiveContext
 sparkhc = HiveContext(sc)
 test = sparkhc.sql("show tables")
 for r in test.collect():
   print r

  When I run it on 1.3.1 using ./bin/pyspark --master local  This works
 with no issues.

  When I run it using Mesos with all the settings configured (as they
 had worked in the past) I get lost tasks and when I zoom in them, the error
 that is being reported is below.  Basically it's a NullPointerException on
 the com.mapr.fs.ShimLoader.  What's weird to me is is I took each instance
 and compared both together, the class path, everything is exactly the same.
 Yet running in local mode works, and running in mesos fails.  Also of note,
 when the task is scheduled to run on the same node as when I run locally,
 that fails too! (Baffling).

  Ok, for comparison, how I configured Mesos was to download the mapr4
 package from spark.apache.org.  Using the exact same configuration file
 (except for changing the executor tgz from 1.2.0 to 1.3.1) from the 1.2.0.
 When I run this example with the mapr4 for 1.2.0 there is no issue in
 Mesos, everything runs as intended. Using the same package for 1.3.1 then
 it fails.

  (Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails
 as well).

  So basically When I used 1.2.0 and followed a set of steps, it worked
 on Mesos and 1.3.1 fails.  Since this is a current version of Spark, MapR
 is supports 1.2.1 only.  (Still working on that).

  I guess I am at a loss right now on why this would be happening, any
 pointers on where I could look or what I could tweak would be greatly
 appreciated. Additionally, if there is something I could specifically draw
 to the attention of MapR on this problem please let me know, I am perplexed
 on the change from 1.2.0 to 1.3.1.

  Thank you,

  John




  Full Error on 1.3.1 on Mesos:
 15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity
 1060.3 MB java.lang.NullPointerException at
 com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java

Spark Streaming for Each RDD - Exception on Empty

2015-06-05 Thread John Omernik
Is there pythonic/sparkonic way to test for an empty RDD before using the
foreachRDD?  Basically I am using the Python example
https://spark.apache.org/docs/latest/streaming-programming-guide.html to
put records somewhere.  When I have data, it works fine; when I don't, I
get an exception. I am not sure about the performance implications of just
throwing an exception every time there is no data, but can I just test
before sending it?

I did see one post mentioning look for take(1) from the stream to test for
data, but I am not sure where I put that in this example... Is that in the
lambda function? or somewhere else? Looking for pointers!
Thanks!



mydstream.foreachRDD(lambda rdd: rdd.foreachPartition(parseRDD))



Using this example code from the link above:


def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))


Re: Spark 1.3.1 On Mesos Issues.

2015-06-04 Thread John Omernik
So a few updates.  When I run locally as stated before, it works fine. When I
run in YARN (via Apache Myriad on Mesos) it also runs fine. The only issue
is specifically with Mesos. I wonder if there is some sort of class path
goodness I need to fix or something along those lines.  Any tips would be
appreciated.

Thanks!

John

On Mon, Jun 1, 2015 at 6:14 PM, Dean Wampler deanwamp...@gmail.com wrote:

 It would be nice to see the code for MapR FS Java API, but my google foo
 failed me (assuming it's open source)...

 So, shooting in the dark ;) there are a few things I would check, if you
 haven't already:

 1. Could there be 1.2 versions of some Spark jars that get picked up at
 run time (but apparently not in local mode) on one or more nodes? (Side
 question: Does your node experiment fail on all nodes?) Put another way,
 are the classpaths good for all JVM tasks?
 2. Can you use just MapR and Spark 1.3.1 successfully, bypassing Mesos?

 Incidentally, how are you combining Mesos and MapR? Are you running Spark
 in Mesos, but accessing data in MapR-FS?

 Perhaps the MapR shim library doesn't support Spark 1.3.1.

 HTH,

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Jun 1, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote:

 All -

 I am facing and odd issue and I am not really sure where to go for
 support at this point.  I am running MapR which complicates things as it
 relates to Mesos, however this HAS worked in the past with no issues so I
 am stumped here.

 So for starters, here is what I am trying to run. This is a simple show
 tables using the Hive Context:

 from pyspark import SparkContext, SparkConf
 from pyspark.sql import SQLContext, Row, HiveContext
 sparkhc = HiveContext(sc)
 test = sparkhc.sql("show tables")
 for r in test.collect():
   print r

 When I run it on 1.3.1 using ./bin/pyspark --master local  This works
 with no issues.

 When I run it using Mesos with all the settings configured (as they had
 worked in the past) I get lost tasks and when I zoom in them, the error
 that is being reported is below.  Basically it's a NullPointerException on
 the com.mapr.fs.ShimLoader.  What's weird to me is is I took each instance
 and compared both together, the class path, everything is exactly the same.
 Yet running in local mode works, and running in mesos fails.  Also of note,
 when the task is scheduled to run on the same node as when I run locally,
 that fails too! (Baffling).

 Ok, for comparison, how I configured Mesos was to download the mapr4
 package from spark.apache.org.  Using the exact same configuration file
 (except for changing the executor tgz from 1.2.0 to 1.3.1) from the 1.2.0.
 When I run this example with the mapr4 for 1.2.0 there is no issue in
 Mesos, everything runs as intended. Using the same package for 1.3.1 then
 it fails.

 (Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as
 well).

 So basically When I used 1.2.0 and followed a set of steps, it worked on
 Mesos and 1.3.1 fails.  Since this is a current version of Spark, MapR is
 supports 1.2.1 only.  (Still working on that).

 I guess I am at a loss right now on why this would be happening, any
 pointers on where I could look or what I could tweak would be greatly
 appreciated. Additionally, if there is something I could specifically draw
 to the attention of MapR on this problem please let me know, I am perplexed
 on the change from 1.2.0 to 1.3.1.

 Thank you,

 John




 Full Error on 1.3.1 on Mesos:
 15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity
 1060.3 MB java.lang.NullPointerException at
 com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96) at
 com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232) at
 com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at
 org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60)
 at java.lang.Class.forName0(Native Method) at
 java.lang.Class.forName(Class.java:274) at
 org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847)
 at
 org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062)
 at
 org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272)
 at
 org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224)
 at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at
 org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at
 org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98)
 at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) at
 org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:220) at
 org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala

Spark 1.3.1 On Mesos Issues.

2015-06-01 Thread John Omernik
All -

I am facing an odd issue and I am not really sure where to go for support
at this point.  I am running MapR, which complicates things as it relates to
Mesos; however, this HAS worked in the past with no issues, so I am stumped
here.

So for starters, here is what I am trying to run. This is a simple show
tables using the Hive Context:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row, HiveContext
sparkhc = HiveContext(sc)
test = sparkhc.sql("show tables")
for r in test.collect():
  print r

When I run it on 1.3.1 using ./bin/pyspark --master local  This works with
no issues.

When I run it using Mesos with all the settings configured (as they had
worked in the past) I get lost tasks and when I zoom in them, the error
that is being reported is below.  Basically it's a NullPointerException on
the com.mapr.fs.ShimLoader.  What's weird to me is that I took each instance
and compared both together; the class path, everything is exactly the same.
Yet running in local mode works, and running in mesos fails.  Also of note,
when the task is scheduled to run on the same node as when I run locally,
that fails too! (Baffling).

Ok, for comparison, how I configured Mesos was to download the mapr4
package from spark.apache.org.  I used the exact same configuration file
(except for changing the executor tgz from 1.2.0 to 1.3.1) as with 1.2.0.
When I run this example with the mapr4 for 1.2.0 there is no issue in
Mesos, everything runs as intended. Using the same package for 1.3.1 then
it fails.

(Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as
well).

So basically, when I used 1.2.0 and followed a set of steps, it worked on
Mesos, and 1.3.1 fails.  Since this is a current version of Spark, MapR
supports 1.2.1 only.  (Still working on that).

I guess I am at a loss right now on why this would be happening, any
pointers on where I could look or what I could tweak would be greatly
appreciated. Additionally, if there is something I could specifically draw
to the attention of MapR on this problem please let me know, I am perplexed
on the change from 1.2.0 to 1.3.1.

Thank you,

John




Full Error on 1.3.1 on Mesos:
15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity
1060.3 MB java.lang.NullPointerException at
com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96) at
com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232) at
com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at
org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60)
at java.lang.Class.forName0(Native Method) at
java.lang.Class.forName(Class.java:274) at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847)
at
org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at
org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at
org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98)
at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:220) at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala) at
org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1959) at
org.apache.spark.storage.BlockManager.(BlockManager.scala:104) at
org.apache.spark.storage.BlockManager.(BlockManager.scala:179) at
org.apache.spark.SparkEnv$.create(SparkEnv.scala:310) at
org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:186) at
org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:70)
java.lang.RuntimeException: Failure loading MapRClient. at
com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:283) at
com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at
org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60)
at java.lang.Class.forName0(Native Method) at
java.lang.Class.forName(Class.java:274) at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847)
at
org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at
org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at
org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98)
at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala

dependencies on java-netlib and jblas

2015-05-08 Thread John Niekrasz
Newbie question...

Can I use any of the main ML capabilities of MLlib in a Java-only
environment, without any native library dependencies?

According to the documentation, java-netlib provides a JVM fallback. This
suggests that native netlib libraries are not required.

It appears that such a fallback is not available for jblas. However, a quick
look at the MLlib source suggests that MLlib's dependencies on jblas are
rather isolated:

 grep -R jblas
main/scala/org/apache/spark/ml/recommendation/ALS.scala:import
org.jblas.DoubleMatrix
main/scala/org/apache/spark/mllib/optimization/NNLS.scala:import
org.jblas.{DoubleMatrix, SimpleBlas}
main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala:import
org.jblas.DoubleMatrix
main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:import
org.jblas.DoubleMatrix
main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:   
org.jblas.util.Random.seed(42)
main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala:import
org.jblas.DoubleMatrix
main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala:import
org.jblas.DoubleMatrix

Is it true or false that many of MLlib's capabilities will work perfectly
fine without any native (non-Java) libraries installed at all?

Thanks for the help,
John
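
As a quick sanity check on which netlib-java implementation actually gets
picked up at runtime, the sketch below (run from a PySpark shell, going
through the py4j gateway; it assumes the com.github.fommil.netlib classes
that MLlib pulls in are on the JVM classpath) prints the BLAS backend class
name.  F2jBLAS is the pure-JVM fallback, while NativeSystemBLAS or
NativeRefBLAS mean native libraries were found:

# Sketch: ask netlib-java which BLAS backend it resolved to.  Assumes a
# running SparkContext `sc`; the lookup goes through py4j, so treat this as
# a convenience check rather than a supported API.
blas_impl = sc._jvm.com.github.fommil.netlib.BLAS.getInstance().getClass().getName()
print(blas_impl)  # e.g. "com.github.fommil.netlib.F2jBLAS" for the JVM fallback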






Re: Joining data using Latitude, Longitude

2015-03-10 Thread John Meehan
There are some techniques you can use if you geohash
(http://en.wikipedia.org/wiki/Geohash) the lat-lngs.  They will naturally be
sorted by proximity (with some edge cases so watch out).  If you go the join
route, either by trimming the lat-lngs or geohashing them, you're essentially
grouping nearby locations into buckets, but you have to consider the borders
of the buckets since the nearest location may actually be in an adjacent
bucket.  Here's a paper that discusses an implementation:
http://www.gdeepak.com/thesisme/Finding%20Nearest%20Location%20with%20open%20box%20query.pdf
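
To make the bucket idea concrete, here is a minimal sketch in plain PySpark
(no geohash library; CELL, cell and cell_and_neighbors are just illustrative
names).  One side is keyed by its own grid cell and the other by its cell
plus the eight neighbors, so points sitting just across a cell border still
meet in the join; the resulting candidate pairs would still need a real
distance check (e.g. haversine) afterwards.

import math

from pyspark import SparkContext

sc = SparkContext(appName="latlng-bucket-join-sketch")

CELL = 0.01  # grid size in degrees; pick something larger than the match radius

def cell(lat, lng):
    return (int(math.floor(lat / CELL)), int(math.floor(lng / CELL)))

def cell_and_neighbors(lat, lng):
    cx, cy = cell(lat, lng)
    return [(cx + dx, cy + dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1)]

# (id, lat, lng) records for the two datasets
left = sc.parallelize([("a", 37.7750, -122.4194), ("b", 40.7128, -74.0060)])
right = sc.parallelize([("x", 37.7751, -122.4190), ("y", 51.5074, -0.1278)])

# Key the left side by its own cell only...
left_keyed = left.map(lambda r: (cell(r[1], r[2]), r))
# ...and the right side by its cell plus all neighboring cells.
right_keyed = right.flatMap(lambda r: [(c, r) for c in cell_and_neighbors(r[1], r[2])])

# (cell, (left_record, right_record)) candidate pairs; filter by true distance next.
candidates = left_keyed.join(right_keyed)
print(candidates.values().collect())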

 On Mar 9, 2015, at 11:42 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Are you using SparkSQL for the join? In that case I'm not quite sure you have 
 a lot of options to join on the nearest co-ordinate. If you are using the 
 normal Spark code (by creating key-pair on lat,lon) you can apply certain 
 logic like trimming the lat,lon etc. If you want more specific computing then 
 you are better off using haversine formula. 
 http://www.movable-type.co.uk/scripts/latlong.html


Re: Getting to proto buff classes in Spark Context

2015-02-28 Thread John Meehan
Maybe try including the jar with 

--driver-class-path <jar>



 On Feb 26, 2015, at 12:16 PM, Akshat Aranya aara...@gmail.com wrote:
 
 My guess would be that you are packaging too many things in your job, which 
 is causing problems with the classpath.  When your jar goes in first, you get 
 the correct version of protobuf, but some other version of something else.  
 When your jar goes in later, other things work, but protobuf breaks.  This is 
 just a guess though; take a look at what you're packaging in your jar and 
 look for things that Spark or Kafka could also be using.
 
 On Thu, Feb 26, 2015 at 10:06 AM, necro351 . necro...@gmail.com wrote:
 Hello everyone,
 
 We are trying to decode a message inside a Spark job that we receive from 
 Kafka. The message is encoded using Proto Buff. The problem is when decoding 
 we get class-not-found exceptions. We have tried remedies we found online in 
 Stack Exchange and mail list archives but nothing seems to work.
 
 (This question is a re-ask, but we really cannot figure this one out.)
 
 We created a standalone repository with a very simple Spark job that 
 exhibits the above issues. The spark job reads the messages from the FS, 
 decodes them, and prints them. Its easy to checkout and try to see the 
 exception yourself: just uncomment the code that prints the messages from 
 within the RDD. The only sources are the generated Proto Buff java sources 
 and a small Spark Job that decodes a message. I'd appreciate if anyone could 
 take a look.
 
 https://github.com/vibhav/spark-protobuf
 
 We tried a couple remedies already.
 
 Setting spark.files.userClassPathFirst didn't fix the problem for us. I am 
 not very familiar with the Spark and Scala environment, so please correct 
 any incorrect assumptions or statements I make.
 
 However, I don't believe this to be a classpath visibility issue. I wrote a 
 small helper method to print out the classpath from both the driver and 
 worker, and the output is identical. (I'm printing out 
 System.getProperty(java.class.path) -- is there a better way to do this or 
 check the class path?). You can print out the class paths the same way we 
 are from the example project above.
 
 Furthermore, userClassPathFirst seems to have a detrimental effect on 
 otherwise working code, which I cannot explain or do not understand. 
 
 For example, I created a trivial RDD as such:
 
 val l = List(1, 2, 3)
 sc.makeRDD(l).foreach((x: Int) => {
   println(x.toString)
 })
 
 With userClassPathFirst set, I encounter a java.lang.ClassCastException 
 trying to execute that code. Is that to be expected? You can re-create this 
 issue by commenting out the block of code that tries to print the above in 
 the example project we linked to above.
 
 We also tried dynamically adding the jar with .addJar to the Spark Context 
 but this seemed to have no effect.
 
 Thanks in advance for any help y'all can provide.
 


Sharing Spark Drivers

2015-02-24 Thread John Omernik
I have been posting on the Mesos list, as I am looking to see whether
it's possible or not to share Spark drivers.  Obviously, in standalone
cluster mode, the Master handles requests, and you can
instantiate a new SparkContext against a currently running master. However,
in Mesos (and perhaps YARN) I don't see how this is possible.

I guess I am curious on why? It could make quite a bit of sense to
have one driver act as a master, running as a certain user, (ideally
running out in the Mesos cluster, which I believe Tim Chen is working
on).   That driver could belong to a user, and be used as a long term
resource controlled instance that the user could use for adhoc
queries.  While running many little ones out on the cluster seems to
be a waste of driver resources, as each driver would be using the same
resources, and rarely would many be used at once (if they were for a
user's ad hoc environment). Additionally, the advantages of the shared
driver seem to play out for a user as they come back to the
environment over and over again.

Does this make sense? I really want to try to understand how looking
at it this way is wrong, either from a Spark paradigm perspective or a
technological perspective.  I will grant that I am coming from a
traditional background, so some of the older ideas for how to set
things up may be creeping into my thinking, but if that's the case,
I'd love to understand better.

Thanks!

John




Re: Sharing Spark Drivers

2015-02-24 Thread John Omernik
I am aware of that, but two things are working against me here with
spark-kernel. Python is our language, and we are really looking for a
supported way to approach this for the enterprise.  I like the
concept, it just doesn't work for us given our constraints.

This does raise an interesting point though: if side projects are
spinning up to support this, why not make it a feature of the main
project? Or is it just so esoteric that it's not important for the
main project to be looking into it?



On Tue, Feb 24, 2015 at 9:25 AM, Chip Senkbeil chip.senkb...@gmail.com wrote:
 Hi John,

 This would be a potential application for the Spark Kernel project
 (https://github.com/ibm-et/spark-kernel). The Spark Kernel serves as your
 driver application, allowing you to feed it snippets of code (or load up
 entire jars via magics) in Scala to execute against a Spark cluster.

 Although not technically supported, you can connect multiple applications to
 the same Spark Kernel instance to use the same resources (both on the
 cluster and on the driver).

 If you're curious, you can find a getting started section here:
 https://github.com/ibm-et/spark-kernel/wiki/Getting-Started-with-the-Spark-Kernel

 Signed,
 Chip Senkbeil

 On Tue Feb 24 2015 at 8:04:08 AM John Omernik j...@omernik.com wrote:

 I have been posting on the Mesos list, as I am looking to see if it
 it's possible or not to share spark drivers.  Obviously, in stand
 alone cluster mode, the Master handles requests, and you can
 instantiate a new sparkcontext to a currently running master. However
 in Mesos (and perhaps Yarn) I don't see how this is possible.

 I guess I am curious on why? It could make quite a bit of sense to
 have one driver act as a master, running as a certain user, (ideally
 running out in the Mesos cluster, which I believe Tim Chen is working
 on).   That driver could belong to a user, and be used as a long term
 resource controlled instance that the user could use for adhoc
 queries.  While running many little ones out on the cluster seems to
 be a waste of driver resources, as each driver would be using the same
 resources, and rarely would many be used at once (if they were for a
 users adhoc environment). Additionally, the advantages of the shared
 driver seem to play out for a user as they come back to the
 environment over and over again.

 Does this make sense? I really want to try to understand how looking
 at this way is wrong, either from a Spark paradigm perspective of a
 technological perspective.  I will grant, that I am coming from a
 traditional background, so some of the older ideas for how to set
 things up may be creeping into my thinking, but if that's the case,
 I'd love to understand better.

 Thanks1

 John




Re: Spark on Mesos: Multiple Users with iPython Notebooks

2015-02-20 Thread John Omernik
Awesome! This is exactly what I'd need.  Unfortunately, I am not a
programmer of any talent or skill, but how could I assist with this
JIRA? From a User perspective, this is really the next step for my org
taking our Mesos cluster to user land with Spark. I don't want to be
pushy, but is there any sort of time frame I could possibly
communicate to my team? Anything I can do?

Thanks!

On Fri, Feb 20, 2015 at 4:36 AM, Iulian Dragoș
iulian.dra...@typesafe.com wrote:


 On Thu, Feb 19, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote:

 I am running Spark on Mesos and it works quite well.  I have three
 users, all who setup iPython notebooks to instantiate a spark instance
 to work with on the notebooks. I love it so far.

 Since I am auto instantiating (I don't want a user to have to
 think about instantiating and submitting a spark app to do adhoc
 analysis, I want the environment setup ahead of time) this is done
 whenever an iPython notebook is open.  So far it's working pretty
 good, save one issue:

 Every notebook is a new driver. I.e. every time they open a notebook,
 a new spark submit is called, and the driver resources are allocated,
 regardless if they are used or not.  Yes, it's only the driver, but
 even that I find starts slowing down my queries for the notebooks that
 using spark.  (I am running in Mesos Fined Grained mode).


 I have three users on my system, ideally, I would love to find a way
 so that on the first notebook being opened, a driver is started for
 that user, and then can be used for any notebook the user has open. So
 if they open a new notebook, I can check that yes, the user has a
 spark driver running, and thus, that notebook, if there is a query,
 will run it through that driver. That allows me to understand the
 resource allocation better, and it limits users from running 10
 notebooks and having a lot of resources.

 The other thing I was wondering is could the driver actually be run on
 the mesos cluster? Right now, I have a edge node as an iPython
 server, the drivers all exist on that server, so as I get more and
 more drivers, the box's local resources get depleted with unused
 drivers.  Obviously if I could reuse the drivers per user, on that
 box, that is great first step, but if I could reuse drivers, and run
 them on the cluster, that would be ideal.  looking through the docs I
 was not clear on those options. If anyone could point me in the right
 direction, I would greatly appreciate it!


 Cluster mode support for Spark is tracked under
 [SPARK-5338](https://issues.apache.org/jira/browse/SPARK-5338). I know Tim
 Chen is working on it, so there will be progress soon.

 iulian



 John





 --

 --
 Iulian Dragos

 --
 Reactive Apps on the JVM
 www.typesafe.com





Spark on Mesos: Multiple Users with iPython Notebooks

2015-02-19 Thread John Omernik
I am running Spark on Mesos and it works quite well.  I have three
users, all who setup iPython notebooks to instantiate a spark instance
to work with on the notebooks. I love it so far.

Since I am auto-instantiating (I don't want a user to have to
think about instantiating and submitting a Spark app to do ad hoc
analysis; I want the environment set up ahead of time), this is done
whenever an iPython notebook is open.  So far it's working pretty
good, save one issue:

Every notebook is a new driver. I.e. every time they open a notebook,
a new spark submit is called, and the driver resources are allocated,
regardless if they are used or not.  Yes, it's only the driver, but
even that I find starts slowing down my queries for the notebooks that are
using Spark.  (I am running in Mesos fine-grained mode).


I have three users on my system, ideally, I would love to find a way
so that on the first notebook being opened, a driver is started for
that user, and then can be used for any notebook the user has open. So
if they open a new notebook, I can check that yes, the user has a
spark driver running, and thus, that notebook, if there is a query,
will run it through that driver. That allows me to understand the
resource allocation better, and it limits users from running 10
notebooks and having a lot of resources.

The other thing I was wondering is: could the driver actually be run on
the Mesos cluster? Right now, I have an edge node as an iPython
server; the drivers all exist on that server, so as I get more and
more drivers, the box's local resources get depleted with unused
drivers.  Obviously if I could reuse the drivers per user, on that
box, that is a great first step, but if I could reuse drivers and run
them on the cluster, that would be ideal.  Looking through the docs, I
was not clear on those options. If anyone could point me in the right
direction, I would greatly appreciate it!

John




[ANNOUNCE] Apache Science and Healthcare Track @ApacheCon NA 2015

2015-01-08 Thread Lewis John Mcgibbney
Hi Folks,

Apologies for cross posting :(

As some of you may already know, @ApacheCon NA 2015 is happening in Austin,
TX April 13th-16th.

This email is specifically written to attract all folks interested in
Science and Healthcare... this is an official call to arms! I am aware that
there are many Science and Healthcare-type people also lingering in the
Apache Semantic Web communities so this one is for all of you folks as well.

Over a number of years the Science track has been emerging as an attractive
and exciting, at times mind-blowing, non-traditional track running alongside
the resident HTTP Server, Big Data, etc. tracks. The Semantic Web Track is
another such emerging track which has proved popular. This year we want to
really get the message out there about how much Apache technology is
actually being used in Science and Healthcare. This is not *only* aimed at
attracting members of the communities below
http://wiki.apache.org/apachecon/ACNA2015ContentCommittee#Target_Projects
but also at potentially attracting a brand new breed of conference
participants to ApacheCon (https://wiki.apache.org/apachecon/ApacheCon) and
the Foundation, e.g. scientists who love Apache. We are looking for
exciting, invigorating, obscure, half-baked, funky, academic, practical and
impractical stories, use cases, experiments and down right successes alike
from within the Science domain. The only thing they need to have in common
is that they consume, contribute towards, advocate, disseminate or even
commercialize Apache technology within the Scientific domain and would be
relevant to that audience. It is fully open to interest whether this track
be combined with the proposed *healthcare track*... if there is interest to
do this then we can rename this track to Science and Healthcare. In essence
one could argue that they are one and the same however I digress [image: :)]

What I would like those of you that are interested to do, is to merely
check out the scope and intent of the Apache in Science content curation
which is currently ongoing and to potentially register your interest.

https://wiki.apache.org/apachecon/ACNA2015ContentCommittee#Apache_in_Science

I would love to see the Science and Healthcare track be THE BIGGEST track
@ApacheCon, and although we have some way to go, I'm sure many previous
track participants will tell you this is not to be missed.

We are looking for content from a wide variety of Scientific use cases all
related to Apache technology.
Thanks in advance and I look forward to seeing you in Austin.
Lewis

-- 
Lewis

