[Release Question]: Estimate on 3.5.2 release?
Hello, I'm curious if there is an estimate of when 3.5.2 for Spark Core will be released. There are several bug and security vulnerability fixes in the dependencies that we are excited to receive! If anyone has any insights, that would be greatly appreciated. Thanks! - Paul

Paul Gerver
Streams Software Engineer, 21st Century Software
CFP for the 2nd Performance Engineering track at Community over Code NA 2023
Hi Apache Spark people - There are only 10 days left to submit a talk proposal (title and abstract only) for Community over Code NA 2023. The 2nd Performance Engineering track is on this year, so any Apache project-related performance and scalability talks are welcome. Here's the CFP for more ideas and links, including the CFP submission page: https://www.linkedin.com/pulse/call-papers-2nd-performance-engineering-track-over-code-brebner/ - Paul Brebner and Roger Abelenda
Rename columns without manually setting them all
Hi, this is currently my column definition:

Employee ID | Name    | Client  | Project | Team   | 01/01/2022 | 02/01/2022 | 03/01/2022 | 04/01/2022 | 05/01/2022
12345       | Dummy x | Dummy a | abc     | team a | OFF        | WO         | WH         | WH         | WH

As you can see, the outer columns are just daily attendance dates. My goal is to count the employees who were OFF / WO / WH on said dates. I need to transpose them so it would look like this: (image of the desired output omitted). I am still new to pandas. Can you guide me on how to produce this? I am reading about melt() and set_index(), but I am not sure if they are the correct functions to use.
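A hedged sketch of the melt()-then-count approach the question hints at; the id column names and status codes are taken from the sample row above, while the tiny two-row frame is invented purely for illustration:

```python
import pandas as pd

# Tiny stand-in for the frame described above (second row invented).
df = pd.DataFrame({
    "Employee ID": [12345, 67890],
    "Name": ["Dummy x", "Dummy y"],
    "Client": ["Dummy a", "Dummy b"],
    "Project": ["abc", "abc"],
    "Team": ["team a", "team b"],
    "01/01/2022": ["OFF", "WH"],
    "02/01/2022": ["WO", "WO"],
})

id_cols = ["Employee ID", "Name", "Client", "Project", "Team"]

# melt() turns each date column into a row: one (employee, date, status).
long = df.melt(id_vars=id_cols, var_name="date", value_name="status")

# Count employees per status per date, with dates back across the columns.
counts = long.groupby(["status", "date"]).size().unstack("date", fill_value=0)
```

set_index() isn't strictly needed for this shape; melt() plus a groupby/unstack covers the transpose-and-count in one pass.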
How to read excel file in PySpark
Good day, I have a task to read Excel files in Databricks but I cannot seem to proceed. I am referencing the API document read_excel <https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_excel.html>, but there is an error: SparkSession object has no attribute 'read_excel'. Can you advise? JOHN PAUL JAYME, Data Engineer, TDCX www.tdcx.com
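For reference, a minimal sketch of where that function actually lives: read_excel is part of the pyspark.pandas namespace (Spark 3.2+), not a method on SparkSession, which is consistent with the "no attribute" error quoted above. The path below is a made-up example, and an Excel engine such as openpyxl must be installed on the cluster:

```python
def load_workbook(path):
    # Hypothetical helper: read one sheet into a pandas-on-Spark frame,
    # then convert to a plain Spark DataFrame. The import is deferred so
    # this sketch only needs pyspark when actually called.
    import pyspark.pandas as ps
    psdf = ps.read_excel(path, sheet_name=0)  # ps.read_excel, not spark.read_excel
    return psdf.to_spark()


# Example call (path is illustrative):
# sdf = load_workbook("dbfs:/FileStore/input.xlsx")
```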
Re: NoClassDefError and SparkSession should only be created and accessed on the driver.
Hi Rajat, I have been facing a similar problem recently and could solve it by moving the UDF implementation into a dedicated class instead of having it implemented in the driver class/object. Regards, Paul. On Tuesday 20 September 2022 10:11:31 (+02:00), rajat kumar wrote: Hi Alton, it's in the same Scala class only. Is there any change in Spark 3 to serialize separately? Regards, Rajat On Tue, Sep 20, 2022, 13:35 Xiao, Alton wrote: Can you show us your code? Your UDF wasn't serialized by Spark. In my opinion, was it outside the Spark running code? From: rajat kumar Date: Tuesday, 20 September 2022, 15:58 To: user @spark Subject: NoClassDefError and SparkSession should only be created and accessed on the driver. Hello, I am using Spark 3 with some UDFs. I am using DataFrame APIs to write parquet using Spark. I am getting NoClassDefError along with the error below. If I comment out all UDFs, it works fine. Could someone suggest what could be wrong? It was working fine in Spark 2.4. 22/09/20 06:33:17 WARN TaskSetManager: Lost task 9.0 in stage 1.0 (TID 10) (vm-36408481 executor 2): java.lang.ExceptionInInitializerError at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) at ... [the trace continues with the same readObject0 / defaultReadFields / readSerialData / readOrdinaryObject / readArray frames repeated many more times while deserializing the nested closure, and is then truncated]
Re: pyspark - memory leak leading to OOM after submitting 100 jobs?
Well, dumb question: Given the workflow outlined above, should Local Mode keep running? Or is the leak a known issue? I just wanted to check because I can't recall seeing this issue with a non-local master, though it's possible there were task failures that hid the issue. If this issue looks new, what's the easiest way to record memory dumps or do profiling? Can I put something in my spark-defaults.conf? The code is open source and the run is reproducible, although the specific test currently requires a rather large (but public) dataset. On Sun, Oct 20, 2019 at 6:24 PM Jungtaek Lim wrote:
>
> Honestly I'd recommend you spend your time looking into the issue, by taking a memory dump at some interval and comparing the differences (or at least sharing these dump files with the community, redacted if necessary). Otherwise someone has to try to reproduce it without a reproducer, and may not manage to reproduce it even after spending their time. A memory leak issue is not really easy to reproduce, unless it leaks some objects without any conditions.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Sun, Oct 20, 2019 at 7:18 PM Paul Wais wrote:
>>
>> Dear List,
>>
>> I've observed some sort of memory leak when using pyspark to run ~100
>> jobs in local mode. Each job is essentially a create RDD -> create DF
>> -> write DF sort of flow. The RDD and DFs go out of scope after each
>> job completes, hence I call this issue a "memory leak." Here's
>> pseudocode:
>>
>> ```
>> row_rdds = []
>> for i in range(100):
>>     row_rdd = spark.sparkContext.parallelize([{'a': i} for i in range(1000)])
>>     row_rdds.append(row_rdd)
>>
>> for row_rdd in row_rdds:
>>     df = spark.createDataFrame(row_rdd)
>>     df.persist()
>>     print(df.count())
>>     df.write.save(...)  # Save parquet
>>     df.unpersist()
>>
>>     # Does not help:
>>     # del df
>>     # del row_rdd
>> ```
>>
>> In my real application:
>> * rows are much larger, perhaps 1MB each
>> * row_rdds are sized to fit available RAM
>>
>> I observe that after 100 or so iterations of the second loop (each of
>> which creates a "job" in the Spark WebUI), the following happens:
>> * pyspark workers have fairly stable resident and virtual RAM usage
>> * the java process eventually approaches the resident RAM cap (8GB standard)
>> but virtual RAM usage keeps ballooning.
>>
>> Eventually the machine runs out of RAM and the linux OOM killer kills
>> the java process, resulting in an "IndexError: pop from an empty
>> deque" error from py4j/java_gateway.py.
>>
>> Does anybody have any ideas about what's going on? Note that this is
>> local mode. I have personally run standalone masters and submitted a
>> ton of jobs and never seen something like this over time. Those were
>> very different jobs, but perhaps this issue is bespoke to local mode?
>>
>> Emphasis: I did try to del the pyspark objects and run python GC.
>> That didn't help at all.
>>
>> pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)
>>
>> 12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).
>>
>> Cheers,
>> -Paul
>>
>> -----
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
pyspark - memory leak leading to OOM after submitting 100 jobs?
Dear List, I've observed some sort of memory leak when using pyspark to run ~100 jobs in local mode. Each job is essentially a create RDD -> create DF -> write DF sort of flow. The RDD and DFs go out of scope after each job completes, hence I call this issue a "memory leak." Here's pseudocode:

```
row_rdds = []
for i in range(100):
    row_rdd = spark.sparkContext.parallelize([{'a': i} for i in range(1000)])
    row_rdds.append(row_rdd)

for row_rdd in row_rdds:
    df = spark.createDataFrame(row_rdd)
    df.persist()
    print(df.count())
    df.write.save(...)  # Save parquet
    df.unpersist()

    # Does not help:
    # del df
    # del row_rdd
```

In my real application:
* rows are much larger, perhaps 1MB each
* row_rdds are sized to fit available RAM

I observe that after 100 or so iterations of the second loop (each of which creates a "job" in the Spark WebUI), the following happens:
* pyspark workers have fairly stable resident and virtual RAM usage
* the java process eventually approaches the resident RAM cap (8GB standard) but virtual RAM usage keeps ballooning.

Eventually the machine runs out of RAM and the linux OOM killer kills the java process, resulting in an "IndexError: pop from an empty deque" error from py4j/java_gateway.py.

Does anybody have any ideas about what's going on? Note that this is local mode. I have personally run standalone masters and submitted a ton of jobs and never seen something like this over time. Those were very different jobs, but perhaps this issue is bespoke to local mode?

Emphasis: I did try to del the pyspark objects and run python GC. That didn't help at all.

pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)

12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).

Cheers, -Paul
Avro support broken?
Dear List, Has anybody gotten Avro support to work in pyspark? I see multiple reports of it being broken on Stack Overflow, and I added my own repro to this ticket: https://issues.apache.org/jira/browse/SPARK-27623?focusedCommentId=16878896&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16878896 Cheers, -Paul
dropping unused data from a stream
I will be streaming data and am trying to understand how to get rid of old data from a stream so it does not become too large. I will stream in one large table of buying data and join that to another table of different data. I need the last 14 days from the second table. I will not need data that is older than 14 days. Here is my practice code:

streaming1 = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1) \
    .csv("input_stream_csv1")
streaming1_with_impressions = streaming1.withWatermark("creation_time", "2 minutes")
streaming2 = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1) \
    .csv("input_stream_csv2")
streaming1.registerTempTable("my_table1")
streaming2.registerTempTable("my_table2")
spark.sql("""select t1.* from my_table1 t1
             inner join my_table2 t2 on t1.key = t2.key
             where t1.creation_time < current_timestamp() - interval 15 minutes""") \
    .writeStream.trigger(processingTime='10 seconds') \
    .format("parquet") \
    .option("checkpointLocation", "checkpoint_dir").outputMode("append") \
    .option("path", "stream_dir5").start()

The important part of the code is the WHERE in the SQL statement, "where t1.creation_time < current_timestamp() - interval 15 minutes". For this example, I am hoping that the stream will not contain any rows from more than 15 minutes ago. Is this assumption correct? I am not sure how to test this. In addition, I have set a watermark on the first stream of 2 minutes. I am thinking that this watermark will make Spark wait an additional 2 minutes for any data that comes in late. Thanks! -- Henry Tremblay Data Engineer, Best Buy
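For what it's worth, a WHERE clause against current_timestamp() filters rows as they flow through, but on its own it is not what lets Spark purge buffered join state; stream-stream joins normally want a watermark on both inputs plus a time bound inside the join condition. A hedged, untested sketch under those rules, reusing the column names from the question (the 14-day figures come from the stated requirement):

```python
def build_join(spark, dataSchema):
    # Sketch only: both streams carry a watermark, and the join condition
    # bounds how far apart the two creation_times may be, so Spark knows
    # when state buffered for the 14-day window can be dropped.
    from pyspark.sql import functions as F

    s1 = (spark.readStream.schema(dataSchema)
          .option("maxFilesPerTrigger", 1).csv("input_stream_csv1")
          .withWatermark("creation_time", "14 days").alias("t1"))
    s2 = (spark.readStream.schema(dataSchema)
          .option("maxFilesPerTrigger", 1).csv("input_stream_csv2")
          .withWatermark("creation_time", "14 days").alias("t2"))

    return s1.join(s2, F.expr("""
        t1.key = t2.key AND
        t2.creation_time BETWEEN t1.creation_time - interval 14 days
                             AND t1.creation_time"""))
```

The returned streaming DataFrame would then go to writeStream as in the practice code above.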
Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception
I would like to see the full error. However, S3 can give misleading messages if you don't have the correct permissions. On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni wrote: > Hi all, > I am using the following code for persisting data into S3 (AWS keys are > already stored in the environment variables): > > dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName) > > However, I keep on receiving an exception that the file does not exist. > > Here's what comes from the logs: > > 18/04/24 22:15:32 INFO Persiste: Persisting data to text file: > s3://ec2-bucket-mm-spark/form4-results-2404.results > Exception in thread "main" java.io.IOException: > /form4-results-2404.results doesn't exist > > It seems that Spark expects the file to be there before writing, which > seems bizarre? > > I have even tried to remove the coalesce, but still got the same exception. > Could anyone help, please? > Kind regards, > Marco >
[Spark scheduling] Spark schedules single task although rdd has 48 partitions?
(Please notice this question was previously posted to https://stackoverflow.com/questions/49943655/spark-schedules-single-task-although-rdd-has-48-partitions) We are running Spark 2.3 / Python 3.5.2. For a job we run the following code (please notice that the input txt files are just a simplified example; in fact these are large binary files and sc.binaryFiles(...) runs out of memory loading the content, therefore only the filenames are parallelized and the executors open/read the content):

files = [u'foo.txt', u'bar.txt', u'baz.txt', etc]  # len(files) == 155

def func(filename):
    from app import generate_rows
    return list(generate_rows(filename))

rdd = sc.parallelize(files, numSlices=48)
rdd2 = rdd.flatMap(func)
rdd3 = rdd2.map(lambda d: Row(**d))
df = spark.createDataFrame(rdd3)
df.write.mode(u'append').partitionBy(u'foo').parquet(output_path)

Where app is a Python module (added to Spark using --py-files app.egg); simplified, the code is like this:

def generate_rows(filename):
    yield OrderedDict([
        (u'filename', filename),
        (u'item1', u'item1'),
        etc
    ])

We notice that the cluster is not fully utilized during the first stages, which we don't understand, and we are looking for ways to control this behavior.

Job 0  Stage 0  1 task    1 min  parallelize
Job 1  Stage 1  1 task    2 min  parallelize
Job 2  Stage 2  1 task    1 min  parallelize
Job 3  Stage 3  48 tasks  5 min  parallelize|mappartitions|map|mappartitions|existingRDD|sort

What are the first 3 jobs? And why isn't there one job/stage with the 48 tasks (as expected given the second parameter of parallelize set to 48)? Excerpt from DEBUG logging: 18/05/02 10:09:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0 18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1 18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1 ...
18/05/02 10:09:58 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1 18/05/02 10:09:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1 18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0 18/05/02 10:10:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 18/05/02 10:10:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0 18/05/02 10:10:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1 18/05/02 10:10:02 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1 ... 18/05/02 10:12:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1 18/05/02 10:12:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1 18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0 18/05/02 10:12:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 18/05/02 10:12:05 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0 18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1 18/05/02 10:12:06 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1 ... 
18/05/02 10:12:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1 18/05/02 10:13:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1 18/05/02 10:13:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0 18/05/02 10:13:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 18/05/02 10:13:03 INFO TaskSchedulerImpl: Adding task set 3.0 with 48 tasks 18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0 18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48 18/05/02 10:13:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48 ... 18/05/02 10:17:16 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1 18/05/02 10:17:17 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1 18/05/02 10:17:18 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0 18/05/02 10:17:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
History server and non-HDFS filesystems
Hi - I had originally posted this as a bug (SPARK-22528) but given my uncertainty, it was suggested that I send it to the mailing list instead... We are using Azure Data Lake (ADL) to store our event logs. This worked fine in 2.1.x, but in 2.2.0 the underlying files are no longer visible to the history server - even though we are using the same service principal that was used to write the logs. I tracked it down to this call in "FsHistoryProvider" (which was added for v2.2.0): SparkHadoopUtil.checkAccessPermission(). From what I can tell, it is preemptively checking the permissions on the files and skipping the ones which it thinks are not readable. The problem is that it's using a check that appears to be specific to HDFS, and so even though the files are definitely readable, it skips over them. Also, "FsHistoryProvider" is the only place this code is used. I was able to work around it by either:
* setting the permissions for the files on ADL to world readable
* or setting HADOOP_PROXY to the objectId of the Azure service principal which owns the file
Neither of these workarounds is acceptable for our environment. That said, I am not sure how this should be addressed:
* Is this an issue with Azure/Hadoop not complying with the Hadoop FileSystem interface/contract in some way?
* Is this an issue with checkAccessPermission() not really accounting for all of the possible FileSystem implementations?
My gut tells me it's the latter, because SparkHadoopUtil.checkAccessPermission() gets its "currentUser" info from outside of the FileSystem class, and it doesn't make sense to me that an instance of FileSystem would affect a global context, since there could be many FileSystem instances in a given app. That said, I know ADL is not heavily used at this time, so I wonder if anyone is seeing this with S3 as well? Maybe not, since S3 permissions are always reported as world-readable (I think), which causes checkAccessPermission() to succeed.
Any thoughts or feedback appreciated. -- Thanks, Paul
Spark REST API
Is there a way to flush the API? I execute http://localhost:18080/api/v1/applications?status=running In the results I get a list of applications, but not all of them are still running. This is causing an issue with monitoring what is actually running. To compound the problem, these are currently streaming apps running on EMR. Paul Corley | Principal Data Engineer, IgnitionOne
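For monitoring, the list can also be pulled and filtered client-side. A small sketch against the endpoint quoted above (note, as an assumption worth verifying for your Spark version: the history server considers an application "running" until an end event appears in its event log, which may be why aborted streaming apps linger in the list):

```python
import json
from urllib.request import urlopen

def parse_app_ids(payload):
    # The /applications endpoint returns a JSON array of application objects.
    return [app["id"] for app in json.loads(payload)]

def running_app_ids(base="http://localhost:18080/api/v1"):
    # Same query as in the post, issued from Python.
    with urlopen(base + "/applications?status=running") as resp:
        return parse_app_ids(resp.read().decode("utf-8"))
```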
Re: Running spark examples in Intellij
You say you did the maven package, but did you do a maven install and define your local maven repo in SBT? -Paul Sent from my iPhone > On Oct 11, 2017, at 5:48 PM, Stephen Boesch <java...@gmail.com> wrote: > > When attempting to run any example program w/ Intellij I am running into > guava versioning issues: > > Exception in thread "main" java.lang.NoClassDefFoundError: > com/google/common/cache/CacheLoader > at > org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:73) > at org.apache.spark.SparkConf.<init>(SparkConf.scala:68) > at org.apache.spark.SparkConf.<init>(SparkConf.scala:55) > at > org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:919) > at > org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:918) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:918) > at > org.apache.spark.examples.ml.KMeansExample$.main(KMeansExample.scala:40) > at org.apache.spark.examples.ml.KMeansExample.main(KMeansExample.scala) > Caused by: java.lang.ClassNotFoundException: > com.google.common.cache.CacheLoader > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 9 more > > The *scope*s for the spark dependencies were already changed from *provided* > to *compile*. Both `sbt assembly` and `mvn package` had already been run > (successfully) from the command line - and the (mvn) project completely rebuilt > inside intellij. > > The spark testcases run fine: this is a problem only in the examples module. > Anyone running these successfully in IJ? I have tried for 2.1.0-SNAPSHOT and > 2.3.0-SNAPSHOT - with the same outcome. > >
Re: is it ok to have multiple sparksession's in one spark structured streaming app?
You would set the Kafka topic as your data source, and you would write a custom output to Cassandra; everything would be (or could be) contained within your stream. -Paul Sent from my iPhone > On Sep 8, 2017, at 2:52 PM, kant kodali <kanth...@gmail.com> wrote: > > How can I use one SparkSession to talk to both Kafka and Cassandra, let's say? > > >> On Fri, Sep 8, 2017 at 3:46 AM, Arkadiusz Bicz <arkadiusz.b...@gmail.com> >> wrote: >> You don't need multiple Spark sessions to have more than one stream working, >> but from a maintenance and reliability perspective it is not a good idea. >> >>> On Thu, Sep 7, 2017 at 2:40 AM, kant kodali <kanth...@gmail.com> wrote: >>> Hi All, >>> >>> I am wondering if it is ok to have multiple sparksession's in one spark >>> structured streaming app? Basically, I want to create 1) a Spark session for >>> reading from Kafka and 2) another Spark session for storing the mutations >>> of a dataframe/dataset to a persistent table as I get the mutations from >>> #1? >>> >>> Finally, is this a common practice? >>> >>> Thanks, >>> kant >> >
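A sketch of that shape with a single SparkSession: Kafka as the streaming source and a per-micro-batch write to Cassandra as the sink. Caveats: foreachBatch only exists from Spark 2.4 onward; the bootstrap servers, topic, keyspace, and table names below are all made up; and the Kafka source and spark-cassandra-connector packages must be on the classpath:

```python
def start_pipeline(spark):
    # One session owns both ends of the stream; no second SparkSession.
    src = (spark.readStream.format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", "events")
           .load())

    def write_batch(batch_df, batch_id):
        # Each micro-batch is a normal DataFrame, so the batch writer from
        # the spark-cassandra-connector can be reused here.
        (batch_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .write.format("org.apache.spark.sql.cassandra")
         .options(keyspace="ks", table="events")
         .mode("append").save())

    return src.writeStream.foreachBatch(write_batch).start()
```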
Structured Streaming from Parquet
I have a Spark Structured Streaming process that is implemented in 2 separate streaming apps. The first app reads .gz files, which range in size from 1GB to 9GB compressed, in from S3, filters out invalid records, repartitions the data, and outputs to parquet on S3, partitioned the same as the stream is partitioned. This process produces thousands of files which other processes consume. The thought on this approach was to:
1) Break the file down to smaller, more easily consumed sizes
2) Allow more parallelism in the processes that consume the data
3) Allow multiple downstream processes to consume data that:
   a. Has already had bad records filtered out
   b. Does not have to fully read in such large files
The second application reads in the files produced by the first app. This process then reformats the data from a row that is:
12NDSIN|20170101:123313, 5467;20170115:987
into:
12NDSIN, 20170101, 123313
12NDSIN, 20170101, 5467
12NDSIN, 20170115, 987
App 1 runs with no problems and churns through files in its source directory on S3. Total process time for a file is < 10 min. App 2 is the one having issues.
The source is defined as:

val rawReader = sparkSession
  .readStream
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", batchSize)
  .schema(rawSchema)
  .parquet(config.getString("aws.s3.sourcepath"))   // <= line 85

The output is defined as:

val query = output
  .writeStream
  .queryName("bk")
  .format("parquet")
  .partitionBy("expireDate")
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", config.getString("spark.app.checkpoint_dir") + "/bk")
  .option("path", config.getString("spark.app.s3.output"))
  .start()
  .awaitTermination()

If files exist from app 1, app 2 enters a cycle of just cycling through "parquet at ProcessFromSource.scala:85" 3999/3999. If there are only a few files output from app 1, eventually it will enter the stage where it actually processes the data and begins to output, but the more files produced by app 1 the longer it takes, if it ever completes these steps. With an extremely large number of files the app eventually throws a Java OOM error. Additionally, each cycle through this step takes successively longer. Hopefully someone can lend some insight as to what is actually taking place in this step and how to alleviate it. Thanks, Paul Corley | Principal Data Engineer
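Reading the sample row literally, the reshaping step in app 2 amounts to a nested split. A plain-Python sketch of that parse, with the delimiters inferred from the single example above (so treat the exact format as an assumption):

```python
def explode_record(line):
    # "12NDSIN|20170101:123313, 5467;20170115:987" becomes one
    # (id, date, value) tuple per value.
    key, _, rest = line.partition("|")
    rows = []
    for group in rest.split(";"):
        date, _, values = group.partition(":")
        for value in values.split(","):
            rows.append((key, date.strip(), value.strip()))
    return rows
```

In the streaming job the same logic would typically run inside a UDF or a flatMap over the rows read from parquet.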
splitting a huge file
We are tasked with loading a big file (possibly 2TB) into a data warehouse. In order to do this efficiently, we need to split the file into smaller files. I don't believe there is a way to do this with Spark, because in order for Spark to distribute the file to the worker nodes, it first has to be split up, right? We ended up using a single machine with a single thread to do the splitting. I just want to make sure I am not missing something obvious. Thanks! -- Paul Henry Tremblay Attunix
small job runs out of memory using wholeTextFiles
As part of my processing, I have the following code:

rdd = sc.wholeTextFiles("s3://paulhtremblay/noaa_tmp/", 10)
rdd.count()

The S3 directory has about 8GB of data and 61,878 files. I am using Spark 2.1, and running it with 15 m3.xlarge nodes on EMR. The job fails with this error:

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 35532 in stage 0.0 failed 4 times, most recent failure: Lost task 35532.3 in stage 0.0 (TID 35543, ip-172-31-36-192.us-west-2.compute.internal, executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 7.4 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I have run it dozens of times, increasing the number of partitions and reducing the size of my data set (the original is 60GB), but I get the same error each time. In contrast, if I run a simple:

rdd = sc.textFile("s3://paulhtremblay/noaa_tmp/")
rdd.count()

the job finishes in 15 minutes, even with just 3 nodes. Thanks -- Paul Henry Tremblay Robert Half Technology
Re: bug with PYTHONHASHSEED
I saw the bug fix. I am using the latest Spark available on AWS EMR, which I think is 2.0.1. I am at work and can't check my home config. I don't think AWS merged in this fix. Henry On Tue, Apr 4, 2017 at 4:42 PM, Jeff Zhang <zjf...@gmail.com> wrote: > > It is fixed in https://issues.apache.org/jira/browse/SPARK-13330 > > > > Holden Karau <hol...@pigscanfly.ca> wrote on Wednesday, April 5, 2017, at 12:03 AM: > >> Which version of Spark is this (or is it a dev build)? We've recently >> made some improvements with PYTHONHASHSEED propagation. >> >> On Tue, Apr 4, 2017 at 7:49 AM Eike von Seggern <eike.seggern@sevenval.com> wrote: >> >> 2017-04-01 21:54 GMT+02:00 Paul Tremblay <paulhtremb...@gmail.com>: >> >> When I try to do a groupByKey() in my spark environment, I get the >> error described here: >> >> http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh >> >> In order to attempt to fix the problem, I set up my ipython environment >> with the additional line: >> >> PYTHONHASHSEED=1 >> >> When I fire up my ipython shell, and do: >> >> In [7]: hash("foo") >> Out[7]: -2457967226571033580 >> >> In [8]: hash("foo") >> Out[8]: -2457967226571033580 >> >> So my hash function is now seeded so it returns consistent values. But >> when I do a groupByKey(), I get the same error: >> >> Exception: Randomness of hash of string should be disabled via >> PYTHONHASHSEED >> >> Anyone know how to fix this problem in python 3.4? >> >> Independent of the python version, you have to ensure that Python on >> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by >> adding it to the environment of the spark processes. >> >> Best >> >> Eike >> >> -- >> Cell: 425-233-8271 >> Twitter: https://twitter.com/holdenkarau >> > -- Paul Henry Tremblay Robert Half Technology
Re: bug with PYTHONHASHSEED
So that means I have to pass that bash variable to the EMR clusters when I spin them up, not afterwards. I'll give that a go. Thanks!

Henry

On Tue, Apr 4, 2017 at 7:49 AM, Eike von Seggern <eike.segg...@sevenval.com> wrote:
> 2017-04-01 21:54 GMT+02:00 Paul Tremblay <paulhtremb...@gmail.com>:
>> When I try to do a groupByKey() in my spark environment, I get the error described here:
>> http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>> In order to attempt to fix the problem, I set up my ipython environment with the additional line:
>> PYTHONHASHSEED=1
>> When I fire up my ipython shell, and do:
>> In [7]: hash("foo")
>> Out[7]: -2457967226571033580
>> In [8]: hash("foo")
>> Out[8]: -2457967226571033580
>> So my hash function is now seeded so it returns consistent values. But when I do a groupByKey(), I get the same error:
>> Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED
>> Anyone know how to fix this problem in python 3.4?
>
> Independent of the python version, you have to ensure that Python on spark-master and -workers is started with PYTHONHASHSEED set, e.g. by adding it to the environment of the spark processes.
>
> Best
>
> Eike

-- Paul Henry Tremblay Robert Half Technology
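For anyone hitting this later, one hedged way to do what Eike describes without recreating the cluster is to set the variable through Spark's own environment properties at submit time, so every executor's Python interpreter starts with it (the property names are the standard Spark/YARN ones; whether your EMR version honors them for an already-running cluster is the part to verify):

```shell
# Propagate PYTHONHASHSEED to executors and the YARN application master
spark-submit \
  --conf spark.executorEnv.PYTHONHASHSEED=1 \
  --conf spark.yarn.appMasterEnv.PYTHONHASHSEED=1 \
  my_job.py
```

Setting it only in the driver's shell (as in the original post) seeds the driver's `hash()` but not the workers', which is why groupByKey() still fails.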
Re: Alternatives for dataframe collectAsList()
What do you want to do with the results of the query?

Henry

On Wed, Mar 29, 2017 at 12:00 PM, szep.laszlo.it <szep.laszlo...@gmail.com> wrote:
> Hi,
>
> after I created a dataset
>
> Dataset df = sqlContext.sql("query");
>
> I need to have the result values, and I call a method: collectAsList()
>
> List list = df.collectAsList();
>
> But it's very slow if I work with large datasets (20-30 million records). I know that the result isn't presented in the driver app; that's why it takes a long time, because collectAsList() collects all data from the worker nodes.
>
> But then what is the right way to get result values? Is there another solution to iterate over a result dataset's rows, or get values? Can anyone post a small & working example?
>
> Thanks & Regards,
> Laszlo Szep
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-for-dataframe-collectAsList-tp28547.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

-- Paul Henry Tremblay Robert Half Technology
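If the goal is just to consume the result row by row without materializing 20-30 million records on the driver at once, `toLocalIterator()` streams one partition at a time. A minimal sketch: the Spark call is only indicated in comments (untested here), while the driver-side streaming aggregation below is plain, runnable Python; the function and field names are illustrative, not from the original post.

```python
# In Spark you would write (sketch):
#     for row in df.toLocalIterator():   # streams one partition at a time
#         ...consume row...
# The driver-side logic can then aggregate incrementally, e.g.:
def running_sum(rows, value_key):
    """Consume an iterator of dict-like rows, keeping only a running total
    in memory instead of the whole result list."""
    total = 0
    for row in rows:
        total += row[value_key]
    return total

# Stand-in for df.toLocalIterator() so the logic is demonstrable locally:
sample = iter([{"amount": 10}, {"amount": 5}, {"amount": 7}])
print(running_sum(sample, "amount"))  # 22
```

If the rows don't actually need to reach the driver at all, writing the Dataset out (e.g. `df.write.parquet(...)`) or using `foreachPartition` on the executors is usually faster still.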
Re: Read file and represent rows as Vectors
So if I am understanding your problem, you have the data in CSV files, but the CSV files are gzipped? If so, Spark can read a gzipped file directly. Sorry if I didn't understand your question.

Henry

On Mon, Apr 3, 2017 at 5:05 AM, Old-School <giorgos_myrianth...@outlook.com> wrote:
> I have a dataset that contains DocID, WordID and frequency (count) as shown below. Note that the first three numbers represent 1. the number of documents, 2. the number of words in the vocabulary and 3. the total number of words in the collection.
>
> 189
> 1430
> 12300
> 1 2 1
> 1 39 1
> 1 42 3
> 1 77 1
> 1 95 1
> 1 96 1
> 2 105 1
> 2 108 1
> 3 133 3
>
> What I want to do is to read the data (ignoring the first three lines), combine the words per document and finally represent each document as a vector that contains the frequency of each WordID.
>
> Based on the above dataset, the representation of documents 1, 2 and 3 will be (note that vocab_size can be extracted from the second line of the data):
>
> val data = Array(
> Vectors.sparse(vocab_size, Seq((2, 1.0), (39, 1.0), (42, 3.0), (77, 1.0), (95, 1.0), (96, 1.0))),
> Vectors.sparse(vocab_size, Seq((105, 1.0), (108, 1.0))),
> Vectors.sparse(vocab_size, Seq((133, 3.0))))
>
> The problem is that I am not quite sure how to read the .txt.gz file as an RDD and create an Array of sparse vectors as described above. Please note that I actually want to pass the data array to the PCA transformer.
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Read-file-and-represent-rows-as-Vectors-tp28562.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

-- Paul Henry Tremblay Robert Half Technology
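To make that concrete: `sc.textFile("file.txt.gz")` decompresses .gz transparently, so only the parsing step remains. A sketch of that step in plain Python (the function name is mine; the `Vectors.sparse` call and the Spark wiring are only indicated in comments, since this is not a tested PySpark job):

```python
def parse_bag_of_words(lines):
    """Parse the three header lines (n_docs, vocab_size, total_words) and
    the 'doc word freq' triples into per-document (word, freq) lists."""
    it = iter(lines)
    n_docs = int(next(it))
    vocab_size = int(next(it))
    next(it)  # total word count, unused here
    docs = {}
    for line in it:
        doc_id, word_id, freq = (int(x) for x in line.split())
        docs.setdefault(doc_id, []).append((word_id, float(freq)))
    # In Spark you would then build, per document:
    #     Vectors.sparse(vocab_size, sorted(docs[doc_id]))
    return vocab_size, docs

lines = ["189", "1430", "12300", "1 2 1", "1 39 1", "2 105 1"]
vocab_size, docs = parse_bag_of_words(lines)
print(vocab_size, docs[1])  # 1430 [(2, 1.0), (39, 1.0)]
```

In a real job you would not collect the whole file to the driver like this; the same grouping can be done distributed with `rdd.map(...).groupByKey()` after skipping the header lines.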
Re: Looking at EMR Logs
Thanks. That seems to work great, except EMR doesn't always copy the logs to S3. The behavior seems inconsistent and I am debugging it now. On Fri, Mar 31, 2017 at 7:46 AM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: > You can provide your own log directory, where Spark log will be saved, and > that you could replay afterwards. > > Set in your job this: `spark.eventLog.dir=s3://bucket/some/directory` and > run it. > Note! The path `s3://bucket/some/directory` must exist before you run your > job, it'll not be created automatically. > > The Spark HistoryServer on EMR won't show you anything because it's > looking for logs in `hdfs:///var/log/spark/apps` by default. > > After that you can either copy the log files from s3 to the hdfs path > above, or you can copy them locally to `/tmp/spark-events` (the default > directory for spark logs) and run the history server like: > ``` > cd /usr/local/src/spark-1.6.1-bin-hadoop2.6 > sbin/start-history-server.sh > ``` > and then open http://localhost:18080 > > > > > On Thu, Mar 30, 2017 at 8:45 PM, Paul Tremblay <paulhtremb...@gmail.com> > wrote: > >> I am looking for tips on evaluating my Spark job after it has run. >> >> I know that right now I can look at the history of jobs through the web >> ui. I also know how to look at the current resources being used by a >> similar web ui. >> >> However, I would like to look at the logs after the job is finished to >> evaluate such things as how many tasks were completed, how many executors >> were used, etc. I currently save my logs to S3. >> >> Thanks! >> >> Henry >> >> -- >> Paul Henry Tremblay >> Robert Half Technology >> > > -- Paul Henry Tremblay Robert Half Technology
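Concretely, the settings Vadim describes might look like this at submit time (the bucket path is a placeholder, and per his note the S3 prefix must already exist):

```shell
# Write Spark event logs to S3 so they survive the EMR cluster;
# the history server can replay them later.
spark-submit \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=s3://my-bucket/spark-logs \
  my_job.py
```

The event logs contain the per-task and per-executor detail (tasks completed, executors used, durations) that the live web UI shows while the job runs.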
bug with PYTHONHASHSEED
When I try to do a groupByKey() in my spark environment, I get the error described here:

http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh

In order to attempt to fix the problem, I set up my ipython environment with the additional line:

PYTHONHASHSEED=1

When I fire up my ipython shell, and do:

In [7]: hash("foo")
Out[7]: -2457967226571033580

In [8]: hash("foo")
Out[8]: -2457967226571033580

So my hash function is now seeded so it returns consistent values. But when I do a groupByKey(), I get the same error:

Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED

Anyone know how to fix this problem in python 3.4?

Thanks

Henry

-- Paul Henry Tremblay Robert Half Technology
pyspark bug with PYTHONHASHSEED
When I try to do a groupByKey() in my spark environment, I get the error described here:

http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh

In order to attempt to fix the problem, I set up my ipython environment with the additional line:

PYTHONHASHSEED=1

When I fire up my ipython shell, and do:

In [7]: hash("foo")
Out[7]: -2457967226571033580

In [8]: hash("foo")
Out[8]: -2457967226571033580

So my hash function is now seeded so it returns consistent values. But when I do a groupByKey(), I get the same error:

Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED

Anyone know how to fix this problem in python 3.4?

Thanks

Henry

-- Paul Henry Tremblay Robert Half Technology
Looking at EMR Logs
I am looking for tips on evaluating my Spark job after it has run. I know that right now I can look at the history of jobs through the web ui. I also know how to look at the current resources being used by a similar web ui. However, I would like to look at the logs after the job is finished to evaluate such things as how many tasks were completed, how many executors were used, etc. I currently save my logs to S3. Thanks! Henry -- Paul Henry Tremblay Robert Half Technology
Re: wholeTextFiles fails, but textFile succeeds for same path
I've been working on this problem for several days (I am doing more to increase my knowledge of Spark). The code you linked to hangs because after reading in each file, I have to gunzip it. Another way that seems to be working is reading each file in using sc.textFile, writing it to HDFS, and then using wholeTextFiles on the HDFS result. But the bigger issue is that neither method is executed in parallel. When I open my yarn manager, it shows that only one node is being used.

Henry

On 02/06/2017 03:39 PM, Jon Gregg wrote:
Strange that it's working for some directories but not others. Looks like wholeTextFiles maybe doesn't work with S3? https://issues.apache.org/jira/browse/SPARK-4414 . If it's possible to load the data into EMR and run Spark from there, that may be a workaround. This blog post shows a python workaround that might work as well: http://michaelryanbell.com/processing-whole-files-spark-s3.html

Jon

On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay <paulhtremb...@gmail.com> wrote:
I've actually been able to trace the problem to the files being read in. If I change to a different directory, then I don't get the error. Is one of the executors running out of memory?

On 02/06/2017 02:35 PM, Paul Tremblay wrote:
When I try to create an rdd using wholeTextFiles, I get an incomprehensible error. But when I use the same path with sc.textFile, I get no error. I am using pyspark with spark 2.1.

in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/'
rdd = sc.wholeTextFiles(in_path)
rdd.take(1)

/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343         res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345         items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    963         # SparkContext#runJob.
    964         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    966         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486415078210_0005_01_16 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486415078210_0005_01_16
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.yarn.server.nodemanager.Def
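One hedged alternative to the hang: read the files as raw bytes with `sc.binaryFiles` and gunzip them in Python, which also lets you control the partition count explicitly. The decompression helper below is plain, runnable Python; the Spark calls around it are only sketched in comments, and the repartition count of 60 is an arbitrary example.

```python
import gzip

def gunzip_to_text(data: bytes) -> str:
    """Decompress one gzipped file's bytes into text."""
    return gzip.decompress(data).decode("utf-8")

# Sketch of the Spark side (untested):
#     rdd = sc.binaryFiles("s3://bucket/path/")          # (path, bytes) pairs
#     texts = rdd.mapValues(gunzip_to_text).repartition(60)

payload = gzip.compress("WARC/1.0\n".encode("utf-8"))
print(gunzip_to_text(payload))
```

Checking `rdd.getNumPartitions()` right after the read would show whether the single-node behavior comes from Spark producing too few partitions for the S3 listing.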
Re: Turning rows into columns
Yes, that's what I need. Thanks.

P.

On 02/05/2017 12:17 PM, Koert Kuipers wrote:
since there is no key to group by and assemble records i would suggest to write this in RDD land and then convert to data frame. you can use sc.wholeTextFiles to process text files and create a state machine

On Feb 4, 2017 16:25, "Paul Tremblay" <paulhtremb...@gmail.com> wrote:
I am using pyspark 2.1 and am wondering how to convert a flat file, with one record per row, into a columnar format. Here is an example of the data:

u'WARC/1.0', u'WARC-Type: warcinfo', u'WARC-Date: 2016-12-08T13:00:23Z', u'WARC-Record-ID: ', u'Content-Length: 344', u'Content-Type: application/warc-fields', u'WARC-Filename: CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz', u'', u'robots: classic', u'hostname: ip-10-31-129-80.ec2.internal', u'software: Nutch 1.6 (CC)/CC WarcExport 1.0', u'isPartOf: CC-MAIN-2016-50', u'operator: CommonCrawl Admin', u'description: Wide crawl of the web for November 2016', u'publisher: CommonCrawl', u'format: WARC File Format 1.0', u'conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf', u'', u'', u'WARC/1.0', u'WARC-Type: request', u'WARC-Date: 2016-12-02T17:54:09Z', u'WARC-Record-ID: ', u'Content-Length: 220', u'Content-Type: application/http; msgtype=request', u'WARC-Warcinfo-ID: ', u'WARC-IP-Address: 217.197.115.133', u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/', u'', u'GET /blog/ HTTP/1.0', u'Host: 1018201.vkrugudruzei.ru', u'Accept-Encoding: x-gzip, gzip, deflate', u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)', u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', u'', u'', u'', u'WARC/1.0', u'WARC-Type: response', u'WARC-Date: 2016-12-02T17:54:09Z', u'WARC-Record-ID: ', u'Content-Length: 577', u'Content-Type: application/http; msgtype=response', u'WARC-Warcinfo-ID: ', u'WARC-Concurrent-To: ', u'WARC-IP-Address: 217.197.115.133', u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/', u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM', u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B', u'']

I want to convert it to something like:

{warc-type='request', warc-date='2016-12-02', warc-record-id='<urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b>'}

In Python I would simply set a flag and read line by line (create a state machine). You can't do this in spark, though.

Thanks

Henry

-- Paul Henry Tremblay Robert Half Technology
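Koert's RDD-land suggestion works because each partition (or each whole file from sc.wholeTextFiles) reaches Python as an ordinary iterator, so the state machine is just a function you hand to mapPartitions. A runnable sketch of the record splitter alone, with field handling simplified and the Spark wiring only indicated:

```python
def parse_warc_headers(lines):
    """Split a stream of lines into records delimited by 'WARC/1.0',
    turning 'Key: value' header lines into dicts with lowercased keys."""
    records, current = [], None
    for line in lines:
        if line.strip() == "WARC/1.0":
            if current:
                records.append(current)
            current = {}
        elif current is not None and ": " in line:
            key, value = line.split(": ", 1)
            current[key.lower()] = value
    if current:
        records.append(current)
    return records

# In Spark (sketch): rdd.mapPartitions(lambda it: iter(parse_warc_headers(it)))
lines = ["WARC/1.0", "WARC-Type: warcinfo", "WARC-Date: 2016-12-08T13:00:23Z",
         "", "WARC/1.0", "WARC-Type: request"]
print(parse_warc_headers(lines)[1])  # {'warc-type': 'request'}
```

One caveat: with mapPartitions on a line RDD, a record straddling a partition boundary would need extra care; wholeTextFiles sidesteps that because each file arrives whole.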
Re: wholeTextFiles fails, but textFile succeeds for same path
I've actually been able to trace the problem to the files being read in. If I change to a different directory, then I don't get the error. Is one of the executors running out of memory? On 02/06/2017 02:35 PM, Paul Tremblay wrote: When I try to create an rdd using wholeTextFiles, I get an incomprehensible error. But when I use the same path with sc.textFile, I get no error. I am using pyspark with spark 2.1. in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/ rdd = sc.wholeTextFiles(in_path) rdd.take(1) /usr/lib/spark/python/pyspark/rdd.py in take(self, num) 1341 1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 items += res /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 963 # SparkContext#runJob. 964 mappedRDD = rdd.mapPartitions(partitionFunc) --> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 967 /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486415078210_0005_01_16 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. Diagnostics: Exception from container-launch. Container id: container_1486415078210_0005_01_16 Exit code: 52 Stack trace: ExitCodeException exitCode=52: at org.apache.hadoop.util.Shell.runCommand(Shell.java:582) at org.apache.hadoop.util.Shell.run(Shell.java:479) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) rdd = sc.textFile(in_path) In [8]: rdd.take(1) Out[8]: [u'WARC/1.0'] - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
wholeTextFiles fails, but textFile succeeds for same path
When I try to create an rdd using wholeTextFiles, I get an incomprehensible error. But when I use the same path with sc.textFile, I get no error. I am using pyspark with spark 2.1. in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/ rdd = sc.wholeTextFiles(in_path) rdd.take(1) /usr/lib/spark/python/pyspark/rdd.py in take(self, num) 1341 1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 items += res /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 963 # SparkContext#runJob. 964 mappedRDD = rdd.mapPartitions(partitionFunc) --> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 967 /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486415078210_0005_01_16 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. Diagnostics: Exception from container-launch. Container id: container_1486415078210_0005_01_16 Exit code: 52 Stack trace: ExitCodeException exitCode=52: at org.apache.hadoop.util.Shell.runCommand(Shell.java:582) at org.apache.hadoop.util.Shell.run(Shell.java:479) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) rdd = sc.textFile(in_path) In [8]: rdd.take(1) Out[8]: [u'WARC/1.0'] - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Turning rows into columns
I am using pyspark 2.1 and am wondering how to convert a flat file, with one record per row, into a columnar format. Here is an example of the data:

u'WARC/1.0', u'WARC-Type: warcinfo', u'WARC-Date: 2016-12-08T13:00:23Z', u'WARC-Record-ID: ', u'Content-Length: 344', u'Content-Type: application/warc-fields', u'WARC-Filename: CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz', u'', u'robots: classic', u'hostname: ip-10-31-129-80.ec2.internal', u'software: Nutch 1.6 (CC)/CC WarcExport 1.0', u'isPartOf: CC-MAIN-2016-50', u'operator: CommonCrawl Admin', u'description: Wide crawl of the web for November 2016', u'publisher: CommonCrawl', u'format: WARC File Format 1.0', u'conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf', u'', u'', u'WARC/1.0', u'WARC-Type: request', u'WARC-Date: 2016-12-02T17:54:09Z', u'WARC-Record-ID: ', u'Content-Length: 220', u'Content-Type: application/http; msgtype=request', u'WARC-Warcinfo-ID: ', u'WARC-IP-Address: 217.197.115.133', u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/', u'', u'GET /blog/ HTTP/1.0', u'Host: 1018201.vkrugudruzei.ru', u'Accept-Encoding: x-gzip, gzip, deflate', u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)', u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', u'', u'', u'', u'WARC/1.0', u'WARC-Type: response', u'WARC-Date: 2016-12-02T17:54:09Z', u'WARC-Record-ID: ', u'Content-Length: 577', u'Content-Type: application/http; msgtype=response', u'WARC-Warcinfo-ID: ', u'WARC-Concurrent-To: ', u'WARC-IP-Address: 217.197.115.133', u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/', u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM', u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B', u'']

I want to convert it to something like:

{warc-type='request', warc-date='2016-12-02', warc-record-id='<urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b>'}

In Python I would simply set a flag and read line by line (create a state machine). You can't do this in spark, though.

Thanks

Henry

-- Paul Henry Tremblay Robert Half Technology
RE: spark 2.02 error when writing to s3
Not sure what you mean by "a consistency layer on top." Any explanation would be greatly appreciated!

Paul

_ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP Tel. + ▪ Mobile + _

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Friday, January 27, 2017 3:20 AM
To: VND Tremblay, Paul
Cc: Neil Jonkers; Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

OK. Nobody should be committing output directly to S3 without having something add a consistency layer on top, not if you want reliable (as in "doesn't lose/corrupt data" reliable) work.

On 26 Jan 2017, at 19:09, VND Tremblay, Paul <tremblay.p...@bcg.com> wrote:

This seems to have done the trick, although I am not positive. If I have time, I'll test spinning up a cluster with and without consistent view to pinpoint the error.

_ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP Tel. + ▪ Mobile + _

From: Neil Jonkers [mailto:neilod...@gmail.com]
Sent: Friday, January 20, 2017 11:39 AM
To: Steve Loughran; VND Tremblay, Paul
Cc: Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Can you test by enabling emrfs consistent view and using an s3:// uri?
http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html

Original message
From: Steve Loughran
Date: 20/01/2017 21:17 (GMT+02:00)
To: "VND Tremblay, Paul"
Cc: Takeshi Yamamuro, user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

AWS S3 is eventually consistent: even after something is deleted, a LIST/GET call may show it.
You may be seeing that effect; even after the DELETE has got rid of the files, a listing sees something there, and I suspect the time it takes for the listing to "go away" will depend on the total number of entries underneath, as there are more deletion markers ("tombstones") to propagate around S3. Try deleting the path and then waiting a short period.

On 20 Jan 2017, at 18:54, VND Tremblay, Paul <tremblay.p...@bcg.com> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link below indicates that that user had the same problem, which seems unresolved. Thanks

Paul

_ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP Tel. + ▪ Mobile + _

From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0? Anyway, I saw another guy reporting the same error, I think.

https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu

On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul <tremblay.p...@bcg.com> wrote:

I have come across a problem when writing CSV files to S3 in Spark 2.02. The problem does not exist in Spark 1.6.
19:09:20 Caused by: java.io.IOException: File already exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv

My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:
* If I use a small set of data, then I don't get the error
* If I use Spark 1.6, I don'
RE: spark 2.02 error when writing to s3
This seems to have done the trick, although I am not positive. If I have time, I'll test spinning up a cluster with and without consistent view to pinpoint the error.

_ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP Tel. + ▪ Mobile + _

From: Neil Jonkers [mailto:neilod...@gmail.com]
Sent: Friday, January 20, 2017 11:39 AM
To: Steve Loughran; VND Tremblay, Paul
Cc: Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Can you test by enabling emrfs consistent view and using an s3:// uri?
http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html

Original message
From: Steve Loughran
Date: 20/01/2017 21:17 (GMT+02:00)
To: "VND Tremblay, Paul"
Cc: Takeshi Yamamuro, user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

AWS S3 is eventually consistent: even after something is deleted, a LIST/GET call may show it. You may be seeing that effect; even after the DELETE has got rid of the files, a listing sees something there, and I suspect the time it takes for the listing to "go away" will depend on the total number of entries underneath, as there are more deletion markers ("tombstones") to propagate around S3. Try deleting the path and then waiting a short period.

On 20 Jan 2017, at 18:54, VND Tremblay, Paul <tremblay.p...@bcg.com> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link below indicates that that user had the same problem, which seems unresolved. Thanks

Paul

_ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP Tel. + ▪ Mobile + _

From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0? Anyway, I saw another guy reporting the same error, I think.

https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu

On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul <tremblay.p...@bcg.com> wrote:

I have come across a problem when writing CSV files to S3 in Spark 2.02. The problem does not exist in Spark 1.6.

19:09:20 Caused by: java.io.IOException: File already exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv

My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:
* If I use a small set of data, then I don't get the error
* If I use Spark 1.6, I don't get the error
* If I read in a simple dataframe and then write to S3, I still get the error (without doing any transformations)
* If I do the previous step with a smaller set of data, I don't get the error
* I am using pyspark, with python 2.7
* The thread at this link: https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the problem is caused by a sync problem. With large datasets, spark tries to write multiple times and causes the error. The suggestion is to turn off speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul

_ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP STL ▪ Tel.
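Since the AWS forum thread points at speculative task attempts double-writing the same S3 file, one cheap experiment is to pin speculation off explicitly rather than relying on the default (the property name is the standard Spark setting; whether it is the actual culprit here is unverified):

```shell
# Rule out speculative execution as the source of duplicate S3 writes
spark-submit \
  --conf spark.speculation=false \
  my_job.py
```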
+ ▪ Mobile + tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com> _ Read BCG's latest insights, analysis, and viewpoints at bcgperspectives.com<http://www.bcgperspectives.com>
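Steve's advice above (delete the path, then wait until the eventually consistent listing has converged before writing) can be sketched as a small polling helper. This is illustrative only: `wait_until_prefix_empty` and `list_keys` are hypothetical names, not Spark or AWS APIs; `list_keys` stands in for whatever listing call your S3 client provides.

```python
import time

def wait_until_prefix_empty(list_keys, prefix, attempts=10, delay=1.0):
    """Poll an eventually consistent listing until no keys remain under
    `prefix`, or give up after `attempts` tries. `list_keys` is any
    callable returning the keys currently visible under a prefix
    (e.g. a thin wrapper around an S3 client's list call)."""
    for _ in range(attempts):
        if not list_keys(prefix):
            return True      # listing has converged: nothing left
        time.sleep(delay)    # tombstones may still be propagating; wait
    return False
```

The caller would invoke this between `remove_s3_dir` and the `write.csv` call, and fall back to a fresh output path if the listing never empties.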
RE: Ingesting Large csv File to relational database
What relational db are you using? We do this at work, and the way we handle it is to unload the db into Spark (actually, we unload it to S3 and then into Spark). Redshift is very efficient at dumping tables this way. _ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP Tel. + ▪ Mobile + _ From: Eric Dain [mailto:ericdai...@gmail.com] Sent: Wednesday, January 25, 2017 11:14 PM To: user@spark.apache.org Subject: Ingesting Large csv File to relational database Hi, I need to write a nightly job that ingests large csv files (~15GB each) and adds/updates/deletes the changed rows in a relational database. If a row is identical to what is in the database, I don't want to re-write the row to the database. Also, if the same item comes from multiple sources (files), I need to implement logic to choose whether the new source is preferred or the current one in the database should be kept unchanged. Obviously, I don't want to query the database for each item to check if the item has changed or not. I prefer to maintain the state inside Spark. Is there a preferred and performant way to do that using Apache Spark? Best, Eric __ The Boston Consulting Group, Inc. This e-mail message may contain confidential and/or privileged information. If you are not an addressee or otherwise authorized to receive this message, you should not use, copy, disclose or take any action based on this e-mail or any information contained in the message. If you have received this material in error, please advise the sender immediately by reply e-mail and delete this message. Thank you.
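Eric's requirements (skip unchanged rows, prefer one source over another) boil down to a classification step that can run inside Spark once the current snapshot has been unloaded, as Paul suggests. A minimal pure-Python sketch of that logic; every name here is hypothetical, and in a real job `current` would be the unloaded table and the loop a join:

```python
def classify_rows(current, incoming, source_rank):
    """current:  {key: (row, source)} snapshot of the database
    incoming: iterable of (key, row, source) tuples from the CSV files
    source_rank: {source: priority}, higher priority wins on conflicts
    Returns (inserts, updates, unchanged) as dicts of key -> (row, source)."""
    inserts, updates, unchanged = {}, {}, {}
    for key, row, source in incoming:
        if key not in current:
            inserts[key] = (row, source)
            continue
        old_row, old_source = current[key]
        if source_rank.get(source, 0) < source_rank.get(old_source, 0):
            continue  # the existing source is preferred; keep the DB row
        if row == old_row:
            unchanged[key] = (row, source)  # identical: no write needed
        else:
            updates[key] = (row, source)
    return inserts, updates, unchanged
```

Only `inserts` and `updates` would then be written back to the database; `unchanged` rows never touch it.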
RE: spark 2.02 error when writing to s3
I am using an EMR cluster, and the latest version offered is 2.02. The link below indicates that that user had the same problem, which seems unresolved. Thanks Paul _ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP Tel. + ▪ Mobile + _ From: Takeshi Yamamuro [mailto:linguin@gmail.com] Sent: Thursday, January 19, 2017 9:27 PM To: VND Tremblay, Paul Cc: user@spark.apache.org Subject: Re: spark 2.02 error when writing to s3 Hi, Do you get the same exception also in v2.1.0? Anyway, I saw another guy reporting the same error, I think. https://www.mail-archive.com/user@spark.apache.org/msg60882.html // maropu On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul <tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote: I have come across a problem when writing CSV files to S3 in Spark 2.02. The problem does not exist in Spark 1.6. 19:09:20 Caused by: java.io.IOException: File already exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks=args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep="|",
        header=True,
        nullValue='',
        quote=None,
        path=path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.
Notes:
* If I use a small set of data, then I don't get the error
* If I use Spark 1.6, I don't get the error
* If I read in a simple dataframe and then write to S3, I still get the error (without doing any transformations)
* If I do the previous step with a smaller set of data, I don't get the error
* I am using pyspark, with Python 2.7
* The thread at this link: https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the problem is caused by a sync problem. With large datasets, Spark tries to write multiple times, which causes the error. The suggestion is to turn off speculation, but I believe speculation is turned off by default in pyspark.
Thanks! Paul _ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP STL ▪ Tel. + ▪ Mobile + tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com> _ Read BCG's latest insights, analysis, and viewpoints at bcgperspectives.com<http://www.bcgperspectives.com> -- --- Takeshi Yamamuro
spark 2.02 error when writing to s3
I have come across a problem when writing CSV files to S3 in Spark 2.02. The problem does not exist in Spark 1.6. 19:09:20 Caused by: java.io.IOException: File already exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks=args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep="|",
        header=True,
        nullValue='',
        quote=None,
        path=path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write. Notes:
* If I use a small set of data, then I don't get the error
* If I use Spark 1.6, I don't get the error
* If I read in a simple dataframe and then write to S3, I still get the error (without doing any transformations)
* If I do the previous step with a smaller set of data, I don't get the error
* I am using pyspark, with Python 2.7
* The thread at this link: https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the problem is caused by a sync problem. With large datasets, Spark tries to write multiple times, which causes the error. The suggestion is to turn off speculation, but I believe speculation is turned off by default in pyspark.
Thanks! Paul _ Paul Tremblay Analytics Specialist THE BOSTON CONSULTING GROUP STL ▪ Tel.
+ ▪ Mobile + tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com> _ Read BCG's latest insights, analysis, and viewpoints at bcgperspectives.com<http://www.bcgperspectives.com>
Spark 2.0 Encoder().schema() is sorting StructFields
Hi all, I am using Spark 2.0 to read a CSV file into a Dataset in Java. This works fine if I define the StructType with the StructField array ordered by hand. What I would like to do is use a bean class for both the schema and the Dataset row type. For example, Dataset beanDS = spark.read().schema( Encoders.bean(Bean.class).schema()).as(Encoders.bean(Bean.class)); When using the Encoders.bean(Bean.class).schema() method to generate the StructType array of StructFields, the class attributes are returned as a sorted list and not in the order defined within Bean.class. This makes the schema unusable for reading from a CSV file where the ordering of the attributes is significant. Is there any way to cause the Encoders.bean().schema() method to return the array of StructFields in the original bean class definition order? (Aside from prefixing every attribute name to maintain order.) Would this be considered a bug/enhancement? Regards, Paul - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
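Until the encoder preserves declaration order, a practical workaround is to rearrange the alphabetically sorted fields back into the CSV's column order before building the schema. The mail's context is Java, but the rearrangement itself is plain list manipulation, sketched here in Python with hypothetical names (in the Java code this would rebuild the StructField array before passing it to spark.read().schema(...)):

```python
def reorder_fields(sorted_fields, declared_order):
    """sorted_fields: (name, type) pairs as a bean encoder returns them
    (alphabetical); declared_order: the column order the CSV actually uses.
    Returns the fields rearranged to match the CSV."""
    by_name = dict(sorted_fields)
    missing = [n for n in declared_order if n not in by_name]
    if missing:
        # Fail loudly rather than silently reading columns into wrong fields.
        raise ValueError("fields missing from schema: %s" % missing)
    return [(name, by_name[name]) for name in declared_order]
```

The declared order still has to be written down once (or read from the CSV header), but the per-field types stay derived from the bean.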
Re: AVRO vs Parquet
Nice article about Parquet *with* Avro: - https://dzone.com/articles/understanding-how-parquet - http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/ Nice video from the good folks at Cloudera on the *differences* between Avro and Parquet - https://www.youtube.com/watch?v=AY1dEfyFeHc 2016-03-04 7:12 GMT+01:00 Koert Kuipers <ko...@tresata.com>: > well can you use orc without bringing in the kitchen sink of dependencies > also known as hive? > > On Thu, Mar 3, 2016 at 11:48 PM, Jong Wook Kim <ilike...@gmail.com> wrote: > >> How about ORC? I have experimented briefly with Parquet and ORC, and I >> liked the fact that ORC has its schema within the file, which makes it >> handy to work with any other tools. >> >> Jong Wook >> >> On 3 March 2016 at 23:29, Don Drake <dondr...@gmail.com> wrote: >> >>> My tests show Parquet has better performance than Avro in just about >>> every test. It really shines when you are querying a subset of columns in >>> a wide table. >>> >>> -Don >>> >>> On Wed, Mar 2, 2016 at 3:49 PM, Timothy Spann <tim.sp...@airisdata.com> >>> wrote: >>> >>>> Which format is the best format for SparkSQL adhoc queries and general >>>> data storage? >>>> >>>> There are lots of specialized cases, but generally accessing some but >>>> not all the available columns with a reasonable subset of the data. >>>> >>>> I am leaning towards Parquet as it has great support in Spark. >>>> >>>> I also have to consider any file on HDFS may be accessed from other >>>> tools like Hive, Impala, HAWQ. >>>> >>>> Suggestions? >>>> — >>>> airis.DATA >>>> Timothy Spann, Senior Solutions Architect >>>> C: 609-250-5894 >>>> http://airisdata.com/ >>>> http://meetup.com/nj-datascience >>>> >>>> >>> >>> >>> -- >>> Donald Drake >>> Drake Consulting >>> http://www.drakeconsulting.com/ >>> https://twitter.com/dondrake >>> 800-733-2143 >>> >> >> > -- Paul Leclercq | Data engineer paul.lecle...@tabmo.io | http://www.tabmo.fr/
Re: Kafka streaming receiver approach - new topic not read from beginning
I successfully processed my data by manually resetting my topic offsets in ZK. In case it helps someone, here are my steps. Make sure you stop all your consumers before doing this, otherwise they will overwrite the new offsets you wrote: set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset} Source: https://metabroadcast.com/blog/resetting-kafka-offsets 2016-02-22 11:55 GMT+01:00 Paul Leclercq <paul.lecle...@tabmo.io>: > Thanks for your quick answer. > > If I set "auto.offset.reset" to "smallest" in the KafkaParams like this > > val kafkaParams = Map[String, String]( > "metadata.broker.list" -> brokers, > "group.id" -> groupId, > "auto.offset.reset" -> "smallest" > ) > > And then use : > > val streams = KafkaUtils.createStream(ssc, kafkaParams, kafkaTopics, > StorageLevel.MEMORY_AND_DISK_SER_2) > > My fear is that, every time I deploy a new version, all the consumer's topics > are going to be read from the beginning, but as said in Kafka's documentation > > auto.offset.reset default : largest > > What to do when there* is no initial offset in ZooKeeper* or if an offset is > out of range: > * smallest : automatically reset the offset to the smallest offset > > So I will go for this option the next time I need to process a new topic > > To fix my problem, as the topic has already been processed and registered in > ZK, I will use a directStream from smallest and remove all DB inserts of this > topic, and restart a "normal" stream once the lag is caught up. > > > 2016-02-22 10:57 GMT+01:00 Saisai Shao <sai.sai.s...@gmail.com>: > >> You could set this configuration "auto.offset.reset" through the parameter >> "kafkaParams" which is provided in some other overloaded APIs of >> createStream. >> >> By default Kafka will pick data from the latest offset unless you explicitly >> set it; this is the behavior of Kafka, not Spark.
>> >> Thanks >> Saisai >> >> On Mon, Feb 22, 2016 at 5:52 PM, Paul Leclercq <paul.lecle...@tabmo.io> >> wrote: >> >>> Hi, >>> >>> Do you know why, with the receiver approach >>> <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach> >>> and a *consumer group*, a new topic is not read from the beginning but >>> from the latest? >>> >>> Code example : >>> >>> val kafkaStream = KafkaUtils.createStream(streamingContext, >>> [ZK quorum], [consumer group id], [per-topic number of Kafka >>> partitions to consume]) >>> >>> >>> Is there a way to tell, *only for new topics*, to read from the beginning? >>> >>> From the Confluence FAQ >>> >>>> Alternatively, you can configure the consumer by setting >>>> auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest" >>>> for the old consumer. >>> >>> >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata? >>> >>> Thanks >>> -- >>> >>> Paul Leclercq >>> >> >> > > > -- > > Paul Leclercq | Data engineer > > > paul.lecle...@tabmo.io | http://www.tabmo.fr/ > -- Paul Leclercq | Data engineer paul.lecle...@tabmo.io | http://www.tabmo.fr/
Re: Kafka streaming receiver approach - new topic not read from beginning
Thanks for your quick answer. If I set "auto.offset.reset" to "smallest" in the KafkaParams like this:

val kafkaParams = Map[String, String](
    "metadata.broker.list" -> brokers,
    "group.id" -> groupId,
    "auto.offset.reset" -> "smallest"
)

And then use:

val streams = KafkaUtils.createStream(ssc, kafkaParams, kafkaTopics, StorageLevel.MEMORY_AND_DISK_SER_2)

My fear is that, every time I deploy a new version, all the consumer's topics are going to be read from the beginning, but as said in Kafka's documentation auto.offset.reset default : largest What to do when there* is no initial offset in ZooKeeper* or if an offset is out of range: * smallest : automatically reset the offset to the smallest offset So I will go for this option the next time I need to process a new topic To fix my problem, as the topic has already been processed and registered in ZK, I will use a directStream from smallest and remove all DB inserts of this topic, and restart a "normal" stream once the lag is caught up. 2016-02-22 10:57 GMT+01:00 Saisai Shao <sai.sai.s...@gmail.com>: > You could set this configuration "auto.offset.reset" through the parameter > "kafkaParams" which is provided in some other overloaded APIs of > createStream. > > By default Kafka will pick data from the latest offset unless you explicitly > set it; this is the behavior of Kafka, not Spark. > > Thanks > Saisai > > On Mon, Feb 22, 2016 at 5:52 PM, Paul Leclercq <paul.lecle...@tabmo.io> > wrote: > >> Hi, >> >> Do you know why, with the receiver approach >> <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach> >> and a *consumer group*, a new topic is not read from the beginning but >> from the latest? >> >> Code example : >> >> val kafkaStream = KafkaUtils.createStream(streamingContext, >> [ZK quorum], [consumer group id], [per-topic number of Kafka partitions >> to consume]) >> >> >> Is there a way to tell, *only for new topics*, to read from the beginning?
>> >> From Confluence FAQ >> >>> Alternatively, you can configure the consumer by setting >>> auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest" >>> for the old consumer. >> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata? >> >> Thanks >> -- >> >> Paul Leclercq >> > > -- Paul Leclercq | Data engineer paul.lecle...@tabmo.io | http://www.tabmo.fr/
Kafka streaming receiver approach - new topic not read from beginning
Hi, Do you know why, with the receiver approach <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach> and a *consumer group*, a new topic is not read from the beginning but from the latest? Code example: val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) Is there a way to tell, *only for new topics*, to read from the beginning? From the Confluence FAQ > Alternatively, you can configure the consumer by setting auto.offset.reset > to "earliest" for the new consumer in 0.9 and "smallest" for the old > consumer. https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata? Thanks -- Paul Leclercq
Re: spark-1.2.0--standalone-ha-zookeeper
Hi Raghvendra and Spark users, I also have trouble activating my standby master when my first master is shut down (via ./sbin/stop-master.sh or via an instance shutdown) and just want to share my thoughts with you. To answer your question Raghvendra, in *spark-env.sh*, if 2 IPs are set for SPARK_MASTER_IP (SPARK_MASTER_IP='W.X.Y.Z,A.B.C.D'), the standalone cluster cannot be launched. So I use only one IP there, as the Spark context can learn about the other masters another way, as written in the Standalone Zookeeper HA <http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper> doc: "you might start your SparkContext pointing to spark://host1:port1,host2:port2" In my opinion, we should not have to set a SPARK_MASTER_IP, as this is stored in ZooKeeper: you can launch multiple Masters in your cluster connected to the same > ZooKeeper instance. One will be elected “leader” and the others will remain > in standby mode. When starting up, an application or Worker needs to be able to find and > register with the current lead Master. Once it successfully registers, > though, it is “in the system” (i.e., stored in ZooKeeper). - http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper As I understand it, after a ./sbin/stop-master.sh on both masters, a master will be elected, and the other will be standby. To launch the workers, we can use ./sbin/start-slave.sh spark://MASTER_ELECTED_IP:7077 I don't think we can use ./sbin/start-all.sh, which uses the slaves file to launch workers and masters, as we cannot set 2 master IPs inside spark-env.sh. My SPARK_DAEMON_JAVA_OPTS content: SPARK_DAEMON_JAVA_OPTS='-Dspark.deploy.recoveryMode="ZOOKEEPER" > -Dspark.deploy.zookeeper.url="ZOOKEEPER_IP:2181" > -Dspark.deploy.zookeeper.dir="/spark"' A good thing to check whether everything went OK is the folder /spark on the ZooKeeper server. I could not find it on my server.
Thanks for reading, Paul 2016-01-19 22:12 GMT+01:00 Raghvendra Singh <raghvendra.ii...@gmail.com>: > Hi, there is one question. In spark-env.sh should I specify all masters > for the parameter SPARK_MASTER_IP? I've set SPARK_DAEMON_JAVA_OPTS already > with the zookeeper configuration as specified in the spark documentation. > > Thanks & Regards > Raghvendra > > On Wed, Jan 20, 2016 at 1:46 AM, Raghvendra Singh < > raghvendra.ii...@gmail.com> wrote: > >> Here's the complete master log on reproducing the error: >> http://pastebin.com/2YJpyBiF >> >> Regards >> Raghvendra >> >> On Wed, Jan 20, 2016 at 12:38 AM, Raghvendra Singh < >> raghvendra.ii...@gmail.com> wrote: >> >>> Ok, I will try to reproduce the problem. Also, I don't think this is an >>> uncommon problem; I have been searching for this problem on Google for many days >>> and found lots of questions but no answers. >>> >>> Do you know what kinds of settings spark and zookeeper allow for >>> handling timeouts during leader election etc. when one is down? >>> >>> Regards >>> Raghvendra >>> On 20-Jan-2016 12:28 am, "Ted Yu" <yuzhih...@gmail.com> wrote: >>> >>>> Perhaps I don't have enough information to make further progress. >>>> >>>> On Tue, Jan 19, 2016 at 10:55 AM, Raghvendra Singh < >>>> raghvendra.ii...@gmail.com> wrote: >>>> >>>>> I currently do not have access to those logs, but there were only about >>>>> five lines before this error. They were the same ones that are usually present >>>>> when everything works fine. >>>>> >>>>> Can you still help? >>>>> >>>>> Regards >>>>> Raghvendra >>>>> On 18-Jan-2016 8:50 pm, "Ted Yu" <yuzhih...@gmail.com> wrote: >>>>> >>>>>> Can you pastebin the master log before the error showed up? >>>>>> >>>>>> The initial message was posted for Spark 1.2.0. >>>>>> Which release of Spark / zookeeper do you use?
>>>>>> >>>>>> Thanks >>>>>> >>>>>> On Mon, Jan 18, 2016 at 6:47 AM, doctorx <raghvendra.ii...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi, >>>>>>> I am facing the same issue, with the given error >>>>>>> >>>>>>> ERROR Master:75 - Leadership has been revoked -- master shutting >>>>>>> down. >>>>>>> >>>>>>> Can anybody help. Any clue will be useful. Should i change something >>>>>>> in >>>>>>> spark cluster or zookeeper. Is there any setting in spark which can >>>>>>> help me? >>>>>>> >>>>>>> Thanks & Regards >>>>>>> Raghvendra >>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> View this message in context: >>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-0-standalone-ha-zookeeper-tp21308p25994.html >>>>>>> Sent from the Apache Spark User List mailing list archive at >>>>>>> Nabble.com. >>>>>>> >>>>>>> - >>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>>>>> For additional commands, e-mail: user-h...@spark.apache.org >>>>>>> >>>>>>> >>>>>> >>>> >> >
Re: Spark streaming job hangs
You might not have enough cores to process data from Kafka. > When running a Spark Streaming program locally, do not use “local” or > “local[1]” as the master URL. Either of these means that only one thread > will be used for running tasks locally. If you are using a input DStream > based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single > thread will be used to run the receiver, leaving no thread for processing > the received data. *Hence, when running locally, always use “local[n]” as > the master URL, *where n > number of receivers to run (see Spark > Properties for information on how to set the master).* https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers <https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers> 2015-12-01 7:13 GMT+01:00 Cassa L <lcas...@gmail.com>: > Hi, > I am reading data from Kafka into spark. It runs fine for some time but > then hangs forever with the following output. I don't see any errors in the logs. > How do I debug this?
> > 2015-12-01 06:04:30,697 [dag-scheduler-event-loop] INFO > (Logging.scala:59) - Adding task set 19.0 with 4 tasks > 2015-12-01 06:04:30,872 [pool-13-thread-1] INFO (Logging.scala:59) - > Disconnected from Cassandra cluster: APG DEV Cluster > 2015-12-01 06:04:35,060 [JobGenerator] INFO (Logging.scala:59) - Added > jobs for time 1448949875000 ms > 2015-12-01 06:04:40,054 [JobGenerator] INFO (Logging.scala:59) - Added > jobs for time 144894988 ms > 2015-12-01 06:04:45,034 [JobGenerator] INFO (Logging.scala:59) - Added > jobs for time 1448949885000 ms > 2015-12-01 06:04:50,100 [JobGenerator] INFO (Logging.scala:59) - Added > jobs for time 144894989 ms > 2015-12-01 06:04:55,064 [JobGenerator] INFO (Logging.scala:59) - Added > jobs for time 1448949895000 ms > 2015-12-01 06:05:00,125 [JobGenerator] INFO (Logging.scala:59) - Added > jobs for time 144894990 ms > > > Thanks > LCassa > -- Paul Leclercq | Data engineer paul.lecle...@tabmo.io | http://www.tabmo.fr/
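The quoted rule (use "local[n]" with n greater than the number of receivers) reduces to a small arithmetic check. A hypothetical helper, not part of any Spark API, just to make the failure mode concrete:

```python
def processing_threads(master_threads, receivers):
    """How many threads remain for processing received data when running
    with a master URL of local[master_threads] and the given number of
    receiver-based input DStreams. Each receiver pins one thread for the
    lifetime of the job, so local[n] needs n > receivers; otherwise the
    job appears to hang exactly as described above."""
    return max(0, master_threads - receivers)
```

With one Kafka receiver, "local" or "local[1]" leaves zero processing threads: batches are added forever but never run.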
unpersist RDD from another thread
Hi, What is the behavior when calling rdd.unpersist() from a different thread while another thread is using that rdd? Below is a simple case for this: 1) create rdd and load data 2) call rdd.cache() to bring data into memory 3) create another thread and pass rdd for a long computation 4) call rdd.unpersist while 3) is still running Questions: * Will the computation in 3) finish properly even if unpersist was called on the rdd while running? * What happens if a part of the computation fails and the rdd needs to be reconstructed based on DAG lineage; will this still work even though unpersist has been called? thanks, -paul
Re: unpersist RDD from another thread
So in order to not incur any performance issues I should really wait for all usage of the rdd to complete before calling unpersist, correct? On Wed, Sep 16, 2015 at 4:08 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > unpredictable. I think it will be safe (as in nothing should fail), but > the performance will be unpredictable (some partition may use cache, some > may not be able to use the cache). > > On Wed, Sep 16, 2015 at 1:06 PM, Paul Weiss <paulweiss@gmail.com> > wrote: > >> Hi, >> >> What is the behavior when calling rdd.unpersist() from a different thread >> while another thread is using that rdd. Below is a simple case for this: >> >> 1) create rdd and load data >> 2) call rdd.cache() to bring data into memory >> 3) create another thread and pass rdd for a long computation >> 4) call rdd.unpersist while 3. is still running >> >> Questions: >> >> * Will the computation in 3) finish properly even if unpersist was called >> on the rdd while running? >> * What happens if a part of the computation fails and the rdd needs to >> reconstruct based on DAG lineage, will this still work even though >> unpersist has been called? >> >> thanks, >> -paul >> > >
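Tathagata's answer implies an ordering rule: let every consumer of the cached RDD finish before calling unpersist. A plain-Python sketch of that ordering, with a stand-in class instead of a real RDD (the class and its methods are purely illustrative, not Spark APIs):

```python
import threading

class FakeCachedRDD:
    """Stand-in for a cached RDD; real code would call .cache() /
    .unpersist() on a Spark RDD instead."""
    def __init__(self, data):
        self.cache = list(data)   # the "persisted" blocks
        self.result = None

    def compute(self):
        # Long-running job that reads the cached blocks.
        self.result = sum(self.cache)

    def unpersist(self):
        self.cache = None         # drop the cached blocks

rdd = FakeCachedRDD([1, 2, 3])
worker = threading.Thread(target=rdd.compute)
worker.start()
worker.join()      # wait until every user of the cache is done...
rdd.unpersist()    # ...and only then release it
```

Calling unpersist before the join would not crash a real Spark job (lineage allows recomputation), but partitions evicted mid-run lose the cache, which is exactly the unpredictable performance TD describes.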
RE: Too many open files
Maybe you forgot to close a reader or writer object. On 29 July 2015 18:04:59 MESZ, saif.a.ell...@wellsfargo.com wrote: Thank you both, I will take a look, but 1. For high-shuffle tasks, is it right for the system to have the size and thresholds high? I hope there are no bad consequences. 2. I will try to work around admin access and see if I can get anything with only user rights From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Wednesday, July 29, 2015 12:59 PM To: Ellafi, Saif A. Cc: user@spark.apache.org Subject: Re: Too many open files Please increase the limit for open files: http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux On Jul 29, 2015, at 8:39 AM, saif.a.ell...@wellsfargo.com<mailto:saif.a.ell...@wellsfargo.com> wrote: Hello, I’ve seen a couple of emails on this issue but could not find anything to solve my situation. I tried to reduce the partitioning level, enable consolidateFiles and increase the sizeInFlight limit, but still no help. The spill manager is sort, which is the default; any advice? 15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID 331, localhost): FetchFailed(BlockManagerId(driver, localhost, 43437), shuffleId=3, mapId=0, reduceId=34, message= org.apache.spark.shuffle.FetchFailedException: /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index (Too many open files) .. ..
15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in stage 11.0 (TID 306) org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage 11.0 (TID 317, localhost): java.io.FileNotFoundException: /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc (Too many open files) my fs is ext4 and currently ulimit -n is 1024 Thanks Saif -- This message was sent from my Android mobile phone with K-9 Mail.
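On Saif's second point (only user rights): a process can inspect, and raise up to the hard limit, its own descriptor limit without admin access via Python's resource module (Unix only). The 4096 target below is an arbitrary example value, not a recommendation:

```python
import resource

# Current soft/hard limits on open file descriptors; `ulimit -n` reports
# the soft limit, and 1024 is a common default that is low for
# shuffle-heavy Spark jobs.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

# A process may raise its own soft limit up to the hard limit without
# root; raising the hard limit itself is what needs admin rights.
target = 4096 if hard == resource.RLIM_INFINITY else min(4096, hard)
if soft != resource.RLIM_INFINITY and soft < target:
    resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
```

This only helps the Python driver process itself; executor JVMs inherit the limit of whatever launches them, so cluster-wide the /etc/security/limits.conf route Ted linked is still needed.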
Jobs with unknown origin.
Hey, I have quite a few jobs appearing in the web-ui with the description run at ThreadPoolExecutor.java:1142. Are these generated by SparkSQL internally? There are so many that they cause a RejectedExecutionException when the thread-pool runs out of space for them. RejectedExecutionException Task scala.concurrent.impl.Future$PromiseCompletingRunnable@30ec07a4 rejected from java.util.concurrent.ThreadPoolExecutor@9110d1[Running, pool size = 128, active threads = 128, queued tasks = 0, completed tasks = 392] java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution (ThreadPoolExecutor.java:2047) Any ideas where they come from? I'm pretty sure that they don't originate from my code. Cheers Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Benchmark results between Flink and Spark
I would guess the opposite is true for highly iterative benchmarks (common in graph processing and data science). Spark has a pretty large overhead per iteration; more optimisations and planning only make this worse. Sure, people have implemented things like Dijkstra's algorithm in Spark (a problem where the number of iterations is bounded by the circumference of the input graph), but all the datasets I've seen it running on had a very small circumference (which is common for e.g. social networks). Take SparkSQL for example. Catalyst is a really good query optimiser, but it introduces significant overhead. Since Spark has no iterative semantics of its own (unlike Flink), one has to materialise the intermediary dataframe at each iteration boundary to determine if a termination criterion is reached. This causes a huge amount of planning, especially since it looks like Catalyst will try to optimise the dependency graph regardless of caching, a dependency graph that grows in the number of iterations and thus the size of the input dataset. In Flink on the other hand, you can describe your entire iterative program through transformations without ever calling an action. This means that the optimiser will only have to do planning once. Just my 2 cents :) Cheers, Jan On 06 Jul 2015, at 06:10, n...@reactor8.com wrote: Maybe some flink benefits from some pts they outline here: http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html Probably if they re-ran the benchmarks with the 1.5/tungsten line it would close the gap a bit (or a lot), with spark moving towards a similar style of off-heap memory mgmt and more planning optimizations From: Jerry Lam [mailto:chiling...@gmail.com] Sent: Sunday, July 5, 2015 6:28 PM To: Ted Yu Cc: Slim Baltagi; user Subject: Re: Benchmark results between Flink and Spark Hi guys, I just read the paper too.
There is not much information regarding why Flink is faster than Spark for data-science-type workloads in the benchmark. It is very difficult to generalize the conclusion of a benchmark from my point of view. How much experience the author has with Spark in comparison to Flink is one of the immediate questions I have. It would be great if they made the benchmark software available somewhere for other people to experiment with. just my 2 cents, Jerry On Sun, Jul 5, 2015 at 4:35 PM, Ted Yu yuzhih...@gmail.com mailto:yuzhih...@gmail.com wrote: There was no mention of the versions of Flink and Spark used in the benchmarking. The size of the cluster is quite small. Cheers On Sun, Jul 5, 2015 at 10:24 AM, Slim Baltagi sbalt...@gmail.com mailto:sbalt...@gmail.com wrote: Hi Apache Flink outperforms Apache Spark in processing machine learning, graph algorithms and relational queries, but not in batch processing! The results were published in the proceedings of the 18th International Conference, Business Information Systems 2015, Poznań, Poland, June 24-26, 2015. Thanks to our friend Google, Chapter 3: 'Evaluating New Approaches of Big Data Analytics Frameworks' by Norman Spangenberg, Martin Roth and Bogdan Franczyk is available for preview at http://goo.gl/WocQci on pages 28-37. Enjoy! Slim Baltagi http://www.SparkBigData.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Benchmark-results-between-Flink-and-Spark-tp23626.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Benchmark results between Flink and Spark
Sorry, that should be shortest path, and diameter of the graph. I shouldn't write emails before I get my morning coffee...

On 06 Jul 2015, at 09:09, Jan-Paul Bultmann janpaulbultm...@me.com wrote:

[quoted message snipped]
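The iteration-boundary point above can be made concrete with a driver-loop sketch: because the termination test needs the materialised result, every pass forces an action (and, in Spark, a fresh Catalyst planning round), whereas Flink's iteration operator lets the optimiser plan the whole loop once. A plain-Python analogue (the `transitive_step` function and all names are illustrative, not Spark API):

```python
def fixed_point(dataset, step, max_iter=100):
    """Iterate result_{i+1} = step(result_i) until a fixed point.

    The equality test is the iteration boundary: in Spark it would
    force a materialising action (and a planning pass) on every
    iteration, because the engine has no iterative semantics of its own.
    """
    for i in range(max_iter):
        new = step(dataset)
        if new == dataset:   # termination criterion needs concrete data
            return new, i
        dataset = new
    return dataset, max_iter

def transitive_step(edges):
    # one pass: add every 2-hop edge derivable from the current relation
    return edges | frozenset((a, d) for (a, b) in edges
                             for (c, d) in edges if b == c)
```

Running `fixed_point(frozenset({(1, 2), (2, 3), (3, 4)}), transitive_step)` converges to the six-edge transitive closure after a couple of passes; each pass would be one full plan-and-execute cycle in Spark.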
generateTreeString causes huge performance problems on dataframe persistence
Hey, I noticed that my code spends hours in `generateTreeString` even though the actual DAG/dataframe execution takes seconds. I'm running a query that grows exponentially in the number of iterations when evaluated without caching, but should be linear when previous results are cached, e.g.

    result_{i+1} = distinct(join(result_i, result_i))

Without caching this evaluates exponentially, since each iteration's plan contains two full copies of the previous one:

    Iteration | Dataframe plan tree
        0     | /\
        1     | /\ /\
        2     | /\ /\ /\ /\
        n     | ...

But with caching it should be linear, since each iteration can refer back to the cached result of the previous one:

    Iteration | Dataframe plan tree
        0     | /\
              | \/
        1     | /\
              | \/
        2     | /\
              | \/
        n     | ...

It seems that even though the DAG will have the latter form, `generateTreeString` walks the entire plan naively, as if no caching had been done. The Spark web UI also shows no active jobs, even though my CPU is fully using one core to calculate that string. Below is the piece of the stack trace that starts the entire walk:

    ^
    | Thousands of calls to `generateTreeString`.
    |
    org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(int, StringBuilder) TreeNode.scala:431
    org.apache.spark.sql.catalyst.trees.TreeNode.treeString() TreeNode.scala:400
    org.apache.spark.sql.catalyst.trees.TreeNode.toString() TreeNode.scala:397
    org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$buildBuffers$2.apply() InMemoryColumnarTableScan.scala:164
    org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$buildBuffers$2.apply() InMemoryColumnarTableScan.scala:164
    scala.Option.getOrElse(Function0) Option.scala:120
    org.apache.spark.sql.columnar.InMemoryRelation.buildBuffers() InMemoryColumnarTableScan.scala:164
    org.apache.spark.sql.columnar.InMemoryRelation.init(Seq, boolean, int, StorageLevel, SparkPlan, Option, RDD, Statistics, Accumulable) InMemoryColumnarTableScan.scala:112
    org.apache.spark.sql.columnar.InMemoryRelation$.apply(boolean, int, StorageLevel, SparkPlan, Option) InMemoryColumnarTableScan.scala:45
    org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply() CacheManager.scala:102
    org.apache.spark.sql.execution.CacheManager.writeLock(Function0) CacheManager.scala:70
    org.apache.spark.sql.execution.CacheManager.cacheQuery(DataFrame, Option, StorageLevel) CacheManager.scala:94
    org.apache.spark.sql.DataFrame.persist(StorageLevel) DataFrame.scala:1320
    ^
    | Application logic.
    |

Could someone confirm my suspicion? And does somebody know why it's called while caching, and why it walks the entire tree, including cached results?

Cheers, Jan-Paul
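A rough node-count model of the two shapes above (illustrative constants, not Spark's actual operator counts) shows why walking the uncached plan explodes: inlining `join(result, result)` doubles the subtree each iteration, while referencing a cached relation only adds a constant number of nodes per iteration:

```python
def plan_nodes_uncached(iterations):
    # each iteration inlines two full copies of the previous plan,
    # plus a join node and a distinct node on top
    size = 1  # the initial scan
    for _ in range(iterations):
        size = 2 * size + 2
    return size

def plan_nodes_cached(iterations):
    # the previous result collapses to one cached-relation leaf, so an
    # iteration costs a constant: two leaf references + join + distinct
    return 1 + 4 * iterations
```

With these constants the uncached tree holds 3 * 2^n - 2 nodes after n iterations, so `generateTreeString` does exponential work even when execution itself stays linear thanks to caching.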
Re: generateTreeString causes huge performance problems on dataframe persistence
> Seems you're hitting the self-join; currently Spark SQL won't cache any result/logical tree of a self-join for further analysis or computation.

Other joins don't suffer from this problem?

> Since the logical tree is huge, it's reasonable that generating its tree string recursively takes a long time.

The internal structure is basically a graph though, right? Where equal cached subtrees are structurally shared by reference instead of being copied by value. Is the `generateTreeString` result needed for anything other than giving the RDD a nice name? It seems rather wasteful to compute a graph's unfolding into a tree for this.

> And I also doubt the computation can finish within a reasonable time, as there will probably be lots of partitions (growing exponentially) of the intermediate result.

Possibly; so far the number of partitions has stayed the same, though. But I didn't run that many iterations, due to the problem :).

> As a workaround, you can break the iterations into smaller ones and trigger them manually in sequence.

You mean `write`ing them to disk after each iteration?

Thanks :), Jan

-----Original Message-----
From: Jan-Paul Bultmann [mailto:janpaulbultm...@me.com]
Sent: Wednesday, June 17, 2015 6:17 PM
To: User
Subject: generateTreeString causes huge performance problems on dataframe persistence

[quoted message snipped]
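For the record, the snapshotting workaround asked about at the end ("`write`ing them to disk after each iteration") helps because a re-read dataset starts from a fresh one-node scan, so the plan cannot accumulate across iterations. A plain-Python sketch of that driver pattern, with pickle files standing in for a write-then-re-read round trip (illustrative only, not Spark API):

```python
import os
import pickle

def iterate_with_snapshots(seed, step, iterations, snapshot_dir):
    # Persist each iteration's result and reload it before continuing:
    # the reloaded value carries no reference to earlier iterations,
    # just as a re-read file gives the planner a fresh one-node plan.
    current = seed
    for i in range(iterations):
        current = step(current)
        path = os.path.join(snapshot_dir, "iter_%d.pkl" % i)
        with open(path, "wb") as f:
            pickle.dump(current, f)
        with open(path, "rb") as f:
            current = pickle.load(f)
    return current
```

The trade-off is extra I/O per iteration in exchange for planning cost that stays constant instead of growing with the iteration count.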
Re: build jar with all dependencies
] org.apache.spark.SecurityManager INFO - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(proewer); users with modify permissions: Set(proewer)

Exception in thread main com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:115)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:136)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:142)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:150)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:155)
    at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:197)
    at akka.actor.ActorSystem$Settings.init(ActorSystem.scala:136)
    at akka.actor.ActorSystemImpl.init(ActorSystem.scala:470)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
    at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
    at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)
    at org.apache.spark.SparkContext.init(SparkContext.scala:203)
    at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
    at mgm.tp.bigdata.ma_spark.SparkMain.main(SparkMain.java:38)

What am I doing wrong? Best regards, Paul
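Not stated in the thread, but "No configuration setting found for key 'akka.version'" is the classic symptom of building an uber jar without merging the Typesafe Config `reference.conf` resources: several jars on the classpath each ship a `reference.conf`, a naive assembly keeps only one of them, and Akka's copy (which defines `akka.version`) gets dropped. A hedged sketch of the usual maven-shade-plugin fix (the surrounding plugin coordinates are standard; adapt to your build):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- concatenate all reference.conf files instead of keeping only one -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>reference.conf</resource>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

sbt-assembly users achieve the same thing with a merge strategy that concatenates `reference.conf` entries.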
Soft distinct on data frames.
Hey, Is there a way to do a distinct operation on each partition only? My program generates quite a few duplicate tuples, and it would be nice to remove some of these as an optimisation without having to reshuffle the data.

I've also noticed that plans generated with a distinct transformation have this peculiar form:

== Physical Plan ==
Distinct false
 Exchange (HashPartitioning [_0#347L,_1#348L], 200)
  Distinct true
   PhysicalRDD [_0#347L,_1#348L], MapPartitionsRDD[247] at map at SQLContext.scala:394

Does this mean that set semantics are just a flag that can be turned off and on for each shuffling operation? If so, is it possible to do this in general, so that one always uses set semantics instead of bag semantics? Or will the optimiser try to propagate the set semantics?

Cheers, Jan
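Not an answer from the thread, but the plan above is itself the partial/total pattern: `Distinct true` is a per-partition (pre-shuffle) dedup and `Distinct false` is the global dedup after the `Exchange`. Doing only the local half, which is what a `mapPartitions` with a per-partition set would give, can be modelled in plain Python (lists standing in for partitions; not Spark API):

```python
def partition_distinct(partitions):
    # local, shuffle-free dedup: duplicates *within* a partition vanish,
    # duplicates *across* partitions survive ('Distinct true' in the plan)
    return [list(dict.fromkeys(p)) for p in partitions]

def total_distinct(partitions):
    # the full operation: local dedup, then a global dedup over the
    # repartitioned data ('Exchange' + 'Distinct false' in the plan);
    # the single-pass merge here loosely stands in for the shuffle
    seen, out = set(), []
    for p in partition_distinct(partitions):
        for x in p:
            if x not in seen:
                seen.add(x)
                out.append(x)
    return out
```

The local half alone never changes query results under set semantics downstream; it only shrinks the data that a later shuffle has to move.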
spark sql, creating literal columns in java.
Hey, What is the recommended way to create literal columns in Java? Scala has the `lit` function from `org.apache.spark.sql.functions`; should it be called from Java as well? Cheers, Jan
Re: Jackson-core-asl conflict with Spark
So... one solution would be to use a non-Jurassic version of Jackson. 2.6 will drop before too long, and 3.0 is in longer-term planning. The 1.x series is long deprecated. If you're genuinely stuck with something ancient, then you need to include the JAR that contains the class, and 1.9.13 does not. Why do you think you need that particular version?

— p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Thu, Mar 12, 2015 at 9:58 AM, Uthayan Suthakar uthayan.sutha...@gmail.com wrote: Hello guys, I'm running into the error below:

Exception in thread main java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass

I have created an uber jar with jackson-core-asl 1.9.13 and passed it with the --jars configuration, but I am still getting errors. I searched the net and found a few suggestions, such as disabling USE_ANNOTATIONS, but still no joy. I tried disabling the SQL module, recompiled Spark, and installed the custom library, yet no joy. I'm running out of ideas; could you please assist me with this issue? Many thanks. Uthay.
Perf impact of BlockManager byte[] copies
Dear List, I'm investigating some problems related to native code integration with Spark, and while picking through BlockManager I noticed that data (de)serialization currently issues lots of array copies. Specifically:

- Deserialization: BlockManager marshals all deserialized bytes through a spark.util.ByteBufferInputStream, which necessitates copying data into an intermediate temporary byte[]. The temporary byte[] might be reused between deserializations of T instances, but the bytes must nevertheless be copied (and likely in a Java loop).
- Serialization: BlockManager buffers all serialized bytes into a java.io.ByteArrayOutputStream, which maintains an internal byte[] buffer and grows/re-copies the buffer like a vector as the buffer fills. BlockManager then retrieves the internal byte[] buffer, wraps it in a ByteBuffer, and sends it off to be stored (e.g. in MemoryStore, DiskStore, Tachyon, etc.).

When an individual T is somewhat large (e.g. a feature vector, an image, etc.), or blocks are megabytes in size, these copies become expensive (especially for large written blocks), right? Does anybody have any measurements of /how/ expensive they are? If not, is there serialization benchmark code (e.g. for KryoSerializer) that might be helpful here?

As part of my investigation, I've found that one might be able to sidestep these issues by extending Spark's SerializerInstance API to offer I/O on ByteBuffers (in addition to {Input,Output}Streams). An extension including a ByteBuffer API would furthermore have many benefits for native code. A major downside of this API addition is that it wouldn't interoperate (nontrivially) with compression, so shuffles wouldn't benefit. Nevertheless, BlockManager could probably deduce when use of this ByteBuffer API is possible and leverage it.

Cheers, -Paul
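In lieu of an existing Kryo benchmark, a first-order measurement of "what does a SerDe round trip of a byte-array-heavy T cost" can be sketched like this; Python's pickle stands in for the JVM serializers, so only the shape of the result carries over (cost grows with payload size because bytes are copied per round trip), not the absolute numbers:

```python
import pickle
import time

def serde_roundtrip_cost(payload_mb, rounds=10):
    # a record whose memory is dominated by one large byte payload,
    # like a feature vector or an image
    record = ("feature-vector", bytes(payload_mb * 1024 * 1024))
    start = time.perf_counter()
    for _ in range(rounds):
        blob = pickle.dumps(record, protocol=pickle.HIGHEST_PROTOCOL)
        back = pickle.loads(blob)
    elapsed = time.perf_counter() - start
    assert back == record            # the round trip is lossless
    return elapsed / rounds          # mean seconds per serialize+deserialize
```

Plotting the return value against `payload_mb` separates fixed per-record overhead from the per-byte copy cost; the same experiment against KryoSerializer would answer the question posed above for the JVM.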
Re: Support for SQL on unions of tables (merge tables?)
Thanks Cheng! For the list, I talked with Michael Armbrust at a recent Spark meetup and his comments were:

* For a union of tables, use a view and the Hive metastore.
* SQLContext might already have the directory-traversing logic I need.
* The union() of sequence files I saw was slow because Spark was probably trying to shuffle the whole union. A similar Spark SQL join will also be slow (or break) unless one runs statistics so that the smaller table can be broadcast (e.g. see https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options ).

I have never used Hive, so I'll have to investigate further.

On Tue, Jan 20, 2015 at 1:15 PM, Cheng Lian lian.cs@gmail.com wrote: I think you can resort to a Hive table partitioned by date https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables

On 1/11/15 9:51 PM, Paul Wais wrote:

[quoted message snipped]
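The "naive idea" from the original post (scan a directory, create one table per file, union them all) is just a driver pattern; sketched here with local files and generators standing in for HDFS and RDDs (names and the local-file setting are illustrative, not Spark API):

```python
import glob
import itertools
import os

def union_of_logs(log_dir, parse_line):
    # one 'table' per daily log file, then a lazy union over all of
    # them: the analogue of
    #   sc.union([sc.textFile(f).map(parse_line) for f in files])
    files = sorted(glob.glob(os.path.join(log_dir, "*.log")))

    def read(path):
        with open(path) as f:
            for line in f:
                yield parse_line(line)

    return itertools.chain.from_iterable(read(f) for f in files)
```

Summing a field over a date range then reduces to restricting the glob to the wanted dates and running `sum(...)` over the resulting stream.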
Re: spark 1.2 three times slower than spark 1.1, why?
To force one instance per executor, you could explicitly subclass FlatMapFunction and have it lazily create your parser in the subclass constructor. You might also want to try RDD#mapPartitions() (instead of RDD#flatMap()) if you want one instance per partition. This approach worked well for me when I had a flat-map function that used non-serializable native code / objects. FWIW RDD#flatMap() does not appear to have changed from 1.1 to 1.2 (though master has a slight refactor). Agree it's worth checking the number of partitions in your 1.1 vs 1.2 test.

On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote: The LogParser instance is not serializable, and thus cannot be a broadcast. What's worse, it contains an LRU cache, which is essential to the performance and which we would like to share among all the tasks on the same node. If that is the case, what's the recommended way to share a variable among all the tasks within the same executor?

2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serializing the closure caused LogParser not to be a singleton any more, so it is initialized for every task. Could you change it to a broadcast?

On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from Spark 1.1 to Spark 1.2, but found the program 3x slower, with nothing else changed. Note: our program on Spark 1.1 has successfully processed a whole year of data, quite stably. The main script is as below:

sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})

where LogParser is a singleton which may take some time to initialize and is shared across the executor. The flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but it didn't help. Could somebody explain possible causes, or what we should test or change to find out?
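The lazy-creation suggestion in the reply looks like the following in outline; this is a plain-Python sketch of a per-process lazy singleton consumed from a mapPartitions-style function (class and function names are made up for illustration; in Spark the same idea lives in a FlatMapFunction subclass, or in PySpark as a module-level global, giving one instance per executor process):

```python
_PARSER = None

class LogParser:
    instances = 0          # counter just to observe how many get built
    def __init__(self):
        LogParser.instances += 1
        self.cache = {}    # e.g. the LRU cache mentioned in the thread
    def parse(self, line):
        return line.strip().split(",")

def get_parser():
    # lazily build exactly one parser per worker process, so the
    # expensive initialisation is not repeated for every task
    global _PARSER
    if _PARSER is None:
        _PARSER = LogParser()
    return _PARSER

def parse_partition(lines):
    # the body handed to mapPartitions(): fetch the shared instance
    # once, then stream through the partition
    parser = get_parser()
    for line in lines:
        yield parser.parse(line)
```

Because the parser is created inside the worker rather than captured by the closure, it never needs to be serializable, which is exactly the constraint in the thread.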
Support for SQL on unions of tables (merge tables?)
Dear List, What are common approaches for querying over a union of tables / RDDs? E.g. suppose I have a collection of log files in HDFS, one log file per day, and I want to compute the sum of some field over a date range in SQL. Using the log schema, I can read each file as a distinct SchemaRDD, but I want to union them all and query against one 'table'.

If this data were in MySQL, I could have a table for each day of data, use a MyISAM merge table to union these tables together, and just query against the merge table. What's nice here is that MySQL persists the merge table, and the merge table is r/w, so one can just update the merge table once per day. (What's not nice is that merge tables scale poorly, backup admin is a pain, and oh hey, I'd like to use Spark, not MySQL.)

One naive and untested idea (that achieves implicit persistence): scan an HDFS directory for log files, create one RDD per file, union() the RDDs, then create a SchemaRDD from that union.

A few specific questions:

* Any good approaches to a merge / union table? (Other than the naive idea above.) Preferably with some way to persist that table / RDD between Spark runs. (How does Impala approach this problem?)
* Has anybody tried joining against such a union of tables / RDDs on a very large amount of data? When I've tried (non-Spark-SQL) union()ing Sequence Files and then join()ing them against another RDD, Spark seems to try to compute the full union before doing any join() computation (and eventually OOMs the cluster because the union of Sequence Files is so big). I haven't tried something similar with Spark SQL.
* Are there any plans related to this on the Spark roadmap? (This feature would be a nice complement to, say, persistent RDD indices for interactive querying.)
* Related question: are there plans to use Parquet index pages to make Spark SQL faster? E.g. log indices over date ranges would be relevant here.

All the best, -Paul
Re: Downloads from S3 exceedingly slow when running on spark-ec2
I would suggest checking out disk IO on the nodes in your cluster and then reading up on the limiting behaviors that accompany different kinds of EC2 storage. Depending on how things are configured for your nodes, you may have a local storage configuration that provides bursty IOPS, where you get apparently good performance at first and then limiting kicks in and slows down the rate at which you can write data to local storage.

— p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Thu, Dec 18, 2014 at 5:56 AM, Jon Chase jon.ch...@gmail.com wrote: I'm running a very simple Spark application that downloads files from S3, does a bit of mapping, then uploads new files. Each file is roughly 2MB and is gzip'd. I was running the same code on Amazon's EMR w/Spark and not having any download speed issues (Amazon's EMR provides a custom implementation of the s3n:// file system, FWIW). When I say exceedingly slow, I mean that it takes about 2 minutes to download and process a 2MB file (this was taking ~2 seconds on the same instance types in Amazon's EMR). When I download the same file from the EC2 machine with wget or curl, it downloads in ~1 second. I've also done other bandwidth checks for downloads from other external hosts, with no speed problems there. Tried this w/Spark 1.1.0 and 1.1.1.
When I do a thread dump on a worker, I typically see this a lot:

Executor task launch worker-7 daemon prio=10 tid=0x7fd174039000 nid=0x59e9 runnable [0x7fd1f7dfb000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
    at sun.security.ssl.InputRecord.read(InputRecord.java:480)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
    - locked 0x0007e44dd140 (a java.lang.Object)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
    - locked 0x0007e44e1350 (a sun.security.ssl.AppInputStream)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    - locked 0x0007e44ea800 (a java.io.BufferedInputStream)
    at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
    at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
    at org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
    at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
    at org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
    at org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
    at org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
    at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
    at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
    at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
    at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
    at org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256)
    at
Using S3 block file system
Hi, I'm trying to use the S3 block file system in Spark, i.e. s3:// URLs (*not* s3n://), and I always get the following error:

Py4JJavaError: An error occurred while calling o3188.saveAsParquetFile.
: org.apache.hadoop.fs.s3.S3FileSystemException: Not a Hadoop S3 file.
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.checkMetadata(Jets3tFileSystemStore.java:206)
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:165)
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
    at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at com.sun.proxy.$Proxy24.retrieveINode(Unknown Source)
    at org.apache.hadoop.fs.s3.S3FileSystem.mkdir(S3FileSystem.java:158)
    at org.apache.hadoop.fs.s3.S3FileSystem.mkdirs(S3FileSystem.java:151)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1815)
    at org.apache.hadoop.fs.s3.S3FileSystem.create(S3FileSystem.java:234)
    [.. snip ..]

I believe that I must somehow initialize the file system (in particular the metadata), but I can't find out how to do it. I'm using Spark 1.2.0rc1 with Hadoop 2.4 and Riak CS (instead of S3), if that matters. The s3n:// protocol with the same settings works. Thanks. -- Paul
Re: Parsing a large XML file using Spark
Unfortunately, unless you impose restrictions on the XML file (e.g., where namespaces are declared, whether entity replacement is used, etc.), you really can't parse only a piece of it, even if you have start/end elements grouped together. If you want to deal effectively (and scalably) with large XML files consisting of many records, the right thing to do is to write them as one XML document per line, just like the one-JSON-document-per-line convention, at which point the data can be split effectively. Something like Woodstox and a little custom code should make an effective pre-processor. Once you have the line-delimited XML, you can shred it however you want: JAXB, Jackson XML, etc.

— p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Fri, Nov 21, 2014 at 3:38 AM, Prannoy pran...@sigmoidanalytics.com wrote: Hi, parallel processing of XML files may be an issue due to the tags in the XML file. The XML file has to be intact: while parsing, the parser matches start and end entities, and if the file is distributed in parts to workers, a worker may or may not find matching start and end tags within its part, which will give an exception. Thanks.

On Wed, Nov 19, 2014 at 6:26 AM, ssimanta wrote: If there is one big XML file (e.g., the Wikipedia dump, 44GB, or the larger dump that also has all the revision information) stored in HDFS, is it possible to parse it in parallel/faster using Spark? Or do we have to use something like a PullParser or Iteratee? My current solution is to read the single XML file in a first pass, write it to HDFS as smaller files, and then read the small files in parallel on the Spark workers.
Thanks -Soumya -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-tp19239p19477.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
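The pre-processing step described above (rewrite a large XML file as one record per line so it splits cleanly) can be sketched with any streaming parser. Here is a minimal Python sketch using the standard library's iterparse instead of Woodstox, assuming the record element is `page` as in a Wikipedia dump; the tag name and input are illustrative:

```python
import xml.etree.ElementTree as ET
from io import BytesIO

def to_line_delimited(stream, record_tag):
    """Stream over a large XML file and return one single-line XML document per record."""
    lines = []
    for event, elem in ET.iterparse(stream, events=("end",)):
        # Strip any namespace prefix, e.g. "{ns}page" -> "page"
        tag = elem.tag.rsplit("}", 1)[-1]
        if tag == record_tag:
            # Serialize the complete record and collapse whitespace so it fits on one line
            xml = ET.tostring(elem, encoding="unicode")
            lines.append(" ".join(xml.split()))
            elem.clear()  # free memory held by already-processed records
    return lines

doc = b"<dump><page><title>A</title></page>\n<page><title>B</title></page></dump>"
records = to_line_delimited(BytesIO(doc), "page")
for r in records:
    print(r)
```

Each emitted line is then a self-contained XML document, so sc.textFile() can split the output and each partition can shred its lines independently.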
Re: Native / C/C++ code integration
More thoughts. I took a deeper look at BlockManager, RDD, and friends. Suppose one wanted to get native code access to un-deserialized blocks. This task looks very hard. An RDD behaves much like a Scala iterator of deserialized values, and interop with BlockManager is all on deserialized data. One would probably need to rewrite much of RDD, CacheManager, etc. in native code; an RDD subclass (e.g. PythonRDD) probably wouldn't work. So exposing raw blocks to native code looks intractable. I wonder how fast Java/Kryo SerDe of byte arrays is. E.g. suppose we have an RDD[T] where T is immutable and most of the memory for a single T is a byte array. What is the overhead of SerDe-ing T? (Does Java/Kryo copy the underlying memory?) If the overhead is small, then native access to raw blocks wouldn't really yield any advantage. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Native-C-C-code-integration-tp18347p18640.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Native / C/C++ code integration
Dear List, Has anybody had experience integrating C/C++ code into Spark jobs? I have done some work on this topic using JNA. I wrote a FlatMapFunction that processes all partition entries using a C++ library. This approach works well, but there are some tradeoffs: * Shipping the native dylib with the app jar and loading it at runtime requires a bit of work (on top of normal JNA usage) * Native code doesn't respect the executor heap limits. Under heavy memory pressure, the native code can sometimes ENOMEM sporadically. * While JNA can map Strings, structs, and Java primitive types, the user still needs to deal with more complex objects. E.g. re-serialize protobuf/thrift objects, or provide some other encoding for moving data between Java and C/C++. * C++ static initialization is not thread-safe before C++11, so the user sometimes needs to take care when running inside multi-threaded executors * Avoiding memory copies can be a little tricky One alternative approach that comes to mind is pipe(). However, PipedRDD requires copying data over pipes, does not support binary data (?), and native code errors that crash the subprocess don't bubble up to the Spark job as nicely as with JNA. Is there a way to expose raw, in-memory partition/block data to native code? Has anybody else attacked this problem a different way? All the best, -Paul -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Native-C-C-code-integration-tp18347.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
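For comparison, PySpark users face the same tradeoffs; the lightweight analogue of JNA there is ctypes, which also binds a shared library at runtime inside the map function (and likewise ignores the executor's JVM heap limits). A minimal sketch, using libc's strlen as a stand-in for a real native library; library lookup details vary by platform:

```python
import ctypes
import ctypes.util

# Locate and load the C runtime; a real job would ship its own .so/.dylib
# alongside the application and load that instead.
libc = ctypes.CDLL(ctypes.util.find_library("c"))

# Declare the signature explicitly -- mismatched types are a classic source
# of native crashes, the ctypes equivalent of JNA mapping mistakes.
libc.strlen.argtypes = [ctypes.c_char_p]
libc.strlen.restype = ctypes.c_size_t

def native_len(s):
    """The kind of function you would pass to mapPartitions: Python in, native call out."""
    return libc.strlen(s.encode("utf-8"))

print(native_len("hello"))  # prints 5
```

As with JNA, any memory the native library allocates is invisible to the executor's heap accounting, so the ENOMEM caveat above applies unchanged.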
[SQL] PERCENTILE is not working
Hi all, I encounter this error when executing the query sqlContext.sql("select percentile(age, array(0, 0.5, 1)) from people").collect() java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to [Ljava.lang.Object; at org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector.getListLength(StandardListObjectInspector.java:83) at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$ListConverter.convert(ObjectInspectorConverters.java:259) at org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.convertIfNecessary(GenericUDFUtils.java:349) at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.iterate(GenericUDAFBridge.java:170) at org.apache.spark.sql.hive.HiveUdafFunction.update(hiveUdfs.scala:342) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:135) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:128) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) Thanks, Kevin Paul
Do Spark executors restrict native heap vs JVM heap?
Thanks Sean! My novice understanding is that the 'native heap' is the address space not allocated to the JVM heap, but I wanted to check to see if I'm missing something. I found out my issue appeared to be actual memory pressure on the executor machine. There was space for the JVM heap but not much more. On Thu, Oct 30, 2014 at 12:49 PM, Sean Owen so...@cloudera.com wrote: No, but the JVM also does not allocate memory for native code on the heap. I don't think the heap has any bearing on whether your native code can allocate more memory, except that of course the heap is also taking memory. On Oct 30, 2014 6:43 PM, Paul Wais pw...@yelp.com wrote: Dear Spark List, I have a Spark app that runs native code inside map functions. I've noticed that the native code sometimes sets errno to ENOMEM indicating a lack of available memory. However, I've verified that the *JVM* has plenty of heap space available -- Runtime.getRuntime().freeMemory() shows gigabytes free and the native code needs only megabytes. Does Spark limit the *native* heap size somehow? Am poking through the executor code now but don't see anything obvious. Best Regards, -Paul Wais
SchemaRDD.where clause error
Hi all, I tried to use the function SchemaRDD.where() but got some error: val people = sqlCtx.sql("select * from people") people.where('age === 10) console:27: error: value === is not a member of Symbol Where did I go wrong? Thanks, Kevin Paul
Re: SparkSQL on Hive error
Thanks Michael, your patch works for me :) Regards, Kelvin Paul On Fri, Oct 3, 2014 at 3:52 PM, Michael Armbrust mich...@databricks.com wrote: Are you running master? There was briefly a regression here that is hopefully fixed by spark#2635 https://github.com/apache/spark/pull/2635. On Fri, Oct 3, 2014 at 1:43 AM, Kevin Paul kevinpaulap...@gmail.com wrote: Hi all, I tried to launch my application with spark-submit, the command I use is: bin/spark-submit --class ${MY_CLASS} --jars ${MY_JARS} --master local myApplicationJar.jar I've built Spark with SPARK_HIVE=true, and was able to start HiveContext and run commands like hiveContext.sql("select * from myTable") or hiveContext.sql("select count(*) from myTable") (myTable is a table in my Hive database). However, when I run the command hiveContext.sql("show tables"), I got the following error: java.lang.NullPointerException at org.apache.hadoop.hive.ql.Driver.validateConfVariables(Driver.java:1057) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:948) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:298) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) Thanks, Kelvin Paul
Setting SparkSQL configuration
Hi all, I tried to set the configurations spark.sql.inMemoryColumnarStorage.compressed and spark.sql.inMemoryColumnarStorage.batchSize in spark.executor.extraJavaOptions but it does not work. My spark.executor.extraJavaOptions contains -Dspark.sql.inMemoryColumnarStorage.compressed=true -Dspark.sql.inMemoryColumnarStorage.batchSize=100, and the Spark UI did indicate the same settings, but somehow the memory footprint of my cacheTable RDDs is the same as without the settings. Only when I use HiveContext.setConf does it reduce my RDDs' memory usage. Is this a bug, or are users required to set these configs using HiveContext's setConf function? Regards, Kelvin Paul
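The behavior described above is consistent with spark.sql.* settings being read from the SQL context's own configuration rather than from JVM system properties, so -D options in extraJavaOptions would not affect cached tables. A hedged sketch of the approach that the poster reports working, shown in PySpark under the assumption that the Python API mirrors the Scala setConf call (the SQL SET statement is an alternative route); `my_table` is a hypothetical table name and `sc` an existing SparkContext:

```python
# Hypothetical PySpark session -- requires a running SparkContext, shown for shape only
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
# SQL settings go through the SQL context, not spark.executor.extraJavaOptions
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "100")
# Equivalent via SQL: sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.compressed=true")
sqlContext.cacheTable("my_table")  # caching after setConf picks up the settings
```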
Re: Any issues with repartition?
Looks like an OOM issue? Have you tried persisting your RDDs to allow disk writes? I've seen a lot of similar crashes in a Spark app that reads from HDFS and does joins. I.e. I've seen "java.io.IOException: Filesystem closed", "Executor lost", "FetchFailed", etc. with non-deterministic crashes. I've tried persisting RDDs, tuning other params, and verifying that the Executor JVMs don't come close to their max allocated memory during operation. Looking through user@ tonight, there are a ton of email threads with similar crashes and no answers. It looks like a lot of people are struggling with OOMs. Could one of the Spark committers please comment on this thread, or one of the other unanswered threads with similar crashes? Is this simply how Spark behaves if Executors OOM? What can the user do other than increase memory or reduce RDD size? (And how can one deduce how much of either is needed?) One general workaround for OOMs could be to programmatically break the job input (i.e. from HDFS, input from #parallelize()) into chunks, and only create/process RDDs related to one chunk at a time. However, this approach has the limitations of Spark Streaming and no formal library support. What might be nice is if, when tasks fail, Spark could try to re-partition in order to avoid OOMs. On Fri, Oct 3, 2014 at 2:55 AM, jamborta jambo...@gmail.com wrote: I have two nodes with 96G RAM and 16 cores; my setup is as follows:
conf = (SparkConf()
  .setMaster("yarn-cluster")
  .set("spark.executor.memory", "30G")
  .set("spark.cores.max", 32)
  .set("spark.executor.instances", 2)
  .set("spark.executor.cores", 8)
  .set("spark.akka.timeout", 1)
  .set("spark.akka.askTimeout", 100)
  .set("spark.akka.frameSize", 500)
  .set("spark.cleaner.ttl", 86400)
  .set("spark.task.maxFailures", 16)
  .set("spark.worker.timeout", 150))
thanks a lot, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15674.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
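The chunking workaround described above (create and process RDDs for one slice of the input at a time to bound peak memory) needs only a small helper to split the input paths; a minimal sketch, with the Spark calls left as comments since they depend on the job:

```python
def chunked(paths, chunk_size):
    """Split a list of input paths into fixed-size chunks, preserving order."""
    return [paths[i:i + chunk_size] for i in range(0, len(paths), chunk_size)]

# Hypothetical HDFS part files; any list of inputs works the same way
all_paths = ["hdfs:///data/part-%05d" % i for i in range(10)]
for chunk in chunked(all_paths, 4):
    # rdd = sc.textFile(",".join(chunk))   # one RDD per chunk
    # process(rdd); rdd.unpersist()        # release the chunk before starting the next
    print(len(chunk))
```

The tradeoff is losing whole-job shuffles across chunks, which is exactly the Spark Streaming-like limitation the post mentions.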
SparkSQL on Hive error
Hi all, I tried to launch my application with spark-submit, the command I use is: bin/spark-submit --class ${MY_CLASS} --jars ${MY_JARS} --master local myApplicationJar.jar I've built Spark with SPARK_HIVE=true, and was able to start HiveContext and run commands like hiveContext.sql("select * from myTable") or hiveContext.sql("select count(*) from myTable") (myTable is a table in my Hive database). However, when I run the command hiveContext.sql("show tables"), I got the following error: java.lang.NullPointerException at org.apache.hadoop.hive.ql.Driver.validateConfVariables(Driver.java:1057) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:948) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:298) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) Thanks, Kelvin Paul
Worker Random Port
I am trying to get an edge server up and running connecting to our Spark 1.1 cluster. The edge server is in a different DMZ than the rest of the cluster and we have to specifically open firewall ports between the edge server and the rest of the cluster. I can log on to any node in the cluster (other than the edge) and submit code through spark-shell just fine. I have port 7077 from the edge to the master open (verified), and I have port 7078 open from the edge to all the workers (also verified). I have tried setting the worker port to not be dynamic by using SPARK_WORKER_PORT in spark-env.sh but it does not seem to stop the dynamic port behavior. I have included the startup output when running spark-shell from the edge server in a different DMZ and then from a node in the cluster. Any help greatly appreciated. Paul Magid Toyota Motor Sales IS Enterprise Architecture (EA) Architect I R&D Ph: 310-468-9091 (X69091) PCN 1C2970, Mail Drop PN12 Running spark-shell from the edge server: 14/09/23 14:20:38 INFO SecurityManager: Changing view acls to: root, 14/09/23 14:20:38 INFO SecurityManager: Changing modify acls to: root, 14/09/23 14:20:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/09/23 14:20:38 INFO HttpServer: Starting HTTP Server 14/09/23 14:20:39 INFO Utils: Successfully started service 'HTTP class server' on port 22788. Welcome to Spark version 1.1.0 Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_55) Type in expressions to have them evaluated. Type :help for more information.
14/09/23 14:20:42 INFO SecurityManager: Changing view acls to: root, 14/09/23 14:20:42 INFO SecurityManager: Changing modify acls to: root, 14/09/23 14:20:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/09/23 14:20:43 INFO Slf4jLogger: Slf4jLogger started 14/09/23 14:20:43 INFO Remoting: Starting remoting 14/09/23 14:20:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@votlbdcd09.tms.toyota.com:32356] 14/09/23 14:20:43 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkdri...@votlbdcd09.tms.toyota.com:32356] 14/09/23 14:20:43 INFO Utils: Successfully started service 'sparkDriver' on port 32356. 14/09/23 14:20:43 INFO SparkEnv: Registering MapOutputTracker 14/09/23 14:20:43 INFO SparkEnv: Registering BlockManagerMaster 14/09/23 14:20:43 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140923142043-4454 14/09/23 14:20:43 INFO Utils: Successfully started service 'Connection manager for block manager' on port 48469. 14/09/23 14:20:43 INFO ConnectionManager: Bound socket to port 48469 with id = ConnectionManagerId(votlbdcd09.tms.toyota.com,48469) 14/09/23 14:20:43 INFO MemoryStore: MemoryStore started with capacity 265.9 MB 14/09/23 14:20:43 INFO BlockManagerMaster: Trying to register BlockManager 14/09/23 14:20:43 INFO BlockManagerMasterActor: Registering block manager votlbdcd09.tms.toyota.com:48469 with 265.9 MB RAM 14/09/23 14:20:43 INFO BlockManagerMaster: Registered BlockManager 14/09/23 14:20:43 INFO HttpFileServer: HTTP File server directory is /tmp/spark-888c359a-5a2a-4aaa-80e3-8009cdfa25c8 14/09/23 14:20:43 INFO HttpServer: Starting HTTP Server 14/09/23 14:20:43 INFO Utils: Successfully started service 'HTTP file server' on port 12470. 14/09/23 14:20:43 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
14/09/23 14:20:43 INFO SparkUI: Started SparkUI at http://votlbdcd09.tms.toyota.com:4040 14/09/23 14:20:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/09/23 14:20:44 INFO EventLoggingListener: Logging events to file:/user/spark/applicationHistory//spark-shell-1411507243973 14/09/23 14:20:44 INFO AppClient$ClientActor: Connecting to master spark://votlbdcd01.tms.toyota.com:7077... 14/09/23 14:20:44 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 14/09/23 14:20:44 INFO SparkILoop: Created spark context.. 14/09/23 14:20:44 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140923142044-0006 Spark context available as sc. scala 14/09/23 14:21:26 INFO AppClient$ClientActor: Executor added: app-20140923142044-0006/0 on worker-20140923084845-votlbdcd03.tms.toyota.com-7078 (votlbdcd03.tms.toyota.com:7078) with 8 cores 14/09/23 14:21:26 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140923142044-0006/0 on hostPort votlbdcd03
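The log output above shows the root of the firewall problem: SPARK_WORKER_PORT only pins the worker daemon itself, while the driver, file server, and block manager each open their own dynamically chosen ports (32356, 12470, 48469 in this run). Spark 1.1 added properties to pin those ports as well; a hedged sketch of the relevant settings, with the exact property names worth verifying against the 1.1 configuration documentation and the port numbers purely illustrative:

```shell
# spark-env.sh on the workers: pin the worker daemon's RPC port
export SPARK_WORKER_PORT=7078

# spark-defaults.conf (or --conf) on the edge/driver side: pin the
# driver-opened ports so the firewall rules can allow them explicitly
# spark.driver.port        32356
# spark.fileserver.port    12470
# spark.blockManager.port  48469
```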
Re: Unable to find proto buffer class error with RDDprotobuf
Well it looks like this is indeed a protobuf issue. Poked a little more with Kryo. Since protobuf messages are serializable, I tried just making Kryo use the JavaSerializer for my messages. The resulting stack trace made it look like protobuf GeneratedMessageLite is actually using the classloader that loaded it, which I believe would be the root loader? * https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529 * See note: http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220 So I guess protobuf Java serialization is sensitive to the class loader. I wonder if Kenton ever saw this one coming :) I do have a solution, though (see way below). Here's the code and stack trace:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("myapp");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator");
...
public class MyKryoRegistrator implements KryoRegistrator {
  public void registerClasses(Kryo kryo) {
    kryo.register(MyProtoMessage.class, new JavaSerializer());
  }
}
...
14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2) com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34) at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.RuntimeException: Unable to find proto buffer class at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at
Re: Unable to find proto buffer class error with RDDprotobuf
Derp, one caveat to my solution: I guess Spark doesn't use Kryo for Function serde :( On Fri, Sep 19, 2014 at 12:44 AM, Paul Wais pw...@yelp.com wrote: Well it looks like this is indeed a protobuf issue. Poked a little more with Kryo. Since protobuf messages are serializable, I tried just making Kryo use the JavaSerializer for my messages. The resulting stack trace made it look like protobuf GeneratedMessageLite is actually using the classloader that loaded it, which I believe would be the root loader? * https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529 * See note: http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220 So I guess protobuf Java serialization is sensitive to the class loader. I wonder if Kenton ever saw this one coming :) I do have a solution, though (see way below). Here's the code and stack trace:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("myapp");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator");
...
public class MyKryoRegistrator implements KryoRegistrator {
  public void registerClasses(Kryo kryo) {
    kryo.register(MyProtoMessage.class, new JavaSerializer());
  }
}
...
14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2) com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34) at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.RuntimeException: Unable to find proto buffer class at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method
Unable to find proto buffer class error with RDDprotobuf
Dear List, I'm writing an application where I have RDDs of protobuf messages. When I run the app via bin/spark-submit with --master local --driver-class-path path/to/my/uber.jar, Spark is able to ser/deserialize the messages correctly. However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I try --master spark://my.master:7077, then I run into errors that make it look like my protobuf message classes are not on the classpath: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.RuntimeException: Unable to find proto buffer class com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807) ... Why do I need --driver-class-path in the local scenario? And how can I ensure my classes are on the classpath no matter how my app is submitted via bin/spark-submit (e.g. --master spark://my.master:7077)? I've tried poking through the shell scripts and SparkSubmit.scala and unfortunately I haven't been able to grok exactly what Spark is doing with the remote/local JVMs. Cheers, -Paul
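One commonly suggested workaround for classloader-sensitive serializers like protobuf's readResolve is to put the application jar on the system classpath of both the driver and the executors, rather than relying only on the dynamically added --jars classpath. A hedged sketch using the poster's own paths, with 1.x property names worth verifying for the Spark version in use (spark.executor.extraClassPath refers to the jar's path on the worker machines):

```shell
# Ship the uber jar and also force it onto the JVM classpath on both sides
bin/spark-submit \
  --master spark://my.master:7077 \
  --jars path/to/my/uber.jar \
  --driver-class-path path/to/my/uber.jar \
  --conf spark.executor.extraClassPath=uber.jar \
  path/to/my/uber.jar
```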
Spark SQL Exception
All: I am putting Spark SQL 1.1 through its paces (in a POC) and have been pleasantly surprised with what can be done with such a young technology. I have run into an exception (listed below) that I suspect relates to the number of columns in the table I am querying. There are 336 columns in the table. I have included the Scala / Spark SQL I am running. This Spark SQL code runs just fine when run against narrower tables. Also, we have purpose-built this POC cluster with lots of memory and we have set up Impala and Spark SQL with roughly the same amounts of memory. There are 7 worker nodes with 20GB memory for Impala and Spark SQL each. We are using Impala as a comparative benchmark and sanity check. The equivalent SQL runs just fine in Impala (see below). I am a bit of a noob and any help (even with the code below) is greatly appreciated. Also, is there a document that lists current Spark SQL limitations/issues? Paul Magid Toyota Motor Sales IS Enterprise Architecture (EA) Architect I R&D Ph: 310-468-9091 (X69091) PCN 1C2970, Mail Drop PN12 Successful result in Impala: marital_status = M, S, U, null — Returned 4 row(s) in 0.91s Code:
//Timer code
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0).toFloat/1e9 + " s")
  result
}
//Declare and import SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
//Load Parquet file into a table
val parquetFile_db2 = sqlContext.parquetFile("hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/")
parquetFile_db2.registerAsTable("customer_demographic_pq")
//Run SQL code with timer
val records = time { sql("select marital_status from customer_demographic_pq group by marital_status order by marital_status").collect().foreach(println) }
Exception: 14/09/18 08:50:39 INFO SparkContext: Job finished: RangePartitioner at Exchange.scala:79, took 21.885859255 s
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [marital_status#9 ASC], true
 Exchange (RangePartitioning [marital_status#9 ASC], 200)
  Aggregate false, [marital_status#9], [marital_status#9]
   Exchange (HashPartitioning [marital_status#9], 200)
    Aggregate true, [marital_status#9], [marital_status#9]
     ParquetTableScan [marital_status#9], (ParquetRelation hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcV$sp(<console>:19)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
    at $iwC$$iwC$$iwC$$iwC.time(<console>:12)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
    at $iwC$$iwC$$iwC.<init>(<console>:28)
    at $iwC$$iwC.<init>(<console>:30)
    at $iwC.<init>(<console>:32)
    at <init>(<console>:34)
    at .<init>(<console>:38)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala
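The `time { ... }` wrapper in the Scala snippet above is a general micro-benchmarking pattern, not Spark API. For reference, a minimal Python sketch of the same idea (the helper name `timed` is mine, purely illustrative):

```python
import time

def timed(block):
    """Run a zero-argument callable, print elapsed wall-clock time, return its result."""
    t0 = time.perf_counter()
    result = block()  # the callable plays the role of Scala's by-name parameter
    t1 = time.perf_counter()
    print("Elapsed time: %.3f s" % (t1 - t0))
    return result

# Wrap any expression in a lambda to time it.
values = timed(lambda: sorted(["M", "S", "U"]))
```

Passing a callable (rather than an already-evaluated value) is what makes the elapsed time meaningful, mirroring Scala's call-by-name `block: => R`.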
RE: Spark SQL Exception
Michael: Thanks for the quick response. I can confirm that once I removed the "order by" clause the exception below went away, so I believe this confirms what you were saying, and I will be opening a new feature request in JIRA. However, that exception was replaced by a java.lang.OutOfMemoryError: Java heap space error. I am guessing this relates to one of the following issues:

* SPARK-2902 Change default options to be more aggressive (in-memory columnar compression)
* SPARK-3056 Sort-based aggregation (Spark SQL only supports hash-based aggregation, which may cause OOM if there are too many identical keys in the input tuples.)
* SPARK-2926 Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

The exception is included below.

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I R&D
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12

Exception
---------
14/09/18 10:11:03 INFO TaskSetManager: Finished task 36.0 in stage 0.0 (TID 57) in 18681 ms on votlbdcd04.tms.toyota.com (5/200)
14/09/18 10:11:09 ERROR Utils: Uncaught exception in thread Result resolver thread-0
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Result resolver thread-0" 14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
java.lang.OutOfMemoryError: Java heap space
14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/09/18 10:11:09 INFO Remoting: Remoting shut down
14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
scala> 14/09/18 10:11:09 INFO TaskSetManager: Finished task 50.0 in stage 0.0 (TID 71) in 27100 ms on votlbdcd04.tms.toyota.com (6/200)
14/09/18 10:11:10 INFO TaskSetManager: Finished task 22.0 in stage 0.0 (TID 43) in 27520 ms on votlbdcd04.tms.toyota.com (7/200)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@7a542d34
14/09/18 10:11:10 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@7a542d34
java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd06.tms.toyota.com,19998)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId
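SPARK-3056, cited above, is about exactly this tradeoff: a hash-based aggregate must hold one accumulator per distinct key in memory at once, while a sort-based aggregate can emit each group as soon as the key changes. A toy pure-Python sketch contrasting the two strategies (illustrative only, not Spark code):

```python
from itertools import groupby
from operator import itemgetter

def hash_aggregate(pairs):
    # Hash-based: the dict holds every distinct key simultaneously,
    # which is the OOM risk when there are very many distinct keys.
    acc = {}
    for key, value in pairs:
        acc[key] = acc.get(key, 0) + value
    return acc

def sort_aggregate(pairs):
    # Sort-based: once the input is sorted by key, each group can be
    # reduced and released before the next group is read.
    ordered = sorted(pairs, key=itemgetter(0))
    return {key: sum(v for _, v in group)
            for key, group in groupby(ordered, key=itemgetter(0))}

data = [("M", 1), ("S", 1), ("M", 1), ("U", 1)]
assert hash_aggregate(data) == sort_aggregate(data) == {"M": 2, "S": 1, "U": 1}
```

Both produce the same result; the difference is peak memory, which is why a sort-based fallback helps when distinct-key cardinality is high.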
Re: "Unable to find proto buffer class" error with RDD<protobuf>
Well, it looks like Spark is just not loading my code into the driver/executors. E.g.:

    List<String> foo = bars.map(  // bars is a JavaRDD<MyMessage>
      new Function<MyMessage, String>() {
        {
          System.err.println("classpath: " + System.getProperty("java.class.path"));
          CodeSource src = com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
          if (src != null) {
            URL jar = src.getLocation();
            System.err.println("com.google.protobuf.GeneratedMessageLite from jar: " + jar.toString());
          }
        }

        @Override
        public String call(MyMessage v1) throws Exception {
          return v1.getString();
        }
      }).collect();

prints:

    classpath: ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
    com.google.protobuf.GeneratedMessageLite from jar: file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar

I do see after those lines:

    14/09/18 23:28:09 INFO Executor: Adding file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class loader

This is with: spark-submit --master local --class MyClass --jars uber.jar uber.jar

My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would come from there. I'm using Spark 1.1 and Hadoop 2.3; Hadoop 2.3 should use protobuf 2.5 [1] and even shade it properly. I've read claims on this list that Spark shades protobuf correctly since 0.9.?, and looking through the pom.xml on GitHub it looks like Spark includes protobuf 2.5 in the hadoop-2.3 profile. I guess I'm still at: what's the deal with getting Spark to distribute and load code from my jar correctly?

[1] http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml

On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:
Dear List, I'm writing an application where I have RDDs of protobuf messages.
When I run the app via bin/spark-submit with --master local --driver-class-path path/to/my/uber.jar, Spark is able to serialize/deserialize the messages correctly. However, if I run WITHOUT --driver-class-path path/to/my/uber.jar, or if I try --master spark://my.master:7077, then I run into errors that make it look like my protobuf message classes are not on the classpath:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.RuntimeException: Unable to find proto buffer class
        com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
        sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:606)
        java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
        ...

Why do I need --driver-class-path in the local scenario? And how can I ensure my classes are on the classpath no matter how my app is submitted via bin/spark-submit (e.g. --master spark://my.master:7077)? I've tried poking through the shell scripts and SparkSubmit.scala and unfortunately I haven't been able to grok exactly what Spark is doing with the remote/local JVMs.

Cheers,
-Paul

---
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
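The CodeSource/getLocation trick in the message above — asking an already-loaded class which archive its bytecode actually came from — is a generally useful classpath diagnostic. For illustration only (not Spark-related code), the rough Python analogue of the same question:

```python
import inspect
import json

# Ask a loaded module where its code was loaded from. This answers the
# same question as GeneratedMessageLite.class.getProtectionDomain()
#     .getCodeSource().getLocation() does for a JVM class:
# "which file/archive did this definition really come from?"
origin = inspect.getfile(json)
print("json loaded from:", origin)
```

When two copies of a library are on the path (here, a shaded protobuf in the Spark assembly vs. one in an uber jar), printing the origin from inside the running process is the quickest way to see which copy won.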
Re: "Unable to find proto buffer class" error with RDD<protobuf>
Ah, can one NOT create an RDD of any arbitrary Serializable type? It looks like I might be getting bitten by the same "java.io.ObjectInputStream uses root class loader only" bugs mentioned in:

* http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
* https://github.com/apache/spark/pull/181
* http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
* https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I

On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com wrote:
> ...
Re: "Unable to find proto buffer class" error with RDD<protobuf>
Hmm, would using Kryo help me here?

On Thursday, September 18, 2014, Paul Wais pw...@yelp.com wrote:
> ...
Re: "Unable to find proto buffer class" error with RDD<protobuf>
I think the root problem might be related to this change:

https://github.com/apache/spark/commit/cc3648774e9a744850107bb187f2828d447e0a48#diff-7b43397a89d8249663cbd13374a48db0R42

That change did not appear to touch ParallelCollectionRDD, which I believe is using the root classloader (which would explain why --driver-class-path fixes the problem):

https://github.com/apache/spark/blob/2f9b2bd7844ee8393dc9c319f4fefedf95f5e460/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L74

If uber.jar is on the classpath, then the root classloader would have the code, hence why --driver-class-path fixes the bug.

On Thu, Sep 18, 2014 at 5:42 PM, Paul Wais pw...@yelp.com wrote:
> ...
Re: Spark 1.1 / cdh4 stuck using old hadoop client?
Thanks Christian! I tried compiling from source but am still getting the same Hadoop client version error when reading from HDFS. Will have to poke deeper... perhaps I've got some classpath issues. FWIW I compiled using:

    $ MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

and Hadoop 2.3 / cdh5 from http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz

On Mon, Sep 15, 2014 at 6:49 PM, Christian Chua cc8...@icloud.com wrote:
> Hi Paul. I would recommend building your own 1.1.0 distribution:
>
>     ./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
>
> I downloaded the "Pre-built for Hadoop 2.4" binary, and it had this strange behavior where "spark-submit --master yarn-cluster ..." will work, but "spark-submit --master yarn-client ..." will fail. But on the personal build obtained from the command above, both will work.
>
> -Christian

On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote:
> ...
Re: Spark 1.1 / cdh4 stuck using old hadoop client?
Hi Sean, Great catch! Yes, I was including Spark as a dependency and it was making its way into my uber jar. Following the advice I just found on Stack Overflow [1], I marked Spark as a provided dependency and that appeared to fix my Hadoop client issue. Thanks for your help! Perhaps the maintainers might consider setting this in the Quickstart guide pom.xml (http://spark.apache.org/docs/latest/quick-start.html).

In summary, here's what worked:

* Hadoop 2.3 cdh5: http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz
* Spark 1.1 for Hadoop 2.3: http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.3.tgz
* pom.xml snippets: https://gist.github.com/ypwais/ff188611d4806aa05ed9

[1] http://stackoverflow.com/questions/24747037/how-to-define-a-dependency-scope-in-maven-to-include-a-library-in-compile-run

Thanks everybody!!
-Paul

On Tue, Sep 16, 2014 at 3:55 AM, Sean Owen so...@cloudera.com wrote:
> From the caller / application perspective, you don't care what version of Hadoop Spark is running on in the cluster. The Spark API you compile against is the same. When you spark-submit the app, at runtime, Spark uses the Hadoop libraries from the cluster, which are the right version. So when you build your app, you mark Spark as a 'provided' dependency. Therefore, in general, no, you do not build Spark yourself if you are a Spark app creator.
>
> (Of course, your app would care if it were also using Hadoop libraries directly. In that case, you will want to depend on hadoop-client, and the right version for your cluster, but still mark it as provided.)
>
> The version Spark is built against only matters when you are deploying Spark's artifacts on the cluster to set it up.
>
> Your error suggests there is still a version mismatch. Either you deployed a build that was not compatible, or maybe you are packaging a version of Spark with your app which is incompatible and interfering. For example, the artifacts you get via Maven depend on Hadoop 1.0.4. I suspect that's what you're doing -- packaging Spark (+ Hadoop 1.0.4) with your app, when it shouldn't be packaged. Spark works out of the box with just about any modern combo of HDFS and YARN.

On Tue, Sep 16, 2014 at 2:28 AM, Paul Wais pw...@yelp.com wrote:
> ...
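The 'provided' scope Sean describes is a one-line change in the application's pom.xml. A sketch for the Spark 1.1 / Scala 2.10 artifact discussed in this thread (coordinates as published to Maven Central; adjust the version to match your cluster):

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.1.0</version>
  <!-- 'provided': compile against Spark, but do NOT package it into the
       uber jar; the cluster supplies Spark (and its Hadoop) at runtime. -->
  <scope>provided</scope>
</dependency>
```

With this scope, the shade/assembly plugin leaves Spark (and its transitive Hadoop 1.0.4 dependency) out of the uber jar, which is what resolves the client-version mismatch described above.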
Spark 1.1 / cdh4 stuck using old hadoop client?
Dear List,

I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for reading SequenceFiles. In particular, I'm seeing:

    Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4
        at org.apache.hadoop.ipc.Client.call(Client.java:1070)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
        at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
        ...

when invoking JavaSparkContext#newAPIHadoopFile() (with args validSequenceFileURI, SequenceFileInputFormat.class, Text.class, BytesWritable.class, new Job().getConfiguration() -- pretty close to the unit test here: https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916).

This error indicates to me that Spark is using an old Hadoop client to do reads. Oddly, I'm able to do /writes/ OK, i.e. I'm able to write via JavaPairRDD#saveAsNewAPIHadoopFile() to my HDFS cluster.

Do I need to explicitly build Spark for modern Hadoop? I previously had an HDFS cluster running Hadoop 2.3.0 and I was getting a similar error (server is using version 9, client is using version 4). I'm using Spark 1.1 cdh4 as well as Hadoop cdh4 from the links posted on Spark's site:

* http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
* http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz

What distro of Hadoop is used at Databricks? Are there distros of Spark 1.1 and Hadoop that should work together out of the box? (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine.)

Thanks for any help anybody can give me here!
-Paul
Re: increase parallelism of reading from hdfs
Hi Chen,

You need to set the max input split size so that the underlying Hadoop libraries will calculate the splits appropriately. I have done the following successfully:

    val job = new Job()
    FileInputFormat.setMaxInputSplitSize(job, 12800L)

And then use job.getConfiguration when creating a NewHadoopRDD. I am sure there is some way to use it with convenience methods like SparkContext.textFile; you could probably set the system property mapreduce.input.fileinputformat.split.maxsize.

Regards,
Paul Hamilton

From: Chen Song chen.song...@gmail.com
Date: Friday, August 8, 2014 at 9:13 PM
To: user@spark.apache.org
Subject: increase parallelism of reading from hdfs

In Spark Streaming, StreamingContext.fileStream gives a FileInputDStream. Within each batch interval, it launches map tasks for the new files detected during that interval. It appears that the way Spark computes the number of map tasks is based on the block size of the files. Below is the quote from the Spark documentation:

"Spark automatically sets the number of 'map' tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc)"

In my testing, if files are loaded as 512M blocks, each map task seems to process a 512M chunk of data, no matter what value I set dfs.blocksize to on the driver/executor. I am wondering if there is a way to increase parallelism, say let each map task read 128M of data and increase the number of map tasks?

--
Chen Song
Re: How to read a multipart s3 file?
darkjh wrote: But in my experience, when reading directly from s3n, spark create only 1 input partition per file, regardless of the file size. This may lead to some performance problem if you have big files. This is actually not true, Spark uses the underlying hadoop input formats to read the files, so if the input format you are using supports splittable files (text, avro etc.) then it can use multiple splits per file (leading to multiple map tasks per file). You do have to set the max input split size, as an example: FileInputFormat.setMaxInputSplitSize(job, 256000000L) In this case any file larger than 256,000,000 bytes is split. If you don't explicitly set it, the limit is infinite, which leads to the behavior you are seeing where it is 1 split per file. Regards, Paul Hamilton -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-a-multipart-s3-file-tp5463p11673.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
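The one-partition-per-file default and the effect of capping the split size can be illustrated with a small dependency-free sketch; the part-file names and sizes below are made up for the example:

```python
import math

def partitions_per_file(files, max_split=float("inf")):
    # One partition per split; with an unbounded max split size even a
    # splittable file yields a single partition.
    return {name: max(1, math.ceil(size / max_split))
            for name, size in files.items()}

# Hypothetical multipart s3 layout.
multipart = {"part-00000": 400_000_000, "part-00001": 50_000_000}

print(partitions_per_file(multipart))
# {'part-00000': 1, 'part-00001': 1} -- the default: 1 partition per file
print(partitions_per_file(multipart, 256_000_000))
# {'part-00000': 2, 'part-00001': 1} -- the 400 MB part now splits
```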
Re: Release date for new pyspark
Thanks all! (And thanks Matei for the developer link!) I was able to build using maven[1] but `./sbt/sbt assembly` results in build errors. (Not familiar enough with the build to know why; in the past sbt worked for me and maven did not). I was able to run the master version of pyspark, which was what I wanted, though I discovered a bug when trying to read spark-pickled data from HDFS. (Looks similar to https://spark-project.atlassian.net/browse/SPARK-1034 from my naive point of view). For the curious: Code: conf = SparkConf() conf.set('spark.local.dir', '/nail/tmp') conf.set('spark.executor.memory', '28g') conf.set('spark.app.name', 'test') sc = SparkContext(conf=conf) sc.parallelize(range(10)).saveAsPickleFile('hdfs://host:9000/test_pickle') unpickled_rdd = sc.pickleFile('hdfs://host:9000/test_pickle') print unpickled_rdd.takeSample(False, 3) Traceback (most recent call last): File /path/to/my/home/spark-master/tast.py, line 33, in <module> print unpickled_rdd.takeSample(False, 3) File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 391, in takeSample initialCount = self.count() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 791, in count return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 782, in sum return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 703, in reduce vals = self.mapPartitions(func).collect() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 667, in collect bytesInJava = self._jrdd.collect().iterator() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 1600, in _jrdd class_tag) File /path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 669, in __call__ File /path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 304, in get_return_value py4j.protocol.Py4JError: An error occurred while calling 
None.org.apache.spark.api.python.PythonRDD. Trace: py4j.Py4JException: Constructor org.apache.spark.api.python.PythonRDD([class org.apache.spark.rdd.FlatMappedRDD, class [B, class java.util.HashMap, class java.util.ArrayList, class java.lang.Boolean, class java.lang.String, class java.util.ArrayList, class org.apache.spark.Accumulator, class scala.reflect.ManifestFactory$$anon$2]) does not exist at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:184) at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:202) at py4j.Gateway.invoke(Gateway.java:213) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:662) [1] mvn -Phadoop-2.3 -Dhadoop.verson=2.3.0 -DskipTests clean package On Wed, Jul 16, 2014 at 8:39 PM, Michael Armbrust mich...@databricks.com wrote: You should try cleaning and then building. We have recently hit a bug in the scala compiler that sometimes causes non-clean builds to fail. On Wed, Jul 16, 2014 at 7:56 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Yeah, we try to have a regular 3 month release cycle; see https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage for the current window. Matei On Jul 16, 2014, at 4:21 PM, Mark Hamstra m...@clearstorydata.com wrote: You should expect master to compile and run: patches aren't merged unless they build and pass tests on Jenkins. You shouldn't expect new features to be added to stable code in maintenance releases (e.g. 1.0.1). AFAIK, we're still on track with Spark 1.1.0 development, which means that it should be released sometime in the second half of next month (or shortly thereafter). On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote: Dear List, The version of pyspark on master has a lot of nice new features, e.g. 
SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais
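The pickle i/o feature discussed in this thread (saveAsPickleFile/pickleFile) is built on ordinary Python pickling of batched records. A stdlib-only sketch of the round trip, with a local temporary path standing in for the HDFS URI:

```python
import os
import pickle
import tempfile

# saveAsPickleFile serializes RDD elements in pickled batches; this
# sketch performs the same round trip with one batch and a local file.
data = list(range(10))
path = os.path.join(tempfile.mkdtemp(), "test_pickle")

with open(path, "wb") as f:
    pickle.dump(data, f)   # the "save" side

with open(path, "rb") as f:
    restored = pickle.load(f)  # the "read back" side

print(restored[:3])  # [0, 1, 2]
```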
Release date for new pyspark
Dear List, The version of pyspark on master has a lot of nice new features, e.g. SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais
Re: Recommended pipeline automation tool? Oozie?
We use Luigi for this purpose. (Our pipelines are typically on AWS (no EMR) backed by S3 and using combinations of Python jobs, non-Spark Java/Scala, and Spark. We run Spark jobs by connecting drivers/clients to the master, and those are what Luigi invokes.) — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote: I'm just wondering what's the general recommendation for data pipeline automation. Say, I want to run Spark Job A, then B, then invoke script C, then do D, and if D fails, do E, and if Job A fails, send email F, etc... It looks like Oozie might be the best choice. But I'd like some advice/suggestions. Thanks!
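Luigi models this kind of flow with Task subclasses whose requires()/run() methods encode the dependency graph. The control flow k.tham describes can be sketched dependency-free; the task names A through F are the hypothetical ones from the question, not real jobs:

```python
def run_pipeline(tasks, on_failure):
    """Run tasks in order; if one fails, run its fallback (when one is
    defined) and stop. Returns the names of what actually ran."""
    executed = []
    for name, action in tasks:
        try:
            action()
            executed.append(name)
        except Exception:
            if name in on_failure:
                fb_name, fb_action = on_failure[name]
                fb_action()
                executed.append(fb_name)
            break
    return executed

def fail():
    raise RuntimeError("simulated failure")

# "Run A, then B, then C, then D; if D fails do E; if A fails send email F."
tasks = [("A", lambda: None), ("B", lambda: None),
         ("C", lambda: None), ("D", fail)]
on_failure = {"D": ("E", lambda: None), "A": ("F", lambda: None)}
print(run_pipeline(tasks, on_failure))  # ['A', 'B', 'C', 'E']
```

In Luigi proper, the "then" ordering comes from requires() dependencies and the failure handling from event hooks; this sketch only captures the sequencing logic.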
Re: jackson-core-asl jar (1.8.8 vs 1.9.x) conflict with the spark-sql (version 1.x)
Hi, Mans -- Both of those versions of Jackson are pretty ancient. Do you know which of the Spark dependencies is pulling them in? It would be good for us (the Jackson, Woodstox, etc., folks) to see if we can get people to upgrade to more recent versions of Jackson. -- Paul — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Fri, Jun 27, 2014 at 12:58 PM, M Singh mans6si...@yahoo.com wrote: Hi: I am using spark to stream data to cassandra and it works fine in local mode. But when I execute the application in a standalone clustered env I got exception included below (java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass). I think this is due to the jackson-core-asl dependency conflict (jackson-core-asl 1.8.8 has the JsonClass but 1.9.x does not). The 1.9.x version is being pulled in by spark-sql project. I tried adding jackson-core-asl 1.8.8 with --jars argument while submitting the application for execution but it did not work. So I created a custom spark build excluding sql project. With this custom spark install I was able to resolve the issue at least on a single node cluster (separate master and worker). If there is an alternate way to resolve this conflicting jar issue without a custom build (eg: configuration to use the user defined jars in the executor class path first), please let me know. Also, is there a comprehensive list of configuration properties available for spark ? 
Thanks Mans Exception trace TaskSetManager: Loss was due to java.lang.NoClassDefFoundError java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass at org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524) at org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732) at org.codehaus.jackson.map.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:229) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:386) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287) at org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136) at org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157) at org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468) at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402) at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)
Re: Upgrading to Spark 1.0.0 causes NoSuchMethodError
Hi, Robert -- I wonder if this is an instance of SPARK-2075: https://issues.apache.org/jira/browse/SPARK-2075 -- Paul — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Wed, Jun 25, 2014 at 6:28 AM, Robert James srobertja...@gmail.com wrote: On 6/24/14, Robert James srobertja...@gmail.com wrote: My app works fine under Spark 0.9. I just tried upgrading to Spark 1.0, by downloading the Spark distro to a dir, changing the sbt file, and running sbt assembly, but I get now NoSuchMethodErrors when trying to use spark-submit. I copied in the SimpleApp example from http://spark.apache.org/docs/latest/quick-start.html and get the same error: $/usr/local/share/spark/bin/spark-submit --class SimpleApp target/scala-2.10/myproj-assembly-1.0.jar Spark assembly has been built with Hive, including Datanucleus jars on classpath Exception in thread main java.lang.NoSuchMethodError: org.apache.spark.SparkContext$.$lessinit$greater$default$2()Lscala/collection/Map; at SimpleApp$.main(SimpleApp.scala:10) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) How can I migrate to Spark 1.0.0? I've done `sbt clean`, deleted the entire ivy2 cache, and still get the above error on both my code and the official Spark example. Can anyone guide me on how to debug this? How does Spark find the /usr/local/share/spark directory? Is there a variable somewhere I need to set to point to that, or that might point to the old spark? 
I've left the old spark dir on the machine (just changed the symlink) - can that be causing problems? How should I approach this?
Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0
Moving over to the dev list, as this isn't a user-scope issue. I just ran into this issue with the missing saveAsTextFile, and here's a little additional information: - Code ported from 0.9.1 up to 1.0.0; works with local[n] in both cases. - Driver built as an uberjar via Maven. - Deployed to smallish EC2 cluster in standalone mode (S3 storage) with Spark 1.0.0-hadoop1 downloaded from Apache. Given that it functions correctly in local mode but not in a standalone cluster, this suggests to me that the issue is in a difference between the Maven version and the hadoop1 version. In the spirit of taking the computer at its word, we can just have a look in the JAR files. Here's what's in the Maven dep as of 1.0.0: jar tvf ~/.m2/repository/org/apache/spark/spark-core_2.10/1.0.0/spark-core_2.10-1.0.0.jar | grep 'rdd/RDD' | grep 'saveAs' 1519 Mon May 26 13:57:58 PDT 2014 org/apache/spark/rdd/RDD$$anonfun$saveAsTextFile$1.class 1560 Mon May 26 13:57:58 PDT 2014 org/apache/spark/rdd/RDD$$anonfun$saveAsTextFile$2.class And here's what's in the hadoop1 distribution: jar tvf spark-assembly-1.0.0-hadoop1.0.4.jar | grep 'rdd/RDD' | grep 'saveAs' I.e., it's not there. It is in the hadoop2 distribution: jar tvf spark-assembly-1.0.0-hadoop2.2.0.jar | grep 'rdd/RDD' | grep 'saveAs' 1519 Mon May 26 07:29:54 PDT 2014 org/apache/spark/rdd/RDD$$anonfun$saveAsTextFile$1.class 1560 Mon May 26 07:29:54 PDT 2014 org/apache/spark/rdd/RDD$$anonfun$saveAsTextFile$2.class So something's clearly broken with the way that the distribution assemblies are created. FWIW and IMHO, the right way to publish the hadoop1 and hadoop2 flavors of Spark to Maven Central would be as *entirely different* artifacts (spark-core-h1, spark-core-h2). Logged as SPARK-2075 https://issues.apache.org/jira/browse/SPARK-2075. Cheers. -- Paul — p...@mult.ifario.us | Multifarious, Inc. 
| http://mult.ifario.us/ On Fri, Jun 6, 2014 at 2:45 AM, HenriV henri.vanh...@vdab.be wrote: I'm experiencing the same error while upgrading from 0.9.1 to 1.0.0. Im using google compute engine and cloud storage. but saveAsTextFile is returning errors while saving in the cloud or saving local. When i start a job in the cluster i do get an error but after this error it keeps on running fine untill the saveAsTextFile. ( I don't know if the two are connected) ---Error at job startup--- ERROR metrics.MetricsSystem: Sink class org.apache.spark.metrics.sink.MetricsServlet cannot be instantialized java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:136) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:130) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:130) at org.apache.spark.metrics.MetricsSystem.init(MetricsSystem.scala:84) at org.apache.spark.metrics.MetricsSystem$.createMetricsSystem(MetricsSystem.scala:167) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230) at org.apache.spark.SparkContext.init(SparkContext.scala:202) at Hello$.main(Hello.scala:101) at Hello.main(Hello.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at sbt.Run.invokeMain(Run.scala:72) at sbt.Run.run0(Run.scala:65) at sbt.Run.sbt$Run$$execute$1(Run.scala:54) at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:58) at sbt.Run$$anonfun$run$1.apply(Run.scala:58) at sbt.Run$$anonfun$run$1.apply(Run.scala:58) at sbt.Logger$$anon$4.apply(Logger.scala:90) at sbt.TrapExit$App.run(TrapExit.scala:244) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonFactory.requiresPropertyOrdering()Z
Unexpected results when caching data
I have been experimenting with a data set with and without persisting the RDD and have come across some unexpected results. The files we are reading are Avro files, so we are using the following to define the RDD, and what we end up with is a RDD[CleansedLogFormat]: val f = new NewHadoopRDD(sc, classOf[AvroKeyInputFormat[CleansedLogFormat]], classOf[AvroKey[CleansedLogFormat]], classOf[NullWritable], job.getConfiguration).map(_._1.datum()) f.count() = 110268763 f.persist(StorageLevel.MEMORY_AND_DISK).count() = 110268763 So far so good. Both the persisted and non-persisted RDDs return the same results for the count. Where things get weird is when I try and do some reduce by key or other grouping operations. Something like: f.map(record => (record.getProviderId.toString, record)).join(bandwidthKv).map { pair => val hour = new DateTime(pair._2._1.getTimeStamp).toString("yyyyMMddHH") (hour, Set(pair._2._1.getGuid)) }.reduceByKey(_ ++ _).collect().foreach { a => println(a._1 + ": " + a._2.size)} We then get different results from the non-persisted vs. the persisted version. Non-persisted: 2014050917: 7 2014050918: 42 Persisted: 2014050917: 7 2014050918: 12 Any idea what could account for the differences? BTW I am using Spark 0.9.1. Thanks, Paul
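One frequent cause of this exact symptom (counts agree, but grouped results differ once the RDD is persisted) is that Hadoop input formats reuse a single record object across rows, so persisting stores many references to one mutated object unless each record is copied first. This is not confirmed anywhere in the thread, just a common culprit; a dependency-free sketch of the pitfall:

```python
class Record:
    def __init__(self):
        self.guid = None

def read_records(guids, reuse):
    # Simulates a Hadoop RecordReader. With reuse=True the reader
    # mutates and yields the *same* object for every row, which is what
    # Avro/Writable-style readers do for efficiency.
    record = Record()
    for g in guids:
        if not reuse:
            record = Record()  # fresh object per row (i.e., copied)
        record.guid = g
        yield record

guids = ["g1", "g2", "g3"]
copied = list(read_records(guids, reuse=False))
cached = list(read_records(guids, reuse=True))  # like caching raw references

print(len({r.guid for r in copied}))  # 3 -- distinct values survive
print({r.guid for r in cached})       # {'g3'} -- every reference sees the last row
```

If this is the cause, copying the datum out of the reused record before persisting typically makes the persisted and non-persisted results agree.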
Re: missing method in my slf4j after excluding Spark ZK log4j
Hi, Adrian -- If my memory serves, you need 1.7.7 of the various slf4j modules to avoid that issue. Best. -- Paul — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Mon, May 12, 2014 at 7:51 AM, Adrian Mocanu amoc...@verticalscope.com wrote: Hey guys, I've asked before, in Spark 0.9 - I now use 0.9.1, about removing log4j dependency and was told that it was gone. However I still find it part of zookeeper imports. This is fine since I exclude it myself in the sbt file, but another issue arises. I wonder if anyone else has run into this. Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2 I use slf4j 1.7.5, logback 1.0.13, and log4j-over-slf4j v1.7.5 I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its log4j v1.2.17 because I get a missing method error: java.lang.NoSuchMethodError: org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at scala.Option.map(Option.scala:145) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126) at org.apache.spark.SparkContext.init(SparkContext.scala:139) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76) ... Is there a way to find out what versions of slf4j I need to make it work with log4j 1.2.17? -Adrian
CDH 5.0 and Spark 0.9.0
Hello, So I was unable to run the following commands from the spark shell with CDH 5.0 and spark 0.9.0, see below. Once I removed the property <property> <name>io.compression.codec.lzo.class</name> <value>com.hadoop.compression.lzo.LzoCodec</value> <final>true</final> </property> from the core-site.xml on the node, the spark commands worked. Is there a specific setup I am missing? scala> var log = sc.textFile("hdfs://jobs-ab-hnn1//input/core-site.xml") 14/04/30 22:43:16 INFO MemoryStore: ensureFreeSpace(78800) called with curMem=150115, maxMem=308713881 14/04/30 22:43:16 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 77.0 KB, free 294.2 MB) 14/04/30 22:43:16 WARN Configuration: mapred-default.xml:an attempt to override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring. 14/04/30 22:43:16 WARN Configuration: yarn-site.xml:an attempt to override final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring. 14/04/30 22:43:16 WARN Configuration: hdfs-site.xml:an attempt to override final parameter: mapreduce.map.output.compress.codec; Ignoring. log: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12 scala> log.count() 14/04/30 22:43:03 WARN JobConf: The variable mapred.child.ulimit is no longer used. 14/04/30 22:43:04 WARN Configuration: mapred-default.xml:an attempt to override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring. 14/04/30 22:43:04 WARN Configuration: yarn-site.xml:an attempt to override final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring. 14/04/30 22:43:04 WARN Configuration: hdfs-site.xml:an attempt to override final parameter: mapreduce.map.output.compress.codec; Ignoring. 
java.lang.IllegalArgumentException: java.net.UnknownHostException: jobs-a-hnn1 at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:576) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:521) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.SparkContext.runJob(SparkContext.scala:902) at org.apache.spark.rdd.RDD.count(RDD.scala:720) at $iwC$$iwC$$iwC$$iwC.init(console:15) at $iwC$$iwC$$iwC.init(console:20) at $iwC$$iwC.init(console:22) at $iwC.init(console:24) at init(console:26) at 
.init(console:30) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935) at
Can't run a simple spark application with 0.9.1
Hello, Currently I deployed 0.9.1 spark using a new way of starting up spark exec start-stop-daemon --start --pidfile /var/run/spark.pid --make-pidfile --chuid ${SPARK_USER}:${SPARK_GROUP} --chdir ${SPARK_HOME} --exec /usr/bin/java -- -cp ${CLASSPATH} -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10111 -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -XX:ReservedCodeCacheSize=512M -XX:+UseCodeCacheFlushing -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -Dspark.executor.memory=10G -Xmx10g ${MAIN_CLASS} ${MAIN_CLASS_ARGS} where class path points to the spark jar that we compile with sbt. When I try to run a job I receive the following warning WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory My first question is do I need the entire spark project on disk in order to run jobs? Or what else am I doing wrong?
Re: Can't run a simple spark application with 0.9.1
I am a dork, please disregard this issue. I did not have the slaves correctly configured. This error is very misleading. On Tue, Apr 15, 2014 at 11:21 AM, Paul Schooss paulmscho...@gmail.com wrote: Hello, Currently I deployed 0.9.1 spark using a new way of starting up spark exec start-stop-daemon --start --pidfile /var/run/spark.pid --make-pidfile --chuid ${SPARK_USER}:${SPARK_GROUP} --chdir ${SPARK_HOME} --exec /usr/bin/java -- -cp ${CLASSPATH} -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10111 -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -XX:ReservedCodeCacheSize=512M -XX:+UseCodeCacheFlushing -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -Dspark.executor.memory=10G -Xmx10g ${MAIN_CLASS} ${MAIN_CLASS_ARGS} where class path points to the spark jar that we compile with sbt. When I try to run a job I receive the following warning WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory My first question is do I need the entire spark project on disk in order to run jobs? Or what else am I doing wrong?
JMX with Spark
Has anyone got this working? I have enabled the properties for it in the metrics.conf file and ensured that it is placed under Spark's home directory. Any ideas why I don't see Spark beans?
Shutdown with streaming driver running in cluster broke master web UI permanently
I had a cluster running with a streaming driver deployed into it. I shut down the cluster using sbin/stop-all.sh. Upon restarting (and restarting, and restarting), the master web UI cannot respond to requests. The cluster seems to be otherwise functional. Below is the master's log, showing stack traces. pmogren@streamproc01:~/streamproc/spark-0.9.1-bin-hadoop2$ cat /home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/sbin/../logs/spark-pmogren-org.apache.spark.deploy.master.Master-1-streamproc01.out Spark Command: /usr/lib/jvm/java-8-oracle-amd64/bin/java -cp :/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/conf:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m -Dspark.streaming.unpersist=true -Djava.net.preferIPv4Stack=true -Dsun.io.serialization.extendedDebugInfo=true -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=pubsub01:2181 org.apache.spark.deploy.master.Master --ip 10.10.41.19 --port 7077 --webui-port 8080 log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
14/04/11 16:07:55 INFO Master: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/04/11 16:07:55 INFO Master: Starting Spark master at spark://10.10.41.19:7077 14/04/11 16:07:55 INFO MasterWebUI: Started Master web UI at http://10.10.41.19:8080 14/04/11 16:07:55 INFO Master: Persisting recovery state to ZooKeeper 14/04/11 16:07:55 INFO ZooKeeper: Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT 14/04/11 16:07:55 INFO ZooKeeper: Client environment:host.name=streamproc01.nexus.commercehub.com 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.version=1.8.0 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.home=/usr/lib/jvm/jdk1.8.0/jre 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.class.path=:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/conf:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.library.path= 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp 14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.compiler=NA 14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.name=Linux 14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.arch=amd64 14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.version=3.5.0-23-generic 14/04/11 16:07:55 INFO ZooKeeper: Client environment:user.name=pmogren 14/04/11 16:07:55 INFO ZooKeeper: Client environment:user.home=/home/pmogren 14/04/11 16:07:55 INFO ZooKeeper: Client environment:user.dir=/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2 14/04/11 16:07:55 INFO ZooKeeper: Initiating client connection, connectString=pubsub01:2181 sessionTimeout=3 watcher=org.apache.spark.deploy.master.SparkZooKeeperSession$ZooKeeperWatcher@744bfbb6 14/04/11 16:07:55 INFO ZooKeeperLeaderElectionAgent: Starting 
ZooKeeper LeaderElection agent 14/04/11 16:07:55 INFO ZooKeeper: Initiating client connection, connectString=pubsub01:2181 sessionTimeout=3 watcher=org.apache.spark.deploy.master.SparkZooKeeperSession$ZooKeeperWatcher@7f7e6043 14/04/11 16:07:55 INFO ClientCnxn: Opening socket connection to server pubsub01.nexus.commercehub.com/10.10.40.39:2181. Will not attempt to authenticate using SASL (unknown error) 14/04/11 16:07:55 INFO ClientCnxn: Socket connection established to pubsub01.nexus.commercehub.com/10.10.40.39:2181, initiating session 14/04/11 16:07:55 INFO ClientCnxn: Opening socket connection to server pubsub01.nexus.commercehub.com/10.10.40.39:2181. Will not attempt to authenticate using SASL (unknown error) 14/04/11 16:07:55 WARN ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable 14/04/11 16:07:55 INFO ClientCnxn: Session establishment complete on server pubsub01.nexus.commercehub.com/10.10.40.39:2181, sessionid = 0x14515d9a11300ce, negotiated timeout = 3 14/04/11 16:07:55 INFO ClientCnxn: Socket connection established to pubsub01.nexus.commercehub.com/10.10.40.39:2181, initiating session 14/04/11 16:07:55 WARN ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable 14/04/11 16:07:55 INFO ClientCnxn: Session establishment complete on server pubsub01.nexus.commercehub.com/10.10.40.39:2181, sessionid = 0x14515d9a11300cf, negotiated timeout = 3 14/04/11 16:07:55 WARN ZooKeeperLeaderElectionAgent: Cleaning up old ZK master election file that points to this master. 14/04/11 16:07:55 INFO ZooKeeperLeaderElectionAgent: Leader
CheckpointRDD has different number of partitions than original RDD
Hello, Spark community! My name is Paul. I am a Spark newbie, evaluating version 0.9.0 without any Hadoop at all, and I need some help. I run into the following error with the StatefulNetworkWordCount example (and similarly in my prototype app, when I use the updateStateByKey operation). I get this when running against my small cluster, but not (so far) against local[2].

61904 [spark-akka.actor.default-dispatcher-2] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1396905956000 ms.0
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take at DStream.scala:586(0) has different number of partitions than original RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
        at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
        at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
        at org.apache.spark.rdd.RDD.take(RDD.scala:844)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:744)

Please let me know what other information would be helpful; I didn't find any question submission guidelines. Thanks, Paul
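The exception says the checkpointed state RDD came back with a different partition count than the RDD it replaced (0 vs. 2 here). One thing worth trying: pin the partitioning of the state stream explicitly, so it does not depend on cluster defaults that may differ between runs or between local[2] and the cluster. A minimal sketch against the 0.9-era Scala API, not a confirmed fix; the checkpoint path, socket source, and partition count are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Pre-1.3, this import provides the implicit conversion that
// adds updateStateByKey to pair DStreams.
import org.apache.spark.streaming.StreamingContext._

object StatefulWordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StatefulWordCountSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    // updateStateByKey requires a checkpoint directory; state RDDs are
    // periodically written here and later read back. (Placeholder path;
    // on a real cluster this should be storage all nodes can reach.)
    ssc.checkpoint("/tmp/spark-checkpoint")

    // Fold each batch's counts into the running total per word.
    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))

    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
    val wordCounts = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      // Passing an explicit numPartitions fixes the partitioning of the
      // state RDD, instead of leaving it to the default parallelism.
      .updateStateByKey[Int](updateFunc, 2)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the error persists with a pinned partition count, the batch interval, checkpoint storage, and Spark version details would probably help whoever picks this up.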