Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds
Hi, On Thu, Jan 29, 2015 at 9:52 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Thu, Jan 29, 2015 at 1:54 AM, YaoPau jonrgr...@gmail.com wrote: My thinking is to maintain state in an RDD and update it and persist it with each 2-second pass, but this also seems like it could get messy. Any thoughts or examples that might help me? I have just implemented some timestamp-based windowing on DStreams (can't share the code now, but will be published in a couple of months), although with the assumption that items are in correct order. The main challenge (rather technical) was to keep proper state across RDD boundaries and to tell the state "you can mark this partial window from the last interval as 'complete' now" without shuffling too much data around. For example, if there are some empty intervals, you don't know when the next item to go into the partial window will arrive, or if there will be one at all. I guess if you want to have out-of-order tolerance, that will become even trickier, as you need to define and think about some timeout for partial windows in your state...
Sorry, it took ages to get the code published, and I am not involved in that project any more and cannot provide a lot of support, but if you are interested in the code, here it is: https://github.com/jubatus/jubaql-server/blob/master/processor/src/main/scala/us/jubat/jubaql_server/processor/SlidingWindow.scala
The usage can be seen in https://github.com/jubatus/jubaql-server/blob/master/processor/src/test/scala/us/jubat/jubaql_server/processor/SlidingStreamSpec.scala and it basically boils down to
    val inputStream: DStream[(Long, T)] = ssc.queueStream(itemsQueue, oneAtATime = true)
    val windowStream: DStream[(Long, (Long, T))] = SlidingWindow.byTimestamp(inputStream, length, step)
where the first Long in the result is a window ID and the second Long is the original timestamp, so now you can use any of the groupByKey, reduceByKey etc. functions. Hope this helps anyone, Tobias
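To illustrate the (window ID, (timestamp, item)) shape described above, here is a minimal sketch on plain Scala collections. It is not the code from the linked SlidingWindow.scala; the object name TimestampWindows, both helper functions and the window numbering (window k covers the half-open range [k * step, k * step + length)) are assumptions made only for this example, and timestamps are assumed to be non-negative.

```scala
object TimestampWindows {
  /** IDs of all sliding windows that contain `ts`, assuming window k
    * covers the half-open range [k * step, k * step + length). */
  def windowIds(ts: Long, length: Long, step: Long): Seq[Long] = {
    require(ts >= 0 && length > 0 && step > 0, "non-negative ts, positive length and step")
    val last = ts / step                                         // newest window starting at or before ts
    val first = if (ts < length) 0L else (ts - length) / step + 1 // oldest window still covering ts
    first to last
  }

  /** Tags each (timestamp, item) pair with every window ID it falls into. */
  def assign[T](items: Seq[(Long, T)], length: Long, step: Long): Seq[(Long, (Long, T))] =
    for ((ts, item) <- items; id <- windowIds(ts, length, step)) yield (id, (ts, item))

  def main(args: Array[String]): Unit = {
    val items = Seq((3L, "a"), (12L, "b"))
    assign(items, length = 10L, step = 5L).foreach(println)
  }
}
```

With length 10 and step 5, timestamp 12 falls into windows 1 and 2, so the item is emitted once per window and a later grouping by window ID sees it in both. The hard part mentioned in the message (deciding when a partial window is complete across RDD boundaries) is deliberately not covered by this sketch.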
Re: Spark shell never leaves ACCEPTED state in YARN CDH5
Hi, On Thu, Mar 26, 2015 at 4:08 AM, Khandeshi, Ami ami.khande...@fmr.com.invalid wrote: I am seeing the same behavior. I have enough resources….. CPU *and* memory are sufficient? No previous (unfinished) jobs eating them? Tobias
Re: SchemaRDD: SQL Queries vs Language Integrated Queries
Hi, On Wed, Mar 11, 2015 at 11:05 PM, Cesar Flores ces...@gmail.com wrote: Thanks for both answers. One final question. *This registerTempTable is not an extra process that the SQL queries need to do that may decrease performance over the language integrated method calls? * As far as I know, registerTempTable is just a Map[String, SchemaRDD] insertion, nothing that would be measurable. But there are no distributed/RDD operations involved, I think. Tobias
Re: Timed out while stopping the job generator plus subsequent failures
Sean, On Wed, Mar 11, 2015 at 7:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: it seems like I am unable to shut down my StreamingContext properly, both in local[n] and yarn-cluster mode. In addition, (only) in yarn-cluster mode, subsequent use of a new StreamingContext will raise an InvalidActorNameException. I was wondering if this is related to your question on spark-dev http://tinyurl.com/q5cd5px Did you get any additional insight into this issue? In my case the processing of the first batch completes, but I don't know if there is anything wrong with the checkpoints? When I look to the corresponding checkpoint directory in HDFS, it doesn't seem like all state RDDs are persisted there, just a subset. Any ideas? Thanks Tobias
Re: SQL with Spark Streaming
Hi, On Thu, Mar 12, 2015 at 12:08 AM, Huang, Jie jie.hu...@intel.com wrote: According to my understanding, your approach is to register a series of tables by using transformWith, right? And then, you can get a new DStream (i.e., SchemaDStream), which consists of lots of SchemaRDDs. Yep, it's basically the following:
    case class SchemaDStream(sqlc: SQLContext,
                             dataStream: DStream[Row],
                             schemaStream: DStream[StructType]) {
      def registerStreamAsTable(name: String): Unit = {
        foreachRDD(_.registerTempTable(name))
      }
      def foreachRDD(func: SchemaRDD => Unit): Unit = {
        def executeFunction(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[Unit] = {
          val schema: StructType = schemaRDD.collect.head
          val dataWithSchema: SchemaRDD = sqlc.applySchema(dataRDD, schema)
          val result = func(dataWithSchema)
          schemaRDD.map(x => result) // return RDD[Unit]
        }
        dataStream.transformWith(schemaStream, executeFunction _).foreachRDD(_.count())
      }
    }
In a similar way you could add a `transform(func: SchemaRDD => SchemaRDD)` method. But as I said, I am not sure about performance. Tobias
Timed out while stopping the job generator plus subsequent failures
Hi, it seems like I am unable to shut down my StreamingContext properly, both in local[n] and yarn-cluster mode. In addition, (only) in yarn-cluster mode, subsequent use of a new StreamingContext will raise an InvalidActorNameException. I use
    logger.info("stopping StreamingContext")
    staticStreamingContext.stop(stopSparkContext=false, stopGracefully=true)
    logger.debug("done")
and have in my output logs
    19:16:47.708 [ForkJoinPool-2-worker-11] INFO stopping StreamingContext
    [... output from other threads ...]
    19:17:07.729 [ForkJoinPool-2-worker-11] WARN scheduler.JobGenerator - Timed out while stopping the job generator (timeout = 2)
    19:17:07.739 [ForkJoinPool-2-worker-11] DEBUG done
The processing itself is complete, i.e., the batch currently processed at the time of stop() is finished and no further batches are processed. However, something keeps the streaming context from stopping properly. In local[n] mode, this is not actually a problem (other than I have to wait 20 seconds for shutdown), but in yarn-cluster mode, I get an error
    akka.actor.InvalidActorNameException: actor name [JobGenerator] is not unique!
when I start a (newly created) StreamingContext, and I was wondering what
* is the issue with stop()
* is the difference between local[n] and yarn-cluster mode.
Some possible reasons:
* On my executors, I use a networking library that depends on netty and doesn't properly shut down the event loop. (That has not been a problem in the past, though.)
* I have a non-empty state (from using updateStateByKey()) that is checkpointed to /tmp/spark (in local mode) and hdfs:///tmp/spark (in yarn-cluster mode), could that be an issue? (In fact, I have not seen this error in any non-stateful stream applications before.)
Any help much appreciated! Thanks Tobias
Re: Timed out while stopping the job generator plus subsequent failures
Hi, I discovered what caused my issue when running on YARN and was able to work around it. On Wed, Mar 11, 2015 at 7:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: The processing itself is complete, i.e., the batch currently processed at the time of stop() is finished and no further batches are processed. However, something keeps the streaming context from stopping properly. In local[n] mode, this is not actually a problem (other than I have to wait 20 seconds for shutdown), but in yarn-cluster mode, I get an error akka.actor.InvalidActorNameException: actor name [JobGenerator] is not unique! It seems that not all checkpointed RDDs are cleaned (metadata cleared, checkpoint directories deleted etc.?) at the time when the streamingContext is stopped, but only afterwards. In particular, when I add `Thread.sleep(5000)` after my streamingContext.stop() call, then it works and I can start a different streamingContext afterwards. This is pretty ugly, so does anyone know a method to poll whether it's safe to continue or whether there are still some RDDs waiting to be cleaned up? Thanks Tobias
Re: SchemaRDD: SQL Queries vs Language Integrated Queries
Hi, On Tue, Mar 10, 2015 at 2:13 PM, Cesar Flores ces...@gmail.com wrote: I am new to the SchemaRDD class, and I am trying to decide in using SQL queries or Language Integrated Queries ( https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD ). Can someone tell me what is the main difference between the two approaches, besides using different syntax? Are they interchangeable? Which one has better performance? One difference is that the language integrated queries are method calls on the SchemaRDD you want to work on, which requires you have access to the object at hand. The SQL queries are passed to a method of the SQLContext and you have to call registerTempTable() on the SchemaRDD you want to use beforehand, which can basically happen at an arbitrary location of your program. (I don't know if I could express what I wanted to say.) That may have an influence on how you design your program and how the different parts work together. Tobias
Re: SQL with Spark Streaming
Hi, On Wed, Mar 11, 2015 at 9:33 AM, Cheng, Hao hao.ch...@intel.com wrote: Intel has a prototype for doing this, SaiSai and Jason are the authors. Probably you can ask them for some materials. The github repository is here: https://github.com/intel-spark/stream-sql Also, what I did is write a wrapper class SchemaDStream that internally holds a DStream[Row] and a DStream[StructType] (the latter having just one element in every RDD) and then allows you to do
- operations SchemaRDD => SchemaRDD using `rowStream.transformWith(schemaStream, ...)`
- in particular you can register this stream's data as a table this way
- and via a companion object with a method `fromSQL(sql: String): SchemaDStream` you can get a new stream from previously registered tables.
However, you are limited to batch-internal operations, i.e., you can't aggregate across batches. I am not able to share the code at the moment, but will within the next months. It is not very advanced code, though, and should be easy to replicate. Also, I have no idea about the performance of transformWith. Tobias
Re: Spark with data on NFS v HDFS
Hi, On Thu, Mar 5, 2015 at 10:58 PM, Ashish Mukherjee ashish.mukher...@gmail.com wrote: I understand Spark can be used with Hadoop or standalone. I have certain questions related to use of the correct FS for Spark data. What is the efficiency trade-off in feeding data to Spark from NFS v HDFS? As I understand it, one performance advantage of using HDFS is that the task will be computed at a cluster node that has data on its local disk already, so the tasks go to where the data is. In the case of NFS, all data must be downloaded from the file server(s) first, so there is no such thing as data locality. Tobias
Re: scala.Double vs java.lang.Double in RDD
Hi, On Thu, Mar 5, 2015 at 12:20 AM, Imran Rashid iras...@cloudera.com wrote: This doesn't involve spark at all, I think this is entirely an issue with how scala deals w/ primitives and boxing. Often it can hide the details for you, but IMO it just leads to far more confusing errors when things don't work out. The issue here is that your map has value type Any, which leads scala to leave it as a boxed java.lang.Double. I see, thank you very much for your explanation and the code examples! Helps very much! Thanks Tobias
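The boxing behaviour explained above can be reproduced without Spark or ScalaTest. The following is a minimal sketch; the object name BoxingDemo and its two helpers are made up for this illustration. It shows that a Double stored with static type Any is a boxed java.lang.Double at runtime, while the ClassTag for Double refers to the primitive class, which is presumably what the `a[Double]` matcher in the failing test compares against.

```scala
import scala.reflect.classTag

object BoxingDemo {
  /** Runtime class name of a value once it is statically typed as Any. */
  def runtimeClassOf(v: Any): String = v.getClass.getName

  /** A type pattern on Double also matches the boxed representation. */
  def isScalaDouble(v: Any): Boolean = v match {
    case _: Double => true
    case _         => false
  }

  def main(args: Array[String]): Unit = {
    val values: Map[Long, Any] = Map(1L -> 1.75)
    println(runtimeClassOf(values(1L)))              // the boxed class
    println(classTag[Double].runtimeClass.getName)   // the primitive class
    println(isScalaDouble(values(1L)))               // pattern match sees through the box
  }
}
```

If the value type of the map cannot be made more specific than Any, a pattern match as in isScalaDouble is one way to check for a Double without depending on how the value happens to be boxed.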
scala.Double vs java.lang.Double in RDD
Hi, I have a function with signature
    def aggFun1(rdd: RDD[(Long, (Long, Double))]): RDD[(Long, Any)]
and one with
    def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]): RDD[(_Key, Double)]
where all Double classes involved are scala.Double classes (according to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type parameters _Key and _Index are inferred by the Scala compiler). Now I am writing a test as follows:
    val result: Map[Long, Any] = aggFun1(input).collect().toMap
    result.values.foreach(v => println(v.getClass))
    result.values.foreach(_ shouldBe a[Double])
and I get the following output:
    class java.lang.Double
    class java.lang.Double
    [info] avg
    [info] - should compute the average *** FAILED ***
    [info]   1.75 was not an instance of double, but an instance of java.lang.Double
So I am wondering about what magic is going on here. Are scala.Double values in RDDs automatically converted to java.lang.Doubles or am I just missing the implicit back-conversion etc.? Any help appreciated, Tobias
Re: Spark sql results can't be printed out to system console from spark streaming application
Hi, can you explain how you copied that into your *streaming* application? Like, how do you issue the SQL, what data do you operate on, how do you view the logs etc.? Tobias On Wed, Mar 4, 2015 at 8:55 AM, Cui Lin cui@hds.com wrote: Dear all, I found that the sample code below prints its output only in the spark shell; when I moved it into my Spark Streaming application, nothing was printed to the system console. Can you explain why that happens? Is it anything related to the new spark context? Thanks a lot!
    val anotherPeopleRDD = sc_context.parallelize(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    anotherPeopleRDD.toArray().foreach(line => System.out.println(line))
    val jsonMessage = sqlContext.jsonRDD(anotherPeopleRDD)
    jsonMessage.toArray().foreach(line => System.out.println(line))
    jsonMessage.registerTempTable("people")
    val test: SchemaRDD = sqlContext.sql("select count(*) from people")
    test.toArray().foreach(line => System.out.println(line))
Best regards, Cui Lin
Re: Issue with yarn cluster - hangs in accepted state.
Hi, On Wed, Mar 4, 2015 at 6:20 AM, Zhan Zhang zzh...@hortonworks.com wrote: Do you have enough resource in your cluster? You can check your resource manager to see the usage. Yep, I can confirm that this is a very annoying issue. If there is not enough memory or VCPUs available, your app will just stay in ACCEPTED state until resources are available. You can have a look at https://github.com/jubatus/jubaql-docker/blob/master/hadoop/yarn-site.xml#L35 to see some settings that might help. Tobias
Re: Best practices for query creation in Spark SQL.
Hi, I think your chances for a satisfying answer would increase dramatically if you elaborated a bit more on what you actually want to know. (Holds for any of your last four questions about Spark SQL...) Tobias
Re: Dealing with 'smaller' data
Hi On Fri, Feb 27, 2015 at 10:50 AM, Gary Malouf malouf.g...@gmail.com wrote: The honest answer is that it is unclear to me at this point. I guess what I am really wondering is if there are cases where one would find it beneficial to use Spark against one or more RDBs? Well, RDBs are all about *storage*, while Spark is about *computation*. If you have a very expensive computation (that can be parallelized in some way), then you might want to use Spark, even though your data lives in an ordinary RDB. Think raytracing, where you do something for every pixel in the output image and you could get your scene description from a database, write the result to a database, but use Spark to do two minutes of calculation for every pixel in parallel (or so). Tobias
Re: Dealing with 'smaller' data
Gary, On Fri, Feb 27, 2015 at 8:40 AM, Gary Malouf malouf.g...@gmail.com wrote: I'm considering whether or not it is worth introducing Spark at my new company. The data is no-where near Hadoop size at this point (it sits in an RDS Postgres cluster). Will it ever become Hadoop size? Looking at the overhead of running even a simple Hadoop setup (securely and with good performance, given about 1e6 configuration parameters), I think it makes sense to stay in non-Hadoop mode as long as possible. People may disagree ;-) Tobias PS. You may also want to have a look at http://aadrake.com/command-line-tools-can-be-235x-faster-than-your-hadoop-cluster.html
Re: Dealing with 'smaller' data
On Fri, Feb 27, 2015 at 10:57 AM, Gary Malouf malouf.g...@gmail.com wrote: So when deciding whether to take on installing/configuring Spark, the size of the data does not automatically make that decision in your mind. You got me there ;-) Tobias
Re: Spark Streaming - Collecting RDDs into array in the driver program
Hi, On Thu, Feb 26, 2015 at 11:24 AM, Thanigai Vellore thanigai.vell...@gmail.com wrote: It appears that the function immediately returns even before the foreachrdd stage is executed. Is that possible? Sure, that's exactly what happens. foreachRDD() schedules a computation, it does not perform it. Maybe your streaming application would not ever terminate, but still the function needs to return, right? If you remove the toArray(), you will return a reference to the ArrayBuffer that will be appended to over time. You can then, in a different thread, check the contents of that ArrayBuffer as processing happens, or wait until processing ends. Tobias
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
Sean, thanks for your message! On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen so...@cloudera.com wrote: What I haven't investigated is whether you can enable checkpointing for the state in updateStateByKey separately from this mechanism, which is exactly your question. What happens if you set a checkpoint dir, but do *not* use StreamingContext.getOrCreate, but *do* call DStream.checkpoint? I didn't even use StreamingContext.getOrCreate(), just calling streamingContext.checkpoint(...) blew everything up. Well, blew up in the sense that actor.OneForOneStrategy will print the stack trace of the java.io.NotSerializableException every couple of seconds and something is not going right with execution (I think). BUT, indeed, just calling sparkContext.setCheckpointDir seems to be sufficient for updateStateByKey! Looking at what streamingContext.checkpoint() does, I don't get why ;-) and I am not sure that this is a robust solution, but in fact that seems to work! Thanks a lot, Tobias
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
Hi, On Tue, Feb 24, 2015 at 4:34 AM, Tathagata Das tathagata.das1...@gmail.com wrote: There are different kinds of checkpointing going on. updateStateByKey requires RDD checkpointing, which can be enabled only by calling sparkContext.setCheckpointDir. But that does not enable Spark Streaming driver checkpoints, which are necessary for recovering from driver failures. Those are enabled only by streamingContext.checkpoint(...), which internally calls sparkContext.setCheckpointDir and also enables other stuff. I see, thank you very much! I'm happy to see I will not have to rewrite the entire application :-) Tobias
Re: Use Spark Streaming for Batch?
Hi, On Sat, Feb 21, 2015 at 1:05 AM, craigv craigvanderbo...@gmail.com wrote: Might it be possible to perform large batch processing on HDFS time series data using Spark Streaming?
1. I understand that there is not currently an InputDStream that could do what's needed. I would have to create such a thing.
2. Time is a problem. I would have to use the timestamps on our events for any time-based logic and state management.
3. The batch duration would become meaningless in this scenario. Could I just set it to something really small (say 1 second) and then let it fall behind, processing the data as quickly as it could?
So, if it is not an issue for you if everything is processed in one batch, you can use streamingContext.textFileStream(hdfsDirectory). This will create a DStream that has a huge RDD with all data in the first batch and then empty batches afterwards. You can have a small batch size, that should not be a problem. An alternative would be to write some code that creates one RDD per file in your HDFS directory, creates a Queue of those RDDs and then uses streamingContext.queueStream(), possibly with the oneAtATime=true parameter (which will process only one RDD per batch). However, doing window computations etc. with the timestamps embedded *in* your data will be a major effort, as in: You cannot use the existing windowing functionality from Spark Streaming. If you want to read more about that, there have been a number of discussions about that topic on this list; maybe you can look them up. Tobias
Re: SQL query over (Long, JSON string) tuples
Hi Ayoub, thanks for your mail! On Thu, Jan 29, 2015 at 6:23 PM, Ayoub benali.ayoub.i...@gmail.com wrote: SQLContext and hiveContext have a jsonRDD method which accepts an RDD[String], where each string is a JSON string, and returns a SchemaRDD; it extends RDD[Row], which is the type you want. Afterwards you should be able to do a join to keep your tuple. I'm afraid that's not so easy, because you can only join on a certain key, and the key is exactly what I have to drop in order to infer the schema. Thanks Tobias
SQL query over (Long, JSON string) tuples
Hi, I have data as RDD[(Long, String)], where the Long is a timestamp and the String is a JSON-encoded string. I want to infer the schema of the JSON and then do a SQL statement on the data (no aggregates, just column selection and UDF application), but still have the timestamp associated with each row of the result. I completely fail to see how that would be possible. Any suggestions? I can't even see how I would get an RDD[(Long, Row)] so that I *might* be able to add the timestamp to the row after schema inference. Is there *any* way other than string-manipulating the JSON string and adding the timestamp to it? Thanks Tobias
Re: spark challenge: zip with next???
Hi, On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Make a copy of your RDD with an extra entry in the beginning to offset. Then you can zip the two RDDs and run a map to generate an RDD of differences. Does that work? I recently tried something to compute differences between each entry and the next, so I did
    val rdd1 = ... // null element + rdd
    val rdd2 = ... // rdd + null element
but got an error message about zip requiring data sizes in each partition to match. Tobias
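One way around the partition-size restriction of zip is to pair elements by position with a join on the index rather than by zipping two shifted copies. The following is a minimal sketch of that idea on local Scala collections; the object name ZipWithNext and its functions are made up for this example. On RDDs the same pattern would presumably be expressed with zipWithIndex followed by a join on the shifted index, at the cost of a shuffle, and it only makes sense if the RDD has a meaningful order in the first place.

```scala
object ZipWithNext {
  /** Pairs every element with its successor by joining on positions,
    * instead of zipping two differently shaped collections. */
  def withNext[T](xs: Seq[T]): Seq[(T, T)] = {
    val indexed = xs.zipWithIndex.map { case (x, i) => (i, x) }     // (i, x_i)
    val shifted = indexed.map { case (i, x) => (i - 1, x) }.toMap   // (i - 1, x_i)
    // "join" on the index: position i sees x_i and x_(i+1); the last element has no partner
    indexed.flatMap { case (i, x) => shifted.get(i).map(next => (x, next)) }
  }

  /** Differences between each entry and the next one. */
  def differences(xs: Seq[Int]): Seq[Int] =
    withNext(xs).map { case (a, b) => b - a }
}
```

For the input 1, 4, 9, 16 this yields the differences 3, 5, 7; no null padding element is needed because the last element simply finds no join partner.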
Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds
Hi, On Thu, Jan 29, 2015 at 1:54 AM, YaoPau jonrgr...@gmail.com wrote: My thinking is to maintain state in an RDD and update it and persist it with each 2-second pass, but this also seems like it could get messy. Any thoughts or examples that might help me? I have just implemented some timestamp-based windowing on DStreams (can't share the code now, but will be published in a couple of months), although with the assumption that items are in correct order. The main challenge (rather technical) was to keep proper state across RDD boundaries and to tell the state "you can mark this partial window from the last interval as 'complete' now" without shuffling too much data around. For example, if there are some empty intervals, you don't know when the next item to go into the partial window will arrive, or if there will be one at all. I guess if you want to have out-of-order tolerance, that will become even trickier, as you need to define and think about some timeout for partial windows in your state... Tobias
Error reporting/collecting for users
Hi, in my Spark Streaming application, computations depend on users' input in terms of
* user-defined functions
* computation rules
* etc.
that can throw exceptions in various cases (think: exception in UDF, division by zero, invalid access by key etc.). Now I am wondering what a good/reasonable way to deal with those errors would be. I think I want to continue processing (the whole stream processing pipeline should not die because of one single malformed item in the stream), i.e., catch the exception, but still I need a way to tell the user something went wrong. So how can I get the information that something went wrong back to the driver and what is a reasonable way to do that? While writing this, something like the following came into my mind:
    val errors = sc.accumulator(...) // of type List[Throwable]
    dstream.map(item => {
      Try {
        someUdf(item)
      } match {
        case Success(value) =>
          value
        case Failure(err) =>
          errors += err // remember error
          0 // default value
      }
    })
Does this make sense? Thanks Tobias
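The catch-and-continue idea above can be tried out on a local collection first. This is a minimal sketch, assuming an ordinary ArrayBuffer as a stand-in for the accumulator; the object name ErrorCollecting and the function mapCollectingErrors are invented for this example and are not part of any Spark API. Note that Try only catches non-fatal exceptions.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object ErrorCollecting {
  /** Applies `udf` to every item; failures are recorded and replaced by `default`. */
  def mapCollectingErrors[A, B](items: Seq[A], default: B)(udf: A => B): (Seq[B], Seq[Throwable]) = {
    val errors = ArrayBuffer.empty[Throwable] // local stand-in for the accumulator
    val results = items.map { item =>
      Try(udf(item)) match {
        case Success(value) => value
        case Failure(err) =>
          errors += err // remember error
          default       // keep the pipeline going with a default value
      }
    }
    (results, errors.toList)
  }
}
```

For example, mapCollectingErrors(Seq(1, 0, 2), 0)(x => 10 / x) keeps processing past the division by zero and reports one ArithmeticException alongside the results. In a real streaming job the buffer would have to be replaced by something that travels back to the driver, which is exactly the open question in the message.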
Re: Error reporting/collecting for users
Hi, On Wed, Jan 28, 2015 at 1:45 PM, Soumitra Kumar kumar.soumi...@gmail.com wrote: It is a Streaming application, so how/when do you plan to access the accumulator on driver? Well... maybe there would be some user command or web interface showing the errors that have happened during processing...? Thanks Tobias
Re: Error reporting/collecting for users
Hi, thanks for your mail! On Wed, Jan 28, 2015 at 11:44 AM, Tathagata Das tathagata.das1...@gmail.com wrote: That seems reasonable to me. Are you having any problems doing it this way? Well, actually I haven't done that yet. The idea of using accumulators to collect errors just came while writing the email, but I thought I'd just keep writing and see if anyone has any other suggestions ;-) Thanks Tobias
Why must the dstream.foreachRDD(...) parameter be serializable?
Hi, I want to do something like
    dstream.foreachRDD(rdd => if (someCondition) ssc.stop())
so in particular the function does not touch any element in the RDD and runs completely within the driver. However, this fails with a NotSerializableException because $outer is not serializable etc. The DStream code says:
    def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
    }
To be honest, I don't understand the comment. Why must that function be serializable even when there is no RDD action involved? Thanks Tobias
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
Hi, thanks for the answers! On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai saisai.s...@intel.com wrote: Also this `foreachFunc` is more like an action function of RDD, thinking of rdd.foreach(func), in which `func` need to be serializable. So maybe I think your way of use it is not a normal way :). Yeah I totally understand why func in rdd.foreach(func) must be serializable (because it's sent to the executors), but I didn't get why a function that's not shipped around must be serializable, too. The explanations made sense, though :-) Thanks Tobias
Re: Pairwise Processing of a List
Hi, On Mon, Jan 26, 2015 at 9:32 AM, Steve Nunez snu...@hortonworks.com wrote: I’ve got a list of points: List[(Float, Float)] that represent (x,y) coordinate pairs and need to sum the distance. It’s easy enough to compute the distance: Are you saying you want all combinations (N^2) of distances? That should be possible with rdd.cartesian():
    val points = sc.parallelize(List((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)))
    points.cartesian(points).collect
    // Array[((Double, Double), (Double, Double))] = Array(((1.0,2.0),(1.0,2.0)), ((1.0,2.0),(3.0,4.0)), ((1.0,2.0),(5.0,6.0)), ((3.0,4.0),(1.0,2.0)), ((3.0,4.0),(3.0,4.0)), ((3.0,4.0),(5.0,6.0)), ((5.0,6.0),(1.0,2.0)), ((5.0,6.0),(3.0,4.0)), ((5.0,6.0),(5.0,6.0)))
I guess this is a very expensive operation, though. Tobias
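To make the difference between "all N^2 combinations" and "sum of consecutive distances" concrete, here is a minimal sketch on local Scala collections. The object name PairwiseDistances and its functions are made up for this illustration; allPairs mirrors what rdd.cartesian(rdd) returns, while pathLength is only one possible reading of "sum the distance" in the quoted question.

```scala
object PairwiseDistances {
  type Point = (Double, Double)

  /** Euclidean distance between two points. */
  def distance(p: Point, q: Point): Double =
    math.hypot(p._1 - q._1, p._2 - q._2)

  /** All N^2 ordered pairs, the local equivalent of rdd.cartesian(rdd). */
  def allPairs(points: Seq[Point]): Seq[(Point, Point)] =
    for (p <- points; q <- points) yield (p, q)

  /** Sum of distances between consecutive points only (N - 1 pairs). */
  def pathLength(points: Seq[Point]): Double =
    points.zip(points.drop(1)).map { case (p, q) => distance(p, q) }.sum
}
```

The consecutive version needs only N - 1 distance computations, which is why the answer depends so much on which of the two the original poster meant.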
Re: spark streaming with checkpoint
Hi, On Tue, Jan 20, 2015 at 8:16 PM, balu.naren balu.na...@gmail.com wrote: I am a beginner to spark streaming. So have a basic doubt regarding checkpoints. My use case is to calculate the number of unique users by day. I am using reduce by key and window for this. Where my window duration is 24 hours and slide duration is 5 mins. Adding to what others said, this feels more like a task for "run a Spark job every five minutes using cron" than for the sliding window functionality from Spark Streaming. Tobias
Re: [SQL] Conflicts in inferred Json Schemas
Hi, On Thu, Jan 22, 2015 at 2:26 AM, Corey Nolet cjno...@gmail.com wrote: Let's say I have 2 formats for json objects in the same file
    schema1 = { "location": "12345 My Lane" }
    schema2 = { "location": {"houseAddres": "1234 My Lane"} }
From my tests, it looks like the current inferSchema() function will end up with only StructField("location", StringType). In Spark SQL columns need to have a well-defined type (as in SQL in general). So inferring the schema requires that there is a schema, and I am afraid that there is not an easy way to achieve what you want in Spark SQL, as there is no data type covering both values you see. (I am pretty sure it can be done if you dive deep into the internals, add data types etc., though.) Tobias
Re: Serializability: for vs. while loops
Aaron, On Thu, Jan 15, 2015 at 5:05 PM, Aaron Davidson ilike...@gmail.com wrote: Scala for-loops are implemented as closures using anonymous inner classes which are instantiated once and invoked many times. This means, though, that the code inside the loop is actually sitting inside a class, which confuses Spark's Closure Cleaner, whose job is to remove unused references from closures to make otherwise-unserializable objects serializable. My understanding is, in particular, that the closure cleaner will null out unused fields in the closure, but cannot go past the first level of depth (i.e., it will not follow field references and null out *their *unused, and possibly unserializable, references), because this could end up mutating state outside of the closure itself. Thus, the extra level of depth of the closure that was introduced by the anonymous class (where presumably the outer this pointer is considered used by the closure cleaner) is sufficient to make it unserializable. Now, two weeks later, let me add that this is one of the most helpful comments I have received on this mailing list! This insight helped me save 90% of the time I spent with debugging NotSerializableExceptions. Thank you very much! Tobias
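The point about for-loops being closures can be seen in plain Scala. This is a minimal sketch; the object name LoopDesugaring and its three functions are made up for this example. The first two functions are equivalent by the language's desugaring rules: the loop body becomes a function object passed to foreach, which is the extra level the closure cleaner has to deal with, whereas the while loop creates no function object at all.

```scala
object LoopDesugaring {
  /** `for (i <- 0 until n) ...` is sugar for a foreach call with a closure. */
  def sumWithFor(n: Int): Int = {
    var sum = 0
    for (i <- 0 until n) sum += i
    sum
  }

  /** Roughly what the compiler turns the loop above into. */
  def sumWithForeach(n: Int): Int = {
    var sum = 0
    (0 until n).foreach(i => sum += i) // loop body lives inside a function object
    sum
  }

  /** A while loop involves no function object. */
  def sumWithWhile(n: Int): Int = {
    var sum = 0
    var i = 0
    while (i < n) { sum += i; i += 1 }
    sum
  }
}
```

All three return the same result, so rewriting a for-loop as a while loop inside code that ends up in a Spark closure changes only how the code is packaged, not what it computes.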
Re: Pairwise Processing of a List
Sean, On Mon, Jan 26, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Note that RDDs don't really guarantee anything about ordering though, so this only makes sense if you've already sorted some upstream RDD by a timestamp or sequence number. Speaking of order, is there some reading on guarantees and non-guarantees about order in RDDs? For example, when reading a file and doing zipWithIndex, can I assume that the lines are numbered in order? Does this hold for receiving data from Kafka, too? Tobias
Re: Closing over a var with changing value in Streaming application
Hi, On Wed, Jan 21, 2015 at 9:13 PM, Bob Tiernay btier...@hotmail.com wrote: Maybe I'm misunderstanding something here, but couldn't this be done with broadcast variables? There is the following caveat from the docs: "In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later)." Well, I think I need a modifiable state (modifiable = changes once per interval) that stores the number of total items seen so far in the lifetime of my application, and I need this number on each executor. Since this number changes after every interval processed, I think broadcast variables are probably not appropriate in this case. Thanks Tobias
Closing over a var with changing value in Streaming application
Hi, I am developing a Spark Streaming application where I want every item in my stream to be assigned a unique, strictly increasing Long. My input data already has RDD-local integers (from 0 to N-1) assigned, so I am doing the following:
    var totalNumberOfItems = 0L
    // update the keys of the stream data
    val globallyIndexedItems = inputStream.map(keyVal => (keyVal._1 + totalNumberOfItems, keyVal._2))
    // increase the number of total seen items
    inputStream.foreachRDD(rdd => { totalNumberOfItems += rdd.count })
Now this works on my local[*] Spark instance, but I was wondering if this is actually an ok thing to do. I don't want this to break when going to a YARN cluster... The function increasing totalNumberOfItems is closing over a var and running in the driver, so I think this is ok. Here is my concern: What about the function in the inputStream.map(...) block? This one is closing over a var that has a different value in every interval. Will the closure be serialized with that new value in every interval? Or only once with the initial value, so that it will always be 0 during the runtime of the program? As I said, it works locally, but I was wondering if I can really assume that the closure is serialized with a new value in every interval. Thanks, Tobias
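The in-JVM half of this question can be checked without Spark: a Scala closure captures the variable itself, not a snapshot of its value. The following minimal sketch simulates the intervals with a plain sequence of batch sizes; the object name ClosureOverVar and the function simulate are made up for this example. It says nothing about what a serialized copy of the closure sees on a remote executor, which is the part that remains open in the message.

```scala
object ClosureOverVar {
  /** Returns the key offset that each simulated interval would apply. */
  def simulate(batchSizes: Seq[Long]): Seq[Long] = {
    var totalNumberOfItems = 0L
    // closes over the variable itself, not over its value at creation time
    val addOffset: Long => Long = key => key + totalNumberOfItems
    batchSizes.map { size =>
      val firstKey = addOffset(0L) // global key of the first item in this interval
      totalNumberOfItems += size   // driver-side update after the interval
      firstKey
    }
  }
}
```

For batch sizes 3, 2 and 4 the offsets are 0, 3 and 5, even though addOffset was created only once. Whether the same holds on a cluster depends on when the closure is serialized for each batch.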
Re: Closing over a var with changing value in Streaming application
Hi, On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: How about using accumulators http://spark.apache.org/docs/1.2.0/programming-guide.html#accumulators? As far as I understand, they solve the part of the problem that I am not worried about, namely increasing the counter. I was more worried about getting that counter/accumulator value back to the executors. Thanks Tobias
Re: If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?
Hi, On Sat, Jan 17, 2015 at 3:37 AM, Peng Cheng pc...@uow.edu.au wrote: I'm talking about RDD1 (not persisted or checkpointed) in this situation: ...(somewhere) - RDD1 - RDD2 || V V RDD3 - RDD4 - Action! To my experience the change RDD1 get recalculated is volatile, sometimes once, sometimes twice. That should not happen if your access pattern to RDD2 and RDD3 is always the same. A related problem might be in $SQLContest.jsonRDD(), since the source jsonRDD is used twice (one for schema inferring, another for data read). It almost guarantees that the source jsonRDD is calculated twice. Has this problem be addressed so far? That's exactly why schema inference is expensive. However, I am afraid that in general you have to decide between storing and recomputing (cf. http://en.wikipedia.org/wiki/Space%E2%80%93time_tradeoff). There is no way to avoid recomputation on each access other than storing the value, I guess. Tobias
Re: How to get the master URL at runtime inside driver program?
Hi, On Sun, Jan 18, 2015 at 11:08 AM, guxiaobo1982 guxiaobo1...@qq.com wrote: Driver programs submitted by the spark-submit script will get the runtime spark master URL, but how it get the URL inside the main method when creating the SparkConf object? The master will be stored in the spark.master property. I use the following snippet:

    // When run through spark-submit, the Java system property "spark.master"
    // will contain the master passed to spark-submit and we *must* use the
    // same; otherwise use "local[3]".
    val master = scala.util.Properties.propOrElse("spark.master", "local[3]")

Tobias
Re: MatchError in JsonRDD.toLong
Hi, On Fri, Jan 16, 2015 at 6:14 PM, Wang, Daoyuan daoyuan.w...@intel.com wrote: The second parameter of jsonRDD is the sampling ratio when we infer schema. OK, I was aware of this, but I guess I understand the problem now. My sampling ratio is so low that I only see the Long values of data items and infer it's a Long. When I meet the data that's actually longer than Long, I get the error I posted; basically it's the same situation as when specifying a wrong schema manually. So is there any way around this other than increasing the sample ratio to discover also the very BigDecimal-sized numbers? Thanks Tobias
Re: Determine number of running executors
Hi, On Sat, Jan 17, 2015 at 3:05 AM, Shuai Zheng szheng.c...@gmail.com wrote: Can you share more information about how do you do that? I also have similar question about this. Not very proud about it ;-), but here you go:

    // find the number of workers available to us.
    val _runCmd = scala.util.Properties.propOrElse("sun.java.command", "")
    val numCoresRe = ".*--executor-cores ([0-9]+) --num-executors ([0-9]+).*".r
    val totalNumCores = _runCmd match {
      case numCoresRe(coresPerExecutor, numExecutors) =>
        coresPerExecutor.toInt * numExecutors.toInt
      case _ =>
        0
    }
    if (totalNumCores > 0)
      logger.debug("total number of cores: " + totalNumCores)
    else
      logger.warn("could not extract number of cores from run command: " + _runCmd)

Tobias
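To see what the regex approach above does on a concrete input, here is a minimal sketch in plain Scala (no Spark required). The object name `CoreCountDemo` and the sample command lines are made up for illustration; the pattern itself is the one from the message. Note that it only matches when `--executor-cores` comes directly before `--num-executors`, so it is fragile with respect to flag order.

```scala
object CoreCountDemo {
  // Same pattern as in the message: both flags, in this exact order.
  private val numCoresRe =
    """.*--executor-cores ([0-9]+) --num-executors ([0-9]+).*""".r

  // Returns the total number of cores, or None if the flags are not found.
  def totalCores(runCmd: String): Option[Int] = runCmd match {
    case numCoresRe(coresPerExecutor, numExecutors) =>
      Some(coresPerExecutor.toInt * numExecutors.toInt)
    case _ =>
      None
  }
}
```

Returning an `Option` instead of 0 is just one possible design; it makes the "could not extract" case explicit for the caller.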
Re: MatchError in JsonRDD.toLong
Hi again, On Fri, Jan 16, 2015 at 4:25 PM, Tobias Pfeiffer t...@preferred.jp wrote: Now I'm wondering where this comes from (I haven't touched this component in a while, nor upgraded Spark etc.) [...] So the reason that the error is showing up now is that suddenly data from a different dataset is showing up in my test dataset... don't ask me... anyway, this different dataset contains data like

    {"Click":"nonclicked", "Impression":1, "DisplayURL":4401798909506983219, "AdId":21215341, ...}
    {"Click":"nonclicked", "Impression":1, "DisplayURL":14452800566866169008, "AdId":10587781, ...}

and the DisplayURL seems to be too long for Long, while it is still inferred as a Long column. So, what to do about this? Is jsonRDD inherently incapable of handling those long numbers or is it just an issue in the schema inference and I should file a JIRA issue? Thanks Tobias
Re: MatchError in JsonRDD.toLong
Hi, On Fri, Jan 16, 2015 at 5:55 PM, Wang, Daoyuan daoyuan.w...@intel.com wrote: Can you provide how you create the JsonRDD? This should be reproducible in the Spark shell:

    import org.apache.spark.sql._
    val sqlc = new SQLContext(sc)
    val rdd = sc.parallelize(
      """{"Click":"nonclicked", "Impression":1, "DisplayURL":4401798909506983219, "AdId":21215341}""" ::
      """{"Click":"nonclicked", "Impression":1, "DisplayURL":14452800566866169008, "AdId":10587781}""" :: Nil)

    // works fine
    val json = sqlc.jsonRDD(rdd)
    json.registerTempTable("test")
    sqlc.sql("SELECT * FROM test").collect

    // -> MatchError
    val json2 = sqlc.jsonRDD(rdd, 0.1)
    json2.registerTempTable("test2")
    sqlc.sql("SELECT * FROM test2").collect

I guess the issue in the latter case is that the column is inferred as Long when some rows actually are too big for Long... Thanks Tobias
Re: Testing if an RDD is empty?
Hi, On Fri, Jan 16, 2015 at 7:31 AM, freedafeng freedaf...@yahoo.com wrote: I myself saw many times that my app threw out exceptions because an empty RDD cannot be saved. This is not big issue, but annoying. Having a cheap solution testing if an RDD is empty would be nice if there is no such thing available now. I think the cheapest you can have is computing at least one element in the RDD, which in the case of, say,

    val maybeEmptyRDD = veryExpensiveRDD.filter(_ => false)

will be just as expensive as .count(). Tobias
MatchError in JsonRDD.toLong
Hi, I am experiencing a weird error that suddenly popped up in my unit tests. I have a couple of HDFS files in JSON format and my test is basically creating a JsonRDD and then issuing a very simple SQL query over it. This used to work fine, but now suddenly I get:

    15:58:49.039 [Executor task launch worker-1] ERROR executor.Executor - Exception in task 1.0 in stage 29.0 (TID 117)
    scala.MatchError: 14452800566866169008 (of class java.math.BigInteger)
      at org.apache.spark.sql.json.JsonRDD$.toLong(JsonRDD.scala:282)
      at org.apache.spark.sql.json.JsonRDD$.enforceCorrectType(JsonRDD.scala:353)
      at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1$$anonfun$apply$12.apply(JsonRDD.scala:381)
      at scala.Option.map(Option.scala:145)
      at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1.apply(JsonRDD.scala:380)
      at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1.apply(JsonRDD.scala:365)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      at org.apache.spark.sql.json.JsonRDD$.org$apache$spark$sql$json$JsonRDD$$asRow(JsonRDD.scala:365)
      at org.apache.spark.sql.json.JsonRDD$$anonfun$jsonStringToRow$1.apply(JsonRDD.scala:38)
      at org.apache.spark.sql.json.JsonRDD$$anonfun$jsonStringToRow$1.apply(JsonRDD.scala:38)
      ...
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      java.lang.Thread.run(Thread.java:745)

The stack trace contains none of my classes, so it's a bit hard to track down where this starts. The code of JsonRDD.toLong is in fact

    private def toLong(value: Any): Long = {
      value match {
        case value: java.lang.Integer => value.asInstanceOf[Int].toLong
        case value: java.lang.Long => value.asInstanceOf[Long]
      }
    }

so if value is a BigInteger, toLong doesn't work.
Now I'm wondering where this comes from (I haven't touched this component in a while, nor upgraded Spark etc.), but in particular I would like to know how to work around this. Thanks Tobias
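As an illustration of why the match fails, here is a small plain-Scala sketch (not Spark code, and not a proposed patch to JsonRDD). It mirrors the shape of the quoted `toLong`, but adds a `BigInteger` case and a fallback. The name `ToLongDemo.toLongOption` is hypothetical. The value from the error message needs 64 bits, so it cannot be represented as a signed Long at all.

```scala
object ToLongDemo {
  // Like the quoted toLong, but returns None instead of throwing a
  // MatchError when the value is of an unexpected type or does not fit.
  def toLongOption(value: Any): Option[Long] = value match {
    case v: java.lang.Integer => Some(v.longValue)
    case v: java.lang.Long    => Some(v.longValue)
    // A signed Long holds at most 63 magnitude bits.
    case v: java.math.BigInteger if v.bitLength <= 63 => Some(v.longValue)
    case _ => None
  }
}
```

With this sketch, the first DisplayURL value from the thread converts fine while the second one is reported as not convertible, which matches the observed behavior that only some rows trigger the error.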
Re: Serializability: for vs. while loops
Aaron, thanks for your mail! On Thu, Jan 15, 2015 at 5:05 PM, Aaron Davidson ilike...@gmail.com wrote: Scala for-loops are implemented as closures using anonymous inner classes [...] While loops, on the other hand, involve none of this trickery, and everyone is happy. Ah, I was suspecting something like that... thank you very much for the detailed explanation! Tobias
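To make the difference concrete, here is a plain-Scala sketch of the rewriting that the compiler performs. A `for` loop over a range is translated into a call to `foreach` with a function literal, so the loop body becomes a function object that captures whatever it references from the enclosing scope. A `while` loop stays an ordinary loop. The object and method names are made up for illustration.

```scala
object LoopDesugarDemo {
  // A for loop over a Range ...
  def sumWithFor(maxId: Int): Int = {
    var total = 0
    for (i <- 0 to (maxId - 1)) { total += i }
    total
  }

  // ... is compiled as if it had been written like this: the body is a
  // closure passed to foreach.
  def sumWithForeach(maxId: Int): Int = {
    var total = 0
    (0 to (maxId - 1)).foreach(i => total += i)
    total
  }

  // A while loop involves no function object at all.
  def sumWithWhile(maxId: Int): Int = {
    var total = 0
    var i = 0
    while (i < maxId) { total += i; i += 1 }
    total
  }
}
```

All three variants compute the same result; they differ only in whether an extra closure object exists that could drag non-serializable references along.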
Re: *ByKey aggregations: performance + order
Sean, thanks for your message. On Wed, Jan 14, 2015 at 8:36 PM, Sean Owen so...@cloudera.com wrote: On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp wrote: OK, it seems like even on a local machine (with no network overhead), the groupByKey version is about 5 times slower than any of the other (reduceByKey, combineByKey etc.) functions... Even without network overhead, you're still paying the cost of setting up the shuffle and serialization. Can I pick an appropriate scheduler some time before so that Spark knows all items with the same key are on the same host? (Or enforce this?) Thanks Tobias
Re:
Hi, On Thu, Jan 15, 2015 at 12:23 AM, Ted Yu yuzhih...@gmail.com wrote: On Wed, Jan 14, 2015 at 6:58 AM, Jianguo Li flyingfromch...@gmail.com wrote: I am using Spark-1.1.1. When I used sbt test, I ran into the following exceptions. Any idea how to solve it? Thanks! I think somebody posted this question before, but no one seemed to have answered it. Could it be the version of io.netty I put in my build.sbt? I included a dependency libraryDependencies += "io.netty" % "netty" % "3.6.6.Final" in my build.sbt file. From my personal experience, netty dependencies are very painful to get right with Spark. I recommend looking at the dependency tree using https://github.com/jrudolph/sbt-dependency-graph and then fine-tuning your sbt ignores until it works. There are too many issues, depending on what other packages you use, to give general advice, I'm afraid. And once you have them right and use `sbt assembly` to build your application jar and want to run it on a cluster with spark-submit, you'll find that the netty version bundled with Spark will be put on the classpath before the version you want to use. It seems like there are various Spark configuration options to change this, http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html and a unification process is running, I think: https://issues.apache.org/jira/browse/SPARK-2996 https://github.com/apache/spark/pull/3233 I'm also looking forward to this one, as I am stuck with an ancient version of Finagle due to these Netty issues. Good luck, Tobias
Serializability: for vs. while loops
Hi, sorry, I don't like questions about serializability myself, but still... Can anyone give me a hint why

    for (i <- 0 to (maxId - 1)) {
      ...
    }

throws a NotSerializableException in the loop body while

    var i = 0
    while (i < maxId) {
      // same code as in the for loop
      i += 1
    }

works fine? I guess there is something fundamentally different in the way Scala realizes for loops? Thanks Tobias
Re: quickly counting the number of rows in a partition?
Hi again, On Wed, Jan 14, 2015 at 10:06 AM, Tobias Pfeiffer t...@preferred.jp wrote: If you think of items.map(x => /* throw exception */).count() then even though the count you want to get does not necessarily require the evaluation of the function in map() (i.e., the number is the same), you may not want to get the count if that code actually fails. Sorry, I think that was a bit confusing. What I mean is: You have to compute the whole RDD in order to give a meaningful count() result (whether you use rdd.count() or the mapPartitions() approach). Tobias
Re: *ByKey aggregations: performance + order
Hi, On Wed, Jan 14, 2015 at 12:11 PM, Tobias Pfeiffer t...@preferred.jp wrote: Now I don't know (yet) if all of the functions I want to compute can be expressed in this way and I was wondering about *how much* more expensive we are talking about. OK, it seems like even on a local machine (with no network overhead), the groupByKey version is about 5 times slower than any of the other (reduceByKey, combineByKey etc.) functions...

    val rdd = sc.parallelize(1 to 500)
    val withKeys = rdd.zipWithIndex.map(kv => (kv._2/10, kv._1))
    withKeys.cache()
    withKeys.count

    // around 850-1100 ms
    for (i <- 1 to 5) yield {
      val start = System.currentTimeMillis
      withKeys.reduceByKey(_ + _).count()
      System.currentTimeMillis - start
    }

    // around 800-1100 ms
    for (i <- 1 to 5) yield {
      val start = System.currentTimeMillis
      withKeys.combineByKey((x: Int) => x,
        (x: Int, y: Int) => x + y,
        (x: Int, y: Int) => x + y).count()
      System.currentTimeMillis - start
    }

    // around 1500-1900 ms
    for (i <- 1 to 5) yield {
      val start = System.currentTimeMillis
      withKeys.foldByKey(0)(_ + _).count()
      System.currentTimeMillis - start
    }

    // around 1400-1800 ms
    for (i <- 1 to 5) yield {
      val start = System.currentTimeMillis
      withKeys.aggregateByKey(0)(_ + _, _ + _).count()
      System.currentTimeMillis - start
    }

    // around 5500-6200 ms
    for (i <- 1 to 5) yield {
      val start = System.currentTimeMillis
      withKeys.groupByKey().mapValues(_.reduceLeft(_ + _)).count()
      System.currentTimeMillis - start
    }

Tobias
*ByKey aggregations: performance + order
Hi, I have an RDD[(Long, MyData)] where I want to compute various functions on lists of MyData items with the same key (these will in general be rather short lists, around 10 items per key). Naturally I was thinking of groupByKey() but was a bit intimidated by the warning: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance. Now I don't know (yet) if all of the functions I want to compute can be expressed in this way and I was wondering *how much* more expensive we are talking about. Say I have something like rdd.zipWithIndex.map(kv => (kv._2/10, kv._1)).groupByKey(), i.e. items that will be grouped will 99% live in the same partition (do they?), does this change the performance? Also, if my operations depend on the order in the original RDD (say, string concatenation), how could I make sure the order of the original RDD is respected? Thanks Tobias
Re: quickly counting the number of rows in a partition?
Hi, On Mon, Jan 12, 2015 at 8:09 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Use the mapPartitions function. It returns an iterator to each partition. Then just get that length by converting to an array. On Tue, Jan 13, 2015 at 2:50 PM, Kevin Burton bur...@spinn3r.com wrote: Doesn’t that just read in all the values? The count isn’t pre-computed? It’s not the end of the world if it’s not but would be faster. Well, converting to an array may not work due to memory constraints, counting the items in the iterator may be better. However, there is no pre-computed value. For counting, you need to compute all values in the RDD, in general. If you think of items.map(x => /* throw exception */).count() then even though the count you want to get does not necessarily require the evaluation of the function in map() (i.e., the number is the same), you may not want to get the count if that code actually fails. Tobias
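The point about counting the iterator instead of converting it to an array can be sketched in plain Scala as follows. This is only an illustration of the per-partition step: `countItems` is a hypothetical helper of the kind one might pass to mapPartitions, and it still has to pull every element through the iterator, it just never holds them all in memory at once.

```scala
object PartitionCountDemo {
  // Walks the iterator once and keeps only a running counter,
  // rather than materializing all elements in an array first.
  def countItems[T](partition: Iterator[T]): Long = {
    var n = 0L
    while (partition.hasNext) {
      partition.next()
      n += 1
    }
    n
  }
}
```

Every element is still computed, so this saves memory, not computation.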
Re: RDD Moving Average
Hi, On Wed, Jan 7, 2015 at 9:47 AM, Asim Jalis asimja...@gmail.com wrote: One approach I was considering was to use mapPartitions. It is straightforward to compute the moving average over a partition, except for near the end point. Does anyone see how to fix that? Well, I guess this is not a perfect use case for mapPartitions, in particular since you would have to implement the behavior near the beginning and end of a partition yourself. I would rather go with the high-level RDD functions that are partition-independent. By the way, I am now also trying to implement sliding windows based on count and embedded timestamp... seems like I should have had a look at rdd.sliding() before... Tobias
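For reference, this is what a count-based moving average looks like on a local Scala collection, using the standard `sliding` method on sequences. It is a sketch of the windowing logic only, with a made-up object name, and it does not address the partition-boundary problem discussed above: on an RDD, a window that starts near the end of one partition needs elements from the next one.

```scala
object MovingAverageDemo {
  // Average over each full window of `window` consecutive values.
  // Inputs shorter than the window produce no output.
  def movingAverage(values: Seq[Double], window: Int): Seq[Double] =
    if (window <= 0 || values.size < window) Seq.empty
    else values.sliding(window).map(w => w.sum / w.size).toList
}
```

The explicit size check is there because `sliding` on a sequence shorter than the window would otherwise yield one partial window.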
Re: Create DStream consisting of HDFS and (then) Kafka data
Hi, On Thu, Jan 8, 2015 at 2:19 PM, rekt...@voodoowarez.com wrote: dstream processing bulk HDFS data- is something I don't feel is super well socialized yet, fingers crossed that base gets built up a little more. Just out of interest (and hoping not to hijack my own thread), why are you not doing plain RDD processing when you are only processing HDFS data? What's the advantage of doing DStream? Thanks Tobias
Create DStream consisting of HDFS and (then) Kafka data
Hi, I have a setup where data from an external stream is piped into Kafka and also written to HDFS periodically for long-term storage. Now I am trying to build an application that will first process the HDFS files and then switch to Kafka, continuing with the first item that was not yet in HDFS. (The items have an increasing timestamp that I can use to find the first item not yet processed.) I am wondering what an elegant method to provide a unified view of the data would be. Currently, I am using two StreamingContexts one after another: - start one StreamingContext A and process all data found in HDFS (updating the largest seen timestamp in an accumulator), stopping when there was an RDD with 0 items in it, - stop that StreamingContext A, - start a new StreamingContext B and process the Kafka stream (filtering out all items with a timestamp smaller than the value in the accumulator), - stop when the user requests it. This *works* as it is now, but I was planning to add sliding windows (based on item counts or the timestamps in the data), which will get unmanageably complicated when I have a window spanning data in both HDFS and Kafka, I guess. Therefore I would like to have a single DStream that is fed with first HDFS and then Kafka data. Does anyone have a suggestion on how to realize that (with as few copying of private[spark] classes as possible)? I guess one issue is that the Kafka processing requires a receiver and therefore needs to be treated quite a bit differently than HDFS? Thanks Tobias
Re: I think I am almost lost in the internals of Spark
Hi, On Tue, Jan 6, 2015 at 11:24 PM, Todd bit1...@163.com wrote: I am a bit new to Spark, except that I tried simple things like word count, and the examples given in the spark sql programming guide. Now, I am investigating the internals of Spark, but I think I am almost lost, because I could not grasp a whole picture what spark does when it executes the word count. I recommend understanding what an RDD is and how it is processed, using http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds and probably also http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (once the server is back). Understanding how an RDD is processed is probably most helpful to understand the whole of Spark. Tobias
Re: How to replace user.id to user.names in a file
Hi, it looks to me as if you need the whole user database on every node, so maybe put the id-name information as a Map[Id, String] in a broadcast variable and then do something like

    recommendations.map(line => {
      line.map(uid => usernames(uid))
    })

or so? Tobias
Re: How to replace user.id to user.names in a file
Hi, On Wed, Jan 7, 2015 at 10:47 AM, Riginos Samaras samarasrigi...@gmail.com wrote: Yes something like this. Can you please give me an example to create a Map? That depends heavily on the shape of your input file. What about something like:

    (for (line <- Source.fromFile(filename).getLines()) yield {
      val items = line.trim.split(" ")
      (items(0).toInt, items(1))
    }).toMap

Tobias
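A self-contained variant of the same idea, as a sketch that works on lines already in memory, so it can be tried without an input file. The format "id name" separated by a single space is an assumption, and `UserMapDemo`, `parse` and `replaceIds` are hypothetical names.

```scala
object UserMapDemo {
  // Builds an id -> name map from lines of the form "<id> <name>".
  // Blank lines are skipped; the name may itself contain spaces.
  def parse(lines: Iterator[String]): Map[Int, String] =
    lines
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { line =>
        val items = line.split(" ", 2)
        (items(0).toInt, items(1))
      }
      .toMap

  // Replaces each id by its name, keeping unknown ids as strings.
  def replaceIds(ids: Seq[Int], names: Map[Int, String]): Seq[String] =
    ids.map(id => names.getOrElse(id, id.toString))
}
```

The resulting `Map` is an ordinary local Scala value, which is the kind of object that can be handed to a broadcast variable.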
Re: How to replace user.id to user.names in a file
Hi, On Wed, Jan 7, 2015 at 11:13 AM, Riginos Samaras samarasrigi...@gmail.com wrote: exactly thats what I'm looking for, my code is like this:

    //code
    val users_map = users_file.map{ s =>
      val parts = s.split(",")
      (parts(0).toInt, parts(1))
    }.distinct
    //code

but i get the error: error: value toMap is not a member of org.apache.spark.rdd.RDD[(Int, String)] user_map.toMap If you want to distribute the Map as a broadcast variable, it must not be an RDD but a normal Scala map. Make your users_file a regular List, then it should work. Tobias
Re: Add StructType column to SchemaRDD
Hi Michael, On Tue, Jan 6, 2015 at 3:43 PM, Michael Armbrust mich...@databricks.com wrote: Oh sorry, I'm rereading your email more carefully. Its only because you have some setup code that you want to amortize? Yes, exactly that. Concerning the docs, I'd be happy to contribute, but I don't really understand what is happening here and why ;-) Thanks Tobias
Add StructType column to SchemaRDD
Hi, I have a SchemaRDD where I want to add a column with a value that is computed from the rest of the row. As the computation involves a network operation and requires setup code, I can't use SELECT *, myUDF(*) FROM rdd, but I wanted to use a combination of:
- get schema of input SchemaRDD
- issue a mapPartitions call (including the setup code), obtaining a new RDD[Row]
- extend the schema manually
- create a new RDD by combining the RDD[Row] with the extended schema.

This works very well, but I run into trouble querying that resulting SchemaRDD with SQL if:
- the result of my computation is a case class
- and I want to use values in this case class in the SQL query.

In particular, while SELECT column FROM resultrdd works well, SELECT column.key_name FROM resultrdd gives a java.lang.ClassCastException: example.MyCaseClass cannot be cast to org.apache.spark.sql.catalyst.expressions.Row

Here is an example to illustrate that:

---
    import org.apache.spark._
    import org.apache.spark.sql._
    import org.apache.spark.sql.catalyst.types._

    val sc = new SparkContext("local[3]", "Test")
    val sqlc = new SQLContext(sc)
    import sqlc._

    // this is the case class that my operation is returning
    case class Result(string_values: Map[String, String],
                      num_values: Map[String, Double])
    // dummy result data
    val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
                Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
    val rdd = sc.parallelize(data)
    // simulate my computation by creating an RDD[Row] and creating
    // a schema programmatically
    val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
    val progSchema = StructType(StructField("hello", IntegerType, false) ::
                                StructField("newcol", rdd.schema, true) :: Nil)
    val progRdd = sqlc.applySchema(rowRdd, progSchema)
    progRdd.registerTempTable("progrdd")
    // the following call will *fail* with a ClassCastException
    sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)
    // however, the schema I specified is correct. see how embedding
    // my result in a proper case class works:
    case class ResultContainer(hello: Int, newcol: Result)
    val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
    caseClassRdd.registerTempTable("caseclassrdd")
    // the following call will *work*
    sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)
    // even though the schema for both RDDs is the same:
    progRdd.schema == caseClassRdd.schema
---

It turns out that I cannot use the case class directly, but I have to convert it to a Row as well. That is, instead of

    val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))

I have to use

    val rowRdd = rdd.map(dr => Row.fromSeq(7 :: Row.fromSeq(dr.productIterator.toSeq) :: Nil))

and then, I can use

    SELECT newcol.string_values['team'] FROM progrdd

So now I found that out and I'm happy that it works, but it was quite hard to track it down, so I was wondering if this is the most intuitive way to add a column to a SchemaRDD using mapPartitions (as opposed to using a UDF, where the conversion case class -> Row seems to happen automatically). Or, even if there is no more intuitive way, just wanted to have this documented ;-) Thanks Tobias
Re: unable to do group by with 1st column
Hi, On Fri, Dec 26, 2014 at 5:22 AM, Amit Behera amit.bd...@gmail.com wrote: How can I do it? Please help me to do. Have you considered using groupByKey? http://spark.apache.org/docs/latest/programming-guide.html#transformations Tobias
Re: serialization issue with mapPartitions
Hi, On Fri, Dec 26, 2014 at 1:32 AM, ey-chih chow eyc...@hotmail.com wrote: I got some issues with mapPartitions with the following piece of code:

    val sessions = sc
      .newAPIHadoopFile(
        ... path to an avro file ...,
        classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
        classOf[AvroKey[ByteBuffer]],
        classOf[NullWritable],
        job.getConfiguration())
      .mapPartitions { valueIterator =>
        val config = job.getConfiguration()
        .
      }
      .collect()

Why job.getConfiguration() in the function mapPartitions will generate the following message? Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job The functions inside mapPartitions() will be executed on the Spark executors, not the Spark driver. Therefore, the function body needs to be serialized and sent to the executors via network. If that is not possible (in your case, `job` cannot be serialized), you will get a NotSerializableException. It works inside newAPIHadoopFile because this is executed on the driver. Tobias
Re: Discourse: A proposed alternative to the Spark User list
Nick, uh, I would have expected a rather heated discussion, but the opposite seems to be the case ;-) Independent of my personal preferences w.r.t. usability, habits etc., I think it is not good for a software/tool/framework if questions and discussions are spread over too many places. I guess everyone of us knows an example of where this makes/has made it very hard for newcomers to get started ;-) As it is now, I think the mailing list has somewhat of an official touch, while Stack Overflow is, well, Stack Overflow ;-) To introduce another discussion platform next to the mailing list (your proposal (2.)) would increase confusion, the number of double-postings and, as you said, effectively fork the community. Your proposal (1.) sounds attractive, but I highly doubt that the user experience can match people's expectations towards the pure solution on either the mailing list or Discourse, given the rather different discussion styles. Having said that, I totally agree to the points you mentioned; even just linking to a thread where a question has been discussed before is very time-consuming and I would be happy to use a platform where all those points are addressed. Stack Overflow seems to provide that, too, and except for the broader range of discussions you mentioned, I don't see the benefit of using Discourse over Stack Overflow. So personally, I would suggest to go with (3.) and encourage SO as a platform for questions that are ok to be asked there and try to reduce/focus mailing list communication for everything else. (Note that this is pretty much the same state as now plus encouraging people in an unspecified way, which means that maybe nothing changes at all.) 
Just my 2 cent, Tobias On Wed Dec 24 2014 at 21:50:48 Nick Chammas nicholas.cham...@gmail.com wrote: When people have questions about Spark, there are 2 main places (as far as I can tell) where they ask them: - Stack Overflow, under the apache-spark tag http://stackoverflow.com/questions/tagged/apache-spark - This mailing list The mailing list is valuable as an independent place for discussion that is part of the Spark project itself. Furthermore, it allows for a broader range of discussions than would be allowed on Stack Overflow http://stackoverflow.com/help/dont-ask. As the Spark project has grown in popularity, I see that a few problems have emerged with this mailing list: - It’s hard to follow topics (e.g. Streaming vs. SQL) that you’re interested in, and it’s hard to know when someone has mentioned you specifically. - It’s hard to search for existing threads and link information across disparate threads. - It’s hard to format code and log snippets nicely, and by extension, hard to read other people’s posts with this kind of information. There are existing solutions to all these (and other) problems based around straight-up discipline or client-side tooling, which users have to conjure up for themselves. I’d like us as a community to consider using Discourse http://www.discourse.org/ as an alternative to, or overlay on top of, this mailing list, that provides better out-of-the-box solutions to these problems. Discourse is a modern discussion platform built by some of the same people who created Stack Overflow. It has many neat features http://v1.discourse.org/about/ that I believe this community would benefit from. For example: - When a user starts typing up a new post, they get a panel *showing existing conversations that look similar*, just like on Stack Overflow. - It’s easy to search for posts and link between them. - *Markdown support* is built-in to composer. - You can *specifically mention people* and they will be notified. 
- Posts can be categorized (e.g. Streaming, SQL, etc.). - There is a built-in option for mailing list support which forwards all activity on the forum to a user’s email address and which allows for creation of new posts via email. What do you think of Discourse as an alternative, more manageable way to discuss Spark? There are a few options we can consider: 1. Work with the ASF as well as the Discourse team to allow Discourse to act as an overlay on top of this mailing list https://meta.discourse.org/t/discourse-as-a-front-end-for-existing-asf-mailing-lists/23167?u=nicholaschammas, allowing people to continue to use the mailing list as-is if they want. (This is the toughest but perhaps most attractive option.) 2. Create a new Discourse forum for Spark that is not bound to this user list. This is relatively easy but will effectively fork the community on this list. (We cannot shut down this mailing list in favor of one managed by Discourse.) 3. Don’t use Discourse. Just encourage people on this list to post instead on Stack Overflow whenever possible. 4. Something else. What does everyone think? Nick
Re: serialization issue with mapPartitions
Hi, On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow eyc...@hotmail.com wrote: I should rephrase my question as follows: How to use the corresponding Hadoop Configuration of a HadoopRDD in defining a function as an input parameter to the MapPartitions function? Well, you could try to pull the `val config = job.getConfiguration()` out of the function and just use `config` inside the function, hoping that this one is serializable. Tobias
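The suggestion above can be illustrated without Spark or Hadoop, as a sketch under stated assumptions: `Handle` is a made-up stand-in for a non-serializable object such as the Hadoop `Job`, plain Java serialization is used in place of Spark's closure shipping, and the behavior shown relies on Scala function literals being serializable (as in Scala 2.12 and later). A function that references the handle cannot be serialized, while one that references only a plain value pulled out beforehand can.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureSerializationDemo {
  // Hypothetical stand-in for a non-serializable object like a Hadoop Job.
  class Handle(val setting: String)

  // Tries to Java-serialize the object and reports whether that worked.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Returns (closure over the handle, closure over the extracted value).
  def check(): (Boolean, Boolean) = {
    val handle = new Handle("some-value")

    // References `handle` inside the function: the handle is captured.
    val capturesHandle: String => String = s => s + handle.setting

    // Pull the needed value out first: only a String is captured.
    val setting = handle.setting
    val capturesString: String => String = s => s + setting

    (isSerializable(capturesHandle), isSerializable(capturesString))
  }
}
```

Whether the extracted value is itself serializable is a separate question; for a Hadoop Configuration that is exactly the "hoping that this one is serializable" part of the reply.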
Re: SchemaRDD to RDD[String]
Hi, On Wed, Dec 24, 2014 at 3:18 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: I want to convert a schemaRDD into RDD of String. How can we do that? Currently I am doing like this which is not converting correctly no exception but resultant strings are empty here is my code Hehe, this is the most Java-ish Scala code I have ever seen ;-) Having said that, are you sure that your rows are not empty? The code looks correct to me, actually. Tobias
Re: How to run an action and get output?
Hi, On Fri, Dec 19, 2014 at 6:53 PM, Ashic Mahtab as...@live.com wrote:

    def doSomething(entry: SomeEntry, session: Session): SomeOutput = {
      val result = session.SomeOp(entry)
      SomeOutput(entry.Key, result.SomeProp)
    }

I could use a transformation for rdd.map, but in case of failures, the map would run on another executor for the same rdd. I could do rdd.foreach, but that returns unit. Is there something like a foreach that can return values? I think `map()` is pretty much `foreach()` that can return values. If you want to prevent re-execution on errors, wrap the whole thing in a scala.util.Try{} block or something.

    rdd.map(item => {
      Try{ ... }
    }).flatMap(_ match {
      case Success(something) => Some(something)
      case Failure(e) => None
    })

or so. Tobias
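The same map/flatMap pattern can be tried on a local collection, as a minimal sketch. `process` is a hypothetical stand-in for the per-item operation; here it simply fails on non-numeric input. Failures are turned into `Failure` values by `Try` and then dropped, so an exception in one item does not abort the whole computation.

```scala
import scala.util.{Failure, Success, Try}

object TryDemo {
  // Hypothetical per-item operation; throws on non-numeric input.
  def process(entry: String): Int = entry.trim.toInt

  // Runs process on every entry and keeps only the successful results.
  def processAll(entries: Seq[String]): Seq[Int] =
    entries
      .map(entry => Try(process(entry)))
      .flatMap {
        case Success(value) => Some(value)
        case Failure(_)     => None
      }
}
```

Silently dropping failures is a design choice; logging the exception in the `Failure` branch, or keeping the `Try` values, may be preferable when errors need to be inspected later.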
Semantics of foreachPartition()
Hi, I have the following code in my application:

    tmpRdd.foreach(item => {
      println("abc: " + item)
    })
    tmpRdd.foreachPartition(iter => {
      iter.map(item => {
        println("xyz: " + item)
      })
    })

In the output, I see only the "abc" prints (i.e. from the foreach() call). (The result is the same also if I exchange the order.) What exactly is the meaning of foreachPartition and how would I use it correctly? Thanks Tobias
Re: Semantics of foreachPartition()
Hi again, On Thu, Dec 18, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: tmpRdd.foreachPartition(iter => { iter.map(item => { println("xyz: " + item) }) }) Uh, with iter.foreach(...) it works... the reason apparently being that iter.map() itself returns an iterator and is thus evaluated lazily (in this case: never), while iter.foreach() is evaluated immediately. Thanks Tobias
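The laziness described above can be reproduced with a plain Scala Iterator, without Spark. This is only an illustrative sketch; the object and method names are made up for the example.

```scala
object LazyIteratorDemo {
  // Returns how many times the side-effecting function was actually invoked.
  def countCalls(useForeach: Boolean): Int = {
    var calls = 0
    val iter = Iterator(1, 2, 3)
    if (useForeach) {
      // foreach consumes the iterator immediately, so the side effect runs.
      iter.foreach(_ => calls += 1)
    } else {
      // map only builds a new lazy iterator; since nothing consumes it,
      // the function passed to map is never called.
      val unused = iter.map(_ => calls += 1)
    }
    calls
  }
}
```

Inside foreachPartition the iterator behaves the same way: a map whose result is never consumed does nothing, so side effects belong in foreach.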
Re: spark streaming kafa best practices ?
Hi, On Thu, Dec 18, 2014 at 3:08 AM, Patrick Wendell pwend...@gmail.com wrote: On Wed, Dec 17, 2014 at 5:43 AM, Gerard Maas gerard.m...@gmail.com wrote: I was wondering why one would choose for rdd.map vs rdd.foreach to execute a side-effecting function on an RDD. Personally, I like to get the count of processed items, so I do something like rdd.map(item => processItem(item)).count() instead of rdd.foreach(item => processItem(item)) but I would be happy to learn about a better way. Tobias
Re: Spark SQL DSL for joins?
Jerry, On Wed, Dec 17, 2014 at 3:35 PM, Jerry Raj jerry@gmail.com wrote: Another problem with the DSL: t1.where('term == dmin).count() returns zero. Looks like you need ===: https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD Tobias
Re: Running spark-submit from a remote machine using a YARN application
Hi, On Fri, Dec 12, 2014 at 7:01 AM, ryaminal tacmot...@gmail.com wrote: Now our solution is to make a very simple YARN application which executes as its command spark-submit --master yarn-cluster s3n://application/jar.jar This seemed so simple and elegant, but it has some weird issues. We get NoClassDefFoundErrors. When we ssh to the box, run the same spark-submit command it works, but doing this through YARN leads to the NoClassDefFoundErrors mentioned. I do something similar: I start Spark using spark-submit from a non-Spark server application. Make sure that HADOOP_CONF_DIR is set correctly when running spark-submit from your program so that the YARN configuration can be found. Also, keep in mind that some parameters to spark-submit behave differently when using yarn-cluster vs. local[*] master. For example, system properties set using `--conf` will be available in your Spark application only in local[*] mode; for YARN you need to wrap them with `--conf spark.executor.extraJavaOptions=...`. Tobias
Re: Adding a column to a SchemaRDD
Nathan, On Fri, Dec 12, 2014 at 3:11 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: I can see how to do it if I can express the added values in SQL - just run SELECT *,valueCalculation AS newColumnName FROM table I've been searching all over for how to do this if my added value is a scala function, with no luck. Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the value B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D. I guess you would have to do three things: - schemardd.map(row => { extend the row here }) which will give you a plain RDD[Row] without a schema, - take the schema from the schemardd and extend it manually by the name and type of the newly added column, - create a new SchemaRDD from your mapped RDD and the manually extended schema. Does that make sense? Tobias
Count-based windows
Hi, I am interested in building an application that uses sliding windows not based on the time when the item was received, but on either * a timestamp embedded in the data, or * a count (like: every 10 items, look at the last 100 items). Also, I want to do this not only on stream data received from Kafka, but also on HDFS data (where clearly the "time received" aspect is not present). I found http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-windowing-Driven-by-absolutely-time-td1733.html#a1843 as an instruction for how to use the timestamp, but does anyone have a suggestion on how to use item count as window size constraint? Thanks Tobias
Re: spark-submit on YARN is slow
Hi, On Tue, Dec 9, 2014 at 4:39 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Can you try using the YARN Fair Scheduler and set yarn.scheduler.fair.continuous-scheduling-enabled to true? I'm using Cloudera 5.2.0 and my configuration says yarn.resourcemanager.scheduler.class = org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler yarn.scheduler.fair.continuous-scheduling-enabled = true by default. Changing to a different Scheduler doesn't really change anything, from ACCEPTED to RUNNING always takes about 10 seconds. Thanks Tobias
Re: spark-submit on YARN is slow
Hi, thanks for your responses! On Sat, Dec 6, 2014 at 4:22 AM, Sandy Ryza sandy.r...@cloudera.com wrote: What version are you using? In some recent versions, we had a couple of large hardcoded sleeps on the Spark side. I am using Spark 1.1.1. As Andrew mentioned, I guess most of the 10 seconds waiting time probably comes from YARN itself. (Other YARN applications also take a while to start up.) I'm just really puzzled about what exactly takes so long there... for a job that runs an hour or so, that is of course negligible, but I am starting up an instance per client to do interactive job processing *for this client*, and it feels like "yeah, thanks for logging in, now please wait a while until you can actually use the program", which is a bit suboptimal. Tobias
Re: Market Basket Analysis
Hi, On Thu, Dec 4, 2014 at 11:58 PM, Rohit Pujari rpuj...@hortonworks.com wrote: I'd like to do market basket analysis using spark, what're my options? To do it or not to do it ;-) Seriously, could you elaborate a bit on what you want to know? Tobias
Re: Stateful mapPartitions
Hi, On Fri, Dec 5, 2014 at 3:56 AM, Akshat Aranya aara...@gmail.com wrote: Is it possible to have some state across multiple calls to mapPartitions on each partition, for instance, if I want to keep a database connection open? If you're using Scala, you can use a singleton object, this will exist once per JVM (i.e., once per executor), like object DatabaseConnector { lazy val conn = ... } Please be aware that shutting down the connection is much harder than opening it, because you basically have no idea when processing is done for an executor, AFAIK. Tobias
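A minimal sketch of the singleton idea, runnable without Spark. The `Connection` class and the counter are hypothetical stand-ins for a real database client; the function passed to mapPartitions would simply reference `DatabaseConnector.conn`.

```scala
import java.util.concurrent.atomic.AtomicInteger

object DatabaseConnector {
  // Hypothetical connection type standing in for a real database client.
  final class Connection(val id: Int) {
    def write(record: String): String = s"conn-$id wrote $record"
  }

  // Counts how many connections have been opened in this JVM.
  private val opened = new AtomicInteger(0)
  def openedCount: Int = opened.get()

  // Initialized on first access and then reused by every later call
  // that runs in the same JVM.
  lazy val conn: Connection = new Connection(opened.incrementAndGet())
}

object StatefulPartitionsDemo {
  // Shape of a function one might pass to mapPartitions: it refers to the
  // singleton instead of creating a connection per partition.
  def processPartition(records: Iterator[String]): Iterator[String] =
    records.map(r => DatabaseConnector.conn.write(r))
}
```

Calling `processPartition` for several partitions in the same JVM reuses the one connection. As noted above, nothing in this sketch closes the connection; that part remains the hard problem.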
Re: SPARK LIMITATION - more than one case class is not allowed !!
On Fri, Dec 5, 2014 at 12:53 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote: Is it a limitation that spark does not support more than one case class at a time. What do you mean? I do not have the slightest idea what you *could* possibly mean by "to support a case class". Tobias
Re: SPARK LIMITATION - more than one case class is not allowed !!
Rahul, On Fri, Dec 5, 2014 at 1:29 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote: I have created objectfiles [person_obj,office_obj] from csv[person_csv,office_csv] files using case classes[person,office] with API (saveAsObjectFile) Now I restarted spark-shell and load objectfiles using API(objectFile). *Once any of one object-class is loaded successfully, rest of object-class gives serialization error.* I have not used saveAsObjectFile, but I think that if you define your case classes in the spark-shell and serialized the objects, and then you restart the spark-shell, the *classes* (structure, names etc.) will not be known to the JVM any more. So if you try to restore the *objects* from a file, the JVM may fail in restoring them, because there is no class it could create objects of. Just a guess. Try to write a Scala program, compile it and see if it still fails when executed. Tobias
Re: SPARK LIMITATION - more than one case class is not allowed !!
Rahul, On Fri, Dec 5, 2014 at 2:50 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote: I have done so that's why spark is able to load objectfile [e.g. person_obj] and spark has maintained serialVersionUID [person_obj]. Next time when I am trying to load another objectfile [e.g. office_obj] and I think spark is matching serialVersionUID [person_obj] with previous serialVersionUID [person_obj] and giving mismatch error. In my first post, I have given statements which can be executed easily to replicate this issue. Can you post the Scala source for your case classes? I have tried the following in spark-shell: case class Dog(name: String) case class Cat(age: Int) val dogs = sc.parallelize(Dog("foo") :: Dog("bar") :: Nil) val cats = sc.parallelize(Cat(1) :: Cat(2) :: Nil) dogs.saveAsObjectFile("test_dogs") cats.saveAsObjectFile("test_cats") This gives two directories test_dogs/ and test_cats/. Then I restarted spark-shell and entered: case class Dog(name: String) case class Cat(age: Int) val dogs = sc.objectFile("test_dogs") val cats = sc.objectFile("test_cats") I don't get an exception, but: dogs: org.apache.spark.rdd.RDD[Nothing] = FlatMappedRDD[1] at objectFile at <console>:12 Trying to access the elements of the RDD gave: scala> dogs.collect() 14/12/05 15:08:58 INFO FileInputFormat: Total input paths to process : 8 ... org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:980) ... at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object; at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88) at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1129) ... org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:976) ... 10 more So even in the simplest of cases, this doesn't work for me in the spark-shell, but with a different error.
I guess we need to see more of your code to help. Tobias
Re: SPARK LIMITATION - more than one case class is not allowed !!
Rahul, On Fri, Dec 5, 2014 at 3:51 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote: 1. Copy csv files in current directory. 2. Open spark-shell from this directory. 3. Run one_scala file which will create object-files from csv-files in current directory. 4. Restart spark-shell 5. a. Run two_scala file, while running it is giving error during loading of office_csv b. If we edit two_scala file by below contents --- case class person(id: Int, name: String, fathername: String, officeid: Int) case class office(id: Int, name: String, landmark: String, areacode: String) sc.objectFile[office]("office_obj").count sc.objectFile[person]("person_obj").count while running it is giving error during loading of person_csv The good news is: I can reproduce the error you see. More good news: I can tell you how to fix this. In your one.scala file, define all case classes *before* you use saveAsObjectFile() for the first time. With case class person(id: Int, name: String, fathername: String, officeid: Int) case class office(id: Int, name: String, landmark: String, areacode: String) val baseperson = sc.textFile("person_csv")....saveAsObjectFile("person_obj") val baseoffice = sc.textFile("office_csv")....saveAsObjectFile("office_obj") I can deserialize the obj files (in any order). The bad news is: I have no idea about the reason for this. I blame it on the REPL/shell and assume it would not happen for a compiled application. Tobias
Does count() evaluate all mapped functions?
Hi, I have an RDD and a function that should be called on every item in this RDD once (say it updates an external database). So far, I used rdd.map(myFunction).count() or rdd.mapPartitions(iter => iter.map(myFunction)) but I am wondering if this always triggers the call of myFunction in both cases. Actually, in the first case, the count() will be the same whether or not myFunction is called for each element, so I was just wondering if I can rely on count() evaluating the whole pipeline including functions that cannot change the count. Thanks Tobias
Re: Spark SQL UDF returning a list?
Hi, On Wed, Dec 3, 2014 at 4:31 PM, Jerry Raj jerry@gmail.com wrote: Exception in thread main java.lang.RuntimeException: [1.57] failure: ``('' expected but identifier myudf found I also tried returning a List of Ints, that did not work either. Is there a way to write a UDF that returns a list? You seem to be hitting a parser limitation before your function is even called. The message you are seeing is saying "there must be an opening bracket here", and I am afraid you won't get around this whatever function you write... (maybe the HiveContext provides a possibility, though). Tobias
Re: textFileStream() issue?
Hi, On Wed, Dec 3, 2014 at 5:31 PM, Bahubali Jain bahub...@gmail.com wrote: I am trying to use textFileStream(some_hdfs_location) to pick new files from a HDFS location. I am seeing a pretty strange behavior though. textFileStream() is not detecting new files when I move them from a location within HDFS to the location at which textFileStream() is checking for new files. But when I copy files from a location in the linux filesystem to HDFS, then textFileStream() is detecting the new files. Is it possible that the timestamp of the moved files is actually older than that of previously processed files? I think only new files are picked up. Try moving the file and setting the timestamp to now() to see if it makes a difference. Tobias
Re: Best way to have some singleton per worker
Hi, On Thu, Dec 4, 2014 at 2:59 AM, Ashic Mahtab as...@live.com wrote: I've been doing this with foreachPartition (i.e. have the parameters for creating the singleton outside the loop, do a foreachPartition, create the instance, loop over entries in the partition, close the partition), but it's quite cludgy. Is there a pattern by which I can have an instance of something nonserializable on each worker? I think the pattern you describe is the standard way of doing this, several people on this list (including me) have used it for database access etc. Tobias
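The per-partition pattern described in this message can be sketched without Spark as follows. The `Client` class is a hypothetical non-serializable resource; only the serializable `params` would be captured from outside the closure. Returning the client is done here purely so the result can be inspected; foreachPartition itself returns Unit.

```scala
import scala.collection.mutable.ArrayBuffer

object PerPartitionResourceDemo {
  // Hypothetical non-serializable resource, e.g. a database client.
  final class Client(params: String) {
    private val buffer = ArrayBuffer.empty[String]
    var closed: Boolean = false
    def send(entry: String): Unit = { buffer += s"$params:$entry" }
    def sent: List[String] = buffer.toList
    def close(): Unit = { closed = true }
  }

  // Body one might pass to foreachPartition: create the instance inside the
  // partition, loop over the entries, and close the instance afterwards.
  def handlePartition(params: String, entries: Iterator[String]): Client = {
    val client = new Client(params)
    try {
      entries.foreach(client.send)
    } finally {
      client.close() // runs even if processing an entry throws
    }
    client
  }
}
```

With an RDD this would presumably be used as `rdd.foreachPartition(iter => handlePartition(params, iter))`, creating one instance per partition rather than one per worker.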
spark-submit on YARN is slow
Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file as well as my application jar file put in HDFS and can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue, but some YARN configuration setting (or YARN-inherent slowness); I was just wondering if anyone has any advice on how to speed this up. Thanks Tobias
Re: netty on classpath when using spark-submit
Markus, On Tue, Nov 11, 2014 at 10:40 AM, M. Dale medal...@yahoo.com wrote: I never tried to use this property. I was hoping someone else would jump in. When I saw your original question I remembered that Hadoop has something similar. So I searched and found the link below. A quick JIRA search seems to indicate that there is another property: https://issues.apache.org/jira/browse/SPARK-2996 Maybe that property will work with yarn: spark.yarn.user.classpath.first Thank you very much! That property does in fact load the classes from my jar file first when running on YARN, great! However, in local[N] mode, neither that one nor the spark.files.userClassPathFirst one works. So when using spark-submit with --master local[3] instead of --master yarn-cluster, the value for spark.files.userClassPathFirst is displayed correctly, but the classes are still loaded from the wrong jar... Tobias
Re: Passing Java Options to Spark AM launching
Hi, have a look at the documentation for spark.driver.extraJavaOptions (which seems to have disappeared since I looked it up last week) and spark.executor.extraJavaOptions at http://spark.apache.org/docs/latest/configuration.html#runtime-environment. Tobias
Re: kafka pipeline exactly once semantics
Josh, On Sun, Nov 30, 2014 at 10:17 PM, Josh J joshjd...@gmail.com wrote: I would like to setup a Kafka pipeline whereby I write my data to a single topic 1, then I continue to process using spark streaming and write the transformed results to topic2, and finally I read the results from topic 2. Not really related to your question, but you may also want to look into Samza http://samza.incubator.apache.org/ which was built exactly for this kind of processing. Tobias
Re: Determine number of running executors
Hi, Thanks for your help! Sandy, I had a bit of trouble finding the spark.executor.cores property. (It wasn't there although its value should have been 2.) I ended up throwing regular expressions on scala.util.Properties.propOrElse("sun.java.command", ""), which worked surprisingly well ;-) Thanks Tobias
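A rough sketch of the regular-expression approach. The message does not say which pattern was used or what the command line looked like, so the `--cores <n>` flag format below is purely an assumption for illustration; only `scala.util.Properties.propOrElse` and the `sun.java.command` property come from the message.

```scala
object ExecutorCoresFromCommand {
  // Assumed flag format; adjust to whatever the JVM command line
  // actually contains in your deployment.
  private val CoresFlag = """--cores\s+(\d+)""".r

  // Extracts the number following the assumed flag, if present.
  def coresFrom(command: String): Option[Int] =
    CoresFlag.findFirstMatchIn(command).map(_.group(1).toInt)

  // Applies the extraction to the command line of the current JVM;
  // yields None when the property is missing or the flag is absent.
  def currentCores: Option[Int] =
    coresFrom(scala.util.Properties.propOrElse("sun.java.command", ""))
}
```

Returning an Option keeps the caller aware that the value may not be found, which seems appropriate for something scraped from a command line.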
Re: Is spark streaming +MlLib for online learning?
Hi, On Tue, Nov 25, 2014 at 9:40 AM, Joanne Contact joannenetw...@gmail.com wrote: I seemed to read somewhere that spark is still batch learning, but spark streaming could allow online learning. Spark doesn't do Machine Learning itself, but MLlib does. MLlib currently can do online learning only for linear regression https://spark.apache.org/docs/1.1.0/mllib-linear-methods.html#streaming-linear-regression, as far as I know. Tobias
Re: Setup Remote HDFS for Spark
Hi, On Sat, Nov 22, 2014 at 12:13 AM, EH eas...@gmail.com wrote: Unfortunately whether it is possible to have both Spark and HDFS running on the same machine is not under our control. :( Right now we have Spark and HDFS running in different machines. In this case, is it still possible to hook up a remote HDFS with Spark so that we can use Spark Streaming checkpoints? Thank you for your help. In my case, after I copied my cluster's core-site.xml, yarn-site.xml, and hdfs-site.xml to my CLASSPATH, I could access YARN and HDFS remotely without any problems (read: modulo network/firewall issues), which I was pretty surprised by myself. Tobias
spark-submit and logging
Hi, I am using spark-submit to submit my application jar to a YARN cluster. I want to deliver a single jar file to my users, so I would like to avoid telling them "also, please put that log4j.xml file somewhere and add that path to the spark-submit command". I thought it would be sufficient that my application jar file contains a log4j.xml file, but that does not seem to be the case. If I don't add a log4j.xml file to the classpath before launching spark-submit, the one bundled with spark will be used -- which has a negative influence on my program execution. Is there any way I can tell spark-submit to use the log4j configuration bundled in my jar file? Thanks Tobias
Re: Cannot access data after a join (error: value _1 is not a member of Product with Serializable)
Hi, it looks like what you are trying to use as a Tuple cannot be inferred to be a Tuple by the compiler. Try to add type declarations and maybe you will see where things fail. Tobias
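The original code is not shown in this thread, so the following is only one possible way such an error can arise: when two branches return tuples of different shapes, the compiler falls back to a common supertype such as Product with Serializable, which has no `_1`. Adding an explicit return type makes the compiler point at the offending branch. All names here are hypothetical.

```scala
object TupleInferenceDemo {
  // Without a declared return type, branches of different arity, e.g.
  //   if (parts.length == 2) (parts(0), parts(1))
  //   else (parts(0), parts(1), parts(2))
  // would be inferred as a common supertype of both tuples rather than as
  // a Tuple2, and a later access to ._1 would fail to compile.

  // With the return type declared, every branch must produce a
  // (String, String), so a mismatching branch is reported where it occurs.
  def parse(line: String): (String, String) = {
    val parts = line.split(",", 2)
    if (parts.length == 2) (parts(0), parts(1)) else (parts(0), "")
  }
}
```

After that, results such as `parse("k,v")._1` type-check, and the same holds for pair RDDs built from such a function before a join.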
Re: Spark Streaming with Kafka is failing with Error
Hi, do you have some logging backend (log4j, logback) on your classpath? This seems a bit like there is no particular implementation of the abstract `log()` method available. Tobias
Re: Parsing a large XML file using Spark
Hi, see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for one solution. One issue with those XML files is that they cannot be processed line by line in parallel; plus you inherently need shared/global state to parse XML or check for well-formedness, I think. (Same issue with multi-line JSON, by the way.) Tobias