Executor heartbeats on Kubernetes
Due to settings like, "spark.kubernetes.executor.missingPodDetectDelta" I've begun to wonder about heartbeats on Kubernetes. Do executors still conduct the traditional heartbeat to the driver when run on Kubernetes? Thanks, Kris - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Spark 2.4 Structured Streaming Kafka assign API polling same offsets
I figured out why. We are not persisting the data at the end of .load(). Thus, every operation like count() is going back to Kafka for the data again. On Fri, Mar 1, 2019 at 10:10 AM Kristopher Kane wrote: > > We are using the assign API to do batch work with Spark and Kafka. > What I'm seeing is the Spark executor work happening in the back > ground and constantly polling the same data over and over until the > main thread commits the offsets. > > Is the below a blocking operation? > > Dataset df = spark.read().format("kafka") > .option("kafka.bootstrap.servers", "host1:port1,host2:port2") > .option("assign", "topic1,topic2") > .option("startingOffsets", > "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}") > .option("endingOffsets", > "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}") > .load(); > df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"); > > > ### > > Here is an example. Our desired batch is 20 records to commit on. > Due to segment size (this is a test) 12 records are returned in each > poll. Spark gets to offset 20 and our program is working to > filter/process/commit but the Spark polling continues again in the > back ground starting at offset -2 since it has not been committed yet. > This suggesting the above .read.().load() is non-blocking. > > > 2019-03-01 09:21:41 INFO [THREAD ID=main] RawHdfsFlowType:50 - > Getting data from Kafka > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset -2 requested 0 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Seeking to > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 0 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Polled > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > [compacted-gap-message-0] 12 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after > polling > > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 1 requested 1 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 2 requested 2 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 3 requested 3 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 4 requested 4 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 5 requested 5 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 6 requested 6 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 7 requested 7 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 8 requested 8 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor > compacted-gap-message-0 nextOffset 9 requested 9 > 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for > task 0] InternalKafkaConsumer:58 - Get > spark-kafka-relation-23c2c012-41dd-46ec
Spark 2.4 Structured Streaming Kafka assign API polling same offsets
We are using the assign API to do batch work with Spark and Kafka. What I'm seeing is the Spark executor work happening in the back ground and constantly polling the same data over and over until the main thread commits the offsets. Is the below a blocking operation? Dataset df = spark.read().format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("assign", "topic1,topic2") .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}") .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}") .load(); df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"); ### Here is an example. Our desired batch is 20 records to commit on. Due to segment size (this is a test) 12 records are returned in each poll. Spark gets to offset 20 and our program is working to filter/process/commit but the Spark polling continues again in the back ground starting at offset -2 since it has not been committed yet. This suggesting the above .read.().load() is non-blocking. 2019-03-01 09:21:41 INFO [THREAD ID=main] RawHdfsFlowType:50 - Getting data from Kafka 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset -2 requested 0 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Seeking to spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 0 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Polled spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor [compacted-gap-message-0] 12 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after polling 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 1 requested 1 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 2 requested 2 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 3 requested 3 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 4 requested 4 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 5 requested 5 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 6 requested 6 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 7 requested 7 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 8 requested 8 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 9 requested 9 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 10 requested 10 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 11 requested 11 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 12 requested 12 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Seeking to spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 12 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Polled spark
Re: parquet vs orc files
Thanks, how does min/max index work? Can spark itself configure bloom filters when saving as orc? On Wed, Feb 21, 2018 at 1:40 PM, Jörn Franke wrote: > In the latest version both are equally well supported. > > You need to insert the data sorted on filtering columns > Then you will benefit from min max indexes and in case of orc additional > from bloom filters, if you configure them. > In any case I recommend also partitioning of files (do not confuse with > Spark partitioning ). > > What is best for you you have to figure out in a test. This highly depends > on the data and the analysis you want to do. > > > On 21. Feb 2018, at 21:54, Kane Kim wrote: > > > > Hello, > > > > Which format is better supported in spark, parquet or orc? > > Will spark use internal sorting of parquet/orc files (and how to test > that)? > > Can spark save sorted parquet/orc files? > > > > Thanks! >
parquet vs orc files
Hello, Which format is better supported in spark, parquet or orc? Will spark use internal sorting of parquet/orc files (and how to test that)? Can spark save sorted parquet/orc files? Thanks!
RE: Graphhopper/routing in Spark
It’s not obvious to me either = ) I was thinking more along the lines of retrieving the graph from HDFS/Spark, merging it together (which should be taken care of by sc.textFile) and then giving it to GraphHopper. Alternatively I guess I could just put the graph locally on every worker node. Or broadcast it … I must be able to just broadcast a chunk of byte data? (On disk, the contracted graph is only 30mb.) I hadn’t considered GraphX. It doesn’t look suitable as it’s likely to be considerably slower, and not do all of the nice stuff GraphHopper does (e.g. vehicle specific stuff, including importing and processing OSM data). Kane Kane O'Donnell Data Scientist<http://www.datamine.com/titles> Datamine Limited - backing the Innovation Council in recognising brilliance in business DDI:+64 9 303 2300 Mob: +64 27 306 3964 0800 DATAMINE: 0800 328 264 Visit us at: Shed One, 15 Faraday St, Parnell, Auckland 1052, New Zealand Pop it in the post: PO Box 37120, Parnell, Auckland 1151, New Zealand Need more help finding us... ?: Click here!<https://goo.gl/maps/3qur6wXMBCp> www.datamine.com<http://www.datamine.com/> Disclaimer: This email and any files transmitted with it are confidential and may contain legally privileged material, intended solely for the use of the individual or entity to whom they are addressed. If you have received this e-mail message in error, please contact the sender and delete the material from any computer. Any use, review, dissemination, distribution or copying of this document by persons other than the intended recipient is strictly prohibited. Thank you. From: Robin East [mailto:robin.e...@xense.co.uk] Sent: Friday, 9 September 2016 7:08 p.m. To: Kane O'Donnell Cc: user@spark.apache.org Subject: Re: Graphhopper/routing in Spark It’s not obvious to me how that would work. In principle I imagine you could have your source data loaded into HDFS and read by GraphHopper instances running on Spark workers. But a graph by it’s nature has items that have connections to potentially any other item so GraphHopper instances would need to have a way of dealing with that and I presume GraphHopper is not designed that way. Spark’s Graph processing library, GraphX, was designed that way and plenty of thought has gone into how to distribute a graph across machines and still have a way of running algorithms. --- Robin East Spark GraphX in Action Michael Malak and Robin East Manning Publications Co. http://www.manning.com/books/spark-graphx-in-action On 8 Sep 2016, at 22:45, kodonnell mailto:kane.odonn...@datamine.com>> wrote: Just wondering if anyone has experience at running Graphhopper (or similar) in Spark? In short, I can get it running in the master, but not in worker nodes. The key trouble seems to be that Graphhopper depends on a pre-processed graph, which it obtains from OSM data. In normal (desktop) use, it pre-processes, and then caches to disk. My current thinking is that I could create the cache locally, and then put it in HDFS, and tweak Graphhopper to read from the HDFS source. Alternatively I could try to broadcast the cache (or the entire Graphhopper instance) - though I believe that would require both being serializable (which I've got little clue about). Does anyone have any recommendations on the above? In addition, I'm not quite sure how to structure it to minimise the cache reading - I don't want to have to read the cache (and initialise Graphhopper) for e.g. every route, as that's likely to be slow. It'd be nice if this was only done once (e.g. for each partition) and then all the routes in the partition processed with the same Graphhopper instance. Again, any thoughts on this? FYI, discussion on Graphhoper forum is here <https://discuss.graphhopper.com/t/how-to-use-graphhopper-in-spark/998> , though no luck there. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Graphhopper-routing-in-Spark-tp27682.html Sent from the Apache Spark User List mailing list archive at Nabble.com<http://nabble.com>. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
Re: spark, reading from s3
Looks like my clock is in sync: -bash-4.1$ date && curl -v s3.amazonaws.com Thu Feb 12 21:40:18 UTC 2015 * About to connect() to s3.amazonaws.com port 80 (#0) * Trying 54.231.12.24... connected * Connected to s3.amazonaws.com (54.231.12.24) port 80 (#0) > GET / HTTP/1.1 > User-Agent: curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/ 3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2 > Host: s3.amazonaws.com > Accept: */* > < HTTP/1.1 307 Temporary Redirect < x-amz-id-2: sl8Tg81ZnBj3tD7Q9f2KFBBZKC83TbAUieHJu9IA3PrBibvB3M7NpwAlfTi/Tdwg < x-amz-request-id: 48C14DF82BE1A970 < Date: Thu, 12 Feb 2015 21:40:19 GMT < Location: http://aws.amazon.com/s3/ < Content-Length: 0 < Server: AmazonS3 On Thu, Feb 12, 2015 at 12:26 PM, Franc Carter wrote: > > Check that your timezone is correct as well, an incorrect timezone can > make it look like your time is correct when it is skewed. > > cheers > > On Fri, Feb 13, 2015 at 5:51 AM, Kane Kim wrote: > >> The thing is that my time is perfectly valid... >> >> On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das >> wrote: >> >>> Its with the timezone actually, you can either use an NTP to maintain >>> accurate system clock or you can adjust your system time to match with the >>> AWS one. You can do it as: >>> >>> telnet s3.amazonaws.com 80 >>> GET / HTTP/1.0 >>> >>> >>> [image: Inline image 1] >>> >>> Thanks >>> Best Regards >>> >>> On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim wrote: >>> >>>> I'm getting this warning when using s3 input: >>>> 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in >>>> response to >>>> RequestTimeTooSkewed error. Local machine and S3 server disagree on the >>>> time by approximately 0 seconds. Retrying connection. >>>> >>>> After that there are tons of 403/forbidden errors and then job fails. >>>> It's sporadic, so sometimes I get this error and sometimes not, what >>>> could be the issue? >>>> I think it could be related to network connectivity? >>>> >>> >>> >> > > > -- > > *Franc Carter* | Systems Architect | Rozetta Technology > > franc.car...@rozettatech.com | > www.rozettatechnology.com > > Tel: +61 2 8355 2515 > > Level 4, 55 Harrington St, The Rocks NSW 2000 > > PO Box H58, Australia Square, Sydney NSW 1215 > > AUSTRALIA > >
Re: spark, reading from s3
The thing is that my time is perfectly valid... On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das wrote: > Its with the timezone actually, you can either use an NTP to maintain > accurate system clock or you can adjust your system time to match with the > AWS one. You can do it as: > > telnet s3.amazonaws.com 80 > GET / HTTP/1.0 > > > [image: Inline image 1] > > Thanks > Best Regards > > On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim wrote: > >> I'm getting this warning when using s3 input: >> 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in >> response to >> RequestTimeTooSkewed error. Local machine and S3 server disagree on the >> time by approximately 0 seconds. Retrying connection. >> >> After that there are tons of 403/forbidden errors and then job fails. >> It's sporadic, so sometimes I get this error and sometimes not, what >> could be the issue? >> I think it could be related to network connectivity? >> > >
spark, reading from s3
I'm getting this warning when using s3 input: 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in response to RequestTimeTooSkewed error. Local machine and S3 server disagree on the time by approximately 0 seconds. Retrying connection. After that there are tons of 403/forbidden errors and then job fails. It's sporadic, so sometimes I get this error and sometimes not, what could be the issue? I think it could be related to network connectivity?
spark python exception
sometimes I'm getting this exception: Traceback (most recent call last): File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/daemon.py", line 162, in manager code = worker(sock) File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/daemon.py", line 64, in worker outfile.flush() IOError: [Errno 32] Broken pipe Is it a spark bug?
Re: python api and gzip compression
Found it - used saveAsHadoopFile On Mon, Feb 9, 2015 at 9:11 AM, Kane Kim wrote: > Hi, how to compress output with gzip using python api? > > Thanks! >
python api and gzip compression
Hi, how to compress output with gzip using python api? Thanks!
Re: spark on ec2
Oh yeah, they picked up changes after restart, thanks! On Thu, Feb 5, 2015 at 8:13 PM, Charles Feduke wrote: > I don't see anything that says you must explicitly restart them to load > the new settings, but usually there is some sort of signal trapped [or > brute force full restart] to get a configuration reload for most daemons. > I'd take a guess and use the $SPARK_HOME/sbin/{stop,start}-slaves.sh > scripts on your master node and see. ( > http://spark.apache.org/docs/1.2.0/spark-standalone.html#cluster-launch-scripts > ) > > I just tested this out on my integration EC2 cluster and got odd results > for stopping the workers (no workers found) but the start script... seemed > to work. My integration cluster was running and functioning after executing > both scripts, but I also didn't make any changes to spark-env either. > > On Thu Feb 05 2015 at 9:49:49 PM Kane Kim wrote: > >> Hi, >> >> I'm trying to change setting as described here: >> http://spark.apache.org/docs/1.2.0/ec2-scripts.html >> export SPARK_WORKER_CORES=6 >> >> Then I ran ~/spark-ec2/copy-dir /root/spark/conf to distribute to >> slaves, but without any effect. Do I have to restart workers? >> How to do that with spark-ec2? >> >> Thanks. >> >
spark on ec2
Hi, I'm trying to change setting as described here: http://spark.apache.org/docs/1.2.0/ec2-scripts.html export SPARK_WORKER_CORES=6 Then I ran ~/spark-ec2/copy-dir /root/spark/conf to distribute to slaves, but without any effect. Do I have to restart workers? How to do that with spark-ec2? Thanks.
spark driver behind firewall
I submit spark job from machine behind firewall, I can't open any incoming connections to that box, does driver absolutely need to accept incoming connections? Is there any workaround for that case? Thanks.
Re: pyspark - gzip output compression
I'm getting SequenceFile doesn't work with GzipCodec without native-hadoop code! Where to get those libs and where to put it in the spark? Also can I save plain text file (like saveAsTextFile) as gzip? Thanks. On Wed, Feb 4, 2015 at 11:10 PM, Kane Kim wrote: > How to save RDD with gzip compression? > > Thanks. >
pyspark - gzip output compression
How to save RDD with gzip compression? Thanks.
processing large dataset
I'm trying to process 5TB of data, not doing anything fancy, just map/filter and reduceByKey. Spent whole day today trying to get it processed, but never succeeded. I've tried to deploy to ec2 with the script provided with spark on pretty beefy machines (100 r3.2xlarge nodes). Really frustrated that spark doesn't work out of the box for anything bigger than word count sample. One big problem is that defaults are not suitable for processing big datasets, provided ec2 script could do a better job, knowing instance type requested. Second it takes hours to figure out what is wrong, when spark job fails almost finished processing. Even after raising all limits as per https://spark.apache.org/docs/latest/tuning.html it still fails (now with: error communicating with MapOutputTracker). After all I have only one question - how to get spark tuned up for processing terabytes of data and is there a way to make this configuration easier and more transparent? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
reducing number of output files
How I can reduce number of output files? Is there a parameter to saveAsTextFile? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Large dataset, reduceByKey - java heap space error
I'm trying to process a large dataset, mapping/filtering works ok, but as long as I try to reduceByKey, I get out of memory errors: http://pastebin.com/70M5d0Bn Any ideas how I can fix that? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does Spark automatically run different stages concurrently when possible?
Related question - is execution of different stages optimized? I.e. map followed by a filter will require 2 loops or they will be combined into single one? On Tue, Jan 20, 2015 at 4:33 AM, Bob Tiernay wrote: > I found the following to be a good discussion of the same topic: > > http://apache-spark-user-list.1001560.n3.nabble.com/The-concurrent-model-of-spark-job-stage-task-td13083.html > > >> From: so...@cloudera.com >> Date: Tue, 20 Jan 2015 10:02:20 + >> Subject: Re: Does Spark automatically run different stages concurrently >> when possible? >> To: paliwalash...@gmail.com >> CC: davidkl...@hotmail.com; user@spark.apache.org > >> >> You can persist the RDD in (2) right after it is created. It will not >> cause it to be persisted immediately, but rather the first time it is >> materialized. If you persist after (3) is calculated, then it will be >> re-calculated (and persisted) after (4) is calculated. >> >> On Tue, Jan 20, 2015 at 3:38 AM, Ashish wrote: >> > Sean, >> > >> > A related question. When to persist the RDD after step 2 or after Step >> > 3 (nothing would happen before step 3 I assume)? >> > >> > On Mon, Jan 19, 2015 at 5:17 PM, Sean Owen wrote: >> >> From the OP: >> >> >> >> (1) val lines = Import full dataset using sc.textFile >> >> (2) val ABonly = Filter out all rows from "lines" that are not of type >> >> A or B >> >> (3) val processA = Process only the A rows from ABonly >> >> (4) val processB = Process only the B rows from ABonly >> >> >> >> I assume that 3 and 4 are actions, or else nothing happens here at all. >> >> >> >> When 3 is invoked, it will compute 1, then 2, then 3. 4 will happen >> >> after 3, and may even cause 1 and 2 to happen again if nothing is >> >> persisted. >> >> >> >> You can invoke 3 and 4 in parallel on the driver if you like. That's >> >> fine. But actions are blocking in the driver. >> >> >> >> >> >> >> >> On Mon, Jan 19, 2015 at 8:21 AM, davidkl >> >> wrote: >> >>> Hi Jon, I am looking for an answer for a similar question in the doc >> >>> now, so >> >>> far no clue. >> >>> >> >>> I would need to know what is spark behaviour in a situation like the >> >>> example >> >>> you provided, but taking into account also that there are multiple >> >>> partitions/workers. >> >>> >> >>> I could imagine it's possible that different spark workers are not >> >>> synchronized in terms of waiting for each other to progress to the >> >>> next >> >>> step/stage for the partitions of data they get assigned, while I >> >>> believe in >> >>> streaming they would wait for the current batch to complete before >> >>> they >> >>> start working on a new one. >> >>> >> >>> In the code I am working on, I need to make sure a particular step is >> >>> completed (in all workers, for all partitions) before next >> >>> transformation is >> >>> applied. >> >>> >> >>> Would be great if someone could clarify or point to these issues in >> >>> the doc! >> >>> :-) >> >>> >> >>> >> >>> >> >>> >> >>> -- >> >>> View this message in context: >> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075p21227.html >> >>> Sent from the Apache Spark User List mailing list archive at >> >>> Nabble.com. >> >>> >> >>> - >> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> >>> For additional commands, e-mail: user-h...@spark.apache.org >> >>> >> >> >> >> - >> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >> > >> > >> > >> > -- >> > thanks >> > ashish >> > >> > Blog: http://www.ashishpaliwal.com/blog >> > My Photo Galleries: http://www.pbase.com/ashishpaliwal >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark java options
I want to add some java options when submitting application: --conf "spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder" But looks like it doesn't get set. Where I can add it to make it working? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: distinct on huge dataset
Got a bit further, i think out of memory error was caused by setting spark.spill to false. Now i have this error, is there an easy way to increase file limit for spark, cluster-wide?: java.io.FileNotFoundException: /tmp/spark-local-20140324074221-b8f1/01/temp_1ab674f9-4556-4239-9f21-688dfc9f17d2 (Too many open files) at java.io.FileOutputStream.openAppend(Native Method) at java.io.FileOutputStream.(FileOutputStream.java:192) at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174) at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:191) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:141) at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94) at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471) at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3084.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: distinct on huge dataset
Yes, there was an error in data, after fixing it - count fails with Out of Memory Error. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3051.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: distinct on huge dataset
But i was wrong - map also fails on big file and setting spark.shuffle.spill doesn't help. Map fails with the same error. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3039.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: distinct on huge dataset
Yes, that helped, at least it was able to advance a bit further. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3038.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: distinct on huge dataset
I mean everything works with the small file. With huge file only count and map work, distinct - doesn't -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3034.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: distinct on huge dataset
Yes it works with smaller file, it can count and map, but not distinct. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3033.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: distinct on huge dataset
It's 0.9.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025p3027.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
distinct on huge dataset
I have a huge 2.5TB file. When i run: val t = sc.textFile("/user/hdfs/dump.csv") t.distinct.count It fails right away with a lot of: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 1 at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:18) at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:16) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94) at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471) at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distinct-on-huge-dataset-tp3025.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
sequenceFile and groupByKey
when i try to open sequence file: val t2 = sc.sequenceFile("/user/hdfs/e1Mseq", classOf[String], classOf[String]) t2.groupByKey().take(5) I get: org.apache.spark.SparkException: Job aborted: Task 25.0:0 had a not serializable result: java.io.NotSerializableException: org.apache.hadoop.io.Text another thing is: t2.take(5) - returns 5 identical items, i guess I have to map/clone items, but i get something like org.apache.hadoop.io.Text cannot be cast to java.lang.String, how do i clone it? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sequenceFile-and-groupByKey-tp2428.html Sent from the Apache Spark User List mailing list archive at Nabble.com.