does dstream.transform() run on the driver node?
Hello, here's a simple program that demonstrates my problem: is keyavg = rdd.values().reduce(sum) / rdd.count() inside stats calculated once per partition, or just once? I guess another way to ask the same question is whether DStream.transform() is called on the driver node or not. What would be an alternative way to do this two-step computation without calculating the average many times? I guess I could do it in a foreachRDD() block, but that doesn't seem appropriate given that this is more of a transform than an action.
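A minimal sketch of how this plays out (Scala, with a hypothetical DStream[(String, Double)] called dstream, not code from the original post): the function passed to transform() is invoked on the driver once per batch interval, so the average is computed once per RDD, not once per partition; the reduce() and count() calls are themselves distributed jobs whose single result is captured by the second map.

```
val normalized = dstream.transform { rdd =>
  // This block runs on the driver once for every batch interval.
  if (rdd.isEmpty()) {
    rdd.values
  } else {
    // reduce() and count() each launch a distributed job; the result is one
    // number on the driver, captured by the closure below.
    val keyavg = rdd.values.reduce(_ + _) / rdd.count()
    rdd.values.map(v => v - keyavg)
  }
}
```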
Re: Spark failed while trying to read parquet files
It doesn't seem to be Parquet 1.7.0 since the package name isn't under org.apache.parquet (1.7.0 is the first official Apache release of Parquet). The version you were using is probably Parquet 1.6.0rc3 according to the line number information: https://github.com/apache/parquet-mr/blob/parquet-1.6.0rc3/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java#L249 And you're hitting PARQUET-136, which has been fixed in (the real) Parquet 1.7.0 https://issues.apache.org/jira/browse/PARQUET-136 Cheng On 8/8/15 6:20 AM, Jerrick Hoang wrote: Hi all, I have a partitioned parquet table (very small table with only 2 partitions). The version of spark is 1.4.1, parquet version is 1.7.0. I applied this patch to spark [SPARK-7743] so I assume that spark can read parquet files normally, however, I'm getting this when trying to do a simple `select count(*) from table`, ```org.apache.spark.SparkException: Job aborted due to stage failure: Task 29 in stage 44.0 failed 15 times, most recent failure: Lost task 29.14 in stage 44.0: java.lang.NullPointerException at parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249) at parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543) at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:381) at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:155) at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138) at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.init(SqlNewHadoopRDD.scala:153) at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124) at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)``` Has anybody seen this before? Thanks
Re: Spark failed while trying to read parquet files
Yes! I was being dumb, should have caught that earlier, thank you Cheng Lian On Fri, Aug 7, 2015 at 4:25 PM, Cheng Lian lian.cs@gmail.com wrote: It doesn't seem to be Parquet 1.7.0 since the package name isn't under org.apache.parquet (1.7.0 is the first official Apache release of Parquet). The version you were using is probably Parquet 1.6.0rc3 according to the line number information: https://github.com/apache/parquet-mr/blob/parquet-1.6.0rc3/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java#L249 And you're hitting PARQUET-136, which has been fixed in (the real) Parquet 1.7.0 https://issues.apache.org/jira/browse/PARQUET-136 Cheng
Re: How to get total CPU consumption for Spark job
Hi, the Spark UI and logs don't show cluster-wide resource usage. However, you can use Ganglia to monitor the cluster. In spark-ec2, there is an option to install Ganglia automatically. If you use CDH, you can also use Cloudera Manager. Cheers Gen On Sat, Aug 8, 2015 at 6:06 AM, Xiao JIANG jiangxia...@outlook.com wrote: Hi all, I was running some Hive/spark job on hadoop cluster. I want to see how spark helps improve not only the elapsed time but also the total CPU consumption. For Hive, I can get the 'Total MapReduce CPU Time Spent' from the log when the job finishes. But I didn't find any CPU stats for Spark jobs from either spark log or web UI. Is there any place I can find the total CPU consumption for my spark job? Thanks! Here is the version info: Spark version 1.3.0 Using Scala version 2.10.4, Java 1.7.0_67 Thanks! Xiao
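If Ganglia or Cloudera Manager is not an option, one rough alternative is a SparkListener that sums task run time. This is a sketch, not an exact CPU counter: Spark 1.3 task metrics expose executor wall-clock run time rather than CPU time, so treat the total as a comparable "total compute time" figure only.

```
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class TotalTaskTimeListener extends SparkListener {
  @volatile var totalMs = 0L
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // executorRunTime is wall-clock time the task spent running on an executor.
    val metrics = taskEnd.taskMetrics
    if (metrics != null) totalMs += metrics.executorRunTime
  }
}

// Register before running the job, then read totalMs after it finishes:
// sc.addSparkListener(new TotalTaskTimeListener)
```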
Checkpoint Dir Error in Yarn
I am running in yarn-client mode and trying to execute network word count example. When I connect through nc I see the following in spark app logs: Exception in thread main java.lang.AssertionError: assertion failed: The checkpoint directory has not been set. Please use StreamingContext.checkpoint() or SparkContext.checkpoint() to set the checkpoint directory. at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.streaming.dstream.DStream.validate(DStream.scala:183) at org.apache.spark.streaming.dstream.DStream$$anonfun$validate$10.apply(DStream.scala:229) at org.apache.spark.streaming.dstream.DStream$$anonfun$validate$10.apply(DStream.scala:229) at scala.collection
Re: Problems getting expected results from hbase_inputformat.py
Hi, In fact, PySpark uses org.apache.spark.examples.pythonconverters (./examples/src/main/scala/org/apache/spark/pythonconverters/) to transform HBase Result objects into Python strings. Spark updated these two scripts recently, but the changes are not included in the official release of Spark yet, so you are trying to use the new Python script with an old jar. You can clone the newest code of Spark from GitHub and build the examples jar; then you will get the correct results. Cheers Gen On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless eric.bl...@yahoo.com.invalid wrote: I’m having some difficulty getting the desired results from the Spark Python example hbase_inputformat.py. I’m running with CDH5.4, hbase Version 1.0.0, Spark v 1.3.0 Using Python version 2.6.6 I followed the example to create a test HBase table. Here’s the data from the table I created – hbase(main):001:0 scan 'dev_wx_test' ROW COLUMN+CELL row1 column=f1:a, timestamp=1438716994027, value=value1 row1 column=f1:b, timestamp=1438717004248, value=value2 row2 column=f1:, timestamp=1438717014529, value=value3 row3 column=f1:, timestamp=1438717022756, value=value4 3 row(s) in 0.2620 seconds When either of these statements are included - “hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n))” or “hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n)).countByValue().items()” the result is - We only get the following printed; (row1, value2) is not printed: ((u'row1', u'value1'), 1) ((u'row2', u'value3'), 1) ((u'row3', u'value4'), 1) This looks like similar results to the following post I found - http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650 but it appears the pythonconverter HBaseResultToStringConverter has been updated since then. And this problem will be resolved too. When the statement “hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n)).mapValues(json.loads)” is included, the result is – ValueError: No JSON object could be decoded
Re: Spark MLib v/s SparkR
Hi, It depends on the problem you are working on. Just as with Python and R, MLlib focuses on machine learning while SparkR will focus on statistics, if SparkR follows the way of R. For instance, if you want to use glm to analyse data: 1. if you are interested only in the parameters of the model, and want to use the model to predict, then you should use MLlib; 2. if your focus is on the confidence of the model, for example the confidence interval of the result and the significance level of the parameters, you should choose SparkR. However, as there is no glm package for this purpose yet, you would need to code it yourself. Hope it can be helpful Cheers Gen On Thu, Aug 6, 2015 at 2:24 AM, praveen S mylogi...@gmail.com wrote: I was wondering when one should go for MLlib or SparkR. What is the criteria or what should be considered before choosing either of the solutions for data analysis? or What are the advantages of Spark MLlib over SparkR, or the advantages of SparkR over MLlib?
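As an illustrative sketch of the MLlib route (not from the thread; sc is assumed to be an existing SparkContext and the data is a toy set): you get fitted weights and point predictions, but no standard errors or confidence intervals for the coefficients, which is the distinction Gen describes.

```
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// Toy data: a label plus two features per observation.
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.5, 1.2)),
  LabeledPoint(2.0, Vectors.dense(1.5, 0.3)),
  LabeledPoint(3.0, Vectors.dense(2.0, 1.0))
))

val model = LinearRegressionWithSGD.train(training, 100)  // 100 iterations
val prediction = model.predict(Vectors.dense(1.0, 1.0))   // point estimate only
```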
Re: Checkpoint Dir Error in Yarn
Have you tried to do what it's suggesting? If you want to learn more about checkpointing, you can see the programming guide - http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing For more in-depth understanding, you can see my talk - https://www.youtube.com/watch?v=d5UJonrruHk On Fri, Aug 7, 2015 at 5:48 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am running in yarn-client mode and trying to execute the network word count example. When I connect through nc I see the following in the spark app logs: Exception in thread main java.lang.AssertionError: assertion failed: The checkpoint directory has not been set. Please use StreamingContext.checkpoint() or SparkContext.checkpoint() to set the checkpoint directory.
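A minimal sketch of what the assertion is asking for (sparkConf is an existing SparkConf and the HDFS path is just an example): set a checkpoint directory before using stateful or windowed operations and before starting the context.

```
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("hdfs:///user/me/streaming-checkpoints")  // example path; required for stateful/windowed ops
val lines = ssc.socketTextStream("localhost", 9999)
// ... word-count / stateful transformations go here ...
ssc.start()
ssc.awaitTermination()
```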
Re: [Spark Streaming] Session based windowing like in google dataflow
You can use Spark Streaming's updateStateByKey to do arbitrary sessionization. See the example - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala All it does is store a single number (the count of each word seen since the beginning), but you can extend it to store arbitrary state data. And then you can use that state to keep track of gaps, windows, etc. People have done a lot of sessionization using this. I am sure others can chime in. On Fri, Aug 7, 2015 at 10:48 AM, Ankur Chauhan an...@malloc64.com wrote: Hi all, I am trying to figure out how to perform the equivalent of session windows (as mentioned in https://cloud.google.com/dataflow/model/windowing) using spark streaming. Is it even possible (i.e. possible to do efficiently at scale)? Just to expand on the definition, taken from the google dataflow documentation: The simplest kind of session windowing specifies a minimum gap duration. All data arriving below a minimum threshold of time delay is grouped into the same window. If data arrives after the minimum specified gap duration time, this initiates the start of a new window. Any help would be appreciated. -- Ankur Chauhan
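A rough sketch of that idea (names and the input DStream are hypothetical, not from the thread): key events by session key, keep (lastEventTime, count) as the state, and start a fresh session once no event has arrived within the gap. It needs ssc.checkpoint(...) to be set, and a fuller version would also expire idle keys by returning None.

```
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

case class SessionState(lastEventTime: Long, count: Long)

val sessionGapMs = 30 * 60 * 1000L  // minimum gap duration, as in the Dataflow description

// events: DStream[(String, Long)] of (sessionKey, eventTimeMs) -- a hypothetical input
def trackSessions(events: DStream[(String, Long)]): DStream[(String, SessionState)] =
  events.updateStateByKey { (times: Seq[Long], state: Option[SessionState]) =>
    if (times.isEmpty) {
      state  // no new events this batch; a fuller version would expire old sessions here
    } else {
      val latest = times.max
      // Continue the current session only if the gap since its last event is small enough.
      val carried = state
        .filter(s => latest - s.lastEventTime < sessionGapMs)
        .map(_.count)
        .getOrElse(0L)
      Some(SessionState(latest, carried + times.size))
    }
  }
```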
using Spark or pig group by efficient in my use case?
I have tens of millions of records, each a customer ID and city ID pair. There are tens of millions of unique customer IDs, and only a few hundred unique city IDs. I want to do a merge to get all city IDs aggregated for a specific customer ID, and pull back all records. I want to do this using a group by customer ID using Pig on Hadoop, and am wondering if it is the most efficient way. I am also wondering if there is overhead for sorting in Hadoop (I do not care whether customer1 comes before customer2 or not, as long as all cities are aggregated correctly for customer1 and customer2). Do you think Spark is better? Here is an example of inputs: CustomerID1 City1 CustomerID2 City2 CustomerID3 City1 CustomerID1 City3 CustomerID2 City4 I want output like this: CustomerID1 City1 City3 CustomerID2 City2 City4 CustomerID3 City1 thanks in advance, Lin
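For the Spark side, a hedged sketch (the HDFS paths are placeholders): there is no global sort involved, only a shuffle by key, and aggregating into a set per customer with aggregateByKey keeps the shuffle smaller than a plain groupByKey because values are combined on the map side first.

```
// Input lines look like: "CustomerID1 City1"
val pairs = sc.textFile("hdfs:///input/customer_city.txt").map { line =>
  val Array(customer, city) = line.split("\\s+")
  (customer, city)
}

// Collect the distinct cities per customer; aggregateByKey combines on the
// map side before shuffling, unlike groupByKey.
val citiesPerCustomer = pairs.aggregateByKey(Set.empty[String])(_ + _, _ ++ _)

citiesPerCustomer
  .map { case (customer, cities) => (customer +: cities.toSeq).mkString(" ") }
  .saveAsTextFile("hdfs:///output/customer_cities")
```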
Spark Maven Build
I'm trying to build Spark 1.4.1 against CDH 5.3.2. I created a profile called cdh5.3.2 in spark_parent.pom, made some changes for sql/hive/v0.13.1, and the build finished successfully. Here is my problem: - If I run `mvn -Pcdh5.3.2,yarn,hive install`, the artifacts are installed into my local repo. - I expected the `hadoop-client` version to be `hadoop-client-2.5.0-cdh5.3.2`, but it is actually `hadoop-client-2.2.0`. If I add a dependency on `spark-sql-1.2.0-cdh5.3.2`, the version is `hadoop-client-2.5.0-cdh5.3.2`. What's the trick behind it?
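Spark's build takes the hadoop-client version from the hadoop.version property, which defaults to 2.2.0 when no hadoop profile overrides it. A hedged guess at the usual fix, assuming the custom cdh5.3.2 profile does not set that property itself, is to pass it explicitly on the command line (the CDH version string below matches the hadoop-client artifact mentioned above):

```
mvn -Pcdh5.3.2,yarn,hive -Dhadoop.version=2.5.0-cdh5.3.2 -DskipTests clean install
```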
Spark master driver UI: How to keep it after process finished?
Hi, A silly question here. The Driver Web UI dies when the spark-submit program finishes. I would like some time to analyze after the program ends, but the page does not refresh itself, and when I hit F5 I lose all the info. Thanks, Saif
Issue when rebroadcasting a variable outside of the definition scope
Hello everyone, this is my first message ever to a mailing list so please pardon me if for some reason I'm violating the etiquette. I have a problem with rebroadcasting a variable. How it should work is not well documented, so I could only find a few simple examples to understand it. What I'm trying to do is to propagate an update to the options that control the behaviour of my streaming transformations (in this case, the evaluation of machine learning models). I have a listener on a Kafka queue that waits for messages and updates the broadcast variable. I made it work, but the system doesn't rebroadcast anything if I pass the DStream or the broadcast variable as a parameter. So they must both be defined in the same scope, and the rebroadcasting has to happen in that same scope as well. Right now my main function looks like this: -- var updateVar = sc.broadcast("test") val stream = input.map(x => myTransformation(x, updateVar)) stream.writeToKafka[String, String](outputProps, (m: String) => new KeyedMessage[String, String](configuration.outputTopic, m + updateVar.value)) val controlStream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new StringDecoder())(0) for (messageAndTopic <- controlStream) { println("ricevo") updateVar.unpersist() updateVar = ssc.sparkContext.broadcast(messageAndTopic.message) } ssc.start() ssc.awaitTermination() -- updateVar is correctly updated both in myTransformation and in the main scope, and I can access the updated value. But when I try to do this by moving the logic to a class, it fails. I have something like this (or the same queue listener from before, but moved to another class): class Listener(var updateVar: Broadcast[String]) {... def someFunc() = { updateVar.unpersist() updateVar = sc.broadcast("new value") } ... } This fails: the variable can be destroyed but cannot be updated. Any suggestion on why there is this behaviour? Also I would like to know how Spark notices the reassignment to the var and starts the rebroadcasting.
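One workaround people commonly use for this pattern (a sketch under my own naming, not something proposed in the thread): keep the current Broadcast behind a holder object on the driver and re-read it inside transform(), whose closure runs on the driver each batch, so every batch picks up whatever broadcast the Kafka listener installed last. Reassigning a constructor parameter of another class only rebinds that class's field; the closure built in the main scope still refers to the old reference.

```
import org.apache.spark.broadcast.Broadcast

// Holder lives on the driver; the Kafka control listener swaps the reference.
object BroadcastHolder {
  @volatile var current: Broadcast[String] = _
}

BroadcastHolder.current = ssc.sparkContext.broadcast("test")

val stream = input.transform { rdd =>
  val options = BroadcastHolder.current        // re-read on the driver every batch
  rdd.map(x => myTransformation(x, options))
}

// Inside the control-stream loop, replace it:
// BroadcastHolder.current.unpersist()
// BroadcastHolder.current = ssc.sparkContext.broadcast(messageAndTopic.message)
```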
Estimate size of Dataframe programatically
Hello, Is there a way to estimate the approximate size of a dataframe? I know we can cache and look at the size in the UI, but I'm trying to do this programmatically. With an RDD, I can sample, sum up the size using SizeEstimator, and then extrapolate to the entire RDD. That gives me the approximate size of the RDD. With dataframes, it's tricky due to columnar storage. How do we do it? On a related note, I see the size of the RDD object to be ~60MB. Is that the footprint of the RDD in the driver JVM? scala> val temp = sc.parallelize(Array(1,2,3,4,5,6)) scala> SizeEstimator.estimate(temp) res13: Long = 69507320 Srikanth
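A rough sketch along the same lines as the RDD approach (df is an existing DataFrame and the sampling fraction is arbitrary): measure sampled Row objects with SizeEstimator and extrapolate. Note this estimates the size of Row objects materialized on the driver, not the compressed columnar form used when a DataFrame is cached, so treat it as a loose upper bound.

```
import org.apache.spark.util.SizeEstimator

val fraction = 0.01
val sampledRows = df.rdd.sample(withReplacement = false, fraction).collect()

val sampledBytes = sampledRows.map(row => SizeEstimator.estimate(row)).sum
val approxTotalBytes = if (sampledRows.isEmpty) 0L else (sampledBytes / fraction).toLong
```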
Re: log4j custom appender ClassNotFoundException with spark 1.4.1
One possible solution is to spark-submit with --driver-class-path and list all recursive dependencies. This is fragile and error prone. Non-working alternatives (used in SparkSubmit.scala AFTER the arguments parser is initialized): spark-submit --packages ... spark-submit --jars ... spark-defaults.conf (spark.driver.extraJavaOptions, spark.jars, spark.driver.extraClassPath, ...) On Fri, Aug 7, 2015 at 8:57 AM, mlemay [via Apache Spark User List] ml-node+s1001560n24169...@n3.nabble.com wrote: That starts to smell... When analyzing SparkSubmit.scala, we can see that one of the first things it does is to parse arguments. This uses the Utils object and triggers initialization of member variables. One such variable is ShutdownHookManager (which didn't exist in spark 1.3) with the later log4j initialization. setContextClassLoader is set only a few steps after argument parsing, in submit > doRunMain > runMain. That pretty much sums it up: spark.util.Utils has a new static dependency on log4j that triggers its initialization before the call to setContextClassLoader(MutableURLClassLoader). Anyone have a workaround to make this work in 1.4.1?
Re: log4j custom appender ClassNotFoundException with spark 1.4.1
Offending commit is : [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races. https://github.com/apache/spark/commit/e72c16e30d85cdc394d318b5551698885cfda9b8
Re: Spark is in-memory processing, how then can Tachyon make Spark faster?
Exactly! The sharing part is used in the Spark Notebook (this one https://github.com/andypetrella/spark-notebook/blob/master/notebooks/Tachyon%20Test.snb) so we can share things between notebooks which have different SparkContexts (in different JVMs). OTOH, we have a project that creates micro services on genomics data; for several reasons we used Tachyon to serve genome cubes (ranges across genomes), see here https://github.com/med-at-scale/high-health. HTH andy On Fri, Aug 7, 2015 at 8:36 PM Calvin Jia jia.cal...@gmail.com wrote: Hi, Tachyon http://tachyon-project.org manages memory off heap which can help prevent long GC pauses. Also, using Tachyon will allow the data to be shared between Spark jobs if they use the same dataset. Here's http://www.meetup.com/Tachyon/events/222485713/ a production use case where Baidu runs Tachyon to get a 30x performance improvement in their SparkSQL workload. Hope this helps, Calvin On Fri, Aug 7, 2015 at 9:42 AM, Muler mulugeta.abe...@gmail.com wrote: Spark is an in-memory engine and attempts to do computation in-memory. Tachyon is memory-centric distributed storage, OK, but how would that help run Spark faster? -- andy
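A small, hedged illustration of the caching angle (the input path is just an example): in Spark 1.x the OFF_HEAP storage level stores cached blocks in Tachyon, given the Tachyon-related settings are configured, so the cached data sits outside the executor heap (avoiding GC pressure), survives executor restarts, and can be shared across jobs reading the same dataset.

```
import org.apache.spark.storage.StorageLevel

val events = sc.textFile("hdfs:///data/events")      // example path
val cached = events.persist(StorageLevel.OFF_HEAP)   // blocks go to Tachyon, not the JVM heap
cached.count()
```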
Re: SparkSQL: add jar blocks all queries
Hi, The issue only seems to happen when trying to access spark via the SparkSQL Thrift Server interface. Does anyone know a fix? james From: Wu, Walt Disney james.c...@disney.commailto:james.c...@disney.com Date: Friday, August 7, 2015 at 12:40 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: SparkSQL: add jar blocks all queries Hi, I got into a situation where a prior add jar command causing Spark SQL stops to work for all users. Does anyone know how to fix the issue? Regards, james From: Wu, Walt Disney james.c...@disney.commailto:james.c...@disney.com Date: Friday, August 7, 2015 at 10:29 AM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: SparkSQL: remove jar added by add jar command from dependencies Hi, I am using Spark SQL to run some queries on a set of avro data. Somehow I am getting this error 0: jdbc:hive2://n7-z01-0a2a1453 select count(*) from flume_test; Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 26.0 failed 4 times, most recent failure: Lost task 3.3 in stage 26.0 (TID 1027, n7-z01-0a2a1457.iaas.starwave.com): java.io.IOException: Incomplete HDFS URI, no host: hdfs:data/hive-jars/avro-mapred.jar at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:141) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1364) at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:498) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:383) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:350) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:347) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:347) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I did not add the jar in this session, so I am wondering how I can get the jar removed from the dependencies so that It is not blocking all my spark sql queries for all sessions. Thanks, James
Problems getting expected results from hbase_inputformat.py
I’m having some difficulty getting the desired results from the Spark Python example hbase_inputformat.py. I’m running with CDH5.4, hbase Version 1.0.0, Spark v 1.3.0 Using Python version 2.6.6 I followed the example to create a test HBase table. Here’s the data from the table I created – hbase(main):001:0 scan 'dev_wx_test' ROW COLUMN+CELL row1 column=f1:a, timestamp=1438716994027, value=value1 row1 column=f1:b, timestamp=1438717004248, value=value2 row2 column=f1:, timestamp=1438717014529, value=value3 row3 column=f1:, timestamp=1438717022756, value=value4 3 row(s) in 0.2620 seconds When either of these statements are included - "hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n))" or "hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n)).countByValue().items()" the result is - We only get the following printed; (row1, value2) is not printed: ((u'row1', u'value1'), 1) ((u'row2', u'value3'), 1) ((u'row3', u'value4'), 1) This looks like similar results to the following post I found - http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650 - but it appears the pythonconverter HBaseResultToStringConverter has been updated since then. When the statement "hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n)).mapValues(json.loads)" is included, the result is – ValueError: No JSON object could be decoded ** Here is more info on this from the log – Traceback (most recent call last): File hbase_inputformat.py, line 87, in module output = hbase_rdd.collect() File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py, line 701, in collect File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py, line 538, in __call__ File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, stluhdpddev27.monsanto.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py, line 101, in main process() File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py, line 96, in process serializer.dump_stream(func(split_index, iterator), outfile) File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py, line 236, in dump_stream vs = list(itertools.islice(iterator, batch)) File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py, line 1807, in lambda File /usr/lib64/python2.6/json/__init__.py, line 307, in loads return _default_decoder.decode(s) File /usr/lib64/python2.6/json/decoder.py, line 319, in decode obj, end = self.raw_decode(s, idx=_w(s, 0).end()) File /usr/lib64/python2.6/json/decoder.py, line 338, in raw_decode raise ValueError(No JSON object could be decoded) ValueError: No JSON object could be decoded at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
Re: spark config
In master branch, build/sbt-launch-lib.bash has the following: URL1= https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar I verified that the following exists: https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/#sbt-launch.jar FYI On Fri, Aug 7, 2015 at 2:08 PM, Bryce Lobdell lobde...@gmail.com wrote: I recently downloaded Spark package 1.4.0. A build of Spark with sbt/sbt clean assembly failed with the message "Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar". Upon investigation I figured out that sbt-launch-0.13.7.jar is downloaded at build time and that it contained the following: <html> <head><title>404 Not Found</title></head> <body bgcolor="white"> <center><h1>404 Not Found</h1></center> <hr><center>nginx</center> </body> </html> which is an HTML error message to the effect that the file is missing (from the web server). The script sbt-launch-lib.bash contains the following lines, which determine where the file sbt-launch.jar is downloaded from: acquire_sbt_jar () { SBT_VERSION=`awk -F = '/sbt\.version/ {print $2}' ./project/build.properties` URL1= http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar URL2= http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar JAR=build/sbt-launch-${SBT_VERSION}.jar The script sbt-launch.bash downloads $URL1 first, and incorrectly concludes that it succeeded on the basis that the file sbt-launch-0.13.7.jar exists (though it contains HTML). I succeeded in building Spark by: (1) Downloading the file sbt-launch-0.13.7.jar from $URL2 and placing it in the build directory. (2) Modifying sbt-launch-lib.bash to prevent the download of that file. (3) Restarting the download as I usually would, with SPARK_HIVE=true SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly I think a lot of people will be confused by this. Probably someone should do some of the following: (1) Delete $URL1 and all references, or replace it with the correct/current URL which points to the sbt-launch.jar(s). (2) Modify sbt-launch-lib.bash, so that it will not conclude that the download of sbt-launch.jar succeeded, when the data returned is an HTML error message. Let me know if this is not clear, I will gladly explain in more detail or with more clarity, if needed. -Bryce Lobdell A transcript of my console is below: @ip-xx-xxx-xx-xxx:~/spark/spark-1.4.0$ SPARK_HIVE=true SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly NOTE: The sbt/sbt script has been relocated to build/sbt. Please update references to point to the new location. Invoking 'build/sbt clean assembly' now ... Using /usr/lib/jvm/java-7-openjdk-amd64/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set. Attempting to fetch sbt Launching sbt from build/sbt-launch-0.13.7.jar Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar inquidia@ip-10-102-69-107:~/spark/spark-1.4.0$ cd build/ inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls mvn sbt sbt-launch-0.13.7.jar sbt-launch-lib.bash inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ unzip -l sbt-launch-0.13.7.jar Archive: sbt-launch-0.13.7.jar End-of-central-directory signature not found. Either this file is not a zipfile, or it constitutes one disk of a multi-part archive. In the latter case the central directory and zipfile comment will be found on the last disk(s) of this archive. unzip: cannot find zipfile directory in one of sbt-launch-0.13.7.jar or sbt-launch-0.13.7.jar.zip, and cannot find sbt-launch-0.13.7.jar.ZIP, period. inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls mvn sbt sbt-launch-0.13.7.jar sbt-launch-lib.bash inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls -l total 28 -rwxr-xr-x 1 inquidia inquidia 5384 Jun 3 01:07 mvn -rwxr-xr-x 1 inquidia inquidia 5395 Jun 3 01:07 sbt -rw-rw-r-- 1 inquidia inquidia 162 Aug 7 20:24 sbt-launch-0.13.7.jar -rwxr-xr-x 1 inquidia inquidia 5285 Jun 3 01:07 sbt-launch-lib.bash inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls -l total 28 -rwxr-xr-x 1 inquidia inquidia 5384 Jun 3 01:07 mvn -rwxr-xr-x 1 inquidia inquidia 5395 Jun 3 01:07 sbt -rw-rw-r-- 1 inquidia inquidia 162 Aug 7 20:24 sbt-launch-0.13.7.jar -rwxr-xr-x 1 inquidia inquidia 5285 Jun 3 01:07 sbt-launch-lib.bash inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ cat sbt-launch-0.13.7.jar <html> <head><title>404 Not Found</title></head> <body bgcolor="white"> <center><h1>404 Not Found</h1></center> <hr><center>nginx</center> </body> </html>
Re: tachyon
Thanks Calvin - much appreciated ! -Abhishek- On Aug 7, 2015, at 11:11 AM, Calvin Jia jia.cal...@gmail.com wrote: Hi Abhishek, Here's a production use case that may interest you: http://www.meetup.com/Tachyon/events/222485713/ Baidu is using Tachyon to manage more than 100 nodes in production resulting in a 30x performance improvement for their SparkSQL workload. They are also using the tiered storage feature in Tachyon giving them over 2PB of Tachyon managed space. Hope this helps, Calvin On Fri, Aug 7, 2015 at 10:00 AM, Ted Yu yuzhih...@gmail.com wrote: Looks like you would get better response on Tachyon's mailing list: https://groups.google.com/forum/?fromgroups#!forum/tachyon-users Cheers On Fri, Aug 7, 2015 at 9:56 AM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: Do people use Tachyon in production, or is it experimental grade still? Regards, Abhishek
Re: spark config
That's the correct URL. Recent change? The last time I looked, earlier this week, it still had the obsolete artifactory URL for URL1 ;) Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Aug 7, 2015 at 5:19 PM, Ted Yu yuzhih...@gmail.com wrote: In master branch, build/sbt-launch-lib.bash has the following: URL1= https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar I verified that the following exists: https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/#sbt-launch.jar FYI
Re: spark config
Looks like Sean fixed it: [SPARK-9633] [BUILD] SBT download locations outdated; need an update Cheers On Fri, Aug 7, 2015 at 3:22 PM, Dean Wampler deanwamp...@gmail.com wrote: That's the correct URL. Recent change? The last time I looked, earlier this week, it still had the obsolete artifactory URL for URL1 ;)
Accessing S3 files with s3n://
Hi, I've been trying to track down some problems with Spark reads being very slow with s3n:// URIs (NativeS3FileSystem). After some digging around, I realized that this file system implementation fetches the entire file, which isn't really a Spark problem, but it really slows down things when trying to just read headers from a Parquet file or just creating partitions in the RDD. Is this something that others have observed before, or am I doing something wrong? Thanks, Akshat
How to get total CPU consumption for Spark job
Hi all, I was running some Hive/spark job on hadoop cluster. I want to see how spark helps improve not only the elapsed time but also the total CPU consumption. For Hive, I can get the 'Total MapReduce CPU Time Spent' from the log when the job finishes. But I didn't find any CPU stats for Spark jobs from either spark log or web UI. Is there any place I can find the total CPU consumption for my spark job? Thanks! Here is the version info: Spark version 1.3.0 Using Scala version 2.10.4, Java 1.7.0_67 Thanks! Xiao
Spark failed while trying to read parquet files
Hi all, I have a partitioned parquet table (very small table with only 2 partitions). The version of spark is 1.4.1, parquet version is 1.7.0. I applied this patch to spark [SPARK-7743] so I assume that spark can read parquet files normally, however, I'm getting this when trying to do a simple `select count(*) from table`, ```org.apache.spark.SparkException: Job aborted due to stage failure: Task 29 in stage 44.0 failed 15 times, most recent failure: Lost task 29.14 in stage 44.0: java.lang.NullPointerException at parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249) at parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543) at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:381) at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:155) at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138) at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.init(SqlNewHadoopRDD.scala:153) at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124) at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)``` Has anybody seen this before? Thanks
Re: Spark failed while trying to read parquet files
Yes, NullPointerExceptions are pretty common in Spark (or, rather, I seem to encounter them a lot!) but can occur for a few different reasons. Could you add some more detail, like what the schema is for the data, or the code you're using to read it? On Fri, Aug 7, 2015 at 3:20 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I have a partitioned parquet table (very small table with only 2 partitions). The version of spark is 1.4.1, parquet version is 1.7.0.
Re: Spark master driver UI: How to keep it after process finished?
Hi, all Spark applications are saved by the Spark History Server; look at your host on port 18080 instead of 4040. François On 2015-08-07 15:26, saif.a.ell...@wellsfargo.com wrote: Hi, a silly question here. The Driver Web UI dies when the spark-submit program finishes. I would like some time to analyze after the program ends; the page does not refresh itself, and when I hit F5 I lose all the info. Thanks, Saif
RE: Spark master driver UI: How to keep it after process finished?
Hello, thank you, but that port is unreachable for me. Can you please share where I can find the equivalent of that port in my environment? Thank you, Saif From: François Pelletier [mailto:newslett...@francoispelletier.org] Sent: Friday, August 07, 2015 4:38 PM To: user@spark.apache.org Subject: Re: Spark master driver UI: How to keep it after process finished? Hi, all Spark applications are saved by the Spark History Server; look at your host on port 18080 instead of 4040. François On 2015-08-07 15:26, saif.a.ell...@wellsfargo.com wrote: Hi, a silly question here. The Driver Web UI dies when the spark-submit program finishes. I would like some time to analyze after the program ends; the page does not refresh itself, and when I hit F5 I lose all the info. Thanks, Saif
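For reference, a minimal sketch of the configuration that usually makes finished applications show up on port 18080 (the log directory is a placeholder; adjust to your environment). If no History Server process is running in your environment, that would explain why the port is unreachable.

```
# spark-defaults.conf -- make every application write an event log
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-event-logs

# spark-env.sh -- tell the History Server where to read those logs from
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs:///spark-event-logs"

# start the History Server; it serves http://<host>:18080 by default
./sbin/start-history-server.sh
```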
Re: Spark job workflow engine recommendations
Hien, Is Azkaban being phased out at linkedin as rumored? If so, what's linkedin going to use for workflow scheduling? Is there something else that's going to replace Azkaban? On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu yuzhih...@gmail.com wrote: In my opinion, choosing some particular project among its peers should leave enough room for future growth (which may come faster than you initially think). Cheers On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu h...@linkedin.com wrote: Scalability is a known issue due the the current architecture. However this will be applicable if you run more 20K jobs per day. On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu yuzhih...@gmail.com wrote: From what I heard (an ex-coworker who is Oozie committer), Azkaban is being phased out at LinkedIn because of scalability issues (though UI-wise, Azkaban seems better). Vikram: I suggest you do more research in related projects (maybe using their mailing lists). Disclaimer: I don't work for LinkedIn. On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Hi Vikram, We use Azkaban (2.5.0) in our production workflow scheduling. We just use local mode deployment and it is fairly easy to set up. It is pretty easy to use and has a nice scheduling and logging interface, as well as SLAs (like kill job and notify if it doesn't complete in 3 hours or whatever). However Spark support is not present directly - we run everything with shell scripts and spark-submit. There is a plugin interface where one could create a Spark plugin, but I found it very cumbersome when I did investigate and didn't have the time to work through it to develop that. It has some quirks and while there is actually a REST API for adding jos and dynamically scheduling jobs, it is not documented anywhere so you kinda have to figure it out for yourself. But in terms of ease of use I found it way better than Oozie. I haven't tried Chronos, and it seemed quite involved to set up. Haven't tried Luigi either. Spark job server is good but as you say lacks some stuff like scheduling and DAG type workflows (independent of spark-defined job flows). On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke jornfra...@gmail.com wrote: Check also falcon in combination with oozie Le ven. 7 août 2015 à 17:51, Hien Luu h...@linkedin.com.invalid a écrit : Looks like Oozie can satisfy most of your requirements. On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone vikramk...@gmail.com wrote: Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. Active open source community support and well tested at production scale. 3. Should be dead easy to write job dependencices using XML or web interface . Ex; job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full blown java applications to specify job parameters and dependencies. Should be very simple to use. 4. Time based recurrent scheduling. Run the spark jobs at a given time every hour or day or week or month. 5. Job monitoring, alerting on failures and email notifications on daily basis. 
I have looked at Ooyala's spark job server, which seems to be geared towards making Spark jobs run faster by sharing contexts between the jobs, but it isn't a full-blown workflow engine per se. A combination of spark job server and a workflow engine would be ideal. Thanks for the inputs
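For what it's worth, a sketch of the shell-script approach mentioned above: an Azkaban command-type job that simply shells out to spark-submit. Every name, path and URL below is a placeholder, not something taken from this thread.

```
# ingest.job (Azkaban flow file)
type=command
command=bash ./run_ingest.sh

# run_ingest.sh
spark-submit \
  --class com.example.Ingest \
  --master spark://spark-master.example.com:7077 \
  /opt/jobs/ingest-assembly-1.0.jar
```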
Re: All masters are unresponsive! Giving up.
There seems to be a version mismatch somewhere. You can try and find out the cause with debug serialization information. I think the jvm flag -Dsun.io.serialization.extendedDebugInfo=true should help. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ http://in.linkedin.com/in/sonalgoyal On Fri, Aug 7, 2015 at 4:42 AM, Jeff Jones jjo...@adaptivebiotech.com wrote: I wrote a very simple Spark 1.4.1 app that I can run through a local driver program just fine using setMaster(“local[*]”). The app is as follows: import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD object Hello { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(Simple Application).setMaster(local[*]) val sc = new SparkContext(conf) val data:RDD[Int] = sc.parallelize(Seq(1,2,12,34,2354,123,100), 2) println(Max: + data.max) println(Min: + data.min) } } I compile this using the following build.sbt which will pull the needed Spark libraries for me. name := SparkyJeff version := 1.0 scalaVersion := 2.11.6 // Change this to another test framework if you prefer libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.4.1, org.apache.spark %% spark-sql % 1.4.1) // Uncomment to use Akka //libraryDependencies += com.typesafe.akka %% akka-actor % 2.3.11 fork := true Now I’m trying to run this against a standalone cluster by changing the setMaster(“local[*]”) to setMaster(“spark://p3.ourdomain.com:7077”). I downloaded Spark 1.4.1 for Hadoop 2.6 or greater. Set the SPARK_MASTER_IP=” p3.ourdomain.com”, SPARK_WORKER_CORES=”1000”,SPARK_WORKER_MEMORY=”500g” and then started the cluster using run-all.sh. The cluster appears to start fine. I can hit cluster UI at p3.ourdomain.com:8080 and see the same master URL as mentioned above. Now when I run my little app I get the following client error: … [error] 15/08/05 16:03:40 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master... [error] 15/08/05 16:03:40 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. [error] 15/08/05 16:04:00 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master... [error] 15/08/05 16:04:00 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. [error] 15/08/05 16:04:20 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master... [error] 15/08/05 16:04:20 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. [error] 15/08/05 16:04:40 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. … Looking into the master logs I find: 15/08/06 22:52:28 INFO Master: akka.tcp://sparkDriver@192.168.137.41:48877 got disassociated, removing it. 
15/08/06 22:52:46 ERROR Remoting: org.apache.spark.deploy.Command; local class incompatible: stream classdesc serialVersionUID = -7098307370860582211, local class serialVersionUID = -3335312719467547622 java.io.InvalidClassException: org.apache.spark.deploy.Command; local class incompatible: stream classdesc serialVersionUID = -7098307370860582211, local class serialVersionUID = -3335312719467547622 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at
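A hedged sketch of how that debug flag can be passed so both the driver and the executors print extended serialization information (the class and jar names are placeholders). Separately, it may be worth checking the Scala version: the build.sbt above uses Scala 2.11.6, while the pre-built Spark 1.4.1 downloads are compiled against Scala 2.10, and that kind of mismatch is a common cause of serialVersionUID errors like the one in the master log.

```
spark-submit \
  --class Hello \
  --master spark://p3.ourdomain.com:7077 \
  --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" \
  --conf spark.executor.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true \
  target/scala-2.11/sparkyjeff_2.11-1.0.jar
```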
miniBatchFraction for LinearRegressionWithSGD
hi, if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0, doesn’t that make it a deterministic/classical gradient descent rather than a SGD? Specifically, miniBatchFraction=1.0 means the entire data set, i.e. all rows. In the spirit of SGD, shouldn’t the default be the fraction that results in exactly one row of the data set? thank you gerald -- Gerald Loeffler mailto:gerald.loeff...@googlemail.com http://www.gerald-loeffler.net - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
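For reference, a minimal sketch of how the fraction is passed in the current API (the data set and parameter values are made up); the train overload with an explicit miniBatchFraction is what the default of 1.0 feeds into.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

val data = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
  LabeledPoint(2.0, Vectors.dense(2.0, 3.0)),
  LabeledPoint(3.0, Vectors.dense(3.0, 4.0)))).cache()

// train(input, numIterations, stepSize, miniBatchFraction):
// miniBatchFraction = 1.0 samples the whole data set each iteration,
// i.e. classical (batch) gradient descent; 1.0 / data.count() samples
// roughly one row per iteration, which is closer to textbook SGD.
val model = LinearRegressionWithSGD.train(data, 100, 0.01, 1.0 / data.count())
```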
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
However, it's weird that the partition discovery job only spawns 2 tasks. It should use the default parallelism, which is probably 8 according to the logs of the next Parquet reading job. Partition discovery is already done in a distributed manner via a Spark job. But the parallelism is mysteriously low... Cheng On 8/7/15 3:32 PM, Cheng Lian wrote: Hi Philip, Thanks for providing the log file. It seems that most of the time are spent on partition discovery. The code snippet you provided actually issues two jobs. The first one is for listing the input directories to find out all leaf directories (and this actually requires listing all leaf files, because we can only assert that a directory is a leaf one when it contains no sub-directories). Then partition information is extracted from leaf directory paths. This process starts at: 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in parallel under: file:/home/pweaver/work/parquet/day=20150225, … and ends at: 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool The actual tasks execution time is about 36s: 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes) … 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2) You mentioned that your dataset has about 40,000+ partitions, so there are a lot of leaf directories and files out there. My guess is that the local file system spent lots of time listing FileStatus-es of all these files. I also noticed that Mesos job scheduling takes more time then expected. It is probably because this is the first Spark job executed in the application, and the system is not warmed up yet. For example, there’s a 6s gap between these two adjacent lines: 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING The 2nd Spark job is the real Parquet reading job, and this one actually finishes pretty quickly, only 3s (note that the Mesos job scheduling latency is also included): 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 8 output partitions … 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes) 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes) 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes) … 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8) 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8) 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8) That might be the reason why you observed that the C parquet library you mentioned (is it parquet-cpp?) is an order of magnitude faster? Cheng On 8/7/15 2:02 AM, Philip Weaver wrote: With DEBUG, the log output was over 10MB, so I opted for just INFO output. The (sanitized) log is attached. 
The driver is essentially this code: info(A) val t = System.currentTimeMillis val df = sqlContext.read.parquet(dir).select(...).cache val elapsed = System.currentTimeMillis - t info(sInit time: ${elapsed} ms) We've also observed that it is very slow to read the contents of the parquet files. My colleague wrote a PySpark application that gets the list of files, parallelizes it, maps across it and reads each file manually using a C parquet library, and aggregates manually in the loop. Ignoring the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame query in Scala, his is an order of magnitude faster. Since he is parallelizing the work through Spark, and that isn't causing any performance issues, it seems to be a problem with the parquet reader. I may try to do what he did to construct a DataFrame manually, and see if I can query it with Spark SQL with reasonable performance. - Philip On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian lian.cs@gmail.com mailto:lian.cs@gmail.com wrote: Would you mind to provide the driver log? On 8/6/15 3:58 PM, Philip Weaver wrote: I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The initialization time is about 1 minute now, which is still pretty terrible. On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com wrote: Absolutely, thanks! On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote: We've fixed
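In case it helps others hitting this: assuming a Spark 1.5 build (as used above), schema merging can be disabled per read, which avoids touching every part-file footer while the DataFrame is initialized. The option name below is the one documented for 1.5; treating it as available on earlier snapshots is an assumption.

```scala
// a minimal sketch, assuming Spark 1.5
val df = sqlContext.read
  .option("mergeSchema", "false")        // skip merging schemas from every part-file
  .parquet("/home/pweaver/work/parquet") // path taken from the log lines above

// or globally, e.g. in spark-defaults.conf:
// spark.sql.parquet.mergeSchema  false
```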
JavsSparkContext causes hadoop.ipc.RemoteException error
Hi, I'm a new Spark user. Recently I have met a weird error in our cluster. I deploy spark-1.3.1 and CDH5 on my cluster; weeks ago I deployed namenode HA on it. After that, my Spark jobs hit an error when I use the Java API, like this: http://apache-spark-user-list.1001560.n3.nabble.com/file/n24164/RKW%40X%25S9OF%24SQ3CUA_CWS%5BA.png This error only happens when I use the Java API, and the job returns to normal after some days. Can anyone help me deal with this problem? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JavsSparkContext-causes-hadoop-ipc-RemoteException-error-tp24164.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Why use spark.history.fs.logDirectory instead of spark.eventLog.dir
Is there any reason that the history server uses another property for the event log dir? Thanks
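One way to read the split: spark.eventLog.dir is where running applications write their event logs, while spark.history.fs.logDirectory is where the (possibly separate) History Server process looks for logs to replay, so the two can point at different locations if logs are moved or aggregated. A typical setup just points them at the same place (paths below are placeholders):

```
# set for applications
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-event-logs

# set for the History Server process
spark.history.fs.logDirectory    hdfs:///spark-event-logs
```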
DataFrame column structure change
Hi all, I want to have some nesting structure from the existing columns of the dataframe. For that, I am trying to transform a DF in the following way, but couldn't do it.

scala> df.printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
 |-- e: string (nullable = true)
 |-- f: string (nullable = true)

*To*

scala> newDF.printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- newCol: struct (nullable = true)
 |    |-- d: string (nullable = true)
 |    |-- e: string (nullable = true)

Please help me with this. Regards, Rishabh.
StringIndexer + VectorAssembler equivalent to HashingTF?
Is StringIndexer + VectorAssembler equivalent to HashingTF while converting the document for analysis?
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
Hi Philip, Thanks for providing the log file. It seems that most of the time are spent on partition discovery. The code snippet you provided actually issues two jobs. The first one is for listing the input directories to find out all leaf directories (and this actually requires listing all leaf files, because we can only assert that a directory is a leaf one when it contains no sub-directories). Then partition information is extracted from leaf directory paths. This process starts at: 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in parallel under: file:/home/pweaver/work/parquet/day=20150225, … and ends at: 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool The actual tasks execution time is about 36s: 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes) … 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2) You mentioned that your dataset has about 40,000+ partitions, so there are a lot of leaf directories and files out there. My guess is that the local file system spent lots of time listing FileStatus-es of all these files. I also noticed that Mesos job scheduling takes more time then expected. It is probably because this is the first Spark job executed in the application, and the system is not warmed up yet. For example, there’s a 6s gap between these two adjacent lines: 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING The 2nd Spark job is the real Parquet reading job, and this one actually finishes pretty quickly, only 3s (note that the Mesos job scheduling latency is also included): 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 8 output partitions … 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes) 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes) 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes) … 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8) 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8) 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8) That might be the reason why you observed that the C parquet library you mentioned (is it parquet-cpp?) is an order of magnitude faster? Cheng On 8/7/15 2:02 AM, Philip Weaver wrote: With DEBUG, the log output was over 10MB, so I opted for just INFO output. The (sanitized) log is attached. The driver is essentially this code: info(A) val t = System.currentTimeMillis val df = sqlContext.read.parquet(dir).select(...).cache val elapsed = System.currentTimeMillis - t info(sInit time: ${elapsed} ms) We've also observed that it is very slow to read the contents of the parquet files. My colleague wrote a PySpark application that gets the list of files, parallelizes it, maps across it and reads each file manually using a C parquet library, and aggregates manually in the loop. Ignoring the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame query in Scala, his is an order of magnitude faster. 
Since he is parallelizing the work through Spark, and that isn't causing any performance issues, it seems to be a problem with the parquet reader. I may try to do what he did to construct a DataFrame manually, and see if I can query it with Spark SQL with reasonable performance. - Philip On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian lian.cs@gmail.com mailto:lian.cs@gmail.com wrote: Would you mind to provide the driver log? On 8/6/15 3:58 PM, Philip Weaver wrote: I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The initialization time is about 1 minute now, which is still pretty terrible. On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com mailto:philip.wea...@gmail.com wrote: Absolutely, thanks! On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com mailto:lian.cs@gmail.com wrote: We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396 Could you give it a shot to see whether it helps in your case? We've observed ~50x performance boost with schema merging turned on. Cheng On 8/6/15 8:26 AM, Philip Weaver wrote: I
Spark on YARN
Hi, I am running Spark on YARN on the CDH 5.3.2 stack. I have created a new user to own and run a testing environment; however, when using this user, applications I submit to YARN never begin to run, even though they are the exact same applications that succeed with another user. Has anyone seen anything like this before? Thanks, Jem
Re: SparkR Supported Types - Please add bigint
They are actually the same thing, LongType. `long` is friendly for developer, `bigint` is friendly for database guy, maybe data scientists. On Thu, Jul 23, 2015 at 11:33 PM, Sun, Rui rui@intel.com wrote: printSchema calls StructField. buildFormattedString() to output schema information. buildFormattedString() use DataType.typeName as string representation of the data type. LongType. typeName = long LongType.simpleString = bigint I am not sure about the difference of these two type name representations. -Original Message- From: Exie [mailto:tfind...@prodevelop.com.au] Sent: Friday, July 24, 2015 1:35 PM To: user@spark.apache.org Subject: Re: SparkR Supported Types - Please add bigint Interestingly, after more digging, df.printSchema() in raw spark shows the columns as a long, not a bigint. root |-- localEventDtTm: timestamp (nullable = true) |-- asset: string (nullable = true) |-- assetCategory: string (nullable = true) |-- assetType: string (nullable = true) |-- event: string (nullable = true) |-- extras: array (nullable = true) ||-- element: struct (containsNull = true) |||-- name: string (nullable = true) |||-- value: string (nullable = true) |-- ipAddress: string (nullable = true) |-- memberId: string (nullable = true) |-- system: string (nullable = true) |-- timestamp: long (nullable = true) |-- title: string (nullable = true) |-- trackingId: string (nullable = true) |-- version: long (nullable = true) I'm going to have to keep digging I guess. :( -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Supported-Types-Please-add-bigint-tp23975p23978.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
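A tiny sketch of the two representations being discussed, runnable in a Scala spark-shell (the return values reflect my understanding of the type API):

```scala
import org.apache.spark.sql.types.LongType

LongType.typeName     // "long"   -- what printSchema shows
LongType.simpleString // "bigint" -- the SQL/DDL-facing name, e.g. what SparkR reports
```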
How to distribute non-serializable object in transform task or broadcast ?
Is there any workaround to distribute a non-serializable object for an RDD transformation or as a broadcast variable? Say I have an object of class C which is not serializable. Class C is in a jar package, and I have no control over it. Now I need to distribute it either by RDD transformation or by broadcast. I tried to subclass class C with the Serializable interface. It works for serialization, but deserialization does not work, since there is no parameter-less constructor for class C, and deserialization is broken with an invalid constructor exception. I think it's a common use case. Any help is appreciated. -- Hao Ren Data Engineer @ leboncoin Paris, France
RE: Specifying the role when launching an AWS spark cluster using spark_ec2
You'll have a lot less hassle using the AWS EMR instances with Spark 1.4.1 for now, until the spark_ec2.py scripts move to Hadoop 2.7.1, at the moment I'm pretty sure it's only using Hadoop 2.4 The EMR setup with Spark lets you use s3:// URIs with IAM roles Ewan -Original Message- From: SK [mailto:skrishna...@gmail.com] Sent: 06 August 2015 18:27 To: user@spark.apache.org Subject: Specifying the role when launching an AWS spark cluster using spark_ec2 Hi, I need to access data on S3 from another account and I have been given the IAM role information to access that S3 bucket. From what I understand, AWS allows us to attach a role to a resource at the time it is created. However, I don't see an option for specifying the role using the spark_ec2.py script. So I created a spark cluster using the default role, but I was not able to change its IAM role after creation through AWS console. I see a ticket for this issue: https://github.com/apache/spark/pull/6962 and the status is closed. If anyone knows how I can specify the role using spark_ec2.py, please let me know. I am using spark 1.4.1. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-the-role-when-launching-an-AWS-spark-cluster-using-spark-ec2-tp24154.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to binarize data in spark
I have ended up with the following piece of code but it turns out to be really slow... Any other ideas, provided that I can only use MLlib 1.2?

val data = test11.map(x => ((x(0), x(1)), x(2))).groupByKey().map(x => (x._1, x._2.toArray)).map { x =>
  var lt: Array[Double] = new Array[Double](test12.size)
  val id = x._1._1
  val cl = x._1._2
  val dt = x._2
  var i = -1
  test12.foreach { y => i += 1; lt(i) = if (dt contains y) 1.0 else 0.0 }
  val vs = Vectors.dense(lt)
  (id, cl, vs)
}

*// Adamantios* On Fri, Aug 7, 2015 at 8:36 AM, Yanbo Liang yblia...@gmail.com wrote: I think you want to flatten the 1M products to a vector of 1M elements, most of which are of course zero. It looks like HashingTF https://spark.apache.org/docs/latest/ml-features.html#tf-idf-hashingtf-and-idf can help you. 2015-08-07 11:02 GMT+08:00 praveen S mylogi...@gmail.com: Use StringIndexer in MLlib 1.4: https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/ml/feature/StringIndexer.html On Thu, Aug 6, 2015 at 8:49 PM, Adamantios Corais adamantios.cor...@gmail.com wrote: I have a set of data based on which I want to create a classification model. Each row has the following form: user1,class1,product1 user1,class1,product2 user1,class1,product5 user2,class1,product2 user2,class1,product5 user3,class2,product1 etc. There are about 1M users, 2 classes, and 1M products. What I would like to do next is create the sparse vectors (something already supported by MLlib) BUT in order to apply that function I have to create the dense vectors (with the 0s) first. In other words, I have to binarize my data. What's the easiest (or most elegant) way of doing that? *// Adamantios*
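Not sure it fits your constraints, but one hedged alternative that stays within MLlib 1.2 is to broadcast a product-to-index map and emit sparse vectors directly, instead of filling a 1M-element dense array per (user, class) pair. Variable names follow the snippet above; test12 is assumed to be a local Seq of all products.

```scala
import org.apache.spark.mllib.linalg.Vectors

// product -> column index, built once on the driver and shipped to executors
val productIndex = sc.broadcast(test12.zipWithIndex.toMap)
val numProducts = test12.size

val data = test11
  .map(x => ((x(0), x(1)), x(2)))
  .groupByKey()
  .map { case ((id, cl), products) =>
    // Vectors.sparse wants sorted, distinct indices
    val indices = products.map(productIndex.value).toArray.distinct.sorted
    val vs = Vectors.sparse(numProducts, indices, Array.fill(indices.length)(1.0))
    (id, cl, vs)
  }
```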
Spark streaming and session windows
Hi all, I am trying to figure out how to perform equivalent of Session windows (as mentioned in https://cloud.google.com/dataflow/model/windowing) using spark streaming. Is it even possible (i.e. possible to do efficiently at scale). Just to expand on the definition: Taken from the google dataflow documentation: The simplest kind of session windowing specifies a minimum gap duration. All data arriving below a minimum threshold of time delay is grouped into the same window. If data arrives after the minimum specified gap duration time, this initiates the start of a new window. Any help would be appreciated. -- Ankur Chauhan signature.asc Description: Message signed with OpenPGP using GPGMail
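There is no built-in session window in Spark Streaming (as of 1.4), but a common approximation is updateStateByKey with a per-key "last seen" timestamp; sessions whose gap exceeds a threshold are closed. A rough sketch follows -- the key/value types, the 30-minute gap and the decision to simply drop closed sessions are all assumptions, and checkpointing must be enabled for stateful operations.

```scala
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

case class Session(events: Seq[String], lastSeenMs: Long)

val gapMs = 30 * 60 * 1000L // assumed session gap

// events: DStream[(userId, (payload, eventTimeMs))]
def sessionize(events: DStream[(String, (String, Long))]): DStream[(String, Session)] =
  events.updateStateByKey[Session] { (batch: Seq[(String, Long)], state: Option[Session]) =>
    val now = System.currentTimeMillis()
    val current = state.getOrElse(Session(Seq.empty, now))
    if (batch.isEmpty && now - current.lastSeenMs > gapMs) {
      None // gap exceeded: close the session (emit it somewhere first in a real job)
    } else {
      val lastSeen = (current.lastSeenMs +: batch.map(_._2)).max
      Some(Session(current.events ++ batch.map(_._1), lastSeen))
    }
  }
```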
Re: StringIndexer + VectorAssembler equivalent to HashingTF?
No, here's an example: COL1 COL2 a one b two a two c three StringIndexer.setInputCol(COL1).setOutputCol(SI1) - (0- a, 1-b,2-c) SI1 0 1 0 2 StringIndexer.setInputCol(COL2).setOutputCol(SI2) - (0- one, 1-two, 2-three) SI1 0 1 1 2 VectorAssembler.setInputCols(SI1, SI2).setOutputCol(features) - features 00 11 01 22 HashingTF.setNumFeatures(2).setInputCol(COL1).setOutputCol(HT1) bucket1 bucket2 a,a,b c HT1 3 //Hash collision 3 3 1 Thanks, Peter Rudenko On 2015-08-07 09:55, praveen S wrote: Is StringIndexer + VectorAssembler equivalent to HashingTF while converting the document for analysis? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
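To make the comparison concrete, a minimal sketch of the indexer/assembler side with the 1.4 spark.ml API (column names follow the example above; df is assumed to have string columns COL1 and COL2):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

val si1 = new StringIndexer().setInputCol("COL1").setOutputCol("SI1")
val si2 = new StringIndexer().setInputCol("COL2").setOutputCol("SI2")
val assembler = new VectorAssembler()
  .setInputCols(Array("SI1", "SI2"))
  .setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(si1, si2, assembler))
val withFeatures = pipeline.fit(df).transform(df)
// Unlike HashingTF, every distinct value gets its own index (no collisions),
// but the fitted mapping must be kept around to transform new data consistently.
```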
SparkR -Graphx Connected components
Hi, I was trying to use stronglyConnectedComponents(). Given a DAG as the input graph, I was supposed to get back the list of strongly connected components.

def main(args: Array[String]) {
  val vertexArray = Array(
    (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)),
    (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)))
  val edgeArray = Array(
    Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
    Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3))
  val sc = new SparkContext("local", "readLoCSH", "127.0.0.1")
  val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
  val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
  val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
  val strong_graphs: Graph[VertexId, Int] = graph.stronglyConnectedComponents(10)

Help needed in completing the code. I do not know, from here on, how to get the strongly connected nodes. Please help in completing this code. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-Connected-components-tp24165.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
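To finish the snippet above: stronglyConnectedComponents returns a graph whose vertex attribute is the component id (the lowest vertex id in that component), so grouping the vertices by that attribute yields the member lists. A minimal sketch:

```scala
val scc = graph.stronglyConnectedComponents(10)

// scc.vertices is an RDD[(VertexId, VertexId)]: (vertex, its component id)
val members = scc.vertices
  .map { case (vertexId, componentId) => (componentId, vertexId) }
  .groupByKey()

members.collect().foreach { case (componentId, vs) =>
  println(s"component $componentId: ${vs.mkString(", ")}")
}
```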
Re: DataFrame column structure change
I am doing it by creating a new data frame out of the fields to be nested and then join with the original DF. Looking for some optimized solution here. On Fri, Aug 7, 2015 at 2:06 PM, Rishabh Bhardwaj rbnex...@gmail.com wrote: Hi all, I want to have some nesting structure from the existing columns of the dataframe. For that,,I am trying to transform a DF in the following way,but couldn't do it. scala df.printSchema root |-- a: string (nullable = true) |-- b: string (nullable = true) |-- c: string (nullable = true) |-- d: string (nullable = true) |-- e: string (nullable = true) |-- f: string (nullable = true) *To* scala newDF.printSchema root |-- a: string (nullable = true) |-- b: string (nullable = true) |-- c: string (nullable = true) |-- newCol: struct (nullable = true) ||-- d: string (nullable = true) ||-- e: string (nullable = true) help me. Regards, Rishabh.
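One way that avoids the extra join, assuming Spark 1.4+: the struct function in org.apache.spark.sql.functions builds the nested column in a single select.

```scala
import org.apache.spark.sql.functions.struct

val newDF = df.select(
  df("a"), df("b"), df("c"),
  struct(df("d"), df("e")).as("newCol"))

newDF.printSchema()
// root
//  |-- a: string
//  |-- b: string
//  |-- c: string
//  |-- newCol: struct
//  |    |-- d: string
//  |    |-- e: string
```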
automatically determine cluster number
Hi, I am new to Spark and I need to use the clustering functionality to process a large dataset. There are between 50k and 1 million objects to cluster. However, the problem is that the optimal number of clusters is unknown. We cannot even estimate a range, except that we know there are N objects. Previously, on small datasets, I was using R and its Calinski-Harabasz package to automatically determine the cluster number. But with that amount of data R simply breaks. So I wonder if Spark has implemented any algorithms to automatically determine the cluster number? Many thanks!! -- Ziqi Zhang Research Associate Department of Computer Science University of Sheffield --- This email has been checked for viruses by Avast antivirus software. http://www.avast.com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
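MLlib (as of 1.4) has no built-in way to pick the number of clusters, but one common workaround is to sweep k and look at the within-set sum of squared errors exposed by KMeansModel.computeCost, then pick the "elbow". This is a different criterion from Calinski-Harabasz, so it is only a sketch of the idea; the iteration count is made up.

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// data: RDD[Vector] of the objects to cluster; cache() it since it is reused per k
def sweepK(data: RDD[Vector], candidates: Seq[Int]): Seq[(Int, Double)] =
  candidates.map { k =>
    val model = KMeans.train(data, k, 20) // 20 iterations, assumed
    (k, model.computeCost(data))          // WSSSE: look for the elbow across k
  }
```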
Insert operation in Dataframe
Hi all, does the DataFrame API support the insert operation, like sqlContext.sql("insert into table1 xxx select xxx from table2")? guoqing0...@yahoo.com.hk
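Assuming Spark 1.4 with a HiveContext, both forms below should work; the table and column names are taken from the question and otherwise made up.

```scala
// DataFrame API equivalent of: INSERT INTO TABLE table1 SELECT xxx FROM table2
sqlContext.table("table2").select("xxx").write.insertInto("table1")

// plain SQL also works against Hive tables:
// sqlContext.sql("INSERT INTO TABLE table1 SELECT xxx FROM table2")
```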
Re: How to distribute non-serializable object in transform task or broadcast ?
If the object cannot be serialized, then I don't think broadcast will make it magically serializable. You can't transfer data structures between nodes without serializing them somehow. On Fri, Aug 7, 2015 at 7:31 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Hao, I think sc.broadcast will allow you to broadcast non-serializable objects. According to the scaladocs the Broadcast class itself is Serializable and it wraps your object, allowing you to get it from the Broadcast object using value(). Not 100% sure though since I haven't tried broadcasting custom objects but maybe worth trying unless you have already and failed. -sujit On Fri, Aug 7, 2015 at 2:39 AM, Hao Ren inv...@gmail.com wrote: Is there any workaround to distribute non-serializable object for RDD transformation or broadcast variable ? Say I have an object of class C which is not serializable. Class C is in a jar package, I have no control on it. Now I need to distribute it either by rdd transformation or by broadcast. I tried to subclass the class C with Serializable interface. It works for serialization, but deserialization does not work, since there are no parameter-less constructor for the class C and deserialization is broken with an invalid constructor exception. I think it's a common use case. Any help is appreciated. -- Hao Ren Data Engineer @ leboncoin Paris, France
Re: How to distribute non-serializable object in transform task or broadcast ?
If the object is something like an utility object (say a DB connection handler), I often use: @transient lazy val someObj = MyFactory.getObj(...) So basically `@transient` tell the closure cleaner don't serialize this, and the `lazy val` allows it to be initiated on each executor upon its first usage (since the class is in your jar so executor should be able to instantiate it). 2015-08-07 17:20 GMT+02:00 Philip Weaver philip.wea...@gmail.com: If the object cannot be serialized, then I don't think broadcast will make it magically serializable. You can't transfer data structures between nodes without serializing them somehow. On Fri, Aug 7, 2015 at 7:31 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Hao, I think sc.broadcast will allow you to broadcast non-serializable objects. According to the scaladocs the Broadcast class itself is Serializable and it wraps your object, allowing you to get it from the Broadcast object using value(). Not 100% sure though since I haven't tried broadcasting custom objects but maybe worth trying unless you have already and failed. -sujit On Fri, Aug 7, 2015 at 2:39 AM, Hao Ren inv...@gmail.com wrote: Is there any workaround to distribute non-serializable object for RDD transformation or broadcast variable ? Say I have an object of class C which is not serializable. Class C is in a jar package, I have no control on it. Now I need to distribute it either by rdd transformation or by broadcast. I tried to subclass the class C with Serializable interface. It works for serialization, but deserialization does not work, since there are no parameter-less constructor for the class C and deserialization is broken with an invalid constructor exception. I think it's a common use case. Any help is appreciated. -- Hao Ren Data Engineer @ leboncoin Paris, France -- *JU Han* Software Engineer @ Teads.tv +33 061960
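To make the @transient lazy val pattern above concrete, a small self-contained sketch; MyFactory and the process method are placeholders for however the third-party class C is normally obtained and used.

```scala
import org.apache.spark.rdd.RDD

// The wrapper itself is serializable, but the non-serializable handle is
// @transient lazy, so it is rebuilt on each executor the first time it is touched.
class CWrapper(settings: Map[String, String]) extends Serializable {
  @transient lazy val c = MyFactory.getObj(settings) // placeholder factory for class C
}

def run(input: RDD[String], wrapper: CWrapper): RDD[String] =
  input.map(line => wrapper.c.process(line)) // placeholder method on class C
```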
Re: Newbie question: what makes Spark run faster than MapReduce
This blog outlines a few things that make Spark faster than MapReduce - https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html On Fri, Aug 7, 2015 at 9:13 AM, Muler mulugeta.abe...@gmail.com wrote: Consider the classic word count application over a 4 node cluster with a sizable working data. What makes Spark ran faster than MapReduce considering that Spark also has to write to disk during shuffle?
Re: log4j.xml bundled in jar vs log4.properties in spark/conf
See post for detailed explanation of you problem: http://apache-spark-user-list.1001560.n3.nabble.com/log4j-custom-appender-ClassNotFoundException-with-spark-1-4-1-tt24159.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/log4j-xml-bundled-in-jar-vs-log4-properties-in-spark-conf-tp23923p24173.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
distributing large matrices
Is this the sort of problem spark can accommodate? I need to compare 10,000 matrices with each other (10^10 comparison). The matrices are 100x10 (10^7 int values). I have 10 machines with 2 to 8 cores (8-32 processors). All machines have to - contribute to matrices generation (a simulation, takes seconds) - see all matrices - compare matrices (takes very little time compared to simulation) I expect to persist the simulations, have spark push them to processors. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distributing-large-matrices-tp24174.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Newbie question: what makes Spark run faster than MapReduce
1) Spark only needs to shuffle when data needs to be partitioned around the workers in an all-to-all fashion. 2) Multi-stage jobs that would normally require several MapReduce jobs, and therefore dump intermediate data to disk between them, can instead keep that data cached in memory across stages.
Spark job workflow engine recommendations
Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. Active open source community support and well tested at production scale. 3. Should be dead easy to write job dependencices using XML or web interface . Ex; job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full blown java applications to specify job parameters and dependencies. Should be very simple to use. 4. Time based recurrent scheduling. Run the spark jobs at a given time every hour or day or week or month. 5. Job monitoring, alerting on failures and email notifications on daily basis. I have looked at Ooyala's spark job server which seems to be hated towards making spark jobs run faster by sharing contexts between the jobs but isn't a full blown workflow engine per se. A combination of spark job server and workflow engine would be ideal Thanks for the inputs
Re: How to distribute non-serializable object in transform task or broadcast ?
Hao, I’d say there are few possible ways to achieve that: 1. Use KryoSerializer. The flaw of KryoSerializer is that current version (2.21) has an issue with internal state and it might not work for some objects. Spark get kryo dependency as transitive through chill and it’ll not be resolved quickly. Kryo doesn’t work for me (I have such an classes I have to transfer, but do not have their codebase). 2. Wrap it into something you have control and make that something serializable. The flaw is kind of obvious - it’s really hard to write serialization for complex objects. 3. Tricky algo: don’t do anything that might end up as reshuffle. That’s the way I took. The flow is that we have CSV file as input, parse it and create objects that we cannot serialize / deserialize, thus cannot transfer over the network. Currently we’ve workarounded it so that these objects processed only in those partitions where thye’ve been born. Hope, this helps. On 07 Aug 2015, at 12:39, Hao Ren inv...@gmail.com wrote: Is there any workaround to distribute non-serializable object for RDD transformation or broadcast variable ? Say I have an object of class C which is not serializable. Class C is in a jar package, I have no control on it. Now I need to distribute it either by rdd transformation or by broadcast. I tried to subclass the class C with Serializable interface. It works for serialization, but deserialization does not work, since there are no parameter-less constructor for the class C and deserialization is broken with an invalid constructor exception. I think it's a common use case. Any help is appreciated. -- Hao Ren Data Engineer @ leboncoin Paris, France Eugene Morozov fathers...@list.ru
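For option 1 above, a minimal sketch of switching the serializer and registering the class (SomeThirdPartyClass is a placeholder; whether Kryo can actually round-trip it still depends on the class itself):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // registration is optional, but avoids writing full class names into the stream
  .registerKryoClasses(Array(classOf[SomeThirdPartyClass]))
```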
Re: Spark job workflow engine recommendations
Looks like Oozie can satisfy most of your requirements. On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone vikramk...@gmail.com wrote: Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. Active open source community support and well tested at production scale. 3. Should be dead easy to write job dependencices using XML or web interface . Ex; job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full blown java applications to specify job parameters and dependencies. Should be very simple to use. 4. Time based recurrent scheduling. Run the spark jobs at a given time every hour or day or week or month. 5. Job monitoring, alerting on failures and email notifications on daily basis. I have looked at Ooyala's spark job server which seems to be hated towards making spark jobs run faster by sharing contexts between the jobs but isn't a full blown workflow engine per se. A combination of spark job server and workflow engine would be ideal Thanks for the inputs
Newbie question: what makes Spark run faster than MapReduce
Consider the classic word count application over a 4 node cluster with a sizable working data. What makes Spark ran faster than MapReduce considering that Spark also has to write to disk during shuffle?
Re: Amazon DynamoDB Spark
In general the simplest way is to use the DynamoDB Java API as is and call it inside a map(), using the asynchronous put() DynamoDB API call. On Aug 7, 2015, at 9:08 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, Is there a way to use DynamoDB in a Spark application? I have to persist my results to DynamoDB. Thanx, yasemin -- hiç ender hiç - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
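A rough sketch of that approach from Scala with the plain AWS SDK -- the client is created inside foreachPartition so nothing non-serializable is shipped to the executors. The table name, attribute names and the shape of `results` (an RDD of (id, value) string pairs) are assumptions; the synchronous client is used for brevity, the async one works the same way.

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}
import scala.collection.JavaConverters._

results.foreachPartition { partition =>
  val client = new AmazonDynamoDBClient() // credentials come from the default provider chain
  partition.foreach { case (id: String, value: String) =>
    val item = Map(
      "id"    -> new AttributeValue(id),
      "value" -> new AttributeValue(value)).asJava
    client.putItem(new PutItemRequest("my-results-table", item))
  }
  client.shutdown()
}
```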
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
Thanks, I also confirmed that the partition discovery is slow by writing a non-Spark application that uses the parquet library directly to load that partitions. It's so slow that my colleague's Python application can read the entire contents of all the parquet data files faster than my application can even discover the partitions! On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian lian.cs@gmail.com wrote: However, it's weird that the partition discovery job only spawns 2 tasks. It should use the default parallelism, which is probably 8 according to the logs of the next Parquet reading job. Partition discovery is already done in a distributed manner via a Spark job. But the parallelism is mysteriously low... Cheng On 8/7/15 3:32 PM, Cheng Lian wrote: Hi Philip, Thanks for providing the log file. It seems that most of the time are spent on partition discovery. The code snippet you provided actually issues two jobs. The first one is for listing the input directories to find out all leaf directories (and this actually requires listing all leaf files, because we can only assert that a directory is a leaf one when it contains no sub-directories). Then partition information is extracted from leaf directory paths. This process starts at: 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in parallel under: file:/home/pweaver/work/parquet/day=20150225, … and ends at: 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool The actual tasks execution time is about 36s: 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes) … 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2) You mentioned that your dataset has about 40,000+ partitions, so there are a lot of leaf directories and files out there. My guess is that the local file system spent lots of time listing FileStatus-es of all these files. I also noticed that Mesos job scheduling takes more time then expected. It is probably because this is the first Spark job executed in the application, and the system is not warmed up yet. For example, there’s a 6s gap between these two adjacent lines: 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING The 2nd Spark job is the real Parquet reading job, and this one actually finishes pretty quickly, only 3s (note that the Mesos job scheduling latency is also included): 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 8 output partitions … 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes) 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes) 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes) … 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8) 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8) 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8) That might be the reason why you observed that the C parquet library you mentioned (is it parquet-cpp?) is an order of magnitude faster? 
Cheng On 8/7/15 2:02 AM, Philip Weaver wrote: With DEBUG, the log output was over 10MB, so I opted for just INFO output. The (sanitized) log is attached. The driver is essentially this code: info(A) val t = System.currentTimeMillis val df = sqlContext.read.parquet(dir).select(...).cache val elapsed = System.currentTimeMillis - t info(sInit time: ${elapsed} ms) We've also observed that it is very slow to read the contents of the parquet files. My colleague wrote a PySpark application that gets the list of files, parallelizes it, maps across it and reads each file manually using a C parquet library, and aggregates manually in the loop. Ignoring the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame query in Scala, his is an order of magnitude faster. Since he is parallelizing the work through Spark, and that isn't causing any performance issues, it seems to be a problem with the parquet reader. I may try to do what he did to construct a DataFrame manually, and see if I can query it with Spark SQL with reasonable performance. - Philip On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian lian.cs@gmail.com lian.cs@gmail.com wrote: Would you mind to provide the driver log? On 8/6/15 3:58 PM, Philip Weaver wrote: I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The
Re: Spark job workflow engine recommendations
Thanks for the suggestion Hien. I'm curious why not azkaban from linkedin. From what I read online Oozie was very cumbersome to setup and use compared to azkaban. Since you are from linkedin wanted to get some perspective on what it lacks compared to Oozie. Ease of use is very important more than full feature set On Friday, August 7, 2015, Hien Luu h...@linkedin.com wrote: Looks like Oozie can satisfy most of your requirements. On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone vikramk...@gmail.com javascript:_e(%7B%7D,'cvml','vikramk...@gmail.com'); wrote: Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. Active open source community support and well tested at production scale. 3. Should be dead easy to write job dependencices using XML or web interface . Ex; job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full blown java applications to specify job parameters and dependencies. Should be very simple to use. 4. Time based recurrent scheduling. Run the spark jobs at a given time every hour or day or week or month. 5. Job monitoring, alerting on failures and email notifications on daily basis. I have looked at Ooyala's spark job server which seems to be hated towards making spark jobs run faster by sharing contexts between the jobs but isn't a full blown workflow engine per se. A combination of spark job server and workflow engine would be ideal Thanks for the inputs
RE: Issue when rebroadcasting a variable outside of the definition scope
Simone, here are some thoughts. Please check out the understanding closures section of the Spark Programming Guide. Secondly, broadcast variables do not propagate updates to the underlying data. You must either create a new broadcast variable or alternately if you simply wish to accumulate results you can use an Accumulator that stores an array or queue as a buffer that you then read from to Kafka. You should also be able to send the results to a new DStream instead, and link that DStream to Kafka. Hope this gives you some ideas to play with. Thanks! Thank you, Ilya Ganelin -Original Message- From: simone.robutti [simone.robu...@gmail.commailto:simone.robu...@gmail.com] Sent: Friday, August 07, 2015 10:07 AM Eastern Standard Time To: user@spark.apache.org Subject: Issue when rebroadcasting a variable outside of the definition scope Hello everyone, this is my first message ever to a mailing list so please pardon me if for some reason I'm violating the etiquette. I have a problem with rebroadcasting a variable. How it should work is not well documented so I could find only a few and simple example to understand how it should work. What I'm trying to do is to propagate an update to the option for the behaviour of my streaming transformations (in this case, the evaluation of machine learning models). I have a listener on a kafka queue that wait for messages and update the broadcasted variable. I made it to work but the system doesn't rebroadcast anything if I pass the DStream or the broadcasted variable as a parameter. So they must be defined both in the same scope and the rebroadcasting should happen again in the same scope. Right now my main function looks like this: -- var updateVar= sc.broadcast(test) val stream=input.map(x = myTransformation(x,updateVar)) stream.writeToKafka[String, String](outputProps, (m: String) = new KeyedMessage[String, String](configuration.outputTopic, m +updateVar.value )) val controlStream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new StringDecoder())(0) for (messageAndTopic - controlStream) { println(ricevo) updateVar.unpersist() updateVar=ssc.sparkContext.broadcast(messageAndTopic.message) } ssc.start() ssc.awaitTermination() -- updateVar is correctly updated both in myTransformation and in the main scope and I can access the updated value. But when I try to do this moving the logic to a class, it fails. I have something like this (or the same queue listener from before, but moved to another class): class Listener(var updateVar: Broadcast[String]){... def someFunc()={ updateVar.unpersist() updateVar=sc.broadcast(new value) } ... } This fails: the variable can be destroyed but cannot be updated. Any suggestion on why there is this behaviour? Also I would like to know how Spark notices the reassignment to var and start the rebroadcasting. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-when-rebroadcasting-a-variable-outside-of-the-definition-scope-tp24172.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. 
If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: miniBatchFraction for LinearRegressionWithSGD
Sounds reasonable to me, feel free to create a JIRA (and PR if you're up for it) so we can see what others think! On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler gerald.loeff...@googlemail.com wrote: hi, if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0, doesn’t that make it a deterministic/classical gradient descent rather than a SGD? Specifically, miniBatchFraction=1.0 means the entire data set, i.e. all rows. In the spirit of SGD, shouldn’t the default be the fraction that results in exactly one row of the data set? thank you gerald -- Gerald Loeffler mailto:gerald.loeff...@googlemail.com http://www.gerald-loeffler.net - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: miniBatchFraction for LinearRegressionWithSGD
Yep, I think that's what Gerald is saying and they are proposing to default miniBatchFraction = (1 / numInstances). Is that correct? On Fri, Aug 7, 2015 at 11:16 AM, Meihua Wu rotationsymmetr...@gmail.com wrote: I think in the SGD algorithm, the mini batch sample is done without replacement. So with fraction=1, then all the rows will be sampled exactly once to form the miniBatch, resulting to the deterministic/classical case. On Fri, Aug 7, 2015 at 9:05 AM, Feynman Liang fli...@databricks.com wrote: Sounds reasonable to me, feel free to create a JIRA (and PR if you're up for it) so we can see what others think! On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler gerald.loeff...@googlemail.com wrote: hi, if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0, doesn’t that make it a deterministic/classical gradient descent rather than a SGD? Specifically, miniBatchFraction=1.0 means the entire data set, i.e. all rows. In the spirit of SGD, shouldn’t the default be the fraction that results in exactly one row of the data set? thank you gerald -- Gerald Loeffler mailto:gerald.loeff...@googlemail.com http://www.gerald-loeffler.net - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL query AVRO file
Hi, Spark users: We currently are using Spark 1.2.2 + Hive 0.12 + Hadoop 2.2.0 on our production cluster, which has 42 data/task nodes. There is one dataset stored as Avro files about 3T. Our business has a complex query running for the dataset, which is stored in nest structure with Array of Struct in Avro and Hive. We can query it using Hive without any problem, but we like the SparkSQL's performance, so we in fact run the same query in the Spark SQL, and found out it is in fact much faster than Hive. But when we run it, we got the following error randomly from Spark executors, sometime seriously enough to fail the whole spark job. Below the stack trace, and I think it is a bug related to Spark due to: 1) The error jumps out inconsistent, as sometimes we won't see it for this job. (We run it daily)2) Sometime it won't fail our job, as it recover after retry.3) Sometime it will fail our job, as I listed below.4) Is this due to the multithreading in Spark? The NullPointException indicates Hive got a Null ObjectInspector of the children of StructObjectInspector, as I read the Hive source code, but I know there is no null of ObjectInsepector as children of StructObjectInspector. Google this error didn't give me any hint. Does any one know anything like this? Project [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcatWS(,,CAST(account_id#23L, StringType),CAST(gross_contact_count_a#4L, StringType),CASE WHEN IS NULL tag_cnt#21 THEN 0 ELSE CAST(tag_cnt#21, StringType),CAST(list_cnt_a#5L, StringType),CAST(active_contact_count_a#16L, StringType),CAST(other_api_contact_count_a#6L, StringType),CAST(fb_api_contact_count_a#7L, StringType),CAST(evm_contact_count_a#8L, StringType),CAST(loyalty_contact_count_a#9L, StringType),CAST(mobile_jmml_contact_count_a#10L, StringType),CAST(savelocal_contact_count_a#11L, StringType),CAST(siteowner_contact_count_a#12L, StringType),CAST(socialcamp_service_contact_count_a#13L, S...org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 1.0 failed 4 times, most recent failure: Lost task 58.3 in stage 1.0 (TID 257, 10.20.95.146): java.lang.NullPointerExceptionat org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.supportedCategories(AvroObjectInspectorGenerator.java:139) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:89) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:101) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:117) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspector(AvroObjectInspectorGenerator.java:81) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.init(AvroObjectInspectorGenerator.java:55) at org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:69) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:112) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:109) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56)at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
Re: Spark MLib v/s SparkR
SparkR and MLlib are becoming more integrated (we recently added R formula support) but the integration is still quite small. If you learn R and SparkR, you will not be able to leverage most of the distributed algorithms in MLlib (e.g. all the algorithms you cited). However, you could use the equivalent R implementations (e.g. glm for Logistic) but be aware that these will not scale to the large scale datasets Spark is designed to handle. On Thu, Aug 6, 2015 at 8:06 PM, praveen S mylogi...@gmail.com wrote: I am starting off with classification models, Logistic,RandomForest. Basically wanted to learn Machine learning. Since I have a java background I started off with MLib, but later heard R works as well ( with scaling issues - only). So, with SparkR was wondering the scaling issue would be resolved - hence my question why not go with R and Spark R alone.( keeping aside my inclination towards java) On Thu, Aug 6, 2015 at 12:28 AM, Charles Earl charles.ce...@gmail.com wrote: What machine learning algorithms are you interested in exploring or using? Start from there or better yet the problem you are trying to solve, and then the selection may be evident. On Wednesday, August 5, 2015, praveen S mylogi...@gmail.com wrote: I was wondering when one should go for MLib or SparkR. What is the criteria or what should be considered before choosing either of the solutions for data analysis? or What is the advantages of Spark MLib over Spark R or advantages of SparkR over MLib? -- - Charles
Spark is in-memory processing, how then can Tachyon make Spark faster?
Spark is an in-memory engine and attempts to do computation in-memory. Tachyon is memory-centric distributed storage, OK, but how would that help run Spark faster?
How to run start-thrift-server in debug mode?
Hi, I'm trying to run the hive thrift server in debug mode. I've tried to simply pass -Xdebug -Xrunjdwp:transport=dt_socket,address=127.0.0.1:,server=y,suspend=n to start-thriftserver.sh as a driver option, but it doesn't seem to host a server. I've then tried to edit the various shell scripts to run hive thrift server but couldn't get things to work. It seems that there must be an easier way to do this. I've also tried to run it directly in eclipse, but ran into issues related to Scala that I haven't quite yet figured out. start-thriftserver.sh --driver-java-options -agentlib:jdwp=transport=dt_socket,address=localhost:8000,server=y,suspend=n -XX:MaxPermSize=512 --master yarn://localhost:9000 --num-executors 2 jdb -attach localhost:8000 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at com.sun.tools.jdi.SocketTransportService.attach(SocketTransportService.java:222) at com.sun.tools.jdi.GenericAttachingConnector.attach(GenericAttachingConnector.java:116) at com.sun.tools.jdi.SocketAttachingConnector.attach(SocketAttachingConnector.java:90) at com.sun.tools.example.debug.tty.VMConnection.attachTarget(VMConnection.java:519) at com.sun.tools.example.debug.tty.VMConnection.open(VMConnection.java:328) at com.sun.tools.example.debug.tty.Env.init(Env.java:63) at com.sun.tools.example.debug.tty.TTY.main(TTY.java:1066) Let me know if I'm missing something here... Thanks in advance, Ben
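One thing worth checking (a sketch, not a verified fix for this setup): the whole JDWP agent string has to reach the driver JVM as a single quoted argument, and suspend=y makes the JVM wait so the jdb attach cannot race the startup. The port and master below are illustrative.
```
./sbin/start-thriftserver.sh \
  --master yarn-client \
  --driver-java-options "-agentlib:jdwp=transport=dt_socket,address=8000,server=y,suspend=y"

# in another terminal, once the driver JVM is waiting for a debugger:
jdb -attach localhost:8000
```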
Re: All masters are unresponsive! Giving up.
Spark 1.4.1 depends on: akka.version2.3.4-spark/akka.version Is it possible that your standalone cluster has another version of akka ? Cheers On Fri, Aug 7, 2015 at 10:48 AM, Jeff Jones jjo...@adaptivebiotech.com wrote: Thanks. Added this to both the client and the master but still not getting any more information. I confirmed the flag with ps. jjones53222 2.7 0.1 19399412 549656 pts/3 Sl 17:17 0:44 /opt/jdk1.8/bin/java -cp /home/jjones/bin/spark-1.4.1-bin-hadoop2.6/sbin/../conf/:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Dsun.io.serialization.extendedDebugInfo=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip p3.ourdomain.com --port 7077 --webui-port 8080’ Error message(s) the same: 15/08/07 17:23:26 ERROR Remoting: org.apache.spark.deploy.Command; local class incompatible: stream classdesc serialVersionUID = -7098307370860582211, local class serialVersionUID = -3335312719467547622 java.io.InvalidClassException: org.apache.spark.deploy.Command; local class incompatible: stream classdesc serialVersionUID = -7098307370860582211, local class serialVersionUID = -3335312719467547622 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58) at akka.remote.DefaultMessageDispatcher.payloadClass$1(Endpoint.scala:59) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:99) at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) *From:* Sonal Goyal [mailto:sonalgoy...@gmail.com] *Sent:* Thursday, August 6, 2015 11:22 PM *To:* Jeff Jones *Cc:* user@spark.apache.org *Subject:* Re: All masters are unresponsive! Giving up. There seems to be a version mismatch somewhere. You can try and find out the cause with debug serialization information. I think the jvm flag -Dsun.io.*serialization*.*extendedDebugInfo*=true should help. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co Check out Reifier at Spark Summit 2015
Re: All masters are unresponsive! Giving up.
check on which ip/port master listens netstat -a -t --numeric-ports On 7 August 2015 at 20:48, Jeff Jones jjo...@adaptivebiotech.com wrote: Thanks. Added this to both the client and the master but still not getting any more information. I confirmed the flag with ps. jjones53222 2.7 0.1 19399412 549656 pts/3 Sl 17:17 0:44 /opt/jdk1.8/bin/java -cp /home/jjones/bin/spark-1.4.1-bin-hadoop2.6/sbin/../conf/:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Dsun.io.serialization.extendedDebugInfo=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip p3.ourdomain.com --port 7077 --webui-port 8080’ Error message(s) the same: 15/08/07 17:23:26 ERROR Remoting: org.apache.spark.deploy.Command; local class incompatible: stream classdesc serialVersionUID = -7098307370860582211, local class serialVersionUID = -3335312719467547622 java.io.InvalidClassException: org.apache.spark.deploy.Command; local class incompatible: stream classdesc serialVersionUID = -7098307370860582211, local class serialVersionUID = -3335312719467547622 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58) at akka.remote.DefaultMessageDispatcher.payloadClass$1(Endpoint.scala:59) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:99) at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at 
akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) *From:* Sonal Goyal [mailto:sonalgoy...@gmail.com] *Sent:* Thursday, August 6, 2015 11:22 PM *To:* Jeff Jones *Cc:* user@spark.apache.org *Subject:* Re: All masters are unresponsive! Giving up. There seems to be a version mismatch somewhere. You can try and find out the cause with debug serialization information. I think the jvm flag -Dsun.io.*serialization*.*extendedDebugInfo*=true should help. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ On
Get bucket details created in shuffle phase
Hey all. I was trying to understand Spark internals by looking into (and hacking) the code. I was trying to explore the buckets which are generated when we partition the output of each map task and then let the reduce side fetch them on the basis of partitionId. I went into the write() method of SortShuffleWriter, where an Iterator named records is passed in as an argument. This key-value pair is what I thought represented the buckets. But upon exploring its contents I realized that I was wrong, because pairs with the same key were being shown in different buckets, which should not have been the case. I'd really appreciate it if someone could help me find where these buckets originate. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Get-bucket-details-created-in-shuffle-phase-tp24175.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark SQL query AVRO file
Hi, Michael: I am not sure how spark-avro can help in this case. My understanding is that to use Spark-avro, I have to translate all the logic from this big Hive query into Spark code, right? If I have this big Hive query, how I can use spark-avro to run the query? Thanks Yong From: mich...@databricks.com Date: Fri, 7 Aug 2015 11:32:21 -0700 Subject: Re: Spark SQL query AVRO file To: java8...@hotmail.com CC: user@spark.apache.org Have you considered trying Spark SQL's native support for avro data? https://github.com/databricks/spark-avro On Fri, Aug 7, 2015 at 11:30 AM, java8964 java8...@hotmail.com wrote: Hi, Spark users: We currently are using Spark 1.2.2 + Hive 0.12 + Hadoop 2.2.0 on our production cluster, which has 42 data/task nodes. There is one dataset stored as Avro files about 3T. Our business has a complex query running for the dataset, which is stored in nest structure with Array of Struct in Avro and Hive. We can query it using Hive without any problem, but we like the SparkSQL's performance, so we in fact run the same query in the Spark SQL, and found out it is in fact much faster than Hive. But when we run it, we got the following error randomly from Spark executors, sometime seriously enough to fail the whole spark job. Below the stack trace, and I think it is a bug related to Spark due to: 1) The error jumps out inconsistent, as sometimes we won't see it for this job. (We run it daily)2) Sometime it won't fail our job, as it recover after retry.3) Sometime it will fail our job, as I listed below.4) Is this due to the multithreading in Spark? The NullPointException indicates Hive got a Null ObjectInspector of the children of StructObjectInspector, as I read the Hive source code, but I know there is no null of ObjectInsepector as children of StructObjectInspector. Google this error didn't give me any hint. Does any one know anything like this? 
Project [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcatWS(,,CAST(account_id#23L, StringType),CAST(gross_contact_count_a#4L, StringType),CASE WHEN IS NULL tag_cnt#21 THEN 0 ELSE CAST(tag_cnt#21, StringType),CAST(list_cnt_a#5L, StringType),CAST(active_contact_count_a#16L, StringType),CAST(other_api_contact_count_a#6L, StringType),CAST(fb_api_contact_count_a#7L, StringType),CAST(evm_contact_count_a#8L, StringType),CAST(loyalty_contact_count_a#9L, StringType),CAST(mobile_jmml_contact_count_a#10L, StringType),CAST(savelocal_contact_count_a#11L, StringType),CAST(siteowner_contact_count_a#12L, StringType),CAST(socialcamp_service_contact_count_a#13L, S...org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 1.0 failed 4 times, most recent failure: Lost task 58.3 in stage 1.0 (TID 257, 10.20.95.146): java.lang.NullPointerExceptionat org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.supportedCategories(AvroObjectInspectorGenerator.java:139) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:89) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:101) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:117) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspector(AvroObjectInspectorGenerator.java:81) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.init(AvroObjectInspectorGenerator.java:55) at org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:69) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:112) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:109) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at
Re: tachyon
Looks like you would get better response on Tachyon's mailing list: https://groups.google.com/forum/?fromgroups#!forum/tachyon-users Cheers On Fri, Aug 7, 2015 at 9:56 AM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: Do people use Tachyon in production, or is it experimental grade still? Regards, Abhishek - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[Spark Streaming] Session based windowing like in google dataflow
Hi all, I am trying to figure out how to perform equivalent of Session windows (as mentioned in https://cloud.google.com/dataflow/model/windowing) using spark streaming. Is it even possible (i.e. possible to do efficiently at scale). Just to expand on the definition: Taken from the google dataflow documentation: The simplest kind of session windowing specifies a minimum gap duration. All data arriving below a minimum threshold of time delay is grouped into the same window. If data arrives after the minimum specified gap duration time, this initiates the start of a new window. Any help would be appreciated. -- Ankur Chauhan signature.asc Description: Message signed with OpenPGP using GPGMail
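There is no complete answer in this thread, but as a rough sketch, gap-based sessions can be approximated with updateStateByKey: keep session state per key and drop (i.e. close) it when no new events have arrived within the gap. The sketch below uses processing time and illustrative names; proper event-time sessions would need watermark-style bookkeeping on top, and a real job would emit the finished session before dropping its state.
```
import org.apache.spark.streaming.dstream.DStream

case class Session(events: Seq[Long], lastSeen: Long)

val gapMs = 30 * 60 * 1000L  // minimum gap duration: 30 minutes of silence closes a session

def updateSession(newEvents: Seq[Long], state: Option[Session]): Option[Session] = {
  val nowMs = System.currentTimeMillis()
  state match {
    case Some(s) if newEvents.isEmpty && nowMs - s.lastSeen > gapMs =>
      None  // gap exceeded: the state is dropped, i.e. the session is closed
    case _ =>
      val allEvents = state.map(_.events).getOrElse(Seq.empty) ++ newEvents
      val last      = if (newEvents.nonEmpty) nowMs else state.map(_.lastSeen).getOrElse(nowMs)
      if (allEvents.isEmpty) None else Some(Session(allEvents, last))
  }
}

// clicks: DStream[(String, Long)] of (userId, eventTimestampMs); requires ssc.checkpoint(...)
val sessions: DStream[(String, Session)] = clicks.updateStateByKey(updateSession _)
```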
Fwd: [Spark + Hive + EMR + S3] Issue when reading from Hive external table backed on S3 with large amount of small files
Please community, I'd really appreciate your opinion on this topic. Best regards, Roberto -- Forwarded message -- From: Roberto Coluccio roberto.coluc...@gmail.com Date: Sat, Jul 25, 2015 at 6:28 PM Subject: [Spark + Hive + EMR + S3] Issue when reading from Hive external table backed on S3 with large amount of small files To: user@spark.apache.org Hello Spark community, I currently have a Spark 1.3.1 batch driver, deployed in YARN-cluster mode on an EMR cluster (AMI 3.7.0) that reads input data through an HiveContext, in particular SELECTing data from an EXTERNAL TABLE backed on S3. Such table has dynamic partitions and contains *hundreds of small GZip files*. Considering at the moment unfeasible to collate such files on the source side, I experience that, by default, the SELECT query is mapped by Spark into as much tasks as many files are found in the table root path(+partitions), e.g. 860 files === 860 tasks to complete the Spark stage of that read operation. This behaviour obviously creates an incredible overhead and, often, in failed stages due to OOM exceptions and subsequent crashes of the executors. Regardless the size of the input that I can manage to handle, I would really appreciate if you could suggest how to collate somehow the input partitions while reading, or, at least, reduce the number of tasks spawned by the Hive query. Looking at http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-hive-differences.html#emr-hive-gzip-splits I tried by setting: hiveContext.sql(set hive.hadoop.supports.splittable.combineinputformat=true) before creating the external table to read from and query it, but it resulted in NO changes. Tried also to set that in the hive-site.xml on the cluster, but I experienced the same behaviour. Thanks to whomever will give me any hints. Best regards, Roberto
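One hedged suggestion beyond the Hive setting (the numbers below are illustrative): coalescing the result without a shuffle packs many of the small file-based partitions into each task, so far fewer tasks are scheduled for the read stage. It does not make gzip splittable; it only changes how many Spark tasks do the work.
```
// the query itself stays whatever HiveQL the job already runs
val rows = hiveContext.sql("SELECT * FROM my_external_table WHERE dt = '2015-07-25'")

// merge the ~860 input partitions into far fewer tasks; with shuffle = false
// each coalesced task simply reads several of the small gzip files in sequence
val collated = rows.rdd.coalesce(64, shuffle = false)

collated.count()
```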
Re: Spark job workflow engine recommendations
Check also falcon in combination with oozie Le ven. 7 août 2015 à 17:51, Hien Luu h...@linkedin.com.invalid a écrit : Looks like Oozie can satisfy most of your requirements. On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone vikramk...@gmail.com wrote: Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. Active open source community support and well tested at production scale. 3. Should be dead easy to write job dependencices using XML or web interface . Ex; job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full blown java applications to specify job parameters and dependencies. Should be very simple to use. 4. Time based recurrent scheduling. Run the spark jobs at a given time every hour or day or week or month. 5. Job monitoring, alerting on failures and email notifications on daily basis. I have looked at Ooyala's spark job server which seems to be hated towards making spark jobs run faster by sharing contexts between the jobs but isn't a full blown workflow engine per se. A combination of spark job server and workflow engine would be ideal Thanks for the inputs
SparkSQL: remove jar added by add jar command from dependencies
Hi, I am using Spark SQL to run some queries on a set of avro data. Somehow I am getting this error 0: jdbc:hive2://n7-z01-0a2a1453 select count(*) from flume_test; Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 26.0 failed 4 times, most recent failure: Lost task 3.3 in stage 26.0 (TID 1027, n7-z01-0a2a1457.iaas.starwave.com): java.io.IOException: Incomplete HDFS URI, no host: hdfs:data/hive-jars/avro-mapred.jar at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:141) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1364) at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:498) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:383) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:350) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:347) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:347) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I did not add the jar in this session, so I am wondering how I can get the jar removed from the dependencies so that It is not blocking all my spark sql queries for all sessions. Thanks, James
tachyon
Do people use Tachyon in production, or is it experimental grade still? Regards, Abhishek - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark job workflow engine recommendations
From what I heard (an ex-coworker who is Oozie committer), Azkaban is being phased out at LinkedIn because of scalability issues (though UI-wise, Azkaban seems better). Vikram: I suggest you do more research in related projects (maybe using their mailing lists). Disclaimer: I don't work for LinkedIn. On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Hi Vikram, We use Azkaban (2.5.0) in our production workflow scheduling. We just use local mode deployment and it is fairly easy to set up. It is pretty easy to use and has a nice scheduling and logging interface, as well as SLAs (like kill job and notify if it doesn't complete in 3 hours or whatever). However Spark support is not present directly - we run everything with shell scripts and spark-submit. There is a plugin interface where one could create a Spark plugin, but I found it very cumbersome when I did investigate and didn't have the time to work through it to develop that. It has some quirks and while there is actually a REST API for adding jos and dynamically scheduling jobs, it is not documented anywhere so you kinda have to figure it out for yourself. But in terms of ease of use I found it way better than Oozie. I haven't tried Chronos, and it seemed quite involved to set up. Haven't tried Luigi either. Spark job server is good but as you say lacks some stuff like scheduling and DAG type workflows (independent of spark-defined job flows). On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke jornfra...@gmail.com wrote: Check also falcon in combination with oozie Le ven. 7 août 2015 à 17:51, Hien Luu h...@linkedin.com.invalid a écrit : Looks like Oozie can satisfy most of your requirements. On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone vikramk...@gmail.com wrote: Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. Active open source community support and well tested at production scale. 3. Should be dead easy to write job dependencices using XML or web interface . Ex; job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full blown java applications to specify job parameters and dependencies. Should be very simple to use. 4. Time based recurrent scheduling. Run the spark jobs at a given time every hour or day or week or month. 5. Job monitoring, alerting on failures and email notifications on daily basis. I have looked at Ooyala's spark job server which seems to be hated towards making spark jobs run faster by sharing contexts between the jobs but isn't a full blown workflow engine per se. A combination of spark job server and workflow engine would be ideal Thanks for the inputs
Re: Spark SQL query AVRO file
You can register your data as a table using this library and then query it using HiveQL CREATE TEMPORARY TABLE episodes USING com.databricks.spark.avro OPTIONS (path src/test/resources/episodes.avro) On Fri, Aug 7, 2015 at 11:42 AM, java8964 java8...@hotmail.com wrote: Hi, Michael: I am not sure how spark-avro can help in this case. My understanding is that to use Spark-avro, I have to translate all the logic from this big Hive query into Spark code, right? If I have this big Hive query, how I can use spark-avro to run the query? Thanks Yong -- From: mich...@databricks.com Date: Fri, 7 Aug 2015 11:32:21 -0700 Subject: Re: Spark SQL query AVRO file To: java8...@hotmail.com CC: user@spark.apache.org Have you considered trying Spark SQL's native support for avro data? https://github.com/databricks/spark-avro On Fri, Aug 7, 2015 at 11:30 AM, java8964 java8...@hotmail.com wrote: Hi, Spark users: We currently are using Spark 1.2.2 + Hive 0.12 + Hadoop 2.2.0 on our production cluster, which has 42 data/task nodes. There is one dataset stored as Avro files about 3T. Our business has a complex query running for the dataset, which is stored in nest structure with Array of Struct in Avro and Hive. We can query it using Hive without any problem, but we like the SparkSQL's performance, so we in fact run the same query in the Spark SQL, and found out it is in fact much faster than Hive. But when we run it, we got the following error randomly from Spark executors, sometime seriously enough to fail the whole spark job. Below the stack trace, and I think it is a bug related to Spark due to: 1) The error jumps out inconsistent, as sometimes we won't see it for this job. (We run it daily) 2) Sometime it won't fail our job, as it recover after retry. 3) Sometime it will fail our job, as I listed below. 4) Is this due to the multithreading in Spark? The NullPointException indicates Hive got a Null ObjectInspector of the children of StructObjectInspector, as I read the Hive source code, but I know there is no null of ObjectInsepector as children of StructObjectInspector. Google this error didn't give me any hint. Does any one know anything like this? 
Project [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcatWS(,,CAST(account_id#23L, StringType),CAST(gross_contact_count_a#4L, StringType),CASE WHEN IS NULL tag_cnt#21 THEN 0 ELSE CAST(tag_cnt#21, StringType),CAST(list_cnt_a#5L, StringType),CAST(active_contact_count_a#16L, StringType),CAST(other_api_contact_count_a#6L, StringType),CAST(fb_api_contact_count_a#7L, StringType),CAST(evm_contact_count_a#8L, StringType),CAST(loyalty_contact_count_a#9L, StringType),CAST(mobile_jmml_contact_count_a#10L, StringType),CAST(savelocal_contact_count_a#11L, StringType),CAST(siteowner_contact_count_a#12L, StringType),CAST(socialcamp_service_contact_count_a#13L, S...org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 1.0 failed 4 times, most recent failure: Lost task 58.3 in stage 1.0 (TID 257, 10.20.95.146): java.lang.NullPointerException at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.supportedCategories(AvroObjectInspectorGenerator.java:139) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:89) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:101) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:117) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspector(AvroObjectInspectorGenerator.java:81) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.init(AvroObjectInspectorGenerator.java:55) at org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:69) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:112) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:109) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
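To make this concrete, a small sketch of registering the Avro data through spark-avro and reusing an existing query more or less verbatim (the path and query are placeholders, and the spark-avro package must be on the classpath):
```
sqlContext.sql(
  """CREATE TEMPORARY TABLE my_avro_table
    |USING com.databricks.spark.avro
    |OPTIONS (path "hdfs:///data/my_dataset.avro")""".stripMargin)

// the original query text can then run against the temporary table
val result = sqlContext.sql("SELECT account_id, count(*) FROM my_avro_table GROUP BY account_id")
result.collect().foreach(println)
```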
RE: All masters are unresponsive! Giving up.
Thanks. Added this to both the client and the master but still not getting any more information. I confirmed the flag with ps. jjones53222 2.7 0.1 19399412 549656 pts/3 Sl 17:17 0:44 /opt/jdk1.8/bin/java -cp /home/jjones/bin/spark-1.4.1-bin-hadoop2.6/sbin/../conf/:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Dsun.io.serialization.extendedDebugInfo=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip p3.ourdomain.com --port 7077 --webui-port 8080’ Error message(s) the same: 15/08/07 17:23:26 ERROR Remoting: org.apache.spark.deploy.Command; local class incompatible: stream classdesc serialVersionUID = -7098307370860582211, local class serialVersionUID = -3335312719467547622 java.io.InvalidClassException: org.apache.spark.deploy.Command; local class incompatible: stream classdesc serialVersionUID = -7098307370860582211, local class serialVersionUID = -3335312719467547622 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58) at akka.remote.DefaultMessageDispatcher.payloadClass$1(Endpoint.scala:59) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:99) at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at 
akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) From: Sonal Goyal [mailto:sonalgoy...@gmail.com] Sent: Thursday, August 6, 2015 11:22 PM To: Jeff Jones Cc: user@spark.apache.org Subject: Re: All masters are unresponsive! Giving up. There seems to be a version mismatch somewhere. You can try and find out the cause with debug serialization information. I think the jvm flag -Dsun.io.serialization.extendedDebugInfo=true should help. Best Regards, Sonal Founder, Nube Technologieshttp://www.nubetech.co Check out Reifier at Spark Summit 2015https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ On Fri, Aug 7, 2015 at 4:42 AM, Jeff Jones jjo...@adaptivebiotech.commailto:jjo...@adaptivebiotech.com wrote: I wrote a very simple Spark 1.4.1 app that I can run through a local driver program just fine using setMaster(“local[*]”). The app is as follows: import
Re: Spark is in-memory processing, how then can Tachyon make Spark faster?
Hi, Tachyon http://tachyon-project.org manages memory off heap which can help prevent long GC pauses. Also, using Tachyon will allow the data to be shared between Spark jobs if they use the same dataset. Here's http://www.meetup.com/Tachyon/events/222485713/ a production use case where Baidu runs Tachyon to get 30x performance improvement in their SparkSQL workload. Hope this helps, Calvin On Fri, Aug 7, 2015 at 9:42 AM, Muler mulugeta.abe...@gmail.com wrote: Spark is an in-memory engine and attempts to do computation in-memory. Tachyon is memory-centeric distributed storage, OK, but how would that help ran Spark faster?
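Two concrete ways this shows up in a Spark 1.x job, as a sketch (the Tachyon master address, paths, and config key depend on the Spark version, e.g. spark.tachyonStore.url in 1.0-1.4):
```
import org.apache.spark.storage.StorageLevel

// 1) Cache blocks off-heap in Tachyon instead of on the JVM heap,
//    keeping cached data out of the reach of executor GC.
val data = sc.textFile("hdfs:///datasets/events").persist(StorageLevel.OFF_HEAP)
data.count()

// 2) Share a derived dataset between separate Spark applications through Tachyon's filesystem API.
data.map(_.toUpperCase).saveAsTextFile("tachyon://tachyon-master:19998/shared/events-upper")
```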
Re: Spark job workflow engine recommendations
Oh ok. That's a good enough reason against azkaban then. So looks like Oozie is the best choice here. On Friday, August 7, 2015, Ted Yu yuzhih...@gmail.com wrote: From what I heard (an ex-coworker who is Oozie committer), Azkaban is being phased out at LinkedIn because of scalability issues (though UI-wise, Azkaban seems better). Vikram: I suggest you do more research in related projects (maybe using their mailing lists). Disclaimer: I don't work for LinkedIn. On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath nick.pentre...@gmail.com javascript:_e(%7B%7D,'cvml','nick.pentre...@gmail.com'); wrote: Hi Vikram, We use Azkaban (2.5.0) in our production workflow scheduling. We just use local mode deployment and it is fairly easy to set up. It is pretty easy to use and has a nice scheduling and logging interface, as well as SLAs (like kill job and notify if it doesn't complete in 3 hours or whatever). However Spark support is not present directly - we run everything with shell scripts and spark-submit. There is a plugin interface where one could create a Spark plugin, but I found it very cumbersome when I did investigate and didn't have the time to work through it to develop that. It has some quirks and while there is actually a REST API for adding jos and dynamically scheduling jobs, it is not documented anywhere so you kinda have to figure it out for yourself. But in terms of ease of use I found it way better than Oozie. I haven't tried Chronos, and it seemed quite involved to set up. Haven't tried Luigi either. Spark job server is good but as you say lacks some stuff like scheduling and DAG type workflows (independent of spark-defined job flows). On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke jornfra...@gmail.com javascript:_e(%7B%7D,'cvml','jornfra...@gmail.com'); wrote: Check also falcon in combination with oozie Le ven. 7 août 2015 à 17:51, Hien Luu h...@linkedin.com.invalid a écrit : Looks like Oozie can satisfy most of your requirements. On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone vikramk...@gmail.com javascript:_e(%7B%7D,'cvml','vikramk...@gmail.com'); wrote: Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. Active open source community support and well tested at production scale. 3. Should be dead easy to write job dependencices using XML or web interface . Ex; job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full blown java applications to specify job parameters and dependencies. Should be very simple to use. 4. Time based recurrent scheduling. Run the spark jobs at a given time every hour or day or week or month. 5. Job monitoring, alerting on failures and email notifications on daily basis. I have looked at Ooyala's spark job server which seems to be hated towards making spark jobs run faster by sharing contexts between the jobs but isn't a full blown workflow engine per se. A combination of spark job server and workflow engine would be ideal Thanks for the inputs
Re: miniBatchFraction for LinearRegressionWithSGD
I think in the SGD algorithm, the mini batch sample is done without replacement. So with fraction=1, then all the rows will be sampled exactly once to form the miniBatch, resulting to the deterministic/classical case. On Fri, Aug 7, 2015 at 9:05 AM, Feynman Liang fli...@databricks.com wrote: Sounds reasonable to me, feel free to create a JIRA (and PR if you're up for it) so we can see what others think! On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler gerald.loeff...@googlemail.com wrote: hi, if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0, doesn’t that make it a deterministic/classical gradient descent rather than a SGD? Specifically, miniBatchFraction=1.0 means the entire data set, i.e. all rows. In the spirit of SGD, shouldn’t the default be the fraction that results in exactly one row of the data set? thank you gerald -- Gerald Loeffler mailto:gerald.loeff...@googlemail.com http://www.gerald-loeffler.net - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Amazon DynamoDB Spark
Thanx Jay. 2015-08-07 19:25 GMT+03:00 Jay Vyas jayunit100.apa...@gmail.com: In general the simplest way is that you can use the Dynamo Java API as is and call it inside a map(), and use the asynchronous put() Dynamo api call . On Aug 7, 2015, at 9:08 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, Is there a way using DynamoDB in spark application? I have to persist my results to DynamoDB. Thanx, yasemin -- hiç ender hiç -- hiç ender hiç
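A rough sketch of Jay's suggestion using the AWS Java SDK (synchronous puts here for brevity; the table name, attributes, and the shape of results are made up):
```
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}
import scala.collection.JavaConverters._

// results: RDD[(String, Double)] of (id, score) pairs produced by the Spark job
results.foreachPartition { partition =>
  // one client per partition; credentials come from the default provider chain
  val client = new AmazonDynamoDBClient()
  partition.foreach { case (id, score) =>
    val item = Map(
      "id"    -> new AttributeValue().withS(id),
      "score" -> new AttributeValue().withN(score.toString)
    ).asJava
    client.putItem(new PutItemRequest("my_results_table", item))
  }
}
```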
RE: Spark SQL query AVRO file
Good to know that. Let me research it and give it a try. Thanks Yong From: mich...@databricks.com Date: Fri, 7 Aug 2015 11:44:48 -0700 Subject: Re: Spark SQL query AVRO file To: java8...@hotmail.com CC: user@spark.apache.org You can register your data as a table using this library and then query it using HiveQL CREATE TEMPORARY TABLE episodes USING com.databricks.spark.avro OPTIONS (path src/test/resources/episodes.avro) On Fri, Aug 7, 2015 at 11:42 AM, java8964 java8...@hotmail.com wrote: Hi, Michael: I am not sure how spark-avro can help in this case. My understanding is that to use Spark-avro, I have to translate all the logic from this big Hive query into Spark code, right? If I have this big Hive query, how I can use spark-avro to run the query? Thanks Yong From: mich...@databricks.com Date: Fri, 7 Aug 2015 11:32:21 -0700 Subject: Re: Spark SQL query AVRO file To: java8...@hotmail.com CC: user@spark.apache.org Have you considered trying Spark SQL's native support for avro data? https://github.com/databricks/spark-avro On Fri, Aug 7, 2015 at 11:30 AM, java8964 java8...@hotmail.com wrote: Hi, Spark users: We currently are using Spark 1.2.2 + Hive 0.12 + Hadoop 2.2.0 on our production cluster, which has 42 data/task nodes. There is one dataset stored as Avro files about 3T. Our business has a complex query running for the dataset, which is stored in nest structure with Array of Struct in Avro and Hive. We can query it using Hive without any problem, but we like the SparkSQL's performance, so we in fact run the same query in the Spark SQL, and found out it is in fact much faster than Hive. But when we run it, we got the following error randomly from Spark executors, sometime seriously enough to fail the whole spark job. Below the stack trace, and I think it is a bug related to Spark due to: 1) The error jumps out inconsistent, as sometimes we won't see it for this job. (We run it daily)2) Sometime it won't fail our job, as it recover after retry.3) Sometime it will fail our job, as I listed below.4) Is this due to the multithreading in Spark? The NullPointException indicates Hive got a Null ObjectInspector of the children of StructObjectInspector, as I read the Hive source code, but I know there is no null of ObjectInsepector as children of StructObjectInspector. Google this error didn't give me any hint. Does any one know anything like this? 
Project [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcatWS(,,CAST(account_id#23L, StringType),CAST(gross_contact_count_a#4L, StringType),CASE WHEN IS NULL tag_cnt#21 THEN 0 ELSE CAST(tag_cnt#21, StringType),CAST(list_cnt_a#5L, StringType),CAST(active_contact_count_a#16L, StringType),CAST(other_api_contact_count_a#6L, StringType),CAST(fb_api_contact_count_a#7L, StringType),CAST(evm_contact_count_a#8L, StringType),CAST(loyalty_contact_count_a#9L, StringType),CAST(mobile_jmml_contact_count_a#10L, StringType),CAST(savelocal_contact_count_a#11L, StringType),CAST(siteowner_contact_count_a#12L, StringType),CAST(socialcamp_service_contact_count_a#13L, S...org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 1.0 failed 4 times, most recent failure: Lost task 58.3 in stage 1.0 (TID 257, 10.20.95.146): java.lang.NullPointerExceptionat org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.supportedCategories(AvroObjectInspectorGenerator.java:139) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:89) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:101) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:117) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspector(AvroObjectInspectorGenerator.java:81) at org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.init(AvroObjectInspectorGenerator.java:55) at org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:69) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:112) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:109) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)at
RE: distributing large matrices
Sent from my Sony Xperia™ smartphone. iceback wrote: Is this the sort of problem Spark can accommodate? I need to compare 10,000 matrices with each other (10^10 comparisons). The matrices are 100x10 (10^7 int values in total). I have 10 machines with 2 to 8 cores (8-32 processors). All machines have to:
- contribute to matrix generation (a simulation, takes seconds)
- see all matrices
- compare matrices (takes very little time compared to the simulation)
I expect to persist the simulations and have Spark push them to the processors. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/distributing-large-matrices-tp24174.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
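There is no reply in the archive, but as a rough sketch of one possible layout (simulate() and compare() stand in for the poster's own code, and the partition counts and paths are illustrative): since all 10,000 matrices together are only a few tens of megabytes, they can be generated in parallel, collected, and broadcast, so every node sees all matrices while the pairwise comparisons stay distributed.
```
import org.apache.spark.storage.StorageLevel

// run the expensive simulations in parallel, keyed by matrix id
val matrices = sc.parallelize(0 until 10000, numSlices = 200)
  .map(i => (i, simulate(i)))                 // simulate(i): the 100x10 matrix as Array[Int]
  .persist(StorageLevel.MEMORY_AND_DISK)

// small enough to ship whole to every executor
val all = sc.broadcast(matrices.collect())

// distribute the pairwise comparisons; j > i avoids doing each pair twice
val scores = matrices.flatMap { case (i, m) =>
  all.value.iterator.filter(_._1 > i).map { case (j, n) => ((i, j), compare(m, n)) }
}
scores.saveAsTextFile("hdfs:///results/matrix-comparisons")   // output path is a placeholder
```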
RE: Spark master driver UI: How to keep it after process finished?
Sent from my Sony Xperia™ smartphone. saif.a.ell...@wellsfargo.com wrote: Hello, thank you, but that port is unreachable for me. Can you please share where I can find the equivalent port in my environment? Thank you, Saif From: François Pelletier [mailto:newslett...@francoispelletier.org] Sent: Friday, August 07, 2015 4:38 PM To: user@spark.apache.org Subject: Re: Spark master driver UI: How to keep it after process finished? Hi, all Spark applications are saved in the Spark History Server; look at your host on port 18080 instead of 4040. François On 2015-08-07 15:26, saif.a.ell...@wellsfargo.com wrote: Hi, A silly question here. The Driver Web UI dies when the spark-submit program finishes. I would like some time to analyze after the program ends, as the page does not refresh itself; when I hit F5 I lose all the info. Thanks, Saif
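As a hedged aside (not stated in the thread): a finished application only appears in the History Server if event logging was enabled while it ran. A minimal sketch, with illustrative paths:
```
# spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-event-logs
spark.history.fs.logDirectory    hdfs:///spark-event-logs

# then start the History Server and browse http://<host>:18080
./sbin/start-history-server.sh
```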
Re: miniBatchFraction for LinearRegressionWithSGD
Good point; I agree that defaulting to online SGD (single example per iteration) would be a poor UX due to performance. On Fri, Aug 7, 2015 at 12:44 PM, Meihua Wu rotationsymmetr...@gmail.com wrote: Feynman, thanks for clarifying. If we default miniBatchFraction = (1 / numInstances), then we will only hit one row for every iteration of SGD regardless the number of partitions and executors. In other words the parallelism provided by the RDD is lost in this approach. I think this is something we need to consider for the default value of miniBatchFraction. On Fri, Aug 7, 2015 at 11:24 AM, Feynman Liang fli...@databricks.com wrote: Yep, I think that's what Gerald is saying and they are proposing to default miniBatchFraction = (1 / numInstances). Is that correct? On Fri, Aug 7, 2015 at 11:16 AM, Meihua Wu rotationsymmetr...@gmail.com wrote: I think in the SGD algorithm, the mini batch sample is done without replacement. So with fraction=1, then all the rows will be sampled exactly once to form the miniBatch, resulting to the deterministic/classical case. On Fri, Aug 7, 2015 at 9:05 AM, Feynman Liang fli...@databricks.com wrote: Sounds reasonable to me, feel free to create a JIRA (and PR if you're up for it) so we can see what others think! On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler gerald.loeff...@googlemail.com wrote: hi, if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0, doesn’t that make it a deterministic/classical gradient descent rather than a SGD? Specifically, miniBatchFraction=1.0 means the entire data set, i.e. all rows. In the spirit of SGD, shouldn’t the default be the fraction that results in exactly one row of the data set? thank you gerald -- Gerald Loeffler mailto:gerald.loeff...@googlemail.com http://www.gerald-loeffler.net - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkSQL: add jar blocks all queries
Hi, I got into a situation where a prior add jar command caused Spark SQL to stop working for all users. Does anyone know how to fix the issue? Regards, james From: Wu, Walt Disney james.c...@disney.com Date: Friday, August 7, 2015 at 10:29 AM To: user@spark.apache.org Subject: SparkSQL: remove jar added by add jar command from dependencies Hi, I am using Spark SQL to run some queries on a set of avro data. Somehow I am getting this error: 0: jdbc:hive2://n7-z01-0a2a1453> select count(*) from flume_test; Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 26.0 failed 4 times, most recent failure: Lost task 3.3 in stage 26.0 (TID 1027, n7-z01-0a2a1457.iaas.starwave.com): java.io.IOException: Incomplete HDFS URI, no host: hdfs:data/hive-jars/avro-mapred.jar at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:141) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1364) at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:498) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:383) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:350) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:347) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:347) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I did not add the jar in this session, so I am wondering how I can get the jar removed from the dependencies so that it is not blocking all my Spark SQL queries for all sessions. Thanks, James
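The "Incomplete HDFS URI, no host" failure above suggests the jar was registered with a scheme-only URI (hdfs: with no host or leading slashes). As a hedged illustration only, this is roughly what a well-formed registration would look like from a HiveContext; the namenode host and port are placeholders, and this does not by itself remove a previously registered bad path.

```scala
import org.apache.spark.sql.hive.HiveContext

// Sketch only: use a fully qualified HDFS URI (hypothetical host/port) so executors
// can actually fetch the dependency instead of failing on an incomplete URI.
val hiveContext = new HiveContext(sc)
hiveContext.sql("ADD JAR hdfs://namenode.example.com:8020/data/hive-jars/avro-mapred.jar")
```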
Re: miniBatchFraction for LinearRegressionWithSGD
Feynman, thanks for clarifying. If we default miniBatchFraction = (1 / numInstances), then we will only hit one row for every iteration of SGD regardless the number of partitions and executors. In other words the parallelism provided by the RDD is lost in this approach. I think this is something we need to consider for the default value of miniBatchFraction. On Fri, Aug 7, 2015 at 11:24 AM, Feynman Liang fli...@databricks.com wrote: Yep, I think that's what Gerald is saying and they are proposing to default miniBatchFraction = (1 / numInstances). Is that correct? On Fri, Aug 7, 2015 at 11:16 AM, Meihua Wu rotationsymmetr...@gmail.com wrote: I think in the SGD algorithm, the mini batch sample is done without replacement. So with fraction=1, then all the rows will be sampled exactly once to form the miniBatch, resulting to the deterministic/classical case. On Fri, Aug 7, 2015 at 9:05 AM, Feynman Liang fli...@databricks.com wrote: Sounds reasonable to me, feel free to create a JIRA (and PR if you're up for it) so we can see what others think! On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler gerald.loeff...@googlemail.com wrote: hi, if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0, doesn’t that make it a deterministic/classical gradient descent rather than a SGD? Specifically, miniBatchFraction=1.0 means the entire data set, i.e. all rows. In the spirit of SGD, shouldn’t the default be the fraction that results in exactly one row of the data set? thank you gerald -- Gerald Loeffler mailto:gerald.loeff...@googlemail.com http://www.gerald-loeffler.net - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: miniBatchFraction for LinearRegressionWithSGD
Sent from my Sony Xperia™ smartphone. Meihua Wu wrote: Feynman, thanks for clarifying. If we default miniBatchFraction = (1 / numInstances), then we will only hit one row for every iteration of SGD regardless the number of partitions and executors. In other words the parallelism provided by the RDD is lost in this approach. I think this is something we need to consider for the default value of miniBatchFraction. On Fri, Aug 7, 2015 at 11:24 AM, Feynman Liang fli...@databricks.com wrote: Yep, I think that's what Gerald is saying and they are proposing to default miniBatchFraction = (1 / numInstances). Is that correct? On Fri, Aug 7, 2015 at 11:16 AM, Meihua Wu rotationsymmetr...@gmail.com wrote: I think in the SGD algorithm, the mini batch sample is done without replacement. So with fraction=1, then all the rows will be sampled exactly once to form the miniBatch, resulting to the deterministic/classical case. On Fri, Aug 7, 2015 at 9:05 AM, Feynman Liang fli...@databricks.com wrote: Sounds reasonable to me, feel free to create a JIRA (and PR if you're up for it) so we can see what others think! On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler gerald.loeff...@googlemail.com wrote: hi, if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0, doesn’t that make it a deterministic/classical gradient descent rather than a SGD? Specifically, miniBatchFraction=1.0 means the entire data set, i.e. all rows. In the spirit of SGD, shouldn’t the default be the fraction that results in exactly one row of the data set? thank you gerald -- Gerald Loeffler mailto:gerald.loeff...@googlemail.com http://www.gerald-loeffler.net - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark master driver UI: How to keep it after process finished?
look at spark.history.ui.port if you use standalone, or spark.yarn.historyServer.address if you use YARN, in your Spark config file. Mine is located at /etc/spark/conf/spark-defaults.conf. If you use Apache Ambari you can find these settings in the Spark / Configs / Advanced spark-defaults tab. François On 2015-08-07 15:58, saif.a.ell...@wellsfargo.com wrote: Hello, thank you, but that port is unreachable for me. Can you please share where I can find that port equivalent in my environment? Thank you Saif From: François Pelletier [mailto:newslett...@francoispelletier.org] Sent: Friday, August 07, 2015 4:38 PM To: user@spark.apache.org Subject: Re: Spark master driver UI: How to keep it after process finished? Hi, all Spark processes are saved in the Spark History Server; look at your host on port 18080 instead of 4040. François On 2015-08-07 15:26, saif.a.ell...@wellsfargo.com wrote: Hi, A silly question here. The Driver Web UI dies when the spark-submit program finishes. I would like some time to analyze after the program ends, as the page does not refresh itself; when I hit F5 I lose all the info. Thanks, Saif
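For completeness, a sketch of the configuration that makes finished applications show up in the History Server in the first place. The keys are standard Spark properties, but the event-log directory is a placeholder for whatever path exists in your cluster, and the file location of spark-defaults.conf depends on your distribution.

```
# spark-defaults.conf (illustrative values; the HDFS path is a placeholder)
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///var/log/spark-events
spark.history.fs.logDirectory    hdfs:///var/log/spark-events
spark.history.ui.port            18080
```

With event logging enabled, the application UI can be browsed from the History Server after spark-submit exits, which addresses the original question of keeping the UI around once the driver is gone.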
Fwd: spark config
I recently downloaded Spark package 1.4.0. A build of Spark with sbt/sbt clean assembly failed with the message Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar Upon investigation I figured out that sbt-launch-0.13.7.jar is downloaded at build time and that it contained the following: <html> <head><title>404 Not Found</title></head> <body bgcolor="white"> <center><h1>404 Not Found</h1></center> <hr><center>nginx</center> </body> </html> which is an HTML error message to the effect that the file is missing (from the web server). The script sbt-launch-lib.bash contains the following lines which determine where the file sbt-launch.jar is downloaded from: acquire_sbt_jar () { SBT_VERSION=`awk -F = '/sbt\.version/ {print $2}' ./project/build.properties` URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar JAR=build/sbt-launch-${SBT_VERSION}.jar The script sbt-launch-lib.bash downloads $URL1 first, and incorrectly concludes that it succeeded on the basis that the file sbt-launch-0.13.7.jar exists (though it contains HTML). I succeeded in building Spark by: (1) Downloading the file sbt-launch-0.13.7.jar from $URL2 and placing it in the build directory. (2) Modifying sbt-launch-lib.bash to prevent the download of that file. (3) Restarting the build as I usually would, with SPARK_HIVE=true SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly I think a lot of people will be confused by this. Probably someone should do some of the following: (1) Delete $URL1 and all references, or replace it with the correct/current URL which points to the sbt-launch.jar(s). (2) Modify sbt-launch-lib.bash so that it will not conclude that the download of sbt-launch.jar succeeded when the data returned is an HTML error message. Let me know if this is not clear, I will gladly explain in more detail or with more clarity, if needed. -Bryce Lobdell A transcript of my console is below: @ip-xx-xxx-xx-xxx:~/spark/spark-1.4.0$ SPARK_HIVE=true SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly NOTE: The sbt/sbt script has been relocated to build/sbt. Please update references to point to the new location. Invoking 'build/sbt clean assembly' now ... Using /usr/lib/jvm/java-7-openjdk-amd64/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set. Attempting to fetch sbt Launching sbt from build/sbt-launch-0.13.7.jar Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar inquidia@ip-10-102-69-107:~/spark/spark-1.4.0$ cd build/ inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls mvn sbt sbt-launch-0.13.7.jar sbt-launch-lib.bash inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ unzip -l sbt-launch-0.13.7.jar Archive: sbt-launch-0.13.7.jar End-of-central-directory signature not found. Either this file is not a zipfile, or it constitutes one disk of a multi-part archive. In the latter case the central directory and zipfile comment will be found on the last disk(s) of this archive. unzip: cannot find zipfile directory in one of sbt-launch-0.13.7.jar or sbt-launch-0.13.7.jar.zip, and cannot find sbt-launch-0.13.7.jar.ZIP, period.
inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls mvn sbt sbt-launch-0.13.7.jar sbt-launch-lib.bash inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls -l total 28 -rwxr-xr-x 1 inquidia inquidia 5384 Jun 3 01:07 mvn -rwxr-xr-x 1 inquidia inquidia 5395 Jun 3 01:07 sbt -rw-rw-r-- 1 inquidia inquidia 162 Aug 7 20:24 sbt-launch-0.13.7.jar -rwxr-xr-x 1 inquidia inquidia 5285 Jun 3 01:07 sbt-launch-lib.bash inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls -l total 28 -rwxr-xr-x 1 inquidia inquidia 5384 Jun 3 01:07 mvn -rwxr-xr-x 1 inquidia inquidia 5395 Jun 3 01:07 sbt -rw-rw-r-- 1 inquidia inquidia 162 Aug 7 20:24 sbt-launch-0.13.7.jar -rwxr-xr-x 1 inquidia inquidia 5285 Jun 3 01:07 sbt-launch-lib.bash inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ cat sbt-launch-0.13.7.jar <html> <head><title>404 Not Found</title></head> <body bgcolor="white"> <center><h1>404 Not Found</h1></center> <hr><center>nginx</center> </body> </html>
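A condensed sketch of the manual workaround described above, using the second mirror (URL2) from sbt-launch-lib.bash. The Spark version, sbt version, and build flags are the ones from this report and may differ in your checkout; depending on your copy of the script, an existing jar may be enough to skip its own download step (the report additionally edited the script to prevent it).

```bash
# Replace the corrupt launcher with one fetched from the second mirror (URL2),
# then verify it is a real zip archive (not an HTML error page) before rebuilding.
cd spark-1.4.0/build
rm -f sbt-launch-0.13.7.jar
curl -L -o sbt-launch-0.13.7.jar \
  http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/sbt-launch.jar
unzip -l sbt-launch-0.13.7.jar > /dev/null && echo "launcher looks valid"

# Rebuild with the same flags as in the report above.
cd ..
SPARK_HIVE=true SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly
```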
Re: Time series forecasting
Im interested in machine learning on time series. In our environment we have lot of metric data continuously coming from agents. Data are stored in Cassandra. Is it possible to set up spark that would use machine learning on previous data and new incoming data? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Time-series-forecasting-tp13236p24167.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
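Spark Streaming combined with MLlib's streaming regression is one way to keep a model updated as new data arrives, on top of whatever was learned from historical data. The sketch below is illustrative only: it assumes the metric stream can be turned into a DStream of LabeledPoint (via Kafka, a socket, or a custom receiver; the historical Cassandra data would need the DataStax connector, not shown), and the feature dimension, host, port, and batch interval are placeholders.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

// Hypothetical ingestion: CSV lines "label,f1,f2,f3" arriving on a socket.
val training = ssc.socketTextStream("localhost", 9999).map { line =>
  val parts = line.split(',').map(_.toDouble)
  LabeledPoint(parts.head, Vectors.dense(parts.tail))
}

// The model is updated incrementally on every batch of new data.
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(3)) // placeholder feature dimension

model.trainOn(training)
model.predictOnValues(training.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
```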
Amazon DynamoDB Spark
Hi, Is there a way to use DynamoDB in a Spark application? I have to persist my results to DynamoDB. Thanx, yasemin -- hiç ender hiç
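One common approach is to write from the executors with the AWS Java SDK inside foreachPartition. The sketch below is only illustrative: it assumes the aws-java-sdk DynamoDB artifact is on the classpath, that credentials come from the default provider chain, and that the table and attribute names (and the shape of the results RDD) are hypothetical.

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import org.apache.spark.rdd.RDD

// results: RDD[(String, Double)] -- a placeholder shape for whatever you compute.
def saveToDynamo(results: RDD[(String, Double)]): Unit = {
  results.foreachPartition { partition =>
    // One client per partition, created on the executor (clients are not serializable).
    val client = new AmazonDynamoDBClient() // credentials from the default provider chain
    val table = new DynamoDB(client).getTable("my-results-table") // hypothetical table name
    partition.foreach { case (id, value) =>
      table.putItem(new Item().withPrimaryKey("id", id).withNumber("value", value))
    }
    client.shutdown()
  }
}
```

Creating the client inside foreachPartition (rather than on the driver) avoids serialization problems and amortizes connection setup over each partition's writes.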
Re: SparkR -Graphx Connected components
Hi, The graph returned by SCC (strong_graphs in your code) has vertex data where each vertex in a component is assigned the lowest vertex id of the component. So if you have 6 vertices (1 to 6) and 2 strongly connected components ({1, 3} and {2, 4, 5, 6}), then the strongly connected component ids are 1 and 2 (the lowest vertex id in each component). So vertices 1 and 3 will have vertex data = 1, and vertices 2, 4, 5 and 6 will have vertex data = 2. Robin --- Robin East Spark GraphX in Action Michael Malak and Robin East Manning Publications Co. http://www.manning.com/malak/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-Connected-components-tp24165p24166.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
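A small self-contained sketch of this behaviour, assuming an existing SparkContext `sc`; the edges are made up so as to reproduce the two components in the example above.

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical 6-vertex graph with two strongly connected components:
// {1, 3} (cycle 1 <-> 3) and {2, 4, 5, 6} (cycle 2 -> 4 -> 5 -> 6 -> 2).
val edges = sc.parallelize(Seq(
  Edge(1L, 3L, 1), Edge(3L, 1L, 1),
  Edge(2L, 4L, 1), Edge(4L, 5L, 1), Edge(5L, 6L, 1), Edge(6L, 2L, 1)
))
val graph = Graph.fromEdges(edges, defaultValue = 0)

val scc = graph.stronglyConnectedComponents(numIter = 10)
scc.vertices.collect().sortBy(_._1).foreach(println)
// (1,1) (2,2) (3,1) (4,2) (5,2) (6,2) -- vertex data is the lowest id in its component
```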
Issues with Phoenix 4.5
Hi all, I am getting an exception when trying to execute a Spark job that is using the new Phoenix 4.5 spark connector. The application works very well in my local machine, but fails to run in a cluster environment on top of YARN. The cluster is a Cloudera CDH 5.4.4 with HBase 1.0.0 and Phoenix 4.5 (Phoenix is installed correctly, as sqlline works without errors). In the pom.xml, only the spark-core jar (version 1.3.0-cdh5.4.4) has scope provided, while all other jars have been copied by maven into the /myapp/lib folder. I include all the dependent libs using the option --jars in the spark-submit command (among these libraries, there is the phoenix-core-xx.jar, which contains the class PhoenixOutputFormat). This is the command: spark-submit --class my.JobRunner \ --master yarn --deploy-mode client \ --jars `ls -dm /myapp/lib/* | tr -d ' \r\n'` \ /myapp/mainjar.jar The /myapp/lib folder contains the phoenix core lib, which contains class org.apache.phoenix.mapreduce.PhoenixOutputFormat. But it seems that the driver/executor cannot see it, and I get an exception when I try to save an RDD to Phoenix: Exception in thread main java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.phoenix.mapreduce.PhoenixOutputFormat not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2112) at org.apache.hadoop.mapreduce.task.JobContextImpl.getOutputFormatClass(JobContextImpl.java:232) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:971) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:903) at org.apache.phoenix.spark.ProductRDDFunctions.saveToPhoenix(ProductRDDFunctions.scala:51) at com.mypackage.save(DAOImpl.scala:41) at com.mypackage.ProtoStreamingJob.execute(ProtoStreamingJob.scala:58) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.mypackage.SparkApplication.sparkRun(SparkApplication.scala:95) at com.mypackage.SparkApplication$delayedInit$body.apply(SparkApplication.scala:112) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at com.mypackage.SparkApplication.main(SparkApplication.scala:15) at com.mypackage.ProtoStreamingJobRunner.main(ProtoStreamingJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: Class
org.apache.phoenix.mapreduce.PhoenixOutputFormat not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2018) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2110) ... 30 more The phoenix-core-xxx.jar is included in the classpath. I am sure it is in the classpath because I tried to instantiate an object of class PhoenixOutputFormat directly in the main class and it worked. The problem is that the method org.apache.hadoop.conf.Configuration.getClassByName cannot find it. Since I am using client deploy mode, the exception should have been thrown by the driver in the local machine. How can this happen?
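A workaround often suggested for this symptom (the jar is visible to Spark's own classloader but not to Hadoop's Configuration.getClassByName) is to also put the Phoenix jar on the driver and executor extra classpath in addition to --jars, so it is loadable from the system classloader. This is a sketch only, built on the command from the report; the exact Phoenix jar name is a placeholder.

```bash
# Hypothetical jar name: substitute the actual phoenix-core jar from /myapp/lib.
spark-submit --class my.JobRunner \
  --master yarn --deploy-mode client \
  --jars `ls -dm /myapp/lib/* | tr -d ' \r\n'` \
  --conf spark.driver.extraClassPath=/myapp/lib/phoenix-core-<version>.jar \
  --conf spark.executor.extraClassPath=/myapp/lib/phoenix-core-<version>.jar \
  /myapp/mainjar.jar
```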
Re: log4j custom appender ClassNotFoundException with spark 1.4.1
Looking at the callstack and diffs between 1.3.1 and 1.4.1-rc4, I see something that could be relevant to the issue. 1) The call stack shows that the log4j manager gets initialized and uses the default Java context class loader. This context class loader should probably be MutableURLClassLoader from Spark, but it's not. We can assume that currentThread.setContextClassLoader has not been called yet. 2) Still in the call stack, we can see that ShutdownHookManager is the class object responsible for triggering log4j initialization. 3) Looking at the diffs between 1.3 and 1.4, we can see that this ShutdownHookManager is a new class object. With this information, is it possible that ShutdownHookManager makes log4j initialize too early? By that, I mean before Spark gets the chance to set its MutableURLClassLoader on the thread context? Let me know if it does not make sense. Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/log4j-custom-appender-ClassNotFoundException-with-spark-1-4-1-tp24159p24168.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
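If the analysis above is right, one pragmatic workaround is to make the custom appender visible to the default (system) classloader rather than only shipping it through --jars, so it can be resolved even if log4j initializes before Spark installs its MutableURLClassLoader. This is a sketch under that assumption; the jar path and application names are placeholders.

```bash
# Hypothetical appender jar; put it on the system classpath of the driver and executors.
spark-submit \
  --driver-class-path /path/to/my-log4j-appender.jar \
  --conf spark.executor.extraClassPath=/path/to/my-log4j-appender.jar \
  --class my.Main my-app.jar
```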