Total Task size exception in Spark 1.6.0 when writing a DataFrame

2016-01-17 Thread Night Wolf
Hi all,

Doing some simple column transformations (e.g. trimming strings) on a
DataFrame using UDFs. This DataFrame is in Avro format and being loaded off
HDFS. The job has about 16,000 parts/tasks.

About halfway through, the job fails with this message:

org.apache.spark.SparkException: Job aborted due to stage failure: Total
size of serialized results of 6843 tasks (1024.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)

This seems strange as we are saving the DataFrame using the
df.write.parquet("/my/path") API.

As far as I understand, the code that runs these checks
https://github.com/apache/spark/blob/7a375bb87a8df56d9dde0c484e725e5c497a9876/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L595
is only used in one method;
https://github.com/apache/spark/blob/7a375bb87a8df56d9dde0c484e725e5c497a9876/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L47
in TaskResultGetter.

So it looks like even though we are not doing a .collect on the driver, it is
still returning metadata about the successful tasks. I would assume it returns
the result value as a DirectTaskResult with a BoxedUnit as the return type,
which should be a very small data structure. However, it seems this is not the
case.

This code is being run from the Spark 1.6 shell.

Any ideas what could be happening here?
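A back-of-envelope check on the numbers in that error (a sketch, not a diagnosis) suggests the per-task results are nowhere near as small as a bare BoxedUnit; each serialized DirectTaskResult also carries accumulator updates and task metrics, which add up across thousands of tasks:

```shell
# 6843 task results hit the 1024 MB cap, so each serialized
# DirectTaskResult averages roughly 153 KB:
echo $(( 1024 * 1024 / 6843 ))   # KB per task result -> 153
```

If the driver has heap to spare, one hedged workaround is raising the cap, e.g. `--conf spark.driver.maxResultSize=4g` (the property name is real; the 4g value is an assumption to size against your driver memory).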

Cheers,
~N


Spark + Sentry + Kerberos don't add up?

2016-01-17 Thread Ruslan Dautkhanov
Getting the following error stack:

The Spark session could not be created in the cluster:
> at org.apache.hadoop.security.UserGroupInformation.doAs
> (UserGroupInformation.java:1671)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:160)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> at org.apache.hadoop.hive.metastore.HiveMetaStoreClient
> .open(HiveMetaStoreClient.java:466)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:234)
> at
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
> ... 35 more


My understanding is that hive.server2.enable.impersonation and
hive.server2.enable.doAs should be enabled to make
UserGroupInformation.doAs() work.

When I try to enable these parameters, Cloudera Manager shows this error:

Hive Impersonation is enabled for Hive Server2 role 'HiveServer2
> (hostname)'.
> Hive Impersonation should be disabled to enable Hive authorization using
> Sentry


So Spark-Hive conflicts with Sentry!?

Environment: Hue 3.9 Spark Notebooks + Livy Server (built from master). CDH
5.5.

This is a kerberized cluster with Sentry.

I was using Hue's keytab, as the hue user is normally (by default in CDH)
allowed to impersonate other users, which is very convenient for Spark
Notebooks.

Any information to help solve this will be highly appreciated.


-- 
Ruslan Dautkhanov


RE: SparkContext SyntaxError: invalid syntax

2016-01-17 Thread Felix Cheung
Do you still need help on the PR?
btw, does this apply to YARN client mode?
 
From: andrewweiner2...@u.northwestern.edu
Date: Sun, 17 Jan 2016 17:00:39 -0600
Subject: Re: SparkContext SyntaxError: invalid syntax
To: cutl...@gmail.com
CC: user@spark.apache.org

Yeah, I do think it would be worth explicitly stating this in the docs.  I was
going to try to edit the docs myself and submit a pull request, but I'm having
trouble building the docs from github.  If anyone else wants to do this, here
is approximately what I would say:

(To be added to
http://spark.apache.org/docs/latest/configuration.html#environment-variables)

"Note: When running Spark on YARN in cluster mode, environment variables need
to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName] property
in your conf/spark-defaults.conf file.  Environment variables that are set in
spark-env.sh will not be reflected in the YARN Application Master process in
cluster mode.  See the YARN-related Spark Properties for more information."

I might take another crack at building the docs myself if nobody beats me to
this.

Andrew

On Fri, Jan 15, 2016 at 5:01 PM, Bryan Cutler  wrote:
Glad you got it going!  It wasn't very obvious what needed to be set; maybe
it is worth explicitly stating this in the docs since it seems to have come up
a couple times before too.
Bryan
On Fri, Jan 15, 2016 at 12:33 PM, Andrew Weiner 
 wrote:
Actually, I just found this [https://issues.apache.org/jira/browse/SPARK-1680],
which after a bit of googling and reading leads me to believe that the
preferred way to change the yarn environment is to edit the spark-defaults.conf
file by adding this line:

spark.yarn.appMasterEnv.PYSPARK_PYTHON  /path/to/python

While both this solution and the solution from my prior email work, I believe 
this is the preferred solution.
Sorry for the flurry of emails.  Again, thanks for all the help!
Andrew
On Fri, Jan 15, 2016 at 1:47 PM, Andrew Weiner 
 wrote:
I finally got the pi.py example to run in yarn cluster mode.  This was the key
insight:
https://issues.apache.org/jira/browse/SPARK-9229

I had to set SPARK_YARN_USER_ENV in spark-env.sh:

export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/home/aqualab/local/bin/python"

This caused the PYSPARK_PYTHON environment variable to be used in my yarn
environment in cluster mode.

Thank you for all your help!

Best,
Andrew


On Fri, Jan 15, 2016 at 12:57 PM, Andrew Weiner 
 wrote:
I tried playing around with my environment variables, and here is an update.

When I run in cluster mode, my environment variables do not persist throughout
the entire job. For example, I tried creating a local copy of HADOOP_CONF_DIR
in /home//local/etc/hadoop/conf, and then, in spark-env.sh, I set the variable:

export HADOOP_CONF_DIR=/home//local/etc/hadoop/conf

Later, when we print the environment variables in the python code, I see this:

('HADOOP_CONF_DIR', '/etc/hadoop/conf')

However, when I run in client mode, I see this:

('HADOOP_CONF_DIR', '/home/awp066/local/etc/hadoop/conf')

Furthermore, if I omit that environment variable from spark-env.sh altogether,
I get the expected error in both client and cluster mode:

When running with master 'yarn'
either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

This suggests that my environment variables are being used when I first submit
the job, but at some point during the job, my environment variables are thrown
out and someone else's (yarn's?) environment variables are being used.

Andrew
On Fri, Jan 15, 2016 at 11:03 AM, Andrew Weiner 
 wrote:
Indeed!  Here is the output when I run in cluster mode:

Traceback (most recent call last):
  File "pi.py", line 22, in ?
    raise RuntimeError("\n"+str(sys.version_info) +"\n"+
RuntimeError:
(2, 4, 3, 'final', 0)
[('PYSPARK_GATEWAY_PORT', '48079'), ('PYTHONPATH',
'/scratch2/hadoop/yarn/local/usercache//filecache/116/spark-assembly-1.6.0-hadoop2.4.0.jar:/home//spark-1.6.0-bin-hadoop2.4/python:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/py4j-0.9-src.zip'),
 ('PYTHONUNBUFFERED', 'YES')]

As we suspected, it is using Python 2.4.

One thing that surprises me is that PYSPARK_PYTHON is not showing up in the
list, even though I am setting it and exporting it in spark-submit and in
spark-env.sh.  Is there somewhere else I need to set this variable?  Maybe in
one of the hadoop conf files in my HADOOP_CONF_DIR?

Andrew

On Thu, Jan 14, 2016 at 1:14 PM, Bryan Cutler  wrote:
It seems like it could be the case that some other Python version is being 
invoked.  To make sure, can you add something like this to the top of the .py 

Re: simultaneous actions

2016-01-17 Thread Koert Kuipers
the re-use of shuffle files is always a nice surprise to me

On Sun, Jan 17, 2016 at 3:17 PM, Mark Hamstra 
wrote:

> Same SparkContext means same pool of Workers.  It's up to the Scheduler,
> not the SparkContext, whether the exact same Workers or Executors will be
> used to calculate simultaneous actions against the same RDD.  It is likely
> that many of the same Workers and Executors will be used as the Scheduler
> tries to preserve data locality, but that is not guaranteed.  In fact, what
> is most likely to happen is that the shared Stages and Tasks being
> calculated for the simultaneous actions will not actually be run at exactly
> the same time, which means that shuffle files produced for one action will
> be reused by the other(s), and repeated calculations will be avoided even
> without explicitly caching/persisting the RDD.
>
> On Sun, Jan 17, 2016 at 8:06 AM, Koert Kuipers  wrote:
>
>> Same rdd means same sparkcontext means same workers
>>
>> Cache/persist the rdd to avoid repeated jobs
>> On Jan 17, 2016 5:21 AM, "Mennour Rostom"  wrote:
>>
>>> Hi,
>>>
>>> Thank you all for your answers,
>>>
>>> If I correctly understand, actions (in my case foreach) can be run
>>> concurrently and simultaneously on the SAME rdd, (which is logical because
>>> they are read only object). however, I want to know if the same workers are
>>> used for the concurrent analysis ?
>>>
>>> Thank you
>>>
>>> 2016-01-15 21:11 GMT+01:00 Jakob Odersky :
>>>
 I stand corrected. How considerable are the benefits though? Will the
 scheduler be able to dispatch jobs from both actions simultaneously (or on
 a when-workers-become-available basis)?

 On 15 January 2016 at 11:44, Koert Kuipers  wrote:

> we run multiple actions on the same (cached) rdd all the time, i guess
> in different threads indeed (its in akka)
>
> On Fri, Jan 15, 2016 at 2:40 PM, Matei Zaharia <
> matei.zaha...@gmail.com> wrote:
>
>> RDDs actually are thread-safe, and quite a few applications use them
>> this way, e.g. the JDBC server.
>>
>> Matei
>>
>> On Jan 15, 2016, at 2:10 PM, Jakob Odersky 
>> wrote:
>>
>> I don't think RDDs are threadsafe.
>> More fundamentally however, why would you want to run RDD actions in
>> parallel? The idea behind RDDs is to provide you with an abstraction for
>> computing parallel operations on distributed data. Even if you were to 
>> call
>> actions from several threads at once, the individual executors of your
>> spark environment would still have to perform operations sequentially.
>>
>> As an alternative, I would suggest to restructure your RDD
>> transformations to compute the required results in one single operation.
>>
>> On 15 January 2016 at 06:18, Jonathan Coveney 
>> wrote:
>>
>>> Threads
>>>
>>>
>>> El viernes, 15 de enero de 2016, Kira 
>>> escribió:
>>>
 Hi,

 Can we run *simultaneous* actions on the *same RDD* ?; if yes how
 can this
 be done ?

 Thank you,
 Regards



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com .


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>
>>
>

>>>
>


Re: Reuse Executor JVM across different JobContext

2016-01-17 Thread Mark Hamstra
-dev

What do you mean by JobContext?  That is a Hadoop mapreduce concept, not
Spark.

On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou  wrote:

> Dear all,
>
> Is there a way to reuse executor JVM across different JobContexts? Thanks.
>
> Best Regards,
> Jia
>


Re: Spark Streaming: BatchDuration and Processing time

2016-01-17 Thread Silvio Fiorito
It will just queue up the subsequent batches; however, if this delay is
constant, you may start losing batches. It can handle spikes in processing
time, but if you know you're consistently running over your batch duration,
you either need to increase the duration or look at enabling back pressure
support. See:
http://spark.apache.org/docs/latest/configuration.html#spark-streaming (1.5+).
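To make the queueing arithmetic concrete (made-up numbers, purely illustrative):

```shell
# If a 1 s batch consistently takes 1.5 s to process, scheduling delay
# grows by ~0.5 s per batch. Working in tenths of a second to stay in
# integer arithmetic: after 120 batches (two minutes of input), the
# backlog is already a full minute.
echo $(( (15 - 10) * 120 / 10 ))   # seconds of accumulated delay -> 60
```

For the back pressure route, `spark.streaming.backpressure.enabled=true` (available from 1.5) lets the receivers adapt their ingestion rate to the observed processing rate.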


From: pyspark2555 
Sent: Sunday, January 17, 2016 11:32 AM
To: user@spark.apache.org
Subject: Spark Streaming: BatchDuration and Processing time

Hi,

If BatchDuration is set to 1 second in StreamingContext and the actual
processing time is longer than one second, then how does Spark handle that?

For example, I am receiving a continuous Input stream. Every 1 second (batch
duration), the RDDs will be processed. What if this processing time is
longer than 1 second? What happens in the next batch duration?

Thanks.
Amit



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-BatchDuration-and-Processing-time-tp25986.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkContext SyntaxError: invalid syntax

2016-01-17 Thread Andrew Weiner
Yeah, I do think it would be worth explicitly stating this in the docs.  I
was going to try to edit the docs myself and submit a pull request, but I'm
having trouble building the docs from github.  If anyone else wants to do
this, here is approximately what I would say:

(To be added to
http://spark.apache.org/docs/latest/configuration.html#environment-variables
)
"Note: When running Spark on YARN in cluster mode, environment variables
need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName]
property in your conf/spark-defaults.conf file.  Environment variables that
are set in spark-env.sh will not be reflected in the YARN Application
Master process in cluster mode.  See the YARN-related Spark Properties
for more information."
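For reference, a sketch of what the resulting entry would look like (the property prefix is Spark's real spark.yarn.appMasterEnv.* prefix; the interpreter path is an example placeholder, and the scratch file stands in for conf/spark-defaults.conf):

```shell
# Append the AM-environment setting to a scratch copy of spark-defaults.conf;
# on a real cluster this line would go in conf/spark-defaults.conf.
conf=$(mktemp)
printf 'spark.yarn.appMasterEnv.PYSPARK_PYTHON  /usr/local/bin/python2.7\n' >> "$conf"
cat "$conf"
```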

I might take another crack at building the docs myself if nobody beats me
to this.

Andrew


On Fri, Jan 15, 2016 at 5:01 PM, Bryan Cutler  wrote:

> Glad you got it going!  It wasn't very obvious what needed to be set,
> maybe it is worth explicitly stating this in the docs since it seems to
> have come up a couple times before too.
>
> Bryan
>
> On Fri, Jan 15, 2016 at 12:33 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> Actually, I just found this [
>> https://issues.apache.org/jira/browse/SPARK-1680], which after a bit of
>> googling and reading leads me to believe that the preferred way to change
>> the yarn environment is to edit the spark-defaults.conf file by adding this
>> line:
>> spark.yarn.appMasterEnv.PYSPARK_PYTHON  /path/to/python
>>
>> While both this solution and the solution from my prior email work, I
>> believe this is the preferred solution.
>>
>> Sorry for the flurry of emails.  Again, thanks for all the help!
>>
>> Andrew
>>
>> On Fri, Jan 15, 2016 at 1:47 PM, Andrew Weiner <
>> andrewweiner2...@u.northwestern.edu> wrote:
>>
>>> I finally got the pi.py example to run in yarn cluster mode.  This was
>>> the key insight:
>>> https://issues.apache.org/jira/browse/SPARK-9229
>>>
>>> I had to set SPARK_YARN_USER_ENV in spark-env.sh:
>>> export
>>> SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/home/aqualab/local/bin/python"
>>>
>>> This caused the PYSPARK_PYTHON environment variable to be used in my
>>> yarn environment in cluster mode.
>>>
>>> Thank you for all your help!
>>>
>>> Best,
>>> Andrew
>>>
>>>
>>>
>>> On Fri, Jan 15, 2016 at 12:57 PM, Andrew Weiner <
>>> andrewweiner2...@u.northwestern.edu> wrote:
>>>
 I tried playing around with my environment variables, and here is an
 update.

 When I run in cluster mode, my environment variables do not persist
 throughout the entire job.
 For example, I tried creating a local copy of HADOOP_CONF_DIR in
 /home//local/etc/hadoop/conf, and then, in spark-env.sh, I set the
 variable:
 export HADOOP_CONF_DIR=/home//local/etc/hadoop/conf

 Later, when we print the environment variables in the python code, I
 see this:

 ('HADOOP_CONF_DIR', '/etc/hadoop/conf')

 However, when I run in client mode, I see this:

 ('HADOOP_CONF_DIR', '/home/awp066/local/etc/hadoop/conf')

 Furthermore, if I omit that environment variable from spark-env.sh 
 altogether, I get the expected error in both client and cluster mode:

 When running with master 'yarn'
 either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

 This suggests that my environment variables are being used when I first 
 submit the job, but at some point during the job, my environment variables 
 are thrown out and someone else's (yarn's?) environment variables are being 
 used.

 Andrew


 On Fri, Jan 15, 2016 at 11:03 AM, Andrew Weiner <
 andrewweiner2...@u.northwestern.edu> wrote:

> Indeed!  Here is the output when I run in cluster mode:
>
> Traceback (most recent call last):
>   File "pi.py", line 22, in ?
> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
> RuntimeError:
> (2, 4, 3, 'final', 0)
> [('PYSPARK_GATEWAY_PORT', '48079'), ('PYTHONPATH', 
> '/scratch2/hadoop/yarn/local/usercache//filecache/116/spark-assembly-1.6.0-hadoop2.4.0.jar:/home//spark-1.6.0-bin-hadoop2.4/python:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/py4j-0.9-src.zip'),
>  ('PYTHONUNBUFFERED', 'YES')]
>
> As we suspected, it is using Python 2.4
>
> One thing that surprises me is that PYSPARK_PYTHON is not showing up in 
> the list, even though I am setting it and exporting it in spark-submit 
> *and* in spark-env.sh.  Is there somewhere else I need to set this 
> variable?  Maybe in one of the hadoop conf files in my 

Running out of memory locally launching multiple spark jobs using spark yarn / submit from shell script.

2016-01-17 Thread Colin Kincaid Williams
I launch around 30-60 of these jobs defined like start-job.sh in the
background from a wrapper script. I wait about 30 seconds between launches,
then the wrapper monitors yarn to determine when to launch more. There is a
limit defined at around 60 jobs, but even if I set it to 30, I run out of
memory on the host submitting the jobs. Why does my approach to using
spark-submit cause me to run out of memory? I have about 6G free, and I
don't feel like I should be running out of memory when submitting jobs.

start-job.sh

export HADOOP_CONF_DIR=/etc/hadoop/conf
spark-submit \
  --class sap.whcounter.WarehouseCounter \
  --master yarn-cluster \
  --num-executors 1 \
  --driver-memory 1024m \
  --executor-memory 1024m \
  --executor-cores 4 \
  --queue hlab \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --conf spark.app.name=wh_reader_sp \
  --conf spark.streaming.receiver.maxRate=1000 \
  --conf spark.streaming.concurrentJobs=2 \
  --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.overwrite=true \
  --conf spark.yarn.historyServer.address="http://spark-history.local:18080/" \
  --conf spark.yarn.jar="hdfs:///user/spark/share/lib/spark-assembly.jar" \
  --conf spark.yarn.dist.files="hdfs:///user/colin.williams/warehouse-counter-0.0.1-SNAPSHOT-uber.jar" \
  hdfs:///user/colin.williams/warehouse-counter-0.0.1-SNAPSHOT-uber.jar \
  $1 $2


ps aux | grep java

/usr/java/latest/bin/java -cp
::/usr/lib/spark/conf:/usr/lib/spark/lib/spark-assembly.jar:/etc/hadoop/conf:/usr/lib/hadoop/client/*:/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop/../hadoop-hdfs/./:/usr/lib/hadoop/../hadoop-hdfs/lib/*:/usr/lib/hadoop/../hadoop-hdfs/.//*:/usr/lib/hadoop/../hadoop-yarn/lib/*:/usr/lib/hadoop/../hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*:/usr/lib/spark/lib/scala-library.jar:/usr/lib/spark/lib/scala-compiler.jar:/usr/lib/spark/lib/jline.jar
-XX:MaxPermSize=128m -Xms1024m -Xmx1024m
org.apache.spark.deploy.SparkSubmit --class sap.whcounter.WarehouseCounter
--master yarn-cluster --num-executors 1 --driver-memory 1024m
--executor-memory 1024m --executor-cores 4 --queue hlab --conf
spark.yarn.submit.waitAppCompletion=false --conf spark.app.name=wh_reader_sp
--conf spark.streaming.receiver.maxRate=1000 --conf
spark.streaming.concurrentJobs=2 --conf
spark.eventLog.dir=hdfs:///user/spark/applicationHistory --conf
spark.eventLog.enabled=true --conf spark.eventLog.overwrite=true --conf
spark.yarn.historyServer.address=http://spark-history.local:18080/ --conf
spark.yarn.jar=hdfs:///user/spark/share/lib/spark-assembly.jar --conf
spark.yarn.dist.files=hdfs:///user/colin.williams/warehouse-counter-0.0.1-SNAPSHOT-uber.jar
hdfs:///user/colin.williams/warehouse-counter-0.0.1-SNAPSHOT-uber.jar
hdfs:///wh/2015/04/19/*

free -m
             total       used       free     shared    buffers
Mem:          7873        992       6881          0         62
-/+ buffers/cache:        500       7373
Swap:        14947        574      14373


hs_err_pid7433.log

# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 716177408 bytes for
# committing reserved memory.
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (os_linux.cpp:2747), pid=7357, tid=140414250673920
#
# JRE version:  (7.0_60-b19) (build )
# Java VM: Java HotSpot(TM) 64-Bit Server VM (24.60-b09 mixed mode
# linux-amd64 compressed oops)
# Failed to write core dump. Core dumps have been disabled. To enable
# core dumping, try "ulimit -c unlimited" before starting Java again
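A hedged reading of the numbers above (an estimate, not a definitive diagnosis): the ps output shows each launcher JVM started with -Xms1024m -Xmx1024m, and -Xms commits the full heap up front, so the concurrent launcher JVMs alone can exhaust the submitting host:

```shell
# 30 concurrent spark-submit launcher JVMs at -Xms1024m each try to
# commit about 30 GB on the submitting host -- far past the ~6 GB that
# `free -m` reports as available.
echo $(( 30 * 1024 ))   # MB committed by 30 launchers -> 30720
```

If that is the cause, shrinking the launcher heap may help, e.g. exporting SPARK_SUBMIT_OPTS with a smaller -Xmx before each submit (SPARK_SUBMIT_OPTS is read by spark-class; whether it overrides the -Xms/-Xmx derived from --driver-memory in your build is an assumption to verify).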

Re: simultaneous actions

2016-01-17 Thread Matei Zaharia
They'll be able to run concurrently and share workers / data. Take a look at 
http://spark.apache.org/docs/latest/job-scheduling.html 
 for how scheduling 
happens across multiple running jobs in the same SparkContext.

Matei
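A minimal sketch of turning that on within a single application (spark.scheduler.mode is a real Spark property; the pool name below is an assumption for illustration):

```shell
# enable the fair scheduler for one application
spark-shell --conf spark.scheduler.mode=FAIR
```

Inside the application, each thread can then tag its jobs into a pool before calling actions, e.g. `sc.setLocalProperty("spark.scheduler.pool", "poolA")`.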

> On Jan 17, 2016, at 8:06 AM, Koert Kuipers  wrote:
> 
> Same rdd means same sparkcontext means same workers
> 
> Cache/persist the rdd to avoid repeated jobs
> 
> On Jan 17, 2016 5:21 AM, "Mennour Rostom"  > wrote:
> Hi,
> 
> Thank you all for your answers,
> 
> If I correctly understand, actions (in my case foreach) can be run 
> concurrently and simultaneously on the SAME rdd, (which is logical because 
> they are read only object). however, I want to know if the same workers are 
> used for the concurrent analysis ?
> 
> Thank you
> 
> 2016-01-15 21:11 GMT+01:00 Jakob Odersky  >:
> I stand corrected. How considerable are the benefits though? Will the 
> scheduler be able to dispatch jobs from both actions simultaneously (or on a 
> when-workers-become-available basis)?
> 
> On 15 January 2016 at 11:44, Koert Kuipers  > wrote:
> we run multiple actions on the same (cached) rdd all the time, i guess in 
> different threads indeed (its in akka)
> 
> On Fri, Jan 15, 2016 at 2:40 PM, Matei Zaharia  > wrote:
> RDDs actually are thread-safe, and quite a few applications use them this 
> way, e.g. the JDBC server.
> 
> Matei
> 
>> On Jan 15, 2016, at 2:10 PM, Jakob Odersky > > wrote:
>> 
>> I don't think RDDs are threadsafe.
>> More fundamentally however, why would you want to run RDD actions in 
>> parallel? The idea behind RDDs is to provide you with an abstraction for 
>> computing parallel operations on distributed data. Even if you were to call 
>> actions from several threads at once, the individual executors of your spark 
>> environment would still have to perform operations sequentially.
>> 
>> As an alternative, I would suggest to restructure your RDD transformations 
>> to compute the required results in one single operation.
>> 
>> On 15 January 2016 at 06:18, Jonathan Coveney > > wrote:
>> Threads
>> 
>> 
>> El viernes, 15 de enero de 2016, Kira > > escribió:
>> Hi,
>> 
>> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can this
>> be done ?
>> 
>> Thank you,
>> Regards
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>> 
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
> 
> 
> 
> 



Re: Reuse Executor JVM across different JobContext

2016-01-17 Thread Mark Hamstra
Yes, that is one of the basic reasons to use a
jobserver/shared-SparkContext.  Otherwise, in order share the data in an
RDD you have to use an external storage system, such as a distributed
filesystem or Tachyon.

On Sun, Jan 17, 2016 at 1:52 PM, Jia  wrote:

> Thanks, Mark. Then, I guess JobServer can fundamentally solve my problem,
> so that jobs can be submitted at different times and still share RDDs.
>
> Best Regards,
> Jia
>
>
> On Jan 17, 2016, at 3:44 PM, Mark Hamstra  wrote:
>
> There is a 1-to-1 relationship between Spark Applications and
> SparkContexts -- fundamentally, a Spark Application is a program that
> creates and uses a SparkContext, and that SparkContext is destroyed when
> the Application ends.  A jobserver generically and the Spark JobServer
> specifically is an Application that keeps a SparkContext open for a long
> time and allows many Jobs to be submitted and run using that shared
> SparkContext.
>
> More than one Application/SparkContext unavoidably implies more than one
> JVM process per Worker -- Applications/SparkContexts cannot share JVM
> processes.
>
> On Sun, Jan 17, 2016 at 1:15 PM, Jia  wrote:
>
>> Hi, Mark, sorry for the confusion.
>>
>> Let me clarify, when an application is submitted, the master will tell
>> each Spark worker to spawn an executor JVM process. All the task sets  of
>> the application will be executed by the executor. After the application
>> runs to completion. The executor process will be killed.
>> But I hope that all applications submitted can run in the same executor,
>> can JobServer do that? If so, it’s really good news!
>>
>> Best Regards,
>> Jia
>>
>> On Jan 17, 2016, at 3:09 PM, Mark Hamstra 
>> wrote:
>>
>> You've still got me confused.  The SparkContext exists at the Driver, not
>> on an Executor.
>>
>> Many Jobs can be run by a SparkContext -- it is a common pattern to use
>> something like the Spark Jobserver where all Jobs are run through a shared
>> SparkContext.
>>
>> On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou 
>> wrote:
>>
>>> Hi, Mark, sorry, I mean SparkContext.
>>> I mean to change Spark into running all submitted jobs (SparkContexts)
>>> in one executor JVM.
>>>
>>> Best Regards,
>>> Jia
>>>
>>> On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra 
>>> wrote:
>>>
 -dev

 What do you mean by JobContext?  That is a Hadoop mapreduce concept,
 not Spark.

 On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou 
 wrote:

> Dear all,
>
> Is there a way to reuse executor JVM across different JobContexts?
> Thanks.
>
> Best Regards,
> Jia
>


>>>
>>
>>
>
>


Re: Reuse Executor JVM across different JobContext

2016-01-17 Thread Jia
Hi, Mark, sorry for the confusion.

Let me clarify: when an application is submitted, the master will tell each
Spark worker to spawn an executor JVM process. All the task sets of the
application will be executed by that executor. After the application runs to
completion, the executor process is killed.
But I hope that all applications submitted can run in the same executor; can
JobServer do that? If so, it's really good news!

Best Regards,
Jia

On Jan 17, 2016, at 3:09 PM, Mark Hamstra  wrote:

> You've still got me confused.  The SparkContext exists at the Driver, not on 
> an Executor.
> 
> Many Jobs can be run by a SparkContext -- it is a common pattern to use 
> something like the Spark Jobserver where all Jobs are run through a shared 
> SparkContext.
> 
> On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou  wrote:
> Hi, Mark, sorry, I mean SparkContext.
> I mean to change Spark into running all submitted jobs (SparkContexts) in one 
> executor JVM.
> 
> Best Regards,
> Jia
> 
> On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra  wrote:
> -dev
> 
> What do you mean by JobContext?  That is a Hadoop mapreduce concept, not 
> Spark.
> 
> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou  wrote:
> Dear all,
> 
> Is there a way to reuse executor JVM across different JobContexts? Thanks.
> 
> Best Regards,
> Jia
> 
> 
> 



Re: Reuse Executor JVM across different JobContext

2016-01-17 Thread Mark Hamstra
There is a 1-to-1 relationship between Spark Applications and SparkContexts
-- fundamentally, a Spark Application is a program that creates and uses a
SparkContext, and that SparkContext is destroyed when the Application
ends.  A jobserver generically and the Spark JobServer specifically is an
Application that keeps a SparkContext open for a long time and allows many
Jobs to be submitted and run using that shared SparkContext.

More than one Application/SparkContext unavoidably implies more than one
JVM process per Worker -- Applications/SparkContexts cannot share JVM
processes.

On Sun, Jan 17, 2016 at 1:15 PM, Jia  wrote:

> Hi, Mark, sorry for the confusion.
>
> Let me clarify: when an application is submitted, the master will tell
> each Spark worker to spawn an executor JVM process. All the task sets of
> the application will be executed by the executor. After the application
> runs to completion, the executor process will be killed.
> But I hope that all applications submitted can run in the same executor;
> can JobServer do that? If so, it’s really good news!
>
> Best Regards,
> Jia
>
> On Jan 17, 2016, at 3:09 PM, Mark Hamstra  wrote:
>
> You've still got me confused.  The SparkContext exists at the Driver, not
> on an Executor.
>
> Many Jobs can be run by a SparkContext -- it is a common pattern to use
> something like the Spark Jobserver where all Jobs are run through a shared
> SparkContext.
>
> On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou  wrote:
>
>> Hi, Mark, sorry, I mean SparkContext.
>> I mean to change Spark into running all submitted jobs (SparkContexts) in
>> one executor JVM.
>>
>> Best Regards,
>> Jia
>>
>> On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra 
>> wrote:
>>
>>> -dev
>>>
>>> What do you mean by JobContext?  That is a Hadoop mapreduce concept, not
>>> Spark.
>>>
>>> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou 
>>> wrote:
>>>
 Dear all,

 Is there a way to reuse executor JVM across different JobContexts?
 Thanks.

 Best Regards,
 Jia

>>>
>>>
>>
>
>


Re: Reuse Executor JVM across different JobContext

2016-01-17 Thread Jia
Thanks, Mark. Then, I guess JobServer can fundamentally solve my problem, so 
that jobs can be submitted at different times and still share RDDs.

Best Regards,
Jia


On Jan 17, 2016, at 3:44 PM, Mark Hamstra  wrote:

> There is a 1-to-1 relationship between Spark Applications and SparkContexts 
> -- fundamentally, a Spark Application is a program that creates and uses a 
> SparkContext, and that SparkContext is destroyed when the Application ends.  
> A jobserver generically, and the Spark JobServer specifically, is an 
> Application that keeps a SparkContext open for a long time and allows many 
> Jobs to be submitted and run using that shared SparkContext.
> 
> More than one Application/SparkContext unavoidably implies more than one JVM 
> process per Worker -- Applications/SparkContexts cannot share JVM processes.  
> 
> On Sun, Jan 17, 2016 at 1:15 PM, Jia  wrote:
> Hi, Mark, sorry for the confusion.
> 
> Let me clarify: when an application is submitted, the master will tell each 
> Spark worker to spawn an executor JVM process. All the task sets of the 
> application will be executed by the executor. After the application runs to 
> completion, the executor process will be killed.
> But I hope that all applications submitted can run in the same executor; can 
> JobServer do that? If so, it’s really good news!
> 
> Best Regards,
> Jia
> 
> On Jan 17, 2016, at 3:09 PM, Mark Hamstra  wrote:
> 
>> You've still got me confused.  The SparkContext exists at the Driver, not on 
>> an Executor.
>> 
>> Many Jobs can be run by a SparkContext -- it is a common pattern to use 
>> something like the Spark Jobserver where all Jobs are run through a shared 
>> SparkContext.
>> 
>> On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou  wrote:
>> Hi, Mark, sorry, I mean SparkContext.
>> I mean to change Spark into running all submitted jobs (SparkContexts) in 
>> one executor JVM.
>> 
>> Best Regards,
>> Jia
>> 
>> On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra  
>> wrote:
>> -dev
>> 
>> What do you mean by JobContext?  That is a Hadoop mapreduce concept, not 
>> Spark.
>> 
>> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou  wrote:
>> Dear all,
>> 
>> Is there a way to reuse executor JVM across different JobContexts? Thanks.
>> 
>> Best Regards,
>> Jia
>> 
>> 
>> 
> 
> 



Re: simultaneous actions

2016-01-17 Thread Mark Hamstra
Same SparkContext means same pool of Workers.  It's up to the Scheduler,
not the SparkContext, whether the exact same Workers or Executors will be
used to calculate simultaneous actions against the same RDD.  It is likely
that many of the same Workers and Executors will be used as the Scheduler
tries to preserve data locality, but that is not guaranteed.  In fact, what
is most likely to happen is that the shared Stages and Tasks being
calculated for the simultaneous actions will not actually be run at exactly
the same time, which means that shuffle files produced for one action will
be reused by the other(s), and repeated calculations will be avoided even
without explicitly caching/persisting the RDD.
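The interplay described here can be mimicked in plain Python, with threads standing in for simultaneous actions and a lazily computed, lock-protected value standing in for the shared stage whose shuffle output both actions reuse:

```python
import threading

_stage_lock = threading.Lock()
_stage_result = None

def shared_stage():
    # stands in for a shuffle stage: computed once, its output ("shuffle
    # files") reused by every action that needs it
    global _stage_result
    with _stage_lock:
        if _stage_result is None:
            _stage_result = [x * x for x in range(100_000)]
    return _stage_result

results = {}

def run_action(name, fn):
    results[name] = fn(shared_stage())

t1 = threading.Thread(target=run_action, args=("sum", sum))
t2 = threading.Thread(target=run_action, args=("count", len))
t1.start(); t2.start()
t1.join(); t2.join()

print(results["count"])  # 100000
```

In Spark the role of the lock is played by the scheduler: whichever action reaches the shared stage first produces the shuffle files, and the later action reads them instead of recomputing.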

On Sun, Jan 17, 2016 at 8:06 AM, Koert Kuipers  wrote:

> Same rdd means same sparkcontext means same workers
>
> Cache/persist the rdd to avoid repeated jobs
> On Jan 17, 2016 5:21 AM, "Mennour Rostom"  wrote:
>
>> Hi,
>>
>> Thank you all for your answers,
>>
>> If I correctly understand, actions (in my case foreach) can be run
>> concurrently and simultaneously on the SAME RDD (which is logical because
>> it is a read-only object). However, I want to know if the same workers are
>> used for the concurrent analysis?
>>
>> Thank you
>>
>> 2016-01-15 21:11 GMT+01:00 Jakob Odersky :
>>
>>> I stand corrected. How considerable are the benefits though? Will the
>>> scheduler be able to dispatch jobs from both actions simultaneously (or on
>>> a when-workers-become-available basis)?
>>>
>>> On 15 January 2016 at 11:44, Koert Kuipers  wrote:
>>>
 we run multiple actions on the same (cached) rdd all the time, i guess
 in different threads indeed (its in akka)

 On Fri, Jan 15, 2016 at 2:40 PM, Matei Zaharia  wrote:

> RDDs actually are thread-safe, and quite a few applications use them
> this way, e.g. the JDBC server.
>
> Matei
>
> On Jan 15, 2016, at 2:10 PM, Jakob Odersky  wrote:
>
> I don't think RDDs are threadsafe.
> More fundamentally however, why would you want to run RDD actions in
> parallel? The idea behind RDDs is to provide you with an abstraction for
> computing parallel operations on distributed data. Even if you were to 
> call
> actions from several threads at once, the individual executors of your
> spark environment would still have to perform operations sequentially.
>
> As an alternative, I would suggest to restructure your RDD
> transformations to compute the required results in one single operation.
>
> On 15 January 2016 at 06:18, Jonathan Coveney 
> wrote:
>
>> Threads
>>
>>
>> On Friday, January 15, 2016, Kira  wrote:
>>
>>> Hi,
>>>
>>> Can we run *simultaneous* actions on the *same RDD* ?; if yes how
>>> can this
>>> be done ?
>>>
>>> Thank you,
>>> Regards
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>>> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>
>

>>>
>>


Re: Reuse Executor JVM across different JobContext

2016-01-17 Thread Jia Zou
Hi, Mark, sorry, I mean SparkContext.
I mean to change Spark into running all submitted jobs (SparkContexts) in
one executor JVM.

Best Regards,
Jia

On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra 
wrote:

> -dev
>
> What do you mean by JobContext?  That is a Hadoop mapreduce concept, not
> Spark.
>
> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou  wrote:
>
>> Dear all,
>>
>> Is there a way to reuse executor JVM across different JobContexts? Thanks.
>>
>> Best Regards,
>> Jia
>>
>
>


Re: Reuse Executor JVM across different JobContext

2016-01-17 Thread Mark Hamstra
You've still got me confused.  The SparkContext exists at the Driver, not
on an Executor.

Many Jobs can be run by a SparkContext -- it is a common pattern to use
something like the Spark Jobserver where all Jobs are run through a shared
SparkContext.

On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou  wrote:

> Hi, Mark, sorry, I mean SparkContext.
> I mean to change Spark into running all submitted jobs (SparkContexts) in
> one executor JVM.
>
> Best Regards,
> Jia
>
> On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra 
> wrote:
>
>> -dev
>>
>> What do you mean by JobContext?  That is a Hadoop mapreduce concept, not
>> Spark.
>>
>> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou  wrote:
>>
>>> Dear all,
>>>
>>> Is there a way to reuse executor JVM across different JobContexts?
>>> Thanks.
>>>
>>> Best Regards,
>>> Jia
>>>
>>
>>
>


Monitoring Spark with Ganglia on ElCapo

2016-01-17 Thread william tellme
Does anyone have a link handy that describes configuring Ganglia on the Mac?




Re: Sending large objects to specific RDDs

2016-01-17 Thread Daniel Imberman
This is perfect. So I guess my best course of action will be to create a
custom partitioner to ensure that the smallest amount of data is shuffled
when I join the partitions, and then I really only need to do a map (rather
than a mapPartitions) since the inverted index object will be pointed to
(rather than copied for each value as I had assumed).
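The plan above can be sketched in miniature (plain Python with made-up data; `hash(key) % num_partitions` mimics the behaviour of Spark's HashPartitioner): once both datasets are placed by the same partitioner, the join is partition-local.

```python
NUM_PARTITIONS = 4

def partition_of(key):
    # mimics a HashPartitioner: the same key maps to the same partition
    # in both datasets
    return hash(key) % NUM_PARTITIONS

vectors = [(1, [0.1, 0.2]), (2, [0.3, 0.4]), (1, [0.5, 0.6]), (3, [0.7, 0.8])]
inverted_indexes = [(1, "inv-index-1"), (2, "inv-index-2"), (3, "inv-index-3")]

# "shuffle" both datasets with the same partitioner
vec_parts = [[] for _ in range(NUM_PARTITIONS)]
idx_parts = [{} for _ in range(NUM_PARTITIONS)]
for k, v in vectors:
    vec_parts[partition_of(k)].append((k, v))
for k, idx in inverted_indexes:
    idx_parts[partition_of(k)][k] = idx

# the join is now purely partition-local: every vector finds its inverted
# index in its own partition, with no cross-partition traffic
for p in range(NUM_PARTITIONS):
    for k, _ in vec_parts[p]:
        assert k in idx_parts[p]
print("all vectors co-located with their inverted index")
```

Using `partitionBy` with the same Partitioner on both RDDs gives the same co-location, which is why passing a Partitioner to groupByKey/join reduces the shuffle.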

Thank you Ted and Koert!

On Sat, Jan 16, 2016 at 1:37 PM Ted Yu  wrote:

> Both groupByKey and join() accept Partitioner as parameter.
>
> Maybe you can specify a custom Partitioner so that the amount of shuffle
> is reduced.
>
> On Sat, Jan 16, 2016 at 9:39 AM, Daniel Imberman <
> daniel.imber...@gmail.com> wrote:
>
>> Hi Ted,
>>
>> I think I might have figured something out!(Though I haven't tested it at
>> scale yet)
>>
>> My current thought is that I can do a groupByKey on the RDD of vectors
>> and then do a join with the invertedIndex.
>> It would look something like this:
>>
>> val InvIndexes:RDD[(Int,InvertedIndex)]
>> val partitionedVectors:RDD[(Int, Vector)]
>>
>> val partitionedTasks:RDD[(Int, (Iterator[Vector], InvertedIndex))] =
>> partitionedvectors.groupByKey().join(invIndexes)
>>
>> val similarities = partitionedTasks.map(//calculate similarities)
>> val maxSim = similarities.reduce(math.max)
>>
>>
>> So while I realize that usually a groupByKey is usually frowned upon, it
>> seems to me that since I need all associated vectors to be local anyways
>> that this repartitioning would not be too expensive.
>>
>> Does this seem like a reasonable approach to this problem or are there
>> any faults that I should consider should I approach it this way?
>>
>> Thank you for your help,
>>
>> Daniel
>>
>> On Fri, Jan 15, 2016 at 5:30 PM Ted Yu  wrote:
>>
>>> My knowledge of XSEDE is limited - I visited the website.
>>>
>>> If there is no easy way to deploy HBase, alternative approach (using
>>> hdfs ?) needs to be considered.
>>>
>>> I need to do more homework on this :-)
>>>
>>> On Thu, Jan 14, 2016 at 3:51 PM, Daniel Imberman <
>>> daniel.imber...@gmail.com> wrote:
>>>
 Hi Ted,

 So unfortunately after looking into the cluster manager that I will be
 using for my testing (I'm using a super-computer called XSEDE rather than
 AWS), it looks like the cluster does not actually come with Hbase installed
 (this cluster is becoming somewhat problematic, as it is essentially AWS
 but you have to do your own virtualization scripts). Do you have any other
 thoughts on how I could go about dealing with this purely using spark and
 HDFS?

 Thank you

 On Wed, Jan 13, 2016 at 11:49 AM Daniel Imberman <
 daniel.imber...@gmail.com> wrote:

> Thank you Ted! That sounds like it would probably be the most
> efficient (with the least overhead) way of handling this situation.
>
> On Wed, Jan 13, 2016 at 11:36 AM Ted Yu  wrote:
>
>> Another approach is to store the objects in NoSQL store such as HBase.
>>
>> Looking up object should be very fast.
>>
>> Cheers
>>
>> On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman <
>> daniel.imber...@gmail.com> wrote:
>>
>>> I'm looking for a way to send structures to pre-determined
>>> partitions so that
>>> they can be used by another RDD in a mapPartition.
>>>
>>> Essentially I'm given an RDD of SparseVectors and an RDD of inverted
>>> indexes. The inverted index objects are quite large.
>>>
>>> My hope is to do a MapPartitions within the RDD of vectors where I
>>> can
>>> compare each vector to the inverted index. The issue is that I only
>>> NEED one
>>> inverted index object per partition (which would have the same key
>>> as the
>>> values within that partition).
>>>
>>>
>>> val vectors: RDD[(Int, SparseVector)]
>>>
>>> val invertedIndexes: RDD[(Int, InvIndex)] =
>>>   a.reduceByKey(generateInvertedIndex)
>>> vectors.mapPartitions { iter =>
>>>   val invIndex = invertedIndexes(samePartitionKey)
>>>   iter.map(invIndex.calculateSimilarity(_))
>>> }
>>>
>>> How could I go about setting up the Partition such that the specific
>>> data
>>> structure I need will be present for the mapPartition but I won't
>>> have the
>>> extra overhead of sending over all values (which would happen if I
>>> were to
>>> make a broadcast variable).
>>>
>>> One thought I have been having is to store the objects in HDFS but
>>> I'm not
>>> sure if that would be a suboptimal solution (It seems like it could
>>> slow
>>> down the process a lot)
>>>
>>> Another thought I am currently exploring is whether there is some
>>> way I can
>>> create a custom Partition or Partitioner that could hold the data
>>> structure
>>> (Although that might get too complicated and become 

Re: SQL UDF problem (with re to types)

2016-01-17 Thread Ted Yu
While reading some book on Java 8, I saw a reference to the following
w.r.t. declaration-site variance :

https://bugs.openjdk.java.net/browse/JDK-8043488

The above reportedly targets Java 9.

FYI

On Thu, Jan 14, 2016 at 12:33 PM, Michael Armbrust 
wrote:

> I don't believe that Java 8 got rid of erasure. In fact I think it's
> actually worse when you use Java 8 lambdas.
>
> On Thu, Jan 14, 2016 at 10:54 AM, Raghu Ganti 
> wrote:
>
>> Would this go away if the Spark source was compiled against Java 1.8
>> (since the problem of type erasure is solved through proper generics
>> implementation in Java 1.8).
>>
>> On Thu, Jan 14, 2016 at 1:42 PM, Michael Armbrust  wrote:
>>
>>> We automatically convert types for UDFs defined in Scala, but we can't
>>> do it in Java because the types are erased by the compiler.  If you want to
>>> use double you should cast before calling the UDF.
>>>
>>> On Wed, Jan 13, 2016 at 8:10 PM, Raghu Ganti 
>>> wrote:
>>>
 So, when I try BigDecimal, it works. But, should it not parse based on
 what the UDF defines? Am I missing something here?

 On Wed, Jan 13, 2016 at 4:57 PM, Ted Yu  wrote:

> Please take a look
> at 
> sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java
> which shows a UserDefinedAggregateFunction that works on DoubleType 
> column.
>
> sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
> shows how it is registered.
>
> Cheers
>
> On Wed, Jan 13, 2016 at 11:58 AM, raghukiran 
> wrote:
>
>> While registering and using SQL UDFs, I am running into the following
>> problem:
>>
>> UDF registered:
>>
>> ctx.udf().register("Test", new UDF1<Double, String>() {
>> /**
>>  *
>>  */
>> private static final long serialVersionUID =
>> -8231917155671435931L;
>>
>> public String call(Double x) throws Exception
>> {
>> return "testing";
>> }
>> }, DataTypes.StringType);
>>
>> Usage:
>> query = "SELECT Test(82.4)";
>> result = sqlCtx.sql(query).first();
>> System.out.println(result.toString());
>>
>> Problem: Class Cast exception thrown
>> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot
>> be cast
>> to java.lang.Double
>>
>> This problem occurs with Spark v1.5.2 and 1.6.0.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>>
>>
>

>>>
>>
>


Spark Streaming: Does mapWithState implicitly partition the dstream?

2016-01-17 Thread Lin Zhao
When the state is passed to the task that handles a mapWithState for a 
particular key, if the key is distributed, it seems extremely difficult to 
coordinate and synchronise the state. Is there a partition by key before a 
mapWithState? If not what exactly is the execution model?

Thanks,

Lin



Re: simultaneous actions

2016-01-17 Thread Mark Hamstra
It can be far more than that (e.g.
https://issues.apache.org/jira/browse/SPARK-11838), and is generally either
unrecognized or a greatly under-appreciated and underused feature of Spark.

On Sun, Jan 17, 2016 at 12:20 PM, Koert Kuipers  wrote:

> the re-use of shuffle files is always a nice surprise to me
>
> On Sun, Jan 17, 2016 at 3:17 PM, Mark Hamstra 
> wrote:
>
>> Same SparkContext means same pool of Workers.  It's up to the Scheduler,
>> not the SparkContext, whether the exact same Workers or Executors will be
>> used to calculate simultaneous actions against the same RDD.  It is likely
>> that many of the same Workers and Executors will be used as the Scheduler
>> tries to preserve data locality, but that is not guaranteed.  In fact, what
>> is most likely to happen is that the shared Stages and Tasks being
>> calculated for the simultaneous actions will not actually be run at exactly
>> the same time, which means that shuffle files produced for one action will
>> be reused by the other(s), and repeated calculations will be avoided even
>> without explicitly caching/persisting the RDD.
>>
>> On Sun, Jan 17, 2016 at 8:06 AM, Koert Kuipers  wrote:
>>
>>> Same rdd means same sparkcontext means same workers
>>>
>>> Cache/persist the rdd to avoid repeated jobs
>>> On Jan 17, 2016 5:21 AM, "Mennour Rostom"  wrote:
>>>
 Hi,

 Thank you all for your answers,

 If I correctly understand, actions (in my case foreach) can be run
 concurrently and simultaneously on the SAME RDD (which is logical because
 it is a read-only object). However, I want to know if the same workers are
 used for the concurrent analysis?

 Thank you

 2016-01-15 21:11 GMT+01:00 Jakob Odersky :

> I stand corrected. How considerable are the benefits though? Will the
> scheduler be able to dispatch jobs from both actions simultaneously (or on
> a when-workers-become-available basis)?
>
> On 15 January 2016 at 11:44, Koert Kuipers  wrote:
>
>> we run multiple actions on the same (cached) rdd all the time, i
>> guess in different threads indeed (its in akka)
>>
>> On Fri, Jan 15, 2016 at 2:40 PM, Matei Zaharia <
>> matei.zaha...@gmail.com> wrote:
>>
>>> RDDs actually are thread-safe, and quite a few applications use them
>>> this way, e.g. the JDBC server.
>>>
>>> Matei
>>>
>>> On Jan 15, 2016, at 2:10 PM, Jakob Odersky 
>>> wrote:
>>>
>>> I don't think RDDs are threadsafe.
>>> More fundamentally however, why would you want to run RDD actions in
>>> parallel? The idea behind RDDs is to provide you with an abstraction for
>>> computing parallel operations on distributed data. Even if you were to 
>>> call
>>> actions from several threads at once, the individual executors of your
>>> spark environment would still have to perform operations sequentially.
>>>
>>> As an alternative, I would suggest to restructure your RDD
>>> transformations to compute the required results in one single operation.
>>>
>>> On 15 January 2016 at 06:18, Jonathan Coveney 
>>> wrote:
>>>
 Threads


 On Friday, January 15, 2016, Kira  wrote:

> Hi,
>
> Can we run *simultaneous* actions on the *same RDD* ?; if yes how
> can this
> be done ?
>
> Thank you,
> Regards
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
>
>
>
>>>
>>>
>>
>

>>
>


Re: Reuse Executor JVM across different JobContext

2016-01-17 Thread Jia
I guess all jobs submitted through JobServer are executed in the same JVM, so 
RDDs cached by one job can be visible to all other jobs executed later.
On Jan 17, 2016, at 3:56 PM, Mark Hamstra  wrote:

> Yes, that is one of the basic reasons to use a jobserver/shared-SparkContext. 
>  Otherwise, in order to share the data in an RDD you have to use an external 
> storage system, such as a distributed filesystem or Tachyon.
> 
> On Sun, Jan 17, 2016 at 1:52 PM, Jia  wrote:
> Thanks, Mark. Then, I guess JobServer can fundamentally solve my problem, so 
> that jobs can be submitted at different times and still share RDDs.
> 
> Best Regards,
> Jia
> 
> 
> On Jan 17, 2016, at 3:44 PM, Mark Hamstra  wrote:
> 
>> There is a 1-to-1 relationship between Spark Applications and SparkContexts 
>> -- fundamentally, a Spark Application is a program that creates and uses a 
>> SparkContext, and that SparkContext is destroyed when the Application ends. 
>>  A jobserver generically, and the Spark JobServer specifically, is an 
>> Application that keeps a SparkContext open for a long time and allows many 
>> Jobs to be submitted and run using that shared SparkContext.
>> 
>> More than one Application/SparkContext unavoidably implies more than one JVM 
>> process per Worker -- Applications/SparkContexts cannot share JVM processes. 
>>  
>> 
>> On Sun, Jan 17, 2016 at 1:15 PM, Jia  wrote:
>> Hi, Mark, sorry for the confusion.
>> 
>> Let me clarify: when an application is submitted, the master will tell each 
>> Spark worker to spawn an executor JVM process. All the task sets of the 
>> application will be executed by the executor. After the application runs to 
>> completion, the executor process will be killed.
>> But I hope that all applications submitted can run in the same executor; can 
>> JobServer do that? If so, it’s really good news!
>> 
>> Best Regards,
>> Jia
>> 
>> On Jan 17, 2016, at 3:09 PM, Mark Hamstra  wrote:
>> 
>>> You've still got me confused.  The SparkContext exists at the Driver, not 
>>> on an Executor.
>>> 
>>> Many Jobs can be run by a SparkContext -- it is a common pattern to use 
>>> something like the Spark Jobserver where all Jobs are run through a shared 
>>> SparkContext.
>>> 
>>> On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou  wrote:
>>> Hi, Mark, sorry, I mean SparkContext.
>>> I mean to change Spark into running all submitted jobs (SparkContexts) in 
>>> one executor JVM.
>>> 
>>> Best Regards,
>>> Jia
>>> 
>>> On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra  
>>> wrote:
>>> -dev
>>> 
>>> What do you mean by JobContext?  That is a Hadoop mapreduce concept, not 
>>> Spark.
>>> 
>>> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou  wrote:
>>> Dear all,
>>> 
>>> Is there a way to reuse executor JVM across different JobContexts? Thanks.
>>> 
>>> Best Regards,
>>> Jia
>>> 
>>> 
>>> 
>> 
>> 
> 
> 



Re: [Spark 1.6][Streaming] About the behavior of mapWithState

2016-01-17 Thread Terry Hoo
Hi Ryan,

Thanks for your comments!

Using reduceByKey() before the mapWithState can get the expected result.

Have we ever considered having mapWithState output each changed key only once
per batch interval, just like updateStateByKey? In some cases, users may only
care about the final state.

Regards,
-Terry
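The observation above comes down to mapWithState invoking its mapping function once per input record. A small plain-Python model of the two pipelines (mimicking the semantics only, not the Spark API) reproduces both outputs:

```python
from collections import Counter

def map_with_state(batch, state):
    # the mapping function runs once per record and emits once per call --
    # which is why key 1 yields both (1, 1) and (1, 3) in the first batch
    out = []
    for key, value in batch:
        state[key] = state.get(key, 0) + value
        out.append((key, state[key]))
    return out

def reduce_by_key(batch):
    # collapse the batch to a single record per key first
    counts = Counter()
    for key, value in batch:
        counts[key] += value
    return list(counts.items())

batch1 = [(1, 1), (1, 2), (2, 1)]

state_a = {}
print(map_with_state(batch1, state_a))                 # [(1, 1), (1, 3), (2, 1)]

state_b = {}
print(map_with_state(reduce_by_key(batch1), state_b))  # [(1, 3), (2, 1)]
```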

On Sat, Jan 16, 2016 at 6:20 AM, Shixiong(Ryan) Zhu  wrote:

> Hey Terry,
>
> That's expected. If you want to only output (1, 3), you can use
> "reduceByKey" before "mapWithState" like this:
>
> dstream.reduceByKey(_ + _).mapWithState(spec)
>
> On Fri, Jan 15, 2016 at 1:21 AM, Terry Hoo  wrote:
>
>> Hi,
>> I am doing a simple test with mapWithState, and get some events
>> unexpected, is this correct?
>>
>> The test is very simple: sum the value of each key
>>
>> val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
>>   state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
>>   (key, state.get())
>> }
>> val spec = StateSpec.function(mappingFunction)
>> dstream.mapWithState(spec)
>>
>> I create two RDDs and insert into dstream:
>> RDD((1,1), (1,2), (2,1))
>> RDD((1,3))
>>
>> Get result like this:
>> RDD(*(1,1)*, *(1,3)*, (2,1))
>> RDD((1,6))
>>
>> You can see that the first batch will generate two items with the same
>> key "1": (1,1) and (1,3), is this expected behavior? I would expect (1,3)
>> only.
>>
>> Regards
>> - Terry
>>
>
>


Re: PCA OutOfMemoryError

2016-01-17 Thread Bharath Ravi Kumar
Hello Alex,

Thanks for the response. There isn't much other data on the driver, so the
issue is probably inherent to this particular PCA implementation.  I'll try
the alternative approach that you suggested instead. Thanks again.

-Bharath
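The suggested alternative (center the data first, then work with the gramian/covariance) can be sketched in miniature in plain Python, with power iteration standing in for the distributed SVD; the data and dimensions here are toy stand-ins, not the real 29K-column matrix:

```python
import random

# toy stand-in for the dataset: 2-D points correlated along y ~ 2x
random.seed(0)
data = [(x, 2 * x + random.gauss(0, 0.1)) for x in [i / 10 for i in range(50)]]
n = len(data)

# 1) center the data: one pass for the mean, one to subtract it
mean_x = sum(x for x, _ in data) / n
mean_y = sum(y for _, y in data) / n
centered = [(x - mean_x, y - mean_y) for x, y in data]

# 2) gramian of the centered data = (scaled) covariance matrix,
#    accumulated entry by entry rather than materialized all at once
cxx = sum(x * x for x, _ in centered) / n
cxy = sum(x * y for x, y in centered) / n
cyy = sum(y * y for _, y in centered) / n

# 3) power iteration extracts the top principal component, playing the
#    role the distributed SVD plays in the full-scale approach
v = (1.0, 0.0)
for _ in range(100):
    wx = cxx * v[0] + cxy * v[1]
    wy = cxy * v[0] + cyy * v[1]
    norm = (wx * wx + wy * wy) ** 0.5
    v = (wx / norm, wy / norm)

print(v)  # roughly (1, 2) / sqrt(5), i.e. about (0.447, 0.894)
```

At scale the same three steps are distributed: two passes over the RDD for centering, then computeSVD against the gramian, so the covariance matrix never has to be built on the driver.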

On Wed, Jan 13, 2016 at 11:24 PM, Alex Gittens  wrote:

> The PCA.fit function calls the RowMatrix PCA routine, which attempts to
> construct the covariance matrix locally on the driver, and then computes
> the SVD of that to get the PCs. I'm not sure what's causing the memory
> error: RowMatrix.scala:124 is only using 3.5 GB of memory (n*(n+1)/2 with
> n=29604 and double precision), so unless you're filling up the memory with
> other RDDs, you should have plenty of space on the driver.
>
> One alternative is to manually center your RDD (so make one pass over it
> to compute the mean, then another to subtract it out and form a new RDD),
> then directly call the computeSVD routine in RowMatrix to compute the SVD
> of the gramian matrix of this RDD (e.g., the covariance matrix of the
> original RDD) in a distributed manner, so the covariance matrix doesn't
> need to be formed explicitly. You can look at the getLowRankFactorization
> and convertLowRankFactorizationToEOFs routines at
>
> https://github.com/rustandruin/large-scale-climate/blob/master/src/main/scala/eofs.scala
> for example of this approach (call the second on the results of the first
> to get the SVD of the input matrix to the first; EOF is another name for
> PCA).
>
> This takes about 30 minutes to compute the top 20 PCs of a 46.7K-by-6.3M
> dense matrix of doubles (~2 Tb), with most of the time spent on the
> distributed matrix-vector multiplies.
>
> Best,
> Alex
>
>
> On Tue, Jan 12, 2016 at 6:39 PM, Bharath Ravi Kumar 
> wrote:
>
>> Any suggestion/opinion?
>> On 12-Jan-2016 2:06 pm, "Bharath Ravi Kumar"  wrote:
>>
>>> We're running PCA (selecting 100 principal components) on a dataset that
>>> has ~29K columns and is 70G in size stored in ~600 parts on HDFS. The
>>> matrix in question is mostly sparse with tens of columns populated in most
>>> rows, but a few rows with thousands of columns populated. We're running
>>> spark on mesos with driver memory set to 40G and executor memory set to
>>> 80G. We're however encountering an out of memory error (included at the end
>>> of the message) regardless of the number of rdd partitions or the degree of
>>> task parallelism being set. I noticed a warning at the beginning of the PCA
>>> computation stage: " WARN
>>> org.apache.spark.mllib.linalg.distributed.RowMatrix: 29604 columns will
>>> require at least 7011 megabyte  of memory!"
>>> I don't understand which memory this refers to. Is this the executor
>>> memory?  The driver memory? Any other?
>>> The stacktrace appears to indicate that a large array is probably being
>>> passed along with the task. Could this array have been passed as a
>>> broadcast variable instead ? Any suggestions / workarounds other than
>>> re-implementing the algorithm?
>>>
>>> Thanks,
>>> Bharath
>>>
>>> 
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError: Requested array
>>> size exceeds VM limit
>>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>> at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>> at
>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>> at
>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>> at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>> at
>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>>> at
>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>>> at
>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>>> at
>>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>>> at
>>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>>> at 

Re: has any one implemented TF_IDF using ML transformers?

2016-01-17 Thread Yanbo Liang
Hi Andy,

Actually, the output of the ML IDF model is the TF-IDF vector of each instance
rather than the IDF vector.
So it's unnecessary to do member-wise multiplication to calculate the TF-IDF
value. You can refer to the code here:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala#L121
I found the documentation of IDF is not very clear; we need to update it.

Thanks
Yanbo
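The arithmetic can be checked in a few lines of plain Python: using Spark's IDF formula log((m+1)/(d(t)+1)) on the four documents from the example quoted below, the tf·idf products come straight out, matching the values in the table.

```python
import math

# the four documents from the quoted example
docs = [
    ["Chinese", "Beijing", "Chinese"],
    ["Chinese", "Chinese", "Shanghai"],
    ["Chinese", "Macao"],
    ["Tokyo", "Japan", "Chinese"],
]
n_docs = len(docs)

# document frequency of each term
df = {}
for doc in docs:
    for term in set(doc):
        df[term] = df.get(term, 0) + 1

# Spark's IDF formula: log((numDocs + 1) / (docFreq + 1))
idf = {t: math.log((n_docs + 1) / (f + 1)) for t, f in df.items()}

def tf_idf(doc):
    # raw term counts, multiplied member-wise by the idf weights --
    # the combined value the ML IDF model's transform already produces
    tf = {}
    for term in doc:
        tf[term] = tf.get(term, 0) + 1
    return {term: count * idf[term] for term, count in tf.items()}

print(tf_idf(docs[0]))  # {'Chinese': 0.0, 'Beijing': 0.9162907318741551}
```

The zero entries are "Chinese", which occurs in all four documents, so its IDF weight is log(5/5) = 0, matching the table in the quoted message.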

2016-01-16 6:10 GMT+08:00 Andy Davidson :

> I wonder if I am missing something? TF-IDF is very popular. Spark ML has a
> lot of transformers; however, TF-IDF is not supported directly.
>
> Spark provides HashingTF and IDF transformers. The Java doc
> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
>
> mentions you can implement TF-IDF as follows:
>
> TFIDF(t,d,D)=TF(t,d)・IDF(t,D).
>
> The problem I am running into is both HashingTF and IDF return a sparse
> vector.
>
> *Ideally the spark code  to implement TFIDF would be one line.*
>
>
> * DataFrame ret = tmp.withColumn("features", 
> tmp.col("tf").multiply(tmp.col("idf")));*
>
> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
> data type mismatch: '(tf * idf)' requires numeric type, not vector;
>
> I could implement my own UDF to do member-wise multiplication; however,
> given how common TF-IDF is, I wonder if this code already exists somewhere.
>
> I found  org.apache.spark.util.Vector.Multiplier. There is no
> documentation; however, given the argument is a double, my guess is it just
> does scalar multiplication.
>
> I guess I could do something like
>
> Double[] v = mySparkVector.toArray();
>  Then use JBlas to do member-wise multiplication
>
> I assume sparse vectors are not distributed, so there would not be any
> additional communication cost
>
>
> If this code is truly missing, I would be happy to write it and donate it
>
> Andy
>
>
> From: Andrew Davidson 
> Date: Wednesday, January 13, 2016 at 2:52 PM
> To: "user @spark" 
> Subject: trouble calculating TF-IDF data type mismatch: '(tf * idf)'
> requires numeric type, not vector;
>
> Bellow is a little snippet of my Java Test Code. Any idea how I implement
> member wise vector multiplication?
>
> Kind regards
>
> Andy
>
> transformed df printSchema()
>
> root
>
>  |-- id: integer (nullable = false)
>
>  |-- label: double (nullable = false)
>
>  |-- words: array (nullable = false)
>
>  ||-- element: string (containsNull = true)
>
>  |-- tf: vector (nullable = true)
>
>  |-- idf: vector (nullable = true)
>
>
>
> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
> |id |label|words                       |tf                       |idf                                                    |
> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                     |
> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                     |
> |2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                     |
> |3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551])|
> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>
> @Test
> public void test() {
>     DataFrame rawTrainingDF = createTrainingData();
>     DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
>     . . .
> }
>
> private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
>     HashingTF hashingTF = new HashingTF()
>         .setInputCol("words")
>         .setOutputCol("tf")
>         .setNumFeatures(dictionarySize);
>
>     DataFrame termFrequenceDF = hashingTF.transform(rawDF);
>
>     termFrequenceDF.cache(); // idf needs to make 2 passes over data set
>
>     IDFModel idf = new IDF()
>         //.setMinDocFreq(1) // our vocabulary has 6 words we hash into 7
>         .setInputCol(hashingTF.getOutputCol())
>         .setOutputCol("idf")
>         .fit(termFrequenceDF);
>
>     DataFrame tmp = idf.transform(termFrequenceDF);
>
>     DataFrame ret = tmp.withColumn("features", tmp.col("tf").multiply(tmp.col("idf")));
>
>     logger.warn("\ntransformed df printSchema()");
>     ret.printSchema();
>     ret.show(false);
>
>     return ret;
> }
>
>
> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
> data type mismatch: '(tf * idf)' requires numeric type, not vector;
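The element-wise (Hadamard) product being asked for is straightforward to express over Spark's sparse encoding `(size, [indices], [values])`. A plain-Python sketch of the logic such a UDF would implement (the `hadamard` helper name is hypothetical, not a Spark API):

```python
def hadamard(size_a, idx_a, val_a, size_b, idx_b, val_b):
    """Element-wise product of two sparse vectors in (size, indices, values) form."""
    assert size_a == size_b, "vectors must have the same dimension"
    lookup = dict(zip(idx_b, val_b))  # index -> value for the second vector
    indices, values = [], []
    for i, v in zip(idx_a, val_a):
        if i in lookup:               # 0 * x == 0, so missing indices drop out
            indices.append(i)
            values.append(v * lookup[i])
    return size_a, indices, values

# Mirrors row id=0 above: tf (7,[1,2],[2.0,1.0]) times
# idf (7,[1,2],[0.0,0.9162907318741551]).
print(hadamard(7, [1, 2], [2.0, 1.0],
               7, [1, 2], [0.0, 0.9162907318741551]))
# -> (7, [1, 2], [0.0, 0.9162907318741551])
```

Only indices present in both vectors can be non-zero, which is why the result stays sparse and why this never needs to materialize the dense dictionary-sized arrays.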

Re: Feature importance for RandomForestRegressor in Spark 1.5

2016-01-17 Thread Yanbo Liang
Hi Robin,

#1 This feature is available from Spark 1.5.0.
#2 You should use the new ML package rather than the old MLlib package to train
the Random Forest model and get featureImportances, because it is only exposed
in the ML package. You can refer to the documentation:
https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier

Thanks
Yanbo

2016-01-16 0:16 GMT+08:00 Robin East :

> re 1.
> The pull requests reference the JIRA ticket in this case
> https://issues.apache.org/jira/browse/SPARK-5133. The JIRA says it was
> released in 1.5.
>
>
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 15 Jan 2016, at 16:06, Scott Imig  wrote:
>
> Hello,
>
> I have a couple of quick questions about this pull request, which adds
> feature importance calculations to the random forests in MLLib.
>
> https://github.com/apache/spark/pull/7838
>
> 1. Can someone help me determine the Spark version where this is first
> available?  (1.5.0?  1.5.1?)
>
> 2. Following the templates in this documentation to construct a
> RandomForestModel, should I be able to retrieve model.featureImportances?
> Or is there a different pattern for random forests in more recent Spark
> versions?
>
> https://spark.apache.org/docs/1.2.0/mllib-ensembles.html
>
> Thanks for the help!
> Imig
> --
> S. Imig | Senior Data Scientist Engineer | richrelevance | m: 425.999.5725
>
> I support Bip 101 and BitcoinXT.
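The `featureImportances` added by SPARK-5133 is the usual mean-decrease-in-impurity measure: per-split impurity gains are summed per feature, normalized within each tree, then aggregated and re-normalized across the forest. A plain-Python sketch under that description (toy data structures, not Spark's internal tree representation):

```python
def feature_importances(trees, num_features):
    """Impurity-based importances in the spirit of SPARK-5133.

    Each tree is given as a list of internal-node records
    (feature_index, weighted_impurity_decrease). Importances are summed
    per feature, normalized per tree, summed across trees, then
    normalized to sum to 1.
    """
    totals = [0.0] * num_features
    for tree in trees:
        per_tree = [0.0] * num_features
        for feature, gain in tree:
            per_tree[feature] += gain
        s = sum(per_tree)
        if s > 0:
            totals = [t + g / s for t, g in zip(totals, per_tree)]
    z = sum(totals)
    return [t / z for t in totals] if z > 0 else totals

# Two toy trees: (feature index, impurity decrease at the split).
forest = [[(0, 0.5), (1, 0.5)], [(0, 1.0)]]
print(feature_importances(forest, 2))  # -> [0.75, 0.25]
```

The per-tree normalization is what keeps a deep tree from dominating the forest-level ranking.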
>
>
>


Re: simultaneous actions

2016-01-17 Thread Mennour Rostom
Hi,

Thank you all for your answers,

If I understand correctly, actions (in my case foreach) can be run
concurrently and simultaneously on the SAME rdd (which is logical because
RDDs are read-only objects). However, I want to know whether the same
workers are used for the concurrent analysis?

Thank you

2016-01-15 21:11 GMT+01:00 Jakob Odersky :

> I stand corrected. How considerable are the benefits though? Will the
> scheduler be able to dispatch jobs from both actions simultaneously (or on
> a when-workers-become-available basis)?
>
> On 15 January 2016 at 11:44, Koert Kuipers  wrote:
>
>> we run multiple actions on the same (cached) rdd all the time, i guess in
>> different threads indeed (its in akka)
>>
>> On Fri, Jan 15, 2016 at 2:40 PM, Matei Zaharia 
>> wrote:
>>
>>> RDDs actually are thread-safe, and quite a few applications use them
>>> this way, e.g. the JDBC server.
>>>
>>> Matei
>>>
>>> On Jan 15, 2016, at 2:10 PM, Jakob Odersky  wrote:
>>>
>>> I don't think RDDs are threadsafe.
>>> More fundamentally however, why would you want to run RDD actions in
>>> parallel? The idea behind RDDs is to provide you with an abstraction for
>>> computing parallel operations on distributed data. Even if you were to call
>>> actions from several threads at once, the individual executors of your
>>> spark environment would still have to perform operations sequentially.
>>>
>>> As an alternative, I would suggest to restructure your RDD
>>> transformations to compute the required results in one single operation.
>>>
>>> On 15 January 2016 at 06:18, Jonathan Coveney 
>>> wrote:
>>>
 Threads


 On Friday, 15 January 2016, Kira wrote:

> Hi,
>
> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can
> this
> be done ?
>
> Thank you,
> Regards
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>>>
>>>
>>
>
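The pattern under discussion, triggering two actions on one cached RDD from separate driver-side threads, can be sketched with a plain thread pool; in real Spark the thread-safe SparkContext plays the role of the shared object, and both jobs share the same executors (with FAIR scheduling their tasks interleave). This is a plain-Python stand-in, no Spark APIs:

```python
from concurrent.futures import ThreadPoolExecutor

def run_actions_concurrently(data, actions):
    """Run several 'actions' over the same shared dataset from separate
    threads and collect their results in submission order."""
    with ThreadPoolExecutor(max_workers=len(actions)) as pool:
        futures = [pool.submit(action, data) for action in actions]
        return [f.result() for f in futures]

shared = list(range(10))  # stand-in for a cached RDD
print(run_actions_concurrently(shared, [sum, max]))  # -> [45, 9]
```

The answer to the "same workers?" question is yes: both jobs come from the same SparkContext, so they draw on the same pool of executors.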


Incorrect timeline for Scheduling Delay in Streaming page in web UI?

2016-01-17 Thread Jacek Laskowski
Hi,

I'm trying to understand how scheduling delays are displayed on the
Streaming page in the web UI, and I think the values are displayed
incorrectly in the Timelines column. I'm only concerned with the
scheduling delays (on the y axis) per batch time (x axis). It appears
that the values (on the y axis) are correct, but not how they are
plotted against batch times.

See the second screenshot in
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-streaming-webui.html#scheduling-delay.

Can anyone explain how the delays for batches per batch time should be
read? I'm specifically asking about the timeline (not the histogram,
which seems fine).

Regards,
Jacek

Jacek Laskowski | https://medium.com/@jaceklaskowski/
Mastering Apache Spark
==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Converting CSV files to Avro

2016-01-17 Thread Igor Berman
https://github.com/databricks/spark-avro ?

On 17 January 2016 at 13:46, Gideon  wrote:

> Hi everyone,
>
> I'm writing a Scala program which uses Spark CSV to read CSV files from a
> directory. After reading the CSVs as data frames I need to convert them to
> Avro format since I need to eventually convert that data to a
> GenericRecord
> <
> https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericData.Record.html
> >
> for further processing.
> I know I can do toJSON on the DF and get a valid JSON format, however I
> need
> it to be Avro compliant (I have some nullable fields in these CSV files
> which require  special handling
> <
> http://stackoverflow.com/questions/27485580/how-to-fix-expected-start-union-got-value-number-int-when-converting-json-to-av
> >
> when converting to Avro)
>
> Does anyone have any idea on how this can be done besides normalizing all
> the fields by myself?
>
> Thanks in advance
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Converting-CSV-files-to-Avro-tp25985.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
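The "special handling" linked above is Avro's JSON encoding of unions: a nullable field with schema `["null", "int"]` must be emitted as JSON `null` or as `{"int": value}`, never as the bare value (that is what produces the "expected start-union" error in the linked question). A plain-Python sketch of that wrapping (helper names are hypothetical):

```python
def wrap_nullable(value, branch):
    """Encode one nullable field for Avro's JSON encoding: for a union
    schema ["null", branch], emit JSON null or {branch: value}."""
    return None if value is None else {branch: value}

def row_to_avro_json(row, schema):
    """schema maps field name -> Avro branch type name, e.g. "string", "int"."""
    return {name: wrap_nullable(row.get(name), branch)
            for name, branch in schema.items()}

row = {"name": "spark", "age": None}
print(row_to_avro_json(row, {"name": "string", "age": "int"}))
# -> {'name': {'string': 'spark'}, 'age': None}
```

Normalizing each DataFrame row this way before handing it to the Avro JSON decoder is essentially the "normalize all the fields" step the question hopes to avoid; a library may do it for you, but the shape of the transformation is just this.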


Converting CSV files to Avro

2016-01-17 Thread Gideon
Hi everyone,

I'm writing a Scala program which uses Spark CSV to read CSV files from a
directory. After reading the CSVs as data frames I need to convert them to
Avro format since I need to eventually convert that data to a GenericRecord
(https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericData.Record.html)
for further processing.
I know I can do toJSON on the DF and get a valid JSON format, however I need
it to be Avro compliant (I have some nullable fields in these CSV files
which require special handling
(http://stackoverflow.com/questions/27485580/how-to-fix-expected-start-union-got-value-number-int-when-converting-json-to-av)
when converting to Avro)

Does anyone have any idea on how this can be done besides normalizing all
the fields by myself?

Thanks in advance 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Converting-CSV-files-to-Avro-tp25985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to tunning my spark application.

2016-01-17 Thread Ted Yu
In sampleArray(), there is a loop:
for (i <- 0 until ARRAY_SAMPLE_SIZE) {

ARRAY_SAMPLE_SIZE is a constant (100).

Not clear how the amount of computation in sampleArray() can be reduced.

Which Spark release are you using ?

Thanks

On Sun, Jan 17, 2016 at 6:22 AM, 张峻  wrote:

> Dear All
>
> I used jProfiler to profile my spark application,
> and found that more than 70% of CPU time is used by the
> org.apache.spark.util.SizeEstimator class.
>
> The call tree is as below.
>
> java.lang.Thread.run
> --scala.collection.immutable.Range.foreach$mVc$sp
>
> org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp
> --scala.collection.immutable.List.foreach
>
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
> --scala.collection.immutable.List.foreach
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
>
> My code doesn't show up in the two biggest branches of the call tree.
>
> I want to know what causes Spark to spend so much time in
> “Range.foreach” or “List.foreach”.
> Can anyone give me some tips?
>
> BR
>
> Julian Zhang
>
>
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
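For context on why sampleArray() exists at all: SizeEstimator avoids walking every element of a large array by sizing a fixed sample (the ARRAY_SAMPLE_SIZE = 100 constant Ted mentions) and scaling up. A plain-Python sketch of that scheme (the `size_of` callback is a stand-in for the per-object visit, not a Spark API):

```python
import random

ARRAY_SAMPLE_SIZE = 100  # mirrors the constant in SizeEstimator

def estimate_array_size(arr, size_of, seed=42):
    """Estimate the total size of an array: exact sum for small arrays,
    otherwise size a random sample of elements and scale by the length."""
    n = len(arr)
    if n <= ARRAY_SAMPLE_SIZE:
        return sum(size_of(x) for x in arr)
    rng = random.Random(seed)
    sample = [arr[rng.randrange(n)] for _ in range(ARRAY_SAMPLE_SIZE)]
    return sum(size_of(x) for x in sample) * n / ARRAY_SAMPLE_SIZE
```

SizeEstimator typically runs when Spark has to size deserialized blocks (for example when caching with a deserialized storage level), so heavy time here often points at caching very many small objects; a serialized storage level such as MEMORY_ONLY_SER can sidestep much of this estimation work.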


Re: How to tunning my spark application.

2016-01-17 Thread 张峻
Dear Ted

My Spark release is 1.5.2

BR

Julian Zhang 

> On 2016-01-17, at 23:10, Ted Yu wrote:
> 
> In sampleArray(), there is a loop:
> for (i <- 0 until ARRAY_SAMPLE_SIZE) {
> 
> ARRAY_SAMPLE_SIZE is a constant (100).
> 
> Not clear how the amount of computation in sampleArray() can be reduced.
> 
> Which Spark release are you using ?
> 
> Thanks
> 
>> On Sun, Jan 17, 2016 at 6:22 AM, 张峻  wrote:
>> Dear All
>> 
>> I used jProfiler to profile my spark application,
>> and found that more than 70% of CPU time is used by the
>> org.apache.spark.util.SizeEstimator class.
>>
>> The call tree is as below.
>> 
>> java.lang.Thread.run
>> --scala.collection.immutable.Range.foreach$mVc$sp
>> org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp
>> --scala.collection.immutable.List.foreach
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
>> --scala.collection.immutable.List.foreach
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
>> 
>> My code doesn't show up in the two biggest branches of the call tree.
>>
>> I want to know what causes Spark to spend so much time in
>> “Range.foreach” or “List.foreach”.
>> Can anyone give me some tips?
>> 
>> BR
>> 
>> Julian Zhang
>> 
>> 
>> 
>> 
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Reuse Executor JVM across different JobContext

2016-01-17 Thread Jia Zou
Dear all,

Is there a way to reuse executor JVMs across different JobContexts? Thanks.

Best Regards,
Jia


How to tunning my spark application.

2016-01-17 Thread 张峻
Dear All

I used jProfiler to profile my spark application,
and found that more than 70% of CPU time is used by the
org.apache.spark.util.SizeEstimator class.

The call tree is as below.

java.lang.Thread.run
--scala.collection.immutable.Range.foreach$mVc$sp
org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp
--scala.collection.immutable.List.foreach
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
--scala.collection.immutable.List.foreach
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply

My code doesn't show up in the two biggest branches of the call tree.

I want to know what causes Spark to spend so much time in
“Range.foreach” or “List.foreach”.
Can anyone give me some tips?

BR

Julian Zhang






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: simultaneous actions

2016-01-17 Thread Koert Kuipers
Same rdd means same sparkcontext means same workers

Cache/persist the rdd to avoid repeated jobs
On Jan 17, 2016 5:21 AM, "Mennour Rostom"  wrote:

> Hi,
>
> Thank you all for your answers,
>
> If I understand correctly, actions (in my case foreach) can be run
> concurrently and simultaneously on the SAME rdd (which is logical because
> RDDs are read-only objects). However, I want to know whether the same
> workers are used for the concurrent analysis?
>
> Thank you
>
> 2016-01-15 21:11 GMT+01:00 Jakob Odersky :
>
>> I stand corrected. How considerable are the benefits though? Will the
>> scheduler be able to dispatch jobs from both actions simultaneously (or on
>> a when-workers-become-available basis)?
>>
>> On 15 January 2016 at 11:44, Koert Kuipers  wrote:
>>
>>> we run multiple actions on the same (cached) rdd all the time, i guess
>>> in different threads indeed (its in akka)
>>>
>>> On Fri, Jan 15, 2016 at 2:40 PM, Matei Zaharia 
>>> wrote:
>>>
 RDDs actually are thread-safe, and quite a few applications use them
 this way, e.g. the JDBC server.

 Matei

 On Jan 15, 2016, at 2:10 PM, Jakob Odersky  wrote:

 I don't think RDDs are threadsafe.
 More fundamentally however, why would you want to run RDD actions in
 parallel? The idea behind RDDs is to provide you with an abstraction for
 computing parallel operations on distributed data. Even if you were to call
 actions from several threads at once, the individual executors of your
 spark environment would still have to perform operations sequentially.

 As an alternative, I would suggest to restructure your RDD
 transformations to compute the required results in one single operation.

 On 15 January 2016 at 06:18, Jonathan Coveney 
 wrote:

> Threads
>
>
> On Friday, 15 January 2016, Kira wrote:
>
>> Hi,
>>
>> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can
>> this
>> be done ?
>>
>> Thank you,
>> Regards
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


>>>
>>
>


Re: How to tunning my spark application.

2016-01-17 Thread Ted Yu
For 'List.foreach', it is likely due to the pointerFields shown below:

  private class ClassInfo(
val shellSize: Long,
val pointerFields: List[Field]) {}

FYI

On Sun, Jan 17, 2016 at 7:15 AM, 张峻  wrote:

> Dear Ted
>
> My Spark release is 1.5.2
>
> BR
>
> Julian Zhang
>
> On 2016-01-17, at 23:10, Ted Yu wrote:
>
> In sampleArray(), there is a loop:
> for (i <- 0 until ARRAY_SAMPLE_SIZE) {
>
> ARRAY_SAMPLE_SIZE is a constant (100).
>
> Not clear how the amount of computation in sampleArray() can be reduced.
>
> Which Spark release are you using ?
>
> Thanks
>
> On Sun, Jan 17, 2016 at 6:22 AM, 张峻  wrote:
>
>> Dear All
>>
>> I used jProfiler to profile my spark application,
>> and found that more than 70% of CPU time is used by the
>> org.apache.spark.util.SizeEstimator class.
>>
>> The call tree is as below.
>>
>> java.lang.Thread.run
>> --scala.collection.immutable.Range.foreach$mVc$sp
>>
>> org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp
>> --scala.collection.immutable.List.foreach
>>
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
>> --scala.collection.immutable.List.foreach
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
>>
>> My code doesn't show up in the two biggest branches of the call tree.
>>
>> I want to know what causes Spark to spend so much time in
>> “Range.foreach” or “List.foreach”.
>> Can anyone give me some tips?
>>
>> BR
>>
>> Julian Zhang
>>
>>
>>
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Spark Streaming: BatchDuration and Processing time

2016-01-17 Thread pyspark2555
Hi,

If BatchDuration is set to 1 second in StreamingContext and the actual
processing time is longer than one second, then how does Spark handle that?

For example, I am receiving a continuous Input stream. Every 1 second (batch
duration), the RDDs will be processed. What if this processing time is
longer than 1 second? What happens in the next batch duration?

Thanks.
Amit



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-BatchDuration-and-Processing-time-tp25986.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
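On Amit's question above: the next batch is not dropped. It queues behind the still-running one, and the scheduling delay (the quantity the Streaming tab plots) grows by roughly (processing time minus batch interval) per batch for as long as processing cannot keep up. The arithmetic, as a plain-Python sketch:

```python
def scheduling_delays(batch_interval, processing_times):
    """Scheduling delay per batch when batches are processed one at a
    time: batch i becomes ready at i * batch_interval but can only
    start once the previous batch has finished, so delay accumulates
    whenever processing time exceeds the interval."""
    delays, finish = [], 0.0
    for i, p in enumerate(processing_times):
        ready = i * batch_interval
        start = max(ready, finish)    # wait for the previous batch
        delays.append(start - ready)  # this is the scheduling delay
        finish = start + p
    return delays

# 1 s batches that each take 1.5 s to process: delay grows 0.5 s per batch.
print(scheduling_delays(1.0, [1.5, 1.5, 1.5, 1.5]))  # -> [0.0, 0.5, 1.0, 1.5]
```

This is why a sustained processing time above the batch interval is unstable: the delay grows without bound until batch durations drop back under the interval (or the application falls behind and backpressure/resource tuning is needed).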