Re: Windowed Operations
I've run into the same issue. Any updates on this?
Create DataFrame from saved objectFile RDD
Hi, how can I create a DataFrame from an RDD that was saved as an object file? I don't have a Java object, but I have a StructType I want to use as the schema for the DataFrame. How do I load the object file without the original class? I tried retrieving the records as Row: val myrdd = sc.objectFile[org.apache.spark.sql.Row]("/home/bipin/rawdata/" + name) But I get: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to org.apache.spark.sql.Row How can I work around this? Is there a better way?
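A minimal sketch of one possible workaround, assuming the serialized records come back as plain arrays of field values (which the ClassCastException suggests); mySchema stands for the StructType mentioned above and is an illustrative name:

import org.apache.spark.sql.Row

// read the records back as untyped arrays instead of Rows
val raw = sc.objectFile[Array[Any]]("/home/bipin/rawdata/" + name)
// rebuild Rows from the field values and apply the known schema
val ddf = sqlContext.createDataFrame(raw.map(arr => Row.fromSeq(arr)), mySchema)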
Re: SparkSQL can't read S3 path for hive external table
This sounds like a problem that was fixed in Spark 1.3.1. https://issues.apache.org/jira/browse/SPARK-6351 On Mon, Jun 1, 2015 at 5:44 PM, Akhil Das ak...@sigmoidanalytics.com wrote: This thread http://stackoverflow.com/questions/24048729/how-to-read-input-from-s3-in-a-spark-streaming-ec2-cluster-application has various methods on accessing S3 from Spark, it might help you. Thanks Best Regards On Sun, May 24, 2015 at 8:03 AM, ogoh oke...@gmail.com wrote: Hello, I am using Spark 1.3 in AWS. SparkSQL can't recognize a Hive external table on S3. The following is the error message. I appreciate any help. Thanks, Okehee -- 15/05/24 01:02:18 ERROR thriftserver.SparkSQLDriver: Failed in [select count(*) from api_search where pdate='2015-05-08'] java.lang.IllegalArgumentException: Wrong FS: s3://test-emr/datawarehouse/api_s3_perf/api_search/pdate=2015-05-08/phour=00, expected: hdfs://10.128.193.211:9000 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:647) at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:467) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
Re: Exception writing on two Cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
Hi Antonio, First, what version of the Spark Cassandra Connector are you using? You are using Spark 1.3.1, which the Cassandra connector today supports in builds from the master branch only - the release with public artifacts supporting Spark 1.3.1 is coming soon ;) Please see https://github.com/datastax/spark-cassandra-connector#version-compatibility Try the version change and LMK. What does your Cassandra log say? Note that you can read from a Spark stream like Flume, for instance in your flumeStreamNavig.map(..) code (in Scala at least, with a lot less code - I have not used Java) (here it's Kafka): https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L39 And write inline to Cassandra: https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L45 https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L64 Helena tw: @helenaedelson

On May 29, 2015, at 6:11 AM, Antonio Giambanco antogia...@gmail.com wrote: Hi all, I have Spark 1.3.1 and Cassandra 2.0.14 installed on a single server. I'm coding a simple Java class for Spark Streaming that works as follows: it reads header events from a Flume sink, and based on the header it writes the event body to the navigation or the transaction table (Cassandra). Unfortunately I get NoHostAvailableException; if I comment out the code saving to one of the two tables, everything works. Here is the code:

public static void main(String[] args) {
  // Create a local StreamingContext with two working threads and a batch interval of 1 second
  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");
  conf.set("spark.cassandra.connection.host", "127.0.0.1");
  conf.set("spark.cassandra.connection.native.port", "9042");
  conf.set("spark.cassandra.output.batch.size.rows", "1");
  conf.set("spark.cassandra.output.concurrent.writes", "1");

  final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

  JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig =
      FlumeUtils.createPollingStream(jssc, "127.0.0.1", );

  JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
      new Function<SparkFlumeEvent, String>() {
        @Override
        public String call(SparkFlumeEvent arg0) throws Exception {
          // TODO Auto-generated method stub
          Map<CharSequence, CharSequence> headers = arg0.event().getHeaders();
          ByteBuffer bytePayload = arg0.event().getBody();
          String s = headers.get("source_log").toString() + "#" + new String(bytePayload.array());
          System.out.println("RIGA: " + s);
          return s;
        }
      });

  logRowsNavig.foreachRDD(
      new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rows) throws Exception {
          if (!rows.isEmpty()) {
            // String header = getHeaderFronRow(rows.collect());
            List<Navigation> listNavigation = new ArrayList<Navigation>();
            List<Transaction> listTransaction = new ArrayList<Transaction>();
            for (String row : rows.collect()) {
              String header = row.substring(0, row.indexOf("#"));
              if (header.contains("controller_log")) {
                listNavigation.add(createNavigation(row));
                System.out.println("Added Element in Navigation List");
              } else if (header.contains("business_log")) {
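For reference, a minimal Scala sketch of the pattern Helena's links demonstrate - parsing the stream and writing inline to Cassandra with the connector's streaming API. The case class, parseNavigation, keyspace and table names are illustrative assumptions, not the code above:

import com.datastax.spark.connector.streaming._

// hypothetical case class mapped to the navigation table's columns
case class Navigation(eventId: String, sessionId: String, page: String)

// parse each event and save it inline; a second, filtered stream could be
// written to the transaction table in the same way
flumeStream
  .map(event => parseNavigation(event)) // parseNavigation: illustrative
  .saveToCassandra("dwx", "navigation")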
Re: Windows of windowed streams not displaying the expected results
Yes, I have hit this issue as well, and I'd like to check whether you fixed it or found another solution for the same goal.
Don't understand scheduling jobs within an Application
Hi Sparks, The following is copied from the Spark online document http://spark.apache.org/docs/latest/job-scheduling.html. Basically, I have two questions about it: 1. If two jobs in an application have dependencies - that is, one job depends on the result of the other job - then I think they will have to run sequentially. 2. Since job scheduling happens within one application, I don't think job scheduling will give benefits to multiple users as the last sentence says. In my opinion, multiple users can benefit only from cross-application scheduling. Maybe I haven't understood job scheduling well; could someone elaborate on this? Thanks very much. By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly. Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish. This mode is best for multi-user settings. bit1...@163.com
Cassandra example
Hi, I want to write my RDD to a Cassandra database, and I took an example from this site: http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java. I added it to my project, but I have errors. Here is my project in a gist: https://gist.github.com/yaseminn/aba86dad9a3e6d6a03dc. Errors: - At line 40 (Session is not recognized) - At line 106 (flatMap is not applicable) Have a nice day yasemin -- hiç ender hiç
Re: SparkSQL can't read S3 path for hive external table
This thread http://stackoverflow.com/questions/24048729/how-to-read-input-from-s3-in-a-spark-streaming-ec2-cluster-application has various methods on accessing S3 from Spark, it might help you. Thanks Best Regards On Sun, May 24, 2015 at 8:03 AM, ogoh oke...@gmail.com wrote: Hello, I am using Spark 1.3 in AWS. SparkSQL can't recognize a Hive external table on S3. The following is the error message. I appreciate any help. Thanks, Okehee -- 15/05/24 01:02:18 ERROR thriftserver.SparkSQLDriver: Failed in [select count(*) from api_search where pdate='2015-05-08'] java.lang.IllegalArgumentException: Wrong FS: s3://test-emr/datawarehouse/api_s3_perf/api_search/pdate=2015-05-08/phour=00, expected: hdfs://10.128.193.211:9000 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:647) at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:467) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
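For completeness, one of the approaches from that thread as a hedged sketch - supplying s3n credentials through the Hadoop configuration before reading (the bucket, path and keys are placeholders):

// configure AWS credentials for the s3n:// filesystem
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access-key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret-key>")
val lines = sc.textFile("s3n://your-bucket/path/to/data")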
RE: FW: Websphere MQ as a data source for Apache Spark Streaming
Thanks for your suggestion. Yes, by DStream.saveAsTextFiles(). I was making a mistake by writing StorageLevel.NULL when overriding the storageLevel method in my custom receiver. When I changed it to StorageLevel.MEMORY_AND_DISK_2(), data started to be saved to disk. Now it's running without any issue. From: Tathagata Das [mailto:t...@databricks.com] Sent: Friday, May 29, 2015 3:30 AM To: Chaudhary, Umesh Cc: Arush Kharbanda; user@spark.apache.org Subject: Re: FW: Websphere MQ as a data source for Apache Spark Streaming Are you sure that the data can be saved as strings? Another, more controlled approach is to use DStream.foreachRDD, which takes a Function2 parameter, an RDD and a Time. There you can explicitly do stuff with the RDD, save it to separate files (separated by time), or whatever. Might help you to debug what is going on. It might also help if you show the streaming program in a pastebin. TD On Fri, May 29, 2015 at 12:55 AM, Chaudhary, Umesh umesh.chaudh...@searshc.com wrote: Hi, I have written a manual receiver for Websphere MQ and it's working fine. If I do JavaDStream.saveAsTextFiles("/home/user/out.txt") then it generates a directory named out.txt with a timestamp appended. In this directory only the _SUCCESS file is present. I can see data on the console while running in local mode, but I am not able to save it as a text file. Is there any other way of saving streaming data? From: Chaudhary, Umesh Sent: Tuesday, May 26, 2015 2:39 AM To: 'Arush Kharbanda'; user@spark.apache.org Subject: RE: Websphere MQ as a data source for Apache Spark Streaming Thanks for the suggestion, I will try and post the outcome. From: Arush Kharbanda [mailto:ar...@sigmoidanalytics.com] Sent: Monday, May 25, 2015 12:24 PM To: Chaudhary, Umesh; user@spark.apache.org Subject: Re: Websphere MQ as a data source for Apache Spark Streaming Hi Umesh, You can write a custom receiver for Websphere MQ, using the API for Websphere MQ. https://spark.apache.org/docs/latest/streaming-custom-receivers.html Thanks Arush On Mon, May 25, 2015 at 8:04 PM, Chaudhary, Umesh umesh.chaudh...@searshc.com wrote: I have seen it, but it has a different configuration for connecting to the MQ. For Websphere MQ we need a Host, Queue Manager, Channel and Queue Name, but here, per the MQTT protocol: client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) It only expects a broker URL, which is not suitable for establishing a connection with Websphere MQ. Please suggest! From: Arush Kharbanda [mailto:ar...@sigmoidanalytics.com] Sent: Monday, May 25, 2015 6:29 AM To: Chaudhary, Umesh Cc: user@spark.apache.org Subject: Re: Websphere MQ as a data source for Apache Spark Streaming Hi Umesh, You can connect to Spark Streaming with MQTT; refer to the example. https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala Thanks Arush On Mon, May 25, 2015 at 3:43 PM, umesh9794 umesh.chaudh...@searshc.com wrote: I was digging into the possibilities for Websphere MQ as a data source for spark-streaming because it is needed in one of our use cases. I got to know that MQTT http://mqtt.org/ is the protocol that supports the communication from MQ data structures, but since I am a newbie to Spark Streaming I need some working examples for the same.
Has anyone tried to connect MQ with Spark Streaming? Please advise on the best way of doing so. -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
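To make the fix concrete, a minimal Scala sketch of the two pieces discussed above - a custom receiver declaring a persisted storage level, and saving each batch with foreachRDD. The MQ connection details are omitted and all names are illustrative:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// the receiver's storage level must actually persist received data;
// MEMORY_AND_DISK_2 replicates blocks and spills to disk when needed
class MQReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = { /* connect to the queue and call store(message) */ }
  def onStop(): Unit = { /* release the connection */ }
}

// write each non-empty batch to its own time-stamped directory
stream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) rdd.saveAsTextFile(s"/home/user/out-${time.milliseconds}")
}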
RE: Spark Executor Memory Usage
#1 I'm not sure I got your point. As far as I know, Xmx is not turned into physical memory as soon as the process starts; the heap is first reserved as virtual memory, and if the heap needs more, physical memory usage gradually increases up to the max heap size. #2 Physical memory contains not only the heap, but also the stack, direct memory, shared libraries, and perm space. There are also the VSS, RSS, PSS, and USS concepts, which you can google. Simply put: Vss = virtual set size, Rss = resident set size, Pss = proportional set size, Uss = unique set size. Best Regards, Andy Hu (胡 珊) Date: Fri, 29 May 2015 07:41:41 -0700 Subject: Re: Spark Executor Memory Usage From: yuzhih...@gmail.com To: valeramoisee...@gmail.com CC: user@spark.apache.org For #2, see http://unix.stackexchange.com/questions/65835/htop-reporting-much-higher-memory-usage-than-free-or-top Cheers On Fri, May 29, 2015 at 6:56 AM, Valerii Moisieienko valeramoisee...@gmail.com wrote: Hello! My name is Valerii. I have noticed strange memory behaviour of Spark's executor on my cluster. The cluster works in standalone mode with 3 workers. The application runs in cluster mode. From the topology configuration: spark.executor.memory 1536m I checked heap usage via JVisualVM: http://joxi.ru/Q2KqBMdSvYpDrj and via htop: http://joxi.ru/Vm63RWeCvG6L2Z I have 2 questions regarding Spark executors' memory usage: 1. Why does the Max Heap Size change while the executor is working? 2. Why is memory usage in htop greater than the executor's heap size? Thank you!
Re: RDD boundaries and triggering processing using tags in the data
Maybe you can make use of the Window operations https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#window-operations. Another approach would be to keep your incoming data in an HBase/Redis/Cassandra kind of database, and then whenever you need to average it, you just query the database and average there. Thanks Best Regards On Thu, May 28, 2015 at 1:22 AM, David Webber david.web...@gmail.com wrote: Hi All, I'm new to Spark and I'd like some help understanding if a particular use case would be a good fit for Spark Streaming. I have an imaginary stream of sensor data consisting of integers 1-10. Every time the sensor reads 10, I'd like to average all the numbers that were received since the last 10. Example input: 10 5 8 4 6 2 1 2 8 8 8 1 6 9 1 3 10 1 3 10 ... Desired output: 4.8, 2.0 I'm confused about what happens if sensor readings fall into different RDDs. RDD1: 10 5 8 4 6 2 1 2 8 8 8 RDD2: 1 6 9 1 3 10 1 3 10 Output: ???, 2.0 My imaginary sensor doesn't read at fixed time intervals, so breaking the stream into RDDs by time interval won't ensure the data is packaged properly. Additionally, multiple sensors are writing to the same stream (though I think flatMap can parse the origin stream into streams for individual sensors, correct?). My best guess for processing goes like: 1) flatMap() to break out individual sensor streams 2) a custom parser to accumulate input data until a 10 is found, then create a new output RDD for each sensor and data grouping 3) average the values from step 2. I would greatly appreciate pointers to some specific documentation or examples if you have seen something like this before. Thanks, David
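A stateful sketch of David's step 2 using updateStateByKey, which carries the partial buffer across batch (RDD) boundaries so readings split between RDDs are still grouped correctly. This is a hedged example: it assumes the stream has already been keyed as (sensorId, reading) pairs (bySensor below is illustrative), that per-sensor arrival order is preserved upstream (Spark does not guarantee ordering of values within a batch), and that ssc.checkpoint(...) has been set, which updateStateByKey requires:

// state per sensor: (readings buffered since the last 10, averages completed this batch)
val withState = bySensor.updateStateByKey[(Seq[Int], Seq[Double])] {
  (newReadings: Seq[Int], state: Option[(Seq[Int], Seq[Double])]) =>
    var buf = state.map(_._1).getOrElse(Seq.empty[Int])
    val done = scala.collection.mutable.ArrayBuffer.empty[Double]
    for (r <- newReadings) {
      if (r == 10) {
        if (buf.nonEmpty) done += buf.sum.toDouble / buf.size
        buf = Seq.empty // reset the buffer after each 10
      } else {
        buf = buf :+ r
      }
    }
    Some((buf, done.toSeq))
}
// one (sensorId, average) record per completed group
val averages = withState.flatMapValues(_._2)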
Re: Cassandra example
Here's more detailed documentation https://github.com/datastax/spark-cassandra-connector from Datastax. You can also shoot an email directly to their mailing list http://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user since it's more related to their code. Thanks Best Regards On Mon, Jun 1, 2015 at 2:18 PM, Yasemin Kaya godo...@gmail.com wrote: Hi, I want to write my RDD to a Cassandra database, and I took an example from this site: http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java. I added it to my project, but I have errors. Here is my project in a gist: https://gist.github.com/yaseminn/aba86dad9a3e6d6a03dc. Errors: - At line 40 (Session is not recognized) - At line 106 (flatMap is not applicable) Have a nice day yasemin -- hiç ender hiç
Streaming K-medoids
Hello everyone, I have an idea and I would like to get some validation from the community about it. In Mahout there is an implementation of Streaming K-means. I'm interested in your opinion: would it make sense to make a similar implementation of Streaming K-medoids? K-medoids has even bigger problems than K-means because it's not scalable, but it can be useful in some cases (e.g. it allows more sophisticated distance measures). What is your opinion about such an approach? Does anyone see problems with it? Best regards, Marko
Re: Streaming K-medoids
I haven't given any thought to streaming it, but in case it's useful I do have a k-medoids implementation for Spark: http://silex.freevariable.com/latest/api/#com.redhat.et.silex.cluster.KMedoids Also a blog post about multi-threading it: http://erikerlandson.github.io/blog/2015/05/06/parallel-k-medoids-using-scala-parseq/ - Original Message - Hello everyone, I have an idea and I would like to get some validation from the community about it. In Mahout there is an implementation of Streaming K-means. I'm interested in your opinion: would it make sense to make a similar implementation of Streaming K-medoids? K-medoids has even bigger problems than K-means because it's not scalable, but it can be useful in some cases (e.g. it allows more sophisticated distance measures). What is your opinion about such an approach? Does anyone see problems with it? Best regards, Marko
Event Logging to HDFS on Standalone Cluster In Progress
Hi, In Spark 1.3.0 I've enabled event logging to write to an existing HDFS folder on a standalone cluster. This is generally working; all the logs are being written. However, from the Master Web UI, the vast majority of completed applications are labeled as not having a history: http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.title=Application%20history%20not%20found%20(app-20150601160846-1914) The log does exist though: # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914 -rw-rw 3 user group 1027848 2015-06-01 16:09 /eventLogs/app-20150601160846-1914 and `cat`-ing the file shows it ends with: {"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077} This seems to indicate it saw and logged the application end. Is there a known issue here or a workaround? Looking at the source code I might have expected these files to end in `.inprogress`, given the UI error message, but they don't. Thanks, Richard
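For anyone comparing setups, the settings involved, as a hedged sketch - the directory is illustrative, and this mirrors the working configuration described above rather than a fix for the UI labeling:

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///eventLogs") // an existing HDFS folder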
Re: Spark stages very slow to complete
Would you mind posting the code? On 2 Jun 2015 00:53, Karlson ksonsp...@siberie.de wrote: Hi, In all (pyspark) Spark jobs that become somewhat more involved, I am experiencing the issue that some stages take a very long time to complete and sometimes don't at all. This clearly correlates with the size of my input data. Looking at the stage details for one such stage, I am wondering where Spark spends all this time. Take this table of the stage's task metrics for example:

Metric                    | Min         | 25th percentile | Median      | 75th percentile | Max
Duration                  | 1.4 min     | 1.5 min         | 1.7 min     | 1.9 min         | 2.3 min
Scheduler Delay           | 1 ms        | 3 ms            | 4 ms        | 5 ms            | 23 ms
Task Deserialization Time | 1 ms        | 2 ms            | 3 ms        | 8 ms            | 22 ms
GC Time                   | 0 ms        | 0 ms            | 0 ms        | 0 ms            | 0 ms
Result Serialization Time | 0 ms        | 0 ms            | 0 ms        | 0 ms            | 1 ms
Getting Result Time       | 0 ms        | 0 ms            | 0 ms        | 0 ms            | 0 ms
Input Size / Records      | 23.9 KB / 1 | 24.0 KB / 1     | 24.1 KB / 1 | 24.1 KB / 1     | 24.3 KB / 1

Why is the overall duration almost 2 min? Where is all this time spent, when no progress of the stage is visible? The progress bar simply displays 0 succeeded tasks for a very long time before sometimes slowly progressing. Also, the name of the stage displayed above is `javaToPython at null:-1`, which I find very uninformative. I don't even know which action exactly is responsible for this stage. Does anyone experience similar issues or have any advice for me? Thanks!
RE: Don't understand scheduling jobs within an Application
1. Yes, if two jobs depend on each other, they can't be parallelized. 2. Imagine something like a web application driver. You only get to have one SparkContext, but now you want to run many concurrent jobs. They have nothing to do with each other; there is no reason to keep them sequential. Hope this helps. A minimal sketch follows this message. ----- Original message ----- From: bit1...@163.com Date: 06/01/2015 4:14 AM (GMT-05:00) To: user user@spark.apache.org Subject: Don't understand scheduling jobs within an Application Hi Sparks, The following is copied from the Spark online document http://spark.apache.org/docs/latest/job-scheduling.html. Basically, I have two questions about it: 1. If two jobs in an application have dependencies - that is, one job depends on the result of the other job - then I think they will have to run sequentially. 2. Since job scheduling happens within one application, I don't think job scheduling will give benefits to multiple users as the last sentence says. In my opinion, multiple users can benefit only from cross-application scheduling. Maybe I haven't understood job scheduling well; could someone elaborate on this? Thanks very much. By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly. Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish. This mode is best for multi-user settings. bit1...@163.com
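To make the concurrent-jobs point concrete, a minimal sketch assuming fair scheduling is wanted and the two jobs are independent (paths and names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shared-driver")
  .set("spark.scheduler.mode", "FAIR") // round-robin between concurrent jobs

val sc = new SparkContext(conf)

// two independent jobs submitted from separate threads share the one context
new Thread(new Runnable {
  def run(): Unit = println(sc.textFile("hdfs:///data/a").count())
}).start()
new Thread(new Runnable {
  def run(): Unit = println(sc.textFile("hdfs:///data/b").count())
}).start()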
Spark stages very slow to complete
Hi, In all (pyspark) Spark jobs that become somewhat more involved, I am experiencing the issue that some stages take a very long time to complete and sometimes don't at all. This clearly correlates with the size of my input data. Looking at the stage details for one such stage, I am wondering where Spark spends all this time. Take this table of the stage's task metrics for example:

Metric                    | Min         | 25th percentile | Median      | 75th percentile | Max
Duration                  | 1.4 min     | 1.5 min         | 1.7 min     | 1.9 min         | 2.3 min
Scheduler Delay           | 1 ms        | 3 ms            | 4 ms        | 5 ms            | 23 ms
Task Deserialization Time | 1 ms        | 2 ms            | 3 ms        | 8 ms            | 22 ms
GC Time                   | 0 ms        | 0 ms            | 0 ms        | 0 ms            | 0 ms
Result Serialization Time | 0 ms        | 0 ms            | 0 ms        | 0 ms            | 1 ms
Getting Result Time       | 0 ms        | 0 ms            | 0 ms        | 0 ms            | 0 ms
Input Size / Records      | 23.9 KB / 1 | 24.0 KB / 1     | 24.1 KB / 1 | 24.1 KB / 1     | 24.3 KB / 1

Why is the overall duration almost 2 min? Where is all this time spent, when no progress of the stage is visible? The progress bar simply displays 0 succeeded tasks for a very long time before sometimes slowly progressing. Also, the name of the stage displayed above is `javaToPython at null:-1`, which I find very uninformative. I don't even know which action exactly is responsible for this stage. Does anyone experience similar issues or have any advice for me? Thanks!
Re: FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle
I am seeing the same issue with Spark 1.3.1. I see this issue when reading a sequence file stored in SequenceFile format (header: SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v? ). All I do is:

sc.sequenceFile(dwTable, classOf[Text], classOf[Text]).partitionBy(new org.apache.spark.HashPartitioner(2053))

with these conf settings:

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
.set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
.set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
.set("spark.yarn.maxAppAttempts", "0")
//.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
//.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
//.set("spark.worker.timeout", arguments.get("workerTimeout").get)
.registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))

and the values are buffersize=128, maxbuffersize=1068, maxResultSize=200G. On Thu, May 7, 2015 at 8:04 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I'm using the default settings. Jianshi On Wed, May 6, 2015 at 7:05 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Can you please share your compression etc. settings which you are using. Thanks, Twinkle On Wed, May 6, 2015 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I'm facing this error in Spark 1.3.1: https://issues.apache.org/jira/browse/SPARK-4105 Anyone know what the workaround is? Change the compression codec for shuffle output? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Deepak
java.io.IOException: FAILED_TO_UNCOMPRESS(5)
Any suggestions? I am using Spark 1.3.1 to read a sequence file stored in SequenceFile format (header: SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v? ) with this code and these settings:

sc.sequenceFile(dwTable, classOf[Text], classOf[Text]).partitionBy(new org.apache.spark.HashPartitioner(2053))

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
.set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
.set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
.set("spark.yarn.maxAppAttempts", "0")
//.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
//.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
//.set("spark.worker.timeout", arguments.get("workerTimeout").get)
.registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))

and the values are buffersize=128, maxbuffersize=1068, maxResultSize=200G. And I see this exception in each executor task: FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com, 54757), shuffleId=6, mapId=2810, reduceId=1117, message= org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) *Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)* at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) at org.xerial.snappy.Snappy.uncompress(Snappy.java:480) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92) at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160) at
org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at scala.util.Success.map(Try.scala:206) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) ... 18 more
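For what it's worth, a hedged sketch of the workaround raised in the other thread - switching the compression codec used for shuffle and block data away from snappy. Whether this sidesteps SPARK-4105 in any particular setup is an assumption, not a confirmed fix:

// spark.io.compression.codec controls compression of shuffle outputs and
// other internal blocks; "lzf" and "lz4" are the alternatives to "snappy"
val conf = new SparkConf().set("spark.io.compression.codec", "lzf")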
Re: Exception writing on two Cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
:D Very happy, Helena. I'll check tomorrow morning. A G On 01 Jun 2015, at 19:45, Helena Edelson helena.edel...@datastax.com wrote: Hi Antonio, It’s your lucky day ;) We just released Spark Cassandra Connector 1.3.0-M1 for Spark 1.3 and the DataSources API. Give it a little while to propagate to http://search.maven.org/#search%7Cga%7C1%7Cspark-cassandra-connector 'spark-cassandra-connector-java_2.10’ just tells me you are using the version that is compiled against Scala 2.10; what is the actual connector version itself? Thanks, Helena @helenaedelson On Jun 1, 2015, at 1:08 PM, Antonio Giambanco antogia...@gmail.com wrote: Hi Helena, thanks for answering me . . . I didn't realize it could be the connector version; unfortunately I didn't try that yet. I know Scala is better, but I'm using Drools and I'm forced to use Java. In my project I'm using spark-cassandra-connector-java_2.10. From Cassandra I have only this log: INFO [ScheduledTasks:1] 2015-06-01 15:49:03,260 ColumnFamilyStore.java (line 795) Enqueuing flush of Memtable-sstable_activity@361028148(455/4550 serialized/live bytes, 180 ops) INFO [FlushWriter:76] 2015-06-01 15:49:03,261 Memtable.java (line 362) Writing Memtable-sstable_activity@361028148(455/4550 serialized/live bytes, 180 ops) INFO [FlushWriter:76] 2015-06-01 15:49:03,273 Memtable.java (line 402) Completed flushing /var/lib/cassandra/data/system/sstable_activity/system-sstable_activity-jb-103-Data.db (248 bytes) for commitlog position ReplayPosition(segmentId=1432896540485, position=1217022) Also on Spark I found this exception: 15/06/01 16:43:30 ERROR Executor: Exception in task 0.0 in stage 61.0 (TID 81) java.io.IOException: Failed to prepare statement INSERT INTO cassandrasink.transaction (event_id, isin, security_type, security_name, date, time, price, currency, user_id, quantity, amount, session_id) VALUES (:event_id, :isin, :security_type, :security_name, :date, :time, :price, :currency, :user_id, :quantity, :amount, :session_id): All host(s) tried for query failed (no host was tried) at com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96) at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122) at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99) at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151) at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99) at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120) at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Caused by:
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried) at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84) at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91) at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33) at $Proxy17.prepare(Unknown Source) at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45) at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28) at $Proxy17.prepare(Unknown Source) at
RE: Need some Cassandra integration help
Hi Yana, Not sure whether you have already solved this issue. As far as I know, DataFrame support in the Spark Cassandra Connector was added in version 1.3. The first milestone release of SCC v1.3 was just announced. Mohammed From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com] Sent: Tuesday, May 26, 2015 1:31 PM To: user@spark.apache.org Subject: Need some Cassandra integration help Hi folks, for those of you working with Cassandra: wondering if anyone has been successful processing a mix of Cassandra and HDFS data. I have a dataset which is stored partially in HDFS and partially in Cassandra (the schema is the same in both places). I am trying to do the following:

val dfHDFS = sqlContext.parquetFile("foo.parquet")
val cassDF = cassandraContext.sql("SELECT * FROM keyspace.user")
dfHDFS.unionAll(cassDF).count

This is failing for me with the following: Exception in thread "main" java.lang.AssertionError: assertion failed: No plan for CassandraRelation TableDef(yana_test,test_connector,ArrayBuffer(ColumnDef(key,PartitionKeyColumn,IntType,false)),ArrayBuffer(),ArrayBuffer(ColumnDef(value,RegularColumn,VarCharType,false))), None, None at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:278) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$14.apply(SparkStrategies.scala:292) at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$14.apply(SparkStrategies.scala:292) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:292) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) at org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:152) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:1092) at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:1090) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:1096) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:1096) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887) at org.apache.spark.sql.DataFrame.count(DataFrame.scala:899) at com.akamai.rum.test.unit.SparkTest.sparkPerfTest(SparkTest.java:123) at com.akamai.rum.test.unit.SparkTest.main(SparkTest.java:37) Is there a way to pull the data out of Cassandra on each executor but not try to push logic down into Cassandra?
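Assuming the connector release Mohammed mentions, a hedged sketch of loading the Cassandra side through the Spark 1.3 data sources API so both inputs are plain DataFrames before the union (source name and options as documented for the 1.3 connector; whether this avoids the "No plan for CassandraRelation" assertion is an assumption):

// load the Cassandra table as a DataFrame via the data sources API
val cassDF = sqlContext.load(
  "org.apache.spark.sql.cassandra",
  Map("keyspace" -> "keyspace", "table" -> "user"))
val dfHDFS = sqlContext.parquetFile("foo.parquet")
println(dfHDFS.unionAll(cassDF).count())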
Re: SparkSQL can't read S3 path for hive external table
Thanks, Michael and Akhil. Yes, it worked with Spark 1.3.1 along with AWS EMR AMI 3.7. Sorry I didn't update the status. On Mon, Jun 1, 2015 at 5:17 AM, Michael Armbrust mich...@databricks.com wrote: This sounds like a problem that was fixed in Spark 1.3.1. https://issues.apache.org/jira/browse/SPARK-6351 On Mon, Jun 1, 2015 at 5:44 PM, Akhil Das ak...@sigmoidanalytics.com wrote: This thread has various methods on accessing S3 from Spark, it might help you. Thanks Best Regards On Sun, May 24, 2015 at 8:03 AM, ogoh oke...@gmail.com wrote: Hello, I am using Spark 1.3 in AWS. SparkSQL can't recognize a Hive external table on S3. The following is the error message. I appreciate any help. Thanks, Okehee -- 15/05/24 01:02:18 ERROR thriftserver.SparkSQLDriver: Failed in [select count(*) from api_search where pdate='2015-05-08'] java.lang.IllegalArgumentException: Wrong FS: s3://test-emr/datawarehouse/api_s3_perf/api_search/pdate=2015-05-08/phour=00, expected: hdfs://10.128.193.211:9000 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:647) at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:467) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
flatMap output on disk / flatMap memory overhead
Hi, Is there any way to force the output RDD of a flatMap op to be stored in both memory and disk as it is computed? My RAM would not be able to fit the entire output of flatMap, so it really needs to start using disk after the RAM gets full. I didn't find any way to force this. Also, what is the memory overhead of flatMap? From my computations, the output RDD should fit in memory, but I get the following error after a while (and I know it's because of memory issues, since running the program with 1/3 of the input data finishes successfully): 15/06/01 19:02:49 ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(dco-node036-mgt.dco.ethz.ch,57478) java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866) at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:865) at scala.Option.foreach(Option.scala:236) at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:865) at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581) at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656) at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367) at java.lang.Thread.run(Thread.java:745) Also, I've seen this: https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence but my understanding is that one should apply something like: rdd.flatMap(...).persist(MEMORY_AND_DISK) which assumes that the entire output of flatMap is first stored in memory (which is not possible in my case) and, only when it's done, is stored on disk. Please correct me if I'm wrong. Anyway, I've tried using this, but I got the same error. My config:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "125g")
conf.set("spark.shuffle.file.buffer.kb", "1000")
conf.set("spark.shuffle.consolidateFiles", "true")
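For reference, a minimal sketch of the persist call (expand and input are illustrative names). With StorageLevel.MEMORY_AND_DISK the entire output does not have to fit in memory first: partitions that cannot be held in RAM are spilled to disk as they are computed, although each partition is still materialized one at a time:

import org.apache.spark.storage.StorageLevel

// partitions that don't fit in memory are written to disk as they are computed
val out = input.flatMap(expand).persist(StorageLevel.MEMORY_AND_DISK)
out.count() // materializes the RDD under the chosen storage level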
Re: Exception writing on two Cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
Hi Antonio, It’s your lucky day ;) We just released Spark Cassandra Connector 1.3.0-M1 for Spark 1.3 and the DataSources API. Give it a little while to propagate to http://search.maven.org/#search%7Cga%7C1%7Cspark-cassandra-connector 'spark-cassandra-connector-java_2.10’ just tells me you are using the version that is compiled against Scala 2.10; what is the actual connector version itself? Thanks, Helena @helenaedelson On Jun 1, 2015, at 1:08 PM, Antonio Giambanco antogia...@gmail.com wrote: Hi Helena, thanks for answering me . . . I didn't realize it could be the connector version, unfortunately I didn't try yet. I know Scala is better but I'm using Drools and I'm forced to use Java. In my project I'm using spark-cassandra-connector-java_2.10. From Cassandra I have only this log: INFO [ScheduledTasks:1] 2015-06-01 15:49:03,260 ColumnFamilyStore.java (line 795) Enqueuing flush of Memtable-sstable_activity@361028148(455/4550 serialized/live bytes, 180 ops) INFO [FlushWriter:76] 2015-06-01 15:49:03,261 Memtable.java (line 362) Writing Memtable-sstable_activity@361028148(455/4550 serialized/live bytes, 180 ops) INFO [FlushWriter:76] 2015-06-01 15:49:03,273 Memtable.java (line 402) Completed flushing /var/lib/cassandra/data/system/sstable_activity/system-sstable_activity-jb-103-Data.db (248 bytes) for commitlog position ReplayPosition(segmentId=1432896540485, position=1217022) Also on Spark I found this exception: 15/06/01 16:43:30 ERROR Executor: Exception in task 0.0 in stage 61.0 (TID 81) java.io.IOException: Failed to prepare statement INSERT INTO cassandrasink.transaction (event_id, isin, security_type, security_name, date, time, price, currency, user_id, quantity, amount, session_id) VALUES (:event_id, :isin, :security_type, :security_name, :date, :time, :price, :currency, :user_id, :quantity, :amount, :session_id): All host(s) tried for query failed (no host was tried) at com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96) at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122) at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99) at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151) at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99) at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120) at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All
host(s) tried for query failed (no host was tried) at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84) at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91) at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33) at $Proxy17.prepare(Unknown Source) at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45) at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28) at $Proxy17.prepare(Unknown Source) at com.datastax.spark.connector.writer.TableWriter.com
RE: Anybody using Spark SQL JDBC server with DSE Cassandra?
Nobody using the Spark SQL JDBC/Thrift server with DSE Cassandra? Mohammed From: Mohammed Guller [mailto:moham...@glassbeam.com] Sent: Friday, May 29, 2015 11:49 AM To: user@spark.apache.org Subject: Anybody using Spark SQL JDBC server with DSE Cassandra? Hi - We have successfully integrated Spark SQL with Cassandra. We have a backend that provides a REST API that allows users to execute SQL queries on data in C*. Now we would like to also support JDBC/ODBC connectivity, so that users can use tools like Tableau to query data in C* through the Spark SQL JDBC server. However, I have been unable to find a driver that would allow the Spark SQL Thrift/JDBC server to connect with Cassandra. DataStax provides a closed-source driver that comes only with the DSE version of Cassandra. I would like to find out how many people are using the Spark SQL JDBC server + DSE Cassandra combination. If you do use Spark SQL JDBC server + DSE, I would appreciate it if you could share your experience. For example, what kind of issues have you run into? How is the performance? What reporting tools are you using? Thank you. Mohammed
Re: Exception writing on two Cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
Hi Helena, thanks for answering me . . . I didn't realize it could be the connector version; unfortunately I didn't try that yet. I know Scala is better, but I'm using Drools and I'm forced to use Java. In my project I'm using spark-cassandra-connector-java_2.10. From Cassandra I have only this log: INFO [ScheduledTasks:1] 2015-06-01 15:49:03,260 ColumnFamilyStore.java (line 795) Enqueuing flush of Memtable-sstable_activity@361028148(455/4550 serialized/live bytes, 180 ops) INFO [FlushWriter:76] 2015-06-01 15:49:03,261 Memtable.java (line 362) Writing Memtable-sstable_activity@361028148(455/4550 serialized/live bytes, 180 ops) INFO [FlushWriter:76] 2015-06-01 15:49:03,273 Memtable.java (line 402) Completed flushing /var/lib/cassandra/data/system/sstable_activity/system-sstable_activity-jb-103-Data.db (248 bytes) for commitlog position ReplayPosition(segmentId=1432896540485, position=1217022) Also on Spark I found this exception: 15/06/01 16:43:30 ERROR Executor: Exception in task 0.0 in stage 61.0 (TID 81) java.io.IOException: Failed to prepare statement INSERT INTO cassandrasink.transaction (event_id, isin, security_type, security_name, date, time, price, currency, user_id, quantity, amount, session_id) VALUES (:event_id, :isin, :security_type, :security_name, :date, :time, :price, :currency, :user_id, :quantity, :amount, :session_id): All host(s) tried for query failed (no host was tried) at com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96) at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122) at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99) at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151) at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99) at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120) at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried) at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84) at com.datastax.driver.core.DefaultResultSetFuture.
extractCauseFromExecutionException(DefaultResultSetFuture.java:289) at com.datastax.driver.core.AbstractSession.prepare( AbstractSession.java:91) at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33) at $Proxy17.prepare(Unknown Source) at com.datastax.spark.connector.cql.PreparedStatementCache$. prepareStatement(PreparedStatementCache.scala:45) at com.datastax.spark.connector.cql.SessionProxy.invoke( SessionProxy.scala:28) at $Proxy17.prepare(Unknown Source) at com.datastax.spark.connector.writer.TableWriter.com http://com.datastax.spark.connector.writer.tablewriter.com/$ datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter. scala:92) ... 15 more Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried) at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107) at com.datastax.driver.core.SessionManager.execute(SessionManager.java:538) at com.datastax.driver.core.SessionManager.prepareAsync( SessionManager.java:124) at com.datastax.driver.core.AbstractSession.prepareAsync( AbstractSession.java:103) at com.datastax.driver.core.AbstractSession.prepare( AbstractSession.java:89) ... 24 more 15/06/01 16:43:30 INFO TaskSetManager: Starting task 1.0 in
RE: Migrate Relational to Distributed
Brant, You should be able to migrate most of your existing SQL code to Spark SQL, but remember that Spark SQL does not yet support the full ANSI standard. So you may need to rewrite some of your existing queries. Another thing to keep in mind is that Spark SQL is not real-time. The response time for Spark SQL + Cassandra will not be the same as that of a properly-indexed database table (up to a certain size). On the other hand, the Spark SQL + Cassandra solution will scale better and provide higher throughput and availability more economically than an Oracle-based solution. Mohammed -Original Message- From: Brant Seibert [mailto:brantseib...@hotmail.com] Sent: Friday, May 22, 2015 3:23 PM To: user@spark.apache.org Subject: Migrate Relational to Distributed Hi, The healthcare industry can do wonderful things with Apache Spark. But, there is already a very large base of data and applications firmly rooted in the relational paradigm and they are resistant to change - stuck on Oracle. ** QUESTION 1 - Migrate legacy relational data (plus new transactions) to distributed storage? DISCUSSION 1 - The primary advantage I see is not having to engage in the lengthy (1+ years) process of creating a relational data warehouse and cubes. Just store the data in a distributed system and analyze first in memory with Spark. ** QUESTION 2 - Will we have to re-write the enormous amount of logic that is already built for the old relational system? DISCUSSION 2 - If we move the data to distributed, can we simply run that existing relational logic as SparkSQL queries? [existing SQL -- Spark Context -- Cassandra -- process in SparkSQL -- display in existing UI]. Can we create an RDD that uses existing SQL? Or do we need to rewrite all our SQL? ** DATA SIZE - We are adding many new data sources to a system that already manages health care data for over a million people. The number of rows may not be enormous right now compared to the advertising industry, for example, but the number of dimensions runs well into the thousands. If we add to this IoT data for each health care patient, which creates billions of events per day, the number of rows then grows exponentially. We would like to be prepared to handle that huge data scenario. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Migrate-Relational-to-Distributed-tp22999.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
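A minimal sketch of what DISCUSSION 2 describes, i.e. running existing SQL through Spark SQL over a Cassandra table (assumes the DataStax spark-cassandra-connector is on the classpath; keyspace, table, and column names are invented for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf()
  .setAppName("LegacySqlOnCassandra")
  .set("spark.cassandra.connection.host", "127.0.0.1"))
val sqlContext = new SQLContext(sc)

// Expose the Cassandra table as a temp table so existing SQL can target it.
val patients = sqlContext.load(
  "org.apache.spark.sql.cassandra",
  Map("keyspace" -> "health", "table" -> "patients"))
patients.registerTempTable("patients")

// An existing relational query, submitted unchanged (as long as it stays
// within the SQL subset Spark SQL supports).
sqlContext.sql("SELECT region, count(*) FROM patients GROUP BY region").show()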
Re: union and reduceByKey wrong shuffle?
switching to use simple POJOs instead of using Avro for Spark serialization solved the problem (I mean reading Avro from S3 and then mapping each Avro object to its POJO serializable counterpart with the same fields; the POJO is registered within Kryo). Any thought where to look for a problem/misconfiguration? On 31 May 2015 at 22:48, Igor Berman igor.ber...@gmail.com wrote: Hi We are using spark 1.3.1 Avro-chill (tomorrow will check if it's important) we register avro classes from java Avro 1.7.6 On May 31, 2015 22:37, Josh Rosen rosenvi...@gmail.com wrote: Which Spark version are you using? I'd like to understand whether this change could be caused by recent Kryo serializer re-use changes in master / Spark 1.4. On Sun, May 31, 2015 at 11:31 AM, igor.berman igor.ber...@gmail.com wrote: after investigation the problem is somehow connected to avro serialization with kryo + chill-avro (mapping avro object to simple scala case class and running reduce on these case class objects solves the problem) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/union-and-reduceByKey-wrong-shuffle-tp23092p23093.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
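For reference, a rough sketch of the workaround described above (class and field names are invented; assumes spark.serializer is set to KryoSerializer and spark.kryo.registrator points at the registrator below):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Plain serializable counterpart of the Avro record, with the same fields.
case class EventPojo(id: String, count: Long)

class PojoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[EventPojo])
  }
}

// In the job: convert each Avro object to its POJO right after reading,
// so only POJOs flow through the shuffle:
//   val pojos = avroRdd.map(r => EventPojo(r.getId.toString, r.getCount))
//   pojos.map(e => (e.id, e.count)).reduceByKey(_ + _)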
Dataframe random permutation?
I would like to know what will be the best approach to randomly permute a Data Frame. I have tried: df.sample(false,1.0,x).show(100) where x is the seed. However, it gives the same result no matter the value of x (it only gives different values when the fraction is smaller than 1.0). I have tried also: hc.createDataFrame(df.rdd.repartition(100),df.schema) which appears to be a random permutation. Can someone confirm that the last line is in fact a random permutation, or point me to a better approach? Thanks -- Cesar Flores
Re: Event Logging to HDFS on Standalone Cluster In Progress
Ah, apologies, I found an existing issue and fix has already gone out for this in 1.3.1 and up: https://issues.apache.org/jira/browse/SPARK-6036. On Mon, Jun 1, 2015 at 3:39 PM, Richard Marscher rmarsc...@localytics.com wrote: It looks like it is possibly a race condition between removing the IN_PROGRESS and building the history UI for the application. `AppClient` sends an `UnregisterApplication(appId)` message to the `Master` actor, which triggers the process to look for the app's eventLogs. If they are suffixed with `.inprogress` then it will not build out the history UI and instead build the error page I've seen. Tying this together, calling SparkContext.stop() has the following block: if (_dagScheduler != null) { _dagScheduler.stop() _dagScheduler = null } if (_listenerBusStarted) { listenerBus.stop() _listenerBusStarted = false } _eventLogger.foreach(_.stop()) Dag Scheduler has a TaskScheduler which has a SparkDeploySchedulerBackend which has an AppClient. AppClient sends itself a message to stop itself, and like mentioned above, it then sends a message to the Master where it tries to build the history UI. Meanwhile, EventLoggingListener.stop() is where the `.inprogress` suffix is removed in the file-system. It seems like the race condition of the Akka message passing to trigger the Master's building of the history UI may be the only reason the history UI ever gets properly setup in the first place. Because if the ordering of calls were all strict in the SparkContext.stop method then you would expect the Master to always see the event logs as in in progress. Maybe I have missed something in tracing through the code? Is there a reason that the eventLogger cannot be closed before the dagScheduler? Thanks, Richard On Mon, Jun 1, 2015 at 12:23 PM, Richard Marscher rmarsc...@localytics.com wrote: Hi, In Spark 1.3.0 I've enabled event logging to write to an existing HDFS folder on a Standalone cluster. This is generally working, all the logs are being written. However, from the Master Web UI, the vast majority of completed applications are labeled as not having a history: http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.title=Application%20history%20not%20found%20(app-20150601160846-1914) The log does exists though: # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914 -rw-rw 3 user group1027848 2015-06-01 16:09 /eventLogs/app-20150601160846-1914 and `cat` the file ends with: {Event:SparkListenerApplicationEnd,Timestamp:1433174955077} This seems to indicate it saw and logged the application end. Is there a known issue here or a workaround? Looking at the source code I might have expected these files to end in `.inprogress` given the UI error message, but they don't. Thanks, Richard
Re: Restricting the number of iterations in MLlib KMeans
Hi Suman Meethu, Apologies---I was wrong about KMeans supporting an initial set of centroids! JIRA created: https://issues.apache.org/jira/browse/SPARK-8018 If you're interested in submitting a PR, please do! Thanks, Joseph On Mon, Jun 1, 2015 at 2:25 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Joseph, I was unable to find any function in Kmeans.scala where the initial centroids could be specified by the user. Kindly help. Thanks Regards, Meethu M On Tuesday, 19 May 2015 6:54 AM, Joseph Bradley jos...@databricks.com wrote: Hi Suman, For maxIterations, are you using the DenseKMeans.scala example code? (I'm guessing yes since you mention the command line.) If so, then you should be able to specify maxIterations via an extra parameter like --numIterations 50 (note the example uses numIterations in the current master instead of maxIterations, which is sort of a bug in the example). If that does not cap the max iterations, then please report it as a bug. To specify the initial centroids, you will need to modify the DenseKMeans example code. Please see the KMeans API docs for details. Good luck, Joseph On Mon, May 18, 2015 at 3:22 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, I think you cant supply an initial set of centroids to kmeans Thanks Regards, Meethu M On Friday, 15 May 2015 12:37 AM, Suman Somasundar suman.somasun...@oracle.com wrote: Hi,, I want to run a definite number of iterations in Kmeans. There is a command line argument to set maxIterations, but even if I set it to a number, Kmeans runs until the centroids converge. Is there a specific way to specify it in command line? Also, I wanted to know if we can supply the initial set of centroids to the program instead of it choosing the centroids in random? Thanks, Suman.
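For reference, a minimal sketch of capping the iteration count through the MLlib API directly (toy data for illustration; KMeans stops at maxIterations or at convergence, whichever comes first):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 1.0), Vectors.dense(9.0, 8.0)))

val model = new KMeans()
  .setK(2)
  .setMaxIterations(5)  // hard upper bound on the number of Lloyd iterations
  .run(data)

println(model.clusterCenters.mkString(", "))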
SparkSQL's performance gets degraded depending on the number of partitions of Hive tables... is it normal?
Hello, I posted this question a while back but am posting it again to get your attention. I am using SparkSQL 1.3.1 and Hive 0.13.1 on AWS YARN (tested under both 1.3.0 and 1.3.1). My Hive table is partitioned. I noticed that the query response time is bad depending on the number of partitions, even though the query targets a small subset of the partitions. TRACE level logs (ThriftServer's) showed that it runs commands like getFileInfo, getListing, and getBlockLocations for each and every partition (it also runs getBlockLocations for each and every file), even though those partitions are not part of the queried partitions. I don't know why that is necessary. Is it a bug in SparkSQL? Is there a way to avoid it? Below is the detail of this issue, including logs. Thanks, -- My Hive table, an external table, is partitioned by date and hour. I expected that a query over certain partitions would read only the data files of those partitions. I turned on TRACE level logging for ThriftServer since the query response time even for narrowed partitions was very long. And I found that all the available partitions are checked during some steps. The logs showed an execution flow such as: == Step 1: Contacted HiveMetastore to get partition info (cmd: get_partitions) Step 2: Came up with an execution rule Step 3: Contacted NameNode to make at least 4 calls (data is in HDFS) for all available partitions of the table: getFileInfo once, getListing once, and then repeated them again for each partition. Step 4: Contacted NameNode to find the block locations of all the partitions Step 5: Contacted a DataNode for each file of all the partitions Step 6: Contacted NameNode again for all the partitions Step 7: SparkSQL generated some optimal plan Step 8: Contacted the corresponding DataNodes for the narrowed partitions (it seems) And more. === Why should Steps 3, 4, 5, and 6 check all partitions? After removing partitions from the table, the query was much quicker while processing the same volume of data. I don't know if this is normal, a Hive issue, a SparkSQL issue, or a configuration issue on my side. I added some logs below for some steps. I appreciate any of your advice. Thanks a lot, Okehee some logs of some steps Query: select count(*) from api_search where pdate='2015-05-23'; Step 2: 2015-05-25 16:37:43 TRACE HiveContext$$anon$3:67 - === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates === !'Project [COUNT(1) AS _c0#25L]Aggregate [], [COUNT(1) AS _c0#25L] Filter (pdate#26 = 2015-05-23)Filter (pdate#26 = 2015-05-23) MetastoreRelation api_hdfs_perf, api_search, None MetastoreRelation api_hdfs_perf, api_search, None .. Step 3: 2015-05-25 16:37:44 TRACE ProtobufRpcEngine:206 - 84: Call - /10.128.193.211:9000: getFileInfo {src: /user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00} 2015-05-25 16:37:44 DEBUG Client:424 - The ping interval is 6 ms.
2015-05-25 16:37:44 DEBUG Client:693 - Connecting to /10.128.193.211:9000 2015-05-25 16:37:44 DEBUG Client:1007 - IPC Client (2100771791) connection to /10.128.193.211:9000 from ogoh sending #151 2015-05-25 16:37:44 DEBUG Client:944 - IPC Client (2100771791) connection to /10.128.193.211:9000 from ogoh: starting, having connections 2 2015-05-25 16:37:44 DEBUG Client:1064 - IPC Client (2100771791) connection to /10.128.193.211:9000 from ogoh got value #151 2015-05-25 16:37:44 DEBUG ProtobufRpcEngine:235 - Call: getFileInfo took 13ms 2015-05-25 16:37:44 TRACE ProtobufRpcEngine:250 - 84: Response - /10.128.193.211:9000: getFileInfo {fs { fileType: IS_DIR path: length: 0 permission { perm: 493 } owner: hadoop group: supergroup modification_time: 1432364487906 access_time: 0 block_replication: 0 blocksize: 0 fileId: 100602 childrenNum: 2 }} 2015-05-25 16:37:44 TRACE ProtobufRpcEngine:206 - 84: Call - /10.128.193.211:9000: getFileInfo {src: /user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00} 2015-05-25 16:37:44 DEBUG Client:1007 - IPC Client (2100771791) connection to /10.128.193.211:9000 from ogoh sending #152 2015-05-25 16:37:44 DEBUG Client:1064 - IPC Client (2100771791) connection to /10.128.193.211:9000 from ogoh got value #152 2015-05-25 16:37:44 DEBUG ProtobufRpcEngine:235 - Call: getFileInfo took 2ms. .. Step 4: 2015-05-25 16:37:47 TRACE ProtobufRpcEngine:206 - 89: Call - /10.128.193.211:9000: getBlockLocations {src: /user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00/part-r-2015050800-1.parquet offset: 0 length: 1342177280} ... Step 5: 2015-05-25 16:37:48 DEBUG DFSClient:951 - Connecting to datanode 10.191.137.197:9200 2015-05-25 16:37:48 TRACE BlockReaderFactory:653 - BlockReaderFactory(fileName=/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00/part-r-2015050800-2.parquet, block=BP-1843960649-10.128.193.211-1427923845046:blk_1073758677_981812):
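One workaround that may be worth trying (an assumption on my part, given the ParquetRelation2 frames in the stack trace, i.e. a Parquet-backed table): disable the metastore Parquet conversion so the query goes through the Hive SerDe read path, which prunes partitions in the metastore rather than listing every partition's files up front:

// In the HiveContext used by the Thrift server / shell:
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
sqlContext.sql("select count(*) from api_search where pdate='2015-05-23'").show()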
Re: Event Logging to HDFS on Standalone Cluster In Progress
It looks like it is possibly a race condition between removing the IN_PROGRESS suffix and building the history UI for the application. `AppClient` sends an `UnregisterApplication(appId)` message to the `Master` actor, which triggers the process to look for the app's eventLogs. If they are suffixed with `.inprogress` then it will not build out the history UI and instead build the error page I've seen. Tying this together, calling SparkContext.stop() has the following block: if (_dagScheduler != null) { _dagScheduler.stop() _dagScheduler = null } if (_listenerBusStarted) { listenerBus.stop() _listenerBusStarted = false } _eventLogger.foreach(_.stop()) Dag Scheduler has a TaskScheduler, which has a SparkDeploySchedulerBackend, which has an AppClient. AppClient sends itself a message to stop itself, and as mentioned above, it then sends a message to the Master where it tries to build the history UI. Meanwhile, EventLoggingListener.stop() is where the `.inprogress` suffix is removed in the file system. It seems like the race condition of the Akka message passing to trigger the Master's building of the history UI may be the only reason the history UI ever gets properly set up in the first place. Because if the ordering of calls were all strict in the SparkContext.stop method, then you would expect the Master to always see the event logs as in progress. Maybe I have missed something in tracing through the code? Is there a reason that the eventLogger cannot be closed before the dagScheduler? Thanks, Richard On Mon, Jun 1, 2015 at 12:23 PM, Richard Marscher rmarsc...@localytics.com wrote: Hi, In Spark 1.3.0 I've enabled event logging to write to an existing HDFS folder on a Standalone cluster. This is generally working; all the logs are being written. However, from the Master Web UI, the vast majority of completed applications are labeled as not having a history: http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.title=Application%20history%20not%20found%20(app-20150601160846-1914) The log does exist though: # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914 -rw-rw 3 user group1027848 2015-06-01 16:09 /eventLogs/app-20150601160846-1914 and `cat` of the file ends with: {Event:SparkListenerApplicationEnd,Timestamp:1433174955077} This seems to indicate it saw and logged the application end. Is there a known issue here or a workaround? Looking at the source code I might have expected these files to end in `.inprogress` given the UI error message, but they don't. Thanks, Richard
Spark 1.3.1 On Mesos Issues.
All - I am facing an odd issue and I am not really sure where to go for support at this point. I am running MapR, which complicates things as it relates to Mesos; however, this HAS worked in the past with no issues, so I am stumped here. So for starters, here is what I am trying to run. This is a simple show tables using the Hive Context: from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext, Row, HiveContext sparkhc = HiveContext(sc) test = sparkhc.sql("show tables") for r in test.collect(): print r When I run it on 1.3.1 using ./bin/pyspark --master local, this works with no issues. When I run it using Mesos with all the settings configured (as they had worked in the past) I get lost tasks, and when I zoom in on them, the error that is being reported is below. Basically it's a NullPointerException on the com.mapr.fs.ShimLoader. What's weird to me is that I took each instance and compared the two together: the classpath, everything, is exactly the same. Yet running in local mode works, and running in Mesos fails. Also of note, when the task is scheduled to run on the same node as when I run locally, that fails too! (Baffling.) Ok, for comparison, how I configured Mesos was to download the mapr4 package from spark.apache.org, using the exact same configuration file as for 1.2.0 (except for changing the executor tgz from 1.2.0 to 1.3.1). When I run this example with the mapr4 package for 1.2.0 there is no issue in Mesos; everything runs as intended. Using the same package for 1.3.1, it fails. (Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as well.) So basically, when I used 1.2.0 and followed a set of steps, it worked on Mesos, and 1.3.1 fails. This is a current version of Spark, yet MapR supports 1.2.1 only (still working on that). I guess I am at a loss right now on why this would be happening; any pointers on where I could look or what I could tweak would be greatly appreciated. Additionally, if there is something I could specifically draw to the attention of MapR on this problem, please let me know; I am perplexed by the change from 1.2.0 to 1.3.1.
Thank you, John Full Error on 1.3.1 on Mesos: 15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity 1060.3 MB java.lang.NullPointerException at com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96) at com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232) at com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:60) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847) at org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062) at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272) at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224) at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141) at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98) at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:220) at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala) at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1959) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:104) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:179) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:310) at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:186) at org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:70) java.lang.RuntimeException: Failure loading MapRClient. at com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:283) at com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:60) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847) at org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062) at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272) at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224) at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141) at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98) at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:43) at
Re: union and reduceByKey wrong shuffle?
How much work is it to produce a small standalone reproduction? Can you create an Avro file with some mock data, maybe 10 or so records, then reproduce this locally? On Mon, Jun 1, 2015 at 12:31 PM, Igor Berman igor.ber...@gmail.com wrote: switching to use simple POJOs instead of using Avro for Spark serialization solved the problem (I mean reading Avro from S3 and then mapping each Avro object to its POJO serializable counterpart with the same fields; the POJO is registered within Kryo). Any thought where to look for a problem/misconfiguration? On 31 May 2015 at 22:48, Igor Berman igor.ber...@gmail.com wrote: Hi We are using spark 1.3.1 Avro-chill (tomorrow will check if it's important) we register avro classes from java Avro 1.7.6 On May 31, 2015 22:37, Josh Rosen rosenvi...@gmail.com wrote: Which Spark version are you using? I'd like to understand whether this change could be caused by recent Kryo serializer re-use changes in master / Spark 1.4. On Sun, May 31, 2015 at 11:31 AM, igor.berman igor.ber...@gmail.com wrote: after investigation the problem is somehow connected to avro serialization with kryo + chill-avro (mapping avro object to simple scala case class and running reduce on these case class objects solves the problem) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/union-and-reduceByKey-wrong-shuffle-tp23092p23093.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
map - reduce only with disk
Dear all, Does anyone know how I can force Spark to use only the disk when doing a simple flatMap(..).groupByKey.reduce(_ + _)? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/map-reduce-only-with-disk-tp23102.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
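Not a complete answer, but a sketch of the usual knobs (the input path is made up; note that reduceByKey does the same aggregation as groupByKey + reduce while combining map-side):

import org.apache.spark.storage.StorageLevel

// Hypothetical input, persisted on disk only instead of in memory.
val pairs = sc.textFile("hdfs:///data/events.txt")
  .flatMap(_.split(" "))
  .map(w => (w, 1L))
  .persist(StorageLevel.DISK_ONLY)

// The shuffle itself spills to disk under memory pressure
// (spark.shuffle.spill is true by default).
val counts = pairs.reduceByKey(_ + _)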
Re: Dataframe random permutation?
Hi Cesar, try to do: hc.createDataFrame(df.rdd.coalesce(NUM_PARTITIONS, shuffle = true), df.schema) It's a bit inefficient, but should shuffle the whole dataframe. Thanks, Peter Rudenko On 2015-06-01 22:49, Cesar Flores wrote: I would like to know what will be the best approach to randomly permute a Data Frame. I have tried: df.sample(false,1.0,x).show(100) where x is the seed. However, it gives the same result no matter the value of x (it only gives different values when the fraction is smaller than 1.0). I have tried also: hc.createDataFrame(df.rdd.repartition(100),df.schema) which appears to be a random permutation. Can someone confirm that the last line is in fact a random permutation, or point me to a better approach? Thanks -- Cesar Flores
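Another sketch that gives a seed-controlled permutation (different seeds give different orders): key each row by a random number, sort by the key, then drop it. Names are illustrative only.

import scala.util.Random

val seed = 42L
val shuffled = hc.createDataFrame(
  df.rdd
    .mapPartitionsWithIndex { (i, rows) =>
      val rng = new Random(seed + i)  // per-partition RNG, deterministic for a fixed seed
      rows.map(r => (rng.nextDouble(), r))
    }
    .sortByKey()
    .map(_._2),
  df.schema)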
Re: PySpark with OpenCV causes python worker to crash
Could you run the single-threaded version on the worker machine to make sure that OpenCV is installed and configured correctly? On Sat, May 30, 2015 at 6:29 AM, Sam Stoelinga sammiest...@gmail.com wrote: I've verified the issue lies within Spark running OpenCV code and not within the sequence file BytesWritable formatting. This code reproduces the failure without using the sequencefile contents as input at all: it runs the same function with the same input on Spark, and it still fails:

def extract_sift_features_opencv(imgfile_imgbytes):
    imgfilename, discardsequencefile = imgfile_imgbytes
    imgbytes = bytearray(open("/tmp/img.jpg", "rb").read())
    nparr = np.fromstring(buffer(imgbytes), np.uint8)
    img = cv2.imdecode(nparr, 1)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    sift = cv2.xfeatures2d.SIFT_create()
    kp, descriptors = sift.detectAndCompute(gray, None)
    return (imgfilename, "test")

And corresponding tests.py: https://gist.github.com/samos123/d383c26f6d47d34d32d6 On Sat, May 30, 2015 at 8:04 PM, Sam Stoelinga sammiest...@gmail.com wrote: Thanks for the advice! The following line causes spark to crash: kp, descriptors = sift.detectAndCompute(gray, None) But I do need this line to be executed and the code does not crash when running outside of Spark but passing the same parameters. You're saying maybe the bytes from the sequencefile got somehow transformed and don't represent an image anymore, causing OpenCV to crash the whole Python executor. On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com wrote: Could you try to comment out some lines in `extract_sift_features_opencv` to find which line causes the crash? If the bytes that came from sequenceFile() are broken, it's easy to crash a C library in Python (OpenCV). On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com wrote: Hi sparkers, I am working on a PySpark application which uses the OpenCV library. It runs fine when running the code locally but when I try to run it on Spark on the same machine it crashes the worker. The code can be found here: https://gist.github.com/samos123/885f9fe87c8fa5abf78f This is the error message taken from STDERR of the worker log: https://gist.github.com/samos123/3300191684aee7fc8013 Would like pointers or tips on how to debug further? Would be nice to know the reason why the worker crashed. Thanks, Sam Stoelinga org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172) at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark 1.3.1 bundle does not build - unresolved dependency
I downloaded the 1.3.1 distro tarball $ll ../spark-1.3.1.tar.gz -rw-r-@ 1 steve staff 8500861 Apr 23 09:58 ../spark-1.3.1.tar.gz However the build on it is failing with an unresolved dependency: *configuration not public* $ build/sbt assembly -Dhadoop.version=2.5.2 -Pyarn -Phadoop-2.4 [error] (network-shuffle/*:update) sbt.ResolveException: *unresolved dependency: *org.apache.spark#spark-network-common_2.10;1.3.1: *configuration not public* in org.apache.spark#spark-network-common_2.10;1.3.1: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.1 test Is there a known workaround for this? thanks
How to monitor Spark Streaming from Kafka?
Hi, What are some of the good/adopted approaches to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that Receiver-based streaming is used? Then: "Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself." The code sample, however, seems sparse. What do you need to do here? - directKafkaStream.foreachRDD( new Function<JavaPairRDD<String, String>, Void>() { @Override public Void call(JavaPairRDD<String, Integer> rdd) throws IOException { OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges // offsetRanges.length = # of Kafka partitions being consumed ... return null; } } ); and if these are updated, will KafkaOffsetMonitor work? Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark updateStateByKey fails with class leak when using case classes - resend
Interesting, only in local[*]! In the GitHub repo you pointed to, what is the main that you were running? TD On Mon, May 25, 2015 at 9:23 AM, rsearle eggsea...@verizon.net wrote: Further experimentation indicates these problems only occur when master is local[*]. There are no issues if a standalone cluster is used. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-updateStateByKey-fails-with-class-leak-when-using-case-classes-resend-tp22793p23020.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
Hi Deepak, This is a notorious bug that is being tracked at https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one source of this bug (it turns out Snappy had a bug in buffer reuse that caused data corruption). There are other known sources that are being addressed in outstanding patches currently. Since you're using 1.3.1 my guess is that you don't have this patch: https://github.com/apache/spark/pull/6176, which I believe should fix the issue in your case. It's merged for 1.3.2 (not yet released) but not in time for 1.3.1, so feel free to patch it yourself and see if it works. -Andrew 2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Any suggestions? I am using Spark 1.3.1 to read a sequence file stored in Sequence File format (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v? ) with this code and settings: sc.sequenceFile(dwTable, classOf[Text], classOf[Text]).partitionBy(new org.apache.spark.HashPartitioner(2053)) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get) .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get) .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get) .set("spark.yarn.maxAppAttempts", "0") //.set("spark.akka.askTimeout", arguments.get("askTimeout").get) //.set("spark.akka.timeout", arguments.get("akkaTimeout").get) //.set("spark.worker.timeout", arguments.get("workerTimeout").get) .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum])) and the values are buffersize=128 maxbuffersize=1068 maxResultSize=200G And I see this exception in each executor task: FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com, 54757), shuffleId=6, mapId=2810, reduceId=1117, message= org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
*Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)* at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) at org.xerial.snappy.Snappy.uncompress(Snappy.java:480) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160) at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at scala.util.Success.map(Try.scala:206) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300) at
Re: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
If you can't run a patched Spark version, then you could also consider using LZF compression instead, since that codec isn't affected by this bug. On Mon, Jun 1, 2015 at 3:32 PM, Andrew Or and...@databricks.com wrote: Hi Deepak, This is a notorious bug that is being tracked at https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one source of this bug (it turns out Snappy had a bug in buffer reuse that caused data corruption). There are other known sources that are being addressed in outstanding patches currently. Since you're using 1.3.1 my guess is that you don't have this patch: https://github.com/apache/spark/pull/6176, which I believe should fix the issue in your case. It's merged for 1.3.2 (not yet released) but not in time for 1.3.1, so feel free to patch it yourself and see if it works. -Andrew 2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Any suggestions ? I using Spark 1.3.1 to read sequence file stored in Sequence File format (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v? ) with this code and settings sc.sequenceFile(dwTable, classOf[Text], classOf[Text]).partitionBy(new org.apache.spark.HashPartitioner(2053)) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.kryoserializer.buffer.mb, arguments.get(buffersize).get) .set(spark.kryoserializer.buffer.max.mb, arguments.get(maxbuffersize).get) .set(spark.driver.maxResultSize, arguments.get(maxResultSize).get) .set(spark.yarn.maxAppAttempts, 0) //.set(spark.akka.askTimeout, arguments.get(askTimeout).get) //.set(spark.akka.timeout, arguments.get(akkaTimeout).get) //.set(spark.worker.timeout, arguments.get(workerTimeout).get) .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum])) and values are buffersize=128 maxbuffersize=1068 maxResultSize=200G And i see this exception in each executor task FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com, 54757), shuffleId=6, mapId=2810, reduceId=1117, message= org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) *Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)* at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) at org.xerial.snappy.Snappy.uncompress(Snappy.java:480) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160) at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at
Re: How to monitor Spark Streaming from Kafka?
KafkaCluster.scala in the spark/external/kafka project has a bunch of API code, including code for updating Kafka-managed ZK offsets. Look at setConsumerOffsets. Unfortunately all of that code is private, but you can either write your own, copy it, or do what I do (sed out private[spark] and rebuild Spark...) On Mon, Jun 1, 2015 at 4:51 PM, Tathagata Das t...@databricks.com wrote: In the receiver-less direct approach, there is no concept of consumer group as we don't use the Kafka High Level consumer (that uses ZK). Instead Spark Streaming manages offsets on its own, giving tighter guarantees. If you want to monitor the progress of the processing of offsets, you will have to update ZK yourself. With the code snippet you posted, you can get the range of offsets that were processed in each batch, and accordingly update Zookeeper using some consumer group name of your choice. TD On Mon, Jun 1, 2015 at 2:23 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, What are some of the good/adopted approached to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that Receiver-based streaming is used? Then Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself. The code sample, however, seems sparse. What do you need to do here? - directKafkaStream.foreachRDD( new FunctionJavaPairRDDlt;String, String, Void() { @Override public Void call(JavaPairRDDString, Integer rdd) throws IOException { OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges // offsetRanges.length = # of Kafka partitions being consumed ... return null; } } ); and if these are updated, will KafkaOffsetMonitor work? Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
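Since setConsumerOffsets is private, a rough alternative sketch that writes each batch's ending offsets to the Kafka 0.8 ZooKeeper layout directly (the group name and ZK address are made up; uses kafka.utils.ZkUtils from the Kafka 0.8.x client rather than Spark's KafkaCluster):

import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

val group = "my-monitoring-group"  // hypothetical consumer group, used only for monitoring
directKafkaStream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val zk = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
  try {
    ranges.foreach { o =>
      // Same path KafkaOffsetMonitor and other ZK-based tools read from.
      ZkUtils.updatePersistentPath(
        zk, s"/consumers/$group/offsets/${o.topic}/${o.partition}", o.untilOffset.toString)
    }
  } finally zk.close()
}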
Re: How to monitor Spark Streaming from Kafka?
In the receiver-less direct approach, there is no concept of consumer group as we don't use the Kafka High Level consumer (that uses ZK). Instead Spark Streaming manages offsets on its own, giving tighter guarantees. If you want to monitor the progress of the processing of offsets, you will have to update ZK yourself. With the code snippet you posted, you can get the range of offsets that were processed in each batch, and accordingly update Zookeeper using some consumer group name of your choice. TD On Mon, Jun 1, 2015 at 2:23 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, What are some of the good/adopted approached to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that Receiver-based streaming is used? Then Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself. The code sample, however, seems sparse. What do you need to do here? - directKafkaStream.foreachRDD( new FunctionJavaPairRDDlt;String, String, Void() { @Override public Void call(JavaPairRDDString, Integer rdd) throws IOException { OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges // offsetRanges.length = # of Kafka partitions being consumed ... return null; } } ); and if these are updated, will KafkaOffsetMonitor work? Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.3.1 On Mesos Issues.
It would be nice to see the code for MapR FS Java API, but my google foo failed me (assuming it's open source)... So, shooting in the dark ;) there are a few things I would check, if you haven't already: 1. Could there be 1.2 versions of some Spark jars that get picked up at run time (but apparently not in local mode) on one or more nodes? (Side question: Does your node experiment fail on all nodes?) Put another way, are the classpaths good for all JVM tasks? 2. Can you use just MapR and Spark 1.3.1 successfully, bypassing Mesos? Incidentally, how are you combining Mesos and MapR? Are you running Spark in Mesos, but accessing data in MapR-FS? Perhaps the MapR shim library doesn't support Spark 1.3.1. HTH, dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Jun 1, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote: All - I am facing and odd issue and I am not really sure where to go for support at this point. I am running MapR which complicates things as it relates to Mesos, however this HAS worked in the past with no issues so I am stumped here. So for starters, here is what I am trying to run. This is a simple show tables using the Hive Context: from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext, Row, HiveContext sparkhc = HiveContext(sc) test = sparkhc.sql(show tables) for r in test.collect(): print r When I run it on 1.3.1 using ./bin/pyspark --master local This works with no issues. When I run it using Mesos with all the settings configured (as they had worked in the past) I get lost tasks and when I zoom in them, the error that is being reported is below. Basically it's a NullPointerException on the com.mapr.fs.ShimLoader. What's weird to me is is I took each instance and compared both together, the class path, everything is exactly the same. Yet running in local mode works, and running in mesos fails. Also of note, when the task is scheduled to run on the same node as when I run locally, that fails too! (Baffling). Ok, for comparison, how I configured Mesos was to download the mapr4 package from spark.apache.org. Using the exact same configuration file (except for changing the executor tgz from 1.2.0 to 1.3.1) from the 1.2.0. When I run this example with the mapr4 for 1.2.0 there is no issue in Mesos, everything runs as intended. Using the same package for 1.3.1 then it fails. (Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as well). So basically When I used 1.2.0 and followed a set of steps, it worked on Mesos and 1.3.1 fails. Since this is a current version of Spark, MapR is supports 1.2.1 only. (Still working on that). I guess I am at a loss right now on why this would be happening, any pointers on where I could look or what I could tweak would be greatly appreciated. Additionally, if there is something I could specifically draw to the attention of MapR on this problem please let me know, I am perplexed on the change from 1.2.0 to 1.3.1. 
Thank you, John Full Error on 1.3.1 on Mesos: 15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity 1060.3 MB java.lang.NullPointerException at com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96) at com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232) at com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847) at org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062) at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272) at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224) at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141) at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98) at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:220) at org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala) at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1959) at org.apache.spark.storage.BlockManager.(BlockManager.scala:104) at org.apache.spark.storage.BlockManager.(BlockManager.scala:179) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:310) at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:186) at org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:70)
Re: How to monitor Spark Streaming from Kafka?
I think you can use SPM - http://sematext.com/spm - it will give you all Spark and all Kafka metrics, including offsets broken down by topic, etc. out of the box. I see more and more people using it to monitor various components in data processing pipelines, a la http://blog.sematext.com/2015/04/22/monitoring-stream-processing-tools-cassandra-kafka-and-spark/ Otis On Mon, Jun 1, 2015 at 5:23 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, What are some of the good/adopted approached to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that Receiver-based streaming is used? Then Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself. The code sample, however, seems sparse. What do you need to do here? - directKafkaStream.foreachRDD( new FunctionJavaPairRDDlt;String, String, Void() { @Override public Void call(JavaPairRDDString, Integer rdd) throws IOException { OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges // offsetRanges.length = # of Kafka partitions being consumed ... return null; } } ); and if these are updated, will KafkaOffsetMonitor work? Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to monitor Spark Streaming from Kafka?
Thank you, Tathagata, Cody, Otis. - Dmitry On Mon, Jun 1, 2015 at 6:57 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: I think you can use SPM - http://sematext.com/spm - it will give you all Spark and all Kafka metrics, including offsets broken down by topic, etc. out of the box. I see more and more people using it to monitor various components in data processing pipelines, a la http://blog.sematext.com/2015/04/22/monitoring-stream-processing-tools-cassandra-kafka-and-spark/ Otis On Mon, Jun 1, 2015 at 5:23 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, What are some of the good/adopted approached to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that Receiver-based streaming is used? Then Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself. The code sample, however, seems sparse. What do you need to do here? - directKafkaStream.foreachRDD( new FunctionJavaPairRDDlt;String, String, Void() { @Override public Void call(JavaPairRDDString, Integer rdd) throws IOException { OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges // offsetRanges.length = # of Kafka partitions being consumed ... return null; } } ); and if these are updated, will KafkaOffsetMonitor work? Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HDFS Rest Service not available
Hello All, A bit scared I did something stupid... I killed a few PIDs that were listening on ports 2183 (kafka) and 4042 (spark app); some of the PIDs didn't even seem to stop, as they are still running when I do lsof -i:[port number]. I'm not sure if the problem started after or before I did these kill commands, but I now can't connect to HDFS or start spark. I can't seem to access Hue. I am afraid I accidentally killed an important process related to HDFS, but I am not sure what it would be, as I couldn't even kill the PIDs. Is it a coincidence that HDFS failed randomly, or is it likely that I killed an important PID? How can I restart HDFS? Thanks a lot! Error on Hue: Cannot access: /user/ec2-user. The HDFS REST service is not available. Note: You are a Hue admin but not a HDFS superuser (which is hdfs). HTTPConnectionPool(host='ec2-ip-address.us-west-1.compute.amazonaws.com', port=50070): Max retries exceeded with url: /webhdfs/v1/user/ec2-user?op=GETFILESTATUS&user.name=hue&doas=ec2-user (Caused by class 'socket.error': [Errno 111] Connection refused) Error when I try to open spark-shell or a spark app: java.net.ConnectException: Call From ip-10-0-2-216.us-west-1.compute.internal/10.0.2.216 to ip-10-0-2-216.us-west-1.compute.internal:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783) at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730) at org.apache.hadoop.ipc.Client.call(Client.java:1415) at org.apache.hadoop.ipc.Client.call(Client.java:1364) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy20.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy21.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1921) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1089) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1085) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1085) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400) at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:123) at org.apache.spark.util.FileLogger.start(FileLogger.scala:115) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74) at
org.apache.spark.SparkContext.<init>(SparkContext.scala:353) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986) at $iwC$$iwC.<init>(<console>:9) at $iwC.<init>(<console>:18) at <init>(<console>:20) at .<init>(<console>:24) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at
Building Spark for Hadoop 2.6.0
Does this build Spark for Hadoop version 2.6.0?

    build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package

Thanks!
Re: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
Hello Josh, Are you suggesting storing the source data with LZF compression and using the same Spark code as-is? Currently it's stored in sequence-file format and compressed with GZIP. First line of the data: (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text' org.apache.hadoop.io.compress.GzipCodec?v? ) Regards, Deepak

On Tue, Jun 2, 2015 at 4:16 AM, Josh Rosen rosenvi...@gmail.com wrote: If you can't run a patched Spark version, then you could also consider using LZF compression instead, since that codec isn't affected by this bug.

On Mon, Jun 1, 2015 at 3:32 PM, Andrew Or and...@databricks.com wrote: Hi Deepak, This is a notorious bug that is being tracked at https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one source of this bug (it turns out Snappy had a bug in buffer reuse that caused data corruption). There are other known sources that are being addressed in outstanding patches currently. Since you're using 1.3.1, my guess is that you don't have this patch: https://github.com/apache/spark/pull/6176, which I believe should fix the issue in your case. It's merged for 1.3.2 (not yet released) but not in time for 1.3.1, so feel free to patch it yourself and see if it works. -Andrew

2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Any suggestions? I'm using Spark 1.3.1 to read a sequence file stored in SequenceFile format (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v? ) with this code and settings:

    sc.sequenceFile(dwTable, classOf[Text], classOf[Text])
      .partitionBy(new org.apache.spark.HashPartitioner(2053))

    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
    .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
    .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
    .set("spark.yarn.maxAppAttempts", "0")
    //.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
    //.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
    //.set("spark.worker.timeout", arguments.get("workerTimeout").get)
    .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))

and the values are buffersize=128, maxbuffersize=1068, maxResultSize=200G. And I see this exception in each executor task:

FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com, 54757), shuffleId=6, mapId=2810, reduceId=1117, message= org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) at org.xerial.snappy.Snappy.uncompress(Snappy.java:480) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92) at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160) at
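For readers hitting the same FAILED_TO_UNCOMPRESS error: Josh's LZF suggestion most likely refers to Spark's internal block/shuffle compression codec (where the Snappy bug tracked in SPARK-4105 lives), not to the GZIP-compressed input files. A minimal, hedged sketch of the one-line change, assuming an otherwise standard SparkConf setup (the app name is hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    // Switch Spark's internal compression from the default Snappy to LZF;
    // the GZIP sequence files on disk are read exactly as before.
    val conf = new SparkConf()
      .setAppName("SequenceFileJob") // hypothetical app name
      .set("spark.io.compression.codec", "lzf")
    val sc = new SparkContext(conf)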
Re: GroupBy on RDD returns empty collection
I just ran the same app with limited data on my personal machine - no error. Seems to be a Mesos issue; will investigate further. If anyone knows anything, let me know :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GroupBy-on-RDD-returns-empty-collection-tp23105p23107.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Best strategy for Pandas - Spark
The second one sounds reasonable, I think.

On Thu, Apr 30, 2015 at 1:42 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, let's assume I have a complex workflow with more than 10 data sources as input and about 20 computations (some creating intermediary datasets and some merging everything for the final computation), some taking on average 1 minute to complete and some taking more than 30 minutes. What would be, for you, the best strategy to port this to Apache Spark?

1. Transform the whole flow into a Spark job (PySpark or Scala)
2. Transform only part of the flow (the heavy-lifting, ~30 min parts) using the same language (PySpark)
3. Transform only part of the flow and pipe the rest from Scala to Python (see the sketch below)

Regards, Olivier.
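For the third option, RDD.pipe is the standard mechanism. A minimal, hedged sketch: transform.py is a hypothetical script that reads records on stdin and writes one result line per record to stdout, and the HDFS paths are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("PipeDemo"))
    // heavy lifting stays in Scala/Spark ...
    val intermediate = sc.textFile("hdfs:///intermediate") // hypothetical path
    // ... and the remaining Python step runs record-by-record via a pipe
    val finished = intermediate.pipe("python transform.py")
    finished.saveAsTextFile("hdfs:///final") // hypothetical path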
Re: does randomSplit return a copy or a reference to the original rdd? [Python]
No, all of the RDDs (including those returned from randomSplit()) are read-only.

On Mon, Apr 27, 2015 at 11:28 AM, Pagliari, Roberto rpagli...@appcomsci.com wrote: Suppose I have something like the code below:

    for idx in xrange(0, 10):
        train_test_split = training.randomSplit(weights=[0.75, 0.25])
        train_cv = train_test_split[0]
        test_cv = train_test_split[1]
        # scale train_cv and test_cv

By scaling train_cv and test_cv, will the original data be affected? Thanks,
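The same point as a minimal Scala sketch (the PySpark behavior is identical): every transformation, including randomSplit, returns new RDDs and never mutates its parent, so "scaling" one split cannot change the original data. The data values and the scaling step here are hypothetical, and sc is assumed to be an existing SparkContext.

    // RDDs are immutable: deriving a scaled RDD from one split leaves
    // the original dataset and the other split untouched.
    val training = sc.parallelize(1.0 to 100.0 by 1.0) // hypothetical data
    val Array(trainCv, testCv) = training.randomSplit(Array(0.75, 0.25))
    val scaledTrain = trainCv.map(_ / 100.0) // stand-in for real scaling
    // training, trainCv and testCv still hold the unscaled values;
    // scaledTrain is a brand-new RDD derived from trainCv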
Spark 1.3.0: how to let Spark history load old records?
When I start the Spark master process, the old records are not shown in the monitoring UI. How can I make the old records show up? Thank you very much!
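In case it helps, the usual setup (hedged: the property names come from the Spark 1.x monitoring docs, and the HDFS path is a placeholder) is to enable event logging in conf/spark-defaults.conf and point the history server at the same directory, then start it with sbin/start-history-server.sh:

    spark.eventLog.enabled           true
    spark.eventLog.dir               hdfs:///spark-events
    spark.history.fs.logDirectory    hdfs:///spark-events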
Re: Building Spark for Hadoop 2.6.0
Looks good. -Dhadoop.version is not needed because the profile already defines it:

    <profile>
      <id>hadoop-2.6</id>
      <properties>
        <hadoop.version>2.6.0</hadoop.version>
      </properties>
    </profile>

On Mon, Jun 1, 2015 at 5:51 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote: Does this build Spark for Hadoop version 2.6.0? build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package Thanks!
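So, per Sean's note above, the equivalent invocation without the redundant flag would presumably be:

    build/mvn -Pyarn -Phadoop-2.6 -DskipTests clean package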