Re: What can be done if a FlatMapFunctions generated more data that can be held in memory
Yes, the problem is that the Java API inadvertently requires an Iterable return value, not an Iterator: https://issues.apache.org/jira/browse/SPARK-3369 I think this can't be fixed until Spark 2.x. It seems possible to cheat and return a wrapper like the IteratorIterable I posted in the JIRA. You can return an Iterator instead this way, and as long as Spark happens to consume it only once, it will work fine. I don't know if this is guaranteed, but it seems to be the case anecdotally. On Thu, Oct 2, 2014 at 2:01 AM, Steve Lewis lordjoe2...@gmail.com wrote: A number of the problems I want to work with generate datasets which are too large to hold in memory. This becomes an issue when building a FlatMapFunction and also when the data used in combineByKey cannot be held in memory. The following is a simple, if a little silly, example of a FlatMapFunction returning maxMultiples multiples of a long. It works well for maxMultiples = 1000, but what happens if maxMultiples = 10 billion? The issue is that call cannot return a List or any other structure which is held in memory. What can it return, or is there another way to do this?

public static class GenerateMultiples implements FlatMapFunction<Long, Long> {
    private final long maxMultiples;
    public GenerateMultiples(final long maxMultiples) {
        this.maxMultiples = maxMultiples;
    }
    public Iterable<Long> call(Long l) {
        List<Long> holder = new ArrayList<Long>();
        for (long factor = 1; factor <= maxMultiples; factor++) {
            holder.add(l * factor);
        }
        return holder;
    }
}

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: any code examples demonstrating spark streaming applications which depend on states?
Hi Yana, Thanks for your kind response. My question was indeed unclear. What I want to do is to join a state stream, which is the *updateStateByKey* output of the last run. *updateStateByKey* is useful if application logic doesn't (heavily) rely on states, so that you can run the application without knowing the current states and finally update the states with *updateStateByKey*. However, if application logic relies on state, it is better to treat states as input and join the states at the beginning of the application. I am unsure if Spark Streaming supports this functionality. Thanks, Chia-Chun 2014-10-01 21:56 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com: I don't think your question is very clear -- *updateStateByKey* usually updates the previous state. For example, the StatefulNetworkWordCount example that ships with Spark shows the following snippet:

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

So if you have a state (K,V), the latest iteration will produce (K,V+V1) where V1 is the update from the new batch... And I'm using + since the example shows simple addition/counting, but your state could really be any operation (e.g. append or something). The assignment of previousCount shows how you retrieve or initialize the state for a key. So I think what you seek is what happens out of the box (unless I'm misunderstanding the question) On Wed, Oct 1, 2014 at 4:13 AM, Chia-Chun Shih chiachun.s...@gmail.com wrote: Hi, Are there any code examples demonstrating spark streaming applications which depend on states? That is, last-run *updateStateByKey* results are used as inputs. Thanks.
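For reference, a minimal sketch of one way to make the state visible as input within the same application: keep the *updateStateByKey* state stream and join each new batch against it. The names here are hypothetical, and whether the joined state reflects the previous or the current batch depends on how you order the operations:

val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
val stateStream = pairs.updateStateByKey[Int](updateFunc)   // (key, accumulated state)
// Join the incoming batch with the latest state so downstream logic can read it.
val batchWithState = pairs.join(stateStream)                // (key, (newValue, state))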
Re: persistent state for spark streaming
Hi Yana, So, user quotas need another data store, which can guarantee persistence and afford frequent data updates/access. Is that correct? Thanks, Chia-Chun 2014-10-01 21:48 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com: I don't think persist is meant for end-user usage. You might want to call saveAsTextFiles, for example, if you're saving to the file system as strings. You can also dump the DStream to a DB -- there are samples on this list (you'd have to do a combo of foreachRDD and mapPartitions, likely) On Wed, Oct 1, 2014 at 3:49 AM, Chia-Chun Shih chiachun.s...@gmail.com wrote: Hi, My application is to digest user logs and deduct user quotas. I need to maintain the latest states of user quotas persistently, so that the latest user quotas will not be lost. I have tried *updateStateByKey* to generate a DStream for user quotas and called *persist(StorageLevel.MEMORY_AND_DISK())*, but it didn't work. Are there better approaches to persist states for spark streaming? Thanks.
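For reference, a minimal sketch of the foreachRDD approach mentioned above, assuming a hypothetical QuotaStore client with connect/upsert/close methods; the actual persistence layer is whatever external store you pick:

quotaStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val store = QuotaStore.connect()      // one connection per partition, opened on the executor
    partition.foreach { case (user, quota) => store.upsert(user, quota) }
    store.close()
  }
}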
Re: still GC overhead limit exceeded after increasing heap space
This looks like you are just running your own program. To run Spark programs, you use spark-submit. It has options that control the executor and driver memory. The settings below are not affecting Spark. On Wed, Oct 1, 2014 at 10:21 PM, 陈韵竹 anny9...@gmail.com wrote: Thanks Sean. This is how I set this memory. I set it when I start to run the job java -Xms64g -Xmx64g -cp /root/spark/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/root/scala/lib/scala-library.jar:./target/MyProject.jar MyClass Is there some problem with it? On Wed, Oct 1, 2014 at 2:03 PM, Sean Owen so...@cloudera.com wrote: How are you setting this memory? You may be configuring the wrong process's memory, like the driver and not the executors. On Oct 1, 2014 9:37 PM, anny9699 anny9...@gmail.com wrote: Hi, After reading some previous posts about this issue, I have increased the java heap space to -Xms64g -Xmx64g, but still met the java.lang.OutOfMemoryError: GC overhead limit exceeded error. Does anyone have other suggestions? I am reading a data of 200 GB and my total memory is 120 GB, so I use MEMORY_AND_DISK_SER and kryo serialization. Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/still-GC-overhead-limit-exceeded-after-increasing-heap-space-tp15540.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
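For reference, a minimal spark-submit invocation that sets driver and executor memory (the class name, master URL and sizes are illustrative):

./bin/spark-submit --class MyClass \
  --master spark://master:7077 \
  --driver-memory 16g \
  --executor-memory 16g \
  ./target/MyProject.jar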
Implicit conversion RDD -> SchemaRDD
I am noticing disparities in behavior between the REPL and my standalone program in terms of implicit conversion of an RDD to a SchemaRDD. In the REPL the following sequence works: import sqlContext._ val mySchemaRDD = myNormalRDD.where(1=1) However, when attempting something similar in a standalone program it does not compile, with the message: value where is not a member of org.apache.spark.rdd.RDD[MyRecord]. What is the required recipe for proper implicit conversion, given that I have done the import sqlContext._ in the standalone program as well but it is not sufficient there? Note: the IntelliJ IDE *does* seem to think that import sqlContext._ is enough - it understands the implicit use of where. But even in IJ it does not actually compile. Rather strange.
how to send message to specific vertex by Pregel api
Hi, Does anyone have a clue about how to send messages to a specific vertex (not to an immediate neighbour) whose vId is stored in a property of the source vertex, using the Pregel API? More precisely, how to do this in sendMessage()? Or how to pass more general triplets into that function? (Obviously we can do it using basic spark table operations (join, etc), for instance, as in [1]) [1] http://event.cwi.nl/grades2014/03-salihoglu.pdf Best, Yifan LI
Re: registering Array of CompactBuffer to Kryo
How about this? Class.forName("[Lorg.apache.spark.util.collection.CompactBuffer;") On Tue, Sep 30, 2014 at 5:33 PM, Andras Barjak andras.bar...@lynxanalytics.com wrote: Hi, what is the correct scala code to register an Array of this private spark class to Kryo? java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.util.collection.CompactBuffer[] Note: To register this class use: kryo.register(org.apache.spark.util.collection.CompactBuffer[].class); thanks, András Barják - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: registering Array of CompactBuffer to Kryo
i used this solution to get the class name correctly at runtime: kryo.register(ClassTag(Class.forName("org.apache.spark.util.collection.CompactBuffer")).wrap.runtimeClass) 2014-10-02 12:50 GMT+02:00 Daniel Darabos daniel.dara...@lynxanalytics.com : How about this? Class.forName("[Lorg.apache.spark.util.collection.CompactBuffer;") On Tue, Sep 30, 2014 at 5:33 PM, Andras Barjak andras.bar...@lynxanalytics.com wrote: Hi, what is the correct scala code to register an Array of this private spark class to Kryo? java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.util.collection.CompactBuffer[] Note: To register this class use: kryo.register(org.apache.spark.util.collection.CompactBuffer[].class); thanks, András Barják
Is there a way to provide individual property to each Spark executor?
Hi, here at Sematext we are almost done with Spark monitoring http://www.sematext.com/spm/index.html But we need one thing from Spark, something like https://groups.google.com/forum/#!topic/storm-user/2fNCF341yqU in Storm: a 'placeholder' in the java opts which Spark fills in for each executor with the executorId (0, 1, 2, 3...). For example, I would write in spark-defaults.conf: spark.executor.extraJavaOptions -Dcom.sun.management.jmxremote -javaagent:/opt/spm/spm-monitor/lib/spm-monitor-spark.jar=myValue-*%executorId*:spark-executor:default and would get in the executor processes: -Dcom.sun.management.jmxremote -javaagent:/opt/spm/spm-monitor/lib/spm-monitor-spark.jar=myValue-*0* :spark-executor:default -Dcom.sun.management.jmxremote -javaagent:/opt/spm/spm-monitor/lib/spm-monitor-spark.jar=myValue-*1* :spark-executor:default -Dcom.sun.management.jmxremote -javaagent:/opt/spm/spm-monitor/lib/spm-monitor-spark.jar=myValue-*2* :spark-executor:default ... ... ... Can I do something like that in Spark for executors? If not, maybe it can be done in the future? It would be useful. Thx, best regards, Vladimir Tretyakov.
Re: persistent state for spark streaming
Yes -- persist is more akin to caching -- it's telling Spark to materialize that RDD for fast reuse but it's not meant for the end user to query/use across processes, etc.(at least that's my understanding). On Thu, Oct 2, 2014 at 4:04 AM, Chia-Chun Shih chiachun.s...@gmail.com wrote: Hi Yana, So, user quotas need another data store, which can guarantee persistence and afford frequent data updates/access. Is it correct? Thanks, Chia-Chun 2014-10-01 21:48 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com: I don't think persist is meant for end-user usage. You might want to call saveAsTextFiles, for example, if you're saving to the file system as strings. You can also dump the DStream to a DB -- there are samples on this list (you'd have to do a combo of foreachRDD and mapPartitions, likely) On Wed, Oct 1, 2014 at 3:49 AM, Chia-Chun Shih chiachun.s...@gmail.com wrote: Hi, My application is to digest user logs and deduct user quotas. I need to maintain latest states of user quotas persistently, so that latest user quotas will not be lost. I have tried *updateStateByKey* to generate and a DStream for user quotas and called *persist(StorageLevel.MEMORY_AND_DISK())*, but it didn't work. Are there better approaches to persist states for spark streaming? Thanks.
Re: Spark Streaming for time consuming job
Hi Mayur, Thanks for your suggestion. In fact, that's what I'm thinking about: to pass that data, and return only the percentage of outliers in a particular window. I also have some doubts about how I would implement the outlier detection on RDDs as you have suggested. From what I understand, those RDDs are distributed among the spark workers, so I imagine I would do something like the following (code_905 is a PairDStream):

code_905.foreachRDD(new Function2<JavaPairRDD<String, Long>, Time, Void>() {
    public Void call(JavaPairRDD<String, Long> pair, Time time) throws Exception {
        if (pair.count() > 0) {
            final List<Double> data = new LinkedList<Double>();
            pair.foreach(new VoidFunction<Tuple2<String, Long>>() {
                @Override
                public void call(Tuple2<String, Long> t) throws Exception {
                    double doubleValue = t._2.doubleValue();
                    // register data from this window to be checked
                    data.add(doubleValue);
                    // register the data to the outlier detector
                    outlierDetector.addData(doubleValue);
                }
            });
            // get the percentage of outliers for this window
            double percentage = outlierDetector.getOutlierPercentageFromThisData(data);
        }
        return null;
    }
});

The variable outlierDetector is declared as a static class variable. The call to outlierDetector.addData is needed because I would like to run the outlier detection on the data obtained from previous window(s). My concern with writing the outlier detection in spark is that it would slow down spark streaming, since the outlier detection involves sorting data and calculating some statistics; in particular, I would need to run many instances of outlier detection (each instance handling a different set of data). So, what do you think about this model? On Wed, Oct 1, 2014 at 1:59 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Calling collect on anything is almost always a bad idea. The only exception is if you are looking to pass that data on to another system and never see it again :) . I would say you need to implement outlier detection on the rdd and process it in spark itself rather than calling collect on it. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Sep 30, 2014 at 3:22 PM, Eko Susilo eko.harmawan.sus...@gmail.com wrote: Hi All, I have a problem that I would like to consult about spark streaming. I have a spark streaming application that parses a file (which keeps growing as time passes). This file contains several columns containing lines of numbers, and the parsing is divided into windows (each 1 minute). Each column represents a different entity while each row within a column represents a value of that entity (for example, the first column represents temperature, the second column represents humidity, etc, while each row represents the value of each attribute). I use a PairDStream for each column. Afterwards, I need to run a time consuming algorithm (outlier detection; for now I use the box plot algorithm) for each RDD of each PairDStream. To run the outlier detection, currently I am thinking about calling collect on each of the PairDStreams from the method forEachRDD, getting the list of items, and then passing each list of items to a thread. Each thread runs the outlier detection algorithm and processes the result. I run the outlier detection in a separate thread in order not to put too much burden on the spark streaming task. So, I would like to ask if this model has a risk? Or is there any alternative provided by the framework such that I don't have to run a separate thread for this? Thank you for your attention.
-- Best Regards, Eko Susilo
Re: Spark inside Eclipse
You don't need to do anything special to run in local mode from within Eclipse. Just create a simple SparkConf and create a SparkContext from that. I have unit tests which execute on a local SparkContext, and they work from inside Eclipse as well as SBT.

val conf = new SparkConf().setMaster("local").setAppName("Whatever")
val sc = new SparkContext(conf)

Keep in mind you can only have one local SparkContext at a time, otherwise you will get some weird errors. If you have tests running sequentially, make sure to close the SparkContext in your tear down method. If tests run in parallel you'll need to share the SparkContext between tests. For unit testing, you can make use of SparkContext.parallelize to set up your test inputs and RDD.collect to retrieve the outputs. On Wed, Oct 1, 2014 at 7:43 PM, Ashish Jain ashish@gmail.com wrote: Hello Sanjay, This can be done, and is a very effective way to debug. 1) Compile and package your project to get a fat jar 2) In your SparkConf use setJars and give the location of this jar. Also set your master here as local in SparkConf 3) Use this SparkConf when creating JavaSparkContext 4) Debug your program like you would any normal program. Hope this helps. Thanks Ashish On Oct 1, 2014 4:35 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com.invalid wrote: hey guys Is there a way to run Spark in local mode from within Eclipse. I am running Eclipse Kepler on a Macbook Pro with Mavericks Like one can run hadoop map/reduce applications from within Eclipse and debug and learn. thanks sanjay -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
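For reference, a minimal sketch of a unit test that follows the advice above, assuming ScalaTest (any test framework works the same way): one local SparkContext per suite, stopped in teardown so later suites can create their own.

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordDoublerSuite extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
  }

  override def afterAll(): Unit = {
    sc.stop()   // only one local SparkContext may be alive at a time
  }

  test("doubles every element") {
    val out = sc.parallelize(Seq(1, 2, 3)).map(_ * 2).collect()
    assert(out.sorted.toSeq === Seq(2, 4, 6))
  }
}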
[SparkSQL] Function parity with Shark?
Hi, in an effort to migrate off of Shark I recently tried the Thrift JDBC server that comes with Spark 1.1.0. However, I observed that conditional functions do not work (I tried 'case' and 'coalesce'), and some string functions like 'concat' also did not work. Is there a list of what's missing or a roadmap of when it will be added? (I know percentiles are pending, for example, but I do not see JIRAs for the others in this email).
Re: Confusion over how to deploy/run JAR files to a Spark Cluster
Hello Mark, I am no expert but I can answer some of your questions. On Oct 2, 2014 2:15 AM, Mark Mandel mark.man...@gmail.com wrote: Hi, So I'm super confused about how to take my Spark code and actually deploy and run it on a cluster. Let's assume I'm writing in Java, and we'll take a simple example such as: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java, and this is a process I want to be running quite regularly (say more than once a minute). From the documentation ( http://spark.apache.org/docs/1.1.0/submitting-applications.html), it reads as if I need to create a jar from the above code, and every time I want to run this code, I use ./bin/spark-submit to upload it to the cluster, which would then run it straight away. This would mean that every time I want to run my process, I need to have a .jar file travel over the network? Is this correct? (seems like this would be very slow? I should try it however). Doing some digging around the JavaDocs, I can see that the Java/SparkContext has the option to .addJar()'s , but I can't see any documentation that actually outlines how this can be used? If someone can point me towards an article or tutorial on how this is meant to work, I'd greatly appreciate it. It would seem like I could write a simple process that ran, quite probably on the same machine as master, that added a Jar through the SparkContext... but then, how to run the code from that Jar? Or is the Jar include the code that I would run, that would then create the SparkContext that would addJar itself? (now my head hurts). On a single node program this works. The way I think of this is, though I might be wrong here, you specify spark configurations in your driver program. This is where you specify master, the jar containing all dependencies, memory serialization parameters. When you do a SparkContext in this driver program the embedded(?) Spark instance runs and picks up the jar that you specified. The code that you wrote is added like any other dependency. If you have all configuration provided through SparkConf along with the setJars, you could do a sbt 'runMain className args[]' to invoke your application. Would Spark also be smart enough to know that the JAR was already uploaded, if addJar was called once it had already been uploaded? I'm not seeing this shown in the examples either. I'm really excited by what I see in Spark, but I am totally confused by how to actually get code up on Spark and make it run, and nothing I read seems to explain this aspect very well (at least to my thick head). I have seen: https://github.com/spark-jobserver/spark-jobserver, but from initial review, it looks like it will only work with Scala, (because you need to use the ScalaJob trait), and I have a Java dependency. You can implement this as an interface in Java. You can pass the SparkContext as a parameter to JavaSparkContext. The only non intuitive effort here is to return a valid job you will need to write this - return SparkValid$.MODULES$; Any help on this aspect would be greatly appreciated! Mark -- E: mark.man...@gmail.com T: http://www.twitter.com/neurotic W: www.compoundtheory.com 2 Devs from Down Under Podcast http://www.2ddu.com/
Re: partition size for initial read
If you are using textFile() to read data in, it also takes as a parameter the minimum number of partitions to create. Would that not work for you? On Oct 2, 2014 7:00 AM, jamborta jambo...@gmail.com wrote: Hi all, I have been testing repartitioning to ensure that my algorithms get a similar amount of data. I noticed that repartitioning is very expensive. Is there a way to force Spark to create a certain number of partitions when the data is read in? How does it decide on the partition size initially? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/partition-size-for-initial-read-tp15603.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Implicit conversion RDD -> SchemaRDD
Here is the specific code:

val sc = new SparkContext(s"local[$NWorkers]", "HBaseTestsSparkContext")
val ctx = new SQLContext(sc)
import ctx._
case class MyTable(col1: String, col2: Byte)
val myRows = ctx.sparkContext.parallelize((Range(1, 21).map { ix => MyTable(s"col1$ix", ix.toByte) }))
val myRowsSchema = myRows.where(1=1)   // Line 127
val TempTabName = "MyTempTab"
myRowsSchema.registerTempTable(TempTabName)

The above does not compile: Error:(127, 31) value where is not a member of org.apache.spark.rdd.RDD[MyTable] val myRowsSchema = myRows.where(1=1) ^ However, copying the above code into the Spark-Shell, it works - notice we get a Logical/Physical Plan from what was line 127 above: scala> val myRowsSchema = myRows.where(1=1) myRowsSchema: org.apache.spark.sql.SchemaRDD = SchemaRDD[29] at RDD at SchemaRDD.scala:102 == Query Plan == == Physical Plan == Filter 1=1 ExistingRdd [col1#8,col2#9], MapPartitionsRDD[27] at mapPartitions at basicOperators.scala:219 So .. what is the magic formula for setting up the imports for the SchemaRDD implicits to work properly? 2014-10-02 2:00 GMT-07:00 Stephen Boesch java...@gmail.com: I am noticing disparities in behavior between the REPL and in my standalone program in terms of implicit conversion of an RDD to SchemaRDD. In the REPL the following sequence works: import sqlContext._ val mySchemaRDD = myNormalRDD.where(1=1) However when attempting similar in a standalone program it does not compile, with message: value where is not a member of org.apache.spark.rdd.RDD[MyRecord]' What is the required recipe for proper implicit conversion - given I have done the import sqlContext._ in the standalone program as well but it is not sufficient there. Note: the IntelliJ IDE *does* seem to think that import sqlContext._ is enough - it understands the implicit use of where. But even in IJ it does not actually compile. Rather strange.
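For reference, a minimal sketch of a recipe that usually makes the implicit conversion work in a standalone program: move the case class out of the method body (so Scala reflection can see it) and import the createSchemaRDD implicit from your SQLContext instance. The object name and filter predicate here are made up:

case class MyTable(col1: String, col2: Byte)   // top level, not inside a method

object SchemaRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "SchemaRDDExample")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD   // RDD[Product] -> SchemaRDD conversion

    val myRows = sc.parallelize(1 to 20).map(ix => MyTable(s"col1$ix", ix.toByte))
    val myRowsSchema = myRows.where('col2)((b: Byte) => b > 1)
    myRowsSchema.registerTempTable("MyTempTab")
    sc.stop()
  }
}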
Re: partition size for initial read
That would work - I normally use hive queries through spark sql, I have not seen something like that there. On Thu, Oct 2, 2014 at 3:13 PM, Ashish Jain ashish@gmail.com wrote: If you are using textFiles() to read data in, it also takes in a parameter the number of minimum partitions to create. Would that not work for you? On Oct 2, 2014 7:00 AM, jamborta jambo...@gmail.com wrote: Hi all, I have been testing repartitioning to ensure that my algorithms get similar amount of data. Noticed that repartitioning is very expensive. Is there a way to force Spark to create a certain number of partitions when the data is read in? How does it decided on the partition size initially? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/partition-size-for-initial-read-tp15603.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: partition size for initial read
Hi Tamas, Can you try to set mapred.map.tasks and see if it works? Thanks, Yin On Thu, Oct 2, 2014 at 10:33 AM, Tamas Jambor jambo...@gmail.com wrote: That would work - I normally use hive queries through spark sql, I have not seen something like that there. On Thu, Oct 2, 2014 at 3:13 PM, Ashish Jain ashish@gmail.com wrote: If you are using textFiles() to read data in, it also takes in a parameter the number of minimum partitions to create. Would that not work for you? On Oct 2, 2014 7:00 AM, jamborta jambo...@gmail.com wrote: Hi all, I have been testing repartitioning to ensure that my algorithms get similar amount of data. Noticed that repartitioning is very expensive. Is there a way to force Spark to create a certain number of partitions when the data is read in? How does it decided on the partition size initially? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/partition-size-for-initial-read-tp15603.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
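For reference, a minimal sketch of both knobs discussed above: the minimum-partitions hint on textFile, and the Hadoop-level setting for Hive queries run through a HiveContext. The paths and table name are made up, and how much effect mapred.map.tasks has depends on the underlying InputFormat:

val rdd = sc.textFile("hdfs:///data/events", 200)   // ask for at least 200 partitions

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SET mapred.map.tasks=200")
val events = hiveContext.sql("SELECT * FROM events")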
Type problem in Java when using flatMapValues
Hi all, I successfully implemented my algorithm in Scala but my team wants it in Java. I have a problem with Generics, can anyone help me? I have a first JavaPairRDD with a structure like ((ean, key), [from, to, value]) * ean and key are Strings * from and to are DateTimes * value is a Double

JavaPairRDD<Tuple2<String, String>, List<Serializable>> eanKeyTsParameters = javaRDD.mapToPair( ... );

Then I try to do flatMapValues to apply the GenerateTimeSeries Function; it takes the /from/, /to/ and /value/ to generate a List<Tuple2<Long, Double>>. Here is the error I get when compiling: error: incompatible types: no instance(s) of type variable(s) U exist so that JavaPairRDD<Tuple2<String, String>, U> conforms to JavaPairRDD<String, Tuple2<Long, Double>> Here is what IntelliJ tells me: flatMapValues( Function<List<Serializable>, Iterable<U>> ) in JavaPairRDD cannot be applied to Transformations.GenerateTimeSeries Here is the problematic transformation:

JavaPairRDD<String, Tuple2<Long, Double>> keyLongDouble = eanKeyTsParameters.flatMapValues(new Transformations.GenerateTimeSeries());

And here is the Function:

import org.apache.spark.api.java.function.Function;
[...]
public class Transformations {
    public static class GenerateTimeSeries
            implements Function<List<Serializable>, Iterable<Tuple2<Long, Double>>> {
        @Override
        public Iterable<Tuple2<Long, Double>> call(List<Serializable> args) {
            DateTime start = (DateTime) args.get(0);
            DateTime end   = (DateTime) args.get(1);
            Double value   = (Double) args.get(2);
            int granularity = 24*60*60*1000; // 1 day
            return AggregationUtils.createTimeSeries(start, end, value, granularity);
        }
    }
}

Any idea? Thanks -- Robin Keunen Software Engineer robin.keu...@lampiris.be www.lampiris.be
Re: Kafka Spark Streaming job has an issue when the worker reading from Kafka is killed
I am seeing this same issue. Bumping for visibility. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-job-has-an-issue-when-the-worker-reading-from-Kafka-is-killed-tp12595p15611.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Application details for failed and teminated jobs
Hi, Currently the history server provides application details only for successfully completed jobs (where the APPLICATION_COMPLETE file is generated). However, (long-running) jobs that we terminate manually, or failed jobs where APPLICATION_COMPLETE may not be generated, don't show up on the history server page. They do, however, show up on the 4040 interface as long as they are running. Is it possible to save those logs and load them up on the history server (even when APPLICATION_COMPLETE is not present)? This would allow us to troubleshoot the failed and terminated jobs without holding up the cluster. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Application-details-for-failed-and-teminated-jobs-tp15627.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Application details for failed and teminated jobs
You may want to take a look at this PR: https://github.com/apache/spark/pull/1558 Long story short: while not a terrible idea to show running applications, your particular case should be solved differently. Applications are responsible for calling SparkContext.stop() at the end of their run, currently, so you should make sure your code does that even when something goes wrong. If that is done, they'll show up in the History Server. On Thu, Oct 2, 2014 at 11:31 AM, SK skrishna...@gmail.com wrote: Hi, Currently the history server provides application details for only the successfully completed jobs (where the APPLICATION_COMPLETE file is generated). However, (long-running) jobs that we terminate manually or failed jobs where the APPLICATION_COMPLETE may not be generated, dont show up on the history server page. They however do show up on the 4040 interface as long as they are running. Is it possible to save those logs and load them up on the history server (even when the APPLICATION_COMPLETE is not present)? This would allow us troubleshoot the failed and terminated jobs without holding up the cluster. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Application-details-for-failed-and-teminated-jobs-tp15627.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
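For reference, a minimal sketch of the pattern suggested above: stop the context even when the job fails, so the event log is finalized and the run appears in the History Server.

val sc = new SparkContext(conf)
try {
  // ... job logic ...
} finally {
  sc.stop()   // always runs, even if the job threw an exception
}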
Re: Can not see any spark metrics on ganglia-web
Hi tsingfu, I want to see metrics in ganglia too. But I don't understand this step: ./make-distribution.sh --tgz --skip-java-test -Phadoop-2.3 -Pyarn -Phive -Pspark-ganglia-lgpl Are you installing Hadoop, YARN, Hive AND Ganglia? What if I want to install just Ganglia? Can you suggest something? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-see-any-spark-metrics-on-ganglia-web-tp14981p15631.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Block removal causes Akka timeouts
I'm seeing a lot of Akka timeouts which eventually lead to job failure in spark streaming when removing blocks (Example stack trace below). It appears to be related to these issues: SPARK-3015 https://issues.apache.org/jira/browse/SPARK-3015 and SPARK-3139 https://issues.apache.org/jira/browse/SPARK-3139 but while workarounds were provided for those scenarios there doesn't seem to be a workaround for block removal. Any suggestions? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-tp15632.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Sorting a Sequence File
All, I am having trouble getting a sequence file sorted. My sequence file is (Text, Text) and when trying to sort it, Spark complains that it can not because Text is not serializable. To get around this issue, I performed a map on the sequence file to turn it into (String, String). I then perform the sort and then write it back out as a sequence file to hdfs. My issue is that this solution does not scale. I can run this code for a 32GB file and it runs without issues. When I run it with at 500GB file, it runs some of the data nodes out of physical disk space. It spills like crazy (usually 2-3 times the amount of original data). So my 32 GB file spills 74GB. I believe my issue is that there is a better way to get the data into a form that sort will accept. Is there a better way to do it other than mapping the key and value to Strings? Thanks, Joe -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sorting-a-Sequence-File-tp15633.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Can not see any spark metrics on ganglia-web
Hi, I am sure you can use the -Pspark-ganglia-lgpl switch to enable Ganglia. This step only adds the support for Hadoop,Yarn,Hive et al in the spark executable.No need to run if one is not using them. Cheers k/ On Thu, Oct 2, 2014 at 12:29 PM, danilopds danilob...@gmail.com wrote: Hi tsingfu, I want to see metrics in ganglia too. But I don't understand this step: ./make-distribution.sh --tgz --skip-java-test -Phadoop-2.3 -Pyarn -Phive -Pspark-ganglia-lgpl Are you installing the hadoop, yarn, hive AND ganglia?? If I want to install just ganglia? Can you suggest me something? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-see-any-spark-metrics-on-ganglia-web-tp14981p15631.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HiveContext: cache table not supported for partitioned table?
Hi, In Spark 1.1 HiveContext, I ran a create partitioned table command followed by a cache table command and got a java.sql.SQLSyntaxErrorException: Table/View 'PARTITIONS' does not exist. But cache table worked fine if the table is not a partitioned table. Can anybody confirm that cache of partitioned table is not supported yet in current version? Thanks, Du
Re: Can not see any spark metrics on ganglia-web
Ok Krishna Sankar, In relation to this information on Spark monitoring webpage, For sbt users, set the SPARK_GANGLIA_LGPL environment variable before building. For Maven users, enable the -Pspark-ganglia-lgpl profile Do you know what I need to do to install with sbt? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-see-any-spark-metrics-on-ganglia-web-tp14981p15636.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
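For reference, the sbt route is to export the environment variable mentioned in the docs before building the assembly (the exact sbt target depends on how you build Spark; assembly is the usual one):

SPARK_GANGLIA_LGPL=true sbt/sbt assembly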
Re: SparkSQL DataType mappings
Hi Yin, Thanks for the reply. I've found the section as well, a couple of days ago and managed to integrate es-hadoop with Spark SQL [1] Cheers, [1] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html On 10/2/14 6:32 PM, Yin Huai wrote: Hi Costin, I am answering your questions below. 1. You can find Spark SQL data type reference at here http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#spark-sql-datatype-reference. It explains the underlying data type for a Spark SQL data type for Scala, Java, and Python APIs. For example, in Scala API, the underlying Scala type of MapType is scala.collection.Map. While, in Java API, it is java.util.Map. For StructType, yes, it should be cast to Row. 2. Interfaces like getFloat and getInteger are for primitive data types. For other types, you can access values by ordinal. For example, row(1). Right now, you have to cast values accessed by ordinal. Once https://github.com/apache/spark/pull/1759 is in, accessing values in a row will be much easier. 3. We are working on supporting CSV files (https://github.com/apache/spark/pull/1351). Right now, you can use our programatic APIs http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#programmatically-specifying-the-schema to create SchemaRDDs. Basically, you first define the schema (represented by a StructType) of the SchemaRDD. Then, convert your RDD (for example, RDD[String]) directly to RDD[Row]. Finally, use applySchema provided in SQLContext/HiveContext to apply the defined schema to the RDD[Row]. The return value of applySchema is the SchemaRDD you want. Thanks, Yin On Tue, Sep 30, 2014 at 5:05 AM, Costin Leau costin.l...@gmail.com mailto:costin.l...@gmail.com wrote: Hi, I'm working on supporting SchemaRDD in Elasticsearch Hadoop [1] but I'm having some issues with the SQL API, in particular in what the DataTypes translate to. 1. A SchemaRDD is composed of a Row and StructType - I'm using the latter to decompose a Row into primitives. I'm not clear however how to deal with _rich_ types, namely array, map and struct. MapType gives me type information about the key and its value however what's the actual Map object? j.u.Map, scala.Map? For example assuming row(0) has a MapType associated with it, to what do I cast row(0)? Same goes for StructType; if row(1) has a StructType associated with it, do I cast the value to Row? 2. Similar to the above, I've noticed the Row interface has cast methods so ideally one should use row(index).getFloat|Integer|__Boolean etc... but I didn't see any methods for Binary or Decimal. Also the _rich_ types are missing; I presume this is for pluggability reasons however whats the generic way to access/unwrap the generic Any/Object in this case to the desired DataType? 3. On a separate note, for RDDs containing just values (think CSV,TSV files) is there an option to have a header associated with it without having to wrap each row with a case class? As each entry has exactly the same structure, the wrapping is just overhead that doesn't provide any extra information (you know the structure of one row, you know it for all of them). 
Thanks, [1] github.com/elasticsearch/elasticsearch-hadoop -- Costin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
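For reference, a minimal sketch of the programmatic-schema route mentioned above (the file path and column names are made up for illustration):

import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val rowRDD = sc.textFile("people.csv")
  .map(_.split(","))
  .map(cols => Row(cols(0), cols(1).trim.toInt))

val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
peopleSchemaRDD.registerTempTable("people")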
Spark SQL: ArrayIndexOutofBoundsException
Hi, I am trying to extract the number of distinct users from a file using Spark SQL, but I am getting the following error: ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 15) java.lang.ArrayIndexOutOfBoundsException: 1 I am following the code in examples/sql/RDDRelation.scala. My code is as follows. The error appears when it executes the SQL statement. I am new to Spark SQL. I would like to know how I can fix this issue. Thanks for your help.

val sql_cxt = new SQLContext(sc)
import sql_cxt._

// read the data using the schema and create a schema RDD
val tusers = sc.textFile(inp_file)
               .map(_.split("\t"))
               .map(p => TUser(p(0), p(1).trim.toInt))

// register the RDD as a table
tusers.registerTempTable("tusers")

// get the number of unique users
val unique_count = sql_cxt.sql("SELECT COUNT(DISTINCT userid) FROM tusers").collect().head.getLong(0)
println(unique_count)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL: ArrayIndexOutofBoundsException
The bug is likely in your data. Do you have lines in your input file that do not contain the \t character? If so .split will only return a single element and p(1) from the .map() is going to throw java.lang. ArrayIndexOutOfBoundsException: 1 On Thu, Oct 2, 2014 at 3:35 PM, SK skrishna...@gmail.com wrote: Hi, I am trying to extract the number of distinct users from a file using Spark SQL, but I am getting the following error: ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 15) java.lang.ArrayIndexOutOfBoundsException: 1 I am following the code in examples/sql/RDDRelation.scala. My code is as follows. The error is appearing when it executes the SQL statement. I am new to Spark SQL. I would like to know how I can fix this issue. thanks for your help. val sql_cxt = new SQLContext(sc) import sql_cxt._ // read the data using th e schema and create a schema RDD val tusers = sc.textFile(inp_file) .map(_.split(\t)) .map(p = TUser(p(0), p(1).trim.toInt)) // register the RDD as a table tusers.registerTempTable(tusers) // get the number of unique users val unique_count = sql_cxt.sql(SELECT COUNT (DISTINCT userid) FROM tusers).collect().head.getLong(0) println(unique_count) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Fwd: Spark SQL: ArrayIndexOutofBoundsException
-- Forwarded message -- From: Liquan Pei liquan...@gmail.com Date: Thu, Oct 2, 2014 at 3:42 PM Subject: Re: Spark SQL: ArrayIndexOutofBoundsException To: SK skrishna...@gmail.com There is only one place you use index 1. One possible issue is that you may have only one element after your split by \t. Can you try running the following code to make sure every line has at least two elements?

val tusers = sc.textFile(inp_file)
               .map(_.split("\t"))
               .filter(x => x.length < 2)
               .count()

It should return a non-zero value if your data contains a line with fewer than two fields. Liquan On Thu, Oct 2, 2014 at 3:35 PM, SK skrishna...@gmail.com wrote: Hi, I am trying to extract the number of distinct users from a file using Spark SQL, but I am getting the following error: ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 15) java.lang.ArrayIndexOutOfBoundsException: 1 I am following the code in examples/sql/RDDRelation.scala. My code is as follows. The error is appearing when it executes the SQL statement. I am new to Spark SQL. I would like to know how I can fix this issue. thanks for your help. val sql_cxt = new SQLContext(sc) import sql_cxt._ // read the data using the schema and create a schema RDD val tusers = sc.textFile(inp_file) .map(_.split("\t")) .map(p => TUser(p(0), p(1).trim.toInt)) // register the RDD as a table tusers.registerTempTable("tusers") // get the number of unique users val unique_count = sql_cxt.sql("SELECT COUNT(DISTINCT userid) FROM tusers").collect().head.getLong(0) println(unique_count) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: Fwd: Spark SQL: ArrayIndexOutofBoundsException
Thanks for the help. Yes, I did not realize that the first header line has a different separator. By the way, is there a way to drop the first line that contains the header? Something along the following lines: sc.textFile(inp_file) .drop(1) // or tail() to drop the header line .map // rest of the processing I could not find a drop() function or take the bottom (n) elements for RDD. Alternatively, a way to create the case class schema from the header line of the file and use the rest for the data would be useful - just as a suggestion. Currently I am just deleting this header line manually before processing it in Spark. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639p15642.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Fwd: Spark SQL: ArrayIndexOutofBoundsException
You can do filter with startswith ? On Thu, Oct 2, 2014 at 4:04 PM, SK skrishna...@gmail.com wrote: Thanks for the help. Yes, I did not realize that the first header line has a different separator. By the way, is there a way to drop the first line that contains the header? Something along the following lines: sc.textFile(inp_file) .drop(1) // or tail() to drop the header line .map // rest of the processing I could not find a drop() function or take the bottom (n) elements for RDD. Alternatively, a way to create the case class schema from the header line of the file and use the rest for the data would be useful - just as a suggestion. Currently I am just deleting this header line manually before processing it in Spark. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639p15642.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
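For reference, a minimal sketch of two common ways to drop a header line without editing the file by hand (the header prefix "userid" is an assumption about your file):

// 1) Filter the header out if it is recognizable:
val data = sc.textFile(inp_file).filter(!_.startsWith("userid"))

// 2) Drop the first line of the first partition regardless of its content:
val noHeader = sc.textFile(inp_file).mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}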
Strategies for reading large numbers of files
Hello, I'm trying to use Spark to process a large number of files in S3. I'm running into an issue that I believe is related to the high number of files, and the resources required to build the listing within the driver program. If anyone in the Spark community can provide insight or guidance, it would be greatly appreciated. The task at hand is to read ~100 million files stored in S3, and repartition the data into a sensible number of files (perhaps 1,000). The files are organized in a directory structure like so: s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name (Note that each file is very small, containing 1-10 records each. Unfortunately this is an artifact of the upstream systems that put data in S3.) My Spark program is simple, and works when I target a relatively specific subdirectory. For example: sparkContext.textFile(s3n://bucket/purchase/2014/01/01/00/*/*/*/*).coalesce(...).write(...) This targets 1 hour's worth of purchase records, containing about 10,000 files. The driver program blocks (I assume it is making S3 calls to traverse the directories), and during this time no activity is visible in the driver UI. After about a minute, the stages and tasks allocate in the UI, and then everything progresses and completes within a few minutes. I need to process all the data (several year's worth). Something like: sparkContext.textFile(s3n://bucket/*/*/*/*/*/*/*/*/*).coalesce(...).write(...) This blocks forever (I have only run the program for as long as overnight). The stages and tasks never appear in the UI. I assume Spark is building the file listing, which will either take too long and/or cause the driver to eventually run out of memory. I would appreciate any comments or suggestions. I'm happy to provide more information if that would be helpful. Thanks Landon
Re: Strategies for reading large numbers of files
I believe this is known as the Hadoop Small Files Problem, and it affects Spark as well. The best approach I've seen to merging small files like this is by using s3distcp, as suggested here http://snowplowanalytics.com/blog/2013/05/30/dealing-with-hadoops-small-files-problem/, as a pre-processing step. It would be great if Spark could somehow handle this common situation out of the box, but for now I don't think it does. Nick On Thu, Oct 2, 2014 at 7:10 PM, Landon Kuhn lan...@janrain.com wrote: Hello, I'm trying to use Spark to process a large number of files in S3. I'm running into an issue that I believe is related to the high number of files, and the resources required to build the listing within the driver program. If anyone in the Spark community can provide insight or guidance, it would be greatly appreciated. The task at hand is to read ~100 million files stored in S3, and repartition the data into a sensible number of files (perhaps 1,000). The files are organized in a directory structure like so: s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name (Note that each file is very small, containing 1-10 records each. Unfortunately this is an artifact of the upstream systems that put data in S3.) My Spark program is simple, and works when I target a relatively specific subdirectory. For example: sparkContext.textFile(s3n://bucket/purchase/2014/01/01/00/*/*/*/*).coalesce(...).write(...) This targets 1 hour's worth of purchase records, containing about 10,000 files. The driver program blocks (I assume it is making S3 calls to traverse the directories), and during this time no activity is visible in the driver UI. After about a minute, the stages and tasks allocate in the UI, and then everything progresses and completes within a few minutes. I need to process all the data (several year's worth). Something like: sparkContext.textFile(s3n://bucket/*/*/*/*/*/*/*/*/*).coalesce(...).write(...) This blocks forever (I have only run the program for as long as overnight). The stages and tasks never appear in the UI. I assume Spark is building the file listing, which will either take too long and/or cause the driver to eventually run out of memory. I would appreciate any comments or suggestions. I'm happy to provide more information if that would be helpful. Thanks Landon
how to debug ExecutorLostFailure
hi all, I have a job that runs about for 15 mins, at some point I get an error on both nodes (all executors) saying: 14/10/02 23:14:38 WARN TaskSetManager: Lost task 80.0 in stage 3.0 (TID 253, backend-tes): ExecutorLostFailure (executor lost) In the end, it seems that the job recovers and completes the task. Just wondering what is the best way to understand why these tasks failed (couldn't seem to find anything in the logs), and how to avoid in the future? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-debug-ExecutorLostFailure-tp15646.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Sorting a Sequence File
Here is the code in question:

// read in the hadoop sequence file to sort
val file = sc.sequenceFile(input, classOf[Text], classOf[Text])

// this is the code we would like to avoid, which maps the Hadoop Text input to Strings so the sortByKey will run
val stringPairs = file.map { case (k, v) => (k.toString, v.toString) }

// perform the sort on the converted data
val sortedOutput = stringPairs.sortByKey(true, 1)

// write out the results
sortedOutput.saveAsSequenceFile(output, Some(classOf[DefaultCodec]))

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sorting-a-Sequence-File-tp15633p15647.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
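For reference, a minimal sketch of a slightly more concise alternative: the typed sequenceFile signature converts Writables to Scala types via implicit WritableConverters. Note this still creates String copies, so by itself it does not change the spill behaviour described above:

val pairs = sc.sequenceFile[String, String](input)
val sorted = pairs.sortByKey(true, 1)
sorted.saveAsSequenceFile(output, Some(classOf[DefaultCodec]))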
Re: Help Troubleshooting Naive Bayes
Those logs you included are from the Spark executor processes, as opposed to the YARN NodeManager processes. If you don't think you have access to the NodeManager logs, I would try setting spark.yarn.executor.memoryOverhead to something like 1024 or 2048 and seeing if that helps. If it does, it's because YARN was killing the containers. -Sandy On Thu, Oct 2, 2014 at 6:48 AM, Mike Bernico mike.bern...@gmail.com wrote: Hello Xiangrui and Sandy, Thanks for jumping in to help. So, first thing... After my email last night I reran my code using 10 executors, 2G each, and everything ran okay. So, that's good, but I'm still curious as to what I was doing wrong. For Xiangrui's questions: My training set is 49174 observations x 61497 terms in a sparse vector from spark's tf/idf transform. The partition size is 1025, which isn't something I've tuned, I'm guessing it's related to input splits. I've never called coalesce, etc. For Sandy's: I do not see any memory errors in the yarn logs other than this occasionally: 14/10/01 19:25:54 INFO storage.MemoryStore: Will not store rdd_11_195 as it would require dropping another block from the same RDD 14/10/01 19:25:54 WARN spark.CacheManager: Not enough space to cache partition rdd_11_195 in memory! Free memory is 236314377 bytes. 14/10/01 19:25:57 INFO executor.Executor: Finished task 195.0 in stage 2.0 (TID 1220). 1134 bytes result sent to driver The only other badness I see in those logs is: 14/10/01 19:40:35 INFO network.SendingConnection: Initiating connection to [hostname removed :57359 http://rpl001273.opr.etlab.test.statefarm.org/10.233.51.34:57359] 14/10/01 19:40:35 WARN network.SendingConnection: Error finishing connection to hostname removed:57359 http://rpl001273.opr.etlab.test.statefarm.org/10.233.51.34:57359 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:701) at org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:313) at org.apache.spark.network.ConnectionManager$$anon$8.run(ConnectionManager.scala:226) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) I'm guessing those are from after the executors have died their mysterious death. I'm happy ot send you the entire log if you'd like. Thanks! On Thu, Oct 2, 2014 at 2:02 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mike, Do you have access to your YARN NodeManager logs? When executors die randomly on YARN, it's often because they use more memory than allowed for their YARN container. You would see messages to the effect of container killed because physical memory limits exceeded. -Sandy On Wed, Oct 1, 2014 at 8:46 PM, Xiangrui Meng men...@gmail.com wrote: The cost depends on the feature dimension, number of instances, number of classes, and number of partitions. Do you mind sharing those numbers? -Xiangrui On Wed, Oct 1, 2014 at 6:31 PM, Mike Bernico mike.bern...@gmail.com wrote: Hi Everyone, I'm working on training mllib's Naive Bayes to classify TF/IDF vectoried docs using Spark 1.1.0. I've gotten this to work fine on a smaller set of data, but when I increase the number of vectorized documents I get hung up on training. The only messages I'm seeing are below. I'm pretty new to spark and I don't really know where to go next to troubleshoot this. 
I'm running spark in yarn like this: spark-shell --master yarn-client --executor-memory 7G --driver-memory 7G --num-executors 3 I have three workers, each with 64G of ram and 8 cores. scala> val model = NaiveBayes.train(training, lambda = 1.0) 14/10/01 19:40:34 ERROR YarnClientClusterScheduler: Lost executor 2 on rpl001273.removed: remote Akka client disassociated 14/10/01 19:40:34 WARN TaskSetManager: Lost task 195.0 in stage 5.0 (TID 2940, rpl001273.removed): ExecutorLostFailure (executor lost) 14/10/01 19:40:34 WARN TaskSetManager: Lost task 190.0 in stage 5.0 (TID 2782, rpl001272.removed): FetchFailed(BlockManagerId(2, rpl001273.removed, 57359, 0), shuffleId=1, mapId=0, reduceId=190) 14/10/01 19:40:35 WARN TaskSetManager: Lost task 195.1 in stage 5.0 (TID 2941, rpl001272.removed): FetchFailed(BlockManagerId(2, rpl001273.removed, 57359, 0), shuffleId=1, mapId=0, reduceId=195) 14/10/01 19:40:36 WARN TaskSetManager: Lost task 185.0 in stage 5.0 (TID 2780, rpl001277.removed): FetchFailed(BlockManagerId(2, rpl001273.removed, 57359, 0), shuffleId=1, mapId=0, reduceId=185) 14/10/01 19:46:24 ERROR YarnClientClusterScheduler: Lost executor 1 on rpl001272.removed: remote Akka client disassociated
Getting table info from HiveContext
Hi, Would anybody know how to get the following information from HiveContext given a Hive table name? - partition key(s) - table directory - input/output format I am new to Spark. And I have a couple of tables created using Parquet data like:

CREATE EXTERNAL TABLE parquet_table (
  COL1 string,
  COL2 string,
  COL3 string
)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION '/user/foo/parquet_src';

and some of the tables have partitions. In my Spark Java code, I am able to run queries using the HiveContext like:

SparkConf sparkConf = new SparkConf().setAppName("example");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaHiveContext hiveCtx = new JavaHiveContext(ctx);
JavaSchemaRDD rdd = hiveCtx.sql("select * from parquet_table");

Now, am I able to get the INPUTFORMAT, OUTPUTFORMAT, LOCATION, and in other cases the partition key(s), programmatically through the HiveContext? The only way I know (pardon my ignorance) is to parse the SchemaRDD returned by hiveCtx.sql("describe extended parquet_table"); If anybody could shed some light on a better way, I would appreciate it. Thanks :) -BC
Load multiple parquet file as single RDD
Hi, I am trying to play around with Spark and Spark SQL. I have logs being stored in HDFS in 10-minute windows. Each 10-minute window could have as many as 10 files, with random names, of 2 GB each. Now, I want to run some analysis on these files. These files are Parquet files, and I am trying to run Spark SQL queries on them. I notice that the API can only take a single Parquet file, not a directory or a glob pattern from which all the files can be loaded as a single SchemaRDD. I tried doing a unionAll, but from the output of the job it looked like it was merging the files and writing to disk (not confirmed, but I am assuming so from the time it took). I tried insertInto, but that definitely wrote to disk, and times were comparable to the unionAll operation. Is there a way to run jobs on multiple files as if they were a single RDD? I am not restricted to using Spark SQL; this is just what I started to play around with. What has stopped us from creating an API that takes a glob pattern and creates a single RDD from all of the files inside? Thanks mohnish
Re: Load multiple parquet file as single RDD
parquetFile accepts a comma-separated list of files. Also, unionAll does not write to disk. However, unless you are running a recent version (compiled from master since this was added https://github.com/apache/spark/commit/f858f466862541c3faad76a1fa2391f1c17ec9dd), it is missing an optimization and thus reads all the columns, instead of just the ones required, from disk. On Thu, Oct 2, 2014 at 6:05 PM, Mohnish Kodnani mohnish.kodn...@gmail.com wrote: Hi, I am trying to play around with Spark and Spark SQL. I have logs being stored in HDFS on a 10 minute window. Each 10 minute window could have as many as 10 files with random names of 2GB each. Now, I want to run some analysis on these files. These files are parquet files. I am trying to run Spark SQL queries on them. I notice that the API only can take a single parquet File and not a directory or a GLOB pattern where all the files can be loaded as a single Schema RDD. I tried doing a unionAll, but from the output of the job it looked like it was merging the files and writing to disk ( not confirmed but from the time it took I am assuming). I tried insertInto, but that definitely wrote to disk and times were comparable to unionAll operation. Is there a way to run jobs on multiple files as if they were a single RDD. I am not restricted to using Spark SQL, this is what I started to play around with. What has stopped us from creating an API that takes a GLOB pattern and create a single RDD from all of the files inside. Thanks mohnish
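A minimal sketch of the comma-separated form Michael describes (assuming Spark SQL 1.1; the HDFS paths and table name are made up):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // parquetFile takes a comma-separated list of paths, so several files from
    // the same 10-minute window can be loaded as one SchemaRDD without a unionAll
    val logs = sqlContext.parquetFile(
      "hdfs:///logs/window-1200/part-0.parquet,hdfs:///logs/window-1200/part-1.parquet")
    logs.registerTempTable("logs")
    sqlContext.sql("select count(*) from logs").collect()

Whether the column-pruning optimization mentioned above kicks in still depends on running a build that includes the linked commit.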
Re: Getting table info from HiveContext
We actually leave all the DDL commands up to Hive, so there is no programmatic way to access the things you are looking for. On Thu, Oct 2, 2014 at 5:17 PM, Banias calvi...@yahoo.com.invalid wrote: Hi, Would anybody know how to get the following information from HiveContext given a Hive table name? - partition key(s) - table directory - input/output format I am new to Spark. And I have a couple tables created using Parquet data like: CREATE EXTERNAL TABLE parquet_table ( COL1 string, COL2 string, COL3 string ) ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT parquet.hive.DeprecatedParquetInputFormat OUTPUTFORMAT parquet.hive.DeprecatedParquetOutputFormat LOCATION '/user/foo/parquet_src'; and some of the tables have partitions. In my Spark Java code, I am able to run queries using the HiveContext like: SparkConf sparkConf = new SparkConf().setAppName(example); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaHiveContext hiveCtx = new JavaHiveContext(ctx); JavaSchemaRDD rdd = hiveCtx.sql(select * from parquet_table); Now am I able to get the INPUTFORMAT, OUTPUTFORMAT, LOCATION, and in other cases partition key(s) programmatically through the HiveContext? The only way I know (pardon my ignorance) is to parse from the SchemaRDD returned by hiveCtx.sql(describe extended parquet_table); If anybody could shed some light on a better way, I would appreciate that. Thanks :) -BC
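In the meantime, a rough Scala sketch of the describe-based workaround the question already mentions (assuming Spark 1.1; the exact row layout of DESCRIBE FORMATTED output depends on the Hive version, so the string matching below is illustrative rather than definitive):

    import org.apache.spark.sql.hive.HiveContext
    val hiveCtx = new HiveContext(sc)
    // Depending on the Spark/Hive version, a describe row may come back as one
    // tab-separated string or as separate (col_name, data_type, comment) fields,
    // so flatten each row to a single string before matching on key prefixes.
    val lines = hiveCtx.sql("describe formatted parquet_table").collect().map(_.mkString("\t"))
    def field(prefix: String): Option[String] =
      lines.find(_.trim.startsWith(prefix)).flatMap(_.split("\t").drop(1).headOption).map(_.trim)
    val location     = field("Location:")
    val inputFormat  = field("InputFormat:")
    val outputFormat = field("OutputFormat:")

Partition keys show up in the same output under the "# Partition Information" section and can be scraped similarly, but none of this is a stable programmatic API.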
Re: Fwd: Spark SQL: ArrayIndexOutofBoundsException
This is hard to do in general, but you can get what you are asking for by putting the following class in scope. implicit class BetterRDD[A: scala.reflect.ClassTag](rdd: org.apache.spark.rdd.RDD[A]) { def dropOne = rdd.mapPartitionsWithIndex((i, iter) => if (i == 0 && iter.hasNext) { iter.next; iter } else iter) } On Thu, Oct 2, 2014 at 4:06 PM, Sunny Khatri sunny.k...@gmail.com wrote: You can do filter with startswith ? On Thu, Oct 2, 2014 at 4:04 PM, SK skrishna...@gmail.com wrote: Thanks for the help. Yes, I did not realize that the first header line has a different separator. By the way, is there a way to drop the first line that contains the header? Something along the following lines: sc.textFile(inp_file) .drop(1) // or tail() to drop the header line .map // rest of the processing I could not find a drop() function or take the bottom (n) elements for RDD. Alternatively, a way to create the case class schema from the header line of the file and use the rest for the data would be useful - just as a suggestion. Currently I am just deleting this header line manually before processing it in Spark. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639p15642.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
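A hypothetical usage sketch, assuming the implicit class above is in scope; note that dropOne only skips the first element of partition 0, so it removes the header only when the header is the first record of the first partition (which is the case for a plain sc.textFile read). inp_file is the same placeholder path used in the question:

    val noHeader = sc.textFile(inp_file).dropOne
    // continue with the usual parsing, e.g.
    val rows = noHeader.map(_.split(","))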
Re: Any issues with repartition?
Hi Arun, Have you found a solution? Seems that I have the same problem. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15654.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: new error for me
Have you found a solution to this problem (or at least a cause)? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/new-error-for-me-tp10378p15655.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: HiveContext: cache table not supported for partitioned table?
Cache table works with partitioned tables. I guess you’re experimenting with a default local metastore and the metastore_db directory doesn’t exist in the first place. In this case, none of the metastore tables/views exist at first, and you get the error message you saw when the PARTITIONS metastore table is accessed for the first time by the Hive client. However, you should also see this line before the error: 14/10/03 10:51:30 ERROR ObjectStore: Direct SQL failed, falling back to ORM The table is then created on the fly, and the cache operation is performed normally. You can verify this by selecting from the table and checking the Spark UI for cached RDDs. If you uncache the table and cache it again, you won’t see this error any more. Normally, in a production environment you won’t see this error because the metastore database is usually set up ahead of time. On 10/3/14 3:39 AM, Du Li wrote: Hi, In Spark 1.1 HiveContext, I ran a create partitioned table command followed by a cache table command and got a java.sql.SQLSyntaxErrorException: Table/View 'PARTITIONS' does not exist. But cache table worked fine if the table is not a partitioned table. Can anybody confirm that cache of partitioned table is not supported yet in current version? Thanks, Du
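A rough Scala sketch of the check Cheng describes (assuming Spark 1.1's HiveContext and a partitioned table named parquet_table; cacheTable/uncacheTable are the API equivalents of the CACHE TABLE / UNCACHE TABLE commands):

    import org.apache.spark.sql.hive.HiveContext
    val hiveCtx = new HiveContext(sc)
    hiveCtx.cacheTable("parquet_table")      // on a fresh local metastore_db this may log the PARTITIONS error once
    hiveCtx.sql("select count(*) from parquet_table").collect()  // materializes the cache; check the Storage tab in the web UI
    hiveCtx.uncacheTable("parquet_table")
    hiveCtx.cacheTable("parquet_table")      // the second attempt should not log the error again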
Re: Getting table info from HiveContext
Thanks Michael. On Thursday, October 2, 2014 8:41 PM, Michael Armbrust mich...@databricks.com wrote: We actually leave all the DDL commands up to hive, so there is no programatic way to access the things you are looking for. On Thu, Oct 2, 2014 at 5:17 PM, Banias calvi...@yahoo.com.invalid wrote: Hi, Would anybody know how to get the following information from HiveContext given a Hive table name? - partition key(s) - table directory - input/output format I am new to Spark. And I have a couple tables created using Parquet data like: CREATE EXTERNAL TABLE parquet_table ( COL1 string, COL2 string, COL3 string ) ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT parquet.hive.DeprecatedParquetInputFormat OUTPUTFORMAT parquet.hive.DeprecatedParquetOutputFormat LOCATION '/user/foo/parquet_src'; and some of the tables have partitions. In my Spark Java code, I am able to run queries using the HiveContext like: SparkConf sparkConf = new SparkConf().setAppName(example); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaHiveContext hiveCtx = new JavaHiveContext(ctx); JavaSchemaRDD rdd = hiveCtx.sql(select * from parquet_table); Now am I able to get the INPUTFORMAT, OUTPUTFORMAT, LOCATION, and in other cases partition key(s) programmatically through the HiveContext? The only way I know (pardon my ignorance) is to parse from the SchemaRDD returned by hiveCtx.sql(describe extended parquet_table); If anybody could shed some light on a better way, I would appreciate that. Thanks :) -BC
How to make ./bin/spark-sql work with hive?
I have rebuilt the package with -Phive and copied hive-site.xml to conf (I am using Hive 0.12). When I run ./bin/spark-sql, I get a java.lang.NoSuchMethodError for every command. What am I missing here? Could somebody share the right procedure to make it work? java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.Driver.getResults(Ljava/util/ArrayList;)Z at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98) at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:58) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:291) at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:422) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) spark-sql> use mydb; OK java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.Driver.getResults(Ljava/util/ArrayList;)Z at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98) at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:58) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:291) at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:422) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) spark-sql> select count(*) from test; java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102) at org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
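For context, a hedged sketch of the procedure the poster describes (the -Phive profile and the conf/ location are as stated in the question; any additional Hadoop/YARN build profiles depend on the cluster and are not shown):

    # build Spark with Hive support (add the Hadoop profile/version flags for your cluster)
    mvn -Phive -DskipTests clean package
    # point Spark SQL at the existing Hive 0.12 metastore
    cp /path/to/hive-site.xml conf/
    ./bin/spark-sql

A NoSuchMethodError at runtime generally means that incompatible versions of a class (here the Hive and Guava jars) are ending up on the classpath, which is worth checking before suspecting the commands themselves.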
Setup/Cleanup for RDD closures?
Consider a case where there is some connection or external resource that must be accessed or mutated for each row from within a single worker thread. That connection should be opened only before the first row is accessed and closed only after the last row is completed. It is my understanding that work is presently underway (by Reynold Xin and others) on defining an external resources API to address this. What is the recommended approach in the meantime?
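One pattern that is often suggested in the meantime (a sketch under assumptions, not an answer from this thread): do the setup and teardown once per partition inside mapPartitions. Connection, openConnection and process below are placeholder names for whatever resource and per-row work are actually involved.

    import org.apache.spark.rdd.RDD

    trait Connection { def close(): Unit }
    def openConnection(): Connection = ???                    // placeholder: acquire the external resource
    def process(conn: Connection, row: String): String = ???  // placeholder: per-row work using the resource

    def withConnection(rdd: RDD[String]): RDD[String] =
      rdd.mapPartitions { rows =>
        val conn = openConnection()                           // opened before the first row of this partition
        // materialize the partition so the connection can be closed safely;
        // note this holds one partition's results in memory at a time
        val results = rows.map(row => process(conn, row)).toList
        conn.close()                                          // closed after the last row of this partition
        results.iterator
      }

In a real job, openConnection and process would need to live in a serializable object (or be created inside the closure) so the closure can be shipped to the executors.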