Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
Thanks for pointing to the issue. Yes I think its the same issue, below is Exception ERROR OneForOneStrategy: TestCheckpointStreamingJson$1 java.io.NotSerializableException: TestCheckpointStreamingJson at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184) at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259) at org.apache.spark.streaming.scheduler.JobGenerator.org $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Regards, Vasu C On Thu, Nov 6, 2014 at 1:14 PM, Sean Owen so...@cloudera.com wrote: You didn't say what isn't serializable or where the exception occurs, but, is it the same as this issue? https://issues.apache.org/jira/browse/SPARK-4196 On Thu, Nov 6, 2014 at 5:42 AM, Vasu C vasuc.bigd...@gmail.com wrote: Dear All, I am getting java.io.NotSerializableException for below code. if jssc.checkpoint(HDFS_CHECKPOINT_DIR); is blocked there is not exception Please help JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { SparkConf sparkConf = new SparkConf().set(spark.cores.max, 3); final JavaStreamingContext jssc = new JavaStreamingContext( sparkConf, new Duration(300)); final JavaHiveContext javahiveContext = new JavaHiveContext( jssc.sc()); javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC, true, new Configuration()) .registerTempTable(TEMP_TABLE_NAME); // TODO create checkpoint directory for fault tolerance
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
No, not the same thing then. This just means you accidentally have a reference to the unserializable enclosing test class in your code. Just make sure the reference is severed. On Thu, Nov 6, 2014 at 8:00 AM, Vasu C vasuc.bigd...@gmail.com wrote: Thanks for pointing to the issue. Yes I think its the same issue, below is Exception ERROR OneForOneStrategy: TestCheckpointStreamingJson$1 java.io.NotSerializableException: TestCheckpointStreamingJson - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
CheckPoint Issue with JsonRDD
When we enable checkpointing and use jsonRDD we get the following error. Is this a bug?

Exception in thread "main" java.lang.NullPointerException at org.apache.spark.rdd.RDD.init(RDD.scala:125) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.SQLContext.applySchema(SQLContext.scala:132) at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:194) at SparkStreamingToParquet$$anonfun$createContext$1.apply(SparkStreamingToParquet.scala:69) at SparkStreamingToParquet$$anonfun$createContext$1.apply(SparkStreamingToParquet.scala:63) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.types.{StructType, StructField, StringType}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.hive.api.java.JavaHiveContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object SparkStreamingToParquet extends Logging {

  def main(args: Array[String]) {
    if (args.length < 3) {
      logInfo("Please provide valid parameters: hdfsFilesLocation hdfs://ip:8020/user/hdfs/--/ IMPALAtableloc hdfs://ip:8020/user/hive/--/ tablename")
      logInfo("make sure you give the full folder path with '/' at the end, i.e. /user/hdfs/abc/")
      System.exit(1)
    }
    val HDFS_FILE_LOC = args(0)
    val IMPALA_TABLE_LOC = args(1)
    val TEMP_TABLE_NAME = args(2)
    val CHECKPOINT_DIR = args(3)
    val jssc: StreamingContext = StreamingContext.getOrCreate(CHECKPOINT_DIR, () => {
      createContext(args)
    })
    jssc.start()
    jssc.awaitTermination()
  }

  def createContext(args: Array[String]): StreamingContext = {
    val HDFS_FILE_LOC = args(0)
    val IMPALA_TABLE_LOC = args(1)
    val TEMP_TABLE_NAME = args(2)
    val CHECKPOINT_DIR = args(3)
    val sparkConf: SparkConf = new SparkConf().setAppName("Json to Parquet").set("spark.cores.max", "3")
    val jssc: StreamingContext = new StreamingContext(sparkConf, new Duration(3))
    val hivecontext: HiveContext = new HiveContext(jssc.sparkContext)
    hivecontext.createParquetFile[Person](IMPALA_TABLE_LOC, true,
      org.apache.spark.deploy.SparkHadoopUtil.get.conf).registerTempTable(TEMP_TABLE_NAME)
    val schemaString = "name age"
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    val textFileStream = jssc.textFileStream(HDFS_FILE_LOC)
    textFileStream.foreachRDD(rdd => {
      if (rdd != null && rdd.count() > 0) {
        val schRdd = hivecontext.jsonRDD(rdd, schema)
        logInfo("inserting into table: " + TEMP_TABLE_NAME)
        schRdd.insertInto(TEMP_TABLE_NAME)
      }
    })
    jssc.checkpoint(CHECKPOINT_DIR)
    jssc
  }
}

case class Person(name: String, age: String) extends Serializable

Regards, Madhu Jahagirdar

The information contained in this message may be confidential and legally protected under applicable law. The message is intended solely for the addressee(s). If you are not the intended recipient, you are hereby notified that any use, forwarding, dissemination, or reproduction of this message is strictly prohibited and may be unlawful. If you are not the intended recipient, please contact the sender by return e-mail and destroy all copies of the original message.
Re: why decision trees do binary split?
I meant above, that in the case of categorical variables it might be more efficient to create a node on each categorical value. Is there a reason why spark went down the binary route? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-decision-trees-do-binary-split-tp18188p18265.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: why decision trees do binary split?
I haven't seen that done before, which may be most of the reason - I am not sure that is common practice. I can see upsides - you need not pick candidate splits to test since there is only one N-way rule possible. The binary split equivalent is N levels instead of 1. The big problem is that you are always segregating the data set entirely, and making the equivalent of those N binary rules, even when you would not otherwise bother because they don't add information about the target. The subsets matching each child are therefore unnecessarily small and this makes learning on each independent subset weaker. On Nov 6, 2014 9:36 AM, jamborta jambo...@gmail.com wrote: I meant above, that in the case of categorical variables it might be more efficient to create a node on each categorical value. Is there a reason why spark went down the binary route? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-decision-trees-do-binary-split-tp18188p18265.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Snappy temp files not cleaned up
The default value is infinite, so you need to enable it. Personally I’ve set up a couple of cron jobs to clean up /tmp and /var/run/spark. On 06.11.2014, at 08:15, Romi Kuntsman r...@totango.com wrote: Hello, Spark has an internal cleanup mechanism (defined by spark.cleaner.ttl, see http://spark.apache.org/docs/latest/configuration.html) which cleans up tasks and stages. However, in our installation, we noticed that Snappy temporary files are never cleaned up. Is it a misconfiguration? A missing feature? How do you deal with the build-up of temp files? Thanks, Romi Kuntsman, Big Data Engineer http://www.totango.com
Re: why decision trees do binary split?
Hello, There is a big compelling reason for binary splits in general for trees: a split is made if the difference between the two resulting branches is significant.You also want to compare the significance of this candidate split vs all the other candidate splits. There are many statistical tests to compare two groups. You can even generate something like p-values that, according to some, allow you to compare different candidate splits. If you introduce multibranch splits... things become much more messy. Also, mind that breaking categorical variables into as many groups as there are levels would in some cases separate subgroups of variables which are not that different. Successive binary splits could potentially provide you with the required homogeneous subsets. Best, Carlos J. Gil Bellosta http://www.datanalytics.com 2014-11-06 10:46 GMT+01:00 Sean Owen so...@cloudera.com: I haven't seen that done before, which may be most of the reason - I am not sure that is common practice. I can see upsides - you need not pick candidate splits to test since there is only one N-way rule possible. The binary split equivalent is N levels instead of 1. The big problem is that you are always segregating the data set entirely, and making the equivalent of those N binary rules, even when you would not otherwise bother because they don't add information about the target. The subsets matching each child are therefore unnecessarily small and this makes learning on each independent subset weaker. On Nov 6, 2014 9:36 AM, jamborta jambo...@gmail.com wrote: I meant above, that in the case of categorical variables it might be more efficient to create a node on each categorical value. Is there a reason why spark went down the binary route? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-decision-trees-do-binary-split-tp18188p18265.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: why decision trees do binary split?
Thanks for the reply, Sean. I can see that splitting on all the categories would probably overfit the tree, on the other hand, it might give more insight on the subcategories (probably only would work if the data is uniformly distributed between the categories). I haven't really found any comparison between the two methods in terms of performance and interpretability. thanks, On Thu, Nov 6, 2014 at 9:46 AM, Sean Owen so...@cloudera.com wrote: I haven't seen that done before, which may be most of the reason - I am not sure that is common practice. I can see upsides - you need not pick candidate splits to test since there is only one N-way rule possible. The binary split equivalent is N levels instead of 1. The big problem is that you are always segregating the data set entirely, and making the equivalent of those N binary rules, even when you would not otherwise bother because they don't add information about the target. The subsets matching each child are therefore unnecessarily small and this makes learning on each independent subset weaker. On Nov 6, 2014 9:36 AM, jamborta jambo...@gmail.com wrote: I meant above, that in the case of categorical variables it might be more efficient to create a node on each categorical value. Is there a reason why spark went down the binary route? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-decision-trees-do-binary-split-tp18188p18265.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: I want to make clear the difference about executor-cores number.
The only difference between the two setups (if you only vary the executor cores) is how many tasks are running in parallel (the number of tasks themselves depends on other factors), so try to inspect the stages while the job is running (probably easier to do with longer-running tasks) by clicking on one of the stages. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/I-want-to-make-clear-the-difference-about-executor-cores-number-tp18183p18272.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
HI Sean, Below is my Java code (Spark 1.1.0), and I am still getting the same error. Here the Bean class is serializable. Not sure where exactly the problem is. What am I doing wrong here?

public class StreamingJson {
  public static void main(String[] args) throws Exception {
    final String HDFS_FILE_LOC = args[0];
    final String IMPALA_TABLE_LOC = args[1];
    final String TEMP_TABLE_NAME = args[2];
    final String HDFS_CHECKPOINT_DIR = args[3];

    JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
      public JavaStreamingContext create() {
        SparkConf sparkConf = new SparkConf().setAppName("test").set("spark.cores.max", "3");
        final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(500));
        final JavaHiveContext javahiveContext = new JavaHiveContext(jssc.sc());
        javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC, true, new Configuration())
            .registerTempTable(TEMP_TABLE_NAME);
        final JavaDStream<String> textFileStream = jssc.textFileStream(HDFS_FILE_LOC);
        textFileStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
          @Override
          public Void call(JavaRDD<String> rdd, Time time) throws Exception {
            if (rdd != null) {
              if (rdd.count() > 0) {
                JavaSchemaRDD schRdd = javahiveContext.jsonRDD(rdd);
                schRdd.insertInto(TEMP_TABLE_NAME);
              }
            }
            return null;
          }
        });
        jssc.checkpoint(HDFS_CHECKPOINT_DIR);
        return jssc;
      }
    };

    JavaStreamingContext context = JavaStreamingContext.getOrCreate(HDFS_CHECKPOINT_DIR, contextFactory);
    context.start(); // Start the computation
    context.awaitTermination();
  }
}

Regards, Vasu C

On Thu, Nov 6, 2014 at 1:33 PM, Sean Owen so...@cloudera.com wrote: No, not the same thing then. This just means you accidentally have a reference to the unserializable enclosing test class in your code. Just make sure the reference is severed. On Thu, Nov 6, 2014 at 8:00 AM, Vasu C vasuc.bigd...@gmail.com wrote: Thanks for pointing to the issue. Yes I think it's the same issue, below is the exception ERROR OneForOneStrategy: TestCheckpointStreamingJson$1 java.io.NotSerializableException: TestCheckpointStreamingJson
Re: why decision trees do binary split?
You can imagine this same logic applying to the continuous case. E.g. what if all the quartiles or deciles of a particular value have different behavior - this could capture that too. Of what if some combination of features was highly discriminitive but only into n buckets, rather than two.. you can see there are lots of different options here. In general in MLlib, we're trying to support widely accepted and frequently used ML models, and simply offer a platform to efficiently train these with spark. While decision trees with n-ary splits might be a sensible thing to explore, they are not widely used in practice, and I'd want to see some compelling results from proper ML/stats researchers before shipping them as a default feature. If you're looking for a way to control variance and pick up nuance in your dataset that's not covered by plain decision trees, I recommend looking at Random Forests - a well studied extension to decision trees that's also widely used in practice - and coming to MLlib soon! On Thu, Nov 6, 2014 at 3:29 AM, Tamas Jambor jambo...@gmail.com wrote: Thanks for the reply, Sean. I can see that splitting on all the categories would probably overfit the tree, on the other hand, it might give more insight on the subcategories (probably only would work if the data is uniformly distributed between the categories). I haven't really found any comparison between the two methods in terms of performance and interpretability. thanks, On Thu, Nov 6, 2014 at 9:46 AM, Sean Owen so...@cloudera.com wrote: I haven't seen that done before, which may be most of the reason - I am not sure that is common practice. I can see upsides - you need not pick candidate splits to test since there is only one N-way rule possible. The binary split equivalent is N levels instead of 1. The big problem is that you are always segregating the data set entirely, and making the equivalent of those N binary rules, even when you would not otherwise bother because they don't add information about the target. The subsets matching each child are therefore unnecessarily small and this makes learning on each independent subset weaker. On Nov 6, 2014 9:36 AM, jamborta jambo...@gmail.com wrote: I meant above, that in the case of categorical variables it might be more efficient to create a node on each categorical value. Is there a reason why spark went down the binary route? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-decision-trees-do-binary-split-tp18188p18265.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
Erm, you are trying to do all the work in the create() method. This is definitely not what you want to do. It is just supposed to make the JavaSparkStreamingContext. A further problem is that you're using anonymous inner classes, which are non-static and contain a reference to the outer class. The closure cleaner can sometimes get rid of that, but perhaps not here. Consider a static inner class if you can't resolve it other ways. There is probably however at least another issue in this code ... On Thu, Nov 6, 2014 at 1:43 PM, Vasu C vasuc.bigd...@gmail.com wrote: HI Sean, Below is my java code and using spark 1.1.0. Still getting the same error. Here Bean class is serialized. Not sure where exactly is the problem. What am I doing wrong here ? public class StreamingJson { public static void main(String[] args) throws Exception { final String HDFS_FILE_LOC = args[0]; final String IMPALA_TABLE_LOC = args[1]; final String TEMP_TABLE_NAME = args[2]; final String HDFS_CHECKPOINT_DIR = args[3]; JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { public JavaStreamingContext create() { SparkConf sparkConf = new SparkConf().setAppName( test).set(spark.cores.max, 3); final JavaStreamingContext jssc = new JavaStreamingContext( sparkConf, new Duration(500)); final JavaHiveContext javahiveContext = new JavaHiveContext( jssc.sc()); javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC, true, new Configuration()) .registerTempTable(TEMP_TABLE_NAME); final JavaDStreamString textFileStream = jssc .textFileStream(HDFS_FILE_LOC); textFileStream .foreachRDD(new Function2JavaRDDString, Time, Void() { @Override public Void call(JavaRDDString rdd, Time time) throws Exception { if (rdd != null) { if (rdd.count() 0) { JavaSchemaRDD schRdd = javahiveContext .jsonRDD(rdd); schRdd.insertInto(TEMP_TABLE_NAME); } } return null; } }); jssc.checkpoint(HDFS_CHECKPOINT_DIR); return jssc; } }; JavaStreamingContext context = JavaStreamingContext.getOrCreate( HDFS_CHECKPOINT_DIR, contextFactory); context.start(); // Start the computation context.awaitTermination(); } } Regards, Vasu C On Thu, Nov 6, 2014 at 1:33 PM, Sean Owen so...@cloudera.com wrote: No, not the same thing then. This just means you accidentally have a reference to the unserializable enclosing test class in your code. Just make sure the reference is severed. On Thu, Nov 6, 2014 at 8:00 AM, Vasu C vasuc.bigd...@gmail.com wrote: Thanks for pointing to the issue. Yes I think its the same issue, below is Exception ERROR OneForOneStrategy: TestCheckpointStreamingJson$1 java.io.NotSerializableException: TestCheckpointStreamingJson - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
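To make the static-inner-class suggestion concrete, here is a rough sketch (not the original poster's code; the class name, app name, and batch interval are made up). A static nested factory carries no hidden $outer reference to the enclosing class, so nothing unserializable gets dragged into the checkpointed graph:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class StreamingJob {

  // Static nested class: no implicit reference to StreamingJob is captured.
  static class Factory implements JavaStreamingContextFactory {
    private final String checkpointDir;

    Factory(String checkpointDir) {
      this.checkpointDir = checkpointDir;
    }

    @Override
    public JavaStreamingContext create() {
      SparkConf conf = new SparkConf().setAppName("streaming-job");
      JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(500));
      // build the DStream graph here (kept minimal in this sketch), then:
      jssc.checkpoint(checkpointDir);
      return jssc;
    }
  }

  public static void main(String[] args) throws Exception {
    String checkpointDir = args[0];
    JavaStreamingContext context =
        JavaStreamingContext.getOrCreate(checkpointDir, new Factory(checkpointDir));
    context.start();
    context.awaitTermination();
  }
}

The same idea applies to the functions passed to DStream operations: top-level or static classes serialize cleanly, while anonymous inner classes drag their enclosing instance along with them.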
Re: how to blend a DStream and a broadcast variable?
Excellent. Is there an example of this somewhere? Sent from my iPhone On Nov 6, 2014, at 1:43 AM, Sean Owen so...@cloudera.com wrote: Broadcast vars should work fine in Spark streaming. Broadcast vars are immutable however. If you have some info to cache which might change from batch to batch, you should be able to load it at the start of your 'foreachRDD' method or equivalent. That's simple and works assuming your batch interval isn't so short and data so big that loading it every time is a burden. On Wed, Nov 5, 2014 at 11:30 PM, spr s...@yarcdata.com wrote: My use case has one large data stream (DS1) that obviously maps to a DStream. The processing of DS1 involves filtering it for any of a set of known values, which will change over time, though slowly by streaming standards. If the filter data were static, it seems to obviously map to a broadcast variable, but it's dynamic. (And I don't think it works to implement it as a DStream, because the new values need to be copied redundantly to all executors, not partitioned among the executors.) Looking at the Spark and Spark Streaming documentation, I have two questions: 1) There's no mention in the Spark Streaming Programming Guide of broadcast variables. Do they coexist properly? 2) Once I have a broadcast variable in place in the periodic function that Spark Streaming executes, how can I update its value? Obviously I can't literally update the value of that broadcast variable, which is immutable, but how can I get a new version of the variable established in all the executors? (And the other ever-present implicit question...) 3) Is there a better way to implement this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-blend-a-DStream-and-a-broadcast-variable-tp18227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
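For what it's worth, here is a minimal sketch of the pattern Sean describes, reloading the (small, slowly changing) filter set at the top of each batch; loadCurrentFilterSet() and the output path are hypothetical placeholders, not an official example:

import java.util.Set;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

public class PerBatchFilter {
  // Attach a per-batch filter to an existing DStream.  The reference data is
  // re-read at the start of every batch instead of being broadcast once.
  public static void attach(JavaDStream<String> events) {
    events.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd, Time time) {
        final Set<String> current = loadCurrentFilterSet();  // cheap if the set is small
        JavaRDD<String> matched = rdd.filter(new Function<String, Boolean>() {
          @Override
          public Boolean call(String event) {
            return current.contains(event);
          }
        });
        matched.saveAsTextFile("/tmp/matched-" + time.milliseconds());  // or any other action
        return null;
      }
    });
  }

  private static Set<String> loadCurrentFilterSet() {
    // Hypothetical helper: e.g. read a small HDFS file or query a store.
    return java.util.Collections.emptySet();
  }
}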
multiple spark context in same driver program
Hi, quick question: I found this: http://docs.sigmoidanalytics.com/index.php/Problems_and_their_Solutions#Multiple_SparkContext:Failed_to_bind_to:.2F127.0.1.1:45916 My main question: is this constraint still valid? Am I not allowed to have two SparkContexts pointing to the same Spark Master in one driver program? Regards, Pawel Szulc
RE: Any Replicated RDD in Spark?
Matei, Thanks for reply. I don't worry that much about more code because I migrate from mapreduce, so I have existing code to handle it. But if I want to use a new tech, I will always prefer right way not a temporary easy way!. I will go with RDD first to test the performance. Thanks! Shuai -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Wednesday, November 05, 2014 6:27 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Any Replicated RDD in Spark? If you start with an RDD, you do have to collect to the driver and broadcast to do this. Between the two options you listed, I think this one is simpler to implement, and there won't be a huge difference in performance, so you can go for it. Opening InputStreams to a distributed file system by hand can be a lot of code. Matei On Nov 5, 2014, at 12:37 PM, Shuai Zheng szheng.c...@gmail.com wrote: And another similar case: If I have get a RDD from previous step, but for next step it should be a map side join (so I need to broadcast this RDD to every nodes). What is the best way for me to do that? Collect RDD in driver first and create broadcast? Or any shortcut in spark for this? Thanks! -Original Message- From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, November 05, 2014 3:32 PM To: 'Matei Zaharia' Cc: 'user@spark.apache.org' Subject: RE: Any Replicated RDD in Spark? Nice. Then I have another question, if I have a file (or a set of files: part-0, part-1, might be a few hundreds MB csv to 1-2 GB, created by other program), need to create hashtable from it, later broadcast it to each node to allow query (map side join). I have two options to do it: 1, I can just load the file in a general code (open a inputstream, etc), parse content and then create the broadcast from it. 2, I also can use a standard way to create the RDD from these file, run the map to parse it, then collect it as map, wrap the result as broadcast to push to all nodes again. I think the option 2 might be more consistent with spark's concept (and less code?)? But how about the performance? The gain is can parallel load and parse the data, penalty is after load we need to collect and broadcast result again? Please share your opinion. I am not sure what is the best practice here (in theory, either way works, but in real world, which one is better?). Regards, Shuai -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Monday, November 03, 2014 4:15 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Any Replicated RDD in Spark? You need to use broadcast followed by flatMap or mapPartitions to do map-side joins (in your map function, you can look at the hash table you broadcast and see what records match it). Spark SQL also does it by default for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). Matei On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have spent last two years on hadoop but new to spark. I am planning to move one of my existing system to spark to get some enhanced features. My question is: If I try to do a map side join (something similar to Replicated key word in Pig), how can I do it? Is it anyway to declare a RDD as replicated (means distribute it to all nodes and each node will have a full copy)? I know I can use accumulator to get this feature, but I am not sure what is the best practice. 
And if I accumulator to broadcast the data set, can then (after broadcast) convert it into a RDD and do the join? Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
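As a rough illustration of option 2 discussed in this thread (parse the small side with an RDD, collect it to the driver, broadcast it, then join map-side), here is a sketch; the paths, delimiter, and record layout are invented for the example:

import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

public class MapSideJoin {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("map-side-join"));

    // 1. Load and parse the small side with a normal RDD (parallel parse).
    JavaPairRDD<String, String> small = sc.textFile("hdfs:///data/small/part-*")
        .mapToPair(new PairFunction<String, String, String>() {
          @Override
          public Tuple2<String, String> call(String line) {
            String[] f = line.split(",");
            return new Tuple2<String, String>(f[0], f[1]);
          }
        });

    // 2. Collect it as a map on the driver and broadcast it to every node.
    final Broadcast<Map<String, String>> lookup = sc.broadcast(small.collectAsMap());

    // 3. Map-side join: each task just probes the broadcast hash map.
    JavaRDD<String> joined = sc.textFile("hdfs:///data/big/part-*")
        .map(new Function<String, String>() {
          @Override
          public String call(String line) {
            String key = line.split(",")[0];
            String extra = lookup.value().get(key);
            return extra == null ? line : line + "," + extra;
          }
        });

    joined.saveAsTextFile("hdfs:///data/joined");
    sc.stop();
  }
}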
Task duration graph on Spark stage UI
Even though the stage UI has min, 25th%, median, 75th%, and max durations, I am often still left clueless about the distribution. For example, 100 out of 200 tasks (started at the same time) have completed in 1 hour. How much longer do I have to wait? I cannot guess well based on the five numbers. A graph of the durations will not answer the question either, but I think it gives a better idea. I can hopefully see if the distribution is linearly sloped or bimodal or exponentially slowing down, etc. It's easy to draw this graph, so I set it up as a Chrome extension: https://chrome.google.com/webstore/detail/spark-distributions/hhgnppbenlghmcimkmiccfiemdohdgoo And here's the complete source code that you can throw in the JavaScript console for the same results:

var x = $('table:eq(2)').find('td:nth-child(8)').map(function (i, e) {
  return parseInt($(e).attr('sorttable_customkey'));
});
x.sort(function(a, b) { return a - b; });
var w = x.length;
var h = x[w - 1];
var W = 180;
var H = 80;
var canvas = $('<canvas width="' + W + '" height="' + H + '"></canvas>');
canvas.css({ position: 'absolute', top: '100px', left: '500px' });
$('body').append(canvas);
var ctx = canvas[0].getContext('2d');
ctx.fillStyle = 'orange';
ctx.beginPath();
ctx.moveTo(0, H);
for (var i = 0; i < w; ++i) {
  ctx.lineTo(i * W / (w - 1), H - x[i] * H / h);
}
ctx.lineTo(W, H);
ctx.fill();

It should not be much work to add this to the stage status page itself either, if there is interest.
Re: Unable to use HiveContext in spark-shell
Help please! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-use-HiveContext-in-spark-shell-tp18261p18280.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: spark sql: join sql fails after sqlCtx.cacheTable()
I am getting exception at sparksheel at the following line: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) error: bad symbolic reference. A signature in HiveContext.class refers to term hive in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling HiveContext.class. error: while compiling: console during phase: erasure library version: version 2.10.4 compiler version: version 2.10.4 reconstructed args: last tree to typer: Apply(value $outer) symbol: value $outer (flags: method synthetic stable expandedname triedcooking) symbol definition: val $outer(): $iwC.$iwC.type tpe: $iwC.$iwC.type symbol owners: value $outer - class $iwC - class $iwC - class $iwC - class $read - package $line5 context owners: class $iwC - class $iwC - class $iwC - class $iwC - class $read - package $line5 == Enclosing template or block == ClassDef( // class $iwC extends Serializable 0 $iwC [] Template( // val local $iwC: notype, tree.tpe=$iwC java.lang.Object, scala.Serializable // parents ValDef( private _ tpt empty ) // 5 statements DefDef( // def init(arg$outer: $iwC.$iwC.$iwC.type): $iwC method triedcooking init [] // 1 parameter list ValDef( // $outer: $iwC.$iwC.$iwC.type param $outer tpt // tree.tpe=$iwC.$iwC.$iwC.type empty ) tpt // tree.tpe=$iwC Block( // tree.tpe=Unit Apply( // def init(): Object in class Object, tree.tpe=Object $iwC.super.init // def init(): Object in class Object, tree.tpe=()Object Nil ) () ) ) ValDef( // private[this] val sqlContext: org.apache.spark.sql.hive.HiveContext private local triedcooking sqlContext tpt // tree.tpe=org.apache.spark.sql.hive.HiveContext Apply( // def init(sc: org.apache.spark.SparkContext): org.apache.spark.sql.hive.HiveContext in class HiveContext, tree.tpe=org.apache.spark.sql.hive.HiveContext new org.apache.spark.sql.hive.HiveContext.init // def init(sc: org.apache.spark.SparkContext): org.apache.spark.sql.hive.HiveContext in class HiveContext, tree.tpe=(sc: org.apache.spark.SparkContext)org.apache.spark.sql.hive.HiveContext Apply( // val sc(): org.apache.spark.SparkContext, tree.tpe=org.apache.spark.SparkContext $iwC.this.$line5$$read$$iwC$$iwC$$iwC$$iwC$$$outer().$line5$$read$$iwC$$iwC$$iwC$$$outer().$line5$$read$$iwC$$iwC$$$outer().$VAL1().$iw().$iw().sc // val sc(): org.apache.spark.SparkContext, tree.tpe=()org.apache.spark.SparkContext Nil ) ) ) DefDef( // val sqlContext(): org.apache.spark.sql.hive.HiveContext method stable accessor sqlContext [] List(Nil) tpt // tree.tpe=org.apache.spark.sql.hive.HiveContext $iwC.this.sqlContext // private[this] val sqlContext: org.apache.spark.sql.hive.HiveContext, tree.tpe=org.apache.spark.sql.hive.HiveContext ) ValDef( // protected val $outer: $iwC.$iwC.$iwC.type protected synthetic paramaccessor triedcooking $outer tpt // tree.tpe=$iwC.$iwC.$iwC.type empty ) DefDef( // val $outer(): $iwC.$iwC.$iwC.type method synthetic stable expandedname triedcooking $line5$$read$$iwC$$iwC$$iwC$$iwC$$$outer [] List(Nil) tpt // tree.tpe=Any $iwC.this.$outer // protected val $outer: $iwC.$iwC.$iwC.type, tree.tpe=$iwC.$iwC.$iwC.type ) ) ) == Expanded type of tree == ThisType(class $iwC) uncaught exception during compilation: scala.reflect.internal.Types$TypeError scala.reflect.internal.Types$TypeError: bad symbolic reference. A signature in HiveContext.class refers to term conf in value org.apache.hadoop.hive which is not available. 
It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling HiveContext.class. That entry seems to have slain the compiler. Shall I replay your session? I can re-run each line except the last one. Thanks Tridib Date: Tue, 21 Oct 2014 09:39:49 -0700 Subject: Re: spark sql: join sql fails after sqlCtx.cacheTable() From: ri...@infoobjects.com To: tridib.sama...@live.com CC: u...@spark.incubator.apache.org Hi Tridib, I changed SQLContext to HiveContext and it started working. These are steps I used. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)val person = sqlContext.jsonFile(json/person.json)person.printSchema()person.registerTempTable(person)val address = sqlContext.jsonFile(json/address.json)address.printSchema()address.registerTempTable(address)sqlContext.cacheTable(person)sqlContext.cacheTable(address)val rs2 = sqlContext.sql(select p.id,p.name,a.city from person
Spark and Kafka
Hi Guys, I am doing some tests with Spark Streaming and Kafka, but I have seen something strange, I have modified the JavaKafkaWordCount to use ReducebyKeyandWindow and to print in the screen the accumulated numbers of the words, in the beginning spark works very well in each interaction the numbers of the words increase but after 12 a 13 sec the results repeats continually. My program producer remain sending the words toward the kafka. Does anyone have any idea about this? --- Time: 1415272266000 ms --- (accompanied them,6) (merrier,5) (it possessed,5) (the treacherous,5) (Quite,12) (offer,273) (rabble,58) (exchanging,16) (Genoa,18) (merchant,41) ... --- Time: 1415272267000 ms --- (accompanied them,12) (merrier,12) (it possessed,12) (the treacherous,11) (Quite,24) (offer,602) (rabble,132) (exchanging,35) (Genoa,36) (merchant,84) ... --- Time: 1415272268000 ms --- (accompanied them,17) (merrier,18) (it possessed,17) (the treacherous,17) (Quite,35) (offer,889) (rabble,192) (the bed,1) (exchanging,51) (Genoa,54) ... --- Time: 1415272269000 ms --- (accompanied them,17) (merrier,18) (it possessed,17) (the treacherous,17) (Quite,35) (offer,889) (rabble,192) (the bed,1) (exchanging,51) (Genoa,54) ... --- Time: 141527227 ms --- (accompanied them,17) (merrier,18) (it possessed,17) (the treacherous,17) (Quite,35) (offer,889) (rabble,192) (the bed,1) (exchanging,51) (Genoa,54) ... -- Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: Unable to use HiveContext in spark-shell
Can you be more specific? What version of Spark, Hive, Hadoop, etc.? What are you trying to do? What are the issues you are seeing? J *JIMMY MCERLAIN* DATA SCIENTIST (NERD) *. . . . . . . . . . . . . . . . . .* *IF WE CAN’T DOUBLE YOUR SALES,* *ONE OF US IS IN THE WRONG BUSINESS.* *E*: ji...@sellpoints.com *M*: *510.303.7751* On Thu, Nov 6, 2014 at 9:22 AM, tridib tridib.sama...@live.com wrote: Help please! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-use-HiveContext-in-spark-shell-tp18261p18280.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unable to use HiveContext in spark-shell
What version of Spark are you using? Did you compile your Spark version and if so, what compile options did you use? On 11/6/14, 9:22 AM, tridib tridib.sama...@live.com wrote: Help please! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-use-HiveCont ext-in-spark-shell-tp18261p18280.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Unable to use HiveContext in spark-shell
I am using spark 1.1.0. I built it using: ./make-distribution.sh -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests My ultimate goal is to execute a query on parquet file with nested structure and cast a date string to Date. This is required to calculate the age of Person entity. but I am even unable to pass this line:val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) I made sure that org.apache.hadoop package is in the spark assembly jar. Re-attaching the stack trace for quick reference. scala val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) error: bad symbolic reference. A signature in HiveContext.class refers to term hive in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling HiveContext.class. error: while compiling: console during phase: erasure library version: version 2.10.4 compiler version: version 2.10.4 reconstructed args: last tree to typer: Apply(value $outer) symbol: value $outer (flags: method synthetic stable expandedname triedcooking) symbol definition: val $outer(): $iwC.$iwC.type tpe: $iwC.$iwC.type symbol owners: value $outer - class $iwC - class $iwC - class $iwC - class $read - package $line5 context owners: class $iwC - class $iwC - class $iwC - class $iwC - class $read - package $line5 == Enclosing template or block == ClassDef( // class $iwC extends Serializable 0 $iwC [] Template( // val local $iwC: notype, tree.tpe=$iwC java.lang.Object, scala.Serializable // parents ValDef( private _ tpt empty ) // 5 statements DefDef( // def init(arg$outer: $iwC.$iwC.$iwC.type): $iwC method triedcooking init [] // 1 parameter list ValDef( // $outer: $iwC.$iwC.$iwC.type $outer tpt // tree.tpe=$iwC.$iwC.$iwC.type empty ) tpt // tree.tpe=$iwC Block( // tree.tpe=Unit Apply( // def init(): Object in class Object, tree.tpe=Object $iwC.super.init // def init(): Object in class Object, tree.tpe=()Object Nil ) () ) ) ValDef( // private[this] val sqlContext: org.apache.spark.sql.hive.HiveContext private local triedcooking sqlContext tpt // tree.tpe=org.apache.spark.sql.hive.HiveContext Apply( // def init(sc: org.apache.spark.SparkContext): org.apache.spark.sql.hive.HiveContext in class HiveContext, tree.tpe=org.apache.spark.sql.hive.HiveContext new org.apache.spark.sql.hive.HiveContext.init // def init(sc: org.apache.spark.SparkContext): org.apache.spark.sql.hive.HiveContext in class HiveContext, tree.tpe=(sc: org.apache.spark.SparkContext)org.apache.spark.sql.hive.HiveContext Apply( // val sc(): org.apache.spark.SparkContext, tree.tpe=org.apache.spark.SparkContext $iwC.this.$line5$$read$$iwC$$iwC$$iwC$$iwC$$$outer().$line5$$read$$iwC$$iwC$$iwC$$$outer().$line5$$read$$iwC$$iwC$$$outer().$VAL1().$iw().$iw().sc // val sc(): org.apache.spark.SparkContext, tree.tpe=()org.apache.spark.SparkContext Nil ) ) ) DefDef( // val sqlContext(): org.apache.spark.sql.hive.HiveContext method stable accessor sqlContext [] List(Nil) tpt // tree.tpe=org.apache.spark.sql.hive.HiveContext $iwC.this.sqlContext // private[this] val sqlContext: org.apache.spark.sql.hive.HiveContext, tree.tpe=org.apache.spark.sql.hive.HiveContext ) ValDef( // protected val $outer: $iwC.$iwC.$iwC.type protected synthetic paramaccessor triedcooking $outer tpt // tree.tpe=$iwC.$iwC.$iwC.type empty ) DefDef( // val $outer(): $iwC.$iwC.$iwC.type method synthetic stable expandedname triedcooking $line5$$read$$iwC$$iwC$$iwC$$iwC$$$outer [] List(Nil) tpt // 
tree.tpe=Any $iwC.this.$outer // protected val $outer: $iwC.$iwC.$iwC.type, tree.tpe=$iwC.$iwC.$iwC.type ) ) ) == Expanded type of tree == ThisType(class $iwC) uncaught exception during compilation: scala.reflect.internal.Types$TypeError scala.reflect.internal.Types$TypeError: bad symbolic reference. A signature in HiveContext.class refers to term conf in value org.apache.hadoop.hive which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling HiveContext.class. That entry seems to have slain the compiler. Shall I replay your session? I can re-run each line except the last one. [y/n] Thanks Tridib From: terry@smartfocus.com To: tridib.sama...@live.com; u...@spark.incubator.apache.org Subject: Re: Unable to use
Re: Spark and Kafka
This is my window: reduceByKeyAndWindow( new Function2Integer, Integer, Integer() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }, new Function2Integer, Integer, Integer() { public Integer call(Integer i1, Integer i2) { return i1 - i2; } }, new Duration(60 * 5 * 1000), new Duration(1 * 1000) ); On Nov 6, 2014, at 18:37, Gwen Shapira gshap...@cloudera.com wrote: What's the window size? If the window is around 10 seconds and you are sending data at very stable rate, this is expected. On Thu, Nov 6, 2014 at 9:32 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I am doing some tests with Spark Streaming and Kafka, but I have seen something strange, I have modified the JavaKafkaWordCount to use ReducebyKeyandWindow and to print in the screen the accumulated numbers of the words, in the beginning spark works very well in each interaction the numbers of the words increase but after 12 a 13 sec the results repeats continually. My program producer remain sending the words toward the kafka. Does anyone have any idea about this? --- Time: 1415272266000 ms --- (accompanied them,6) (merrier,5) (it possessed,5) (the treacherous,5) (Quite,12) (offer,273) (rabble,58) (exchanging,16) (Genoa,18) (merchant,41) ... --- Time: 1415272267000 ms --- (accompanied them,12) (merrier,12) (it possessed,12) (the treacherous,11) (Quite,24) (offer,602) (rabble,132) (exchanging,35) (Genoa,36) (merchant,84) ... --- Time: 1415272268000 ms --- (accompanied them,17) (merrier,18) (it possessed,17) (the treacherous,17) (Quite,35) (offer,889) (rabble,192) (the bed,1) (exchanging,51) (Genoa,54) ... --- Time: 1415272269000 ms --- (accompanied them,17) (merrier,18) (it possessed,17) (the treacherous,17) (Quite,35) (offer,889) (rabble,192) (the bed,1) (exchanging,51) (Genoa,54) ... --- Time: 141527227 ms --- (accompanied them,17) (merrier,18) (it possessed,17) (the treacherous,17) (Quite,35) (offer,889) (rabble,192) (the bed,1) (exchanging,51) (Genoa,54) ... -- Informativa sulla Privacy: http://www.unibs.it/node/8155 -- Informativa sulla Privacy: http://www.unibs.it/node/8155
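In case it helps, the same window written out as a compilable method, assuming wordCounts is the usual (word, count) pair stream from the Kafka word-count example and jssc is the streaming context; note that the inverse-function form of reduceByKeyAndWindow also requires a checkpoint directory to be set (the path below is only illustrative):

import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WindowedCounts {
  public static JavaPairDStream<String, Integer> windowed(
      JavaStreamingContext jssc, JavaPairDStream<String, Integer> wordCounts) {
    jssc.checkpoint("/tmp/kafka-wordcount-checkpoint");   // required for the inverse form
    return wordCounts.reduceByKeyAndWindow(
        new Function2<Integer, Integer, Integer>() {      // add counts entering the window
          @Override
          public Integer call(Integer i1, Integer i2) { return i1 + i2; }
        },
        new Function2<Integer, Integer, Integer>() {      // subtract counts leaving the window
          @Override
          public Integer call(Integer i1, Integer i2) { return i1 - i2; }
        },
        new Duration(60 * 5 * 1000),                      // 5-minute window
        new Duration(1 * 1000));                          // sliding every second
  }
}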
most efficient way to send data from Scala to python
Hi all, Is there a way in Spark to send data (RDD[Array]) from the Scala component to the Python component? I saw a method that serializes double arrays (serializeDoubleMatrix), but it requires the data to be collected first. I assume that step would pull all the data to the driver. Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/most-efficient-way-to-send-data-from-Scala-to-python-tp18287.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PySpark issue with sortByKey: IndexError: list index out of range
I don't have any insight into this bug, but on Spark version 1.0.0 I ran into the same bug running the 'sort.py' example. On a smaller data set, it worked fine. On a larger data set I got this error: Traceback (most recent call last): File /home/skane/spark/examples/src/main/python/sort.py, line 30, in module .sortByKey(lambda x: x) File /usr/lib/spark/python/pyspark/rdd.py, line 480, in sortByKey bounds.append(samples[index]) IndexError: list index out of range -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-issue-with-sortByKey-IndexError-list-index-out-of-range-tp16445p18288.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
specifying sort order for sort by value
Hi, I am using rdd.sortBy(_._2) to get an RDD sorted by value. The default order is ascending order. How can I get it sorted in descending order? I could not find an option to specify the order. I need to get the top K elements of the list sorted in descending order. If there is no option to get the descending order, I would like to know if there is a way to get the last K elements of the list sorted in ascending order. take(k) gets the first k elements, is there an option to get the last K elements of an RDD ? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/specifying-sort-order-for-sort-by-value-tp18289.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkSubmitDriverBootstrapper and JVM parameters
/usr/lib/jvm/java-1.7.0-openjdk-amd64/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper When I execute /usr/local/spark-1.1.0/bin/spark-submit local[32] for my app, I see two processes get spun off. One is org.apache.spark.deploy.SparkSubmitDriverBootstrapper and the other is org.apache.spark.deploy.SparkSubmit. My understanding is that the first one is the driver and the latter is the executor, can you confirm? If that is true, my Spark application defaults don't seem to be picked up from the following parameters. My SparkSubmit picks up JVM parameters from here: spark-defaults.conf spark.daemon.memory=45g spark.driver.memory=45g spark.executor.memory=45g It is not clear to me when Spark uses spark-defaults and when it uses spark-env. Can someone help me understand? spark-env.sh SPARK_DAEMON_MEMORY=30g SPARK_EXECUTOR_MEMORY=30g SPARK_DRIVER_MEMORY=30g I am running into GC/OOM issues, and I am wondering whether tweaking SparkSubmitDriverBootstrapper or SparkSubmit JVM parameters will help. I did look at the configuration on Spark's site, and tried many different approaches as suggested there. Thanks, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSubmitDriverBootstrapper-and-JVM-parameters-tp18290.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: specifying sort order for sort by value
Yes you can sort it in desc, you simply specify a boolean value in the second argument to the sortBy function. Default is ascending. So it will look like: rdd.sortBy(_._2, false) Read more over here http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD Thanks Best Regards On Fri, Nov 7, 2014 at 12:20 AM, SK skrishna...@gmail.com wrote: Hi, I am using rdd.sortBy(_._2) to get an RDD sorted by value. The default order is ascending order. How can I get it sorted in descending order? I could not find an option to specify the order. I need to get the top K elements of the list sorted in descending order. If there is no option to get the descending order, I would like to know if there is a way to get the last K elements of the list sorted in ascending order. take(k) gets the first k elements, is there an option to get the last K elements of an RDD ? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/specifying-sort-order-for-sort-by-value-tp18289.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: specifying sort order for sort by value
Thanks. I was looking at an older RDD documentation that did not specify the ordering option. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/specifying-sort-order-for-sort-by-value-tp18289p18292.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Kinesis integration with Spark Streaming in EMR cluster - Output is not showing up
Hello, I am new to spark and trying to run the spark program (bundled as jar) in a EMR cluster. In one terminal session, i am loading data into kinesis stream. In another window, i am trying to run the spark streaming program, and trying to print out the output. Whenever i run the spark streaming program, i am continuously seeing the below INFO messages, but not seeing any output (counters). When I run the same program with normal Spark RDDs instead of stream RDDs, i am seeing the output in hdfs files (taking input by reading from a file instead of reading it from kinesis stream). Pls also note that i ran with JavaStreamingContext.awaitTermination and when i use this, it is continuously outputting the below messages, and later i tried JavaStreamingContext.awaitTermination.stop to see if i can see the output, but it is not working. Any help is really appreciated. Thank you. Here is the main program: = public JavaDStreambyte[] getDStream() { int numShards = kin_client.describeStream(ken_stream_name).getStreamDescription().getShards().size(); System.out.println(Number of shards are : + numShards); Duration batchInterval = new Duration(2000); /* Setup the Spark config. */ SparkConf sparkConfig = new SparkConf().setAppName(TestJSON); /* Kinesis checkpoint interval. Same as batchInterval for this example. */ Duration checkpointInterval = batchInterval; /* Setup the StreamingContext */ jssc = new JavaStreamingContext(sparkConfig, batchInterval); dstream = KinesisUtils.createStream(jssc, ken_stream_name, endPoint, checkpointInterval, InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()); System.out.println(DStream count is : + dstream.count()); return dstream; } public void startContext() { jssc.start(); /* * jssc.stop(); jssc.awaitTermination(); try { * java.lang.Thread.sleep(2); } catch (InterruptedException e) { * e.printStackTrace(); } * * jssc.stop(); */ jssc.stop(); } public static void main(String[] args) { SentinelQueryNRecoCount qnr = new SentinelQueryNRecoCount(); JavaDStreambyte[] data = qnr.getDStream(); JavaDStreamString lines = data.map(new GetLines()); System.out.println(Lines D Stream first row is : + lines.count()); JavaDStreamString filterdata = lines.filter(new GetFilterData()); System.out.println(Filtered records are: + filterdata.count()); JavaDStreamDuoKey rdd_records = filterdata.map(new GetRecords()); System.out.println(Filtered records are: + filterdata.count()); System.out.println(RDD records are: + rdd_records.count()); JavaPairDStreamString, Tuple2lt;Integer, Integer pair_map_records = rdd_records.mapToPair(new ProcessMapper()); System.out.println(Pair Mapper Records Count is : + pair_map_records.count()); JavaPairDStreamString, Tuple2lt;Integer, Integer result_reduce_records = pair_map_records .reduceByKey(new ProcessReducer()); System.out.println(Result Reduce record count is : + result_reduce_records.count()); result_reduce_records.print(); // result_reduce_records.saveAsHadoopFiles(prefix, suffix); qnr.startContext(); } output below. = Spark assembly has been built with Hive, including Datanucleus jars on classpath SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/hadoop/.versions/spark-1.1.0/lib/spark-assembly-1.1.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Number of shards are : 2 14/11/06 19:34:26 WARN spark.SparkConf: SPARK_CLASSPATH was detected (set to '/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar:/home/hadoop/hive/conf/*'). This is deprecated in Spark 1.0+. Please instead use: - ./spark-submit with --driver-class-path to augment the driver classpath - spark.executor.extraClassPath to augment the executor classpath 14/11/06 19:34:26 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar:/home/hadoop/hive/conf/*' as a work-around. 14/11/06 19:34:26 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to
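Two things that may be worth checking (just a guess from the snippet above): count() on a DStream is lazy and returns another DStream rather than a number, so the System.out.println calls never show per-batch counts, and startContext() calls stop() right after start() instead of blocking on awaitTermination(). A stripped-down sketch of the usual shape, with a placeholder stream name and endpoint:

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

public class KinesisCountSketch {
  public static void main(String[] args) {
    Duration batchInterval = new Duration(2000);
    JavaStreamingContext jssc = new JavaStreamingContext(
        new SparkConf().setAppName("KinesisCountSketch"), batchInterval);

    // Placeholder stream name and endpoint -- substitute real values.
    JavaDStream<byte[]> records = KinesisUtils.createStream(jssc, "my-stream",
        "kinesis.us-east-1.amazonaws.com", batchInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());

    // count() is lazy: it yields a DStream with one count per batch.
    // print() registers an output operation so something actually runs.
    records.count().print();

    jssc.start();              // start the computation ...
    jssc.awaitTermination();   // ... and block; don't stop() right after start()
  }
}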
Re: loading, querying schemaRDD using SparkSQL
It can, but currently that method uses the default hive serde which is not very robust (does not deal well with \n in strings) and probably is not super fast. You'll also need to be using a HiveContext for it to work. On Tue, Nov 4, 2014 at 8:20 PM, vdiwakar.malladi vdiwakar.mall...@gmail.com wrote: Thanks Michael for your response. Just now, i saw saveAsTable method on JavaSchemaRDD object (in Spark 1.1.0 API). But I couldn't find the corresponding documentation. Will that help? Please let me know. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/loading-querying-schemaRDD-using-SparkSQL-tp18052p18137.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
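For reference, a rough Scala sketch of what that looks like with a HiveContext (Spark 1.1-era API; the JSON path and table name below are made up):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)   // saveAsTable needs a HiveContext, not a plain SQLContext

val people = hiveContext.jsonFile("/data/people.json")   // hypothetical input
people.saveAsTable("people")   // written through the default Hive SerDe, with the caveats noted above

hiveContext.sql("SELECT COUNT(*) FROM people").collect().foreach(println)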
Re: Unable to use HiveContext in spark-shell
Those are the same options I used, except I had —tgz to package it and I built off of the master branch. Unfortunately, my only guess is that these errors stem from your build environment. In your spark assembly, do you have any classes which belong to the org.apache.hadoop.hive package? From: Tridib Samanta tridib.sama...@live.commailto:tridib.sama...@live.com Date: Thursday, November 6, 2014 at 9:49 AM To: Terry Siu terry@smartfocus.commailto:terry@smartfocus.com, u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: RE: Unable to use HiveContext in spark-shell I am using spark 1.1.0. I built it using: ./make-distribution.sh -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests My ultimate goal is to execute a query on parquet file with nested structure and cast a date string to Date. This is required to calculate the age of Person entity. but I am even unable to pass this line: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) I made sure that org.apache.hadoop package is in the spark assembly jar. Re-attaching the stack trace for quick reference. scala val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) error: bad symbolic reference. A signature in HiveContext.class refers to term hive in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling HiveContext.class. error: while compiling: console during phase: erasure library version: version 2.10.4 compiler version: version 2.10.4 reconstructed args: last tree to typer: Apply(value $outer) symbol: value $outer (flags: method synthetic stable expandedname triedcooking) symbol definition: val $outer(): $iwC.$iwC.type tpe: $iwC.$iwC.type symbol owners: value $outer - class $iwC - class $iwC - class $iwC - class $read - package $line5 context owners: class $iwC - class $iwC - class $iwC - class $iwC - class $read - package $line5 == Enclosing template or block == ClassDef( // class $iwC extends Serializable 0 $iwC [] Template( // val local $iwC: notype, tree.tpe=$iwC java.lang.Object, scala.Serializable // parents ValDef( private _ tpt empty ) // 5 statements DefDef( // def init(arg$outer: $iwC.$iwC.$iwC.type): $iwC method triedcooking init [] // 1 parameter list ValDef( // $outer: $iwC.$iwC.$iwC.type $outer tpt // tree.tpe=$iwC.$iwC.$iwC.type empty ) tpt // tree.tpe=$iwC Block( // tree.tpe=Unit Apply( // def init(): Object in class Object, tree.tpe=Object $iwC.super.init // def init(): Object in class Object, tree.tpe=()Object Nil ) () ) ) ValDef( // private[this] val sqlContext: org.apache.spark.sql.hive.HiveContext private local triedcooking sqlContext tpt // tree.tpe=org.apache.spark.sql.hive.HiveContext Apply( // def init(sc: org.apache.spark.SparkContext): org.apache.spark.sql.hive.HiveContext in class HiveContext, tree.tpe=org.apache.spark.sql.hive.HiveContext new org.apache.spark.sql.hive.HiveContext.init // def init(sc: org.apache.spark.SparkContext): org.apache.spark.sql.hive.HiveContext in class HiveContext, tree.tpe=(sc: org.apache.spark.SparkContext)org.apache.spark.sql.hive.HiveContext Apply( // val sc(): org.apache.spark.SparkContext, tree.tpe=org.apache.spark.SparkContext $iwC.this.$line5$$read$$iwC$$iwC$$iwC$$iwC$$$outer().$line5$$read$$iwC$$iwC$$iwC$$$outer().$line5$$read$$iwC$$iwC$$$outer().$VAL1().$iw().$iw().sc // val sc(): org.apache.spark.SparkContext, 
tree.tpe=()org.apache.spark.SparkContext Nil ) ) ) DefDef( // val sqlContext(): org.apache.spark.sql.hive.HiveContext method stable accessor sqlContext [] List(Nil) tpt // tree.tpe=org.apache.spark.sql.hive.HiveContext $iwC.this.sqlContext // private[this] val sqlContext: org.apache.spark.sql.hive.HiveContext, tree.tpe=org.apache.spark.sql.hive.HiveContext ) ValDef( // protected val $outer: $iwC.$iwC.$iwC.type protected synthetic paramaccessor triedcooking $outer tpt // tree.tpe=$iwC.$iwC.$iwC.type empty ) DefDef( // val $outer(): $iwC.$iwC.$iwC.type method synthetic stable expandedname triedcooking $line5$$read$$iwC$$iwC$$iwC$$iwC$$$outer [] List(Nil) tpt // tree.tpe=Any $iwC.this.$outer // protected val $outer: $iwC.$iwC.$iwC.type, tree.tpe=$iwC.$iwC.$iwC.type ) ) ) == Expanded type of tree == ThisType(class $iwC) uncaught exception during compilation: scala.reflect.internal.Types$TypeError
Re: PySpark issue with sortByKey: IndexError: list index out of range
It should be fixed in 1.1+. Could you share a script to reproduce it? On Thu, Nov 6, 2014 at 10:39 AM, skane sk...@websense.com wrote: I don't have any insight into this bug, but on Spark version 1.0.0 I ran into the same bug running the 'sort.py' example. On a smaller data set, it worked fine. On a larger data set I got this error:

Traceback (most recent call last):
  File "/home/skane/spark/examples/src/main/python/sort.py", line 30, in <module>
    .sortByKey(lambda x: x)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 480, in sortByKey
    bounds.append(samples[index])
IndexError: list index out of range

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-issue-with-sortByKey-IndexError-list-index-out-of-range-tp16445p18288.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: AVRO specific records
Benjamin, Thanks for the snippet. I have tried using it, but unfortunately I get the following exception. I am clueless at what might be wrong. Any ideas? java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected at org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:115) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:103) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini On Wed, Nov 5, 2014 at 4:24 PM, Laird, Benjamin benjamin.la...@capitalone.com wrote: Something like this works and is how I create an RDD of specific records. val avroRdd = sc.newAPIHadoopFile(twitter.avro, classOf[AvroKeyInputFormat[twitter_schema]], classOf[AvroKey[twitter_schema]], classOf[NullWritable], conf) (From https://github.com/julianpeeters/avro-scala-macro-annotation-examples/blob/master/spark/src/main/scala/AvroSparkScala.scala) Keep in mind you'll need to use the kryo serializer as well. From: Frank Austin Nothaft fnoth...@berkeley.edu Date: Wednesday, November 5, 2014 at 5:06 PM To: Simone Franzini captainfr...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: AVRO specific records Hi Simone, Matt Massie put together a good tutorial on his blog http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. If you’re looking for more code using Avro, we use it pretty extensively in our genomics project. Our Avro schemas are here https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl, and we have serialization code here https://github.com/bigdatagenomics/adam/tree/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization. We use Parquet for storing the Avro records, but there is also an Avro HadoopInputFormat. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Nov 5, 2014, at 1:25 PM, Simone Franzini captainfr...@gmail.com wrote: How can I read/write AVRO specific records? I found several snippets using generic records, but nothing with specific records so far. Thanks, Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini -- The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. 
If you have received this communication in error, please contact the sender and delete the material from your computer.
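On the Kryo point above, a minimal sketch of the configuration, assuming the generated Avro class from Benjamin's snippet (twitter_schema); the registrator name is made up, and plain registration is only one option (chill-avro provides Avro-aware serializers if this turns out not to be enough):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// hypothetical registrator for the generated Avro specific record class
class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[twitter_schema])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[AvroKryoRegistrator].getName)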
Redeploying a spark streaming application
Hello, I'm trying to find the best way of redeploying a spark streaming application. Ideally, I was thinking of a scenario where a build server packages up a jar and a deployment step submits it to a Spark Master. On the next successful build, the next version would get deployed, taking down the previous version. What would be the best way of achieving this? Thanks, Ashic.
Re: Redeploying a spark streaming application
You’ve basically got it. Deployment step can be simply scp-ing the file to a known location on the server and then executing a run script on the server that actually runs the spark-submit. From: Ashic Mahtab as...@live.commailto:as...@live.com Date: Thursday, November 6, 2014 at 5:01 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Redploying a spark streaming application Hello, I'm trying to find the best way of redeploying a spark streaming application. Ideally, I was thinking of a scenario where a build server packages up a jar and a deployment step submits it to a Spark Master. On the next successful build, the next version would get deployed taking down the previous version. What would be the best way of achieving this? Thanks, Ashic. The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: sparse x sparse matrix multiplication
See this thread for examples of sparse matrix x sparse matrix: https://groups.google.com/forum/#!topic/spark-users/CGfEafqiTsA We thought about providing matrix multiplies on CoordinateMatrix, however, the matrices have to be very dense for the overhead of having many little (i, j, value) objects to be worth it. For this reason, we are focused on doing block matrix multiplication first. The goal is version 1.3. Best, Reza On Wed, Nov 5, 2014 at 11:48 PM, Wei Tan w...@us.ibm.com wrote: I think Xiangrui's ALS code implement certain aspect of it. You may want to check it out. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center [image: Inactive hide details for Xiangrui Meng ---11/05/2014 01:13:40 PM---You can use breeze for local sparse-sparse matrix multiplic]Xiangrui Meng ---11/05/2014 01:13:40 PM---You can use breeze for local sparse-sparse matrix multiplication and then define an RDD of sub-matri From: Xiangrui Meng men...@gmail.com To: Duy Huynh duy.huynh@gmail.com Cc: user u...@spark.incubator.apache.org Date: 11/05/2014 01:13 PM Subject: Re: sparse x sparse matrix multiplication -- You can use breeze for local sparse-sparse matrix multiplication and then define an RDD of sub-matrices RDD[(Int, Int, CSCMatrix[Double])] (blockRowId, blockColId, sub-matrix) and then use join and aggregateByKey to implement this feature, which is the same as in MapReduce. -Xiangrui - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
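To make that concrete, an untested Scala sketch of the block approach Xiangrui describes, using breeze for the local sparse-sparse multiply (the block-id bookkeeping and the reduceByKey step are my own framing of the join/aggregate idea above):

import breeze.linalg.CSCMatrix
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// (blockRowId, blockColId, sub-matrix), as suggested above
type Block = (Int, Int, CSCMatrix[Double])

def multiplyBlocks(a: RDD[Block], b: RDD[Block]): RDD[Block] = {
  val aByCol = a.map { case (i, k, m) => (k, (i, m)) }   // key A's blocks by their column id
  val bByRow = b.map { case (k, j, m) => (k, (j, m)) }   // key B's blocks by their row id
  aByCol.join(bByRow)                                    // every pair of blocks that must be multiplied
    .map { case (_, ((i, ma), (j, mb))) => ((i, j), ma * mb) }  // local sparse-sparse product
    .reduceByKey(_ + _)                                         // sum partial products per output block
    .map { case ((i, j), m) => (i, j, m) }
}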
job works well on small data set but fails on large data set
Hello all, I am running the following operations:

val part1 = mapOutput.toArray.flatten
val part2 = sc.parallelize(part1)
val reduceOutput = part2.combineByKey(
  (v) => (v, 1),
  (acc: (Double, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

where mapOutput is the output of a map function and is a tuple of (x, y), where y is a Double value and x is a tuple of 4 strings. When I used Float instead of Double, it worked with the small data set but failed on the large file. I changed it to Double, and on the large file it works until I get mapOutput; but when I include the remaining part, it fails. Can someone please help me understand where I am going wrong? Thank you for your time. -- Regards, Haripriya Ayyalasomayajula Graduate Student Department of Computer Science University of Houston Contact : 650-796-7112
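In case it helps narrow things down, here is a self-contained sketch of the same (sum, count) pattern with toy data (the example values and the final average step are mine), which runs locally:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val sc = new SparkContext(new SparkConf().setAppName("combineByKeyExample").setMaster("local[2]"))

// toy data shaped like the post: key = tuple of 4 strings, value = Double
val pairs = sc.parallelize(Seq(
  (("a", "b", "c", "d"), 1.0),
  (("a", "b", "c", "d"), 3.0),
  (("e", "f", "g", "h"), 5.0)))

val sumCounts = pairs.combineByKey(
  (v: Double) => (v, 1),
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2))

val averages = sumCounts.mapValues { case (sum, count) => sum / count }
averages.collect().foreach(println)   // one (key, average) pair per distinct key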
Any patterns for multiplexing the streaming data
We are looking at consuming the kafka stream using Spark Streaming and transform into various subsets like applying some transformation or de-normalizing some fields, etc. and feed it back into Kafka as a different topic for downstream consumers. Wanted to know if there are any existing patterns for achieving this. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-patterns-for-multiplexing-the-streaming-data-tp18303.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
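One common way to do this (not the only one) is to transform the stream and write each partition back to Kafka with its own producer. A rough Scala sketch against the Spark 1.1 / Kafka 0.8 APIs; the topic names, zkQuorum/brokers strings, and the transform function are all placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// assumed to exist already: ssc (StreamingContext), zkQuorum and brokers (connection strings),
// and transform(record: String): String -- whatever de-normalization you need
val raw = KafkaUtils.createStream(ssc, zkQuorum, "multiplexer", Map("raw-events" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

val denormalized = raw.map(record => transform(record))

denormalized.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))   // one producer per partition
    records.foreach(r => producer.send(new KeyedMessage[String, String]("events-denormalized", r)))
    producer.close()
  }
}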
Re: MatrixFactorizationModel predict(Int, Int) API
I reproduced the problem in mllib tests ALSSuite.scala using the following functions:

val arrayPredict = userProductsRDD.map { case (user, product) =>
  val recommendedProducts = model.recommendProducts(user, products)
  val productScore = recommendedProducts.find { x => x.product == product }
  require(productScore != None)
  productScore.get
}.collect

arrayPredict.foreach { elem =>
  if (allRatings.get(elem.user, elem.product) != elem.rating)
    fail("Prediction APIs don't match")
}

If the usage of model.recommendProducts is correct, the test fails with the same error I sent before...

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 316.0 failed 1 times, most recent failure: Lost task 0.0 in stage 316.0 (TID 79, localhost): scala.MatchError: null
 org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:825)
 org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:81)

It is a blocker for me and I am debugging it. I will open up a JIRA if this is indeed a bug... Do I have to cache the models to make userFeatures.lookup(user).head work?

On Mon, Nov 3, 2014 at 9:24 PM, Xiangrui Meng men...@gmail.com wrote: Was the user present in training? We can put a check there and return NaN if the user is not included in the model. -Xiangrui On Mon, Nov 3, 2014 at 5:25 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am testing MatrixFactorizationModel.predict(user: Int, product: Int) but the code fails on userFeatures.lookup(user).head In computeRmse, MatrixFactorizationModel.predict(RDD[(Int, Int)]) has been called, and in all the test cases that API has been used... I can perhaps refactor my code to do the same, but I was wondering whether people test the lookup(user) version of the code... Do I need to cache the model to make it work? I think right now the default is STORAGE_AND_DISK... Thanks. Deb
Store DStreams into Hive using Hive Streaming
Hello, Is there a built-in way or connector to store DStream results into an existing Hive ORC table using the Hive/HCatalog Streaming API? Otherwise, do you have any suggestions regarding the implementation of such component? Thank you, -Geovani
Re: MatrixFactorizationModel predict(Int, Int) API
model.recommendProducts can only be called from the master then ? I have a set of 20% users on whom I am performing the test...the 20% users are in a RDD...if I have to collect them all to master node and then call model.recommendProducts, that's a issue... Any idea how to optimize this so that we can calculate MAP statistics on large samples of data ? On Thu, Nov 6, 2014 at 4:41 PM, Xiangrui Meng men...@gmail.com wrote: ALS model contains RDDs. So you cannot put `model.recommendProducts` inside a RDD closure `userProductsRDD.map`. -Xiangrui On Thu, Nov 6, 2014 at 4:39 PM, Debasish Das debasish.da...@gmail.com wrote: I reproduced the problem in mllib tests ALSSuite.scala using the following functions: val arrayPredict = userProductsRDD.map{case(user,product) = val recommendedProducts = model.recommendProducts(user, products) val productScore = recommendedProducts.find{x=x.product == product} require(productScore != None) productScore.get }.collect arrayPredict.foreach { elem = if (allRatings.get(elem.user, elem.product) != elem.rating) fail(Prediction APIs don't match) } If the usage of model.recommendProducts is correct, the test fails with the same error I sent before... org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 316.0 failed 1 times, most recent failure: Lost task 0.0 in stage 316.0 (TID 79, localhost): scala.MatchError: null org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:825) org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:81) It is a blocker for me and I am debugging it. I will open up a JIRA if this is indeed a bug... Do I have to cache the models to make userFeatures.lookup(user).head to work ? On Mon, Nov 3, 2014 at 9:24 PM, Xiangrui Meng men...@gmail.com wrote: Was user presented in training? We can put a check there and return NaN if the user is not included in the model. -Xiangrui On Mon, Nov 3, 2014 at 5:25 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am testing MatrixFactorizationModel.predict(user: Int, product: Int) but the code fails on userFeatures.lookup(user).head In computeRmse MatrixFactorizationModel.predict(RDD[(Int, Int)]) has been called and in all the test-cases that API has been used... I can perhaps refactor my code to do the same but I was wondering whether people test the lookup(user) version of the code.. Do I need to cache the model to make it work ? I think right now default is STORAGE_AND_DISK... Thanks. Deb
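If it helps, the RDD-based predict already mentioned in this thread sidesteps the nested-RDD restriction, since the scoring happens in one distributed pass rather than per user inside a closure. A small sketch (testRatings is an assumed RDD[Rating] holding the 20% held-out data):

import org.apache.spark.SparkContext._

// cannot call model.recommendProducts inside userProductsRDD.map, but this is fine:
val predictions = model.predict(userProductsRDD)                   // RDD[Rating] for all requested pairs
val predictedByPair = predictions.map(r => ((r.user, r.product), r.rating))

val heldOut = testRatings.map(r => ((r.user, r.product), r.rating))
val squaredErrors = predictedByPair.join(heldOut).map { case (_, (p, a)) => (p - a) * (p - a) }
val rmse = math.sqrt(squaredErrors.mean())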
Re: avro + parquet + vector&lt;string&gt; + NullPointerException while reading
Thanks for the advice! What seems to work for me is that I define the array type as:

type: { "type": "array", "items": "string", "java-class": "java.util.ArrayList" }

It seems to have been creating an Avro generic List, which Spark doesn't know how to serialize, instead of a java.util.List, which Spark likes. Hive at 0.13.1 still can't read it though... Thanks! -Mike

From: Michael Armbrust mich...@databricks.com To: Michael Albert m_albert...@yahoo.com Cc: user@spark.apache.org Sent: Tuesday, November 4, 2014 2:37 PM Subject: Re: avro + parquet + vector&lt;string&gt; + NullPointerException while reading

You might consider using the native parquet support built into Spark SQL instead of using the raw library: http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files

On Mon, Nov 3, 2014 at 7:33 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! I'm trying to use avro and parquet with the following schema:

{
  "name": "TestStruct",
  "namespace": "bughunt",
  "type": "record",
  "fields": [
    { "name": "string_array", "type": { "type": "array", "items": "string" } }
  ]
}

The writing process seems to be OK, but when I try to read it with Spark, I get:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
string_array (bughunt.TestStruct)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)

When I try to read it with Hive, I get this:

Failed with exception java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.ArrayWritable

Which would lead me to suspect that this might be related to this one: https://github.com/Parquet/parquet-mr/issues/281 , but that one seems to be Hive specific, and I am not seeing Spark read the data it claims to have written itself. I'm running on an Amazon EMR cluster using the version 2.4.0 hadoop code and spark 1.1.0. Has anyone else observed this sort of behavior?
For completeness, here is the code that writes the data:

package bughunt

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import parquet.avro.AvroWriteSupport
import parquet.avro.AvroParquetOutputFormat
import parquet.hadoop.ParquetOutputFormat

import java.util.ArrayList

object GenData {
  val outputPath = "/user/x/testdata"
  val words = List(
    List("apple", "banana", "cherry"),
    List("car", "boat", "plane"),
    List("lion", "tiger", "bear"),
    List("north", "south", "east", "west"),
    List("up", "down", "left", "right"),
    List("red", "green", "blue"))

  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .setAppName("IngestLoanApplicattion")
      //.set("spark.kryo.registrator",
      //     classOf[CommonRegistrator].getName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", 4.toString)
      .set("spark.kryo.referenceTracking", "false")

    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(words)

    val job = new Job(sc.hadoopConfiguration)

    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
    AvroParquetOutputFormat.setSchema(job, TestStruct.SCHEMA$)

    rdd.map(p => {
        val xs = new java.util.ArrayList[String]
        for (z <- p) { xs.add(z) }
        val bldr = TestStruct.newBuilder()
        bldr.setStringArray(xs)
        (null, bldr.build())
      })
      .saveAsNewAPIHadoopFile(outputPath,
        classOf[Void],
        classOf[TestStruct],
        classOf[ParquetOutputFormat[TestStruct]],
        job.getConfiguration)
  }
}

To read the data, I use this sort of code from the spark-shell:

:paste
import bughunt.TestStruct

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport

def openRddSpecific(sc: SparkContext) = {
  val job = new Job(sc.hadoopConfiguration)
  ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[TestStruct]])
  sc.newAPIHadoopFile("/user/malbert/testdata",
    classOf[ParquetInputFormat[TestStruct]],
    classOf[Void],
    classOf[TestStruct],
    job.getConfiguration)
}

I start the Spark shell as follows:

spark-shell \
  --jars
Re: MatrixFactorizationModel predict(Int, Int) API
There is a JIRA for it: https://issues.apache.org/jira/browse/SPARK-3066 The easiest case is when one side is small. If both sides are large, this is a super-expensive operation. We can do block-wise cross product and then find top-k for each user. Best, Xiangrui On Thu, Nov 6, 2014 at 4:51 PM, Debasish Das debasish.da...@gmail.com wrote: model.recommendProducts can only be called from the master then ? I have a set of 20% users on whom I am performing the test...the 20% users are in a RDD...if I have to collect them all to master node and then call model.recommendProducts, that's a issue... Any idea how to optimize this so that we can calculate MAP statistics on large samples of data ? On Thu, Nov 6, 2014 at 4:41 PM, Xiangrui Meng men...@gmail.com wrote: ALS model contains RDDs. So you cannot put `model.recommendProducts` inside a RDD closure `userProductsRDD.map`. -Xiangrui On Thu, Nov 6, 2014 at 4:39 PM, Debasish Das debasish.da...@gmail.com wrote: I reproduced the problem in mllib tests ALSSuite.scala using the following functions: val arrayPredict = userProductsRDD.map{case(user,product) = val recommendedProducts = model.recommendProducts(user, products) val productScore = recommendedProducts.find{x=x.product == product} require(productScore != None) productScore.get }.collect arrayPredict.foreach { elem = if (allRatings.get(elem.user, elem.product) != elem.rating) fail(Prediction APIs don't match) } If the usage of model.recommendProducts is correct, the test fails with the same error I sent before... org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 316.0 failed 1 times, most recent failure: Lost task 0.0 in stage 316.0 (TID 79, localhost): scala.MatchError: null org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:825) org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:81) It is a blocker for me and I am debugging it. I will open up a JIRA if this is indeed a bug... Do I have to cache the models to make userFeatures.lookup(user).head to work ? On Mon, Nov 3, 2014 at 9:24 PM, Xiangrui Meng men...@gmail.com wrote: Was user presented in training? We can put a check there and return NaN if the user is not included in the model. -Xiangrui On Mon, Nov 3, 2014 at 5:25 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am testing MatrixFactorizationModel.predict(user: Int, product: Int) but the code fails on userFeatures.lookup(user).head In computeRmse MatrixFactorizationModel.predict(RDD[(Int, Int)]) has been called and in all the test-cases that API has been used... I can perhaps refactor my code to do the same but I was wondering whether people test the lookup(user) version of the code.. Do I need to cache the model to make it work ? I think right now default is STORAGE_AND_DISK... Thanks. Deb - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
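Until SPARK-3066 lands, the easy case Xiangrui mentions (one side small) can be approximated by broadcasting the product factors and ranking per user; a rough sketch, where topK and the dot-product helper are mine:

import org.apache.spark.SparkContext._

// works when the product factors fit in memory on each executor (the "easiest case" above)
val productFactors = model.productFeatures.collect()   // Array[(Int, Array[Double])]
val pfBroadcast = sc.broadcast(productFactors)
val topK = 10

def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

val recommendations = model.userFeatures.mapValues { uf =>
  pfBroadcast.value
    .map { case (productId, pf) => (productId, dot(uf, pf)) }   // score every product for this user
    .sortBy(-_._2)
    .take(topK)                                                 // keep only the top-k products
}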
Re: Selecting Based on Nested Values using Language Integrated Query Syntax
Michael, Thanks for the explanation. I was able to get this running. On Wed, Oct 29, 2014 at 3:07 PM, Michael Armbrust mich...@databricks.com wrote: We are working on more helpful error messages, but in the meantime let me explain how to read this output. org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'p.name,'p.age, tree: Project ['p.name,'p.age] Filter ('location.number = 2300) Join Inner, Some((location#110.number AS number#111 = 'ln.streetnumber)) Generate explode(locations#10), true, false, Some(l) LowerCaseSchema Subquery p Subquery people SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) LowerCaseSchema Subquery ln Subquery locationNames SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) 'tickedFields indicate a failure to resolve, where as numbered#10 attributes have been resolved. (The numbers are globally unique and can be used to disambiguate where a column is coming from when the names are the same) Resolution happens bottom up. So the first place that there is a problem is 'ln.streetnumber, which prevents the rest of the query from resolving. If you look at the subquery ln, it is only producing two columns: locationName and locationNumber. So streetnumber is not valid. On Tue, Oct 28, 2014 at 8:02 PM, Corey Nolet cjno...@gmail.com wrote: scala locations.queryExecution warning: there were 1 feature warning(s); re-run with -feature for details res28: _4.sqlContext.QueryExecution forSome { val _4: org.apache.spark.sql.SchemaRDD } = == Parsed Logical Plan == SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) == Analyzed Logical Plan == SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) == Optimized Logical Plan == SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) == Physical Plan == ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38 Code Generation: false == RDD == scala people.queryExecution warning: there were 1 feature warning(s); re-run with -feature for details res29: _5.sqlContext.QueryExecution forSome { val _5: org.apache.spark.sql.SchemaRDD } = == Parsed Logical Plan == SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) == Analyzed Logical Plan == SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) == Optimized Logical Plan == SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) == Physical Plan == ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38 Code Generation: false == RDD == Here's when I try executing the join and the lateral view explode() : 14/10/28 23:05:35 INFO ParseDriver: Parse Completed org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'p.name,'p.age, tree: Project ['p.name,'p.age] Filter ('location.number = 2300) Join Inner, Some((location#110.number AS number#111 = 'ln.streetnumber)) Generate explode(locations#10), true, false, Some(l) LowerCaseSchema Subquery p Subquery people SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) LowerCaseSchema Subquery ln Subquery locationNames SparkLogicalPlan (ExistingRdd 
[locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) at
Re: Store DStreams into Hive using Hive Streaming
Ted, any pointers? On Thu, Nov 6, 2014 at 4:46 PM, Luiz Geovani Vier lgv...@gmail.com wrote: Hello, Is there a built-in way or connector to store DStream results into an existing Hive ORC table using the Hive/HCatalog Streaming API? Otherwise, do you have any suggestions regarding the implementation of such component? Thank you, -Geovani
Re: Store DStreams into Hive using Hive Streaming
Geovani, You can use HiveContext to do inserts into a Hive table in a Streaming app just as you would a batch app. A DStream is really a collection of RDDs so you can run the insert from within the foreachRDD. You just have to be careful that you’re not creating large amounts of small files. So you may want to either increase the duration of your Streaming batches or repartition right before you insert. You’ll just need to do some testing based on your ingest volume. You may also want to consider streaming into another data store though. Thanks, Silvio From: Luiz Geovani Vier lgv...@gmail.commailto:lgv...@gmail.com Date: Thursday, November 6, 2014 at 7:46 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Store DStreams into Hive using Hive Streaming Hello, Is there a built-in way or connector to store DStream results into an existing Hive ORC table using the Hive/HCatalog Streaming API? Otherwise, do you have any suggestions regarding the implementation of such component? Thank you, -Geovani
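A rough sketch of that, against the Spark 1.1 SQL API; the case class, stream, and table name are placeholders, and the repartition is the small-files precaution mentioned above:

import org.apache.spark.sql.hive.HiveContext

case class Event(id: String, payload: String)   // placeholder record type

val hiveContext = new HiveContext(sc)
import hiveContext.createSchemaRDD   // implicit conversion from an RDD of case classes to a SchemaRDD

// eventStream: DStream[Event], assumed to be built elsewhere
eventStream.foreachRDD { rdd =>
  val batch = rdd.repartition(1)   // coalesce each micro-batch to avoid many tiny files
  batch.insertInto("events_orc")   // append into the existing Hive-managed ORC table
}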
Is there a way to limit the sql query result size?
Hi spark-users, When I use spark-sql or beeline to query a large dataset, sometimes the query result can cause a driver OOM. So I wonder: is there a config property in Spark SQL to limit the maximum result size returned (without adding a LIMIT clause to the SQL query)? For example, before the SELECT query I would run something like this in the SQL client: set maxRows = 100; and then all queries would have a default return limit of 100. - Best Regards
Re: Executor Log Rotation Is Not Working?
Hi, I figured out that in standalone mode these configuration options should be added to the worker process's configuration, for example by adding the following line to spark-env.sh: SPARK_WORKER_OPTS="-Dspark.executor.logs.rolling.strategy=time -Dspark.executor.logs.rolling.time.interval=daily -Dspark.executor.logs.rolling.maxRetainedFiles=3" Maybe in yarn mode spark-defaults.conf would be sufficient, but I've not tested that. On Tue, Nov 4, 2014 at 12:24 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I'm using Spark Streaming 1.1, and I have the following logs that keep growing: /opt/spark-1.1.0-bin-cdh4/work/app-20141029175309-0005/2/stderr I think it is the executor log, so I set up the following options in spark-defaults.conf: spark.executor.logs.rolling.strategy time spark.executor.logs.rolling.time.interval daily spark.executor.logs.rolling.maxRetainedFiles 10 I can see these options on the Web UI, so I suppose they are effective. However, the stderr is still not rotated. Am I doing something wrong? Thanks. -- Jerry -- Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Task size variation while using Range Vs List
Thanks for the response!! Will try to see the behaviour with Cache() -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-size-variation-while-using-Range-Vs-List-tp18243p18318.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to trace/debug serialization?
Will this work even with Kryo Serialization? Currently spark.closure.serializer must be org.apache.spark.serializer.JavaSerializer, so the serialization of closure functions won't involve Kryo; Kryo is only used to serialize the data. Best Regards, Shixiong Zhu 2014-11-07 12:27 GMT+08:00 nsareen nsar...@gmail.com: Will this work even with Kryo Serialization ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-trace-debug-serialization-tp18230p18319.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unable to use HiveContext in spark-shell
Yes. I have org.apache.hadoop.hive package in spark assembly. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-use-HiveContext-in-spark-shell-tp18261p18322.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unable to use HiveContext in spark-shell
I built spark-1.1.0 in a new fresh machine. This issue is gone! Thank you all for your help. Thanks Regards Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-use-HiveContext-in-spark-shell-tp18261p18324.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Nesting RDD
Hi Naveen, Nesting RDDs inside of transformations or actions is not supported. Instead if you need access to the other RDDs contents you can try doing a join or (if the data is small enough) collecting and broadcasting the second RDD. Cheers, Holden :) On Thu, Nov 6, 2014 at 10:28 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Hi, I am trying to execute a sample program by nesting the RDD inside the transformations. It is throwing null pointer exception. Any solution or alternative would be helpful. Thanks regards, Naveen. -- Cell : 425-233-8271
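A quick sketch of the two alternatives (rddA, rddB, and keyOf are placeholders for your own RDDs and key function):

import org.apache.spark.SparkContext._

// Option 1: join the two RDDs on a common key instead of nesting them
val joined = rddA.keyBy(a => keyOf(a)).join(rddB.keyBy(b => keyOf(b)))   // RDD[(K, (A, B))]

// Option 2: if rddB is small, collect and broadcast it, then use it inside rddA's transformations
val small = sc.broadcast(rddB.map(b => (keyOf(b), b)).collectAsMap())
val combined = rddA.flatMap(a => small.value.get(keyOf(a)).map(b => (a, b)))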
Parallelize on spark context
Hi, JavaRDD&lt;Integer&gt; distData = sc.parallelize(data); On what basis does parallelize split the data into multiple datasets? How do we handle it if we want a given number of datasets to be executed per executor? For example, my data is a list of 1000 integers and I am having a 2 node yarn cluster. It is dividing into 2 batches of 500 size. Regards, Naveen.
word2vec: how to save an mllib model and reload it?
what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: word2vec: how to save an mllib model and reload it?
Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote: what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
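For concreteness, a sketch of that suggestion, assuming the model class (org.apache.spark.mllib.feature.Word2VecModel) is java-serializable and saving to a local path:

import java.io._
import org.apache.spark.mllib.feature.Word2VecModel

// save the trained model with plain JVM serialization
val out = new ObjectOutputStream(new FileOutputStream("word2vec.model"))
try out.writeObject(model) finally out.close()

// ...and load it back later
val in = new ObjectInputStream(new FileInputStream("word2vec.model"))
val restored = try in.readObject().asInstanceOf[Word2VecModel] finally in.close()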
Re: word2vec: how to save an mllib model and reload it?
that works. is there a better way in spark? this seems like the most common feature for any machine learning work - to be able to save your model after training it and load it later. On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.spa...@gmail.com wrote: Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote: what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Parallelize on spark context
Hi, In the documentation I found something like this for spark.default.parallelism:
 · Local mode: number of cores on the local machine
 · Mesos fine grained mode: 8
 · Others: total number of cores on all executor nodes or 2, whichever is larger
I am using a 2 node cluster with 48 cores (24+24). As per the above, the number of data sets should be 1000/48 = 20.83, i.e. around 20 or 21. But it is dividing into 2 sets of 500 each. I have used the function sc.parallelize(data, 10), which gives 10 datasets of size 100, with 8 datasets executing on one node and 2 datasets on the other node. How do I check how many cores are running to complete the task of those 8 datasets? (Is there any command or UI to check that?) Regards, Naveen. From: holden.ka...@gmail.com [mailto:holden.ka...@gmail.com] On Behalf Of Holden Karau Sent: Friday, November 07, 2014 12:46 PM To: Naveen Kumar Pokala Cc: user@spark.apache.org Subject: Re: Parallelize on spark context Hi Naveen, So by default when we call parallelize it will be parallelized by the default number (which we can control with the property spark.default.parallelism) or if we just want a specific instance of parallelize to have a different number of partitions, we can instead call sc.parallelize(data, numPartitions). The default value of this is documented in http://spark.apache.org/docs/latest/configuration.html#spark-properties Cheers, Holden :) On Thu, Nov 6, 2014 at 10:43 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Hi, JavaRDD&lt;Integer&gt; distData = sc.parallelize(data); On what basis does parallelize split the data into multiple datasets? How do we handle it if we want a given number of datasets to be executed per executor? For example, my data is a list of 1000 integers and I am having a 2 node yarn cluster. It is dividing into 2 batches of 500 size. Regards, Naveen. -- Cell : 425-233-8271
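A small sketch of checking the split directly (the driver web UI, normally on port 4040, also shows one task per partition for each stage):

val data = (1 to 1000).toList
val distData = sc.parallelize(data, 10)   // ask for 10 partitions explicitly
println(distData.partitions.size)         // => 10; each partition becomes one task
println(sc.defaultParallelism)            // what parallelize uses when no partition count is given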
Re: word2vec: how to save an mllib model and reload it?
There's some work going on to support PMML - https://issues.apache.org/jira/browse/SPARK-1406 - but it's not yet been merged into master. What are you used to doing in other environments? In R I'm used to running save(), same with matlab. In python either pickling things or dumping to json seems pretty common. (even the scikit-learn docs recommend pickling - http://scikit-learn.org/stable/modules/model_persistence.html). These all seem basically equivalent java serialization to me.. Would some helper functions (in, say, mllib.util.modelpersistence or something) make sense to add? On Thu, Nov 6, 2014 at 11:36 PM, Duy Huynh duy.huynh@gmail.com wrote: that works. is there a better way in spark? this seems like the most common feature for any machine learning work - to be able to save your model after training it and load it later. On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.spa...@gmail.com wrote: Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote: what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: multiple spark context in same driver program
Hi Pawel, That doc was created during the initial days (Spark 0.8.0), you can of course create multiple sparkContexts in the same driver program now. Thanks Best Regards On Thu, Nov 6, 2014 at 9:30 PM, Paweł Szulc paul.sz...@gmail.com wrote: Hi, quick question: I found this: http://docs.sigmoidanalytics.com/index.php/Problems_and_their_Solutions#Multiple_SparkContext:Failed_to_bind_to:.2F127.0.1.1:45916 My main question: is this constrain still valid? AM I not allowed to have two SparkContexts pointing to the same Spark Master in one driver program? Regards, Pawel Szulc