Re: SPARK LIMITATION - more than one case class is not allowed !!
Tobias, understood, and thanks for the quick resolution of the problem. Thanks, ~Rahul
Profiling GraphX code
Is there any tool to profile GraphX code in a cluster? Is there a way to know the messages exchanged among the nodes in a cluster? The web UI does not give all the information. Thank you
Re: Clarifications on Spark
Hi,
1) Yes, you can. Spark supports a lot of file formats on HDFS/S3, and it supports Cassandra and JDBC sources in general.
2) Yes. Spark has a JDBC Thrift server to which you can attach BI tools. I suggest you pay attention to your query response time requirements.
3) No; you can go with Cassandra. If you are looking at MongoDB, you should give the Stratio platform a try.
4) Yes. Using JdbcRDD you can leverage an RDBMS too.
5) I suggest using Spark as a computation engine: build your pre-aggregated views and persist them in a data store like Cassandra, then attach the BI tools to the aggregated views directly.
Paolo
Sent from my Windows Phone
From: Ajay <ajay.ga...@gmail.com>
Sent: 05/12/2014 07:25
To: u...@spark.incubator.apache.org
Subject: Clarifications on Spark
Hello, I work for an eCommerce company. Currently we are looking at building a data warehouse platform as described below:

  DW as a Service
        |
     REST API
        |
  SQL on NoSQL (Drill/Pig/Hive/Spark SQL)
        |
  NoSQL databases (one or more; maybe an RDBMS directly too)
        | (bulk load)
  MySQL database

I wish to get a few clarifications on Apache Drill as follows:
1) Can we use Spark for SQL on NoSQL, or do we need to mix in Pig/Hive or anything else for any reason?
2) Can Spark SQL be used as a query interface for business intelligence, analytics and reporting?
3) Does Spark support only Hadoop and HBase? We may use Cassandra/MongoDB/CouchBase as well.
4) Does Spark support an RDBMS too? Can we have a single interface to pull out data from multiple data sources?
5) Any recommendations (not limited to the usage of Spark) for our specific requirement described above?
Thanks, Ajay
Note: I have posted a similar question on the Drill user list as well, as I am not sure which one best fits our use case.
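For reference, a minimal Scala sketch of point 4 (JdbcRDD); the JDBC URL, credentials, table, and column names here are hypothetical:

  import java.sql.{DriverManager, ResultSet}
  import org.apache.spark.rdd.JdbcRDD

  // The SQL must contain exactly two '?' placeholders; JdbcRDD fills them with
  // sub-ranges of [lowerBound, upperBound] to build numPartitions partitions.
  val orders = new JdbcRDD(
    sc,
    () => DriverManager.getConnection("jdbc:mysql://dbhost/shop", "user", "secret"),
    "SELECT order_id, amount FROM orders WHERE order_id >= ? AND order_id <= ?",
    lowerBound = 1, upperBound = 1000000, numPartitions = 10,
    mapRow = (r: ResultSet) => (r.getLong(1), r.getDouble(2)))
  println(orders.count())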
Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode
Hi all: According to https://github.com/apache/spark/pull/2732, when a Spark job fails or exits nonzero in yarn-cluster mode, spark-submit should get the corresponding return code of the Spark job. But I tried it on a Spark 1.1.1 YARN cluster, and spark-submit returns zero anyway. Here is my Spark code:

  try {
    val dropTable = s"drop table $DB.$tableName"
    hiveContext.hql(dropTable)
    val createTbl = ... // do something
    hiveContext.hql(createTbl)
  } catch {
    case ex: Exception => {
      Util.printLog(ERROR, s"create db error.")
      exit(-1)
    }
  }

Maybe I did something wrong. Is there any hint? Thanks.
RE: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode
I tried yarn-client mode; spark-submit gets the correct return code from the Spark job. But in yarn-cluster mode it fails.
Re: SQL query in scala API
Oh, sorry. So neither SQL nor Spark SQL is preferred. Then you may write your own aggregation with aggregateByKey:

  users.aggregateByKey((0, Set.empty[String]))({
    case ((count, seen), user) => (count + 1, seen + user)
  }, {
    case ((count0, seen0), (count1, seen1)) => (count0 + count1, seen0 ++ seen1)
  }).mapValues { case (count, seen) => (count, seen.size) }

On 12/5/14 3:47 AM, Arun Luthra wrote:
Is that Spark SQL? I'm wondering if it's possible without Spark SQL.
On Wed, Dec 3, 2014 at 8:08 PM, Cheng Lian lian.cs@gmail.com wrote:
You may do this:

  table("users").groupBy('zip)('zip, count('user), countDistinct('user))

On 12/4/14 8:47 AM, Arun Luthra wrote:
I'm wondering how to do this kind of SQL query with PairRDDFunctions.

  SELECT zip, COUNT(user), COUNT(DISTINCT user) FROM users GROUP BY zip

In the Spark Scala API, I can make an RDD (called "users") of key-value pairs where the keys are zip (as in ZIP code) and the values are user ids. Then I can compute the count and distinct count like this:

  val count = users.mapValues(_ => 1).reduceByKey(_ + _)
  val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _)

Then, if I want count and countDistinct in the same table, I have to join them on the key. Is there a way to do this without doing a join (and without using SQL or Spark SQL)?
Arun
RE: Spark streaming for v1.1.1 - unable to start application
Hi, I don't think it's a problem of Spark Streaming; judging from the call stack, the problem occurs when the BlockManager starts initializing itself. Would you mind checking your Spark configuration, hardware, and deployment? Mostly I think it's not a problem of Spark.
Thanks, Saisai
From: Sourav Chandra [mailto:sourav.chan...@livestream.com]
Sent: Friday, December 5, 2014 4:36 PM
To: user@spark.apache.org
Subject: Spark streaming for v1.1.1 - unable to start application
Hi, I am getting the below error, and because of this there are no completed stages; all are in the waiting state:

14/12/05 03:31:59 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:213)
    at org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:203)
    at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:47)
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:177)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:147)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:168)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230)
    at org.apache.spark.executor.Executor.<init>(Executor.scala:78)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:60)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Could you please let me know the reason and a fix for this? The Spark version is 1.1.1.
--
Sourav Chandra
Senior Software Engineer
sourav.chan...@livestream.com
o: +91 80 4121 8723
m: +91 988 699 3746
skype: sourav.chandra
Livestream
Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034
www.livestream.com
RE: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode
I tried another test code:

  def main(args: Array[String]) {
    if (args.length != 1) {
      Util.printLog(ERROR, "Args error - arg1: BASE_DIR")
      exit(101)
    }
    val currentFile = args(0).toString
    val DB = "test_spark"
    val tableName = "src"
    val sparkConf = new SparkConf().setAppName(s"HiveFromSpark")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)
    // Before exit
    Util.printLog(INFO, "Exit")
    exit(100)
  }

There are two `exit` calls in this code. If the args are wrong, spark-submit gets the return code 101; but if the args are correct, spark-submit cannot get the second return code, 100. What's the difference between these two `exit`s? I am so confused.
Re: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode
What's the status of this application in the YARN web UI?
Best Regards,
Shixiong Zhu
Re: NullPointerException When Reading Avro Sequence Files
Hi all, I've tried the example from the Gist above, but it doesn't work (at least for me). Did anyone get this?

14/12/05 10:44:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
    at org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:115)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:103)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/12/05 10:44:40 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
    (same stack trace as above)
14/12/05 10:44:40 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job

Thanks
Increasing the number of retries in case of job failure
Hello, for some (unknown) reason some of my tasks that fetch data from Cassandra fail quite often, and apparently the master removes a task that fails more than 4 times (in my case). Is there any way to increase the number of retries? best, /Shahab
[GraphX] which way is better to access faraway neighbors?
Hi, I have a graph in which each vertex keeps several messages for some faraway neighbours (I mean, not only immediate neighbours; at most k hops away, e.g. k = 5). Now I propose to distribute these messages to their corresponding destinations (say, the "faraway neighbours"):
- by using the Pregel API, one superstep is enough
- by using basic Spark operations (groupByKey, leftJoin, etc.) on the vertices RDD and its intermediate results.
W.r.t. the communication among machines, and the high cost of groupByKey/leftJoin, I guess the 1st option is better? What's your idea? Best, Yifan LI
scala.MatchError on SparkSQL when creating ArrayType of StructType
Hi, I am using Spark SQL on the 1.1.0 branch. The following code leads to a scala.MatchError at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247):

  val scm = StructType(inputRDD.schema.fields.init :+
    StructField("list", ArrayType(
      StructType(Seq(
        StructField("date", StringType, nullable = false),
        StructField("nbPurchase", IntegerType, nullable = false)))),
      nullable = false))

  // purchaseRDD is an RDD[sql.Row] whose schema corresponds to scm; it is transformed from inputRDD
  val schemaRDD = hiveContext.applySchema(purchaseRDD, scm)
  schemaRDD.registerTempTable("t_purchase")

Here's the stack trace:

scala.MatchError: ArrayType(StructType(List(StructField(date,StringType,true), StructField(n_reachat,IntegerType,true))),true) (of class org.apache.spark.sql.catalyst.types.ArrayType)
    at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
    at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

The strange thing is that nullable of the date and nbPurchase fields is set to true, while it was false in the code. If I set both to true, it works. But, in fact, they should not be nullable. Here's what I find at Cast.scala:247 on the 1.1.0 branch:

  private[this] lazy val cast: Any => Any = dataType match {
    case StringType => castToString
    case BinaryType => castToBinary
    case DecimalType => castToDecimal
    case TimestampType => castToTimestamp
    case BooleanType => castToBoolean
    case ByteType => castToByte
    case ShortType => castToShort
    case IntegerType => castToInt
    case FloatType => castToFloat
    case LongType => castToLong
    case DoubleType => castToDouble
  }

Any idea? Thank you. Hao
Re: Spark-Streaming: output to cassandra
Here's an example of a Cassandra ETL that you can follow, which should exit on its own. I'm using it as a blueprint for revolving Spark Streaming apps on top of. For me, I kill the streaming app with System.exit after a sufficient amount of data is collected. That seems to work for most any scenario... but I guess you could also kill it on the stream handler side as well, if you are writing a custom DStream.
https://github.com/jayunit100/SparkBlueprint/blob/master/src/main/scala/sparkapps/tweetstream/Processor.scala

On Dec 5, 2014, at 1:50 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
"Batch" is the batch duration that you specify while creating the StreamingContext, so at the end of every batch's computation the data will get flushed to Cassandra. And why are you stopping your program with Ctrl+C? You can always specify the time with ssc.awaitTermination(duration).
Thanks, Best Regards

On Fri, Dec 5, 2014 at 11:53 AM, m.sar...@accenture.com wrote:
Hi Gerard/Akhil, by "how do I specify a batch" I was trying to ask: when does the data in the JavaDStream get flushed into the Cassandra table? I read somewhere that the streaming data gets written to Cassandra in batches. This batch can be of some particular time, or one particular run. That was what I was trying to understand: how to set that batch in my program. Because if a batch means one cycle run of my streaming app, then in my app I'm hitting Ctrl+C to kill the program. So the program is terminating; would the data get inserted successfully into my Cassandra table? For example, in Terminal-A I'm running a Kafka producer to stream in messages. In Terminal-B I'm running my streaming app. In my app there is a line jssc.awaitTermination(); which will keep my app running till I kill it. Eventually I am hitting Ctrl+C in my app terminal, i.e. Terminal-B, and killing it. So it's a kind of ungraceful termination. So in this case, will the data in my app's DStream get written into Cassandra?
Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841.

From: Gerard Maas gerard.m...@gmail.com
Sent: Thursday, December 4, 2014 10:22 PM
To: Akhil Das
Cc: Sarosh, M.; user@spark.apache.org
Subject: Re: Spark-Streaming: output to cassandra
I guess he's already doing so, given the 'saveToCassandra' usage. What I don't understand is the question "how do I specify a batch". That doesn't make much sense to me. Could you explain further? -kr, Gerard.

On Thu, Dec 4, 2014 at 5:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
You can use DataStax's Cassandra connector.
Thanks, Best Regards

On Thu, Dec 4, 2014 at 8:21 PM, m.sar...@accenture.com wrote:
Hi, I have written the code below, which streams data from Kafka and prints it to the console. I want to extend this and have my data go into a Cassandra table instead.

  JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
  JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
  System.out.println("Connection done!");
  JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> message) {
      return message._2();
    }
  });
  //data.print(); --> output to console
  data.foreachRDD(saveToCassandra("mykeyspace", "mytable"));
  jssc.start();
  jssc.awaitTermination();

How should I implement the line data.foreachRDD(saveToCassandra("mykeyspace","mytable")); so that data goes into Cassandra in each batch? And how do I specify a batch? Because if I do Ctrl+C on the console of the streaming job jar, nothing will be entered into Cassandra for sure, since it is getting killed. Please help.
Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841.
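For reference, a minimal Scala sketch of the "exit after enough data" pattern described above (the counter, threshold, and stream name are assumptions, not code from the linked project). The function passed to foreachRDD runs on the driver at each batch interval, so exiting there terminates the whole application:

  var processed = 0L  // driver-side counter of records seen so far
  stream.foreachRDD { rdd =>
    processed += rdd.count()
    if (processed >= 100000) {
      // enough data collected; stop the app (ssc.stop() would be more graceful)
      System.exit(0)
    }
  }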
Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schema RDD
It worked, man. Thanks a lot :)
How can I compile only the core and streaming (so that I can get test utilities of streaming)?
Hello, I'm currently developing a Spark Streaming application and trying to write my first unit test. I've used Java for this application, and I also need to use Java (and JUnit) for writing unit tests. I could not find any documentation that focuses on Spark Streaming unit testing; all I could find was the Java-based unit tests in the Spark Streaming source code:
https://github.com/apache/spark/blob/branch-1.1/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
which depends on a Scala file:
https://github.com/apache/spark/blob/branch-1.1/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
which, in turn, depends on the Scala test files in:
https://github.com/apache/spark/tree/branch-1.1/streaming/src/test/scala/org/apache/spark/streaming
So I thought that I could grab the Spark source code, switch to the branch-1.1 branch, and then compile only the 'core' and 'streaming' modules, hopefully ending up with the compiled classes (or jar files) of the streaming test utilities, so that I can import them in my Java-based Spark Streaming application. However, trying to build it via the following command line failed:

  mvn -pl core,streaming package

You can see the full output at the end of this message. Any ideas how to progress?

Full output of the build:

emre@emre-ubuntu:~/code/spark$ mvn -pl core,streaming package
[INFO] Scanning for projects...
[INFO] Reactor Build Order:
[INFO] Spark Project Core
[INFO] Spark Project Streaming
[INFO] Building Spark Project Core 1.1.2-SNAPSHOT
Downloading: https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.pom
Downloaded: https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.pom (5 KB at 5.8 KB/sec)
Downloading: https://repo1.maven.org/maven2/org/apache/commons/commons-math3/3.3/commons-math3-3.3.pom
Downloaded: https://repo1.maven.org/maven2/org/apache/commons/commons-math3/3.3/commons-math3-3.3.pom (24 KB at 178.9 KB/sec)
[... many more Downloading/Downloaded lines for build plugins and test dependencies (akka-testkit, scalap, derby, mockito, easymock, cglib, ...); the pasted output is truncated here ...]
Re: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down
Hi Alexey, I'm getting the same error on startup with Spark 1.1.0. Fortunately, everything works fine. The error is mentioned in the logs in https://issues.apache.org/jira/browse/SPARK-4498, so maybe it will also be fixed in Spark 1.2.0 and 1.1.2. I have no insight into it, unfortunately.

On Tue, Dec 2, 2014 at 1:38 PM, Alexey Romanchuk alexey.romanc...@gmail.com wrote:
Any ideas? Anyone got the same error?

On Mon, Dec 1, 2014 at 2:37 PM, Alexey Romanchuk alexey.romanc...@gmail.com wrote:
Hello Spark users! I found lots of strange messages in the driver log. Here they are:

2014-12-01 11:54:23,849 [sparkDriver-akka.actor.default-dispatcher-25] ERROR akka.remote.EndpointWriter[akka://sparkDriver/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40data1.hadoop%3A17372-5/endpointWriter] - AssociationError [akka.tcp://sparkDriver@10.54.87.173:55034] -> [akka.tcp://sparkExecutor@data1.hadoop:17372]: Error [Shut down address: akka.tcp://sparkExecutor@data1.hadoop:17372] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@data1.hadoop:17372
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]

I get this message twice for every worker: first for driverPropsFetcher and next for sparkExecutor. It looks like Spark shuts down the remote Akka system incorrectly, or there is some race condition in this process and the driver sent some data to the worker while the worker's actor system was already in shutdown state. Except for this message, everything works fine. But this is an ERROR-level message, and I found it in my ERROR-only log. Do you have any idea whether it is a configuration issue, a bug in Spark or Akka, or something else? Thanks!
Re: Does filter on an RDD scan every data item?
Any thoughts on how Spark SQL could help in our scenario?
Re: Spark-Streaming: output to cassandra
You can just do something like this; the Spark Cassandra Connector handles the rest:

  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
    .map { case (_, line) => line.split(",") }
    .map(RawWeatherData(_))
    .saveToCassandra(CassandraKeyspace, CassandraTableRaw)

- Helena @helenaedelson
cartesian on pyspark not parallelised
Hi, I'm using PySpark 1.1.0 on YARN 2.5.0. All operations run nicely in parallel; I can see multiple Python processes spawned on each NodeManager. But for some reason, when running cartesian, there is only a single Python process running on each node. The task indicates thousands of partitions, so I don't understand why it is not running with higher parallelism. The performance is obviously poor, although other operations rock. Any idea how to improve this? Thank you, Antony.
Re: Increasing the number of retries in case of job failure
It is controlled by spark.task.maxFailures. See http://spark.apache.org/docs/latest/configuration.html#scheduling.
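For reference, a minimal sketch of raising the limit (the app name is hypothetical). Note that spark.task.maxFailures counts total attempts, so a value of 8 allows 7 retries:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("cassandra-fetch")
    .set("spark.task.maxFailures", "8")  // default is 4
  val sc = new SparkContext(conf)

The same value can also be passed on the command line, e.g. via spark-submit's --conf option.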
Re: Market Basket Analysis
Generally I don't think frequent-item-set algorithms are that useful. They're simple and not probabilistic; they don't tell you what sets occurred unusually frequently. Usually people ask for frequent-item-set algorithms when they really mean they want to compute item similarity or make recommendations. What's your use case?

On Thu, Dec 4, 2014 at 8:23 PM, Rohit Pujari rpuj...@hortonworks.com wrote:
Sure, I'm looking to perform frequent-item-set analysis on a POS data set. Apriori is a classic algorithm used for such tasks. Since an Apriori implementation is not part of MLlib yet (see https://issues.apache.org/jira/browse/SPARK-4001), what are some other options/algorithms I could use to perform a similar task? If there's no spoon-to-spoon substitute, spoon-to-fork will suffice too. Hopefully this provides some clarification.
Thanks, Rohit

From: Tobias Pfeiffer t...@preferred.jp
Date: Thursday, December 4, 2014 at 7:20 PM
To: Rohit Pujari rpuj...@hortonworks.com
Cc: user@spark.apache.org
Subject: Re: Market Basket Analysis
Hi,
On Thu, Dec 4, 2014 at 11:58 PM, Rohit Pujari rpuj...@hortonworks.com wrote:
I'd like to do market basket analysis using Spark; what are my options?
To do it or not to do it ;-) Seriously, could you elaborate a bit on what you want to know?
Tobias
spark streaming kafka best practices?
hi, what is the best way to process a batch window in Spark Streaming:

  kafkaStream.foreachRDD(rdd => {
    rdd.collect().foreach(event => {
      // process the event
      process(event)
    })
  })

or

  kafkaStream.foreachRDD(rdd => {
    rdd.map(event => {
      // process the event
      process(event)
    }).collect()
  })

thanks
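For reference, a third pattern (a sketch, not from this message): both variants above bring every record to the driver via collect(). To run process() on the executors instead, assuming process and anything it closes over are serializable (or are constructed inside the closure):

  kafkaStream.foreachRDD(rdd => {
    rdd.foreachPartition(events => {
      // runs on the executors, one call per partition
      events.foreach(event => process(event))
    })
  })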
Re: SPARK LIMITATION - more than one case class is not allowed !!
On Fri, Dec 5, 2014 at 7:12 AM, Tobias Pfeiffer t...@preferred.jp wrote:
Rahul,
On Fri, Dec 5, 2014 at 2:50 PM, Rahul Bindlish rahul.bindl...@nectechnologies.in wrote:
I have done so; that's why Spark is able to load the object file [e.g. person_obj] and Spark has maintained the serialVersionUID [person_obj]. The next time, when I am trying to load another object file [e.g. office_obj], I think Spark is matching its serialVersionUID with the previous serialVersionUID [person_obj] and giving a mismatch error. In my first post, I have given statements which can be executed easily to replicate this issue.

Can you post the Scala source for your case classes? I have tried the following in spark-shell:

  case class Dog(name: String)
  case class Cat(age: Int)
  val dogs = sc.parallelize(Dog("foo") :: Dog("bar") :: Nil)
  val cats = sc.parallelize(Cat(1) :: Cat(2) :: Nil)
  dogs.saveAsObjectFile("test_dogs")
  cats.saveAsObjectFile("test_cats")

This gives two directories, test_dogs/ and test_cats/. Then I restarted spark-shell and entered:

  case class Dog(name: String)
  case class Cat(age: Int)
  val dogs = sc.objectFile("test_dogs")
  val cats = sc.objectFile("test_cats")

I don't get an exception, but:

  dogs: org.apache.spark.rdd.RDD[Nothing] = FlatMappedRDD[1] at objectFile at <console>:12

You need to specify the type of the RDD; the compiler does not know what is in test_dogs:

  val dogs = sc.objectFile[Dog]("test_dogs")
  val cats = sc.objectFile[Cat]("test_cats")

It's an easy mistake to make... I wonder if an assertion could be implemented that makes sure the type parameter is present.
Re: How can I compile only the core and streaming (so that I can get test utilities of streaming)?
Please specify '-DskipTests' on the command line.
Cheers
subscribe me to the list
I would like to subscribe to user@spark.apache.org.
Regards,
Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541
Re: subscribe me to the list
Hi Ningjun, please send an email to this address to get subscribed: user-subscr...@spark.apache.org
On Dec 5, 2014, at 10:36 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:
I would like to subscribe to user@spark.apache.org.
Why is my default partition size set to 52?
Hi all, I'm trying to run a Spark job with spark-shell. What I want to do is just count the number of lines in a file. I start spark-shell with the default arguments, i.e. just ./bin/spark-shell, load the text file with sc.textFile(path), and then call count on my data. When I do this, my data is always split into 52 partitions. I don't understand why, since I run it on a local machine with 8 cores and sc.defaultParallelism gives me 8. Even if I load the file with sc.textFile(path, 8), I always get data.partitions.size = 52. I use Spark 1.1.1. Any ideas? Cheers, Jao
Re: Why is my default partition size set to 52?
How big is your file? It's probably of a size for which the Hadoop InputFormat would make 52 splits. Data drives the number of partitions, not processing resources. Really, 8 splits is the minimum parallelism you want; several times your number of cores is better.
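To illustrate (the file path is hypothetical): the second argument to textFile is only a lower bound on the number of partitions, and the InputFormat may still create more splits, while coalesce() gives an exact count after loading:

  val data = sc.textFile("hdfs:///data/big.txt", 8)  // at least 8 partitions
  println(data.partitions.size)                      // can still be 52 for a large file
  val eight = data.coalesce(8)                       // exactly 8, without a shuffle
  println(eight.partitions.size)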
Re: Spark-Streaming: output to cassandra
Hi Akhil, Vyas, Helena,
Thank you for your suggestions. As Akhil suggested earlier, I have implemented the batch duration in the JavaStreamingContext and awaitTermination(duration). The approach Helena suggested is Scala-oriented. But the issue now is that I want to set Cassandra as my output. I have created a table in Cassandra, test_table, with columns key:text (primary key) and value:text. I have mapped the data successfully into JavaDStream<Tuple2<String, String>> data:

  JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
  JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
  JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
  JavaDStream<Tuple2<String, String>> data = messages.map(
      new Function<Tuple2<String, String>, Tuple2<String, String>>() {
        public Tuple2<String, String> call(Tuple2<String, String> message) {
          return new Tuple2<String, String>(message._1(), message._2());
        }
      });

Then I have created a list:

  List<TestTable> list = new ArrayList<TestTable>();

where TestTable is my custom class, having the same structure as my Cassandra table, with members key and value:

  class TestTable {
    String key;
    String val;
    public TestTable() {}
    public TestTable(String k, String v) { key = k; val = v; }
    public String getKey() { return key; }
    public void setKey(String k) { key = k; }
    public String getVal() { return val; }
    public void setVal(String v) { val = v; }
    public String toString() { return "Key:" + key + ",Val:" + val; }
  }

Please suggest a way to add the data from JavaDStream<Tuple2<String, String>> data into the List<TestTable> list. I am doing this so that I can subsequently use

  JavaRDD<TestTable> rdd = sc.parallelize(list);
  javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

to save the RDD data into Cassandra. I had tried coding this way:

  messages.foreachRDD(new Function<Tuple2<String, String>, String>() {
    public List<TestTable> call(Tuple2<String, String> message) {
      String k = message._1();
      String v = message._2();
      TestTable tbl = new TestTable(k, v);
      list.put(tbl);
    }
  });

but it seems some type mismatch is happening. Please help.
Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841.
Re: Why is my default partition size set to 52?
OK, I misunderstood the meaning of the partitions. In fact, my file is 1.7 GB, and with a smaller file I get a different number of partitions. Thanks for this clarification.
Re: Spark-Streaming: output to cassandra
I think what you are looking for is something like:

  JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "tab", mapColumnTo(Double.class)).select("price");
  JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", mapRowTo(Person.class));

as noted here: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md
- Helena @helenaedelson
Why is KMeans with MLlib so slow?
Hi all, I'm trying to run clustering with the k-means algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the kmeans++ available in Julia (http://clusteringjl.readthedocs.org/en/latest/kmeans.html) takes about 8 minutes on a single core. Solving the same problem with Spark's kmeans|| takes more than 1.5 hours with 8 cores. Either they don't implement the same algorithm, or I don't understand how the k-means in Spark works. Is my data not big enough to take full advantage of Spark? At the least, I expected the same runtime. Cheers, Jao
Re: How can I compile only the core and streaming (so that I can get test utilities of streaming)?
Hello, Specifying '-DskipTests' on the command line worked, though I can't be sure whether first running 'sbt assembly' also contributed to the solution. (I tried 'sbt assembly' because branch-1.1's README says to use sbt.) Thanks for the answer. Kind regards, Emre Sevinç
Re: Why is KMeans with MLlib so slow?
Spark has much more overhead, since it's set up to distribute the computation. Julia isn't distributed, and so has no such overhead in a completely in-core implementation. You generally use Spark when you have a problem large enough to warrant distributing, or when your data already lives in a distributed store like HDFS. But it's also possible you're not configuring the implementations the same way, yes. There's not enough info here really to say. On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to run clustering with the k-means algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the kmeans++ available in Julia (http://clusteringjl.readthedocs.org/en/latest/kmeans.html) takes about 8 minutes on a single core. Solving the same problem with Spark's kmeans|| takes more than 1.5 hours with 8 cores. Either they don't implement the same algorithm, or I don't understand how the k-means in Spark works. Is my data not big enough to take full advantage of Spark? At the least, I expected the same runtime. Cheers, Jao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
pyspark exception catch
Hi, Is it possible to catch exceptions using PySpark, so that in case of an error the program will not fail and exit? For example, if I am using (key, value) RDD functionality but the data doesn't actually have (key, value) format, PySpark will throw an exception (like ValueError) that I am unable to catch. I guess this is because the errors occur on each worker, where I don't have full control. It is also probably because of the DAG: for example (see below), it is useless to catch an exception on the .foldByKey, since it is a transformation and not an action; the transformation will be piped and only materialized when some action is applied, like .first(). But even trying to catch the exception on the action fails. I would expect that eventually the different exceptions would be collected and returned to the driver, where the developer could handle them and decide on the next step. ** Of course I can first check the input to verify that it matches (key, value), but in my opinion this is overhead and involves extra transformations.

code example:

    data = [((1,),'e'),((2,),'b'),((1,),'aa', 'e'),((2,),'bb', 'e'),((5,),'a', 'e')]
    pdata = sc.parallelize(data,3)
    t = pdata.foldByKey([], lambda v1, v2: v1+[v2] if type(v2) != list else v1+v2)
    t.first()

this also fails:

    try:
        t.first()
    except ValueError, e:
        pass

Best, -- Igor Mazor Senior Business Intelligence Manager Rocket Internet AG | Johannisstraße 20 | 10117 Berlin | Deutschland skype: igor_rocket_internet | mail: igor.mazor@rocket-internet.de www.rocket-internet.de Managing Directors: Dr. Johannes Bruder, Arnt Jeschke, Alexander Kudlich Registered at Amtsgericht Berlin, HRB 109262 VAT ID DE256469659
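Since the exception is raised inside functions running on the workers, one common workaround (a sketch, not an official PySpark mechanism; the helper name is made up) is to keep the try/except inside the function that is shipped to the workers, normalizing records there and routing bad rows aside instead of letting them raise:

    def to_key_value(record):
        # Treat the first element as the key and collect the rest into a
        # list, so foldByKey always sees real (key, value) pairs.
        try:
            return [(record[0], list(record[1:]))]
        except (TypeError, IndexError):
            return []  # drop (or tag and count) malformed records

    clean = pdata.flatMap(to_key_value)
    t = clean.foldByKey([], lambda v1, v2: v1 + v2)
    t.first()  # no longer raises on the 3-element tuples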
Re: Spark-Streaming: output to cassandra
Thank you Helena, But I would like to explain my problem space: The output is supposed to be Cassandra. To achieve that, I have to use the spark-cassandra-connector APIs. So, going in a bottom-up approach, to write to Cassandra I have to use:

    javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

To use the above javaFunctions, I need to obtain the JavaRDD object, using sc.parallelize() like this:

    JavaRDD<TestTable> rdd = sc.parallelize(list);

The above sc.parallelize(list) accepts a List object as a parameter. The above List object will contain the data obtained either from the JavaDStream or the JavaPairReceiverDStream, which has the streaming data. So the flow is, I need: 1. The JavaDStream data to get mapped into a List. 2. The above List object to be passed to sc.parallelize(list) to obtain a JavaRDD object. 3. The above JavaRDD object to be passed to javaFunctions().saveToCassandra(). For all this I need code that maps my JavaDStream data into a List. Once step 1 is done, steps 2 and 3 can easily be performed. I need help for step 1. I have written the code below to do it:

    JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    System.out.println("Connection done!");

    /* connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());
    Session session = connector.openSession();
    session.execute("CREATE TABLE testkeyspace.test_table (key TEXT PRIMARY KEY, value TEXT)");

    List<TestTable> list = new ArrayList<TestTable>();
    messages.foreachRDD(new Function<Tuple2<String, String>, Void>() {
      public Void call(Tuple2<String, String> message) {
        String k = message._1();
        String v = message._2();
        TestTable tbl = new TestTable(k, v);
        list.add(tbl);
        return null;
      }
    });

    jssc.start();
    jssc.awaitTermination(new Duration(60 * 1000));

It would be of great help if a way is suggested to map the JavaDStream to a List.

Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841.

From: Helena Edelson helena.edel...@datastax.com Sent: Friday, December 5, 2014 9:12 PM To: Sarosh, M. Cc: user Subject: Re: Spark-Streaming: output to cassandra

I think what you are looking for is something like:

    JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "tab", mapColumnTo(Double.class)).select("price");
    JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", mapRowTo(Person.class));

noted here: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md ?

- Helena @helenaedelson

On Dec 5, 2014, at 10:15 AM, m.sar...@accenture.com wrote: Hi Akhil, Vyas, Helena, Thank you for your suggestions. As Akhil suggested earlier, I have implemented the batch Duration in JavaStreamingContext and waitForTermination(Duration). The approach Helena suggested is Scala-oriented. But the issue now is that I want to set Cassandra as my output.
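For step 1, a sketch of an alternative that avoids the driver-side List entirely (it only uses calls already quoted in this thread; note that foreachRDD receives the whole micro-batch as a JavaPairRDD, not individual tuples): map each batch to TestTable rows and save them directly, so the per-batch data never has to be collected to the driver.

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
      public Void call(JavaPairRDD<String, String> rdd) {
        // Convert the batch's (key, value) pairs into TestTable rows...
        JavaRDD<TestTable> rows = rdd.map(new Function<Tuple2<String, String>, TestTable>() {
          public TestTable call(Tuple2<String, String> t) {
            return new TestTable(t._1(), t._2());
          }
        });
        // ...and write the whole batch to Cassandra.
        javaFunctions(rows, TestTable.class).saveToCassandra("testkeyspace", "test_table");
        return null;
      }
    });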
Re: Market Basket Analysis
This is a typical use case: people who buy electric razors also tend to buy batteries and shaving gel along with them. The goal is to build a model which will look through POS records and find which product categories have a higher likelihood of appearing together in a given transaction. What would you recommend? On Fri, Dec 5, 2014 at 7:21 AM, Sean Owen so...@cloudera.com wrote: Generally I don't think frequent-item-set algorithms are that useful. They're simple and not probabilistic; they don't tell you which sets occurred unusually frequently. Usually people ask for frequent-item-set algos when they really mean they want to compute item similarity or make recommendations. What's your use case? On Thu, Dec 4, 2014 at 8:23 PM, Rohit Pujari rpuj...@hortonworks.com wrote: Sure, I'm looking to perform frequent item set analysis on a POS data set. Apriori is a classic algorithm used for such tasks. Since an Apriori implementation is not part of MLlib yet (see https://issues.apache.org/jira/browse/SPARK-4001), what are some other options/algorithms I could use to perform a similar task? If there's no spoon-to-spoon substitute, spoon-to-fork will suffice too. Hopefully this provides some clarification. Thanks, Rohit From: Tobias Pfeiffer t...@preferred.jp Date: Thursday, December 4, 2014 at 7:20 PM To: Rohit Pujari rpuj...@hortonworks.com Cc: user@spark.apache.org Subject: Re: Market Basket Analysis Hi, On Thu, Dec 4, 2014 at 11:58 PM, Rohit Pujari rpuj...@hortonworks.com wrote: I'd like to do market basket analysis using Spark, what are my options? To do it or not to do it ;-) Seriously, could you elaborate a bit on what you want to know? Tobias -- Rohit Pujari Solutions Engineer, Hortonworks rpuj...@hortonworks.com 716-430-6899
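In the meantime, a rough sketch of first-pass pair counting with plain RDD operations, until SPARK-4001 lands (the toy basket data and the minSupport threshold are made up for illustration; this is simple co-occurrence counting, not full Apriori):

    import org.apache.spark.{SparkConf, SparkContext}

    object FrequentPairs {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("FrequentPairs").setMaster("local[4]"))
        // Each record is the set of product categories in one basket.
        val transactions = sc.parallelize(Seq(
          Seq("razor", "batteries", "gel"),
          Seq("razor", "batteries"),
          Seq("gel", "soap")))
        val minSupport = 2
        val frequentPairs = transactions
          .flatMap(_.distinct.sorted.combinations(2).map(p => (p, 1))) // candidate pairs per basket
          .reduceByKey(_ + _)                                          // global pair counts
          .filter { case (_, n) => n >= minSupport }                   // keep pairs above support
        frequentPairs.collect().foreach(println)
        sc.stop()
      }
    }

As Sean notes, raw counts say nothing about whether a pair co-occurs more often than chance would predict; dividing by the product of the individual item frequencies (lift) is a cheap next step.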
Re: Why is KMeans with MLlib so slow?
Could you post your script to reproduce the results (and also how you generate the dataset)? That will help us investigate it. On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hmm, here I use Spark in local mode on my laptop with 8 cores. The data is on my local filesystem. Even though there is an overhead due to the distributed computation, I find the difference between the runtimes of the two implementations really, really huge. Is there a benchmark on how well the algorithm implemented in MLlib performs? On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen so...@cloudera.com wrote: Spark has much more overhead, since it's set up to distribute the computation. Julia isn't distributed, and so has no such overhead in a completely in-core implementation. You generally use Spark when you have a problem large enough to warrant distributing, or when your data already lives in a distributed store like HDFS. But it's also possible you're not configuring the implementations the same way, yes. There's not enough info here really to say. On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to run clustering with the k-means algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the kmeans++ available in Julia (http://clusteringjl.readthedocs.org/en/latest/kmeans.html) takes about 8 minutes on a single core. Solving the same problem with Spark's kmeans|| takes more than 1.5 hours with 8 cores. Either they don't implement the same algorithm, or I don't understand how the k-means in Spark works. Is my data not big enough to take full advantage of Spark? At the least, I expected the same runtime. Cheers, Jao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming Reusing JDBC Connections
Is there a way I can have a JDBC connection open throughout a streaming job? I have a foreach which runs once per batch. However, I don't want to open the connection for each batch, but would rather have a persistent connection that I can reuse. How can I do this? Thanks. Asim
Adding Spark Cassandra dependency breaks Spark Streaming?
Hi, Seems adding the cassandra connector and spark streaming causes issues. I've included my build and code files below. Running sbt compile gives weird errors like "Seconds is not part of org.apache.spark.streaming" and "object Receiver is not a member of package org.apache.spark.streaming.receiver". If I take out cassandraConnector from the list of dependencies, sbt compile succeeds. How is adding the dependency removing things from the spark streaming packages? Is there something I can do (perhaps in sbt) to not have this break? Here's my build file:

    import sbt.Keys._
    import sbt._

    name := "untitled99"

    version := "1.0"

    scalaVersion := "2.10.4"

    val spark = "org.apache.spark" %% "spark-core" % "1.1.0"
    val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0"
    val cassandraConnector = "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()

    libraryDependencies ++= Seq(cassandraConnector, spark, sparkStreaming)

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

And here's my code:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    object Foo {
      def main(args: Array[String]) {
        val context = new SparkContext()
        val ssc = new StreamingContext(context, Seconds(2))
      }
    }

    class Bar extends Receiver[Int](StorageLevel.MEMORY_AND_DISK_2) {
      override def onStart(): Unit = ???
      override def onStop(): Unit = ???
    }
Re: Why is KMeans with MLlib so slow?
The code is really simple:

    object TestKMeans {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("Test KMeans")
          .setMaster("local[8]")
          .set("spark.executor.memory", "8g")
        val sc = new SparkContext(conf)

        val numClusters = 500
        val numIterations = 2

        val data = sc.textFile("sample.csv")
          .map(x => Vectors.dense(x.split(',').map(_.toDouble)))
        data.cache()

        val clusters = KMeans.train(data, numClusters, numIterations)
        println(clusters.clusterCenters.size)

        val wssse = clusters.computeCost(data)
        println(s"error : $wssse")
      }
    }

For testing purposes, I generated sample random data with Julia and stored it in a csv file delimited by commas. The dimensions are 248000 x 384. In the target application, I will have more than 248k data points to cluster. On Fri, Dec 5, 2014 at 6:03 PM, Davies Liu dav...@databricks.com wrote: Could you post your script to reproduce the results (and also how you generate the dataset)? That will help us investigate it. On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hmm, here I use Spark in local mode on my laptop with 8 cores. The data is on my local filesystem. Even though there is an overhead due to the distributed computation, I find the difference between the runtimes of the two implementations really, really huge. Is there a benchmark on how well the algorithm implemented in MLlib performs? On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen so...@cloudera.com wrote: Spark has much more overhead, since it's set up to distribute the computation. Julia isn't distributed, and so has no such overhead in a completely in-core implementation. You generally use Spark when you have a problem large enough to warrant distributing, or when your data already lives in a distributed store like HDFS. But it's also possible you're not configuring the implementations the same way, yes. There's not enough info here really to say. On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to run clustering with the k-means algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the kmeans++ available in Julia (http://clusteringjl.readthedocs.org/en/latest/kmeans.html) takes about 8 minutes on a single core. Solving the same problem with Spark's kmeans|| takes more than 1.5 hours with 8 cores. Either they don't implement the same algorithm, or I don't understand how the k-means in Spark works. Is my data not big enough to take full advantage of Spark? At the least, I expected the same runtime. Cheers, Jao
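One variable worth isolating here (a sketch, not a verdict on the benchmark): with k = 500, the kmeans|| initialization itself runs several distributed passes before the two training iterations even start. MLlib's train() overload lets you switch to random initialization to time the iterations alone:

    // Same data, but skip kmeans|| seeding to separate init cost from iteration cost.
    val clusters = KMeans.train(
      data,
      500,           // k
      2,             // maxIterations
      1,             // runs
      KMeans.RANDOM) // initializationMode; the default is KMeans.K_MEANS_PARALLEL

If the gap collapses, the time is going into initialization rather than Lloyd iterations; if not, the partitioning and serialization of the 248k x 384 vectors are the next suspects.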
RE: Spark Streaming Reusing JDBC Connections
I've done this:

1. foreachPartition
2. Open connection.
3. foreach inside the partition.
4. Close the connection.

Slightly crufty, but works. Would love to see a better approach. Regards, Ashic. Date: Fri, 5 Dec 2014 12:32:24 -0500 Subject: Spark Streaming Reusing JDBC Connections From: asimja...@gmail.com To: user@spark.apache.org Is there a way I can have a JDBC connection open throughout a streaming job? I have a foreach which runs once per batch. However, I don't want to open the connection for each batch, but would rather have a persistent connection that I can reuse. How can I do this? Thanks. Asim
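For reference, a sketch of that pattern in Scala (the JDBC URL, credentials and SQL are placeholders; assume stream is a DStream[(String, String)]):

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Opened on the executor, once per partition per batch, so the
        // connection object never has to be serialized from the driver.
        val conn = java.sql.DriverManager.getConnection(
          "jdbc:postgresql://dbhost:5432/mydb", "user", "secret")
        try {
          val stmt = conn.prepareStatement("INSERT INTO events (k, v) VALUES (?, ?)")
          records.foreach { case (k, v) =>
            stmt.setString(1, k)
            stmt.setString(2, v)
            stmt.executeUpdate()
          }
        } finally {
          conn.close() // or return it to a per-JVM pool instead of closing
        }
      }
    }

A further step toward true reuse is a lazily initialized singleton connection (or pool) per executor JVM, so partitions share it across batches rather than reopening it each time.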
Optimized spark configuration
Hi, Could anyone suggest what would be a better/optimized configuration for driver memory, worker memory, degree of parallelism, and other parameters, when we are running 1 master node (itself also acting as a slave node) and 1 slave node? Both have 32 GB RAM with 4 cores. On this, I loaded approx. 17M rows of data (3.2 GB) into the Hive store, and when I try to execute a query on it from the JDBC thrift server, it takes about 10-12 sec to retrieve the data, which I think is too much. Please also point me to any tutorial that explains these optimization settings. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Optimized-spark-configuration-tp20495.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
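Not an authoritative answer, but as a first cut for two 32 GB / 4-core nodes, something along these lines in conf/spark-defaults.conf is a reasonable starting sketch (every value here is an assumption to tune against your own workload, not a recommendation):

    spark.executor.memory        24g   # leave headroom for the OS and daemons
    spark.driver.memory          4g
    spark.default.parallelism    16    # roughly 2 tasks per core across 8 cores
    spark.sql.shuffle.partitions 16    # shuffle width for Spark SQL / the thrift server

Also worth checking before tuning anything: whether the 3.2 GB table is actually cached (e.g. CACHE TABLE via the thrift server); a 10-12 s response can simply be a full scan from disk.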
I am having problems reading files in the 4GB range
I am using a custom Hadoop input format which works well on smaller files but fails with a file of about 4GB in size - the format is generating about 800 splits, and all variables in my code are longs. Any suggestions? Is anyone reading files of this size?

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 113 in stage 0.0 failed 4 times, most recent failure: Lost task 113.3 in stage 0.0 (TID 38, pltrd022.labs.uninett.no): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
        org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104)
        org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:452)
        org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:368)
        org.apache.spark.storage.BlockManager.get(BlockManager.scala:552)
        org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
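The limit being hit here is per block/partition, not per file: DiskStore memory-maps the block, and java.nio.ByteBuffer is indexed by Int, so any single partition over 2 GB fails regardless of the long variables in user code. A sketch of the usual workaround (the input format name and path are placeholders for your custom format) is to cut the data into more, smaller partitions before caching:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.spark.storage.StorageLevel

    // Hypothetical custom format; the point is only the partition sizing.
    val raw = sc.newAPIHadoopFile[LongWritable, Text, MyInputFormat]("hdfs:///data/big.bin")
    val rows = raw.map(_._2.toString).repartition(2000) // aim well under 2 GB per partition
    rows.persist(StorageLevel.MEMORY_AND_DISK_SER)

Since ~800 splits over ~4 GB should give small input partitions, it may also be that a downstream cache() or shuffle is concentrating data into one oversized partition; raising the partition count at that point is the same idea.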
Re: Adding Spark Cassandra dependency breaks Spark Streaming?
Can you try with maven?

    diff --git a/streaming/pom.xml b/streaming/pom.xml
    index b8b8f2e..6cc8102 100644
    --- a/streaming/pom.xml
    +++ b/streaming/pom.xml
    @@ -68,6 +68,11 @@
           <artifactId>junit-interface</artifactId>
           <scope>test</scope>
         </dependency>
    +    <dependency>
    +      <groupId>com.datastax.spark</groupId>
    +      <artifactId>spark-cassandra-connector_2.10</artifactId>
    +      <version>1.1.0</version>
    +    </dependency>
       </dependencies>
       <build>
         <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

You can use the following command:

    mvn -pl core,streaming package -DskipTests

Cheers

On Fri, Dec 5, 2014 at 9:35 AM, Ashic Mahtab as...@live.com wrote: Hi, Seems adding the cassandra connector and spark streaming causes issues. I've included my build and code files below. Running sbt compile gives weird errors like "Seconds is not part of org.apache.spark.streaming" and "object Receiver is not a member of package org.apache.spark.streaming.receiver". If I take out cassandraConnector from the list of dependencies, sbt compile succeeds. How is adding the dependency removing things from the spark streaming packages? Is there something I can do (perhaps in sbt) to not have this break? Here's my build file:

    import sbt.Keys._
    import sbt._

    name := "untitled99"

    version := "1.0"

    scalaVersion := "2.10.4"

    val spark = "org.apache.spark" %% "spark-core" % "1.1.0"
    val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0"
    val cassandraConnector = "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()

    libraryDependencies ++= Seq(cassandraConnector, spark, sparkStreaming)

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

And here's my code:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    object Foo {
      def main(args: Array[String]) {
        val context = new SparkContext()
        val ssc = new StreamingContext(context, Seconds(2))
      }
    }

    class Bar extends Receiver[Int](StorageLevel.MEMORY_AND_DISK_2) {
      override def onStart(): Unit = ???
      override def onStop(): Unit = ???
    }
Re: Spark streaming for v1.1.1 - unable to start application
Hey Sourav, are you able to run a simple shuffle in a spark-shell? 2014-12-05 1:20 GMT-08:00 Shao, Saisai saisai.s...@intel.com: Hi, I don't think it's a problem with Spark Streaming; judging from the call stack, the problem occurs when the BlockManager is initializing itself. Would you mind checking your Spark configuration, hardware, and deployment? Most likely it's not a problem with Spark. Thanks, Saisai From: Sourav Chandra [mailto:sourav.chan...@livestream.com] Sent: Friday, December 5, 2014 4:36 PM To: user@spark.apache.org Subject: Spark streaming for v1.1.1 - unable to start application Hi, I am getting the below error, and due to this there are no completed stages - all are waiting:

    14/12/05 03:31:59 WARN AkkaUtils: Error sending message in 1 attempts
    java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
        at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
        at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
        at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:213)
        at org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:203)
        at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:47)
        at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:177)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:147)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:168)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230)
        at org.apache.spark.executor.Executor.<init>(Executor.scala:78)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:60)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Could you please
let me know the reason and fix for this? Spark version is 1.1.1 -- *Sourav Chandra* Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra *Livestream* Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: Unable to run applications on clusters on EC2
Hey, the default port is 7077. Not sure if you actually meant to put 7070. As a rule of thumb, you can go to the Master web UI and copy and paste the URL at the top left corner. That almost always works unless your cluster has a weird proxy set up. 2014-12-04 14:26 GMT-08:00 Xingwei Yang happy...@gmail.com: I think it is related to my previous questions, but I will separate them. In my previous question, I could not connect to the WebUI even though I could log into the cluster without any problem. Also, I tried lynx localhost:8080 and I could get the information about the cluster; I could also use spark-submit to submit a job locally by setting master to localhost. However, I could not submit the job to the cluster master, and I get an error like this:

    14/12/04 22:14:39 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
    14/12/04 22:14:42 INFO client.AppClient$ClientActor: Connecting to master spark://ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070...
    14/12/04 22:14:42 WARN client.AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070]
    14/12/04 22:14:42 WARN client.AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070]
    14/12/04 22:14:42 WARN client.AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070]
    14/12/04 22:14:42 WARN client.AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkmas...@ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7070]

Please let me know if you have any clue about it. Thanks a lot. -- Sincerely Yours, Xingwei Yang https://sites.google.com/site/xingweiyang1223/
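For example, with the master URL taken from the logs above and the default port (the class and jar names here are placeholders):

    spark-submit \
      --master spark://ec2-54-149-92-187.us-west-2.compute.amazonaws.com:7077 \
      --class com.example.MyApp \
      my-app.jar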
Re: spark-submit on YARN is slow
Hey Tobias, As you suspect, the reason why it's slow is because the resource manager in YARN takes a while to grant resources. This is because YARN needs to first set up the application master container, and then this AM needs to request more containers for Spark executors. I think this accounts for most of the overhead. The remaining source probably comes from how our own YARN integration code polls application (every second) and cluster resource states (every 5 seconds IIRC). I haven't explored in detail whether there are optimizations there that can speed this up, but I believe most of the overhead comes from YARN itself. In other words, no I don't know of any quick fix on your end that you can do to speed this up. -Andrew 2014-12-03 20:10 GMT-08:00 Tobias Pfeiffer t...@preferred.jp: Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file as well as my application jar file put in HDFS and can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue, but some YARN configuration setting (or YARN-inherent slowness), I was just wondering if anyone has an advice for how to speed this up. Thanks Tobias
Re: [Graphx] which way is better to access faraway neighbors?
At 2014-12-05 02:26:52 -0800, Yifan LI iamyifa...@gmail.com wrote: I have a graph in which each vertex keeps several messages for some faraway neighbours (I mean, not only immediate neighbours, but vertices at most k hops away, e.g. k = 5). Now I propose to distribute these messages to their corresponding destinations (say, the "faraway neighbours"): - by using the Pregel API, one superstep is enough - by using Spark basic operations (groupByKey, leftJoin, etc.) on the vertices RDD and its intermediate results. W.r.t. the communication among machines, and the high cost of groupByKey/leftJoin, I guess the 1st option is better? If messages will only travel along edges (even if they travel over multiple edges), then the Pregel API should be faster. You'll have to run k supersteps for messages to propagate k hops away from their origins. If messages can jump directly between two arbitrary vertices, then doing a single set of Spark basic operations may be faster than running multiple Pregel supersteps. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
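For the first option, a rough sketch of k-hop forwarding with the Pregel API (the graph, payload type, and names are illustrative; a fuller version would also accumulate delivered payloads in the vertex attribute instead of only tracking pending ones):

    import org.apache.spark.graphx._

    // Vertex attribute: messages still waiting to be forwarded, as (payload, hopsLeft).
    def propagate(graph: Graph[List[(String, Int)], Int], k: Int): Graph[List[(String, Int)], Int] =
      graph.pregel(
        List.empty[(String, Int)], // initial (empty) message
        k,                         // one superstep per hop
        EdgeDirection.Out)(
        // vprog: keep the seeded messages on superstep 0, then hold only new arrivals
        (id, pending, inbox) => if (inbox.isEmpty) pending else inbox,
        // sendMsg: forward every pending message that still has hops left, decremented
        t => {
          val out = t.srcAttr.collect { case (m, h) if h > 0 => (m, h - 1) }
          if (out.isEmpty) Iterator.empty else Iterator((t.dstId, out))
        },
        // mergeMsg: two message lists arriving at the same vertex are concatenated
        _ ++ _)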
Re: spark-submit on YARN is slow
Hi Tobias, What version are you using? In some recent versions, we had a couple of large hardcoded sleeps on the Spark side. -Sandy On Fri, Dec 5, 2014 at 11:15 AM, Andrew Or and...@databricks.com wrote: Hey Tobias, As you suspect, the reason why it's slow is because the resource manager in YARN takes a while to grant resources. This is because YARN needs to first set up the application master container, and then this AM needs to request more containers for Spark executors. I think this accounts for most of the overhead. The remaining source probably comes from how our own YARN integration code polls application (every second) and cluster resource states (every 5 seconds IIRC). I haven't explored in detail whether there are optimizations there that can speed this up, but I believe most of the overhead comes from YARN itself. In other words, no I don't know of any quick fix on your end that you can do to speed this up. -Andrew 2014-12-03 20:10 GMT-08:00 Tobias Pfeiffer t...@preferred.jp: Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file as well as my application jar file put in HDFS and can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue, but some YARN configuration setting (or YARN-inherent slowness), I was just wondering if anyone has an advice for how to speed this up. Thanks Tobias
Re: Monitoring Spark
If you're only interested in a particular instant, a simpler way is to check the executors page on the Spark UI: http://spark.apache.org/docs/latest/monitoring.html. By default each executor runs one task per core, so you can see how many tasks are being run at a given time and this translates directly to how many cores are being used for execution. 2014-12-02 21:49 GMT-08:00 Otis Gospodnetic otis.gospodne...@gmail.com: Hi Isca, I think SPM can do that for you: http://blog.sematext.com/2014/10/07/apache-spark-monitoring/ Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Tue, Dec 2, 2014 at 11:57 PM, Isca Harmatz pop1...@gmail.com wrote: hello, im running spark on a cluster and i want to monitor how many nodes/ cores are active in different (specific) points of the program. is there any way to do this? thanks, Isca
RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Sorry... I really don't have enough Maven know-how to do this quickly. I tried the pom below, and IntelliJ could find org.apache.spark.streaming.StreamingContext and org.apache.spark.streaming.Seconds, but not org.apache.spark.streaming.receiver.Receiver. Is there something specific I can try? I'll try sbt on the home machine in about a couple of hours.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>untitled100</groupId>
      <artifactId>untiled100</artifactId>
      <version>1.0-SNAPSHOT</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.1.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.10</artifactId>
          <version>1.1.0</version>
        </dependency>
        <dependency>
          <groupId>com.datastax.spark</groupId>
          <artifactId>spark-cassandra-connector_2.10</artifactId>
          <version>1.1.0</version>
        </dependency>
      </dependencies>
    </project>

Date: Fri, 5 Dec 2014 10:58:51 -0800 Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming? From: yuzhih...@gmail.com To: as...@live.com CC: user@spark.apache.org

Can you try with maven?

    diff --git a/streaming/pom.xml b/streaming/pom.xml
    index b8b8f2e..6cc8102 100644
    --- a/streaming/pom.xml
    +++ b/streaming/pom.xml
    @@ -68,6 +68,11 @@
           <artifactId>junit-interface</artifactId>
           <scope>test</scope>
         </dependency>
    +    <dependency>
    +      <groupId>com.datastax.spark</groupId>
    +      <artifactId>spark-cassandra-connector_2.10</artifactId>
    +      <version>1.1.0</version>
    +    </dependency>
       </dependencies>
       <build>
         <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

You can use the following command:

    mvn -pl core,streaming package -DskipTests

Cheers

On Fri, Dec 5, 2014 at 9:35 AM, Ashic Mahtab as...@live.com wrote: Hi, Seems adding the cassandra connector and spark streaming causes issues. I've included my build and code files below. Running sbt compile gives weird errors like "Seconds is not part of org.apache.spark.streaming" and "object Receiver is not a member of package org.apache.spark.streaming.receiver". If I take out cassandraConnector from the list of dependencies, sbt compile succeeds. How is adding the dependency removing things from the spark streaming packages? Is there something I can do (perhaps in sbt) to not have this break? Here's my build file:

    import sbt.Keys._
    import sbt._

    name := "untitled99"

    version := "1.0"

    scalaVersion := "2.10.4"

    val spark = "org.apache.spark" %% "spark-core" % "1.1.0"
    val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0"
    val cassandraConnector = "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()

    libraryDependencies ++= Seq(cassandraConnector, spark, sparkStreaming)

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

And here's my code:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    object Foo {
      def main(args: Array[String]) {
        val context = new SparkContext()
        val ssc = new StreamingContext(context, Seconds(2))
      }
    }

    class Bar extends Receiver[Int](StorageLevel.MEMORY_AND_DISK_2) {
      override def onStart(): Unit = ???
      override def onStop(): Unit = ???
    }
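If it turns out that the connector drags in conflicting Spark artifacts transitively, one sbt-side experiment (a sketch; verify against the actual dependency report before relying on it) is to exclude the connector's own org.apache.spark dependencies so the explicitly pinned 1.1.0 ones win:

    val cassandraConnector = ("com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0")
      .excludeAll(ExclusionRule(organization = "org.apache.spark"))
      .withSources().withJavadoc()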
Re: Issue in executing Spark Application from Eclipse
Hey Stuti, Did you start your standalone Master and Workers? You can do this through sbin/start-all.sh (see http://spark.apache.org/docs/latest/spark-standalone.html). Otherwise, I would recommend launching your application from the command line through bin/spark-submit. I am not sure if we officially support launching Spark applications from an IDE, because spark-submit handles very specific cases of how we set up class paths and JVM memory etc. -Andrew 2014-12-03 22:05 GMT-08:00 Stuti Awasthi stutiawas...@hcl.com: Hi All, I have a standalone Spark (1.1) cluster on one machine and I have installed the Scala Eclipse IDE (Scala 2.10) on my desktop. I am trying to execute Spark code over my standalone cluster but am getting errors. Please guide me to resolve this.

Code:

    val logFile = "File Path present on desktop" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://IP:PORT").setSparkHome("/home/stuti/Spark/spark-1.1.0-bin-hadoop1")
    val sc = new SparkContext(conf)
    println(sc.master)        // prints the correct master
    val logData = sc.textFile(logFile, 2).cache()
    println(logData.count)    // throws error

Error:

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    14/12/04 11:05:38 INFO SecurityManager: Changing view acls to: stutiawasthi,
    14/12/04 11:05:38 INFO SecurityManager: Changing modify acls to: stutiawasthi,
    14/12/04 11:05:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(stutiawasthi, ); users with modify permissions: Set(stutiawasthi, )
    14/12/04 11:05:39 INFO Slf4jLogger: Slf4jLogger started
    14/12/04 11:05:39 INFO Remoting: Starting remoting
    14/12/04 11:05:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@HOSTNAME_DESKTOP:62308]
    14/12/04 11:05:40 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@HOSTNAME_DESKTOP:62308]
    14/12/04 11:05:40 INFO Utils: Successfully started service 'sparkDriver' on port 62308.
    14/12/04 11:05:40 INFO SparkEnv: Registering MapOutputTracker
    14/12/04 11:05:40 INFO SparkEnv: Registering BlockManagerMaster
    14/12/04 11:05:40 INFO DiskBlockManager: Created local directory at C:\Users\STUTIA~1\AppData\Local\Temp\spark-local-20141204110540-ad60
    14/12/04 11:05:40 INFO Utils: Successfully started service 'Connection manager for block manager' on port 62311.
    14/12/04 11:05:40 INFO ConnectionManager: Bound socket to port 62311 with id = ConnectionManagerId(HOSTNAME_DESKTOP,62311)
    14/12/04 11:05:41 INFO MemoryStore: MemoryStore started with capacity 133.6 MB
    14/12/04 11:05:41 INFO BlockManagerMaster: Trying to register BlockManager
    14/12/04 11:05:41 INFO BlockManagerMasterActor: Registering block manager HOSTNAME_DESKTOP:62311 with 133.6 MB RAM
    14/12/04 11:05:41 INFO BlockManagerMaster: Registered BlockManager
    14/12/04 11:05:41 INFO HttpFileServer: HTTP File server directory is C:\Users\STUTIA~1\AppData\Local\Temp\spark-b65e69f4-69b9-4bb2-b41f-67165909e4c7
    14/12/04 11:05:41 INFO HttpServer: Starting HTTP Server
    14/12/04 11:05:41 INFO Utils: Successfully started service 'HTTP file server' on port 62312.
    14/12/04 11:05:42 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    14/12/04 11:05:42 INFO SparkUI: Started SparkUI at http://HOSTNAME_DESKTOP:4040
    14/12/04 11:05:43 INFO AppClient$ClientActor: Connecting to master spark://10.112.67.80:7077...
    14/12/04 11:05:43 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 spark://10.112.67.80:7077
    14/12/04 11:05:44 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
    14/12/04 11:05:45 INFO MemoryStore: ensureFreeSpace(31447) called with curMem=0, maxMem=140142182
    14/12/04 11:05:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 30.7 KB, free 133.6 MB)
    14/12/04 11:05:45 INFO MemoryStore: ensureFreeSpace(3631) called with curMem=31447, maxMem=140142182
    14/12/04 11:05:45 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.5 KB, free 133.6 MB)
    14/12/04 11:05:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on HOSTNAME_DESKTOP:62311 (size: 3.5 KB, free: 133.6 MB)
    14/12/04 11:05:45 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
    14/12/04 11:05:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    14/12/04 11:05:45 WARN LoadSnappy: Snappy native library not loaded
    14/12/04 11:05:46 INFO FileInputFormat: Total input paths to process : 1
    14/12/04 11:05:46 INFO SparkContext: Starting job: count at Test.scala:15
    14/12/04 11:05:46 INFO DAGScheduler: Got job 0 (count at Test.scala:15) with 2 output partitions
Java RDD Union
I'm a bit confused regarding the expected behavior of unions. I'm running on 8 cores. I have an RDD that is used to collect cluster associations (cluster id, content id, distance) for internal clusters as well as leaf clusters, since I'm doing hierarchical k-means and need all distances for sorting documents appropriately upon examination. It appears that union simply adds the items in the argument to the RDD instance the method is called on, rather than just returning a new RDD. If I want to use union this way, as more of an add/append, should I be capturing the return value and releasing the original from memory? I need help clarifying the semantics here. Also, in another related thread someone mentioned coalesce after union. Would I need to do the same on the instance RDD I'm calling union on? Perhaps a method such as append would be useful and clearer.
Re: Any ideas why a few tasks would stall
Hi Steve et al., It is possible that there's just a lot of skew in your data, in which case repartitioning is a good idea. Depending on how large your input data is and how much skew you have, you may want to repartition to a larger number of partitions. By the way, you can just call rdd.repartition(1000); this is the same as rdd.coalesce(1000, shuffle = true). Note that repartitioning is only a good idea if your straggler task is taking a long time. Otherwise, it can be quite expensive since it requires a full shuffle. Another possibility is that you might just have bad nodes in your cluster. To mitigate stragglers, you can try enabling speculative execution by setting spark.speculation to true. This attempts to re-run any task that takes a long time to complete on a different node in parallel. -Andrew 2014-12-04 11:43 GMT-08:00 akhandeshi ami.khande...@gmail.com: This did not work for me, that is, rdd.coalesce(200, forceShuffle). Does anyone have ideas on how to distribute your data evenly and co-locate partitions of interest? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-ideas-why-a-few-tasks-would-stall-tp20207p20387.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
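For reference, the speculation knobs live in spark-defaults.conf; a sketch with the stock values to start from (tune before trusting them):

    spark.speculation            true
    spark.speculation.interval   100    # ms between checks for speculatable tasks
    spark.speculation.multiplier 1.5    # how many times slower than the median counts as a straggler
    spark.speculation.quantile   0.75   # fraction of tasks that must finish before speculation kicks in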
Re: spark-submit on YARN is slow
My submissions of Spark on YARN (CDH 5.2) resulted in a few thousand steps. If I was running this on standalone cluster mode the query finished in 55s but on YARN, the query was still running 30min later. Would the hard coded sleeps potentially be in play here? On Fri, Dec 5, 2014 at 11:23 Sandy Ryza sandy.r...@cloudera.com wrote: Hi Tobias, What version are you using? In some recent versions, we had a couple of large hardcoded sleeps on the Spark side. -Sandy On Fri, Dec 5, 2014 at 11:15 AM, Andrew Or and...@databricks.com wrote: Hey Tobias, As you suspect, the reason why it's slow is because the resource manager in YARN takes a while to grant resources. This is because YARN needs to first set up the application master container, and then this AM needs to request more containers for Spark executors. I think this accounts for most of the overhead. The remaining source probably comes from how our own YARN integration code polls application (every second) and cluster resource states (every 5 seconds IIRC). I haven't explored in detail whether there are optimizations there that can speed this up, but I believe most of the overhead comes from YARN itself. In other words, no I don't know of any quick fix on your end that you can do to speed this up. -Andrew 2014-12-03 20:10 GMT-08:00 Tobias Pfeiffer t...@preferred.jp: Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file as well as my application jar file put in HDFS and can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue, but some YARN configuration setting (or YARN-inherent slowness), I was just wondering if anyone has an advice for how to speed this up. Thanks Tobias
Re: Increasing the number of retry in case of job failure
Increasing max failures is a way to do it, but it's probably a better idea to keep your tasks from failing in the first place. Are your tasks failing with exceptions from Spark or your application code? If from Spark, what is the stack trace? There might be a legitimate Spark bug such that even increasing this max failures won't fix your problem. 2014-12-05 5:12 GMT-08:00 Daniel Darabos daniel.dara...@lynxanalytics.com: It is controlled by spark.task.maxFailures. See http://spark.apache.org/docs/latest/configuration.html#scheduling. On Fri, Dec 5, 2014 at 11:02 AM, shahab shahab.mok...@gmail.com wrote: Hello, By some (unknown) reasons some of my tasks, that fetch data from Cassandra, are failing so often, and apparently the master removes a tasks which fails more than 4 times (in my case). Is there any way to increase the number of re-tries ? best, /Shahab
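For the record, a sketch of raising that limit at submit time (the class and jar names are placeholders; the property can equally go in spark-defaults.conf):

    spark-submit --conf spark.task.maxFailures=8 --class com.example.MyJob my-job.jar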
Re: drop table if exists throws exception
The command runs fine for me on master. Note that Hive does print an exception in the logs, but that exception does not propagate to user code. On Thu, Dec 4, 2014 at 11:31 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got an exception saying Hive: NoSuchObjectException(message:table table not found) when running DROP TABLE IF EXISTS table. Looks like a new regression in the Hive module. Can anyone confirm this? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: drop table if exists throws exception
And that is no different from how Hive has worked for a long time. On Fri, Dec 5, 2014 at 11:42 AM, Michael Armbrust mich...@databricks.com wrote: The command run fine for me on master. Note that Hive does print an exception in the logs, but that exception does not propogate to user code. On Thu, Dec 4, 2014 at 11:31 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got exception saying Hive: NoSuchObjectException(message:table table not found) when running DROP TABLE IF EXISTS table Looks like a new regression in Hive module. Anyone can confirm this? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: SchemaRDD partition on specific column values?
It does not appear that the in-memory caching currently preserves the information about the partitioning of the data so this optimization will probably not work. On Thu, Dec 4, 2014 at 8:42 PM, nitin nitin2go...@gmail.com wrote: With some quick googling, I learnt that I can we can provide distribute by coulmn_name in hive ql to distribute data based on a column values. My question now if I use distribute by id, will there be any performance improvements? Will I be able to avoid data movement in shuffle(Excahnge before JOIN step) and improve overall performance? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20424.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark-submit on YARN is slow
Just an FYI - I can submit the SparkPi app to YARN in cluster mode on a 1-node m3.xlarge EC2 instance instance and the app finishes running successfully in about 40 seconds. I just figured the 30 - 40 sec run time was normal b/c of the submitting overhead that Andrew mentioned. Denny, you can maybe also try to run SparkPi against YARN as a speed check. spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster --master yarn /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/jars/spark-examples-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar 10 On Fri, Dec 5, 2014 at 2:32 PM, Denny Lee denny.g@gmail.com wrote: My submissions of Spark on YARN (CDH 5.2) resulted in a few thousand steps. If I was running this on standalone cluster mode the query finished in 55s but on YARN, the query was still running 30min later. Would the hard coded sleeps potentially be in play here? On Fri, Dec 5, 2014 at 11:23 Sandy Ryza sandy.r...@cloudera.com wrote: Hi Tobias, What version are you using? In some recent versions, we had a couple of large hardcoded sleeps on the Spark side. -Sandy On Fri, Dec 5, 2014 at 11:15 AM, Andrew Or and...@databricks.com wrote: Hey Tobias, As you suspect, the reason why it's slow is because the resource manager in YARN takes a while to grant resources. This is because YARN needs to first set up the application master container, and then this AM needs to request more containers for Spark executors. I think this accounts for most of the overhead. The remaining source probably comes from how our own YARN integration code polls application (every second) and cluster resource states (every 5 seconds IIRC). I haven't explored in detail whether there are optimizations there that can speed this up, but I believe most of the overhead comes from YARN itself. In other words, no I don't know of any quick fix on your end that you can do to speed this up. -Andrew 2014-12-03 20:10 GMT-08:00 Tobias Pfeiffer t...@preferred.jp: Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file as well as my application jar file put in HDFS and can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue, but some YARN configuration setting (or YARN-inherent slowness), I was just wondering if anyone has an advice for how to speed this up. Thanks Tobias
Re: spark-submit on YARN is slow
Hi Denny, Those sleeps were only at startup, so if jobs are taking significantly longer on YARN, that should be a different problem. When you ran on YARN, did you use the --executor-cores, --executor-memory, and --num-executors arguments? When running against a standalone cluster, by default Spark will make use of all the cluster resources, but when running against YARN, Spark defaults to a couple tiny executors. -Sandy On Fri, Dec 5, 2014 at 11:32 AM, Denny Lee denny.g@gmail.com wrote: My submissions of Spark on YARN (CDH 5.2) resulted in a few thousand steps. If I was running this on standalone cluster mode the query finished in 55s but on YARN, the query was still running 30min later. Would the hard coded sleeps potentially be in play here? On Fri, Dec 5, 2014 at 11:23 Sandy Ryza sandy.r...@cloudera.com wrote: Hi Tobias, What version are you using? In some recent versions, we had a couple of large hardcoded sleeps on the Spark side. -Sandy On Fri, Dec 5, 2014 at 11:15 AM, Andrew Or and...@databricks.com wrote: Hey Tobias, As you suspect, the reason why it's slow is because the resource manager in YARN takes a while to grant resources. This is because YARN needs to first set up the application master container, and then this AM needs to request more containers for Spark executors. I think this accounts for most of the overhead. The remaining source probably comes from how our own YARN integration code polls application (every second) and cluster resource states (every 5 seconds IIRC). I haven't explored in detail whether there are optimizations there that can speed this up, but I believe most of the overhead comes from YARN itself. In other words, no I don't know of any quick fix on your end that you can do to speed this up. -Andrew 2014-12-03 20:10 GMT-08:00 Tobias Pfeiffer t...@preferred.jp: Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file as well as my application jar file put in HDFS and can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue, but some YARN configuration setting (or YARN-inherent slowness), I was just wondering if anyone has an advice for how to speed this up. Thanks Tobias
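A concrete sketch of those flags (resource numbers are illustrative, and the class/jar are placeholders), following the same spark-submit shape Matt used above:

    spark-submit --deploy-mode cluster --master yarn \
      --num-executors 8 \
      --executor-cores 4 \
      --executor-memory 8g \
      --class com.example.MyApp my-app.jar

Without them, a YARN submission in this era defaults to a couple of 1-core, 1 GB executors, which alone can explain a 55 s standalone query still running after 30 minutes on YARN.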
Re: Java RDD Union
No, RDDs are immutable. union() creates a new RDD, and does not modify an existing RDD. Maybe this obviates the question. I'm not sure what you mean about releasing from memory. If you want to repartition the unioned RDD, you repartition the result of union(), not anything else. On Fri, Dec 5, 2014 at 1:27 PM, Ron Ayoub ronalday...@live.com wrote: I'm a bit confused regarding the expected behavior of unions. I'm running on 8 cores. I have an RDD that is used to collect cluster associations (cluster id, content id, distance) for internal clusters as well as leaf clusters, since I'm doing hierarchical k-means and need all distances for sorting documents appropriately upon examination. It appears that union simply adds the items in the argument to the RDD instance the method is called on, rather than just returning a new RDD. If I want to use union this way, as more of an add/append, should I be capturing the return value and releasing the original from memory? I need help clarifying the semantics here. Also, in another related thread someone mentioned coalesce after union. Would I need to do the same on the instance RDD I'm calling union on? Perhaps a method such as append would be useful and clearer. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Java RDD Union
Hi Ron, Out of curiosity, why do you think that union is modifying an existing RDD in place? In general all transformations, including union, will create new RDDs, not modify old RDDs in place. Here's a quick test: scala> val firstRDD = sc.parallelize(1 to 5) firstRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:12 scala> val secondRDD = sc.parallelize(1 to 3) secondRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:12 scala> firstRDD.collect() res1: Array[Int] = Array(1, 2, 3, 4, 5) scala> secondRDD.collect() res2: Array[Int] = Array(1, 2, 3) scala> val newRDD = firstRDD.union(secondRDD) newRDD: org.apache.spark.rdd.RDD[Int] = UnionRDD[4] at union at <console>:16 scala> newRDD.collect() res3: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3) scala> firstRDD.collect() res4: Array[Int] = Array(1, 2, 3, 4, 5) scala> secondRDD.collect() res5: Array[Int] = Array(1, 2, 3) On Fri, Dec 5, 2014 at 2:27 PM, Ron Ayoub ronalday...@live.com wrote: I'm a bit confused regarding the expected behavior of unions. I'm running on 8 cores. I have an RDD that is used to collect cluster associations (cluster id, content id, distance) for internal clusters as well as leaf clusters, since I'm doing hierarchical k-means and need all distances for sorting documents appropriately upon examination. It appears that union simply adds items in the argument to the RDD instance the method is called on, rather than just returning a new RDD. If I want to use union this way, as more of an add/append, should I be capturing the return value and releasing the original from memory? I need help clarifying the semantics here. Also, in another related thread someone mentioned coalesce after union. Would I need to do the same on the instance RDD I'm calling union on? Perhaps a method such as append would be useful and clearer.
Re: spark-submit on YARN is slow
Hey Sandy, What are those sleeps for, and do they still exist? We have seen roughly 1 to 1.5 minutes of executor startup time, which is a large chunk for jobs that run in ~10min. Thanks, Arun On Fri, Dec 5, 2014 at 3:20 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Denny, Those sleeps were only at startup, so if jobs are taking significantly longer on YARN, that should be a different problem. When you ran on YARN, did you use the --executor-cores, --executor-memory, and --num-executors arguments? When running against a standalone cluster, by default Spark will make use of all the cluster resources, but when running against YARN, Spark defaults to a couple of tiny executors. -Sandy
Re: spark-submit on YARN is slow
Likely this is not the case here, but one thing to point out with YARN parameters like --num-executors: they should be specified *before* the app jar and app args on the spark-submit command line; otherwise the app only gets the default number of containers, which is 2. On Dec 5, 2014 12:22 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Denny, Those sleeps were only at startup, so if jobs are taking significantly longer on YARN, that should be a different problem. When you ran on YARN, did you use the --executor-cores, --executor-memory, and --num-executors arguments? When running against a standalone cluster, by default Spark will make use of all the cluster resources, but when running against YARN, Spark defaults to a couple of tiny executors. -Sandy
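To make the ordering point concrete (the class, jar, and app argument names here are placeholders), a submission along these lines keeps every Spark/YARN flag before the application jar:

  ./bin/spark-submit --master yarn-cluster \
    --class com.example.MyApp \
    --num-executors 20 \
    --executor-cores 4 \
    --executor-memory 8g \
    my-app.jar appArg1 appArg2

Anything placed after my-app.jar is handed to the application itself, so flags put there are never seen by spark-submit.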
Re: spark-submit on YARN is slow
Hey Arun, The sleeps would only cause at most about 5 seconds of overhead. The idea was to give executors some time to register. In more recent versions, they were replaced with the spark.scheduler.minRegisteredResourcesRatio and spark.scheduler.maxRegisteredResourcesWaitingTime settings. As of 1.1, by default YARN will wait until either 30 seconds have passed or 80% of the requested executors have registered. -Sandy On Fri, Dec 5, 2014 at 12:46 PM, Ashish Rangole arang...@gmail.com wrote: Likely this is not the case here, but one thing to point out with YARN parameters like --num-executors: they should be specified *before* the app jar and app args on the spark-submit command line; otherwise the app only gets the default number of containers, which is 2.
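As a sketch, assuming a build that has the replacement settings Sandy mentions, the wait behavior can be tuned at submission time (the values shown are the 1.1 defaults he describes, given only to illustrate where they go; the waiting time is in milliseconds):

  ./bin/spark-submit --master yarn-cluster \
    --conf spark.scheduler.minRegisteredResourcesRatio=0.8 \
    --conf spark.scheduler.maxRegisteredResourcesWaitingTime=30000 \
    ...

Lowering the ratio starts scheduling tasks sooner with fewer executors registered; raising it waits longer for executors before the job begins.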
Re: Java RDD Union
foreach does not modify an existing RDD either. However, in practice, nothing stops you from fiddling with the Java objects inside an RDD when you get a reference to them in a method like this. This is definitely a bad idea, as there is certainly no guarantee that any other operations will see any, some, or all of these edits. On Fri, Dec 5, 2014 at 2:40 PM, Ron Ayoub ronalday...@live.com wrote: I tricked myself into thinking it was uniting things correctly. I see I'm wrong now. I have a question regarding your comment that RDDs are immutable. Can you change values in an RDD using forEach? Does that violate immutability? I've been using forEach to modify RDDs, but perhaps I've tricked myself once again into believing it is working. I hold object references, so perhaps it is working serendipitously in local mode: the references are in fact not changing, but their referents are, and somehow this will no longer work when clustering. Thanks for the comments. From: so...@cloudera.com Date: Fri, 5 Dec 2014 14:22:38 -0600 Subject: Re: Java RDD Union To: ronalday...@live.com CC: user@spark.apache.org No, RDDs are immutable. union() creates a new RDD, and does not modify an existing RDD. Maybe this obviates the question. I'm not sure what you mean about releasing from memory. If you want to repartition the unioned RDD, you repartition the result of union(), not anything else.
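A minimal sketch of the safe alternative Sean describes (docs and clusterId are illustrative; assume the elements are instances of a case class): derive changed values with a transformation rather than mutating objects in place.

  // Risky: mutates objects inside the RDD; other operations may or may not see the edits.
  // docs.foreach(d => d.setClusterId(42))

  // Safe: produce a new RDD with the new values and work with that.
  val updated = docs.map(d => d.copy(clusterId = 42))

This keeps the immutability contract: every change is a new RDD, exactly as with union().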
R: Optimized spark configuration
What kind of query are you performing? You should set something like 2 partitions per core, which would be roughly 400 MB per partition. As you have a lot of RAM, I suggest caching the whole table; performance will increase a lot. Paolo Sent from my Windows Phone From: vdiwakar.malladi (vdiwakar.mall...@gmail.com) Sent: 05/12/2014 18:52 To: u...@spark.incubator.apache.org Subject: Optimized spark configuration Hi, could anyone help with what would be a better / optimized configuration for driver memory, worker memory, degree of parallelism, etc., when running 1 master node (itself also acting as a slave node) and 1 slave node? Both have 32 GB RAM with 4 cores. On this, I loaded approx. 17M rows of data (3.2 GB) into the Hive store, and when I try to execute a query on it from the JDBC thrift server, it takes about 10-12 sec to retrieve the data, which I think is too much. Please also point me to any tutorial which explains these optimized configurations. Thanks in advance.
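As a sketch of both of Paolo's suggestions (the table name and partition count are placeholders):

  // From the HiveContext behind the Thrift server: cache the whole table in memory.
  hiveContext.cacheTable("my_table")

  // For RDD work: roughly 2 partitions per core across the 8 cores.
  val repartitioned = someRdd.repartition(16)

Note that cacheTable is lazy, so the first query after it still pays the load cost; subsequent queries should be much faster.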
Re: scala.MatchError on SparkSQL when creating ArrayType of StructType
All values in Hive are always nullable, though you should still not be seeing this error. It should be addressed by this patch: https://github.com/apache/spark/pull/3150 On Fri, Dec 5, 2014 at 2:36 AM, Hao Ren inv...@gmail.com wrote: Hi, I am using SparkSQL on the 1.1.0 branch. The following code leads to a scala.MatchError at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247) val scm = StructType(inputRDD.schema.fields.init :+ StructField("list", ArrayType(StructType(Seq(StructField("date", StringType, nullable = false), StructField("nbPurchase", IntegerType, nullable = false)))), nullable = false)) // purchaseRDD is an RDD[sql.Row] whose schema corresponds to scm. It is transformed from inputRDD val schemaRDD = hiveContext.applySchema(purchaseRDD, scm) schemaRDD.registerTempTable("t_purchase") Here's the stack trace: scala.MatchError: ArrayType(StructType(List(StructField(date,StringType,true), StructField(n_reachat,IntegerType,true))),true) (of class org.apache.spark.sql.catalyst.types.ArrayType) at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247) at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247) at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263) at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) The strange thing is that the nullable flags of the date and nbPurchase fields are set to true while they were false in the code. If I set both to true, it works. But, in fact, they should not be nullable. Here's what I found at Cast.scala:247 on the 1.1.0 branch: private[this] lazy val cast: Any => Any = dataType match { case StringType => castToString case BinaryType => castToBinary case DecimalType => castToDecimal case TimestampType => castToTimestamp case BooleanType => castToBoolean case ByteType => castToByte case ShortType => castToShort case IntegerType => castToInt case FloatType => castToFloat case LongType => castToLong case DoubleType => castToDouble } Any idea? Thank you. Hao
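Until running with that patch, a sketch of the interim workaround Hao already observed to work: declare the fields nullable, which also matches Hive's view that all values are nullable.

  val scm = StructType(inputRDD.schema.fields.init :+
    StructField("list", ArrayType(StructType(Seq(
      StructField("date", StringType, nullable = true),
      StructField("nbPurchase", IntegerType, nullable = true)))),
      nullable = true))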
Re: Using data in RDD to specify HDFS directory to write to
I'm experiencing the same problem when I try to run my app in a standalone Spark cluster. My use case, however, is closer to the problem documented in this thread: http://apache-spark-user-list.1001560.n3.nabble.com/Please-help-running-a-standalone-app-on-a-Spark-cluster-td1596.html. The solution there did not work for me.
Re: Why KMeans with mllib is so slow ?
Also, are you using the latest master for this experiment? A PR merged into master a couple of days ago speeds up k-means about three times. See https://github.com/apache/spark/commit/7fc49ed91168999d24ae7b4cc46fbb4ec87febc1 Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 5, 2014 at 9:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: The code is really simple: object TestKMeans { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("Test KMeans") .setMaster("local[8]") .set("spark.executor.memory", "8g") val sc = new SparkContext(conf) val numClusters = 500; val numIterations = 2; val data = sc.textFile("sample.csv").map(x => Vectors.dense(x.split(',').map(_.toDouble))) data.cache() val clusters = KMeans.train(data, numClusters, numIterations) println(clusters.clusterCenters.size) val wssse = clusters.computeCost(data) println(s"error : $wssse") } } For testing purposes, I was generating random sample data with Julia and storing it in a CSV file delimited by commas. The dimensions are 248000 x 384. In the target application, I will have more than 248k data points to cluster. On Fri, Dec 5, 2014 at 6:03 PM, Davies Liu dav...@databricks.com wrote: Could you post your script to reproduce the results (also how to generate the dataset)? That will help us investigate it. On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hmm, here I use Spark in local mode on my laptop with 8 cores. The data is on my local filesystem. Even though there is an overhead due to the distributed computation, I find the difference between the runtimes of the two implementations really, really huge. Is there a benchmark on how well the algorithm implemented in MLlib performs? On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen so...@cloudera.com wrote: Spark has much more overhead, since it's set up to distribute the computation. Julia isn't distributed, and so has no such overhead in a completely in-core implementation. You generally use Spark when you have a problem large enough to warrant distributing, or when your data already lives in a distributed store like HDFS. But it's also possible you're not configuring the implementations the same way, yes. There's not enough info here really to say. On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to run clustering with the k-means algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the k-means++ available in Julia (http://clusteringjl.readthedocs.org/en/latest/kmeans.html) takes about 8 minutes on a single core. Solving the same problem with Spark's k-means|| takes more than 1.5 hours with 8 cores. Either they don't implement the same algorithm, or I don't understand how the k-means in Spark works. Is my data not big enough to take full advantage of Spark? At least, I expected the same runtime. Cheers, Jao
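One configuration difference worth ruling out in this comparison: the Julia run uses k-means++ initialization, while MLlib defaults to k-means||, whose initialization makes several distributed passes over the data. A sketch of forcing random initialization for a more apples-to-apples timing (data as in the code above):

  import org.apache.spark.mllib.clustering.KMeans

  val model = new KMeans()
    .setK(500)
    .setMaxIterations(2)
    .setInitializationMode(KMeans.RANDOM) // default is KMeans.K_MEANS_PARALLEL
    .run(data)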
Re: Including data nucleus tools
Can you try to run the same job using the assembly packaged by make-distribution, as we discussed in the other thread? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 5, 2014 at 12:25 PM, spark.dubovsky.ja...@seznam.cz wrote: Hi all, I have created an assembly jar from the 1.2 snapshot source by running [1], which sets the correct version of hadoop for our cluster and uses the hive profile. I have also written a relatively simple test program which starts by reading data from parquet using a hive context. I compile the code against the assembly jar created and then submit it on a cluster using [2]. The job fails in its early stage on creating the HiveContext itself. The important part of the stack trace is [3]. Could someone please explain what is wrong and how it should be fixed? I have found only SPARK-4532 (https://issues.apache.org/jira/browse/SPARK-4532) when looking for something related. The fix for that bug is merged in the source I have used, so this is ruled out... Thanks for the help Jakub [1] ./sbt/sbt -Dhadoop.version=2.3.0-cdh5.1.3 -Pyarn -Phive assembly/assembly [2] ./bin/spark-submit --num-executors 200 --master yarn-cluster --conf spark.yarn.jar=assembly/target/scala-2.10/spark-assembly-1.2.1-SNAPSHOT-hadoop2.3.0-cdh5.1.3.jar --class org.apache.spark.mllib.CreateGuidDomainDictionary root-0.1.jar ...some-args-here [3] 14/12/05 20:28:15 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient) Exception in thread "Driver" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate ... Caused by: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ...
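For reference, a make-distribution invocation along the lines DB Tsai suggests might look like this (the profiles and Hadoop version mirror [1]; --tgz is optional):

  ./make-distribution.sh --tgz -Pyarn -Phive -Dhadoop.version=2.3.0-cdh5.1.3

Unlike a plain sbt-built assembly, the resulting distribution should ship the datanucleus jars alongside the assembly jar in lib/, which is exactly the class the ClassNotFoundException in [3] is missing.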
Cannot predictOnValues or predictOn based on the model built with StreamingLinearRegressionWithSGD
Hi, The following example code is able to build the correct model.weights, but its prediction value is zero. Am I passing predictOnValues incorrectly? I also coded a batch version based on LinearRegressionWithSGD() with the same train and test data, iteration count, and step size info, and it was able to model.predict with a pretty good result. I don't know why predictOnValues comes out zero; is there another way to predict with StreamingLinearRegressionWithSGD()? Attached are the test and train data I am using. The number of iterations and step size to converge to the model are 600 and 0.0001. val trainingData = ssc.textFileStream(inp(0)).map(LabeledPoint.parse) val testData = ssc.textFileStream(inp(1)).map(LabeledPoint.parse) val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(inp(3).toInt)).setNumIterations(inp(4).toInt).setStepSize(inp(5).toFloat) model.algorithm.setIntercept(true) model.trainOn(trainingData) //model.predictOnValues(testData.map(xp => (xp.label, xp.features))).print() model.predictOn(testData.map(xp => xp.features)).print() ssc.start() ssc.awaitTermination() Thanks for the help. Tri
Running two different Spark jobs vs multi-threading RDDs
I've read in the documentation that RDD jobs can be run concurrently when submitted in separate threads. I'm curious how the scheduler would handle propagating these down to the tasks. I have 3 RDDs: - one RDD which loads some initial data, transforms it and caches it - two RDDs which use the cached RDD to provide reports I'm trying to figure out how the resources will be scheduled to perform these stages if I were to concurrently run the two RDDs that depend on the first RDD. Would the two RDDs run sequentially? Would they both run at the same time and be smart about how they are caching? Would this be a time when I'd want to use Tachyon instead and run this as 2 separate physical jobs: one to place the shared data in the RAMDISK and one to run the two dependent RDDs concurrently? Or would it even be best in that case to run 3 completely separate jobs? We're planning on using YARN, so there are 2 levels of scheduling going on. We're trying to figure out the best way to utilize the resources so that we are fully saturating the system and making sure there's constantly work being done, rather than anything spinning gears waiting on upstream processing to occur (in mapreduce, we'd just submit a ton of jobs and have them wait in line).
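A minimal sketch of the threaded approach (parse, isTypeA, and isTypeB are placeholders): cache the shared RDD, then fire the two dependent actions from separate threads so their stages can be scheduled concurrently; with spark.scheduler.mode=FAIR they also share executors more evenly.

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration

  val shared = sc.textFile("input").map(parse).cache()

  // Each Future submits an independent Spark job from its own thread.
  val reportA = Future { shared.filter(isTypeA).count() }
  val reportB = Future { shared.filter(isTypeB).count() }

  Await.result(Future.sequence(Seq(reportA, reportB)), Duration.Inf)

Both jobs read the same cached partitions, so the load-and-transform work is done only once.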
Re: How to incrementally compile spark examples using mvn
You can set SPARK_PREPEND_CLASSES=1 and it should pick up your new mllib classes whenever you compile them. I don't see anything similar for examples/, so if you modify example code you need to re-build the examples module (package or install - just compile won't work, since you need to build the new jar). On Thu, Dec 4, 2014 at 10:23 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, I made some code changes in the mllib project and, as mentioned in the previous mails, I did mvn install -pl mllib Now when I run a program in examples using run-example, the new code is not executing. Instead, the previous code itself is running. But if I do an mvn install on the entire spark project, I can see the new code running. But installing all of spark takes a lot of time, so it's difficult to do this each time I make some changes. Can someone tell me how to compile mllib alone and get the changes working? Thanks Regards, Meethu M On Friday, 28 November 2014 2:39 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, I have a similar problem. I modified the code in mllib and examples. I did mvn install -pl mllib mvn install -pl examples But when I run the program in examples using run-example, the older version of mllib (before the changes were made) is getting executed. How do I get the changes made in mllib while calling it from the examples project? Thanks Regards, Meethu M On Monday, 24 November 2014 3:33 PM, Yiming (John) Zhang sdi...@gmail.com wrote: Thank you, Marcelo and Sean, mvn install is a good answer for my demands. -----Original Message----- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: November 21, 2014 1:47 To: yiming zhang Cc: Sean Owen; user@spark.apache.org Subject: Re: How to incrementally compile spark examples using mvn Hi Yiming, On Wed, Nov 19, 2014 at 5:35 PM, Yiming (John) Zhang sdi...@gmail.com wrote: Thank you for your reply. I was wondering whether there is a method of reusing locally-built components without installing them? That is, if I have successfully built the spark project as a whole, how should I configure it so that I can incrementally build (only) the spark-examples sub-project without the need of downloading or installation? As Sean suggests, you shouldn't need to install anything. After mvn install, your local repo is a working Spark installation, and you can use spark-submit and other tools directly within it. You just need to remember to rebuild the assembly/ project when modifying Spark code (or the examples/ project when modifying examples). -- Marcelo
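For example, an edit/compile/run loop with Marcelo's approach might look like the following (the example class name is illustrative, and this assumes the full project was built once already):

  export SPARK_PREPEND_CLASSES=1
  mvn compile -pl mllib                  # picks up mllib changes, no install needed
  mvn package -pl examples -DskipTests   # only when example code itself changed
  ./bin/run-example mllib.DenseKMeans ...

Remember to unset SPARK_PREPEND_CLASSES afterwards so a regular run does not pick up stray development classes.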
Re: Market Basket Analysis
Apriori can be thought of as post-processing on a product similarity graph... I call it product similarity, but for each product you build a node which keeps the distinct users visiting the product, and two product nodes are connected by an edge if the intersection is > 0... you are assuming that if no user has visited a keyword, no one is going to visit it in the future... this graph is not for prediction but only keeps user visits... Anyway, once you have built this graph on GraphX, you can do interesting path-based analysis... Pick a product and trace its fanout to see, once people bought this product, which other products they bought, etc. A first stab at the analysis is to calculate the product similarities... You can also generate naturally occurring clusters of products, but then you are partitioning the graph using spectral or other graph partitioners like METIS... Even ad-hoc analysis of the product graph will give a lot of useful insights (hopefully deeper than Apriori)... On Fri, Dec 5, 2014 at 12:25 PM, Sean Owen so...@cloudera.com wrote: I doubt Amazon uses Apriori for this, but who knows. Usually you want "also bought" functionality, which is a form of similar-item computation. But you don't want to favor items that are simply frequently purchased in general. You probably want to look at pairs of items that co-occur in purchase histories unusually frequently by looking at (log) likelihood ratios, which is a straightforward item similarity computation. On Fri, Dec 5, 2014 at 11:43 AM, Ashic Mahtab as...@live.com wrote: This can definitely be useful. Frequently bought together is something Amazon does, though surprisingly, you don't get a discount. Perhaps it can lead to offering (or avoiding!) deals on frequent itemsets. This is a good resource for frequent itemset implementations: http://infolab.stanford.edu/~ullman/mmds/ch6.pdf From: rpuj...@hortonworks.com Date: Fri, 5 Dec 2014 10:31:17 -0600 Subject: Re: Market Basket Analysis To: so...@cloudera.com CC: t...@preferred.jp; user@spark.apache.org This is a typical use case: people who buy electric razors also tend to buy batteries and shaving gel along with them. The goal is to build a model which will look through POS records and find which product categories have a higher likelihood of appearing together in a given transaction. What would you recommend? On Fri, Dec 5, 2014 at 7:21 AM, Sean Owen so...@cloudera.com wrote: Generally I don't think frequent-item-set algorithms are that useful. They're simple and not probabilistic; they don't tell you what sets occurred unusually frequently. Usually people ask for frequent item set algos when they really mean they want to compute item similarity or make recommendations. What's your use case? On Thu, Dec 4, 2014 at 8:23 PM, Rohit Pujari rpuj...@hortonworks.com wrote: Sure, I'm looking to perform frequent itemset analysis on a POS data set. Apriori is a classic algorithm used for such tasks. Since an Apriori implementation is not part of MLlib yet (see https://issues.apache.org/jira/browse/SPARK-4001), what are some other options/algorithms I could use to perform a similar task? If there's no spoon-to-spoon substitute, spoon-to-fork will suffice too. Hopefully this provides some clarification. Thanks, Rohit From: Tobias Pfeiffer t...@preferred.jp Date: Thursday, December 4, 2014 at 7:20 PM To: Rohit Pujari rpuj...@hortonworks.com Cc: user@spark.apache.org Subject: Re: Market Basket Analysis Hi, On Thu, Dec 4, 2014 at 11:58 PM, Rohit Pujari rpuj...@hortonworks.com wrote: I'd like to do market basket analysis using spark, what're my options? To do it or not to do it ;-) Seriously, could you elaborate a bit on what you want to know? Tobias -- Rohit Pujari Solutions Engineer, Hortonworks rpuj...@hortonworks.com 716-430-6899
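A rough sketch of the pairwise co-occurrence counting this thread converges on (field types are assumptions; pos is taken to be an RDD of (transactionId, productId) pairs from the POS records):

  // Collect the distinct products in each transaction (the baskets).
  val baskets = pos.groupByKey().values.map(_.toSeq.distinct)

  // Count how often each unordered pair of products shares a basket.
  val pairCounts = baskets.flatMap { items =>
    for (a <- items; b <- items if a < b) yield ((a, b), 1L)
  }.reduceByKey(_ + _)

These counts, together with per-product totals, are the inputs to the log-likelihood ratio test Sean mentions for separating unusually frequent pairs from merely popular products.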
Transfer from RDD to JavaRDD
I use Spark in Java. I want to access the vectors of a RowMatrix M, thus I use M.rows(), which is an RDD<Vector>. I want to transform it to a JavaRDD<Vector>, so I used the following command: JavaRDD<Vector> data = JavaRDD.fromRDD(M.rows(), scala.reflect.ClassTag$.MODULE$.apply(Vector.class)); However, it shows an error like this: The method fromRDD(RDD<T>, ClassTag<T>) in the type JavaRDD is not applicable for the arguments (RDD<Vector>, ClassTag<Object>) Is there anything wrong with the method? Thanks a lot. -- Sincerely Yours Xingwei Yang https://sites.google.com/site/xingweiyang1223/
Re: Stateful mapPartitions
Yeah, the main way to do this would be to have your own static cache of connections. These could be held in an object in Scala or a static variable in Java (for instance a set of connections that you can borrow from). - Patrick On Thu, Dec 4, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Fri, Dec 5, 2014 at 3:56 AM, Akshat Aranya aara...@gmail.com wrote: Is it possible to have some state across multiple calls to mapPartitions on each partition, for instance, if I want to keep a database connection open? If you're using Scala, you can use a singleton object; this will exist once per JVM (i.e., once per executor), like object DatabaseConnector { lazy val conn = ... } Please be aware that shutting down the connection is much harder than opening it, because you basically have no idea when processing is done for an executor, AFAIK. Tobias
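Expanding Tobias's singleton into a slightly fuller sketch (createConnection and process are stand-ins for whatever client library is in use):

  object DatabaseConnector {
    lazy val conn = {
      val c = createConnection()          // stand-in for the real client call
      sys.addShutdownHook { c.close() }   // best-effort cleanup when the executor JVM exits
      c
    }
  }

  rdd.mapPartitions { iter =>
    val conn = DatabaseConnector.conn     // created once per executor JVM, reused across partitions
    iter.map(row => process(conn, row))   // stand-in per-row work
  }

The shutdown hook is only best-effort, reflecting Tobias's caveat that there is no reliable "processing is done" signal on an executor.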
Re: How to incrementally compile spark examples using mvn
i suddenly also ran into the issue that maven is trying to download snapshots that don't exist for other sub-projects. did something change in the maven build? does maven not have the capability to smartly compile the other sub-projects that a sub-project depends on? i'd rather avoid mvn install since this creates a local maven repo. i have been stung by that before (spent a day trying to do something and got weird errors because some toy version i once built was stuck in my local maven repo and it somehow got priority over a real maven repo). On Fri, Dec 5, 2014 at 5:28 PM, Marcelo Vanzin van...@cloudera.com wrote: You can set SPARK_PREPEND_CLASSES=1 and it should pick up your new mllib classes whenever you compile them. I don't see anything similar for examples/, so if you modify example code you need to re-build the examples module (package or install - just compile won't work, since you need to build the new jar).
Re: drop table if exists throws exception
I see. The resulting SchemaRDD is returned, so as Michael said, the exception does not propagate to user code. However, printing out the following log is confusing :) scala> sql("drop table if exists abc") 14/12/05 16:27:02 INFO ParseDriver: Parsing command: drop table if exists abc 14/12/05 16:27:02 INFO ParseDriver: Parse Completed 14/12/05 16:27:02 INFO PerfLogger: <PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO PerfLogger: <PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO Driver: Concurrency mode is disabled, not creating a lock manager 14/12/05 16:27:02 INFO PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO ParseDriver: Parsing command: DROP TABLE IF EXISTS abc 14/12/05 16:27:02 INFO ParseDriver: Parse Completed 14/12/05 16:27:02 INFO PerfLogger: </PERFLOG method=parse start=1417825622650 end=1417825622650 duration=0 from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO PerfLogger: <PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc 14/12/05 16:27:02 INFO audit: ugi=jianshuang ip=unknown-ip-addr cmd=get_table : db=default tbl=abc 14/12/05 16:27:02 INFO Driver: Semantic Analysis Completed 14/12/05 16:27:02 INFO PerfLogger: </PERFLOG method=semanticAnalyze start=1417825622650 end=1417825622653 duration=3 from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null) 14/12/05 16:27:02 INFO PerfLogger: </PERFLOG method=compile start=1417825622650 end=1417825622654 duration=4 from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO PerfLogger: <PERFLOG method=Driver.execute from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO Driver: Starting command: DROP TABLE IF EXISTS abc 14/12/05 16:27:02 INFO PerfLogger: </PERFLOG method=TimeToSubmit start=1417825622650 end=1417825622654 duration=4 from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO PerfLogger: <PERFLOG method=runTasks from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO PerfLogger: <PERFLOG method=task.DDL.Stage-0 from=org.apache.hadoop.hive.ql.Driver> 14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc 14/12/05 16:27:02 INFO audit: ugi=jianshuang ip=unknown-ip-addr cmd=get_table : db=default tbl=abc 14/12/05 16:27:02 ERROR Hive: NoSuchObjectException(message:default.abc table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) at sun.reflect.GeneratedMethodAccessor57.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) On Sat, Dec 6, 2014 at 3:45 AM, Mark Hamstra m...@clearstorydata.com wrote: And that is no different from how Hive has worked for a long time. On Fri, Dec 5, 2014 at 11:42 AM, Michael Armbrust mich...@databricks.com wrote: The command ran fine for me on master. Note that Hive does print an exception in the logs, but that exception does not propagate to user code.
On Thu, Dec 4, 2014 at 11:31 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got an exception saying Hive: NoSuchObjectException(message:table table not found) when running DROP TABLE IF EXISTS table Looks like a new regression in the Hive module. Can anyone confirm this? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to a Pig limitation, the generated schema contains Pig's identifiers, e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g. create external table pmt ( sorted::id bigint ) stored as parquet location '...' Obviously it didn't work. I also tried removing the sorted:: identifier, but the resulting rows contain only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
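One Spark-side idea (a sketch, untested, assuming the 1.1/1.2 SchemaRDD API): read the Parquet files directly and re-apply the same schema with the Pig prefixes stripped, then register that as the table instead of using CREATE EXTERNAL TABLE.

  import org.apache.spark.sql._

  val pf = hiveContext.parquetFile("/path/to/parquet")
  val cleaned = StructType(pf.schema.fields.map(f =>
    f.copy(name = f.name.replaceAll("^.*::", ""))))
  hiveContext.applySchema(pf, cleaned).registerTempTable("pmt")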
Re: How to incrementally compile spark examples using mvn
i think what changed is that core now has dependencies on other sub-projects. ok... so i am forced to install stuff because maven cannot compile what is needed. i will install On Fri, Dec 5, 2014 at 7:12 PM, Koert Kuipers ko...@tresata.com wrote: i suddenly also ran into the issue that maven is trying to download snapshots that don't exist for other sub-projects. did something change in the maven build? does maven not have the capability to smartly compile the other sub-projects that a sub-project depends on? i'd rather avoid mvn install since this creates a local maven repo. i have been stung by that before (spent a day trying to do something and got weird errors because some toy version i once built was stuck in my local maven repo and it somehow got priority over a real maven repo).
Re: How to incrementally compile spark examples using mvn
Maven definitely compiles what is needed, but not if you tell it to only compile one module alone. Unless you have previously built and installed the other local snapshot artifacts it needs, that invocation can't proceed because you have restricted it to build one module whose dependencies don't exist. On Fri, Dec 5, 2014 at 6:44 PM, Koert Kuipers ko...@tresata.com wrote: i think what changed is that core now has dependencies on other sub-projects. ok... so i am forced to install stuff because maven cannot compile what is needed. i will install
Re: Transfer from RDD to JavaRDD
You can probably get around it with casting, but I ended up using wrapRDD -- which is not a static method -- from another JavaRDD in scope to address this more directly without casting or warnings. It's not ideal, but both should work; it's just a matter of which you think is less hacky. On Fri, Dec 5, 2014 at 5:51 PM, Xingwei Yang happy...@gmail.com wrote: I use Spark in Java. I want to access the vectors of a RowMatrix M, thus I use M.rows(), which is an RDD<Vector>. I want to transform it to a JavaRDD<Vector>, so I used the following command: JavaRDD<Vector> data = JavaRDD.fromRDD(M.rows(), scala.reflect.ClassTag$.MODULE$.apply(Vector.class)); However, it shows an error like this: The method fromRDD(RDD<T>, ClassTag<T>) in the type JavaRDD is not applicable for the arguments (RDD<Vector>, ClassTag<Object>) Is there anything wrong with the method? Thanks a lot. -- Sincerely Yours Xingwei Yang https://sites.google.com/site/xingweiyang1223/
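A Java sketch of the casting route, plus RDD.toJavaRDD(), which is not mentioned in the thread but exists on RDD and avoids the ClassTag issue entirely (M is the RowMatrix from the question):

  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.mllib.linalg.Vector;

  // Option 1: give javac the type argument explicitly so the ClassTag is ClassTag<Vector>.
  JavaRDD<Vector> rows = JavaRDD.fromRDD(
      M.rows(), scala.reflect.ClassTag$.MODULE$.<Vector>apply(Vector.class));

  // Option 2: let Spark wrap the Scala RDD itself.
  JavaRDD<Vector> rows2 = M.rows().toJavaRDD();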
Re: How to incrementally compile spark examples using mvn
I've never used it, but reading the help it seems the -am option might help here. On Fri, Dec 5, 2014 at 4:47 PM, Sean Owen so...@cloudera.com wrote: Maven definitely compiles what is needed, but not if you tell it to only compile one module alone. Unless you have previously built and installed the other local snapshot artifacts it needs, that invocation can't proceed because you have restricted it to build one module whose dependencies don't exist. -- Marcelo
Re: How to incrementally compile spark examples using mvn
I tried the following:

rm -rf ~/.m2/repository/org/apache/spark/spark-core_2.10/1.3.0-SNAPSHOT/
mvn -am -pl streaming package -DskipTests

[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM .................. SUCCESS [4.976s]
[INFO] Spark Project Networking .................. SUCCESS [1.279s]
[INFO] Spark Project Shuffle Streaming Service ... SUCCESS [0.499s]
[INFO] Spark Project Core ........................ SUCCESS [1:03.302s]
[INFO] Spark Project Streaming ................... SUCCESS [26.777s]
[INFO]
[INFO] BUILD SUCCESS

Cheers On Fri, Dec 5, 2014 at 4:53 PM, Marcelo Vanzin van...@cloudera.com wrote: I've never used it, but reading the help it seems the -am option might help here.
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Here's the solution I got after talking with Liancheng: 1) using backquotes `..` to wrap up all illegal characters:

val rdd = parquetFile(file)
val schema = rdd.schema.fields
  .map(f => s"`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}")
  .mkString(",\n")

val ddl_13 = s"""
  |CREATE EXTERNAL TABLE $name (
  |  $schema
  |)
  |STORED AS PARQUET
  |LOCATION '$file'
""".stripMargin

sql(ddl_13)

2) create a new schema and do applySchema to generate a new SchemaRDD; I had to drop and re-register the table:

val t = table(name)
val newSchema = StructType(t.schema.fields.map(s => s.copy(name = s.name.replaceAll(".*?::", ""))))
sql(s"drop table $name")
applySchema(t, newSchema).registerTempTable(name)

I'm testing it for now. Thanks for the help! Jianshi On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to Pig's limitations, the generated schema contains Pig's identifiers, e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

create external table pmt (
  sorted::id bigint
)
stored as parquet
location '...'

Obviously it didn't work. I also tried removing the identifier sorted::, but the resulting rows contained only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Problems creating and reading a large test file
I am trying to look at problems reading a data file over 4GB. As part of my testing I am trying to create such a file. My plan is to create a fasta file (a simple format used in biology) looking like:

1 TCCTTACGGAGTTCGGGTGTTTATCTTACTTATCGCGGTTCGCTGCCGCTCCGGGAGCCCGGATAGGCTGCGTTAATACCTAAGGAGCGCGTATTG
2 GTCTGATCTAAATGCGACGACGTCTTTAGTGCTAAGTGGAACCCAATCTTAAGACCCAGGCTCTTAAGCAGAAACAGACCGTCCCTGCCTCCTGGAGTAT
3 ...

I create a list with 5000 structures, use flatMap to add 5000 entries per structure, and then either call saveAsTextFile, or call dnaFragmentIterator = mySet.toLocalIterator() and write to HDFS. Then I try to read the file back with

JavaRDD<String> lines = ctx.textFile(hdfsFileName);

What I get on a 16-node cluster is:

14/12/06 01:49:21 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pltrd007.labs.uninett.no,50119) java.nio.channels.ClosedChannelException
14/12/06 01:49:35 ERROR BlockManagerMasterActor: Got two different block manager registrations on 20140711-081617-711206558-5050-2543-13

The code is at the link below - I did not want to spam the group, although it is only a couple of pages. I am baffled - there are no issues when I create a few thousand records, but things blow up when I try 25 million records, a file of ~6GB. Can someone take a look - it is not a lot of code: https://drive.google.com/file/d/0B4cgoSGuA4KWUmo3UzBZRmU5M3M/view?usp=sharing
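For reference, a minimal Scala sketch of the generation scheme described above (the poster's actual code is Java and lives at the Google Drive link; every name, path, and partition count here is a placeholder, not the original code):

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

object FastaGen {
  val Bases = "ACGT"

  def randomSequence(len: Int): String =
    (1 to len).map(_ => Bases(Random.nextInt(Bases.length))).mkString

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fasta-gen"))
    // 5000 seed entries, each expanded to 5000 fragments = 25 million records
    sc.parallelize(1 to 5000, 120)
      .flatMap(seed => (1 to 5000).map(i => s"${seed}_$i ${randomSequence(100)}"))
      .saveAsTextFile("hdfs:///tmp/fragments.txt")
    sc.stop()
  }
}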
Re: rdd.saveAsTextFile problem
Try the workaround for Windows found here: http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7. This fixed the issue when calling rdd.saveAsTextFile(..) for me with Spark v1.1.0 on Windows 8.1 in local mode. Summary of steps: 1) download the compiled winutils.exe from http://social.msdn.microsoft.com/Forums/windowsazure/en-US/28a57efb-082b-424b-8d9e-731b1fe135de/please-read-if-experiencing-job-failures?forum=hdinsight 2) put this file into d:\winutil\bin 3) add in code: System.setProperty("hadoop.home.dir", "d:\\winutil\\")
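For what it's worth, a minimal sketch of step 3 in context (the class name and output path are placeholders; the property has to be set before the SparkContext is created):

import org.apache.spark.{SparkConf, SparkContext}

object WindowsSaveTest {
  def main(args: Array[String]): Unit = {
    // Must run before Spark touches the Hadoop filesystem code
    System.setProperty("hadoop.home.dir", "d:\\winutil\\")
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("save-test"))
    sc.parallelize(1 to 100).saveAsTextFile("out")
    sc.stop()
  }
}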
Re: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode
There were two exit calls in this code. If the args were wrong, spark-submit got the return code 101; but if the args were correct, spark-submit could not get the second return code, 100. What's the difference between these two exits? I was so confused. I'm also confused. When I tried your code, spark-submit returned 1 in both cases. That's expected. In yarn-cluster mode, the driver runs in the ApplicationMaster, so the exit code of the driver is also the exit code of the ApplicationMaster. However, for now, Spark cannot get the exit code of the ApplicationMaster from Yarn, because Yarn does not send it back to the client. spark-submit will return 1 when Yarn reports that the ApplicationMaster failed. Best Regards, Shixiong Zhu 2014-12-06 1:59 GMT+08:00 LinQili lin_q...@outlook.com: You mean the localhost:4040 or the application master web UI? Sent from my iPhone On Dec 5, 2014, at 17:26, Shixiong Zhu zsxw...@gmail.com wrote: What's the status of this application in the yarn web UI? Best Regards, Shixiong Zhu 2014-12-05 17:22 GMT+08:00 LinQili lin_q...@outlook.com: I tried another test code:

def main(args: Array[String]) {
  if (args.length != 1) {
    Util.printLog(ERROR, "Args error - arg1: BASE_DIR")
    exit(101)
  }
  val currentFile = args(0).toString
  val DB = "test_spark"
  val tableName = "src"

  val sparkConf = new SparkConf().setAppName("HiveFromSpark")
  val sc = new SparkContext(sparkConf)
  val hiveContext = new HiveContext(sc)

  // Before exit
  Util.printLog(INFO, "Exit")
  exit(100)
}

There were two `exit` calls in this code. If the args were wrong, spark-submit got the return code 101; but if the args were correct, spark-submit could not get the second return code, 100. What's the difference between these two `exit` calls? I was so confused. -- From: lin_q...@outlook.com To: u...@spark.incubator.apache.org Subject: RE: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode Date: Fri, 5 Dec 2014 17:11:39 +0800 I tried in spark client mode; spark-submit can get the correct return code from the spark job. But in yarn-cluster mode, it failed. -- From: lin_q...@outlook.com To: u...@spark.incubator.apache.org Subject: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode Date: Fri, 5 Dec 2014 16:55:37 +0800 Hi, all: According to https://github.com/apache/spark/pull/2732, when a spark job fails or exits nonzero in yarn-cluster mode, spark-submit should get the corresponding return code of the spark job. But I tried in a spark-1.1.1 yarn cluster, and spark-submit returns zero anyway. Here is my spark code:

try {
  val dropTable = s"drop table $DB.$tableName"
  hiveContext.hql(dropTable)
  val createTbl = "do some thing..."
  hiveContext.hql(createTbl)
} catch {
  case ex: Exception => {
    Util.printLog(ERROR, s"create db error.")
    exit(-1)
  }
}

Maybe I did something wrong. Is there any hint? Thanks.
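Given Shixiong's explanation, one workaround to sketch (an assumption on my part, not something confirmed in this thread) is to let the driver throw instead of calling exit(), so the ApplicationMaster reports a FAILED final status, which spark-submit does see:

// Sketch only: Spark 1.1, yarn-cluster mode, reusing the thread's names.
// YARN does not forward an arbitrary driver exit code to the client, but
// an uncaught driver exception fails the ApplicationMaster, and
// spark-submit then returns nonzero.
try {
  hiveContext.hql(s"drop table $DB.$tableName")
} catch {
  case ex: Exception =>
    Util.printLog(ERROR, s"create db error: ${ex.getMessage}")
    throw ex // propagate instead of exit(-1)
}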
Trying to understand a basic difference between these two configurations
I'm trying to understand the conceptual difference between these two configurations in terms of performance (using a Spark standalone cluster). Case 1: 1 node, 60 cores, 240G of memory, 50G of data on the local file system. Case 2: 6 nodes, 10 cores per node, 40G of memory per node, 50G of data on HDFS; nodes are connected using a 10G network. I just want to validate my understanding. 1. Reads in case 1 will be slower compared to case 2 because, in case 2, all 6 nodes can read the data in parallel from HDFS. However, if I change the file system to HDFS in case 1, my read speeds will be conceptually the same as case 2. Correct? 2. Once the data is loaded, case 1 will execute operations faster because there is no network overhead and all shuffle operations are local. 3. Obviously, case 1 is bad from a fault-tolerance point of view because we have a single point of failure. Thanks -Soumya
Re: SPARK LIMITATION - more than one case class is not allowed !!
It's an easy mistake to make... I wonder if an assertion could be implemented that makes sure the type parameter is present. We could use the NotNothing pattern (http://blog.evilmonkeylabs.com/2012/05/31/Forcing_Compiler_Nothing_checks/), but I wonder if it would just make the method signature very confusing for the average user...
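For context, here is roughly what the pattern from the linked post looks like (a sketch of the general technique, not code from Spark itself):

import scala.annotation.implicitNotFound

@implicitNotFound("Type parameter is missing: the compiler inferred Nothing")
sealed trait NotNothing[T]

object NotNothing {
  private val instance = new NotNothing[Any] {}
  // One unambiguous instance for every proper type...
  implicit def notNothing[T]: NotNothing[T] = instance.asInstanceOf[NotNothing[T]]
  // ...and two deliberately ambiguous instances for Nothing, so a call
  // where T is inferred as Nothing fails with an implicit-resolution error.
  implicit val nothing1: NotNothing[Nothing] = instance.asInstanceOf[NotNothing[Nothing]]
  implicit val nothing2: NotNothing[Nothing] = instance.asInstanceOf[NotNothing[Nothing]]
}

// objectFile[MyRecord]("path") compiles; objectFile("path") does not.
def objectFile[T](path: String)(implicit ev: NotNothing[T]): Unit = ()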
Re: Trying to understand a basic difference between these two configurations
That depends! See inline. I am assuming that when you say replacing local disk with HDFS in case 1, the single node of case 1 is connected to a separate HDFS cluster over a single 10G link. I am also assuming that all nodes (1 in case 1, and 6 in case 2) are worker nodes, and the Spark application driver is running somewhere else. On Fri, Dec 5, 2014 at 7:31 PM, Soumya Simanta soumya.sima...@gmail.com wrote: I'm trying to understand the conceptual difference between these two configurations in terms of performance (using a Spark standalone cluster). Case 1: 1 node, 60 cores, 240G of memory, 50G of data on the local file system. Case 2: 6 nodes, 10 cores per node, 40G of memory per node, 50G of data on HDFS; nodes are connected using a 10G network. I just want to validate my understanding. 1. Reads in case 1 will be slower compared to case 2 because, in case 2, all 6 nodes can read the data in parallel from HDFS. However, if I change the file system to HDFS in case 1, my read speeds will be conceptually the same as case 2. Correct? If the filesystem were HDFS in case 1, it still may not be conceptually the same as case 2. It depends on what the bottleneck of the system is. In case 2, the total network bandwidth with which you can read from the HDFS cluster is 6 x 10G, but in case 1 it is still 10G. So if the HDFS cluster has a very high aggregate read bandwidth and the network is the bottleneck in case 2, then case 1 will have lower throughput than case 2. And vice versa: if the HDFS read bandwidth is the bottleneck, then conceptually case 1 will be the same as case 2. There are other issues like memory read bandwidth as well. Say the HDFS read bandwidth is the bottleneck, so case 1 seems to be the same as case 2, and the aggregate read bandwidth is, say, 50Gbps (less than the 6 x 10Gbps network bandwidth). Can the single node of case 1 do memory transfers (reading from the NIC into memory for processing) at 50Gbps? That depends on memory speed, the number of memory channels, etc. So all of these need to be considered, and that's how hardware needs to be designed so that all these parameters are balanced :) 2. Once the data is loaded, case 1 will execute operations faster because there is no network overhead and all shuffle operations are local. Assuming case 1 = case 2 for the read, yes, potentially. If you are using local disk for shuffle files, some file systems are not happy with 60 threads trying to read and write from disk at once; that can give rise to weird behavior. Ignoring that, conceptually, yes. 3. Obviously, case 1 is bad from a fault-tolerance point of view because we have a single point of failure. Yeah. If that single node dies, there are no more resources left to continue the processing. Thanks -Soumya
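To make the first point concrete, a rough back-of-envelope with the thread's numbers (illustrative only; it assumes the network link is the sole bottleneck):

  Case 1 (HDFS variant): 50 GB x 8 bits/byte = 400 Gb over one 10 Gb/s link -> roughly 40 s just to pull the data.
  Case 2: the same 400 Gb over 6 x 10 Gb/s links (60 Gb/s aggregate) -> roughly 7 s, provided the HDFS cluster can actually serve 60 Gb/s.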
RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Getting this on the home machine as well. Not referencing the spark cassandra connector in libraryDependencies compiles. I've recently updated IntelliJ to 14. Could that be causing an issue? From: as...@live.com To: yuzhih...@gmail.com CC: user@spark.apache.org Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming? Date: Fri, 5 Dec 2014 19:24:46 + Sorry... I really don't have enough Maven know-how to do this quickly. I tried the pom below, and IntelliJ could find org.apache.spark.streaming.StreamingContext and org.apache.spark.streaming.Seconds, but not org.apache.spark.streaming.receiver.Receiver. Is there something specific I can try? I'll try sbt on the home machine in about a couple of hours.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>untitled100</groupId>
    <artifactId>untiled100</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>

Date: Fri, 5 Dec 2014 10:58:51 -0800 Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming? From: yuzhih...@gmail.com To: as...@live.com CC: user@spark.apache.org Can you try with maven?

diff --git a/streaming/pom.xml b/streaming/pom.xml
index b8b8f2e..6cc8102 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.datastax.spark</groupId>
+      <artifactId>spark-cassandra-connector_2.10</artifactId>
+      <version>1.1.0</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

You can use the following command: mvn -pl core,streaming package -DskipTests Cheers On Fri, Dec 5, 2014 at 9:35 AM, Ashic Mahtab as...@live.com wrote: Hi, Seems adding the cassandra connector and spark streaming causes issues. I've attached my build and code files. Running sbt compile gives weird errors like "Seconds is not part of org.apache.spark.streaming" and "object Receiver is not a member of package org.apache.spark.streaming.receiver". If I take out cassandraConnector from the list of dependencies, sbt compile succeeds. How is adding the dependency removing things from spark streaming packages? Is there something I can do (perhaps in sbt) to not have this break?
Here's my build file:

import sbt.Keys._
import sbt._

name := "untitled99"

version := "1.0"

scalaVersion := "2.10.4"

val spark = "org.apache.spark" %% "spark-core" % "1.1.0"
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0"
val cassandraConnector = "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()

libraryDependencies ++= Seq(
  cassandraConnector,
  spark,
  sparkStreaming
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

And here's my code:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object Foo {
  def main(args: Array[String]) {
    val context = new SparkContext()
    val ssc = new StreamingContext(context, Seconds(2))
  }
}

class Bar extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
  override def onStart(): Unit = ???
  override def onStop(): Unit = ???
}
Re: spark-submit on YARN is slow
Sorry for the delay in my response - for my Spark calls for standalone and YARN, I am using --executor-memory and --total-executor-cores for the submission. In standalone, my baseline query completes in ~40s, while in YARN it completes in ~1800s. It does not appear from the RM web UI that it's asking for more resources than are available, but by the same token, it appears that it's only using a small amount of the cores and available memory. Saying this, let me re-try using the --executor-cores, --executor-memory, and --num-executors arguments as suggested (and documented) vs. --total-executor-cores. On Fri Dec 05 2014 at 1:14:53 PM Andrew Or and...@databricks.com wrote: Hey Arun, I've seen that behavior before. It happens when the cluster doesn't have enough resources to offer and the RM hasn't given us our containers yet. Can you check the RM Web UI at port 8088 to see whether your application is requesting more resources than the cluster has to offer? 2014-12-05 12:51 GMT-08:00 Sandy Ryza sandy.r...@cloudera.com: Hey Arun, The sleeps would only cause at most around 5 seconds of overhead. The idea was to give executors some time to register. On more recent versions, they were replaced with spark.scheduler.minRegisteredResourcesRatio and spark.scheduler.maxRegisteredResourcesWaitingTime. As of 1.1, by default YARN will wait until either 30 seconds have passed or 80% of the requested executors have registered. -Sandy On Fri, Dec 5, 2014 at 12:46 PM, Ashish Rangole arang...@gmail.com wrote: Likely this is not the case here, yet one thing to point out with YARN parameters like --num-executors is that they should be specified *before* the app jar and app args on the spark-submit command line; otherwise the app only gets the default number of containers, which is 2. On Dec 5, 2014 12:22 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Denny, Those sleeps were only at startup, so if jobs are taking significantly longer on YARN, that should be a different problem. When you ran on YARN, did you use the --executor-cores, --executor-memory, and --num-executors arguments? When running against a standalone cluster, by default Spark will make use of all the cluster resources, but when running against YARN, Spark defaults to a couple of tiny executors. -Sandy On Fri, Dec 5, 2014 at 11:32 AM, Denny Lee denny.g@gmail.com wrote: My submissions of Spark on YARN (CDH 5.2) result in a few thousand steps. If I run this in standalone cluster mode the query finishes in 55s, but on YARN the query was still running 30min later. Would the hard-coded sleeps potentially be in play here? On Fri, Dec 5, 2014 at 11:23 Sandy Ryza sandy.r...@cloudera.com wrote: Hi Tobias, What version are you using? In some recent versions, we had a couple of large hardcoded sleeps on the Spark side. -Sandy On Fri, Dec 5, 2014 at 11:15 AM, Andrew Or and...@databricks.com wrote: Hey Tobias, As you suspect, the reason it's slow is that the resource manager in YARN takes a while to grant resources. This is because YARN needs to first set up the application master container, and then this AM needs to request more containers for the Spark executors. I think this accounts for most of the overhead. The remaining overhead probably comes from how our own YARN integration code polls the application state (every second) and cluster resource state (every 5 seconds, IIRC). I haven't explored in detail whether there are optimizations there that could speed this up, but I believe most of the overhead comes from YARN itself.
In other words, no, I don't know of any quick fix on your end to speed this up. -Andrew 2014-12-03 20:10 GMT-08:00 Tobias Pfeiffer t...@preferred.jp: Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file and my application jar file in HDFS, and can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue but some YARN configuration setting (or YARN-inherent slowness); I was just wondering if anyone has any advice on how to speed this up. Thanks Tobias
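For reference, the flags Sandy mentions can also be set as configuration properties; a minimal sketch (the values are illustrative and property availability varies by Spark version, so treat this as an assumption rather than a recipe):

import org.apache.spark.SparkConf

// Sketch: programmatic equivalents of --num-executors, --executor-cores,
// and --executor-memory on YARN, plus the 1.1-era registration knobs.
val conf = new SparkConf()
  .set("spark.executor.instances", "10")   // --num-executors
  .set("spark.executor.cores", "4")        // --executor-cores
  .set("spark.executor.memory", "8g")      // --executor-memory
  // proceed once 80% of executors register or the wait time elapses
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30000")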
Re: spark-submit on YARN is slow
Okay, my bad for not testing out the documented arguments - once I used the correct ones, the query completed in ~55s (I can probably make it faster). Thanks for the help, eh?! On Fri Dec 05 2014 at 10:34:50 PM Denny Lee denny.g@gmail.com wrote: Sorry for the delay in my response - for my Spark calls for standalone and YARN, I am using --executor-memory and --total-executor-cores for the submission. In standalone, my baseline query completes in ~40s, while in YARN it completes in ~1800s.
Fair scheduling across applications in stand-alone mode
Hi - I understand that one can use spark.deploy.defaultCores and spark.cores.max to assign a fixed number of worker cores to different apps. However, instead of statically assigning the cores, I would like Spark to dynamically assign the cores to multiple apps. For example, when there is a single app running, that app gets the entire cluster resources, but when other apps are submitted, resources that are free get assigned to the new apps. Is there any configuration setting to achieve this in stand-alone mode? Mohammed
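For reference, a minimal sketch of the static capping being described (the master URL, app name, and core count are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: cap this application at 16 cores so that other applications
// on the same standalone cluster still get resources.
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("capped-app")
  .set("spark.cores.max", "16")
val sc = new SparkContext(conf)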