Re: detecting last record of partition

2016-10-13 Thread Holden Karau
It sounds like mapPartitionsWithIndex will give you the information you
want over flatMap.
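Below is a minimal sketch of that suggestion in PySpark terms. The thread's code is Java, where `JavaRDD.mapPartitionsWithIndex` plays the same role. The function receives the partition index and an iterator over that partition. It can mark the last record by looking one element ahead, so the partition never has to be held in memory. `tag_last_in_partition` is a hypothetical helper name, not a Spark API.

```python
def tag_last_in_partition(index, records):
    """Yield (partition_index, record, is_last) for every record of one partition.

    Intended to be passed to rdd.mapPartitionsWithIndex(...), which calls it
    with the partition index and an iterator over that partition's records.
    """
    it = iter(records)
    try:
        current = next(it)
    except StopIteration:
        return  # empty partition: nothing to yield
    for upcoming in it:
        # a record is not the last one if another record follows it
        yield index, current, False
        current = upcoming
    yield index, current, True  # nothing left to look ahead to, so this is last
```

Usage would be `rdd.mapPartitionsWithIndex(tag_last_in_partition)`. The one-element look-ahead keeps the function streaming instead of materializing the partition as a list.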

On Thursday, October 13, 2016, Shushant Arora 
wrote:

> Hi
>
> I have a transformation on a pair RDD using a flatMap function.
>
> 1. Can I detect in flatMap whether the current record is the last record of
> the partition being processed, and
> 2. what is the index of this partition?
>
> public Iterable> call(Tuple2 t)
> throws Exception {
> // whether the element is the last element of the current partition or not??
> // index of the partition to which the element belongs
> }
> });
> Thanks
>
>

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


SparkR execution hang on when handle a RDD which is converted from DataFrame

2016-10-13 Thread Lantao Jin
sqlContext <- sparkRHive.init(sc)
sqlString<-
"SELECT
key_id,
rtl_week_beg_dt rawdate,
gmv_plan_rate_amt value
FROM
metrics_moveing_detection_cube
"
df <- sql(sqlString)
rdd<-SparkR:::toRDD(df)

# hangs, case one: take from the RDD
#take(rdd,3)

# hangs, case two: convert back to a DataFrame
#df1<-createDataFrame(rdd)
#head(df1)

# does not hang: operating directly on the DataFrame is fine
head(df,3)

The code above is for Spark 2.0.0. To run it on Spark 1.6.2, change
"df <- sql(sqlString)" to "df <- sql(sqlContext, sqlString)" and
"createDataFrame(rdd)" to "createDataFrame(sqlContext, rdd)".
BTW, metrics_moveing_detection_cube is a Hive table.

Every version gives the same result. Thanks.


The hang happens in the executor.

The stack is below:


Thread ID  Thread Name  Thread State
116 Executor task launch worker-0 RUNNABLE

java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:212)
org.apache.spark.api.r.RRunner$$anon$1.<init>(RRunner.scala:96)
org.apache.spark.api.r.RRunner.compute(RRunner.scala:87)
org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

119 client DomainSocketWatcher RUNNABLE

org.apache.hadoop.net.unix.DomainSocketWatcher.doPoll0(Native Method)
org.apache.hadoop.net.unix.DomainSocketWatcher.access$900(DomainSocketWatcher.java:52)
org.apache.hadoop.net.unix.DomainSocketWatcher$2.run(DomainSocketWatcher.java:511)
java.lang.Thread.run(Thread.java:745)

69 dispatcher-event-loop-0 WAITING

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:205)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

70 dispatcher-event-loop-1 WAITING

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:205)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

79 dispatcher-event-loop-10 WAITING

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:205)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

80 dispatcher-event-loop-11 WAITING

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:205)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

81 dispatcher-event-loop-12 WAITING

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)

detecting last record of partition

2016-10-13 Thread Shushant Arora
Hi

I have a transformation on a pair RDD using a flatMap function.

1. Can I detect in flatMap whether the current record is the last record of
the partition being processed, and
2. what is the index of this partition?

public Iterable> call(Tuple2 t)
throws Exception {
// whether the element is the last element of the current partition or not??
// index of the partition to which the element belongs
}
});
Thanks


[Spark 2.0.0] error when unioning to an empty dataset

2016-10-13 Thread Efe Selcuk
I have a use case where I want to build a dataset based off of
conditionally available data. I thought I'd do something like this:

case class SomeData( ... ) // parameters are basic encodable types like strings and BigDecimals

var data = spark.emptyDataset[SomeData]

// loop, determining what data to ingest and process into datasets
  data = data.union(someCode.thatReturnsADataset)
// end loop

However I get a runtime exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Granted, I'm new to Spark, so this might be an anti-pattern, and I'm open to
suggestions. However, it doesn't seem like I'm doing anything incorrect
here; the types are correct. Searching for this error online returns
results that seem to be about working with DataFrames that have mismatched
schemas or differently ordered fields, and it looks like bug fixes have
already gone in for those cases.
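One possible way around this avoids seeding the loop with `spark.emptyDataset`. This assumes the empty seed is what triggers the failure, which the thread does not confirm. The idea is to collect the per-iteration datasets in a list and fold them together with `union`, starting from the first real dataset. The sketch below is in Python. `union_all` is a hypothetical helper that works with any object exposing `.union(other)`, such as a PySpark `DataFrame`. In Scala, the same shape is `datasets.reduce(_ union _)` on a non-empty `Seq[Dataset[SomeData]]`.

```python
from functools import reduce


def union_all(datasets):
    """Union a non-empty sequence of same-typed datasets without an empty seed.

    Works with any objects that expose .union(other), e.g. PySpark DataFrames.
    """
    datasets = list(datasets)
    if not datasets:
        # there is no "empty" starting value here, so an empty input is an error
        raise ValueError("union_all needs at least one dataset")
    # ((d0 union d1) union d2) ... starting from the first real dataset
    return reduce(lambda left, right: left.union(right), datasets)
```

If nothing was ingested, the caller decides what to do (for example, skip the write) instead of carrying an empty placeholder through the loop.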

Thanks in advance.
Efe


Re: No way to set mesos cluster driver memory overhead?

2016-10-13 Thread drewrobb
It seems like this is a real problem, so I've opened a JIRA issue:
https://issues.apache.org/jira/browse/SPARK-17928



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-way-to-set-mesos-cluster-driver-memory-overhead-tp27897p27901.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Java.util.ArrayList is not a valid external type for schema of array

2016-10-13 Thread Mohamed Nadjib MAMI
In Spark 1.5.2 I had a job that reads with textFile and saves some data
into a Parquet table. One value of type `ArrayList` was
successfully saved as an "array" column in the Parquet table. After I
upgraded to Spark 2.0.1, I changed the necessary code (SparkConf to
SparkSession, DataFrame to Dataset), so there are no syntactic issues in the code.
However, the job no longer finishes. The following exception is thrown:
`java.lang.RuntimeException: Error while encoding:
java.lang.RuntimeException: java.util.ArrayList is not a valid external
type for schema of array`

at the line:
`table.write().parquet(table_name);`

I inspected the schema and it looked fine. Here is the string array column:

`StructType(StructField(array_column,ArrayType(StringType,true),true))`

...and the value to be saved therein looks like:

[aaa, bbb, ccc]

The array column is constructed this way:
`DataTypes.createStructField(column, DataTypes.createArrayType(DataTypes.StringType, true), true);`

I think I've provided all the necessary code, but if more would help, please let me
know.

So it seems some logic changed in the latest version that prevents saving an
ArrayList into a string-array column in Parquet
tables. Any help solving or working around this would be much appreciated.

*Regards, Grüße, **Cordialement,** Recuerdos, Saluti, προσρήσεις, 问候,
تحياتي.*
*Mohamed Nadjib Mami*
*PhD Student - EIS Department - **Bonn University (Germany).*



How to spark-submit using python subprocess module?

2016-10-13 Thread Vikram Kone
I have a Python script that submits Spark jobs using the
spark-submit tool. I want to execute the command and write the output both
to STDOUT and to a log file in real time. I'm using Python 2.7 on an Ubuntu
server.

This is what I have so far in my SubmitJob.py script

#!/usr/bin/python
import subprocess


# Submit the command, echoing its output to the screen and to log_file
def submitJob(cmd, log_file):
    with open(log_file, 'w') as fh:
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        while True:
            output = process.stdout.readline()
            if output == '' and process.poll() is not None:
                break
            if output:
                print output.strip()
                fh.write(output)
        rc = process.poll()
        return rc


if __name__ == "__main__":
    cmdList = ["dse", "spark-submit", "--spark-master",
               "spark://127.0.0.1:7077", "--class", "com.spark.myapp", "./myapp.jar"]
    log_file = "/tmp/out.log"
    exit_status = submitJob(cmdList, log_file)
    print "job finished with status ", exit_status

The strange thing is, when I execute the same command directly in the shell
it works fine and produces output on screen as the program proceeds.

So it looks like something is wrong in the way I'm using the
subprocess.PIPE for stdout and writing the file.

What's the current recommended way to use subprocess module for writing to
stdout and log file in real time line by line? I see a lot of different
options on the internet but not sure which is correct or latest.

Is there anything specific to the way spark-submit buffers stdout that
I need to take care of?
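Here is a minimal sketch of a line-by-line tee, assuming the goal is to see and log each line as soon as the child writes it. It differs from the script above in three ways:
- It reads with `iter(process.stdout.readline, b"")`, which avoids the read-ahead buffering that `for line in process.stdout` has on Python 2.
- It flushes both outputs after every line.
- It collects the exit code with `wait()`.

`tee_command` is a hypothetical name. A child process may still buffer its own output when writing to a pipe instead of a terminal, and nothing on the reading side can undo that.

```python
import subprocess
import sys


def tee_command(cmd, log_path, out=None):
    """Run cmd, copying each line of its combined stdout/stderr to `out`
    and to log_path as soon as the line arrives. Returns the exit code."""
    out = out if out is not None else sys.stdout
    with open(log_path, "wb") as log:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        # readline() returns an empty bytes object only at EOF
        for line in iter(proc.stdout.readline, b""):
            log.write(line)
            log.flush()
            out.write(line.decode("utf-8", "replace"))
            out.flush()
        proc.stdout.close()
        return proc.wait()
```

For the script above, this would be called as `tee_command(cmdList, "/tmp/out.log")`.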

thanks


Re: Can mapWithState state func be called every batchInterval?

2016-10-13 Thread manasdebashiskar
Actually, each key tracked by mapWithState can have a timeout. You can write
your state function to "treat" (handle) that timeout.

You can match the timeout to your batch interval and do whatever you need when the batch ends.

People do session management with the same approach:
when activity is registered, the session is refreshed, and the session is
deleted ("one way to treat it") when the timeout happens.
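The refresh-or-expire pattern can be modeled in plain Python to make the semantics concrete. This is not Spark code. In Scala, the same logic would live in the function passed to `StateSpec.function(...)`, with `.timeout(...)` set on the spec and `state.isTimingOut()` telling the function that a key expired. `advance_sessions` is a hypothetical helper. Treating a session as expired once it has been idle for longer than `timeout` is an assumption about the exact boundary.

```python
def advance_sessions(sessions, batch_time, active_keys, timeout):
    """Model one micro-batch of the refresh-or-expire session pattern.

    sessions    -- dict mapping key -> time of last activity (not mutated)
    batch_time  -- time of the current batch
    active_keys -- keys that had activity in this batch
    timeout     -- idle time after which a session is dropped
    Returns (new_sessions, expired_keys).
    """
    new_sessions = dict(sessions)
    for key in active_keys:
        new_sessions[key] = batch_time  # activity refreshes the session
    expired = sorted(k for k, last in new_sessions.items()
                     if batch_time - last > timeout)
    for key in expired:
        del new_sessions[key]  # "treat" the timeout by deleting the session
    return new_sessions, expired
```

Calling it once per batch shows a key surviving as long as it keeps receiving activity, and being dropped once it has been idle past the timeout.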

..Manas






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-mapWithState-state-func-be-called-every-batchInterval-tp27877p27898.html



Re: Re-partitioning mapwithstateDstream

2016-10-13 Thread manasdebashiskar
StateSpec has a numPartitions method to set the initial number of partitions.

That should do the trick.

...Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-partitioning-mapwithstateDstream-tp27880p27899.html



Re: Python Spark Improvements (forked from Spark Improvement Proposals)

2016-10-13 Thread Holden Karau
Awesome, good points everyone. The ranking of the issues is super useful,
and I'd also completely forgotten about the lack of built-in UDAF support,
which is rather important. There is a PR to make it easier to call/register
JVM UDFs from Python, which will hopefully help a bit there too. I'm getting
on a flight to London for OSCON, but I want to continue to encourage users to
chime in with their experiences (to that end I'm trying to re-include user@,
since it doesn't seem to have been posted there despite my initial attempt
to do so).

On Thursday, October 13, 2016, assaf.mendelson 
wrote:

> Hi,
>
> We are actually using pyspark heavily.
>
> I agree with all of your points,  for me I see the following as the main
> hurdles:
>
> 1.   Pyspark does not have support for UDAF. We have had multiple
> needs for UDAF and needed to go to java/scala to support these. Having
> python UDAF would have made life much easier (especially at earlier stages
> when we prototype).
>
> 2.   Performance. I cannot stress this enough. Currently we have
> engineers who take python UDFs and convert them to scala UDFs for
> performance. We are currently even looking at writing UDFs and UDAFs in a
> more native way (e.g. using expressions) to improve performance but working
> with pyspark can be really problematic.
>
>
>
> BTW, other than using jython or arrow, I believe there are a couple of
> other ways to improve performance:
>
> 1.   Python provides tools to generate an AST for Python code (
> https://docs.python.org/2/library/ast.html). This means we can use the
> AST to construct Scala code, very similar to how expressions are built for
> native Spark functions in Scala. Of course, doing a full conversion is very
> hard, but at least handling simple cases should be simple.
>
> 2.   The above would of course be limited if we use Python packages,
> but over time it is possible to add some “translation” tools (i.e. take
> Python packages and find the appropriate Scala equivalent). We can even
> let users supply their own conversions, so the code looks like
> regular Python but is converted to Scala behind the scenes.
>
> 3.   In scala, it is possible to use codegen to actually generate
> code from a string. There is no reason why we can’t write the expression in
> python and provide a scala string. This would mean learning some scala but
> would mean we do not have to create a separate code tree.
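Idea 1 in the list above (translating Python code via its AST) can be made concrete with a toy sketch using Python's standard `ast` module. It parses a simple Python expression and renders it as a Spark SQL expression string, which PySpark could then hand to the JVM through `pyspark.sql.functions.expr`. Only column names, numeric literals, basic arithmetic, and single comparisons are handled. `py_expr_to_sql` is a hypothetical helper. A real translator would also have to reconcile semantic differences such as SQL null handling. The sketch uses `ast.Constant`, so it needs Python 3.8+.

```python
import ast

# Operator tables for the tiny subset this sketch understands.
_BINOPS = {ast.Add: "+", ast.Sub: "-", ast.Mult: "*", ast.Div: "/"}
_CMPOPS = {ast.Gt: ">", ast.GtE: ">=", ast.Lt: "<", ast.LtE: "<=",
           ast.Eq: "=", ast.NotEq: "!="}


def _to_sql(node):
    """Recursively render a supported Python AST node as Spark SQL text."""
    if isinstance(node, ast.Expression):
        return _to_sql(node.body)
    if isinstance(node, ast.Name):
        return node.id  # treated as a column reference
    if (isinstance(node, ast.Constant) and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)):
        return repr(node.value)
    if isinstance(node, ast.BinOp) and type(node.op) in _BINOPS:
        return "({} {} {})".format(_to_sql(node.left), _BINOPS[type(node.op)],
                                   _to_sql(node.right))
    if (isinstance(node, ast.Compare) and len(node.ops) == 1
            and type(node.ops[0]) in _CMPOPS):
        return "({} {} {})".format(_to_sql(node.left), _CMPOPS[type(node.ops[0])],
                                   _to_sql(node.comparators[0]))
    raise NotImplementedError(ast.dump(node))  # anything else is out of scope


def py_expr_to_sql(source):
    """Translate a single Python expression (as text) into a SQL expression."""
    return _to_sql(ast.parse(source, mode="eval"))
```

Parentheses are emitted around every operation so that SQL operator precedence can never change the meaning of the translated expression.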
>
>
>
> BTW, the fact that all of the tools to access java are marked as private
> has me a little worried. Nearly all of our UDFs (and all of our UDAFs) are
> written in scala for performance. The wrapping to provide them in python
> uses way too many private elements for my taste.
>
>
>
>
>
> *From:* msukmanowsky [via Apache Spark Developers List]
> *Sent:* Thursday, October 13, 2016 3:51 AM
> *To:* Mendelson, Assaf
> *Subject:* Re: Python Spark Improvements (forked from Spark Improvement
> Proposals)
>
>
>
> As very heavy Spark users at Parse.ly, I just wanted to give a +1 to all
> of the issues raised by Holden and Ricardo. I'm also giving a talk at PyCon
> Canada on PySpark https://2016.pycon.ca/en/schedule/096-mike-sukmanowsky/.
>
>
> Being a Python shop, we were extremely pleased to learn about PySpark a
> few years ago as our main ETL pipeline used Apache Pig at the time. I was
> one of the only folks who understood Pig and Java so collaborating on this
> as a team was difficult.
>
> Spark provided a means for the entire team to collaborate, but we've hit
> our fair share of issues all of which are enumerated in this thread.
>
> Besides giving a +1 here, I think if I were to force rank these items for
> us, it'd be:
>
> 1. Configuration difficulties: we've lost literally weeks to
> troubleshooting memory issues for larger jobs. It took a long time to even
> understand *why* certain jobs were failing since Spark would just report
> executors being lost. Finally we tracked things down to understanding that
> spark.yarn.executor.memoryOverhead controls the portion of memory
> reserved for Python processes, but none of this is documented anywhere as
> far as I can tell. We discovered this via trial and error. Both
> documentation and better defaults for this setting when running a PySpark
> application are probably sufficient. We've also had a number of troubles
> with saving Parquet output as part of an ETL flow, but perhaps we'll save
> that for a blog post of its own.
>
> 2. Dependency management: I've tried to help move the conversation on
> https://issues.apache.org/jira/browse/SPARK-13587 but it seems we're a
> bit stalled. Installing the required dependencies for a PySpark application
> is a really messy ordeal right now.
>
> 3. Development workflow: I'd combine both "incomprehensible error
> messages" and "
> difficulty using PySpark from 

Re: No way to set mesos cluster driver memory overhead?

2016-10-13 Thread Michael Gummelt
We see users run drivers both through the dispatcher and through Marathon. I generally prefer
Marathon, because there's a higher likelihood it will have some
feature you need that the dispatcher lacks (as in this case).

It doesn't look like we support memory overhead for the driver.

On Thu, Oct 13, 2016 at 10:42 AM, drewrobb  wrote:

> When using spark on mesos and deploying a job in cluster mode using
> dispatcher, there appears to be no memory overhead configuration for the
> launched driver processes ("--driver-memory" is the same as Xmx which is
> the same as the memory quota). This makes it almost a guarantee that a long
> running driver will be OOM killed by mesos. Yarn cluster mode has an
> equivalent option -- spark.yarn.driver.memoryOverhead. Is there some way
> to configure driver memory overhead that I'm missing?
>
> Bigger picture question-- Is it even best practice to deploy long running
> spark streaming jobs using dispatcher? I could alternatively launch the
> driver by itself using marathon for example, where it would be trivial to
> grant the process additional memory.
>
> Thanks!
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-way-to-set-mesos-cluster-driver-memory-overhead-tp27897.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: No way to set mesos cluster driver memory overhead?

2016-10-13 Thread Rodrick Brown
On Thu, Oct 13, 2016 at 1:42 PM, drewrobb  wrote:

> When using spark on mesos and deploying a job in cluster mode using
> dispatcher, there appears to be no memory overhead configuration for the
> launched driver processes ("--driver-memory" is the same as Xmx which is
> the same as the memory quota). This makes it almost a guarantee that a long
> running driver will be OOM killed by mesos. Yarn cluster mode has an
> equivalent option -- spark.yarn.driver.memoryOverhead. Is there some way
> to configure driver memory overhead that I'm missing?
>
> Bigger picture question-- Is it even best practice to deploy long running
> spark streaming jobs using dispatcher? I could alternatively launch the
> driver by itself using marathon for example, where it would be trivial to
> grant the process additional memory.
>
> Thanks!
>
I wrote a simple wrapper to provide a little headroom. I'm currently
using the Chronos framework to run my Spark jobs, which provides some handy
environment variables like CHRONOS_RESOURCE_MEM, which is what's offered
by Mesos.

import os

# execString (the spark-submit argument list) and params (the job settings)
# are built elsewhere in the wrapper.
execString.append('--driver-memory {:.0f}M'.format(.7 *
    float(os.environ.get('CHRONOS_RESOURCE_MEM', params.get('memory')))))
execString.append('--executor-memory {:.0f}M'.format(.98 *
    float(os.environ.get('CHRONOS_RESOURCE_MEM', params.get('memory')))))
execString.append('--jars {} {} {}'.format(params.get('classpath'),
    params.get('jarfile'), params.get('optargs')))

i.e. SPARK_MESOS_DRIVER_MEM = CHRONOS_RESOURCE_MEM * .7

We basically cap the driver at 70% of the total memory Mesos allocates to the job.
This helps keep Mesos from OOM-killing long-running jobs.

-- 


*Rodrick Brown */ *SRE - DevOPs*

9174456839 / rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY



Re: spark on mesos memory sizing with offheap

2016-10-13 Thread Michael Gummelt
It doesn't look like we are. Can you file a JIRA? A workaround is to set
spark.mesos.executor.memoryOverhead to at least spark.memory.offHeap.size.
This is how the container is sized:
https://github.com/apache/spark/blob/master/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L366
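The sketch below shows the arithmetic behind the 3.4g figure. It assumes the documented default for `spark.mesos.executor.memoryOverhead`, which is max(10% of executor memory, 384 MB). It also assumes that off-heap memory is not added on top, which is the behavior reported in this thread. `mesos_executor_container_mb` is a hypothetical helper, not a Spark function.

```python
def mesos_executor_container_mb(executor_memory_mb, memory_overhead_mb=None):
    """Memory (MB) requested from Mesos for one executor: heap plus overhead.

    If memory_overhead_mb is None, use the assumed default of
    max(10% of executor memory, 384 MB). Off-heap memory is not included,
    so it has to be folded into the overhead explicitly.
    """
    if memory_overhead_mb is None:
        memory_overhead_mb = max(int(0.10 * executor_memory_mb), 384)
    return executor_memory_mb + memory_overhead_mb
```

With spark.executor.memory=3g this gives 3072 + 384 = 3456 MB (about 3.4 GB), matching the report below. Under the workaround, an overhead of at least the 1g off-heap size gives 3072 + 1024 = 4096 MB.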

On Thu, Oct 13, 2016 at 7:23 AM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Hi,
> I am trying to understand how Mesos allocates memory when off-heap is
> enabled, but it seems that the framework only takes heap + 400 MB
> overhead into account for resource allocation.
> Example: spark.executor.memory=3g spark.memory.offheap.size=1g ==> Mesos
> reports 3.4g allocated for the executor
> Is there any configuration to use both heap and off-heap for Mesos
> allocation?
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


No way to set mesos cluster driver memory overhead?

2016-10-13 Thread drewrobb
When using spark on mesos and deploying a job in cluster mode using
dispatcher, there appears to be no memory overhead configuration for the
launched driver processes ("--driver-memory" is the same as Xmx which is the
same as the memory quota). This makes it almost a guarantee that a long
running driver will be OOM killed by mesos. Yarn cluster mode has an
equivalent option -- spark.yarn.driver.memoryOverhead. Is there some way to
configure driver memory overhead that I'm missing?

Bigger picture question-- Is it even best practice to deploy long running
spark streaming jobs using dispatcher? I could alternatively launch the
driver by itself using marathon for example, where it would be trivial to
grant the process additional memory.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-way-to-set-mesos-cluster-driver-memory-overhead-tp27897.html



Re: Limit Kafka batches size with Spark Streaming

2016-10-13 Thread Samy Dindane

Hi,

On 10/13/2016 04:35 PM, Cody Koeninger wrote:

So I see in the logs that PIDRateEstimator is choosing a new rate, and
the rate it's choosing is 100.

But it's always choosing 100, while all the other variables (processing
time, latestRate, etc.) change.
Also, the number of records per batch is always the same despite the rate being 100.


That happens to be the default minimum of an (apparently undocumented) setting,

spark.streaming.backpressure.pid.minRate

Try setting that to 1 and see if there's different behavior.

Same behavior: it always chooses the same rate, and the number of records per
batch does not change.


BTW, how many kafka partitions are you using, and how many actually
have data for a given batch?

3 partitions.
All of them have more than maxRatePerPartition records (my topic has hundreds of
millions of records).



On Thu, Oct 13, 2016 at 4:33 AM, Samy Dindane  wrote:

Hey Cody,

Thanks for the reply. Really helpful.

Following your suggestion, I set spark.streaming.backpressure.enabled to
true and maxRatePerPartition to 10.
I know I can handle 100k records at the same time, but definitely not in 1
second (the batchDuration), so I expect the backpressure to lower that
number.

Unfortunately the backpressure doesn't work and I keep getting 100k records
per batch.

Here is my output log:
https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba
And this is my conf:

conf.set("spark.streaming.kafka.consumer.poll.ms", "3")
conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
conf.set("spark.streaming.backpressure.enabled", "true")

That's not normal, is it? Do you notice anything odd in my logs?

Thanks a lot.



On 10/12/2016 07:31 PM, Cody Koeninger wrote:


Cool, just wanted to make sure.

To answer your question about


Isn't "spark.streaming.backpressure.initialRate" supposed to do this?



that configuration was added well after the integration of the direct
stream with the backpressure code, and was added only to the receiver
code, which the direct stream doesn't share since it isn't a receiver.
Not making excuses about it being confusing, just explaining how
things ended up that way :(  So yeah, maxRatePerPartition is the
closest thing you have on the direct stream side to being able to
limit before the backpressure estimator has something to work with.
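Here is a rough sanity check on what maxRatePerPartition alone allows, assuming every Kafka partition has at least that much backlog. The cap is the per-partition rate times the batch length in seconds, times the number of partitions. `max_records_per_batch` is a hypothetical helper. Truncating to whole records per partition is an assumption about how the direct stream rounds.

```python
def max_records_per_batch(max_rate_per_partition, num_partitions, batch_seconds):
    """Upper bound on records in one direct-stream batch.

    max_rate_per_partition is in records per second per partition; each
    partition contributes at most rate * batch length, rounded down.
    """
    per_partition = int(max_rate_per_partition * batch_seconds)
    return per_partition * num_partitions
```

For example, 3 partitions with a 1-second batch and maxRatePerPartition = 100,000 allow up to 300,000 records per batch. Comparing this bound with the observed batch sizes shows whether the cap, rather than backpressure, is what's limiting them.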

So to try and debug what you're seeing, if you add a line like this to
your log4j.properties

log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE

you should start seeing log lines like

16/10/12 12:18:01 TRACE PIDRateEstimator:
time = 1476292681092, # records = 20, processing time = 20949,
scheduling delay = 6
16/10/12 12:18:01 TRACE PIDRateEstimator:
latestRate = -1.0, error = -1.9546995083297531
latestError = -1.0, historicalError = 0.001145639409995704
delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10

and then once it updates, lines like

16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0

For a really artificially constrained example where
maxRatePerPartition is set such that it limits to 20 per batch but the
system can really only handle 5 per batch, the streaming UI will look
something like this:

https://i.imgsafe.org/e730492453.png

notice the cutover point


On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane  wrote:


I am 100% sure.

println(conf.get("spark.streaming.backpressure.enabled")) prints true.


On 10/12/2016 05:48 PM, Cody Koeninger wrote:



Just to make 100% sure, did you set

spark.streaming.backpressure.enabled

to true?

On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane  wrote:

On 10/12/2016 04:40 PM, Cody Koeninger wrote:

How would backpressure know anything about the capacity of your system
on the very first batch?

Isn't "spark.streaming.backpressure.initialRate" supposed to do this?

You should be able to set maxRatePerPartition at a value that makes
sure your first batch doesn't blow things up, and let backpressure
scale from there.

Backpressure doesn't scale even when using maxRatePerPartition: when I
enable backpressure and set maxRatePerPartition to n, I always get n
records, even if my batch takes longer than batchDuration to finish.

Example:
* I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf, Durations.seconds(1))`
* I set backpressure.initialRate and/or maxRatePerPartition to 100,000 and
enable backpressure
* Since I can't handle 100,000 records in 1 second, I expect backpressure
to kick in on the second batch and deliver fewer than 100,000 records, but
this does not happen

What am I missing here?

On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane
wrote:

That's what I was looking for, thank you.

Unfortunately, none of

* spark.streaming.backpressure.initialRate
* spark.streaming.backpressure.enabled
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

changes how many records I get (I tried many different

Spark 2.0.0 TreeAggregate with larger depth will be OOM?

2016-10-13 Thread Jy Chen
Hi all,
I'm using Spark 2.0.0 to train a model with 10 million+ parameters on about 500GB
of data. treeAggregate is used to aggregate the gradient. When I set
depth = 2 or 3 it works, and depth = 3 is faster.
So I set depth = 4 to get better performance, but now some executors
run out of memory (OOM) in the shuffle phase. Why would this happen? With a deeper tree,
each executor should aggregate fewer records and use less memory, so I don't
understand why the OOM happens. Can someone help?
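The sketch below shows how the intermediate partition counts shrink with depth. It is a Python reimplementation of the shape of `RDD.treeAggregate` in Spark's source: scale = max(ceil(numPartitions^(1/depth)), 2), then divide by scale while that still saves work. Details may differ between Spark versions, and `tree_aggregate_levels` is a hypothetical helper. Each entry after the first is one extra shuffle stage whose tasks each build full-size aggregation buffers (here, gradient vectors with 10M+ entries). One possible reading, not a confirmed diagnosis, is that a deeper tree adds stages without shrinking each buffer.

```python
import math


def tree_aggregate_levels(num_partitions, depth):
    """Partition counts at each level of a treeAggregate-style reduction.

    The first entry is the input partition count; each later entry is the
    number of partitions after one more shuffle level.
    """
    if depth < 1:
        raise ValueError("depth must be >= 1")
    scale = max(int(math.ceil(num_partitions ** (1.0 / depth))), 2)
    levels = [num_partitions]
    n = num_partitions
    # keep adding levels only while another level reduces total work
    while n > scale + int(math.ceil(float(n) / scale)):
        n //= scale
        levels.append(n)
    return levels
```

For 1000 input partitions, depth 2 gives one intermediate level of 31 partitions, while depth 4 gives three levels (166, 27, 4). The extra stages are where additional shuffle memory is spent.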


Re: Limit Kafka batches size with Spark Streaming

2016-10-13 Thread Cody Koeninger
So I see in the logs that PIDRateEstimator is choosing a new rate, and
the rate it's choosing is 100.

That happens to be the default minimum of an (apparently undocumented) setting,

spark.streaming.backpressure.pid.minRate

Try setting that to 1 and see if there's different behavior.
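A minRate of 100 can pin the rate, and a Python model of the PID computation shows why. The model is written from Spark's Scala `PIDRateEstimator`, with defaults proportional = 1.0, integral = 0.2, derivative = 0.0, and minRate = 100. It is a model for reasoning, not Spark's code, and details may differ across versions. When the system only keeps up with fewer than 100 records/s, the controller's output is clamped to minRate, so every batch gets the same rate.

```python
class PIDRateEstimator(object):
    """Python model of Spark Streaming's PID-based rate estimator."""

    def __init__(self, batch_interval_ms, proportional=1.0, integral=0.2,
                 derivative=0.0, min_rate=100.0):
        self.batch_interval_ms = batch_interval_ms
        self.kp, self.ki, self.kd = proportional, integral, derivative
        self.min_rate = min_rate
        self.first_run = True
        self.latest_time = -1
        self.latest_rate = -1.0
        self.latest_error = -1.0

    def compute(self, time_ms, num_elements, processing_delay_ms, scheduling_delay_ms):
        """Return a new rate (records/sec), or None when no update is produced."""
        if not (time_ms > self.latest_time and num_elements > 0
                and processing_delay_ms > 0):
            return None
        delay_since_update = (time_ms - self.latest_time) / 1000.0
        processing_rate = float(num_elements) / processing_delay_ms * 1000
        error = self.latest_rate - processing_rate  # proportional term input
        historical_error = (scheduling_delay_ms * processing_rate
                            / self.batch_interval_ms)  # integral term input
        d_error = (error - self.latest_error) / delay_since_update  # derivative input
        new_rate = max(self.latest_rate - self.kp * error
                       - self.ki * historical_error
                       - self.kd * d_error, self.min_rate)  # clamp at minRate
        self.latest_time = time_ms
        if self.first_run:
            # the first batch only seeds the controller
            self.latest_rate = processing_rate
            self.latest_error = 0.0
            self.first_run = False
            return None
        self.latest_rate = new_rate
        self.latest_error = error
        return new_rate
```

Feeding it batches like the one in the TRACE log (20 records in about 20 seconds, i.e. about 1 record/s) yields 100.0 on every update with the default minRate. With minRate = 1 it yields 1.0, the same "New rate = 1.0" seen in the log.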

BTW, how many kafka partitions are you using, and how many actually
have data for a given batch?


On Thu, Oct 13, 2016 at 4:33 AM, Samy Dindane  wrote:
> Hey Cody,
>
> Thanks for the reply. Really helpful.
>
> Following your suggestion, I set spark.streaming.backpressure.enabled to
> true and maxRatePerPartition to 10.
> I know I can handle 100k records at the same time, but definitely not in 1
> second (the batchDuration), so I expect the backpressure to lower that
> number.
>
> Unfortunately the backpressure doesn't work and I keep getting 100k records
> per batch.
>
> Here is my output log:
> https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba
> And this is my conf:
>
> conf.set("spark.streaming.kafka.consumer.poll.ms", "3")
> conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
> conf.set("spark.streaming.backpressure.enabled", "true")
>
> That's not normal, is it? Do you notice anything odd in my logs?
>
> Thanks a lot.
>
>
>
> On 10/12/2016 07:31 PM, Cody Koeninger wrote:
>>
>> Cool, just wanted to make sure.
>>
>> To answer your question about
>>
>>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>>
>>
>> that configuration was added well after the integration of the direct
>> stream with the backpressure code, and was added only to the receiver
>> code, which the direct stream doesn't share since it isn't a receiver.
>> Not making excuses about it being confusing, just explaining how
>> things ended up that way :(  So yeah, maxRatePerPartition is the
>> closest thing you have on the direct stream side to being able to
>> limit before the backpressure estimator has something to work with.
>>
>> So to try and debug what you're seeing, if you add a line like this to
>> your log4j.properties
>>
>> log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE
>>
>> you should start seeing log lines like
>>
>> 16/10/12 12:18:01 TRACE PIDRateEstimator:
>> time = 1476292681092, # records = 20, processing time = 20949,
>> scheduling delay = 6
>> 16/10/12 12:18:01 TRACE PIDRateEstimator:
>> latestRate = -1.0, error = -1.9546995083297531
>> latestError = -1.0, historicalError = 0.001145639409995704
>> delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10
>>
>> and then once it updates, lines like
>>
>> 16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0
>>
>> For a really artificially constrained example where
>> maxRatePerPartition is set such that it limits to 20 per batch but the
>> system can really only handle 5 per batch, the streaming UI will look
>> something like this:
>>
>> https://i.imgsafe.org/e730492453.png
>>
>> notice the cutover point
>>
>>
>> On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane  wrote:
>>>
>>> I am 100% sure.
>>>
>>> println(conf.get("spark.streaming.backpressure.enabled")) prints true.
>>>
>>>
>>> On 10/12/2016 05:48 PM, Cody Koeninger wrote:


 Just to make 100% sure, did you set

 spark.streaming.backpressure.enabled

 to true?

 On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane  wrote:
>
>
>
>
> On 10/12/2016 04:40 PM, Cody Koeninger wrote:
>>
>>
>>
>> How would backpressure know anything about the capacity of your system
>> on the very first batch?
>
>
>
> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>>
>>
>>
>>
>> You should be able to set maxRatePerPartition at a value that makes
>> sure your first batch doesn't blow things up, and let backpressure
>> scale from there.
>
>
>
> Backpressure doesn't scale even when using maxRatePerPartition: when I
> enable backpressure and set maxRatePerPartition to n, I always get n
> records, even if my batch takes longer than batchDuration to finish.
>
> Example:
> * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
> Durations.seconds(1))`
> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000
> and
> enable backpressure
> * Since I can't handle 100,000 records in 1 second, I expect the
> backpressure to kick in in the second batch, and get less than 100,000;
> but
> this does not happen
>
> What am I missing here?
>
>
>
>>
>> On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane 
>> wrote:
>>>
>>>
>>>
>>> That's what I was looking for, thank you.
>>>
>>> Unfortunately, neither
>>>
>>> * spark.streaming.backpressure.initialRate
>>> * spark.streaming.backpressure.enabled
>>> * 

pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-13 Thread Pietro Pugni
Hi there,
I opened a question on StackOverflow at this link: 
http://stackoverflow.com/questions/40007972/pyspark-doesnt-recognize-mmm-dateformat-pattern-in-spark-read-load-for-dates?noredirect=1#comment67297930_40007972

I didn’t get any useful answer, so I’m writing here hoping that someone can 
help me.

In short, I’m trying to read a CSV containing date columns stored using the
pattern “MMMdd”. What doesn’t work for me is “MMM”. I’ve done some testing
and discovered that it’s a localization issue. As you can read in the
StackOverflow question, I ran a simple Java program to parse the date “1989Dec31”,
and it works only if I specify Locale.US in the SimpleDateFormat() constructor.

I would like this to work in pyspark. I tried setting a different locale from the
console (LANG="en_US"), but it doesn’t work. I also tried setting it with Python's
locale package.

So, is there a way to set the locale in Spark when using pyspark? The issue is Java
related, not Python related (the function that parses the data is invoked by
spark.read.load(dateFormat="MMMdd", …)). I don’t want to use other solutions
for converting the data because they are slower (from what I’ve seen so far).

Thank you
 Pietro
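If the JVM locale can't be changed, one fallback is to read the column as a string and parse it with locale-independent code. The sketch below is plain Python with a fixed English month table. The function names are hypothetical, and wiring them into a UDF would bring the speed cost mentioned above. On the JVM side, the default locale comes from the user.language and user.country system properties. Passing -Duser.language=en -Duser.country=US through spark.driver.extraJavaOptions and spark.executor.extraJavaOptions may therefore be worth trying first (untested here).

```python
from datetime import date

# English month abbreviations, fixed here so that parsing does not depend on
# the default locale of the machine (the root cause described above).
MONTHS = {name: number for number, name in enumerate(
    ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"], start=1)}


def parse_yyyymmmdd(text):
    """Parse strings such as '1989Dec31' (assumes a two-digit day)."""
    return date(int(text[0:4]), MONTHS[text[4:7]], int(text[7:9]))


def parse_ddmmmyyyy(text):
    """Parse strings such as '31Dec1989' (assumes a two-digit day)."""
    return date(int(text[5:9]), MONTHS[text[2:5]], int(text[0:2]))


print(parse_yyyymmmdd("1989Dec31"), parse_ddmmmyyyy("31Dec1989"))
```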
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Want to test spark-sql-kafka but get unresolved dependency error

2016-10-13 Thread Cody Koeninger
As Sean said, it's unreleased. If you want to try it out, build Spark yourself:

http://spark.apache.org/docs/latest/building-spark.html

The easiest way to include the jar is probably to use mvn install to
put it in your local repository, then link it in your application's
mvn or sbt build file as described in the docs you linked.


On Thu, Oct 13, 2016 at 3:24 AM, JayKay  wrote:
> I want to work with the Kafka integration for Structured Streaming. I use
> Spark version 2.0.0 and start the spark-shell with:
>
> spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.0
>
> As described here:
> https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md
>
> But I get an unresolved dependency error ("unresolved dependency:
> org.apache.spark#spark-sql-kafka-0-10_2.11;2.0.0: not found"). So it seems
> not to be available via Maven or spark-packages.
>
> How can I access this package? Or am I doing something wrong or missing something?
>
> Thank you for your help.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Want-to-test-spark-sql-kafka-but-get-unresolved-dependency-error-tp27891.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>




spark on mesos memory sizing with offheap

2016-10-13 Thread vincent gromakowski
Hi,
I am trying to understand how Mesos allocates memory when off-heap memory is
enabled, but it seems that the framework only takes heap + 400 MB of overhead
into consideration for resource allocation.
Example: spark.executor.memory=3g spark.memory.offheap.size=1g ==> Mesos
reports 3.4g allocated for the executor.
Is there any configuration to make the Mesos allocation account for both heap
and off-heap memory?
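The 3.4g figure fits a model where the Mesos backend counts only heap plus spark.mesos.executor.memoryOverhead. That setting's documented default is max(10% of executor memory, 384 MB), so 3072 + 384 = 3456 MB, or about 3.4 GB. If off-heap memory really isn't counted in this version, one hedged workaround is to fold the off-heap size into that overhead setting, which is what the second call below models. Also note that the documented key is spark.memory.offHeap.size (camel case), and it only takes effect together with spark.memory.offHeap.enabled=true.

```python
# Defaults as documented for spark.mesos.executor.memoryOverhead (assumed
# unchanged in the version used here): 10% of executor memory, min 384 MB.
MEMORY_OVERHEAD_FRACTION = 0.10
MEMORY_OVERHEAD_MINIMUM_MB = 384


def mesos_executor_mb(executor_memory_mb, memory_overhead_mb=None):
    """Memory (MB) requested per executor under this model: heap + overhead.

    memory_overhead_mb plays the role of spark.mesos.executor.memoryOverhead;
    per the observation above, off-heap memory is not added on its own.
    """
    if memory_overhead_mb is None:
        memory_overhead_mb = max(int(MEMORY_OVERHEAD_FRACTION * executor_memory_mb),
                                 MEMORY_OVERHEAD_MINIMUM_MB)
    return executor_memory_mb + memory_overhead_mb


heap_mb, off_heap_mb = 3 * 1024, 1 * 1024                # the 3g / 1g example
print(mesos_executor_mb(heap_mb))                        # 3456 MB, about 3.4g
print(mesos_executor_mb(heap_mb, 384 + off_heap_mb))     # 4480 MB, off-heap included
```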


Re: Spyder and SPARK combination problem...Please help!

2016-10-13 Thread innocent73
Finally I found the solution!
I changed Python's working directory as below:

import os
os.chdir(r"C:\Python27")  # switch the working directory to the Python install
print(os.getcwd())        # confirm the change (os.curdir is just the constant ".")

and it works like a charm :)






Spark security

2016-10-13 Thread Mendelson, Assaf
Hi,
We have a Spark cluster and want to add some security to it. I was
looking at the documentation
(http://spark.apache.org/docs/latest/security.html) and have some questions.

1.   Do all executors listen on the same blockManager port? For example, in
YARN there are multiple executors per node; do they all listen on the same port?

2.   Ports that were defined in earlier versions (e.g.
http://spark.apache.org/docs/1.6.1/security.html) but removed from the latest
docs (such as spark.executor.port and spark.fileserver.port): are they really
gone, so that they can be blocked?

3.   If I define multiple workers per node in Spark standalone mode, how do
I set a different port for each worker? There is only one
spark.worker.ui.port / SPARK_WORKER_WEBUI_PORT setting. Do I have to start
each worker separately to configure its port? The same applies to the worker
port (SPARK_WORKER_PORT).

4.   Is it possible to encrypt the logs instead of just restricting
permissions on the log directory?

5.   Is the communication between the servers encrypted (e.g. using SSH)?

6.   Are there any additional best practices beyond what is written in the
documentation?
Thanks,
Assaf.




Re: spark with kerberos

2016-10-13 Thread Saisai Shao
I think security has nothing to do with which API you use, Spark SQL or the
RDD API.

I'm assuming you're running on a YARN cluster (YARN is currently the only
cluster manager that supports Kerberos).

First, you need to get a Kerberos TGT in your local spark-submit process.
After being authenticated by Kerberos, Spark can get delegation tokens from
HDFS, so that it can communicate with a secure Hadoop cluster. In your case,
since you have to communicate with other remote HDFS clusters, you have to
get tokens from all of those remote clusters: you can configure
"spark.yarn.access.namenodes" to list all the secure HDFS clusters you want
to access, and the Hadoop client API will then get tokens from all of them.

For details, see
https://spark.apache.org/docs/latest/running-on-yarn.html.

I haven't tried this personally since I don't have such requirements, so it
may require additional steps that I missed. Give it a try.
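The Python sketch below illustrates the configuration described above. It only assembles a spark-submit command line. The jar, main class and namenode address are hypothetical placeholders, and the sketch assumes kinit has already been run in the same shell, so the submitting process holds a TGT. spark.yarn.access.namenodes takes a comma-separated list of namenode URIs. I believe later Spark releases renamed it to spark.yarn.access.hadoopFileSystems, so check the YARN docs for the version in use.

```python
def spark_submit_command(app_jar, main_class, namenodes):
    """Build a spark-submit argv for YARN that asks Spark to obtain HDFS
    delegation tokens for every listed (secure) namenode as well."""
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "--class", main_class,
        # Comma-separated list of secure HDFS namenodes to fetch tokens from.
        "--conf", "spark.yarn.access.namenodes=" + ",".join(namenodes),
        app_jar,
    ]


cmd = spark_submit_command(
    "my-app.jar",                            # hypothetical application jar
    "com.example.WriteToClusterC",           # hypothetical main class
    ["hdfs://namenode-c.example.com:8020"],  # hypothetical cluster C namenode
)
print(" ".join(cmd))
```

To actually submit the job, pass the list to subprocess.run(cmd, check=True) from a shell where the Kerberos ticket is valid.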


On Thu, Oct 13, 2016 at 6:38 PM, Denis Bolshakov 
wrote:

> The problem happens when writing (reading works fine)
>
> rdd.saveAsNewAPIHadoopFile
>
> We use just RDD and HDFS, no other things.
> Spark 1.6.1 version.
> `Cluster A` - CDH 5.7.1
> `Cluster B` - vanilla hadoop 2.6.5
> `Cluster C` - CDH 5.8.0
>
> Best regards,
> Denis
>
> On 13 October 2016 at 13:06, ayan guha  wrote:
>
>> And a little more details on Spark version, hadoop version and
>> distribution would also help...
>>
>> On Thu, Oct 13, 2016 at 9:05 PM, ayan guha  wrote:
>>
>>> I think one point you need to mention is your target - HDFS, Hive or
>>> Hbase (or something else) and which end points are used.
>>>
>>> On Thu, Oct 13, 2016 at 8:50 PM, dbolshak 
>>> wrote:
>>>
 Hello community,

 We've a challenge and no ideas how to solve it.

 The problem,

 Say we have the following environment:
 1. `cluster A`, the cluster does not use kerberos and we use it as a
 source
 of data, important thing is - we don't manage this cluster.
 2. `cluster B`, small cluster where our spark application is running and
 performing some logic. (we manage this cluster and it does not have
 kerberos).
 3. `cluster C`, the cluster uses kerberos and we use it to keep results
 of
 our spark application, we manage this cluster

 Our requirements and conditions that are not mentioned yet:
 1. All clusters are in a single data center, but in the different
 subnetworks.
 2. We cannot turn on kerberos on `cluster A`
 3. We cannot turn off kerberos on `cluster C`
 4. We can turn on/off kerberos on `cluster B`, currently it's turned
 off.
 5. Spark app is built on top of RDD and does not depend on spark-sql.

 Does anybody know how to write data using RDD api to remote cluster
 which is
 running with Kerberos?

 --
 //with Best Regards
 --Denis Bolshakov
 e-mail: bolshakov.de...@gmail.com





>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> //with Best Regards
> --Denis Bolshakov
> e-mail: bolshakov.de...@gmail.com
>
>
>


Re: spark with kerberos

2016-10-13 Thread Denis Bolshakov
The problem happens when writing (reading works fine)

rdd.saveAsNewAPIHadoopFile

We use just RDD and HDFS, no other things.
Spark 1.6.1 version.
`Cluster A` - CDH 5.7.1
`Cluster B` - vanilla hadoop 2.6.5
`Cluster C` - CDH 5.8.0

Best regards,
Denis

On 13 October 2016 at 13:06, ayan guha  wrote:

> And a little more details on Spark version, hadoop version and
> distribution would also help...
>
> On Thu, Oct 13, 2016 at 9:05 PM, ayan guha  wrote:
>
>> I think one point you need to mention is your target - HDFS, Hive or
>> Hbase (or something else) and which end points are used.
>>
>> On Thu, Oct 13, 2016 at 8:50 PM, dbolshak 
>> wrote:
>>
>>> Hello community,
>>>
>>> We've a challenge and no ideas how to solve it.
>>>
>>> The problem,
>>>
>>> Say we have the following environment:
>>> 1. `cluster A`, the cluster does not use kerberos and we use it as a
>>> source
>>> of data, important thing is - we don't manage this cluster.
>>> 2. `cluster B`, small cluster where our spark application is running and
>>> performing some logic. (we manage this cluster and it does not have
>>> kerberos).
>>> 3. `cluster C`, the cluster uses kerberos and we use it to keep results
>>> of
>>> our spark application, we manage this cluster
>>>
>>> Our requirements and conditions that are not mentioned yet:
>>> 1. All clusters are in a single data center, but in the different
>>> subnetworks.
>>> 2. We cannot turn on kerberos on `cluster A`
>>> 3. We cannot turn off kerberos on `cluster C`
>>> 4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
>>> 5. Spark app is built on top of RDD and does not depend on spark-sql.
>>>
>>> Does anybody know how to write data using RDD api to remote cluster
>>> which is
>>> running with Kerberos?
>>>
>>> --
>>> //with Best Regards
>>> --Denis Bolshakov
>>> e-mail: bolshakov.de...@gmail.com
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: RowMatrix from DenseVector

2016-10-13 Thread Meeraj Kunnumpurath
Apologies, my oversight: I had mixed mllib and ml imports.

On Thu, Oct 13, 2016 at 2:27 PM, Meeraj Kunnumpurath <
mee...@servicesymphony.com> wrote:

> Hello,
>
> How do I create a RowMatrix from dense vectors? The following code
> doesn't compile.
>
> val features = df.rdd.map(r => Vectors.dense(r.getAs[Double]("constant"), 
> r.getAs[Double]("sqft_living")))
> val rowMatrix = new RowMatrix(features, features.count(), 2)
>
> The compiler error
>
> Error:(24, 33) type mismatch;
>  found   : org.apache.spark.rdd.RDD[org.apache.spark.ml.linalg.Vector]
>  required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
>   val rowMatrix = new RowMatrix(features, features.count(), 2)
> ^
>
> --
> *Meeraj Kunnumpurath*
>
>
> *Director and Executive Principal*
> *Service Symphony Ltd*
> *00 44 7702 693597*
> *00 971 50 409 0169*
> *mee...@servicesymphony.com*
>



-- 
*Meeraj Kunnumpurath*


*Director and Executive Principal*
*Service Symphony Ltd*
*00 44 7702 693597*
*00 971 50 409 0169*
*mee...@servicesymphony.com*


RowMatrix from DenseVector

2016-10-13 Thread Meeraj Kunnumpurath
Hello,

How do I create a RowMatrix from dense vectors? The following code
doesn't compile.

val features = df.rdd.map(r =>
Vectors.dense(r.getAs[Double]("constant"),
r.getAs[Double]("sqft_living")))
val rowMatrix = new RowMatrix(features, features.count(), 2)

The compiler error

Error:(24, 33) type mismatch;
 found   : org.apache.spark.rdd.RDD[org.apache.spark.ml.linalg.Vector]
 required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
  val rowMatrix = new RowMatrix(features, features.count(), 2)
^

-- 
*Meeraj Kunnumpurath*


*Director and Executive Principal*
*Service Symphony Ltd*
*00 44 7702 693597*
*00 971 50 409 0169*
*mee...@servicesymphony.com*


[1.6.0] Skipped stages keep increasing and causes OOM finally

2016-10-13 Thread Mungeol Heo
Hello,

My task is updating a DataFrame in a while loop until there is no more data
to update.
The Spark SQL I used is below:



val hc = sqlContext
hc.sql("use person")

var temp_pair = hc.sql("""
  select ROW_NUMBER() OVER (ORDER BY PID) AS pair
  , pid
  , actionchanneluserid
  from fdt_pid_channel_info
  where dt = '2016-09-27'
  and actionchanneltype = 2
  """).repartition(200)
temp_pair.persist.registerTempTable("temp_pair")

var result = 1.0

while (result > 0) {
  val temp1 = hc.sql("""
    select B.PAIR as minpair, A.*
    FROM TEMP_PAIR A
    INNER JOIN (
      SELECT pid, MIN(PAIR) AS PAIR
      FROM TEMP_PAIR
      GROUP BY pid) B
    ON A.pid = B.pid
    WHERE A.PAIR > B.PAIR
    """)
  temp1.persist.registerTempTable("temp1")

  result = temp1.count

  if (temp1.count > 0) {
    val temp = temp_pair.except(hc.sql("select pair, pid, actionchanneluserid from temp1"))
      .unionAll(hc.sql("select minpair, pid, actionchanneluserid from temp1"))
      .coalesce(200)
    temp.persist
    temp.count
    temp_pair.unpersist
    temp_pair = temp
    temp_pair.registerTempTable("temp_pair")
  }

  temp1.unpersist

  val temp2 = hc.sql("""
    select B.PAIR as minpair, A.*
    FROM TEMP_PAIR A
    INNER JOIN (
      SELECT actionchanneluserid, MIN(PAIR) AS PAIR
      FROM TEMP_PAIR
      GROUP BY actionchanneluserid) B
    ON A.actionchanneluserid = B.actionchanneluserid
    WHERE A.PAIR > B.PAIR
    """)
  temp2.persist.registerTempTable("temp2")

  result = result + temp2.count

  if (temp2.count > 0) {
    val temp = temp_pair.except(hc.sql("select pair, pid, actionchanneluserid from temp2"))
      .unionAll(hc.sql("select minpair, pid, actionchanneluserid from temp2"))
      .coalesce(200)
    temp.persist
    temp.count
    temp_pair.unpersist
    temp_pair = temp
    temp_pair.registerTempTable("temp_pair")
  }

  temp2.unpersist
}


With this job, the number of skipped stages keeps increasing until the job
finally fails with "java.lang.OutOfMemoryError: Java heap space".


Is there any way to avoid this kind of situation?
Any help will be great!
Thank you
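Read as an algorithm, the loop is min-label propagation. Each row repeatedly takes the smallest pair id reachable through its pid or its actionchanneluserid, which amounts to connected components over the pid/user graph. The single-machine Python sketch below runs the same fixed-point iteration on hypothetical toy data. It ignores the fact that EXCEPT also drops duplicate rows. In the Spark version, every round builds the new temp_pair from the previous one with except/unionAll/join, so the lineage grows with each iteration. That is a plausible source of the ever-growing list of skipped stages. The usual suggestion is to truncate the lineage periodically, for example by checkpointing or by writing temp_pair out and reading it back, though this has not been verified here.

```python
def min_label_pass(rows, key_index):
    """One SQL round: give each row the smallest pair id of its group.

    rows: list of (pair, pid, actionchanneluserid) tuples.
    key_index: 1 groups by pid, 2 groups by actionchanneluserid.
    Returns (updated_rows, number_of_rows_whose_pair_changed).
    """
    group_min = {}
    for row in rows:
        key = row[key_index]
        group_min[key] = min(group_min.get(key, row[0]), row[0])
    updated = [(group_min[row[key_index]],) + row[1:] for row in rows]
    changed = sum(1 for old, new in zip(rows, updated) if old[0] != new[0])
    return updated, changed


def propagate_min_pair(rows):
    """Repeat both passes until nothing changes (the `while (result > 0)` loop)."""
    result = 1
    while result > 0:
        rows, by_pid = min_label_pass(rows, 1)
        rows, by_user = min_label_pass(rows, 2)
        result = by_pid + by_user
    return rows


rows = [(1, "p1", "u1"), (2, "p2", "u1"), (3, "p2", "u2"), (4, "p3", "u3")]
print(propagate_min_pair(rows))
```

Rows 1 to 3 end up sharing pair id 1 because they are linked through u1 and p2, while the isolated row keeps 4.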


Re: spark with kerberos

2016-10-13 Thread ayan guha
And a little more details on Spark version, hadoop version and distribution
would also help...

On Thu, Oct 13, 2016 at 9:05 PM, ayan guha  wrote:

> I think one point you need to mention is your target - HDFS, Hive or Hbase
> (or something else) and which end points are used.
>
> On Thu, Oct 13, 2016 at 8:50 PM, dbolshak 
> wrote:
>
>> Hello community,
>>
>> We've a challenge and no ideas how to solve it.
>>
>> The problem,
>>
>> Say we have the following environment:
>> 1. `cluster A`, the cluster does not use kerberos and we use it as a
>> source
>> of data, important thing is - we don't manage this cluster.
>> 2. `cluster B`, small cluster where our spark application is running and
>> performing some logic. (we manage this cluster and it does not have
>> kerberos).
>> 3. `cluster C`, the cluster uses kerberos and we use it to keep results of
>> our spark application, we manage this cluster
>>
>> Our requirements and conditions that are not mentioned yet:
>> 1. All clusters are in a single data center, but in the different
>> subnetworks.
>> 2. We cannot turn on kerberos on `cluster A`
>> 3. We cannot turn off kerberos on `cluster C`
>> 4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
>> 5. Spark app is built on top of RDD and does not depend on spark-sql.
>>
>> Does anybody know how to write data using RDD api to remote cluster which
>> is
>> running with Kerberos?
>>
>> --
>> //with Best Regards
>> --Denis Bolshakov
>> e-mail: bolshakov.de...@gmail.com
>>
>>
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


Re: spark with kerberos

2016-10-13 Thread ayan guha
I think one point you need to mention is your target - HDFS, Hive or Hbase
(or something else) and which end points are used.

On Thu, Oct 13, 2016 at 8:50 PM, dbolshak  wrote:

> Hello community,
>
> We've a challenge and no ideas how to solve it.
>
> The problem,
>
> Say we have the following environment:
> 1. `cluster A`, the cluster does not use kerberos and we use it as a source
> of data, important thing is - we don't manage this cluster.
> 2. `cluster B`, small cluster where our spark application is running and
> performing some logic. (we manage this cluster and it does not have
> kerberos).
> 3. `cluster C`, the cluster uses kerberos and we use it to keep results of
> our spark application, we manage this cluster
>
> Our requirements and conditions that are not mentioned yet:
> 1. All clusters are in a single data center, but in the different
> subnetworks.
> 2. We cannot turn on kerberos on `cluster A`
> 3. We cannot turn off kerberos on `cluster C`
> 4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
> 5. Spark app is built on top of RDD and does not depend on spark-sql.
>
> Does anybody know how to write data using RDD api to remote cluster which
> is
> running with Kerberos?
>
> --
> //with Best Regards
> --Denis Bolshakov
> e-mail: bolshakov.de...@gmail.com
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


spark with kerberos

2016-10-13 Thread dbolshak
Hello community,

We've a challenge and no ideas how to solve it.

The problem,

Say we have the following environment:
1. `cluster A`, the cluster does not use kerberos and we use it as a source
of data, important thing is - we don't manage this cluster. 
2. `cluster B`, small cluster where our spark application is running and
performing some logic. (we manage this cluster and it does not have
kerberos).
3. `cluster C`, the cluster uses kerberos and we use it to keep results of
our spark application, we manage this cluster

Our requirements and conditions that are not mentioned yet:
1. All clusters are in a single data center, but in the different
subnetworks.
2. We cannot turn on kerberos on `cluster A`
3. We cannot turn off kerberos on `cluster C`
4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
5. Spark app is built on top of RDD and does not depend on spark-sql.

Does anybody know how to write data using RDD api to remote cluster which is
running with Kerberos?

-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com






Spark with kerberos

2016-10-13 Thread Denis Bolshakov
Hello community,

We've a challenge and no ideas how to solve it.

The problem,

Say we have the following environment:
1. `cluster A`, the cluster does not use kerberos and we use it as a source
of data, important thing is - we don't manage this cluster.
2. `cluster B`, small cluster where our spark application is running and
performing some logic. (we manage this cluster and it does not have
kerberos).
3. `cluster C`, the cluster uses kerberos and we use it to keep results of
our spark application, we manage this cluster

Our requirements and conditions that are not mentioned yet:
1. All clusters are in a single data center, but in the different
subnetworks.
2. We cannot turn on kerberos on `cluster A`
3. We cannot turn off kerberos on `cluster C`
4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
5. Spark app is built on top of RDD and does not depend on spark-sql.

Does anybody know how to write data using RDD api to remote cluster which
is running with Kerberos?

-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: OOM when running Spark SQL by PySpark on Java 8

2016-10-13 Thread Shady Xu
All nodes of my YARN cluster are running Java 7, but I submit the job
from a Java 8 client.

I realised I ran the job in YARN cluster mode, and that's why setting
'--driver-java-options' is effective. Now the question is why submitting a
job from a Java 8 client to a Java 7 cluster causes a PermGen OOM.

2016-10-13 17:30 GMT+08:00 Sean Owen :

> You can specify it; it just doesn't do anything but cause a warning in
> Java 8. It won't work in general to have such a tiny PermGen. If it's
> working it means you're on Java 8 because it's ignored. You should set
> MaxPermSize if anything, not PermSize. However the error indicates you are
> not using Java 8 everywhere on your cluster, and that's a potentially
> bigger problem.
>
> On Thu, Oct 13, 2016 at 10:26 AM Shady Xu  wrote:
>
>> Solved the problem by specifying the PermGen size when submitting the job
>> (even to just a few MB).
>>
>> Seems Java 8 has removed the Permanent Generation space, thus
>> corresponding JVM arguments are ignored.  But I can still
>> use --driver-java-options "-XX:PermSize=80M -XX:MaxPermSize=100m" to
>> specify them when submitting the Spark job, which is weird. I don't know
>> whether it has anything to do with py4j as I am not familiar with it.
>>
>> 2016-10-13 17:00 GMT+08:00 Shady Xu :
>>
>> Hi,
>>
>> I have a problem when running Spark SQL by PySpark on Java 8. Below is
>> the log.
>>
>>
>> 16/10/13 16:46:40 INFO spark.SparkContext: Starting job: sql at 
>> NativeMethodAccessorImpl.java:-2
>> Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: 
>> PermGen space
>>  at java.lang.ClassLoader.defineClass1(Native Method)
>>  at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
>>  at 
>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>  at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
>>  at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>  at 
>> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:857)
>>  at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1630)
>>  at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
>>  at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
>>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> Exception in thread "shuffle-server-2" java.lang.OutOfMemoryError: PermGen 
>> space
>> Exception in thread "shuffle-server-4" java.lang.OutOfMemoryError: PermGen 
>> space
>> Exception in thread "threadDeathWatcher-2-1" java.lang.OutOfMemoryError: 
>> PermGen space
>>
>>
>> I tried increasing the driver memory, but it didn't help. However, things are
>> OK when I run the same code after switching to Java 7. I also found it OK to
>> run the SparkPi example on Java 8. So I believe the problem lies with
>> PySpark rather than Spark core.
>>
>>
>> I am using Spark 2.0.1 and run the program in YARN cluster mode. Any ideas
>> are appreciated.
>>
>>
>>


Re: DataFrame API: how to partition by a "virtual" column, or by a nested column?

2016-10-13 Thread Samy Dindane

This partially answers the question: http://stackoverflow.com/a/35449563/604041

On 10/04/2016 03:10 PM, Samy Dindane wrote:

Hi,

I have the following schema:

root
 |-- timestamp
 |-- date
 |    |-- year
 |    |-- month
 |    |-- day
 |-- some_column
 |-- some_other_column

I'd like to achieve either of these:

1) Use the timestamp field to partition by year, month and day.
This looks weird though, as Spark wouldn't magically know how to load the data 
back since the year, month and day columns don't exist in the schema.

2) If 1) is not possible, partition data by date.year, date.month and date.day.
`df.write.partitionBy('date.year')` does not work, since the `date.year` column 
does not exist in the schema.

If 2) isn't possible either, I'll just move year, month and day to the root of 
the schema, which I don't like as it bloats it.

Do you know if any of these is possible?

Thank you,

Samy
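For option 1), a common approach is to copy the nested fields into temporary top-level columns just for the write, e.g. withColumn("year", col("date.year")), and pass those to partitionBy. This is offered as a suggestion, not something confirmed in this thread. Spark stores partition values in key=value directory names rather than inside the files. Partition discovery then turns those names back into columns on read, so the loader does know about them. The pure-Python sketch below models only that round trip; the column values and timestamp are hypothetical.

```python
def partition_path(row, keys):
    """Hive-style directory for one row, e.g. 'year=2016/month=10/day=13'."""
    return "/".join(f"{k}={row[k]}" for k in keys)


def write_partitioned(rows, keys):
    """Group rows by partition directory; partition columns are dropped from
    the stored rows, as partitionBy does with the data files."""
    layout = {}
    for row in rows:
        stored = {k: v for k, v in row.items() if k not in keys}
        layout.setdefault(partition_path(row, keys), []).append(stored)
    return layout


def read_partitioned(layout):
    """Re-create partition columns from directory names, mimicking partition
    discovery (numeric values are parsed back as ints)."""
    rows = []
    for path, stored_rows in layout.items():
        parts = {}
        for segment in path.split("/"):
            key, value = segment.split("=", 1)
            parts[key] = int(value) if value.isdigit() else value
        for stored in stored_rows:
            rows.append({**stored, **parts})
    return rows


row = {"timestamp": 1476316800, "year": 2016, "month": 10, "day": 13,
       "some_column": "a"}
layout = write_partitioned([row], ["year", "month", "day"])
print(layout)
print(read_partitioned(layout) == [row])
```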




Re: Limit Kafka batches size with Spark Streaming

2016-10-13 Thread Samy Dindane

Hey Cody,

Thanks for the reply. Really helpful.

Following your suggestion, I set spark.streaming.backpressure.enabled to true 
and maxRatePerPartition to 10.
I know I can handle 100k records at the same time, but definitely not in 1 
second (the batchDuration), so I expect the backpressure to lower that number.

Unfortunately the backpressure doesn't work and I keep getting 100k records per 
batch.

Here is my output log: 
https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba
And this is my conf:

conf.set("spark.streaming.kafka.consumer.poll.ms", "3")
conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
conf.set("spark.streaming.backpressure.enabled", "true")

That's not normal, is it? Do you notice anything odd in my logs?

Thanks a lot.


On 10/12/2016 07:31 PM, Cody Koeninger wrote:

Cool, just wanted to make sure.

To answer your question about


Isn't "spark.streaming.backpressure.initialRate" supposed to do this?


that configuration was added well after the integration of the direct
stream with the backpressure code, and was added only to the receiver
code, which the direct stream doesn't share since it isn't a receiver.
Not making excuses about it being confusing, just explaining how
things ended up that way :(  So yeah, maxRatePerPartition is the
closest thing you have on the direct stream side to being able to
limit before the backpressure estimator has something to work with.

So to try and debug what you're seeing, if you add a line like this to
your log4j.properties

log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE

you should start seeing log lines like

16/10/12 12:18:01 TRACE PIDRateEstimator:
time = 1476292681092, # records = 20, processing time = 20949,
scheduling delay = 6
16/10/12 12:18:01 TRACE PIDRateEstimator:
latestRate = -1.0, error = -1.9546995083297531
latestError = -1.0, historicalError = 0.001145639409995704
delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10

and then once it updates, lines like

16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0

For a really artificially constrained example where
maxRatePerPartition is set such that it limits to 20 per batch but the
system can really only handle 5 per batch, the streaming UI will look
something like this:

https://i.imgsafe.org/e730492453.png

notice the cutover point


On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane  wrote:

I am 100% sure.

println(conf.get("spark.streaming.backpressure.enabled")) prints true.


On 10/12/2016 05:48 PM, Cody Koeninger wrote:


Just to make 100% sure, did you set

spark.streaming.backpressure.enabled

to true?

On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane  wrote:




On 10/12/2016 04:40 PM, Cody Koeninger wrote:



How would backpressure know anything about the capacity of your system
on the very first batch?



Isn't "spark.streaming.backpressure.initialRate" supposed to do this?




You should be able to set maxRatePerPartition at a value that makes
sure your first batch doesn't blow things up, and let backpressure
scale from there.



Backpressure doesn't scale even when using maxRatePerPartition: when I
enable backpressure and set maxRatePerPartition to n, I always get n
records, even if my batch takes longer than batchDuration to finish.

Example:
* I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
Durations.seconds(1))`
* I set backpressure.initialRate and/or maxRatePerPartition to 100,000 and
enable backpressure
* Since I can't handle 100,000 records in 1 second, I expect backpressure
to kick in on the second batch and give me fewer than 100,000 records, but
this does not happen

What am I missing here?

On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane  wrote:

That's what I was looking for, thank you.

Unfortunately, neither

* spark.streaming.backpressure.initialRate
* spark.streaming.backpressure.enabled
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

change how many records I get (I tried many different combinations).

The only configuration that works is
"spark.streaming.kafka.maxRatePerPartition".
That's better than nothing, but it'd be useful to have backpressure enabled
for automatic scaling.

Do you have any idea why backpressure isn't working? How can I debug this?

On 10/11/2016 06:08 PM, Cody Koeninger wrote:

http://spark.apache.org/docs/latest/configuration.html

"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see
below)."

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane wrote:

Hi,

Is it possible to limit the size of the batches returned by the Kafka
consumer for Spark Streaming?
I am asking because the first batch I get has hundreds of millions of
records and it takes ages to process and checkpoint them.

Thank you.

Samy


Re: OOM when running Spark SQL by PySpark on Java 8

2016-10-13 Thread Sean Owen
You can specify it; it just doesn't do anything but cause a warning in Java
8. It won't work in general to have such a tiny PermGen. If it's working it
means you're on Java 8 because it's ignored. You should set MaxPermSize if
anything, not PermSize. However the error indicates you are not using Java
8 everywhere on your cluster, and that's a potentially bigger problem.
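Sean's diagnosis (a PermGen error means that JVM is Java 7 or older) can be checked node by node. Here is a minimal Python sketch that assumes you can run it on each YARN node, for example over ssh. The helper names are hypothetical. Note that `java -version` writes its banner to stderr, not stdout.

```python
import re
import subprocess
from typing import Optional


def java_major_version(version_output: str) -> Optional[int]:
    """Return the major Java version from `java -version` text, or None.

    Handles the old "1.x" scheme ('1.7.0_80' -> 7, '1.8.0_101' -> 8)
    and the newer scheme ('11.0.2' -> 11).
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        return None
    first = int(match.group(1))
    if first == 1 and match.group(2) is not None:
        return int(match.group(2))  # "1.7" style: the real major is the second number
    return first


def has_permgen(major: int) -> bool:
    """PermGen exists through Java 7; Java 8 replaced it with Metaspace."""
    return major <= 7


def local_java_major(java_cmd: str = "java") -> Optional[int]:
    """Run `<java_cmd> -version` here; raises FileNotFoundError if it is absent."""
    proc = subprocess.run([java_cmd, "-version"], capture_output=True, text=True)
    return java_major_version(proc.stderr)  # the version banner goes to stderr
```

This only checks the `java` on PATH. YARN containers use whatever JAVA_HOME the NodeManager gives them, so pass that binary's path as `java_cmd` too.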

On Thu, Oct 13, 2016 at 10:26 AM Shady Xu  wrote:

> Solved the problem by specifying the PermGen size when submitting the job
> (even to just a few MB).
>
> Seems Java 8 has removed the Permanent Generation space, thus
> corresponding JVM arguments are ignored.  But I can still
> use --driver-java-options "-XX:PermSize=80M -XX:MaxPermSize=100m" to
> specify them when submitting the Spark job, which is weird. I don't know
> whether it has anything to do with py4j as I am not familiar with it.


Re: OOM when running Spark SQL by PySpark on Java 8

2016-10-13 Thread Shady Xu
Solved the problem by specifying the PermGen size when submitting the job
(even to just a few MB).

It seems Java 8 has removed the Permanent Generation space, so the corresponding
JVM arguments are ignored. But I can still use --driver-java-options
"-XX:PermSize=80M -XX:MaxPermSize=100m" to specify them when submitting the
Spark job, which is weird. I don't know whether it has anything to do with
py4j as I am not familiar with it.


Re: OOM when running Spark SQL by PySpark on Java 8

2016-10-13 Thread Sean Owen
The error doesn't say you're out of memory, but says you're out of PermGen.
If you see this, you aren't running Java 8 AFAIK, because 8 has no PermGen.
But if you're running Java 7, and you go investigate what this error means,
you'll find you need to increase PermGen. This is mentioned in the Spark
docs too, as you need to increase this when building on Java 7.
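In YARN cluster mode the driver runs inside the ApplicationMaster on a cluster node, so its JVM settings have to be passed as configuration. Below is a hedged Python sketch that builds spark-submit arguments for either of Sean's fixes. One option raises MaxPermSize on the driver and executors, which only matters on Java 7. The other points the ApplicationMaster and executors at a Java 8 JAVA_HOME. The function name, the 256m size and any JDK path are illustrative assumptions. These keys also replace any extraJavaOptions you already set.

```python
from typing import List, Optional


def jvm_submit_args(max_perm_size: Optional[str] = None,
                    java_home: Optional[str] = None) -> List[str]:
    """Build spark-submit --conf arguments for PermGen size and/or JAVA_HOME."""
    args: List[str] = []
    if max_perm_size:
        # Only meaningful on Java 7 or older; Java 8 ignores it with a warning.
        opt = f"-XX:MaxPermSize={max_perm_size}"
        args += ["--conf", f"spark.driver.extraJavaOptions={opt}",
                 "--conf", f"spark.executor.extraJavaOptions={opt}"]
    if java_home:
        # Make the YARN ApplicationMaster (cluster-mode driver) and executors
        # use the same JDK.
        args += ["--conf", f"spark.yarn.appMasterEnv.JAVA_HOME={java_home}",
                 "--conf", f"spark.executorEnv.JAVA_HOME={java_home}"]
    return args
```

For example, `["spark-submit", "--master", "yarn", "--deploy-mode", "cluster", *jvm_submit_args(java_home="/path/to/jdk8"), "job.py"]` could be handed to `subprocess.run`. Here `job.py` and the JDK path are placeholders.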


Re: Want to test spark-sql-kafka but get unresolved dependency error

2016-10-13 Thread Mich Talebzadeh
Add --jars /spark-streaming-kafka_2.10-1.5.1.jar to spark-shell (you may
need to download the jar file, or a newer version).

I also have spark-streaming-kafka-assembly_2.10-1.6.1.jar on the --jars
list.

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


OOM when running Spark SQL by PySpark on Java 8

2016-10-13 Thread Shady Xu
Hi,

I have a problem when running Spark SQL by PySpark on Java 8. Below is the
log.


16/10/13 16:46:40 INFO spark.SparkContext: Starting job: sql at NativeMethodAccessorImpl.java:-2
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: PermGen space
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:857)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1630)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "shuffle-server-2" java.lang.OutOfMemoryError: PermGen space
Exception in thread "shuffle-server-4" java.lang.OutOfMemoryError: PermGen space
Exception in thread "threadDeathWatcher-2-1" java.lang.OutOfMemoryError: PermGen space


I tried increasing the driver memory, but it didn't help. However, things
are OK when I run the same code after switching to Java 7. I also find
that the SparkPi example runs fine on Java 8. So I believe the problem
lies with PySpark rather than Spark core.


I am using Spark 2.0.1 and run the program in YARN cluster mode.
Any ideas are appreciated.


Re: Want to test spark-sql-kafka but get unresolved dependency error

2016-10-13 Thread Sean Owen
I don't believe that's been released yet. It looks like it was merged into
branches about a week ago. You're looking at unreleased docs too - have a
look at http://spark.apache.org/docs/latest/ for the latest released docs.
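Sean's answer can be checked directly. A --packages coordinate of the form group:artifact:version maps onto the standard Maven repository layout, so the POM URL either exists or returns 404. Below is a minimal sketch with hypothetical helper names. Spark's resolver also consults local Ivy/Maven caches and the spark-packages repository, so a 404 from Maven Central is a strong hint rather than the whole story.

```python
import urllib.error
import urllib.request


def maven_pom_url(coordinate: str, repo: str = "https://repo1.maven.org/maven2") -> str:
    """Map 'group:artifact:version' to its POM URL in a Maven-layout repository."""
    parts = coordinate.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected group:artifact:version, got {coordinate!r}")
    group_id, artifact_id, version = parts
    group_path = group_id.replace(".", "/")  # org.apache.spark -> org/apache/spark
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.pom"


def is_published(coordinate: str, timeout: float = 10.0) -> bool:
    """HEAD the POM: True on HTTP 200, False on 404, other errors are raised."""
    request = urllib.request.Request(maven_pom_url(coordinate), method="HEAD")
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status == 200
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return False
        raise
```

Calling `is_published("org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.0")` needs network access. It would tell you whether that exact version has reached Maven Central.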


Want to test spark-sql-kafka but get unresolved dependency error

2016-10-13 Thread JayKay
I want to work with the Kafka integration for structured streaming. I use
Spark version 2.0.0, and I start the spark-shell with:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.0

As described here:
https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md

But I get an unresolved dependency error ("unresolved dependency:
org.apache.spark#spark-sql-kafka-0-10_2.11;2.0.0: not found"). So it seems
not to be available via Maven or spark-packages.

How can I access this package? Or am I doing something wrong or missing
something?

Thank you for your help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Want-to-test-spark-sql-kafka-but-get-unresolved-dependency-error-tp27891.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Anyone attending spark summit?

2016-10-13 Thread Andrew Gelinas
ANDREW! Thank you. The code worked, you're a legend. I was going to
register today and now saved €€€. Owe you a beer.


Gregory



2016-10-12 10:04 GMT+09:00 Andrew James :

> Hey, I just found a promo code for Spark Summit Europe that saves 20%.
> It’s "Summit16" -  I love Brussels and just registered! Who’s coming with
> me to get their Spark on?!
>
>
>
> Cheers,
>
> Andrew
>


receiving stream data options

2016-10-13 Thread vr spark
Hi,
I have a continuous REST API stream which keeps spitting out data in the
form of JSON.
I access the stream using Python requests.get(url, stream=True,
headers=headers).

I want to receive the data using Spark and do further processing. I am not
sure which is the best way to receive it in Spark.

What options do I have?

Some options I can think of:

1. Push data from the REST API stream into a Kafka queue and use the Spark
Kafka streaming utilities to capture the data and process it further.

2. Push data from the REST API stream to a local socket and use the Spark
socket stream utilities to capture the data and process it further.

3. Is there any other way to receive it?

thanks

VR
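Option 2 can be sketched in Python with only the standard library. Spark's socketTextStream connects to host:port as a client and splits the incoming text on newlines. A small bridge therefore needs to listen on a port and write one compact JSON record per line. The function names, localhost and port 9999 are assumptions for illustration.

```python
import json
import socket
from typing import Iterable


def forward_json_lines(records: Iterable[object], conn: socket.socket) -> int:
    """Send each record as one line of compact JSON; return how many were sent."""
    sent = 0
    for record in records:
        # json.dumps without indent escapes newlines inside strings,
        # so each record occupies exactly one line on the wire.
        conn.sendall((json.dumps(record) + "\n").encode("utf-8"))
        sent += 1
    return sent


def serve_json_lines(records: Iterable[object], host: str = "localhost",
                     port: int = 9999) -> int:
    """Wait for one client (e.g. Spark's socket receiver) and stream records to it."""
    with socket.create_server((host, port)) as server:  # Python 3.8+
        conn, _addr = server.accept()
        with conn:
            return forward_json_lines(records, conn)
```

You could feed it with something like `(json.loads(line) for line in resp.iter_lines() if line)` from the requests response, assuming the API emits one JSON document per line. The Spark side could then use `ssc.socketTextStream("localhost", 9999).map(json.loads)`. The socket receiver has no replay, so records can be lost if either side restarts. Option 1 (Kafka) is the more durable choice.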