Re: Kryo On Spark 1.6.0

2017-01-10 Thread Richard Startin
Hi Enrico,


Only set spark.kryo.registrationRequired if you want to forbid serialization of
any classes you have not explicitly registered - see
http://spark.apache.org/docs/latest/configuration.html.


To enable Kryo, you just need to set
spark.serializer=org.apache.spark.serializer.KryoSerializer. There is some info
here: http://spark.apache.org/docs/latest/tuning.html
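
For the WrappedArray$ofRef error, one way to register a Scala class from Java is
to look it up by name, since there is no Java-friendly class literal for it. A
minimal sketch against the Java API (the app name is just a placeholder):

import org.apache.spark.SparkConf;

public class KryoRegistrationExample {
    public static void main(String[] args) throws ClassNotFoundException {
        SparkConf conf = new SparkConf()
                .setAppName("kryo-registration-example")
                // enable Kryo serialization
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // fail fast on any class that has not been registered
                .set("spark.kryo.registrationRequired", "true");

        // Scala classes such as WrappedArray$ofRef can be registered from Java
        // by resolving them by name.
        conf.registerKryoClasses(new Class<?>[] {
                Class.forName("scala.collection.mutable.WrappedArray$ofRef")
        });
    }
}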

Cheers,
Richard



https://richardstartin.com/



From: Enrico DUrso 
Sent: 10 January 2017 11:10
To: user@spark.apache.org
Subject: Kryo On Spark 1.6.0


Hi,

I am trying to use Kryo on Spark 1.6.0.
I am able to register my own classes and it works, but when I set
"spark.kryo.registrationRequired" to true, I get an error about a Scala class:
"Class is not registered: scala.collection.mutable.WrappedArray$ofRef".

Has any of you already solved this issue in Java? I found the code to solve it
in Scala, but I am unable to register this class in Java.

Cheers,

enrico





Re: ToLocalIterator vs collect

2017-01-05 Thread Richard Startin
Why not do that with Spark SQL to utilise the executors properly, rather than a
sequential filter on the driver?

Select * from A left join B on A.fk = B.fk where B.pk is NULL limit k

If you were sorting just so you could iterate in order, this might save you a 
couple of sorts too.
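
A minimal sketch of that query, assuming Spark 2.x with the two datasets
registered as temporary views named A and B; the column names and k = 10 are
placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TopKMissingExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("top-k-missing-in-B")
                .getOrCreate();

        // Rows of A whose key has no match in B; the join and filter run on
        // the executors, and only the small LIMITed result reaches the driver.
        Dataset<Row> missing = spark.sql(
                "SELECT A.* FROM A LEFT JOIN B ON A.fk = B.fk "
                + "WHERE B.pk IS NULL LIMIT 10");

        missing.show();
    }
}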

https://richardstartin.com

> On 5 Jan 2017, at 10:40, Rohit Verma  wrote:
> 
> Hi all,
> 
> I am aware that collect will return a list aggregated on the driver, and this
> will cause an OOM when the list is too big.
> Is toLocalIterator safe to use with a very big list? I want to access all
> values one by one.
> 
> Basically the goal is to compare two sorted RDDs (A and B) to find the top k
> entries that are missing in B but present in A.
> 
> Rohit




Re: withColumn gives "Can only zip RDDs with same number of elements in each partition" but not with a LIMIT on the dataframe

2016-12-20 Thread Richard Startin
I think limit repartitions your data into a single partition if called as a
non-terminal operator. Hence zip works after limit because you only have one
partition.

In practice, I have found joins to be much more applicable than zip because of 
the strict limitation of identical partitions.
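
As a hedged illustration of the join-based alternative (the table names and the
key column "id" below are hypothetical; this uses Spark 2.x's SparkSession, but
the same pattern works with SQLContext/HiveContext and DataFrame on 1.x):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinInsteadOfZip {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("join-instead-of-zip")
                .getOrCreate();

        // Two hypothetical inputs that share a key column "id".
        Dataset<Row> left = spark.table("left_table");
        Dataset<Row> right = spark.table("right_table");

        // A join only needs a common key; unlike zip, it does not require both
        // sides to have identical partitioning and element counts.
        Dataset<Row> combined = left.join(right, "id");
        combined.show();
    }
}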

https://richardstartin.com

On 20 Dec 2016, at 16:04, Jack Wenger wrote:

Hello,

I'm facing a strange behaviour with Spark 1.5.0 (Cloudera 5.5.1).
I'm loading data from Hive with HiveContext (~42M records) and then trying to add
a new column with "withColumn" and a UDF.
Finally I'm supposed to create a new Hive table from this dataframe.


Here is the code:

________


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

DATETIME_TO_COMPARE = "-12-31 23:59:59.99"

myFunction = udf(lambda col: 0 if col != DATETIME_TO_COMPARE else 1, IntegerType())

df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 FROM myTable WHERE col4 == someValue")

df2 = df1.withColumn("myNewCol", myFunction(df1.col3))
df2.registerTempTable("df2")

hc.sql("create table my_db.new_table as select * from df2")

________


But I get this error:


py4j.protocol.Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in 
stage 2.0 failed 4 times, most recent failure: Lost task 18.3 in stage 2.0 (TID 
186, lxpbda25.ra1.intra.groupama.fr): 
org.apache.spark.SparkException: Can only zip RDDs with same number of elements 
in each partition
at 
org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.hasNext(RDD.scala:832)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)




What is surprising is that if I modify the select statement by adding a LIMIT
1 (which is more than twice the number of records in my table), then
it works:

________

df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 FROM myTable WHERE col4 == someValue LIMIT 1")

________

In both cases, if I run a count() on df1, I get the same number: 42 593 052.

Is it a bug or am I missing something?
If it is not a bug, what am I doing wrong?


Thank you !


Jack


Re: Spark streaming completed batches statistics

2016-12-07 Thread Richard Startin
OK, it looks like I could reconstruct the logic in the Spark UI from the /jobs
resource. Thanks.
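
For example, a bare-bones Java fetch of the /jobs resource under the monitoring
REST API; the host, port and application id below are placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class SparkJobsRestExample {
    public static void main(String[] args) throws Exception {
        // For a running application the API is served by the driver UI;
        // substitute your own host, port and application id.
        String endpoint =
                "http://localhost:4040/api/v1/applications/app-20161207000000-0001/jobs";

        // Print the raw JSON; each element describes one job's status and timings.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(new URL(endpoint).openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}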


https://richardstartin.com/



From: map reduced <k3t.gi...@gmail.com>
Sent: 07 December 2016 19:49
To: Richard Startin
Cc: user@spark.apache.org
Subject: Re: Spark streaming completed batches statistics

Have you checked http://spark.apache.org/docs/latest/monitoring.html#rest-api ?

KP

On Wed, Dec 7, 2016 at 11:43 AM, Richard Startin
<richardstar...@outlook.com> wrote:

Is there any way to get this information as CSV/JSON?


https://docs.databricks.com/_images/CompletedBatches.png



https://richardstartin.com/


________
From: Richard Startin <richardstar...@outlook.com>
Sent: 05 December 2016 15:55
To: user@spark.apache.org
Subject: Spark streaming completed batches statistics

Is there any way to get a more computer-friendly version of the completed
batches section of the streaming page of the application master? I am very
interested in the statistics and am currently screen-scraping...

https://richardstartin.com




Re: Spark streaming completed batches statistics

2016-12-07 Thread Richard Startin
Is there any way to get this information as CSV/JSON?


https://docs.databricks.com/_images/CompletedBatches.png



https://richardstartin.com/



From: Richard Startin <richardstar...@outlook.com>
Sent: 05 December 2016 15:55
To: user@spark.apache.org
Subject: Spark streaming completed batches statistics

Is there any way to get a more computer-friendly version of the completed
batches section of the streaming page of the application master? I am very
interested in the statistics and am currently screen-scraping...

https://richardstartin.com



Re: Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Richard Startin
I've seen the feature work very well. For tuning, you've got:

spark.streaming.backpressure.pid.proportional (defaults to 1, non-negative) - 
weight for response to "error" (change between last batch and this batch)
spark.streaming.backpressure.pid.integral (defaults to 0.2, non-negative) - 
weight for the response to the accumulation of error. This has a dampening 
effect.
spark.streaming.backpressure.pid.derived (defaults to zero, non-negative) - 
weight for the response to the trend in error. This can cause 
arbitrary/noise-induced fluctuations in batch size, but can also help react 
quickly to increased/reduced capacity.
spark.streaming.backpressure.pid.minRate - the default value is 100 (must be 
positive), batch size won't go below this.

spark.streaming.receiver.maxRate - batch size won't go above this.
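
A minimal sketch of wiring these up in Java; the numeric values are just the
defaults quoted above plus an illustrative Kafka per-partition cap, not
recommendations:

import org.apache.spark.SparkConf;

public class BackpressureTuningExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("backpressure-tuning-example")
                // turn the rate controller on
                .set("spark.streaming.backpressure.enabled", "true")
                // PID weights: response to error, accumulated error and error trend
                .set("spark.streaming.backpressure.pid.proportional", "1.0")
                .set("spark.streaming.backpressure.pid.integral", "0.2")
                .set("spark.streaming.backpressure.pid.derived", "0.0")
                // lower bound on the estimated rate (defaults to 100)
                .set("spark.streaming.backpressure.pid.minRate", "100")
                // for the Kafka direct stream, cap the per-partition ingest rate too
                .set("spark.streaming.kafka.maxRatePerPartition", "1000");
    }
}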


Cheers,

Richard


https://richardstartin.com/



From: Liren Ding 
Sent: 05 December 2016 22:18
To: d...@spark.apache.org; user@spark.apache.org
Subject: Back-pressure to Spark Kafka Streaming?

Hey all,

Does backpressure actually work on Spark Kafka streaming? According to the
latest Spark Streaming documentation:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
"In Spark 1.5, we have introduced a feature called backpressure that eliminate 
the need to set this rate limit, as Spark Streaming automatically figures out 
the rate limits and dynamically adjusts them if the processing conditions 
change. This backpressure can be enabled by setting the configuration parameter 
spark.streaming.backpressure.enabled to true."
But I also see a few open spark jira tickets on this option:
https://issues.apache.org/jira/browse/SPARK-7398
https://issues.apache.org/jira/browse/SPARK-18371

The case in the second ticket describes a similar issue to the one we have here. We
use Kafka to send large batches (10~100M) to Spark Streaming, and the streaming
interval is set to 1~4 minutes. With backpressure set to true, the queued active
batches still pile up when the average batch processing time takes longer than the
default interval. After the Spark driver is restarted, all queued batches turn into
a giant batch, which blocks subsequent batches and also has a good chance of failing
eventually. The only config we found that might help is
"spark.streaming.kafka.maxRatePerPartition". It does limit the incoming batch size,
but it is not a perfect solution since it depends on the size of each partition as
well as the length of the batch interval. For our case, hundreds of partitions times
minutes of interval still produce a number that is too large for each batch. So we
still want to figure out how to make backpressure work in Spark Kafka streaming, if
it is supposed to work there. Thanks.


Liren




Spark streaming completed batches statistics

2016-12-05 Thread Richard Startin
Is there any way to get a more computer-friendly version of the completed
batches section of the streaming page of the application master? I am very
interested in the statistics and am currently screen-scraping...

https://richardstartin.com



Re: Livy with Spark

2016-12-05 Thread Richard Startin
There is a great write-up on Livy at
http://henning.kropponline.de/2016/11/06/

On 5 Dec 2016, at 14:34, Mich Talebzadeh wrote:

Hi,

Has there been any experience using Livy with Spark to share multiple Spark 
contexts?

thanks



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

