Executor heartbeats on Kubernetes

2022-10-13 Thread Kristopher Kane
Due to settings like "spark.kubernetes.executor.missingPodDetectDelta", I've
begun to wonder about heartbeats on Kubernetes.

Do executors still conduct the traditional heartbeat to the driver
when run on Kubernetes?

Thanks,

Kris
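
For reference, a hedged sketch of the two kinds of settings in play here
(property names from recent Spark releases, values illustrative only): the first
is the classic executor-to-driver heartbeat, which as far as I know applies on
every cluster manager; the second is the Kubernetes-specific pod-liveness check
named above.

  # classic executor -> driver heartbeat, independent of the cluster manager
  spark.executor.heartbeatInterval                  10s

  # Kubernetes only: grace period before a registered executor whose pod is
  # missing from the driver's pod snapshots is treated as deleted
  spark.kubernetes.executor.missingPodDetectDelta   30s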




Re: Spark 2.4 Structured Streaming Kafka assign API polling same offsets

2019-03-01 Thread Kristopher Kane
I figured out why: we are not persisting the data returned by .load(), so
every action, like count(), goes back to Kafka for the data again.
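
A minimal sketch of that fix (Scala for brevity; option values copied from the
snippet quoted below): cache the Dataset right after load()/selectExpr(), so each
action reuses the fetched Kafka data instead of re-polling the brokers.

  val df = spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("assign", """{"topic1":[0,1],"topic2":[0]}""")
    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
    .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .persist()                      // cached on the first action below, reused afterwards

  val total = df.count()            // materialises the cache; later work re-reads it, not Kafka
  // ... filter / process / commit offsets ...
  df.unpersist()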

On Fri, Mar 1, 2019 at 10:10 AM Kristopher Kane  wrote:
>
> We are using the assign API to do batch work with Spark and Kafka.
> What I'm seeing is the Spark executor work happening in the background,
> constantly polling the same data over and over until the main thread
> commits the offsets.
>
> Is the below a blocking operation?
>
>   Dataset<Row> df = spark.read().format("kafka")
>     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>     .option("assign", "{\"topic1\":[0,1],\"topic2\":[0]}")
>     .option("startingOffsets",
>       "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
>     .option("endingOffsets",
>       "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
>     .load();
>   df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>
>
> ###
>
> Here is an example.  Our desired batch is 20 records to commit on.
> Due to segment size (this is a test), 12 records are returned by each
> poll. Spark gets to offset 20 and our program is working to
> filter/process/commit, but the Spark polling continues in the
> background, starting again at offset -2 since nothing has been committed yet.
> This suggests the above .read().load() is non-blocking.
>
>
> 2019-03-01 09:21:41 INFO  [THREAD ID=main] RawHdfsFlowType:50 -
> Getting data from Kafka
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset -2 requested 0
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Seeking to
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 0
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Polled
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> [compacted-gap-message-0]  12
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
> polling
>
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 1 requested 1
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 2 requested 2
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 3 requested 3
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 4 requested 4
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 5 requested 5
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 6 requested 6
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 7 requested 7
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 8 requested 8
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 9 requested 9
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319

Spark 2.4 Structured Streaming Kafka assign API polling same offsets

2019-03-01 Thread Kristopher Kane
We are using the assign API to do batch work with Spark and Kafka.
What I'm seeing is the Spark executor work happening in the background,
constantly polling the same data over and over until the main thread
commits the offsets.

Is the below a blocking operation?

  Dataset<Row> df = spark.read().format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("assign", "{\"topic1\":[0,1],\"topic2\":[0]}")
    .option("startingOffsets",
      "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
    .option("endingOffsets",
      "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
    .load();
  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");


###

Here is an example.  Our desired batch is 20 records to commit on.
Due to segment size (this is a test), 12 records are returned by each
poll. Spark gets to offset 20 and our program is working to
filter/process/commit, but the Spark polling continues in the
background, starting again at offset -2 since nothing has been committed yet.
This suggests the above .read().load() is non-blocking.


2019-03-01 09:21:41 INFO  [THREAD ID=main] RawHdfsFlowType:50 -
Getting data from Kafka
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Polled
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
[compacted-gap-message-0]  12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
polling

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 1 requested 1
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 2 requested 2
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 3 requested 3
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 4 requested 4
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 5 requested 5
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 6 requested 6
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 7 requested 7
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 8 requested 8
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 9 requested 9
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 10 requested 10
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 11 requested 11
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 12 requested 12

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Polled

Re: parquet vs orc files

2018-02-21 Thread Kane Kim
Thanks, how does the min/max index work? Can spark itself configure bloom
filters when saving as orc?

On Wed, Feb 21, 2018 at 1:40 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> In the latest version both are equally well supported.
>
> You need to insert the data sorted on the filtering columns.
> Then you will benefit from min/max indexes and, in the case of orc, additionally
> from bloom filters, if you configure them.
> In any case I recommend also partitioning the files (not to be confused with
> Spark partitioning).
>
> What is best for you, you have to figure out in a test. This highly depends
> on the data and the analysis you want to do.
>
> > On 21. Feb 2018, at 21:54, Kane Kim <kane.ist...@gmail.com> wrote:
> >
> > Hello,
> >
> > Which format is better supported in spark, parquet or orc?
> > Will spark use internal sorting of parquet/orc files (and how to test
> that)?
> > Can spark save sorted parquet/orc files?
> >
> > Thanks!
>
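
For what it's worth, a hedged Scala sketch of the advice above, assuming a
DataFrame df and a filter column named "id" (both placeholders): sort within
partitions on the filter column so the per-stripe min/max statistics become
selective, and turn on ORC bloom filters via the writer options Spark passes
down to the ORC library. Whether Parquet or ORC wins still has to come out of a
test on the real data, as Jörn says.

  df.sortWithinPartitions("id")                      // makes the ORC min/max stripe stats selective
    .write
    .format("orc")
    .option("orc.bloom.filter.columns", "id")        // bloom filter on the filter column
    .option("orc.bloom.filter.fpp", "0.05")          // accepted false-positive probability
    .mode("overwrite")
    .save("/warehouse/events_orc")                   // placeholder output path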


parquet vs orc files

2018-02-21 Thread Kane Kim
Hello,

Which format is better supported in spark, parquet or orc?
Will spark use internal sorting of parquet/orc files (and how to test that)?
Can spark save sorted parquet/orc files?

Thanks!


RE: Graphhopper/routing in Spark

2016-09-10 Thread Kane O'Donnell
It’s not obvious to me either =) I was thinking more along the lines of
retrieving the graph from HDFS/Spark, merging it together (which should be
taken care of by sc.textFile) and then giving it to GraphHopper. Alternatively
I guess I could just put the graph locally on every worker node. Or broadcast
it … I must be able to just broadcast a chunk of byte data, right? (On disk, the
contracted graph is only 30 MB.)

I hadn’t considered GraphX. It doesn’t look suitable, as it’s likely to be
considerably slower and won’t do all of the nice stuff GraphHopper does (e.g.
vehicle-specific handling, including importing and processing OSM data).

Kane



From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Friday, 9 September 2016 7:08 p.m.
To: Kane O'Donnell
Cc: user@spark.apache.org
Subject: Re: Graphhopper/routing in Spark

It’s not obvious to me how that would work. In principle I imagine you could
have your source data loaded into HDFS and read by GraphHopper instances
running on Spark workers. But a graph by its nature has items that have
connections to potentially any other item, so GraphHopper instances would need
a way of dealing with that, and I presume GraphHopper is not designed that way.
Spark’s graph processing library, GraphX, was designed that way, and plenty of
thought has gone into how to distribute a graph across machines and still have
a way of running algorithms.
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 8 Sep 2016, at 22:45, kodonnell <kane.odonn...@datamine.com> wrote:

Just wondering if anyone has experience at running Graphhopper (or similar)
in Spark?

In short, I can get it running in the master, but not in worker nodes. The
key trouble seems to be that Graphhopper depends on a pre-processed graph,
which it obtains from OSM data. In normal (desktop) use, it pre-processes,
and then caches to disk. My current thinking is that I could create the
cache locally, and then put it in HDFS, and tweak Graphhopper to read from
the HDFS source. Alternatively I could try to broadcast the cache (or the
entire Graphhopper instance) - though I believe that would require both
being serializable (which I've got little clue about). Does anyone have any
recommendations on the above?

In addition, I'm not quite sure how to structure it to minimise the cache
reading - I don't want to have to read the cache (and initialise
Graphhopper) for e.g. every route, as that's likely to be slow. It'd be nice
if this was only done once (e.g. for each partition) and then all the routes
in the partition processed with the same Graphhopper instance. Again, any
thoughts on this?

FYI, there is a discussion on the GraphHopper forum at
<https://discuss.graphhopper.com/t/how-to-use-graphhopper-in-spark/998>,
though no luck there.
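
One hedged way to get the "initialise once per partition" behaviour asked about
above, assuming the pre-built GraphHopper cache directory is already on each
worker's local disk (shipped out of band, or broadcast as bytes and written out
once); the loader and routing calls below are stand-ins, not the real GraphHopper
API:

  // Assumed input (illustrative): one route query per line.
  val routeRequests = sc.textFile("hdfs:///routes/queries.txt")

  // Heavy, non-serializable routing engine held once per executor JVM and built
  // lazily the first time a task on that JVM asks for it.
  object RouterHolder {
    private var engine: AnyRef = _
    def get(cacheDir: String): AnyRef = synchronized {
      if (engine == null) engine = loadHopper(cacheDir)  // stand-in: open the cached graph from local disk
      engine
    }
    private def loadHopper(cacheDir: String): AnyRef = new AnyRef  // stand-in for the real GraphHopper loader
  }

  def routeOne(engine: AnyRef, query: String): String = query      // stand-in for the real routing call

  val routed = routeRequests.mapPartitions { queries =>
    val hopper = RouterHolder.get("/local/graph-cache")  // initialised at most once per JVM, not per route
    queries.map(q => routeOne(hopper, q))
  }
  routed.saveAsTextFile("hdfs:///routes/results")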







Re: spark, reading from s3

2015-02-12 Thread Kane Kim
The thing is that my time is perfectly valid...

On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 It's to do with the timezone, actually. You can either use NTP to maintain an
 accurate system clock, or you can adjust your system time to match the
 AWS one. You can do it as:

 telnet s3.amazonaws.com 80
 GET / HTTP/1.0


 [image: Inline image 1]

 Thanks
 Best Regards

 On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim kane.ist...@gmail.com wrote:

 I'm getting this warning when using s3 input:
 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in
 response to
 RequestTimeTooSkewed error. Local machine and S3 server disagree on the
 time by approximately 0 seconds. Retrying connection.

 After that there are tons of 403/forbidden errors and then the job fails.
 It's sporadic; sometimes I get this error and sometimes not. What
 could be the issue?
 Could it be related to network connectivity?





Re: spark, reading from s3

2015-02-12 Thread Kane Kim
Looks like my clock is in sync:

-bash-4.1$ date && curl -v s3.amazonaws.com
Thu Feb 12 21:40:18 UTC 2015
* About to connect() to s3.amazonaws.com port 80 (#0)
*   Trying 54.231.12.24... connected
* Connected to s3.amazonaws.com (54.231.12.24) port 80 (#0)
> GET / HTTP/1.1
> User-Agent: curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2
> Host: s3.amazonaws.com
> Accept: */*
>
< HTTP/1.1 307 Temporary Redirect
< x-amz-id-2: sl8Tg81ZnBj3tD7Q9f2KFBBZKC83TbAUieHJu9IA3PrBibvB3M7NpwAlfTi/Tdwg
< x-amz-request-id: 48C14DF82BE1A970
< Date: Thu, 12 Feb 2015 21:40:19 GMT
< Location: http://aws.amazon.com/s3/
< Content-Length: 0
< Server: AmazonS3


On Thu, Feb 12, 2015 at 12:26 PM, Franc Carter franc.car...@rozettatech.com
 wrote:


 Check that your timezone is correct as well; an incorrect timezone can
 make it look like your time is correct when it is skewed.

 cheers

 On Fri, Feb 13, 2015 at 5:51 AM, Kane Kim kane.ist...@gmail.com wrote:

 The thing is that my time is perfectly valid...

 On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It's to do with the timezone, actually. You can either use NTP to maintain an
 accurate system clock, or you can adjust your system time to match the
 AWS one. You can do it as:

 telnet s3.amazonaws.com 80
 GET / HTTP/1.0


 [image: Inline image 1]

 Thanks
 Best Regards

 On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim kane.ist...@gmail.com wrote:

 I'm getting this warning when using s3 input:
 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in
 response to
 RequestTimeTooSkewed error. Local machine and S3 server disagree on the
 time by approximately 0 seconds. Retrying connection.

 After that there are tons of 403/forbidden errors and then the job fails.
 It's sporadic; sometimes I get this error and sometimes not. What
 could be the issue?
 Could it be related to network connectivity?










spark python exception

2015-02-10 Thread Kane Kim
Sometimes I'm getting this exception:

Traceback (most recent call last):
  File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/daemon.py", line 162, in manager
    code = worker(sock)
  File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/daemon.py", line 64, in worker
    outfile.flush()
IOError: [Errno 32] Broken pipe

Is it a spark bug?


spark, reading from s3

2015-02-10 Thread Kane Kim
I'm getting this warning when using s3 input:
15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in response
to
RequestTimeTooSkewed error. Local machine and S3 server disagree on the
time by approximately 0 seconds. Retrying connection.

After that there are tons of 403/forbidden errors and then the job fails.
It's sporadic; sometimes I get this error and sometimes not. What could
be the issue?
Could it be related to network connectivity?


Re: python api and gzip compression

2015-02-09 Thread Kane Kim
Found it - used saveAsHadoopFile

On Mon, Feb 9, 2015 at 9:11 AM, Kane Kim kane.ist...@gmail.com wrote:

 Hi, how to compress output with gzip using python api?

 Thanks!



Re: pyspark - gzip output compression

2015-02-05 Thread Kane Kim
I'm getting "SequenceFile doesn't work with GzipCodec without native-hadoop
code!" Where can I get those libs, and where do I put them in Spark?

Also, can I save a plain text file (like saveAsTextFile) as gzip?

Thanks.
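
For the plain-text case, saveAsTextFile can take a compression codec directly; a
hedged sketch in Scala (PySpark's saveAsTextFile has an equivalent
compressionCodecClass string argument), with placeholder paths:

  import org.apache.hadoop.io.compress.GzipCodec

  val lines = sc.textFile("/input/path")                       // any RDD of strings
  lines.saveAsTextFile("/output/gzipped", classOf[GzipCodec])  // writes gzip-compressed part files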

On Wed, Feb 4, 2015 at 11:10 PM, Kane Kim kane.ist...@gmail.com wrote:

 How to save RDD with gzip compression?

 Thanks.



Re: spark on ec2

2015-02-05 Thread Kane Kim
Oh yeah, they picked up changes after restart, thanks!

On Thu, Feb 5, 2015 at 8:13 PM, Charles Feduke charles.fed...@gmail.com
wrote:

 I don't see anything that says you must explicitly restart them to load
 the new settings, but usually there is some sort of signal trapped [or
 brute force full restart] to get a configuration reload for most daemons.
 I'd take a guess and use the $SPARK_HOME/sbin/{stop,start}-slaves.sh
 scripts on your master node and see. (
 http://spark.apache.org/docs/1.2.0/spark-standalone.html#cluster-launch-scripts
 )

 I just tested this out on my integration EC2 cluster and got odd results
 for stopping the workers (no workers found) but the start script... seemed
 to work. My integration cluster was running and functioning after executing
 both scripts, but I also didn't make any changes to spark-env either.

 On Thu Feb 05 2015 at 9:49:49 PM Kane Kim kane.ist...@gmail.com wrote:

 Hi,

 I'm trying to change a setting as described here:
 http://spark.apache.org/docs/1.2.0/ec2-scripts.html
 export SPARK_WORKER_CORES=6

 Then I ran ~/spark-ec2/copy-dir /root/spark/conf to distribute it to the
 slaves, but without any effect. Do I have to restart the workers?
 How do I do that with spark-ec2?

 Thanks.




spark driver behind firewall

2015-02-05 Thread Kane Kim
I submit a Spark job from a machine behind a firewall and can't open any incoming
connections to that box. Does the driver absolutely need to accept incoming
connections? Is there any workaround for that case?

Thanks.


spark on ec2

2015-02-05 Thread Kane Kim
Hi,

I'm trying to change a setting as described here:
http://spark.apache.org/docs/1.2.0/ec2-scripts.html
export SPARK_WORKER_CORES=6

Then I ran ~/spark-ec2/copy-dir /root/spark/conf to distribute it to the slaves,
but without any effect. Do I have to restart the workers?
How do I do that with spark-ec2?

Thanks.


pyspark - gzip output compression

2015-02-04 Thread Kane Kim
How to save RDD with gzip compression?

Thanks.


Large dataset, reduceByKey - java heap space error

2015-01-22 Thread Kane Kim
I'm trying to process a large dataset. Mapping/filtering works OK, but
as soon as I try reduceByKey, I get out-of-memory errors:

http://pastebin.com/70M5d0Bn

Any ideas how I can fix that?

Thanks.




processing large dataset

2015-01-22 Thread Kane Kim
I'm trying to process 5TB of data, not doing anything fancy, just
map/filter and reduceByKey. I spent the whole day today trying to get it
processed, but never succeeded. I tried to deploy to EC2 with the
script provided with Spark, on pretty beefy machines (100 r3.2xlarge
nodes). I'm really frustrated that Spark doesn't work out of the box for
anything bigger than the word count sample. One big problem is that the
defaults are not suitable for processing big datasets; the provided EC2
script could do a better job, knowing the instance type requested. Second,
it takes hours to figure out what is wrong when a Spark job fails
having almost finished processing. Even after raising all limits as per
https://spark.apache.org/docs/latest/tuning.html it still fails (now
with: error communicating with MapOutputTracker).

After all this I have only one question: how do I get Spark tuned up for
processing terabytes of data, and is there a way to make this
configuration easier and more transparent?

Thanks.




Re: Does Spark automatically run different stages concurrently when possible?

2015-01-20 Thread Kane Kim
Related question: is execution of different stages optimized? I.e., will a
map followed by a filter require two passes over the data, or will they be
combined into a single one?
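
For concreteness, a hedged Scala sketch of the four steps quoted further down,
with persist() placed right after step (2) as Sean suggests, so the action in
step (4) reuses the filtered data instead of recomputing (1) and (2); the A/B
test on a line prefix is purely illustrative:

  val lines  = sc.textFile("hdfs:///data/input")                        // (1)
  val ABonly = lines.filter(l => l.startsWith("A") || l.startsWith("B"))
                    .persist()                                          // (2) plus persist
  val processA = ABonly.filter(_.startsWith("A")).count()               // (3) action: computes (1)-(2), caches (2)
  val processB = ABonly.filter(_.startsWith("B")).count()               // (4) action: reuses the cached (2)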

On Tue, Jan 20, 2015 at 4:33 AM, Bob Tiernay btier...@hotmail.com wrote:
 I found the following to be a good discussion of the same topic:

 http://apache-spark-user-list.1001560.n3.nabble.com/The-concurrent-model-of-spark-job-stage-task-td13083.html


 From: so...@cloudera.com
 Date: Tue, 20 Jan 2015 10:02:20 +
 Subject: Re: Does Spark automatically run different stages concurrently
 when possible?
 To: paliwalash...@gmail.com
 CC: davidkl...@hotmail.com; user@spark.apache.org


 You can persist the RDD in (2) right after it is created. It will not
 cause it to be persisted immediately, but rather the first time it is
 materialized. If you persist after (3) is calculated, then it will be
 re-calculated (and persisted) after (4) is calculated.

 On Tue, Jan 20, 2015 at 3:38 AM, Ashish paliwalash...@gmail.com wrote:
  Sean,
 
  A related question. When to persist the RDD after step 2 or after Step
  3 (nothing would happen before step 3 I assume)?
 
  On Mon, Jan 19, 2015 at 5:17 PM, Sean Owen so...@cloudera.com wrote:
  From the OP:
 
  (1) val lines = Import full dataset using sc.textFile
  (2) val ABonly = Filter out all rows from lines that are not of type
  A or B
  (3) val processA = Process only the A rows from ABonly
  (4) val processB = Process only the B rows from ABonly
 
  I assume that 3 and 4 are actions, or else nothing happens here at all.
 
  When 3 is invoked, it will compute 1, then 2, then 3. 4 will happen
  after 3, and may even cause 1 and 2 to happen again if nothing is
  persisted.
 
  You can invoke 3 and 4 in parallel on the driver if you like. That's
  fine. But actions are blocking in the driver.
 
 
 
  On Mon, Jan 19, 2015 at 8:21 AM, davidkl davidkl...@hotmail.com
  wrote:
  Hi Jon, I am looking for an answer for a similar question in the doc
  now, so
  far no clue.
 
  I would need to know what is spark behaviour in a situation like the
  example
  you provided, but taking into account also that there are multiple
  partitions/workers.
 
  I could imagine it's possible that different spark workers are not
  synchronized in terms of waiting for each other to progress to the
  next
  step/stage for the partitions of data they get assigned, while I
  believe in
  streaming they would wait for the current batch to complete before
  they
  start working on a new one.
 
  In the code I am working on, I need to make sure a particular step is
  completed (in all workers, for all partitions) before next
  transformation is
  applied.
 
  Would be great if someone could clarify or point to these issues in
  the doc!
  :-)
 
 
 
 
 
 
 
 
 
  --
  thanks
  ashish
 
  Blog: http://www.ashishpaliwal.com/blog
  My Photo Galleries: http://www.pbase.com/ashishpaliwal






spark java options

2015-01-16 Thread Kane Kim
I want to add some Java options when submitting an application:
--conf spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures
-XX:+FlightRecorder

But it looks like they don't get set. Where can I add them to make this work?

Thanks.
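
One hedged guess, judging only from how the flag is pasted above: without quotes
the shell hands -XX:+FlightRecorder to spark-submit as a separate argument, so the
property never receives the full value. Quoting the whole --conf value should help,
e.g. (application class and jar are placeholders):

  spark-submit \
    --conf "spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder" \
    --class com.example.MyApp \
    my-app.jar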




Re: distinct on huge dataset

2014-03-23 Thread Kane
Yes, there was an error in the data; after fixing it, count fails with an Out of
Memory error.





Re: distinct on huge dataset

2014-03-22 Thread Kane
Yes, it works with a smaller file; it can count and map, but not distinct.





Re: distinct on huge dataset

2014-03-22 Thread Kane
Yes, that helped; at least it was able to advance a bit further.





Re: distinct on huge dataset

2014-03-22 Thread Kane
But I was wrong - map also fails on the big file, and setting spark.shuffle.spill
doesn't help. Map fails with the same error.





sequenceFile and groupByKey

2014-03-08 Thread Kane
When I try to open a sequence file:
val t2 = sc.sequenceFile("/user/hdfs/e1Mseq", classOf[String],
classOf[String])
t2.groupByKey().take(5)

I get:
org.apache.spark.SparkException: Job aborted: Task 25.0:0 had a not
serializable result: java.io.NotSerializableException:
org.apache.hadoop.io.Text

Another thing: t2.take(5) returns 5 identical items. I guess I have to map/clone
the items, but I get something like "org.apache.hadoop.io.Text cannot be cast to
java.lang.String"; how do I clone them?

Thanks.
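
A hedged sketch of the map/clone step guessed at above: read the keys and values
as Hadoop Text and copy them to plain Strings straight away (the sequence-file
reader reuses the same Writable objects between records, which is also why
take(5) shows five identical items), then group:

  import org.apache.hadoop.io.Text

  val t2 = sc.sequenceFile("/user/hdfs/e1Mseq", classOf[Text], classOf[Text])
    .map { case (k, v) => (k.toString, v.toString) }   // copies out of the reused Text objects
  t2.groupByKey().take(5)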


