Re: mllib on (key, Iterable[Vector])
You could try flatMapping, i.e. if you have data : RDD[(key, Iterable[Vector])] then data.flatMap(_._2) : RDD[Vector], which can be fed to GMM. If you want to partition by url first, I would create multiple RDDs using `filter` and then run GMM on each of the filtered RDDs.
On Tue, Aug 11, 2015 at 5:43 AM, Fabian Böhnlein fabian.boehnl...@gmail.com wrote: Hi everyone, I am trying to use mllib.clustering.GaussianMixture, but am blocked by the fact that the API only accepts RDD[Vector]. Broadly speaking, I need to run the clustering on an RDD[(key, Iterable[Vector])], e.g. (fabricated): val WebsiteUserAgeRDD : RDD[url, userAgeVector] val ageClusterByUrl = WebsiteUserAgeRDD.groupby(_.url).mapValues(GaussianMixture.setK(x).run) This obviously does not work, as the mapValues function is called on an Iterable[Vector] while GaussianMixture requires an RDD[Vector]. As I see it, parallelizing this Iterable is not possible; it would result in an RDD of RDDs. Does anyone have an idea how to cluster an RDD of (key, Iterable[Vector]) like in the above groupBy result? Many thanks, Fabian
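A minimal sketch of the filter-per-key approach suggested above, assuming a modest number of distinct keys; data and k are placeholders for the grouped RDD and the cluster count:

    import org.apache.spark.mllib.clustering.GaussianMixture
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // data: RDD[(String, Iterable[Vector])], e.g. the result of the groupBy on url
    def clusterPerKey(data: RDD[(String, Iterable[Vector])], k: Int) = {
      val keys = data.keys.distinct().collect()          // assumes the key set fits on the driver
      keys.map { key =>
        // flatten the Iterable for this key back into an RDD[Vector] that GMM accepts
        val vectors = data.filter(_._1 == key).flatMap(_._2)
        key -> new GaussianMixture().setK(k).run(vectors)
      }.toMap
    }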
Re: Partitioning in spark streaming
I am also trying to understand how are files named when writing to hadoop? for eg: how does saveAs method ensures that each executor is generating unique files? On Tue, Aug 11, 2015 at 4:21 PM, ayan guha guha.a...@gmail.com wrote: partitioning - by itself - is a property of RDD. so essentially it is no different in case of streaming where each batch is one RDD. You can use partitionBy on RDD and pass on your custom partitioner function to it. One thing you should consider is how balanced are your partitions ie your partition scheme should not skew data into one partition too much. Best Ayan On Wed, Aug 12, 2015 at 9:06 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How does partitioning in spark work when it comes to streaming? What's the best way to partition a time series data grouped by a certain tag like categories of product video, music etc. -- Best Regards, Ayan Guha
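On the file-naming question: saveAsTextFile and the other saveAs* actions write one part-NNNNN file per partition under the output directory, so each task's output name is derived from its partition index rather than from the executor, which is what keeps the names unique. For the partitioning question, a sketch of a custom partitioner applied per batch; the category names, key types, and fallback partition count are placeholders:

    import org.apache.spark.Partitioner
    import org.apache.spark.streaming.dstream.DStream

    // Routes known categories ("video", "music", ...) to dedicated partitions and hashes
    // everything else across a few fallback partitions to limit skew.
    class CategoryPartitioner(categories: Seq[String], fallback: Int) extends Partitioner {
      override def numPartitions: Int = categories.size + fallback
      override def getPartition(key: Any): Int = {
        val i = categories.indexOf(key.toString)
        if (i >= 0) i else categories.size + (key.hashCode & Integer.MAX_VALUE) % fallback
      }
    }

    // stream: DStream[(String, String)] keyed by category. transform exposes each batch RDD,
    // so the ordinary RDD partitionBy applies to every micro-batch.
    val partitioned = stream.transform(_.partitionBy(new CategoryPartitioner(Seq("video", "music"), 4)))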
Re: RE: RE: Package Release Announcement: Spark SQL on HBase Astro
Yan: Where can I find performance numbers for Astro (it's close to middle of August) ? Cheers On Tue, Aug 11, 2015 at 3:58 PM, Yan Zhou.sc yan.zhou...@huawei.com wrote: Finally I can take a look at HBASE-14181 now. Unfortunately there is no design doc mentioned. Superficially it is very similar to Astro with a difference of this being part of HBase client library; while Astro works as a Spark package so will evolve and function more closely with Spark SQL/Dataframe instead of HBase. In terms of architecture, my take is loosely-coupled query engines on top of KV store vs. an array of query engines supported by, and packaged as part of, a KV store. Functionality-wise the two could be close but Astro also supports Python as a result of tight integration with Spark. It will be interesting to see performance comparisons when HBase-14181 is ready. Thanks, *From:* Ted Yu [mailto:yuzhih...@gmail.com] *Sent:* Tuesday, August 11, 2015 3:28 PM *To:* Yan Zhou.sc *Cc:* Bing Xiao (Bing); d...@spark.apache.org; user@spark.apache.org *Subject:* Re: 答复: Package Release Annoucement: Spark SQL on HBase Astro HBase will not have query engine. It will provide better support to query engines. Cheers On Aug 10, 2015, at 11:11 PM, Yan Zhou.sc yan.zhou...@huawei.com wrote: Ted, I’m in China now, and seem to experience difficulty to access Apache Jira. Anyways, it appears to me that HBASE-14181 https://issues.apache.org/jira/browse/HBASE-14181 attempts to support Spark DataFrame inside HBase. If true, one question to me is whether HBase is intended to have a built-in query engine or not. Or it will stick with the current way as a k-v store with some built-in processing capabilities in the forms of coprocessor, custom filter, …, etc., which allows for loosely-coupled query engines built on top of it. Thanks, *发件人**:* Ted Yu [mailto:yuzhih...@gmail.com yuzhih...@gmail.com] *发送时间**:* 2015年8月11日 8:54 *收件人**:* Bing Xiao (Bing) *抄送**:* d...@spark.apache.org; user@spark.apache.org; Yan Zhou.sc *主题**:* Re: Package Release Annoucement: Spark SQL on HBase Astro Yan / Bing: Mind taking a look at HBASE-14181 https://issues.apache.org/jira/browse/HBASE-14181 'Add Spark DataFrame DataSource to HBase-Spark Module' ? Thanks On Wed, Jul 22, 2015 at 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote: We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release. http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase The main features in this package, dubbed “Astro”, include: · Systematic and powerful handling of data pruning and intelligent scan, based on partial evaluation technique · HBase pushdown capabilities like custom filters and coprocessor to support ultra low latency processing · SQL, Data Frame support · More SQL capabilities made possible (Secondary index, bloom filter, Primary Key, Bulk load, Update) · Joins with data from other sources · Python/Java/Scala support · Support latest Spark 1.4.0 release The tests by Huawei team and community contributors covered the areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; customer filtering; DML; complex filtering on keys and non-keys; Join/union with non-Hbase data; Data Frame; multi-column family test. We will post the test results including performance tests the middle of August. 
You are very welcome to try out or deploy the package, and to help improve the integration tests with various combinations of the settings, extensive Data Frame tests, complex join/union tests and extensive performance tests. Please use the “Issues” and “Pull Requests” links at the package homepage if you want to report bugs or request improvements or features. Special thanks to project owner and technical leader Yan Zhou, the Huawei global team, community contributors and Databricks. Databricks has been providing great assistance from the design to the release. “Astro”, the Spark SQL on HBase package, will be useful for ultra-low-latency query and analytics of large-scale data sets in vertical enterprises. We will continue to work with the community to develop new features and improve the code base. Your comments and suggestions are greatly appreciated. Yan Zhou / Bing Xiao Huawei Big Data team
Re: Sporadic Input validation failed error when executing LogisticRegressionWithLBFGS.train
Hi Francis, From my observation when using spark sql, dataframe.limit(n) does not necessarily return the same result each time when running Apps. To be more precise, in one App, the result should be same for the same n, however, changing n might not result in the same prefix(the result for n = 10 doesn't necessarily start with the result for n = 5.) When running different Apps, results are usually different for the same n. Thanks On Tue, Aug 11, 2015 at 2:56 PM, Francis Lau francis@smartsheet.com wrote: Has anyone see this issue? I am calling the LogisticRegressionWithLBFGS.train API and about 7 out of 10 times, I get an Input validation failed error. The exact same code and dataset works sometimes but fails at other times. It is odd. I can't seem to find any info on this. Below is the pyspark code and the error message. I did check the dataset and all values are zero or greater. There are no blank spaces or nulls. This code below is pretty much the sample code from the Spark site. Thanks in advance for any help or pointers in how to investigate this issue. -- *Francis * *CODE:* from pyspark.mllib.classification import LogisticRegressionWithLBFGS from pyspark.mllib.regression import LabeledPoint from numpy import array # Load and parse the data def parsePoint(line): #values = [float(x) for x in line.split(' ')] values = [float(x) for x in line.asDict().values()] # need to convert from Row to Array return LabeledPoint(values[0], values[1:]) # convert SQL to format needed for training model regData = sqlContext.sql(select statement) df = regData.limit(1000) data = df.rdd parsedData = data.map(parsePoint) # Build the model model = LogisticRegressionWithLBFGS.train(parsedData) # Evaluating the model on training data labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features))) trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count()) print(Training Error = + str(trainErr)) print Intercept: + str(model.intercept) print Weights: + str(model.weights) *ERROR:* --- Py4JJavaError Traceback (most recent call last) ipython-input-134-b31b9c04499a in module() 20 21 # Build the model --- 22 model = LogisticRegressionWithLBFGS.train(parsedData) 23 24 # Evaluating the model on training data /home/ubuntu/databricks/spark/python/pyspark/mllib/classification.py in train(cls, data, iterations, initialWeights, regParam, regType, intercept, corrections, tolerance, validateData, numClasses) 344 else: 345 initialWeights = [0.0] * len(data.first().features) * (numClasses - 1) -- 346 return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights) 347 348 /home/ubuntu/databricks/spark/python/pyspark/mllib/regression.py in _regression_train_wrapper(train_func, modelClass, data, initial_weights) 186 if (modelClass == LogisticRegressionModel): 187 weights, intercept, numFeatures, numClasses = train_func( -- 188 data, _convert_to_vector(initial_weights)) 189 return modelClass(weights, intercept, numFeatures, numClasses) 190 else: /home/ubuntu/databricks/spark/python/pyspark/mllib/classification.py in train(rdd, i) 334 return callMLlibFunc(trainLogisticRegressionModelWithLBFGS, rdd, int(iterations), i, 335 float(regParam), regType, bool(intercept), int(corrections), -- 336 float(tolerance), bool(validateData), int(numClasses)) 337 338 if initialWeights is None: /home/ubuntu/databricks/spark/python/pyspark/mllib/common.py in callMLlibFunc(name, *args) 126 sc = SparkContext._active_spark_context 127 api = getattr(sc._jvm.PythonMLLibAPI(), name) -- 
128 return callJavaFunc(sc, api, *args) 129 130 /home/ubuntu/databricks/spark/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args) 119 Call Java Function 120 args = [_py2java(sc, a) for a in args] -- 121 return _java2py(sc, func(*args)) 122 123 /home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300
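One plausible explanation for the intermittent failure, given the reply above: with the default numClasses, LogisticRegressionWithLBFGS validates that every label is 0.0 or 1.0 before training, so if limit(1000) returns different rows on different runs, a run that happens to include an out-of-range label fails validation while another run passes. (Note also that line.asDict().values() gives no guarantee that the label ends up at index 0.) A sketch of a deterministic subset, written in Scala though the same DataFrame calls exist in PySpark; the "id" column and the query are hypothetical:

    // Ordering on a unique column before limit() makes the sampled rows reproducible.
    val regData = sqlContext.sql("SELECT id, label, f1, f2 FROM training")   // placeholder query
    val sample  = regData.orderBy("id").limit(1000)

    // Cheap sanity check before training: binary logistic regression expects labels in {0, 1}.
    val badLabels = sample.filter("label < 0 OR label > 1").count()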
Re: ClassNotFound spark streaming
After changing the '--deploy_mode client' the program seems to work however it looks like there is a bug in spark when using --deploy_mode as 'yarn'. Should I open a bug? On Tue, Aug 11, 2015 at 3:02 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I see the following line in the log 15/08/11 17:59:12 ERROR spark.SparkContext: Jar not found at file:/home/ec2-user/./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar, however I do see that this file exists on all the node in that path. Not sure what's happening here. Please note I am using it in yarn On Tue, Aug 11, 2015 at 1:52 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing following error. I think it's not able to find some other associated classes as I see $2 in the exception, but not sure what I am missing. 15/08/11 16:00:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 50, ip-10-241-251-141.us-west-2.compute.internal): java.lang.ClassNotFoundException: org.sony.spark.stream.test.JavaRecoverableNetworkWordCount$2 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
Partitioning in spark streaming
How does partitioning in spark work when it comes to streaming? What's the best way to partition a time series data grouped by a certain tag like categories of product video, music etc.
RE: Refresh table
Refreshing a table only works for the Spark SQL DataSource in my understanding; apparently here you’re running a Hive table. Can you try to create a table like: |CREATE TEMPORARY TABLE parquetTable (a int, b string) |USING org.apache.spark.sql.parquet.DefaultSource |OPTIONS ( | path '/root_path' |) And then df2.write.parquet("hdfs://root_path/test_table/key=2") … Cheng
From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Tuesday, August 11, 2015 2:15 PM To: user Subject: Refresh table Hi all, I'm a little confused about how refresh table (SPARK-5833) should work. So I did the following, val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("hdfs://path/test_table/key=1") Then I created an external table by doing, CREATE EXTERNAL TABLE `tmp_table` ( `single`: int, `double`: int) PARTITIONED BY ( `key` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://path/test_table/' Then I added the partition to the table by `alter table tmp_table add partition (key=1) location 'hdfs://..` Then I added a new partition with a different schema by, val df2 = sc.makeRDD(1 to 5).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("hdfs://path/test_table/key=2") And added the new partition to the table by `alter table ..`, But when I did `refresh table tmp_table` and `describe table` it couldn't pick up the new column `triple`. Can someone explain to me how partition discovery and schema merging of refresh table should work? Thanks
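If the goal is simply to see both partition schemas merged, a hedged sketch along the lines of the reply above; paths are reused from the thread, the mergeSchema read option is assumed to be available in the Spark version in use, and refreshTable assumes a HiveContext:

    // Reading the root path through the parquet data source rather than the Hive table
    // definition: partition discovery picks up key=1 and key=2, and schema merging
    // unions their columns so `triple` becomes visible.
    val merged = sqlContext.read
      .option("mergeSchema", "true")
      .parquet("hdfs://path/test_table")
    merged.printSchema()

    // For tables registered through the data source API, a HiveContext can drop the cached
    // metadata with hiveContext.refreshTable("tmp_table").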
spark vs flink low memory available
Hi community, I have built a Spark and a Flink k-means application. My test case is clustering 1 million points on a 3-node cluster. When memory becomes a bottleneck, Flink starts spilling to disk and works slowly, but it works. Spark, however, loses executors when memory is full and keeps restarting them (an infinite loop?). I tried to customize the memory settings with the help from the mailing list here, thanks, but Spark still does not work. Are there any configurations that need to be set? I mean, Flink works with low memory, so Spark must also be able to, or not? Best regards, Paul -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-vs-flink-low-memory-available-tp24208.html
Re: SparkR -Graphx Connected components
To be part of a strongly connected component, every vertex must be reachable from every other vertex in that component. Vertex 6 is not reachable from the other members of SCC 0, and the same goes for 7, so 6 and 7 each form their own strongly connected components. 6 and 7 are, however, part of the (weakly) connected components of 0 and 3 respectively. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-Connected-components-tp24165p24209.html
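For reference, the two computations side by side; a minimal sketch where sc is the usual SparkContext and the edge-list path is a placeholder:

    import org.apache.spark.graphx.GraphLoader

    val graph = GraphLoader.edgeListFile(sc, "/path/to/edges.txt")

    // Weakly connected components: edge direction is ignored, so 6 joins component 0.
    val cc  = graph.connectedComponents().vertices

    // Strongly connected components: every member must reach every other member along
    // directed edges, so 6 and 7 each end up alone.
    val scc = graph.stronglyConnectedComponents(numIter = 10).vertices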
Re: Wish for 1.4: upper bound on # tasks in Mesos
Hello all, As a quick follow up for this, I have been using Spark on Yarn till now and am currently exploring Mesos and Marathon. Using yarn, we could tell the spark job about the number of executors and number of cores as well, is there a way to do it on mesos? I'm using Spark 1.4.1 on Mesos 0.23.0 and Marathon 0.9. When we launch a marathon app, is there a way we can tell it the max number of cores per executor ( which comes down to maximum number of tasks per executor (for each instance of app)) - please correct me if I am wrong. - I have been browsing over various documentation details but did not come across a direct solution. - Is there a JIRA issue in progress already/a bug fix for the same? I greatly appreciate any help and would love to follow up/investigate further if you can suggest if there is any JIRA issue already or any pointers.. On Wed, May 20, 2015 at 8:27 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: To put this on the devs' radar, I suggest creating a JIRA for it (and checking first if one already exists). issues.apache.org/jira/ Nick On Tue, May 19, 2015 at 1:34 PM Matei Zaharia matei.zaha...@gmail.com wrote: Yeah, this definitely seems useful there. There might also be some ways to cap the application in Mesos, but I'm not sure. Matei On May 19, 2015, at 1:11 PM, Thomas Dudziak tom...@gmail.com wrote: I'm using fine-grained for a multi-tenant environment which is why I would welcome the limit of tasks per job :) cheers, Tom On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Tom, Are you using the fine-grained or coarse-grained scheduler? For the coarse-grained scheduler, there is a spark.cores.max config setting that will limit the total # of cores it grabs. This was there in earlier versions too. Matei On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote: I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit for the number of tasks for jobs run on Mesos ? This would be a very simple yet effective way to prevent a job dominating the cluster. cheers, Tom -- Regards, Haripriya Ayyalasomayajula
Re: spark vs flink low memory available
Pa: Can you try 1.5.0 SNAPSHOT ? See SPARK-7075 Project Tungsten (Spark 1.5 Phase 1) Cheers On Tue, Aug 11, 2015 at 12:49 AM, jun kit...@126.com wrote: your detail of log file? At 2015-08-10 22:02:16, Pa Rö paul.roewer1...@googlemail.com wrote: hi community, i have build a spark and flink k-means application. my test case is a clustering on 1 million points on 3node cluster. in memory bottlenecks begins flink to outsource to disk and work slowly but works. however spark lose executers if the memory is full and starts again (infinety loop?). i try to customize the memory setting with the help from the mailing list here http://mail-archives.us.apache.org/mod_mbox/spark-user/201508.mbox/%3ccah2_pykqhfr4tbvpbt2tdhgm+zrkcbzfnk7uedkjpdhe472...@mail.gmail.com%3E, thanks. but spark not work. is it necessary to have any configurations to be set? i mean flink work with low memory, spark must also be able to or not? best regards, paul
Re: Spark Streaming dealing with broken files without dying
You can do something like this: val fStream = ssc.textFileStream(/sigmoid/data/) .map(x = { try{ //Move all the transformations within a try..catch }catch{ case e: Exception = { logError(Whoops!! ); null } } }) Thanks Best Regards On Mon, Aug 10, 2015 at 7:44 PM, Mario Pastorelli mario.pastore...@teralytics.ch wrote: Hey Sparkers, I would like to use Spark Streaming in production to observe a directory and process files that are put inside it. The problem is that some of those files can be broken leading to a IOException from the input reader. This should be fine for the framework I think: the exception should be caught by Spark Streaming and logged somewhere and the file causing the problem should be skipped. Instead, when the exception is thrown the job is aborted with error and no other files are processed. Ideally I would like to have my Spark Streaming job to run forever and if something is not readable, to just log it but stay alive. How can I achieve this? The stack of the errors that kill my job is similar to 15/08/09 23:42:27 ERROR o.a.s.e.Executor Exception in task 823.0 in stage 0.0 (TID 823) java.io.IOException: unexpected end of stream at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:971) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:506) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:335) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:425) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:485) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174) at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159) at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:185) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Thank you, Mario
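A minimal sketch of that pattern with the failed records dropped; parseRecord and the path are placeholders. One caveat: a try/catch inside map or flatMap only guards your own per-record code. The IOException in the trace above is thrown while Hadoop's record reader is still decompressing the split, which happens outside user code, so truly corrupt archives may need to be screened before they land in the watched directory.

    import org.apache.spark.streaming.StreamingContext

    // ssc: StreamingContext watching a directory of text files
    val lines = ssc.textFileStream("/sigmoid/data/")

    val parsed = lines.flatMap { line =>
      try {
        Seq(parseRecord(line))          // parseRecord is a hypothetical per-record transformation
      } catch {
        case e: Exception =>
          // log and skip the bad record instead of failing the whole batch
          Seq.empty
      }
    }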
AW: Spark GraphX memory requirements + java.lang.OutOfMemoryError: GC overhead limit exceeded
Hi – I'd like to follow up on this, as I am running into very similar issues (with a much bigger data set, though – 10^5 nodes, 10^7 edges). So let me repost the question: Any ideas on how to estimate graphx memory requirements? Cheers! Von: Roman Sokolov [mailto:ole...@gmail.com] Gesendet: Samstag, 11. Juli 2015 03:58 An: Ted Yu; Robin East; user Betreff: Re: Spark GraphX memory requirements + java.lang.OutOfMemoryError: GC overhead limit exceeded Hello again. So I could compute triangle numbers when run the code from spark shell without workers (with --driver-memory 15g option), but with workers I have errors. So I run spark shell: ./bin/spark-shell --master spark://192.168.0.31:7077http://192.168.0.31:7077 --executor-memory 6900m --driver-memory 15g and workers (by hands): ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://192.168.0.31:7077http://192.168.0.31:7077 (2 workers, each has 8Gb RAM; master has 32 Gb RAM). The code now is: import org.apache.spark._ import org.apache.spark.graphx._ val graph = GraphLoader.edgeListFile(sc, /home/data/graph.txt).partitionBy(PartitionStrategy.RandomVertexCut) val newgraph = graph.convertToCanonicalEdges() val triangleNum = newgraph.triangleCount().vertices.map(x = x._2.toLong).reduce(_ + _)/3 So how to understand what amount of memory is needed? And why I need it so much? Dataset is only 1,1Gb small... Error: [Stage 7: (0 + 8) / 32]15/07/11 01:48:45 WARN TaskSetManager: Lost task 2.0 in stage 7.0 (TID 130, 192.168.0.28): io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:153) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.OutOfMemoryError at sun.misc.Unsafe.allocateMemory(Native Method) at java.nio.DirectByteBuffer.init(DirectByteBuffer.java:127) at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:440) at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:187) at io.netty.buffer.PoolArena.allocate(PoolArena.java:165) at io.netty.buffer.PoolArena.reallocate(PoolArena.java:277) at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:108) at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:146) ... 10 more On 26 June 2015 at 14:06, Roman Sokolov ole...@gmail.commailto:ole...@gmail.com wrote: Yep, I already found it. 
So I added 1 line: val graph = GraphLoader.edgeListFile(sc, , ...) val newgraph = graph.convertToCanonicalEdges() and could successfully count triangles on newgraph. Next I will test it on bigger (several GB) networks. I am using Spark 1.3 and 1.4 but haven't seen this function in https://spark.apache.org/docs/latest/graphx-programming-guide.html Thanks a lot guys!
On 26.06.2015 at 13:50, Ted Yu yuzhih...@gmail.com wrote: See SPARK-4917 which went into Spark 1.3.0
On Fri, Jun 26, 2015 at 2:27 AM, Robin East robin.e...@xense.co.uk wrote: You’ll get this issue if you just take the first 2000 lines of that file. The problem is triangleCount() expects srcId < dstId, which is not the case in the file (e.g. vertex 28). You can get round this by calling graph.convertToCanonicalEdges(), which removes bi-directional edges and ensures srcId < dstId. Which version of Spark are you on?
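On the memory question above, two load-time knobs tend to matter; a hedged sketch where the partition count and storage levels are starting points to tune, and where canonicalOrientation plays the role of convertToCanonicalEdges at load time:

    import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
    import org.apache.spark.storage.StorageLevel

    val graph = GraphLoader.edgeListFile(
        sc,
        "/home/data/graph.txt",
        canonicalOrientation = true,                      // orient edges so srcId < dstId
        numEdgePartitions = 64,                           // more, smaller partitions lower per-task memory
        edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,  // allow spilling instead of OOM
        vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
      .partitionBy(PartitionStrategy.RandomVertexCut)

    val triangleNum = graph.triangleCount().vertices.map(_._2.toLong).reduce(_ + _) / 3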
Re: Controlling number of executors on Mesos vs YARN
Hi Tim, Spark on Yarn allows us to do it using --num-executors and --executor_cores commandline arguments. I just got a chance to look at a similar spark user list mail, but no answer yet. So does mesos allow setting the number of executors and cores? Is there a default number it assumes? On Mon, Jan 5, 2015 at 5:07 PM, Tim Chen t...@mesosphere.io wrote: Forgot to hit reply-all. -- Forwarded message -- From: Tim Chen t...@mesosphere.io Date: Sun, Jan 4, 2015 at 10:46 PM Subject: Re: Controlling number of executors on Mesos vs YARN To: mvle m...@us.ibm.com Hi Mike, You're correct there is no such setting in for Mesos coarse grain mode, since the assumption is that each node is launched with one container and Spark is launching multiple tasks in that container. In fine-grain mode there isn't a setting like that, as it currently will launch an executor as long as it satisfies the minimum container resource requirement. I've created a JIRA earlier about capping the number of executors or better distribute the # of executors launched in each node. Since the decision of choosing what node to launch containers is all in the Spark scheduler side, it's very easy to modify it. Btw, what's the configuration to set the # of executors on YARN side? Thanks, Tim On Sun, Jan 4, 2015 at 9:37 PM, mvle m...@us.ibm.com wrote: I'm trying to compare the performance of Spark running on Mesos vs YARN. However, I am having problems being able to configure the Spark workload to run in a similar way on Mesos and YARN. When running Spark on YARN, you can specify the number of executors per node. So if I have a node with 4 CPUs, I can specify 6 executors on that node. When running Spark on Mesos, there doesn't seem to be an equivalent way to specify this. In Mesos, you can somewhat force this by specifying the number of CPU resources to be 6 when running the slave daemon. However, this seems to be a static configuration of the Mesos cluster rather something that can be configured in the Spark framework. So here is my question: For Spark on Mesos, am I correct that there is no way to control the number of executors per node (assuming an idle cluster)? For Spark on Mesos coarse-grained mode, there is a way to specify max_cores but that is still not equivalent to specifying the number of executors per node as when Spark is run on YARN. If I am correct, then it seems Spark might be at a disadvantage running on Mesos compared to YARN (since it lacks the fine tuning ability provided by YARN). Thanks, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Controlling-number-of-executors-on-Mesos-vs-YARN-tp20966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Haripriya Ayyalasomayajula
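For reference, the YARN-side settings asked about above, expressed as configuration properties rather than command-line flags; the values are illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.instances", "6")   // same as spark-submit --num-executors 6
      .set("spark.executor.cores", "4")       // same as spark-submit --executor-cores 4
      .set("spark.executor.memory", "4g")     // same as spark-submit --executor-memory 4g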
Python3 Spark execution problems
Hi, I have been trying to use spark for the processing I need to do in some logs, and I have found several difficulties during the process. Most of them I could overcome them, but I am really stuck in the last one. I would really like to know how spark is supposed to be deployed. For now, I have a ssh key in the master that can login in any worker. start-master.sh and start-slaves.sh work. According to the docs, I crafted the following command: ~/projects/bigdata/spark/spark/bin/spark-submit --py-files /home/javier/projects/bigdata/bdml/dist/bdml-0.0.1.zip --master='spark:// 10.0.0.71:7077' ml/spark_pipeline.py /srv/bdml/raw2json/json-logs.gz First, when I tried to deploy my project, it was an impossible quest. I was all the time getting module import errors: Traceback (most recent call last): File /home/javier/projects/bigdata/bdml/ml/spark_pipeline.py, line 10, in module from .files import get_interesting_files I tried everything, but there was a moment when I had to hop into scala code to trace that error. Therefore I just merged all the functions of the project in one file. Then I started to get the following error: : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.0.0.73): org.apache.spark.api.python.PythonExce ption: Traceback (most recent call last): File /root/spark/python/lib/pyspark.zip/pyspark/worker.py, line 64, in main (%d.%d % sys.version_info[:2], version)) Exception: Python in worker has different version 2.7 than that in driver 3.4, PySpark cannot run with different minor versions I have specified #!/usr/bin/env python3 in the top of the file, and my spark-env.sh on each worker contains the following lines. SPARK_MASTER_IP=10.0.0.71 export PYSPARK_PYTHON=python3.4 PYSPARK_PYTHON=python3.4 export PYTHONHASHSEED=123 PYTHONHASHSEED=123 I had to specify the PYTHONHASHSEED because it wasn't propagating to the workers. I hope you can help me, [image: Fon] http://www.fon.com/Javier Domingo CansinoResearch Development Engineer+34 946545847Skype: javier.domingo.fonAll information in this email is confidential http://corp.fon.com/legal/email-disclaimer
Error while output JavaDStream to disk and mongodb
Hi, I have successfully reduced my data and store it in JavaDStreamBSONObject Now, i want to save this data in mongodb for this i have used BSONObject type. But, when i try to save it, it is giving exception. For this, i also try to save it just as *saveAsTextFile *but same exception. Error Log : attached full log file Excerpt from log file. 2015-08-11 11:18:52,663 INFO (org.apache.spark.storage.BlockManagerMaster:59) - Updated info of block broadcast_4_piece0 2015-08-11 11:18:52,664 INFO (org.apache.spark.SparkContext:59) - Created broadcast 4 from broadcast at DAGScheduler.scala:839 2015-08-11 11:18:52,664 INFO (org.apache.spark.scheduler.DAGScheduler:59) - Submitting 2 missing tasks from Stage 7 (MapPartitionsRDD[5] at foreach at DirectStream.java:167) 2015-08-11 11:18:52,664 INFO (org.apache.spark.scheduler.TaskSchedulerImpl:59) - Adding task set 7.0 with 2 tasks 2015-08-11 11:18:52,665 INFO (org.apache.spark.scheduler.TaskSetManager:59) - Starting task 0.0 in stage 7.0 (TID 5, localhost, PROCESS_LOCAL, 1056 bytes) 2015-08-11 11:18:52,666 INFO (org.apache.spark.scheduler.TaskSetManager:59) - Starting task 1.0 in stage 7.0 (TID 6, localhost, PROCESS_LOCAL, 1056 bytes) 2015-08-11 11:18:52,666 INFO (org.apache.spark.executor.Executor:59) - Running task 0.0 in stage 7.0 (TID 5) 2015-08-11 11:18:52,666 INFO (org.apache.spark.executor.Executor:59) - Running task 1.0 in stage 7.0 (TID 6) 2015-08-11 11:18:52,827 INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Getting 2 non-empty blocks out of 2 blocks 2015-08-11 11:18:52,828 INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Started 0 remote fetches in 1 ms 2015-08-11 11:18:52,846 INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Getting 2 non-empty blocks out of 2 blocks 2015-08-11 11:18:52,847 INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Started 0 remote fetches in 1 ms 2015-08-11 11:18:52,965 ERROR (org.apache.spark.executor.Executor:96) - Exception in task 1.0 in stage 7.0 (TID 6) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:445) at org.apache.hadoop.util.Shell.run(Shell.java:418) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650) at org.apache.hadoop.util.Shell.execCommand(Shell.java:739) at org.apache.hadoop.util.Shell.execCommand(Shell.java:722) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:633) at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:467) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-08-11 11:18:52,965 ERROR (org.apache.spark.executor.Executor:96) - Exception in task 0.0 in stage 7.0 (TID 5) java.lang.NullPointerException Code for saving output : // for MongoDB Configuration outputConfig = new Configuration(); outputConfig.set(mongo.output.uri, mongodb://localhost:27017/test.spark); outputConfig.set(mongo.output.format, com.mongodb.hadoop.MongoOutputFormat); JavaDStreamBSONObject suspectedStream suspectedStream.foreach(new FunctionJavaRDDBSONObject, Void() { private static final long serialVersionUID = 4414703053334523053L; @Override public Void call(JavaRDDBSONObject rdd) throws Exception { logger.info(rdd.first()); rdd.saveAsTextFile(E://); rdd.saveAsNewAPIHadoopFile(, Object.class, BSONObject.class, MongoOutputFormat.class,outputConfig); return null; } }); Regards, Deepesh 2015-08-11 11:18:52,265 INFO (org.apache.spark.streaming.scheduler.JobScheduler:59) - Finished job streaming job 143927213 ms.1 from job set of time 143927213 ms 2015-08-11 11:18:52,265 INFO (org.apache.spark.streaming.scheduler.JobScheduler:59) - Starting job streaming job 143927213 ms.2 from job set
Re: Wish for 1.4: upper bound on # tasks in Mesos
Consider the spark.max.cores configuration option -- it should do what you require. On Tue, Aug 11, 2015 at 8:26 AM, Haripriya Ayyalasomayajula aharipriy...@gmail.com wrote: Hello all, As a quick follow up for this, I have been using Spark on Yarn till now and am currently exploring Mesos and Marathon. Using yarn, we could tell the spark job about the number of executors and number of cores as well, is there a way to do it on mesos? I'm using Spark 1.4.1 on Mesos 0.23.0 and Marathon 0.9. When we launch a marathon app, is there a way we can tell it the max number of cores per executor ( which comes down to maximum number of tasks per executor (for each instance of app)) - please correct me if I am wrong. - I have been browsing over various documentation details but did not come across a direct solution. - Is there a JIRA issue in progress already/a bug fix for the same? I greatly appreciate any help and would love to follow up/investigate further if you can suggest if there is any JIRA issue already or any pointers.. On Wed, May 20, 2015 at 8:27 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: To put this on the devs' radar, I suggest creating a JIRA for it (and checking first if one already exists). issues.apache.org/jira/ Nick On Tue, May 19, 2015 at 1:34 PM Matei Zaharia matei.zaha...@gmail.com wrote: Yeah, this definitely seems useful there. There might also be some ways to cap the application in Mesos, but I'm not sure. Matei On May 19, 2015, at 1:11 PM, Thomas Dudziak tom...@gmail.com wrote: I'm using fine-grained for a multi-tenant environment which is why I would welcome the limit of tasks per job :) cheers, Tom On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Tom, Are you using the fine-grained or coarse-grained scheduler? For the coarse-grained scheduler, there is a spark.cores.max config setting that will limit the total # of cores it grabs. This was there in earlier versions too. Matei On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote: I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit for the number of tasks for jobs run on Mesos ? This would be a very simple yet effective way to prevent a job dominating the cluster. cheers, Tom -- Regards, Haripriya Ayyalasomayajula
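One detail: the property Matei refers to earlier in the thread is spelled spark.cores.max. A minimal sketch of applying the cap in coarse-grained Mesos mode; the value is illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.mesos.coarse", "true")   // coarse-grained mode, where the cap applies
      .set("spark.cores.max", "24")        // upper bound on the total cores the app will claim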
Re: Differents of loading data
Load data to where? To spark? If you are referring to spark, then there are some differences the way the connector is implemented. When you use spark, the most important thing that you get is the parallelism (depending on the number of partitions). If you compare it with a native java driver then you will find it slower compared to spark. Thanks Best Regards On Mon, Aug 10, 2015 at 4:31 PM, 李铖 lidali...@gmail.com wrote: What is the differents of loading data using jdbc and loading data using spard data source api? or differents of loading data using mongo-hadoop and loading data using native java driver? Which way is better?
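To make the parallelism point concrete, the JDBC data source can split the load into several range queries that run across the executors, which a single-connection native driver read cannot. A hedged sketch; the connection details, table, and bounds are placeholders:

    val props = new java.util.Properties()
    props.setProperty("user", "spark")
    props.setProperty("password", "secret")

    // numPartitions range queries on the id column are issued in parallel.
    val df = sqlContext.read.jdbc(
      url = "jdbc:postgresql://db-host:5432/mydb",
      table = "events",
      columnName = "id",
      lowerBound = 0L,
      upperBound = 1000000L,
      numPartitions = 8,
      connectionProperties = props)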
Re: spark vs flink low memory available
my first post is here and a log too: http://mail-archives.us.apache.org/mod_mbox/spark-user/201508.mbox/%3ccah2_pykqhfr4tbvpbt2tdhgm+zrkcbzfnk7uedkjpdhe472...@mail.gmail.com%3E i use cloudera live, i think i can not use spark 1.5. i will try to run it again and post the current logfile here. 2015-08-11 10:03 GMT+02:00 Ted Yu yuzhih...@gmail.com: Pa: Can you try 1.5.0 SNAPSHOT ? See SPARK-7075 Project Tungsten (Spark 1.5 Phase 1) Cheers On Tue, Aug 11, 2015 at 12:49 AM, jun kit...@126.com wrote: your detail of log file? At 2015-08-10 22:02:16, Pa Rö paul.roewer1...@googlemail.com wrote: hi community, i have build a spark and flink k-means application. my test case is a clustering on 1 million points on 3node cluster. in memory bottlenecks begins flink to outsource to disk and work slowly but works. however spark lose executers if the memory is full and starts again (infinety loop?). i try to customize the memory setting with the help from the mailing list here http://mail-archives.us.apache.org/mod_mbox/spark-user/201508.mbox/%3ccah2_pykqhfr4tbvpbt2tdhgm+zrkcbzfnk7uedkjpdhe472...@mail.gmail.com%3E, thanks. but spark not work. is it necessary to have any configurations to be set? i mean flink work with low memory, spark must also be able to or not? best regards, paul
Re: ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
okay. Then do you have any idea how to avoid this error? Thanks On Tue, Aug 11, 2015 at 12:08 AM, Tathagata Das t...@databricks.com wrote: I think this may be expected. When the streaming context is stopped without the SparkContext, then the receivers are stopped by the driver. The receiver sends back the message that it has been stopped. This is being (probably incorrectly) logged with ERROR level. On Sun, Aug 9, 2015 at 12:52 AM, Sadaf sa...@platalytics.com wrote: Hi When i tried to stop spark streaming using ssc.stop(false,true) It gives the following error. ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver 15/08/07 13:41:11 WARN ReceiverSupervisorImpl: Stopped executor without error 15/08/07 13:41:20 WARN WriteAheadLogManager : Failed to write to write ahead log I've implemented Streaming Listener and a Custom Receiver. Does anyone has idea about this? Thanks :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-ReceiverTracker-Deregistered-receiver-for-stream-0-Stopped-by-driver-tp24183.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
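If the message really is just a mis-leveled log line during a graceful stop, one workaround (a sketch, and not a fix for the WriteAheadLogManager warning) is to raise the log threshold for the class that emits it:

    import org.apache.log4j.{Level, Logger}

    // The "Deregistered receiver ... Stopped by driver" line is logged by the ReceiverTracker.
    Logger.getLogger("org.apache.spark.streaming.scheduler.ReceiverTracker").setLevel(Level.FATAL)

    ssc.stop(stopSparkContext = false, stopGracefully = true)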
Re: Java Streaming Context - File Stream use
Like this (including the filter function): JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream( testDir.toString(), LongWritable.class, Text.class, TextInputFormat.class, new Function<Path, Boolean>() { @Override public Boolean call(Path v1) throws Exception { return Boolean.TRUE; } }, true); Thanks Best Regards
On Tue, Aug 11, 2015 at 1:10 AM, Ashish Soni asoni.le...@gmail.com wrote: Please help, as I am not sure what is incorrect with the code below; it gives me a compilation error in Eclipse: SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("JavaDirectKafkaWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); jssc.fileStream("/home/", String.class, String.class, TextInputFormat.class);
Re: Inquery about contributing codes
You can create a new Issue and send a pull request for the same i think. + dev list Thanks Best Regards On Tue, Aug 11, 2015 at 8:32 AM, Hyukjin Kwon gurwls...@gmail.com wrote: Dear Sir / Madam, I have a plan to contribute some codes about passing filters to a datasource as physical planning. In more detail, I understand when we want to build up filter operations from data like Parquet (when actually reading and filtering HDFS blocks at first not filtering in memory with Spark operations), we need to implement PrunedFilteredScan, PrunedScan or CatalystScan in package org.apache.spark.sql.sources. For PrunedFilteredScan and PrunedScan, it pass the filter objects in package org.apache.spark.sql.sources, which do not access directly to the query parser but are objects built by selectFilters() in package org.apache.spark.sql.sources.DataSourceStrategy. It looks all the filters (rather raw expressions) do not pass to the function below in PrunedFilteredScan and PrunedScan. def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] The passing filters in here are defined in package org.apache.spark.sql.sources. On the other hand, it does not pass EqualNullSafe filter in package org.apache.spark.sql.catalyst.expressions even though this looks possible to pass for other datasources such as Parquet and JSON. I understand that CatalystScan can take the all raw expression accessing to the query planner. However, it is experimental and also it needs different interfaces (as well as unstable for the reasons such as binary capability). As far as I know, Parquet also does not use this. In general, this can be a issue as a user send a query to data such as 1. SELECT * FROM table WHERE field = 1; 2. SELECT * FROM table WHERE field = 1; The second query can be hugely slow because of large network traffic by not filtered data from the source RDD. Also,I could not find a proper issue for this (except for https://issues.apache.org/jira/browse/SPARK-8747) which says it supports now binary capability. Accordingly, I want to add this issue and make a pull request with my codes. Could you please make any comments for this? Thanks.
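For readers following along, a minimal sketch of where the translated filters arrive in a data source; the relation and its single column are hypothetical:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources._
    import org.apache.spark.sql.types._

    class MyRelation(override val sqlContext: SQLContext)
      extends BaseRelation with PrunedFilteredScan {

      override def schema: StructType = StructType(Seq(StructField("field", IntegerType)))

      override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        // Only predicates that selectFilters() can translate arrive here as
        // org.apache.spark.sql.sources.Filter (EqualTo, GreaterThan, In, ...);
        // anything it cannot translate, EqualNullSafe being the example in this thread,
        // is evaluated by Spark after the scan instead of being pushed down.
        val pushedValues = filters.collect { case EqualTo("field", value) => value }
        // ... restrict the scan of the underlying store using pushedValues ...
        sqlContext.sparkContext.emptyRDD[Row]
      }
    }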
Re: mllib kmeans produce 1 large and many extremely small clusters
Hi, The issue is very likely to be in the data or the transformations you apply, rather than anything to do with the Spark Kmeans API as such. I'd start debugging by doing a bit of exploratory analysis of the TFIDF vectors. That is, for instance, plot the distribution (histogram) of the TFIDF values for each word in the vectors. It's quite possible that the TFIDF values for most words for most documents are the same in your case, causing all your 5000 points to crowd around the same region in the n-dimensional space that they live in. On 10 August 2015 at 10:28, farhan farhan_siddi...@hotmail.com wrote: I tried running mllib k-means with 20newsgroups data set from sklearn. On a 5000 document data set I get one cluster with most of the documents and other clusters just have handful of documents. #code newsgroups_train = fetch_20newsgroups(subset='train',random_state=1,remove=('headers', 'footers', 'quotes')) small_list = random.sample(newsgroups_train.data,5000) def get_word_vec(text,vocabulary): word_lst = tokenize_line(text) word_counter = Counter(word_lst) lst = [] for v in vocabulary: if v in word_counter: lst.append(word_counter[v]) else: lst.append(0) return lst docsrdd = sc.parallelize(small_list) tf = docsrdd.map(lambda x : get_word_vec(x,vocabulary)) idf = IDF().fit(tf) tfidf = idf.transform(tf) clusters = KMeans.train(tfidf, 20) #documents in each cluster, using clusters.predict(x) Counter({0: 4978, 11: 3, 9: 2, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 10: 1, 12: 1, 13: 1, 14: 1, 15: 1, 16: 1, 17: 1, 18: 1, 19: 1}) Please Help ! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-kmeans-produce-1-large-and-many-extremely-small-clusters-tp24189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
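A quick way to do that exploratory check with MLlib's summary statistics, plus a normalisation step that often helps when most documents crowd around the origin; a sketch in Scala assuming tfidf: RDD[Vector] built the same way as in the post:

    import org.apache.spark.mllib.feature.Normalizer
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.stat.Statistics
    import org.apache.spark.rdd.RDD

    val summary = Statistics.colStats(tfidf)
    println("per-term variance (first 20): " + summary.variance.toArray.take(20).mkString(", "))
    println("non-zero counts  (first 20): " + summary.numNonzeros.toArray.take(20).mkString(", "))

    // L2-normalising each document vector before KMeans makes distances reflect direction
    // rather than raw document length.
    val normalized: RDD[Vector] = new Normalizer().transform(tfidf)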
Re: RE: Package Release Announcement: Spark SQL on HBase Astro
HBase will not have query engine. It will provide better support to query engines. Cheers On Aug 10, 2015, at 11:11 PM, Yan Zhou.sc yan.zhou...@huawei.com wrote: Ted, I’m in China now, and seem to experience difficulty to access Apache Jira. Anyways, it appears to me that HBASE-14181 attempts to support Spark DataFrame inside HBase. If true, one question to me is whether HBase is intended to have a built-in query engine or not. Or it will stick with the current way as a k-v store with some built-in processing capabilities in the forms of coprocessor, custom filter, …, etc., which allows for loosely-coupled query engines built on top of it. Thanks, 发件人: Ted Yu [mailto:yuzhih...@gmail.com] 发送时间: 2015年8月11日 8:54 收件人: Bing Xiao (Bing) 抄送: d...@spark.apache.org; user@spark.apache.org; Yan Zhou.sc 主题: Re: Package Release Annoucement: Spark SQL on HBase Astro Yan / Bing: Mind taking a look at HBASE-14181 'Add Spark DataFrame DataSource to HBase-Spark Module' ? Thanks On Wed, Jul 22, 2015 at 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote: We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release. http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase The main features in this package, dubbed “Astro”, include: · Systematic and powerful handling of data pruning and intelligent scan, based on partial evaluation technique · HBase pushdown capabilities like custom filters and coprocessor to support ultra low latency processing · SQL, Data Frame support · More SQL capabilities made possible (Secondary index, bloom filter, Primary Key, Bulk load, Update) · Joins with data from other sources · Python/Java/Scala support · Support latest Spark 1.4.0 release The tests by Huawei team and community contributors covered the areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; customer filtering; DML; complex filtering on keys and non-keys; Join/union with non-Hbase data; Data Frame; multi-column family test. We will post the test results including performance tests the middle of August. You are very welcomed to try out or deploy the package, and help improve the integration tests with various combinations of the settings, extensive Data Frame tests, complex join/union test and extensive performance tests. Please use the “Issues” “Pull Requests” links at this package homepage, if you want to report bugs, improvement or feature requests. Special thanks to project owner and technical leader Yan Zhou, Huawei global team, community contributors and Databricks. Databricks has been providing great assistance from the design to the release. “Astro”, the Spark SQL on HBase package will be useful for ultra low latency query and analytics of large scale data sets in vertical enterprises. We will continue to work with the community to develop new features and improve code base. Your comments and suggestions are greatly appreciated. Yan Zhou / Bing Xiao Huawei Big Data team
Fwd: How to minimize shuffling on Spark dataframe Join?
I have two dataframes like this student_rdf = (studentid, name, ...) student_result_rdf = (studentid, gpa, ...) we need to join this two dataframes. we are now doing like this, student_rdf.join(student_result_rdf, student_result_rdf[studentid] == student_rdf[studentid]) So it is simple. But it creates lots of data shuffling across worker nodes, but as joining key is similar and if the dataframe could (understand the partitionkey) be partitioned using that key (studentid) then there suppose not to be any shuffling at all. As similar data (based on partition key) would reside in similar node. is it possible, to hint spark to do this? So, I am finding the way to partition data based on a column while I read a dataframe from input. And If it is possible that Spark would understand that two partitionkey of two dataframes are similar, then how? -- Abdullah
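With the DataFrame API in current releases there is no direct way to declare that both sides are already partitioned by studentid, but dropping to pair RDDs and co-partitioning both sides with the same partitioner gives the effect being asked for: after the one-off partitionBy shuffle, the join itself is a narrow dependency. A sketch; studentid is assumed to be a String and 200 partitions is illustrative:

    import org.apache.spark.HashPartitioner

    val partitioner = new HashPartitioner(200)

    val students = student_rdf.rdd
      .map(row => (row.getAs[String]("studentid"), row))
      .partitionBy(partitioner)
      .cache()

    val results = student_result_rdf.rdd
      .map(row => (row.getAs[String]("studentid"), row))
      .partitionBy(partitioner)
      .cache()

    // Both sides share the partitioner, so rows for a given student are colocated
    // and this join does not trigger a further shuffle.
    val joined = students.join(results)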
Re: Spark with GCS Connector - Rate limit error
There's a daily quota and a minutely quota, you could be hitting those. You can ask google to increase the quota for the particular service. Now, to reduce the limit from the spark side, you can actually to a re-partition to a smaller number before doing the save. Another way to use the local file system rather than the GFS would be to set the spark.local.dir and spark.worker.dir configurations for the job. Thanks Best Regards On Mon, Aug 10, 2015 at 4:39 PM, Oren Shpigel o...@yowza3d.com wrote: Hi, I'm using Spark on a Google Compute Engine cluster with the Google Cloud Storage connector (instead of HDFS, as recommended here https://cloud.google.com/hadoop/google-cloud-storage-connector#benefits ), and get a lot of rate limit errors, as added below. The errors relate to temp files (in a folder called _temporary), and not to my input/output of the program. Is there a way to control the read/write rate of Spark? Is there a way to increase the rate limit for my Google Project? Is there a way to use local Hard-Disk for temp files that don't have to be shared with other slaves? Or anyone knows or thinks of any other solution for that? Thanks, Oren java.io.IOException: Error inserting: bucket: *, object: * at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.wrapException(GoogleCloudStorageImpl.java:1600) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl$3.run(GoogleCloudStorageImpl.java:475) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests { code : 429, errors : [ { domain : usageLimits, message : The total number of changes to the object * exceeds the rate limit. Please reduce the rate of create, update, and delete requests., reason : rateLimitExceeded } ], message : The total number of changes to the object * exceeds the rate limit. Please reduce the rate of create, update, and delete requests. } at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145) at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113) at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl$3.run(GoogleCloudStorageImpl.java:472) ... 3 more -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-GCS-Connector-Rate-limit-error-tp24194.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
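A sketch of both suggestions together; the scratch path, bucket name, and partition count are placeholders, and rdd stands in for whatever is being written:

    import org.apache.spark.{SparkConf, SparkContext}

    // Keep scratch/shuffle files on local disk instead of the default tmp location.
    val conf = new SparkConf().set("spark.local.dir", "/mnt/local-scratch")
    val sc = new SparkContext(conf)

    // Fewer output partitions means fewer object creates/deletes against GCS,
    // which is what the rate limit counts.
    rdd.coalesce(32).saveAsTextFile("gs://my-bucket/output")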
How to specify column type when saving DataFrame as parquet file?
Hi all, I'm using Spark 1.4.1. I create a DataFrame from a json file. There is a column C whose values are all null in the json file. I found that the datatype of column C in the created DataFrame is string. However, I would like the column to be Long when saving it as a parquet file. What should I do to specify the column type when saving the parquet file? Thank you, Jyun-Fan Tsai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
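If it helps, one way to control the written type (a sketch; it assumes df is the DataFrame loaded from the json file, and the output path is illustrative) is to cast the column before saving, since the Parquet writer takes its types from the DataFrame schema:

  import org.apache.spark.sql.types.LongType

  // cast the all-null string column C to Long so it is written as a 64-bit integer column
  val fixed = df.withColumn("C", df("C").cast(LongType))
  // note: if your Spark version's withColumn does not replace an existing column,
  // select all columns explicitly and use df("C").cast(LongType).alias("C") instead
  fixed.write.parquet("/tmp/output.parquet")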
dse spark-submit multiple jars issue
*HI,* Please let me know if i am missing anything in the command below *Command:* dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar *Error:* *java.lang.ClassNotFoundException: HelloWorld* at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) I understand the way I am giving multiple jar file paths in the command is an issue, please provide an appropriate format for providing multiple jars in the command Thanks for support Satish Chandra
Re: dse spark-submit multiple jars issue
use --verbose, it might give you some insights on what0s happening, [image: Fon] http://www.fon.com/Javier Domingo CansinoResearch Development Engineer+34 946545847Skype: javier.domingo.fonAll information in this email is confidential http://corp.fon.com/legal/email-disclaimer On Tue, Aug 11, 2015 at 2:44 PM, satish chandra j jsatishchan...@gmail.com wrote: HI , I have used --jars option as well, please find the command below *Command:* dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld *--jars* ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar Regards, Satish On Tue, Aug 11, 2015 at 4:08 PM, Javier Domingo Cansino javier.domi...@fon.com wrote: I have no real idea (not java user), but have you tried with the --jars option? http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management AFAIK, you are currently submitting the jar names as arguments to the called Class instead of the jars themselves [image: Fon] http://www.fon.com/Javier Domingo CansinoResearch Development Engineer+34 946545847Skype: javier.domingo.fonAll information in this email is confidential http://corp.fon.com/legal/email-disclaimer On Tue, Aug 11, 2015 at 12:29 PM, satish chandra j jsatishchan...@gmail.com wrote: *HI,* Please let me know if i am missing anything in the command below *Command:* dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar *Error:* *java.lang.ClassNotFoundException: HelloWorld* at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) I understand the way I am giving multiple jar file paths in the command is an issue, please provide an appropriate format for providing multiple jars in the command Thanks for support Satish Chandra
Re: Controlling number of executors on Mesos vs YARN
My experience with Mesos + Spark is not great. I saw one executor with 30 CPU and the other executor with 6. So I don't think you can easily configure it without some tweaking at the source code. Sent from my iPad On 2015-08-11, at 2:38, Haripriya Ayyalasomayajula aharipriy...@gmail.com wrote: Hi Tim, Spark on Yarn allows us to do it using --num-executors and --executor_cores commandline arguments. I just got a chance to look at a similar spark user list mail, but no answer yet. So does mesos allow setting the number of executors and cores? Is there a default number it assumes? On Mon, Jan 5, 2015 at 5:07 PM, Tim Chen t...@mesosphere.io wrote: Forgot to hit reply-all. -- Forwarded message -- From: Tim Chen t...@mesosphere.io Date: Sun, Jan 4, 2015 at 10:46 PM Subject: Re: Controlling number of executors on Mesos vs YARN To: mvle m...@us.ibm.com Hi Mike, You're correct there is no such setting in for Mesos coarse grain mode, since the assumption is that each node is launched with one container and Spark is launching multiple tasks in that container. In fine-grain mode there isn't a setting like that, as it currently will launch an executor as long as it satisfies the minimum container resource requirement. I've created a JIRA earlier about capping the number of executors or better distribute the # of executors launched in each node. Since the decision of choosing what node to launch containers is all in the Spark scheduler side, it's very easy to modify it. Btw, what's the configuration to set the # of executors on YARN side? Thanks, Tim On Sun, Jan 4, 2015 at 9:37 PM, mvle m...@us.ibm.com wrote: I'm trying to compare the performance of Spark running on Mesos vs YARN. However, I am having problems being able to configure the Spark workload to run in a similar way on Mesos and YARN. When running Spark on YARN, you can specify the number of executors per node. So if I have a node with 4 CPUs, I can specify 6 executors on that node. When running Spark on Mesos, there doesn't seem to be an equivalent way to specify this. In Mesos, you can somewhat force this by specifying the number of CPU resources to be 6 when running the slave daemon. However, this seems to be a static configuration of the Mesos cluster rather something that can be configured in the Spark framework. So here is my question: For Spark on Mesos, am I correct that there is no way to control the number of executors per node (assuming an idle cluster)? For Spark on Mesos coarse-grained mode, there is a way to specify max_cores but that is still not equivalent to specifying the number of executors per node as when Spark is run on YARN. If I am correct, then it seems Spark might be at a disadvantage running on Mesos compared to YARN (since it lacks the fine tuning ability provided by YARN). Thanks, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Controlling-number-of-executors-on-Mesos-vs-YARN-tp20966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Haripriya Ayyalasomayajula
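For reference on the YARN side of Tim's question (a sketch; the numbers are arbitrary), the executor count and cores map to the settings below, while Mesos coarse-grained mode at this point only exposes a total-core cap rather than executors per node:

  // YARN: configuration equivalents of --num-executors and --executor-cores
  val yarnConf = new org.apache.spark.SparkConf()
    .set("spark.executor.instances", "6")
    .set("spark.executor.cores", "4")

  // Mesos coarse-grained: only the overall core budget can be capped
  val mesosConf = new org.apache.spark.SparkConf()
    .set("spark.cores.max", "24")
    .set("spark.executor.memory", "4g")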
Re: dse spark-submit multiple jars issue
I have no real idea (not java user), but have you tried with the --jars option? http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management AFAIK, you are currently submitting the jar names as arguments to the called Class instead of the jars themselves [image: Fon] http://www.fon.com/Javier Domingo CansinoResearch Development Engineer+34 946545847Skype: javier.domingo.fonAll information in this email is confidential http://corp.fon.com/legal/email-disclaimer On Tue, Aug 11, 2015 at 12:29 PM, satish chandra j jsatishchan...@gmail.com wrote: *HI,* Please let me know if i am missing anything in the command below *Command:* dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar *Error:* *java.lang.ClassNotFoundException: HelloWorld* at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) I understand the way I am giving multiple jar file paths in the command is an issue, please provide an appropriate format for providing multiple jars in the command Thanks for support Satish Chandra
Do you have any other method to get cpu elapsed time of an spark application
Is there more information about the Spark event log? For example, why doesn't the SparkListenerExecutorRemoved event appear in the event log while I use dynamic executor allocation? I want to calculate the CPU elapsed time of an application based on the event log. By the way, do you have any other method to get the CPU elapsed time of a Spark application? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Do-you-have-any-other-method-to-get-cpu-elapsed-time-of-an-spark-application-tp24211.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Parquet without hadoop: Possible?
Hi all, I don't have any hadoop fs installed on my environment, but I would like to store dataframes in parquet files. I am failing to do so, if possible, anyone have any pointers? Thank you, Saif
RE: Parquet without hadoop: Possible?
I am launching my spark-shell spark-1.4.1-bin-hadoop2.6/bin/spark-shell 15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive support).. SQL context available as sqlContext. scala val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF scala data.write.parquet(/var/ data/Saif/pq) Then I get a million errors: 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178) at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94) at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83) at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229) at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470) at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at 
org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. ... ... . 15/08/11 09:46:10 ERROR DefaultWriterContainer: Task attempt attempt_201508110946__m_11_0 aborted. 15/08/11 09:46:10 ERROR Executor: Exception in task 31.0 in stage 0.0 (TID 31) org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:191) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at
Unsupported major.minor version 51.0
I found some discussions online, but they all come down to advice to use JDK 1.7 (or 1.8). Well, I use JDK 1.7 on OS X Yosemite. Both java -version // java version 1.7.0_80 Java(TM) SE Runtime Environment (build 1.7.0_80-b15) Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode) and echo $JAVA_HOME // /Library/Java/JavaVirtualMachines/jdk1.7.0_80.jdk/Contents/Home show JDK 1.7. But for Spark 1.4.1 (and for Spark 1.2.2, downloaded 07/10/2015), I have the same error when building with Maven (as sudo mvn -DskipTests -X clean package abra.txt): Exception in thread main java.lang.UnsupportedClassVersionError: org/apache/maven/cli/MavenCli : Unsupported major.minor version 51.0 Please help how to build the thing. Thanks Alexey This message, including any attachments, is the property of Sears Holdings Corporation and/or one of its subsidiaries. It is confidential and may contain proprietary or legally privileged information. If you are not the intended recipient, please delete it without reading the contents. Thank you.
Re: Unsupported major.minor version 51.0
What does the following command say ? mvn -version Maybe you are using an old maven ? Cheers On Tue, Aug 11, 2015 at 7:55 AM, Yakubovich, Alexey alexey.yakubov...@searshc.com wrote: I found some discussions online, but it all cpome to advice to use JDF 1.7 (or 1.8). Well, I use JDK 1.7 on OS X Yosemite . Both java –verion // java version 1.7.0_80 Java(TM) SE Runtime Environment (build 1.7.0_80-b15) Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode) and echo $JAVA_HOME// /Library/Java/JavaVirtualMachines/jdk1.7.0_80.jdk/Contents/Home show JDK 1.7. But for the Spark 1.4.1. (and for Spark 1.2.2, downloaded 07/10/2015, I have the same error when build with maven () (as sudo mvn -DskipTests -X clean package abra.txt) Exception in thread main java.lang.UnsupportedClassVersionError: org/apache/maven/cli/MavenCli : Unsupported major.minor version 51.0 Please help how to build the thing. Thanks Alexey This message, including any attachments, is the property of Sears Holdings Corporation and/or one of its subsidiaries. It is confidential and may contain proprietary or legally privileged information. If you are not the intended recipient, please delete it without reading the contents. Thank you.
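For what it's worth, class file version 51.0 is the Java 7 format, so whatever JVM is actually executing Maven here is older than Java 7, even though the shell's java -version reports 1.7.0_80. A common cause (an assumption, not confirmed in this thread) is that sudo resolves a different java/JAVA_HOME than your login shell, so comparing the two is a quick check:

  mvn -version          # shows which JVM Maven itself runs on for your user
  sudo mvn -version     # sudo may pick up a different (older) JVM or JAVA_HOME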
Re: dse spark-submit multiple jars issue
HI, Please find the log details below: dse spark-submit --verbose --master local --class HelloWorld etl-0.0.1-SNAPSHOT.jar --jars file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar file:/home/missingmerch/dse.jar file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar Using properties file: /etc/dse/spark/spark-defaults.conf Adding default property: spark.cassandra.connection.factory=com.datastax.bdp.spark.DseCassandraConnectionFactory Adding default property: spark.ssl.keyStore=.keystore Adding default property: spark.ssl.enabled=false Adding default property: spark.ssl.trustStore=.truststore Adding default property: spark.cassandra.auth.conf.factory=com.datastax.bdp.spark.DseAuthConfFactory Adding default property: spark.ssl.keyPassword=cassandra Adding default property: spark.ssl.keyStorePassword=cassandra Adding default property: spark.ssl.protocol=TLS Adding default property: spark.ssl.useNodeLocalConf=true Adding default property: spark.ssl.trustStorePassword=cassandra Adding default property: spark.ssl.enabledAlgorithms=TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA Parsed arguments: master local deployMode null executorMemory null executorCores null totalExecutorCores null propertiesFile /etc/dse/spark/spark-defaults.conf driverMemory512M driverCores null driverExtraClassPathnull driverExtraLibraryPath null driverExtraJavaOptions -Dcassandra.username=missingmerch -Dcassandra.password=STMbrjrlb -XX:MaxPermSize=256M supervise false queue null numExecutorsnull files null pyFiles null archivesnull mainClass HelloWorld primaryResource file:/home/missingmerch/etl-0.0.1-SNAPSHOT.jar nameHelloWorld childArgs [--jars file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar file:/home/missingmerch/dse.jar file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar] jarsnull verbose true Spark properties used, including those specified through --conf and those from the properties file /etc/dse/spark/spark-defaults.conf: spark.cassandra.connection.factory - com.datastax.bdp.spark.DseCassandraConnectionFactory spark.ssl.useNodeLocalConf - true spark.ssl.enabled - false spark.executor.extraJavaOptions - -XX:MaxPermSize=256M spark.ssl.keyStore - .keystore spark.ssl.trustStore - .truststore spark.ssl.trustStorePassword - cassandra spark.ssl.enabledAlgorithms - TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA spark.cassandra.auth.conf.factory - com.datastax.bdp.spark.DseAuthConfFactory spark.ssl.protocol - TLS spark.ssl.keyPassword - cassandra spark.ssl.keyStorePassword - cassandra Main class: HelloWorld Arguments: --jars file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar file:/home/missingmerch/dse.jar file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar System properties: spark.cassandra.connection.factory - com.datastax.bdp.spark.DseCassandraConnectionFactory spark.driver.memory - 512M spark.ssl.useNodeLocalConf - true spark.ssl.enabled - false SPARK_SUBMIT - true spark.executor.extraJavaOptions - -XX:MaxPermSize=256M spark.app.name - HelloWorld spark.ssl.enabledAlgorithms - TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA spark.ssl.trustStorePassword - cassandra spark.driver.extraJavaOptions - -Dcassandra.username=missingmerch -Dcassandra.password=STMbrjrlb -XX:MaxPermSize=256M spark.ssl.keyStore - .keystore spark.ssl.trustStore - .truststore spark.jars - file:/home/missingmerch/etl-0.0.1-SNAPSHOT.jar spark.cassandra.auth.conf.factory - com.datastax.bdp.spark.DseAuthConfFactory spark.master - local spark.ssl.protocol - TLS spark.ssl.keyPassword - cassandra 
spark.ssl.keyStorePassword - cassandra Classpath elements: file:/home/missingmerch/etl-0.0.1-SNAPSHOT.jar WARN 2015-08-11 08:23:25 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. Exception in thread main java.lang.ClassNotFoundException: org.postgresql.Driver at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:190) at HelloWorld$.main(HelloWorld.scala:26) at HelloWorld.main(HelloWorld.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
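The verbose output above shows the problem: jars is null and the entire --jars list ended up in childArgs, i.e. it was passed to HelloWorld as program arguments. spark-submit expects --jars to be a single comma-separated list given before the application jar; anything after the application jar is treated as arguments to the main class. A reworked command along these lines (same paths as in the thread, line-broken for readability) should get the PostgreSQL driver onto the classpath:

  dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld \
    --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar \
    /home/missingmerch/etl-0.0.1-SNAPSHOT.jar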
RE: Is there any external dependencies for lag() and lead() when using data frames?
Jerry, I was able to use window functions without the hive thrift server. HiveContext does not imply that you need the hive thrift server running. Here’s what I used to test this out: var conf = new SparkConf(true).set(spark.cassandra.connection.host, 127.0.0.1) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) val df = sqlContext .read .format(org.apache.spark.sql.cassandra) .options(Map( table - kv, keyspace - test)) .load() val w = Window.orderBy(value).rowsBetween(-2, 0) I then submitted this using spark-submit. From: Jerry [mailto:jerry.c...@gmail.com] Sent: Monday, August 10, 2015 10:55 PM To: Michael Armbrust Cc: user Subject: Re: Is there any external dependencies for lag() and lead() when using data frames? By the way, if Hive is present in the Spark install, does show up in text when you start the spark shell? Any commands I can run to check if it exists? I didn't setup the spark machine that I use, so I don't know what's present or absent. Thanks, Jerry On Mon, Aug 10, 2015 at 2:38 PM, Jerry jerry.c...@gmail.commailto:jerry.c...@gmail.com wrote: Thanks... looks like I now hit that bug about HiveMetaStoreClient as I now get the message about being unable to instantiate it. On a side note, does anyone know where hive-site.xml is typically located? Thanks, Jerry On Mon, Aug 10, 2015 at 2:03 PM, Michael Armbrust mich...@databricks.commailto:mich...@databricks.com wrote: You will need to use a HiveContext for window functions to work. On Mon, Aug 10, 2015 at 1:26 PM, Jerry jerry.c...@gmail.commailto:jerry.c...@gmail.com wrote: Hello, Using Apache Spark 1.4.1 I'm unable to use lag or lead when making queries to a data frame and I'm trying to figure out if I just have a bad setup or if this is a bug. As for the exceptions I get: when using selectExpr() with a string as an argument, I get NoSuchElementException: key not found: lag and when using the select method and ...spark.sql.functions.lag I get an AnalysisException. If I replace lag with abs in the first case, Spark runs without exception, so none of the other syntax is incorrect. As for how I'm running it; the code is written in Java with a static method that takes the SparkContext as an argument which is used to create a JavaSparkContext which then is used to create an SQLContext which loads a json file from the local disk and runs those queries on that data frame object. FYI: the java code is compiled, jared and then pointed to with -cp when starting the spark shell, so all I do is Test.run(sc) in shell. Let me know what to look for to debug this problem. I'm not sure where to look to solve this problem. Thanks, Jerry
RE: Parquet without hadoop: Possible?
I confirm that it works, I was just having this issue: https://issues.apache.org/jira/browse/SPARK-8450 Saif From: Ellafi, Saif A. Sent: Tuesday, August 11, 2015 12:01 PM To: Ellafi, Saif A.; deanwamp...@gmail.com Cc: user@spark.apache.org Subject: RE: Parquet without hadoop: Possible? Sorry, I provided bad information. This example worked fine with reduced parallelism. It seems my problem have to do with something specific with the real data frame at reading point. Saif From: saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com] Sent: Tuesday, August 11, 2015 11:49 AM To: deanwamp...@gmail.commailto:deanwamp...@gmail.com Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: RE: Parquet without hadoop: Possible? I am launching my spark-shell spark-1.4.1-bin-hadoop2.6/bin/spark-shell 15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive support).. SQL context available as sqlContext. scala val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF scala data.write.parquet(/var/ data/Saif/pq) Then I get a million errors: 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. 
java.lang.OutOfMemoryError: Java heap space at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178) at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94) at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83) at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229) at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470) at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. ... ... . 15/08/11
PySpark order-only window function issue
Hello everyone, I am trying to use PySpark API with window functions without specifying partition clause. I mean something equivalent to this SELECT v, row_number() OVER (ORDER BY v) AS rn FROM df in SQL. I am not sure if I am doing something wrong or it is a bug but results are far from what I expect. Lets assume we have data as follows: from pyspark.sql.window import Window from pyspark.sql import functions as f df = sqlContext.createDataFrame( zip([foo] * 5 + [bar] * 5, range(1, 6) + range(6, 11)), (k, v) ).withColumn(dummy, f.lit(1)) df.registerTempTable(df) df.show() +---+--+-+ | k| v|dummy| +---+--+-+ |foo| 1|1| |foo| 2|1| |foo| 3|1| |foo| 4|1| |foo| 5|1| |bar| 6|1| |bar| 7|1| |bar| 8|1| |bar| 9|1| |bar|10|1| +---+--+-+ When I use following SQL query sql_ord = SELECT k, v, row_number() OVER ( ORDER BY v ) AS rn FROM df sqlContext.sql(sql_ord).show() I get expected results: +---+--+--+ | k| v|rn| +---+--+--+ |foo| 1| 1| |foo| 2| 2| |foo| 3| 3| |foo| 4| 4| |foo| 5| 5| |bar| 6| 6| |bar| 7| 7| |bar| 8| 8| |bar| 9| 9| |bar|10|10| +---+--+--+ but when I try to define a similar thing using Python API w_ord = Window.orderBy(v) df.select(k, v, f.rowNumber().over(w_ord).alias(avg)).show() I get results like this: +---+--+---+ | k| v|avg| +---+--+---+ |foo| 1| 1| |foo| 2| 1| |foo| 3| 1| |foo| 4| 1| |foo| 5| 1| |bar| 6| 1| |bar| 7| 1| |bar| 8| 1| |bar| 9| 1| |bar|10| 1| +---+--+---+ When I specify both partition on order w_part_ord = Window.partitionBy(dummy).orderBy(v) df.select(k, v, f.rowNumber().over(w_part_ord).alias(avg)).show() everything works as I expect: +---+--+---+ | k| v|avg| +---+--+---+ |foo| 1| 1| |foo| 2| 2| |foo| 3| 3| |foo| 4| 4| |foo| 5| 5| |bar| 6| 6| |bar| 7| 7| |bar| 8| 8| |bar| 9| 9| |bar|10| 10| +---+--+---+ Another example of similar behavior with correct SQL result: sql_ord_rng = SELECT k, v, avg(v) OVER ( ORDER BY v ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING ) AS avg FROM df sqlContext.sql(sql_ord_rng).show() +---+--+---+ | k| v|avg| +---+--+---+ |foo| 1|1.5| |foo| 2|2.0| |foo| 3|3.0| |foo| 4|4.0| |foo| 5|5.0| |bar| 6|6.0| |bar| 7|7.0| |bar| 8|8.0| |bar| 9|9.0| |bar|10|9.5| +---+--+---+ and not incorrect PySpark w_ord_rng = Window.orderBy(v).rowsBetween(-1, 1) df.select(k, v, f.avg(v).over(w_ord_rng).alias(avg)).show() +---+--++ | k| v| avg| +---+--++ |foo| 1| 1.0| |foo| 2| 2.0| |foo| 3| 3.0| |foo| 4| 4.0| |foo| 5| 5.0| |bar| 6| 6.0| |bar| 7| 7.0| |bar| 8| 8.0| |bar| 9| 9.0| |bar|10|10.0| +---+--++ Same as before adding dummy partitions solves the problem: w_part_ord_rng = Window.partitionBy(dummy).orderBy(v).rowsBetween(-1, 1) df.select(k, v, f.avg(v).over(w_part_ord_rng).alias(avg)).show() +---+--+---+ | k| v|avg| +---+--+---+ |foo| 1|1.5| |foo| 2|2.0| |foo| 3|3.0| |foo| 4|4.0| |foo| 5|5.0| |bar| 6|6.0| |bar| 7|7.0| |bar| 8|8.0| |bar| 9|9.0| |bar|10|9.5| +---+--+---+ I've checked window functions tests (https://github.com/apache/spark/blob/ac507a03c3371cd5404ca195ee0ba0306badfc23/python/pyspark/sql/tests.py#L1105) but these cover only partition + order case. Is there something wrong with my window definitions or should I open Jira issue? Environment: - Debian GNU/Linux - Spark 1.4.1 - Python 2.7.9 - OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-1~deb8u1) -- Best, Maciej signature.asc Description: OpenPGP digital signature
RE: Is there any external dependencies for lag() and lead() when using data frames?
I forgot to mention, my setup was: - Spark 1.4.1 running in standalone mode - Datastax spark cassandra connector 1.4.0-M1 - Cassandra DB - Scala version 2.10.4 From: Benjamin Ross Sent: Tuesday, August 11, 2015 10:16 AM To: Jerry; Michael Armbrust Cc: user Subject: RE: Is there any external dependencies for lag() and lead() when using data frames? Jerry, I was able to use window functions without the hive thrift server. HiveContext does not imply that you need the hive thrift server running. Here’s what I used to test this out: var conf = new SparkConf(true).set(spark.cassandra.connection.host, 127.0.0.1) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) val df = sqlContext .read .format(org.apache.spark.sql.cassandra) .options(Map( table - kv, keyspace - test)) .load() val w = Window.orderBy(value).rowsBetween(-2, 0) I then submitted this using spark-submit. From: Jerry [mailto:jerry.c...@gmail.com] Sent: Monday, August 10, 2015 10:55 PM To: Michael Armbrust Cc: user Subject: Re: Is there any external dependencies for lag() and lead() when using data frames? By the way, if Hive is present in the Spark install, does show up in text when you start the spark shell? Any commands I can run to check if it exists? I didn't setup the spark machine that I use, so I don't know what's present or absent. Thanks, Jerry On Mon, Aug 10, 2015 at 2:38 PM, Jerry jerry.c...@gmail.commailto:jerry.c...@gmail.com wrote: Thanks... looks like I now hit that bug about HiveMetaStoreClient as I now get the message about being unable to instantiate it. On a side note, does anyone know where hive-site.xml is typically located? Thanks, Jerry On Mon, Aug 10, 2015 at 2:03 PM, Michael Armbrust mich...@databricks.commailto:mich...@databricks.com wrote: You will need to use a HiveContext for window functions to work. On Mon, Aug 10, 2015 at 1:26 PM, Jerry jerry.c...@gmail.commailto:jerry.c...@gmail.com wrote: Hello, Using Apache Spark 1.4.1 I'm unable to use lag or lead when making queries to a data frame and I'm trying to figure out if I just have a bad setup or if this is a bug. As for the exceptions I get: when using selectExpr() with a string as an argument, I get NoSuchElementException: key not found: lag and when using the select method and ...spark.sql.functions.lag I get an AnalysisException. If I replace lag with abs in the first case, Spark runs without exception, so none of the other syntax is incorrect. As for how I'm running it; the code is written in Java with a static method that takes the SparkContext as an argument which is used to create a JavaSparkContext which then is used to create an SQLContext which loads a json file from the local disk and runs those queries on that data frame object. FYI: the java code is compiled, jared and then pointed to with -cp when starting the spark shell, so all I do is Test.run(sc) in shell. Let me know what to look for to debug this problem. I'm not sure where to look to solve this problem. Thanks, Jerry
Re: Questions about SparkSQL join on not equality conditions
Hi, After taking a look at the code, I found out the problem: Spark uses broadcastNestedLoopJoin to handle the non-equality condition, and one of my dataframes (df1) is created from an existing RDD (LogicalRDD), so Spark uses defaultSizeInBytes * length to estimate its size. The other dataframe (df2) is created from a hive table (about 1G). Therefore Spark thinks df1 is larger than df2, although df1 is very small. As a result, Spark tries to do df2.collect(), which causes the error. Hope this could be helpful. Cheers Gen On Mon, Aug 10, 2015 at 11:29 PM, gen tang gen.tan...@gmail.com wrote: Hi, I am sorry to bother again. When I do a join as follows: df = sqlContext.sql(select a.someItem, b.someItem from a full outer join b on condition1 or condition2) df.first() The program failed because the result size is bigger than spark.driver.maxResultSize. It is really strange, as one record is in no way bigger than 1G. When I do a join on just one condition or an equality condition, there is no problem. Could anyone help me, please? Thanks a lot in advance. Cheers Gen On Sun, Aug 9, 2015 at 9:08 PM, gen tang gen.tan...@gmail.com wrote: Hi, I might have a stupid question about SparkSQL's implementation of joins on non-equality conditions, for instance condition1 or condition2. In fact, Hive doesn't support such a join, as it is very difficult to express such conditions as a map/reduce job. However, SparkSQL supports such an operation, so I would like to know how Spark implements it. As I observe that such a join runs very slowly, I guess that Spark implements it by doing a filter on top of a cartesian product. Is that true? Thanks in advance for your help. Cheers Gen
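If anyone hits the same thing, one workaround that might be worth trying (a sketch only, not verified against this exact case): cache the small DataFrame as a table and materialize it first, so Spark computes real in-memory size statistics for it instead of the defaultSizeInBytes-based estimate, and the nested-loop join should then broadcast the genuinely smaller side:

  // df1 is the small DataFrame built from an existing RDD (per the thread); the table name is illustrative
  df1.registerTempTable("small_side")
  sqlContext.cacheTable("small_side")
  sqlContext.table("small_side").count()   // force materialization so statistics are available
  // then run the full outer join against sqlContext.table("small_side") instead of df1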
Spark runs into an Infinite loop even if the tasks are completed successfully
Hi, My Spark job (running in local[*] with Spark 1.4.1) reads data from a Thrift server (I created a custom RDD that computes its partitions in the getPartitions() call, and whose compute() returns records from those partitions). count() and foreach() work fine and return the correct number of records. But whenever there is a shuffle map stage (like reduceByKey etc.), all the tasks execute properly but the job enters an infinite loop saying: 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1 (map at FilterMain.scala:59) because some of its tasks had failed: 0, 3 Here's the complete stack-trace: http://pastebin.com/hyK7cG8S What could be the root cause of this problem? I looked it up and bumped into this closed JIRA https://issues.apache.org/jira/browse/SPARK-583 (which is very, very old). Thanks Best Regards
Re: Parquet without hadoop: Possible?
It should work fine. I have an example script here: https://github.com/deanwampler/spark-workshop/blob/master/src/main/scala/sparkworkshop/SparkSQLParquet10-script.scala (Spark 1.4.X) What does I am failing to do so mean? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Aug 11, 2015 at 9:28 AM, saif.a.ell...@wellsfargo.com wrote: Hi all, I don’t have any hadoop fs installed on my environment, but I would like to store dataframes in parquet files. I am failing to do so, if possible, anyone have any pointers? Thank you, Saif
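For anyone else trying this, a minimal spark-shell example that needs no Hadoop installation at all (the path is illustrative):

  // write and read Parquet purely on the local filesystem
  val df = sc.parallelize(1 to 100).toDF("n")
  df.write.parquet("file:///tmp/parquet-demo")              // explicit file:// scheme
  val back = sqlContext.read.parquet("file:///tmp/parquet-demo")
  back.show()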
Re: Controlling number of executors on Mesos vs YARN
Spark evolved as an example framework for Mesos - thats how I know it. It is surprising to see that the options provided by mesos in this case are less. Tweaking the source code, haven't done it yet but I would love to see what options could be there! On Tue, Aug 11, 2015 at 5:42 AM, Jerry Lam chiling...@gmail.com wrote: My experience with Mesos + Spark is not great. I saw one executor with 30 CPU and the other executor with 6. So I don't think you can easily configure it without some tweaking at the source code. Sent from my iPad On 2015-08-11, at 2:38, Haripriya Ayyalasomayajula aharipriy...@gmail.com wrote: Hi Tim, Spark on Yarn allows us to do it using --num-executors and --executor_cores commandline arguments. I just got a chance to look at a similar spark user list mail, but no answer yet. So does mesos allow setting the number of executors and cores? Is there a default number it assumes? On Mon, Jan 5, 2015 at 5:07 PM, Tim Chen t...@mesosphere.io wrote: Forgot to hit reply-all. -- Forwarded message -- From: Tim Chen t...@mesosphere.io Date: Sun, Jan 4, 2015 at 10:46 PM Subject: Re: Controlling number of executors on Mesos vs YARN To: mvle m...@us.ibm.com Hi Mike, You're correct there is no such setting in for Mesos coarse grain mode, since the assumption is that each node is launched with one container and Spark is launching multiple tasks in that container. In fine-grain mode there isn't a setting like that, as it currently will launch an executor as long as it satisfies the minimum container resource requirement. I've created a JIRA earlier about capping the number of executors or better distribute the # of executors launched in each node. Since the decision of choosing what node to launch containers is all in the Spark scheduler side, it's very easy to modify it. Btw, what's the configuration to set the # of executors on YARN side? Thanks, Tim On Sun, Jan 4, 2015 at 9:37 PM, mvle m...@us.ibm.com wrote: I'm trying to compare the performance of Spark running on Mesos vs YARN. However, I am having problems being able to configure the Spark workload to run in a similar way on Mesos and YARN. When running Spark on YARN, you can specify the number of executors per node. So if I have a node with 4 CPUs, I can specify 6 executors on that node. When running Spark on Mesos, there doesn't seem to be an equivalent way to specify this. In Mesos, you can somewhat force this by specifying the number of CPU resources to be 6 when running the slave daemon. However, this seems to be a static configuration of the Mesos cluster rather something that can be configured in the Spark framework. So here is my question: For Spark on Mesos, am I correct that there is no way to control the number of executors per node (assuming an idle cluster)? For Spark on Mesos coarse-grained mode, there is a way to specify max_cores but that is still not equivalent to specifying the number of executors per node as when Spark is run on YARN. If I am correct, then it seems Spark might be at a disadvantage running on Mesos compared to YARN (since it lacks the fine tuning ability provided by YARN). Thanks, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Controlling-number-of-executors-on-Mesos-vs-YARN-tp20966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Haripriya Ayyalasomayajula -- Regards, Haripriya Ayyalasomayajula
RE: Parquet without hadoop: Possible?
Sorry, I provided bad information. This example worked fine with reduced parallelism. It seems my problem have to do with something specific with the real data frame at reading point. Saif From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com] Sent: Tuesday, August 11, 2015 11:49 AM To: deanwamp...@gmail.com Cc: user@spark.apache.org Subject: RE: Parquet without hadoop: Possible? I am launching my spark-shell spark-1.4.1-bin-hadoop2.6/bin/spark-shell 15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive support).. SQL context available as sqlContext. scala val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF scala data.write.parquet(/var/ data/Saif/pq) Then I get a million errors: 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. 
java.lang.OutOfMemoryError: Java heap space at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178) at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94) at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83) at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229) at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470) at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. ... ... . 15/08/11 09:46:10 ERROR DefaultWriterContainer: Task attempt attempt_201508110946__m_11_0 aborted. 15/08/11 09:46:10 ERROR Executor: Exception in task 31.0 in stage 0.0 (TID 31) org.apache.spark.SparkException: Task failed while writing rows. at
Re: avoid duplicate due to executor failure in spark stream
What if processing is neither idempotent nor in a transaction, say I am posting events to some external server after processing. Is it possible to get the accumulator of the failed task in the retry task? Is there any way to detect whether a task is a retried task or the original task? I was trying to achieve something like incrementing a counter after each event processed, so that if a task fails, the retry task can just ignore already-processed events by accessing the counter of the failed task. Is it possible to access an accumulator on a per-task basis without writing to hdfs or hbase? On Tue, Aug 11, 2015 at 3:15 AM, Cody Koeninger c...@koeninger.org wrote: http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations https://www.youtube.com/watch?v=fXnNEq1v3VA On Mon, Aug 10, 2015 at 4:32 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi How can I avoid duplicate processing of kafka messages in spark stream 1.3 because of executor failure. 1. Can I somehow access accumulators of the failed task in the retry task, to skip those events which were already processed by the failed task on this partition? 2. Or will I have to persist each msg processed and then check, before processing each msg, whether it was already processed by the failed task, and delete this persisted information at each batch end?
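On detecting retries: a sketch (assuming Spark 1.3+, where TaskContext.get() and attemptNumber() are available, and where rdd stands for the batch RDD inside foreachRDD). The helpers alreadyPosted/markPosted/postToExternalServer are hypothetical and would need to be backed by an external store, since accumulator updates from a failed task attempt are discarded rather than made visible to the retry:

  import org.apache.spark.TaskContext

  rdd.foreachPartition { events =>
    val ctx = TaskContext.get()
    val isRetry = ctx.attemptNumber() > 0      // 0 on the first attempt, > 0 on a re-execution
    events.foreach { e =>
      // on a retry, skip events the failed attempt already posted, tracked in an external store
      if (!isRetry || !alreadyPosted(e)) {
        postToExternalServer(e)
        markPosted(e)
      }
    }
  }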
unsubscribe
Michel Robert Almaden Research Center EDA - IBM Systems and Technology Group Phone: (408) 927-2117 T/L 8-457-2117 E-mail: m...@us.ibm.com
Re: Parquet without hadoop: Possible?
Just out of curiosity, what is the advantage of using parquet without hadoop? Sent from my iPhone On 11 Aug, 2015, at 11:12 am, saif.a.ell...@wellsfargo.com wrote: I confirm that it works, I was just having this issue: https://issues.apache.org/jira/browse/SPARK-8450 Saif From: Ellafi, Saif A. Sent: Tuesday, August 11, 2015 12:01 PM To: Ellafi, Saif A.; deanwamp...@gmail.com Cc: user@spark.apache.org Subject: RE: Parquet without hadoop: Possible? Sorry, I provided bad information. This example worked fine with reduced parallelism. It seems my problem have to do with something specific with the real data frame at reading point. Saif From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com] Sent: Tuesday, August 11, 2015 11:49 AM To: deanwamp...@gmail.com Cc: user@spark.apache.org Subject: RE: Parquet without hadoop: Possible? I am launching my spark-shell spark-1.4.1-bin-hadoop2.6/bin/spark-shell 15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive support).. SQL context available as sqlContext. scala val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF scala data.write.parquet(/var/ data/Saif/pq) Then I get a million errors: 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz] 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. java.lang.OutOfMemoryError: Java heap space 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task. 
java.lang.OutOfMemoryError: Java heap space at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178) at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94) at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83) at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229) at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470) at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
Re: Spark DataFrames uses too many partition
You need to configure the spark.sql.shuffle.partitions parameter to a different value. It defaults to 200. On 8/11/15, 11:31 AM, Al M alasdair.mcbr...@gmail.com wrote: I am using DataFrames with Spark 1.4.1. I really like DataFrames but the partitioning makes no sense to me. I am loading lots of very small files and joining them together. Every file is loaded by Spark with just one partition. Each time I join two small files the partition count increases to 200. This makes my application take 10x as long as if I coalesce everything to 1 partition after each join. With normal RDDs it would not expand out the partitions to 200 after joining two files with one partition each. It would either keep it at one or expand it to two. Why do DataFrames expand out the partitions so much? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
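For example (the value is arbitrary and should be sized to the data volume):

  // lower the number of post-shuffle partitions used by DataFrame joins and aggregations
  sqlContext.setConf("spark.sql.shuffle.partitions", "8")

  // the same can be passed at submit time: --conf spark.sql.shuffle.partitions=8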
Re: unsubscribe
See first section of http://spark.apache.org/community.html On Tue, Aug 11, 2015 at 9:47 AM, Michel Robert m...@us.ibm.com wrote: Michel Robert Almaden Research Center EDA - IBM Systems and Technology Group Phone: (408) 927-2117 T/L 8-457-2117 E-mail: m...@us.ibm.com
Application failed error
I am running Spark 1.3 on CDH 5.4 stack. I am getting the following error when I spark-submit my application:- 15/08/11 16:03:49 INFO Remoting: Starting remoting 15/08/11 16:03:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@cdh54-22a4101a-14d7-4f06-b3f8-079c6f757384.cis.cloud :39355] 15/08/11 16:03:49 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkdri...@cdh54-22a4101a-14d7-4f06-b3f8-079c6f757384.cis.cloud :39355] 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 2 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 3 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 5 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 4 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 6 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 7 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 11 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 10 15/08/11 16:03:50 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED 15/08/11 16:03:50 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Master removed our application: FAILED
Re: Spark job workflow engine recommendations
We use Talend, but not for Spark workflows. Although it does have Spark componenets. https://www.talend.com/download/talend-open-studio It is free (commercial support available), easy to design and deploy workflows. Talend for BigData 6.0 was released as month ago. Is anybody using Talend for Spark? -- Ruslan Dautkhanov On Tue, Aug 11, 2015 at 11:30 AM, Hien Luu h...@linkedin.com.invalid wrote: We are in the middle of figuring that out. At the high level, we want to combine the best parts of existing workflow solutions. On Fri, Aug 7, 2015 at 3:55 PM, Vikram Kone vikramk...@gmail.com wrote: Hien, Is Azkaban being phased out at linkedin as rumored? If so, what's linkedin going to use for workflow scheduling? Is there something else that's going to replace Azkaban? On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu yuzhih...@gmail.com wrote: In my opinion, choosing some particular project among its peers should leave enough room for future growth (which may come faster than you initially think). Cheers On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu h...@linkedin.com wrote: Scalability is a known issue due the the current architecture. However this will be applicable if you run more 20K jobs per day. On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu yuzhih...@gmail.com wrote: From what I heard (an ex-coworker who is Oozie committer), Azkaban is being phased out at LinkedIn because of scalability issues (though UI-wise, Azkaban seems better). Vikram: I suggest you do more research in related projects (maybe using their mailing lists). Disclaimer: I don't work for LinkedIn. On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Hi Vikram, We use Azkaban (2.5.0) in our production workflow scheduling. We just use local mode deployment and it is fairly easy to set up. It is pretty easy to use and has a nice scheduling and logging interface, as well as SLAs (like kill job and notify if it doesn't complete in 3 hours or whatever). However Spark support is not present directly - we run everything with shell scripts and spark-submit. There is a plugin interface where one could create a Spark plugin, but I found it very cumbersome when I did investigate and didn't have the time to work through it to develop that. It has some quirks and while there is actually a REST API for adding jos and dynamically scheduling jobs, it is not documented anywhere so you kinda have to figure it out for yourself. But in terms of ease of use I found it way better than Oozie. I haven't tried Chronos, and it seemed quite involved to set up. Haven't tried Luigi either. Spark job server is good but as you say lacks some stuff like scheduling and DAG type workflows (independent of spark-defined job flows). On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke jornfra...@gmail.com wrote: Check also falcon in combination with oozie Le ven. 7 août 2015 à 17:51, Hien Luu h...@linkedin.com.invalid a écrit : Looks like Oozie can satisfy most of your requirements. On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone vikramk...@gmail.com wrote: Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. 
Active open source community support and well tested at production scale. 3. Should be dead easy to write job dependencies using XML or a web interface. Ex: job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full-blown Java applications to specify job parameters and dependencies. Should be very simple to use. 4. Time based recurrent scheduling. Run the Spark jobs at a given time every hour or day or week or month. 5. Job monitoring, alerting on failures and email notifications on a daily basis. I have looked at Ooyala's Spark job server, which seems to be geared towards making Spark jobs run faster by sharing contexts between the jobs but isn't a full-blown workflow engine per se. A combination of Spark job server and a workflow engine would be ideal. Thanks for the inputs
Re: Accessing S3 files with s3n://
On 10 Aug 2015, at 20:17, Akshat Aranya aara...@gmail.com wrote: Hi Jerry, Akhil, Thanks for your help. With s3n, the entire file is downloaded even while just creating the RDD with sqlContext.read.parquet(). It seems like even just opening and closing the InputStream causes the entire data to get fetched. As it turned out, I was able to use s3a and avoid this problem. I was under the impression that s3a was only meant for using EMRFS, where the metadata of the FS is kept separately. This is not true; s3a maps object keys directly to file names and directories. There's a bug with close() under the httpclient code which was fixed in s3a; sounds like the same issue has arisen in s3n. S3a has had some bugs which surfaced after Hadoop 2.6 shipped; it's ready for use in Hadoop 2.7.1. On Sun, Aug 9, 2015 at 6:01 AM, Jerry Lam chiling...@gmail.com wrote: Hi Akshat, Is there a particular reason you don't use s3a? From my experience, s3a performs much better than the rest. I believe the inefficiency is from the implementation of the s3 interface. It's from some client-side optimisation that, for socket reuse, reads through the entire incoming HTTP stream on close(). Best Regards, Jerry Sent from my iPhone On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote: Depends on which operation you are doing. If you are doing a .count() on a Parquet file, it might not download the entire file I think, but if you do a .count() on a normal text file it might pull the entire file. Thanks Best Regards On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I've been trying to track down some problems with Spark reads being very slow with s3n:// URIs (NativeS3FileSystem). After some digging around, I realized that this file system implementation fetches the entire file, which isn't really a Spark problem, but it really slows things down when trying to just read headers from a Parquet file or just create partitions in the RDD. Is this something that others have observed before, or am I doing something wrong? Thanks, Akshat
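For anyone hitting the same slowdown, a rough sketch of switching a read over to s3a (the bucket, path and credential values are placeholders, and the property names assume Hadoop 2.6+ with the hadoop-aws jar on the classpath):

// Point the Hadoop FileSystem layer at your S3 credentials (placeholders).
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

// Reading Parquet footers through s3a avoids the full-object download that the
// s3n (NativeS3FileSystem) close() behaviour triggers.
val df = sqlContext.read.parquet("s3a://my-bucket/path/to/parquet")
df.printSchema()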
Does print/event logging affect performance?
Hi all, silly question. Does logging info messages, both print or to file, or event logging, cause any impact to general performance of spark? Saif
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
Do you think it might be faster to put all the files in one directory but still partitioned the same way? I don't actually need to filter on the values of the partition keys, but I need to rely on there be no overlap in the value of the keys between any two parquet files. On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver philip.wea...@gmail.com wrote: Thanks, I also confirmed that the partition discovery is slow by writing a non-Spark application that uses the parquet library directly to load that partitions. It's so slow that my colleague's Python application can read the entire contents of all the parquet data files faster than my application can even discover the partitions! On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian lian.cs@gmail.com wrote: However, it's weird that the partition discovery job only spawns 2 tasks. It should use the default parallelism, which is probably 8 according to the logs of the next Parquet reading job. Partition discovery is already done in a distributed manner via a Spark job. But the parallelism is mysteriously low... Cheng On 8/7/15 3:32 PM, Cheng Lian wrote: Hi Philip, Thanks for providing the log file. It seems that most of the time are spent on partition discovery. The code snippet you provided actually issues two jobs. The first one is for listing the input directories to find out all leaf directories (and this actually requires listing all leaf files, because we can only assert that a directory is a leaf one when it contains no sub-directories). Then partition information is extracted from leaf directory paths. This process starts at: 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in parallel under: file:/home/pweaver/work/parquet/day=20150225, … and ends at: 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool The actual tasks execution time is about 36s: 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes) … 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2) You mentioned that your dataset has about 40,000+ partitions, so there are a lot of leaf directories and files out there. My guess is that the local file system spent lots of time listing FileStatus-es of all these files. I also noticed that Mesos job scheduling takes more time then expected. It is probably because this is the first Spark job executed in the application, and the system is not warmed up yet. 
For example, there’s a 6s gap between these two adjacent lines: 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING The 2nd Spark job is the real Parquet reading job, and this one actually finishes pretty quickly, only 3s (note that the Mesos job scheduling latency is also included): 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 8 output partitions … 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes) 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes) 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes) … 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8) 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8) 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8) That might be the reason why you observed that the C parquet library you mentioned (is it parquet-cpp?) is an order of magnitude faster? Cheng On 8/7/15 2:02 AM, Philip Weaver wrote: With DEBUG, the log output was over 10MB, so I opted for just INFO output. The (sanitized) log is attached. The driver is essentially this code: info(A) val t = System.currentTimeMillis val df = sqlContext.read.parquet(dir).select(...).cache val elapsed = System.currentTimeMillis - t info(sInit time: ${elapsed} ms) We've also observed that it is very slow to read the contents of the parquet files. My colleague wrote a PySpark application that gets the list of files, parallelizes it, maps across it and reads each file manually using a C parquet library, and aggregates manually in the loop. Ignoring the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame query in Scala, his is an order of magnitude faster. Since he is parallelizing the work through Spark, and that isn't causing any performance issues, it seems to be a problem with the parquet reader. I may try to do what he did to construct a DataFrame manually,
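A small workaround sketch for the slow-discovery case discussed above: if only a handful of partitions are needed, the reader can be pointed at the partition directories directly, so no discovery over all 40,000+ leaf directories takes place. The day=20150226 path is hypothetical; note that the day column is not reconstructed from the path when discovery is skipped.

// Read one partition directory directly; Spark only lists files under it.
val oneDay = sqlContext.read.parquet(
  "file:/home/pweaver/work/parquet/day=20150225")

// A few specific partitions can be combined the same way.
val twoDays = oneDay.unionAll(
  sqlContext.read.parquet("file:/home/pweaver/work/parquet/day=20150226"))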
Re: Unsupported major.minor version 51.0
Can you please mention the output for the following : java -version javac -version
Spark Job Hangs on our production cluster
Currently we have an IBM BigInsight cluster with 1 namenode + 1 JobTracker + 42 data/task nodes, which runs BigInsight V3.0.0.2, corresponding to Hadoop 2.2.0 with MR1. Since IBM BigInsight doesn't come with Spark, we built Spark 1.2.2 with Hadoop 2.2.0 + Hive 0.12 ourselves and deployed it on the same cluster. IBM BigInsight comes with IBM JDK 1.7, but in our experience on the staging environment Spark works better with the Oracle JVM, so we run Spark under Oracle JDK 1.7.0_79. Now on production we are facing an issue we never faced before, and cannot reproduce on our staging cluster. We are using a Spark Standalone cluster, and here is the basic configuration:

more spark-env.sh
export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
export SPARK_CLASSPATH=/opt/ibm/biginsights/IHC/lib/ibm-compression.jar:/opt/ibm/biginsights/hive/lib/db2jcc4-10.6.jar
export SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
export SPARK_MASTER_WEBUI_PORT=8081
export SPARK_MASTER_IP=host1
export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=42
export SPARK_WORKER_MEMORY=24g
export SPARK_WORKER_CORES=6
export SPARK_WORKER_DIR=/tmp/spark/work
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g

more spark-defaults.conf
spark.master spark://host1:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://host1:9000/spark/eventLog
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

We use the AVRO file format a lot, and we have these 2 datasets, one about 96G and the other a little over 1T. Since we are using AVRO, we also built spark-avro at commit a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8, which is the latest version supporting Spark 1.2.x. Here is the problem we are facing on our production cluster: even the following simple spark-shell commands hang:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import com.databricks.spark.avro._
val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
bigData.registerTempTable("bigData")
bigData.count()

From the console, we see the following: [Stage 0: (44 + 42) / 7800] with no update for more than 30 minutes and longer. The big dataset of 1T should generate 7800 HDFS blocks, but Spark's HDFS client looks like it has a problem reading them. Since we are running Spark on the data nodes, all the Spark tasks run at NODE_LOCAL locality level.
If I go to the data/task node where the Spark tasks hang and use jstack to dump the threads, I get the following at the top:

2015-08-11 15:38:38
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.79-b02 mixed mode):

"Attach Listener" daemon prio=10 tid=0x7f0660589000 nid=0x1584d waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE

"org.apache.hadoop.hdfs.PeerCache@4a88ec00" daemon prio=10 tid=0x7f06508b7800 nid=0x13302 waiting on condition [0x7f060be94000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:252)
    at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39)
    at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135)
    at java.lang.Thread.run(Thread.java:745)

"shuffle-client-1" daemon prio=10 tid=0x7f0650687000 nid=0x132fc runnable [0x7f060d198000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
    - locked 0x00067bf47710 (a io.netty.channel.nio.SelectedSelectionKeySet)
    - locked 0x00067bf374e8 (a java.util.Collections$UnmodifiableSet)
    - locked 0x00067bf373d0 (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
    at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

Meantime, I can confirm our Hadoop/HDFS cluster works fine: the MapReduce jobs run without any problem, and the Hadoop fs command works fine in BigInsight. I attached the jstack output to this email, but I don't know what the root cause could be. The same spark-shell command works fine if I point it to the small dataset instead of the big dataset. The small dataset
Re: Does print/event logging affect performance?
What level of logging are you looking at ? At INFO level, there shouldn't be noticeable difference. On Tue, Aug 11, 2015 at 12:24 PM, saif.a.ell...@wellsfargo.com wrote: Hi all, silly question. Does logging info messages, both print or to file, or event logging, cause any impact to general performance of spark? Saif
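If the concern is log volume rather than the logging calls themselves, a small sketch of raising the threshold at runtime (assuming Spark 1.4+, where SparkContext.setLogLevel is available; otherwise the same effect comes from editing conf/log4j.properties):

// Drop Spark's INFO chatter; only WARN and above are written out.
sc.setLogLevel("WARN")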
Re: ClassNotFound spark streaming
I see the following line in the log 15/08/11 17:59:12 ERROR spark.SparkContext: Jar not found at file:/home/ec2-user/./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar, however I do see that this file exists on all the node in that path. Not sure what's happening here. Please note I am using it in yarn On Tue, Aug 11, 2015 at 1:52 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing following error. I think it's not able to find some other associated classes as I see $2 in the exception, but not sure what I am missing. 15/08/11 16:00:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 50, ip-10-241-251-141.us-west-2.compute.internal): java.lang.ClassNotFoundException: org.sony.spark.stream.test.JavaRecoverableNetworkWordCount$2 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
grouping by a partitioned key
If I have an RDD that happens to already be partitioned by a key, how efficient can I expect a groupBy operation to be? I would expect that Spark shouldn't have to move data around between nodes, and simply will have a small amount of work just checking the partitions to discover that it doesn't need to move anything around. Now, what if we're talking about a parquet database created by using DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group by a key that I'm already partitioned by? - Philip
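For reference, a minimal sketch of the second scenario being asked about (the DataFrame df, the output path and the column name "key" are placeholders; Spark 1.4 DataFrame API):

// Write the data partitioned on disk by the key column.
df.write.partitionBy("key").parquet("/path/to/out")

// Read it back and group by the same key; whether the optimizer exploits the
// on-disk partitioning is exactly the question raised above.
val grouped = sqlContext.read.parquet("/path/to/out")
  .groupBy("key")
  .count()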
adding a custom Scala RDD for use in PySpark
Hi, I'm new to Scala, Spark and PySpark and have a question about what approach to take in the problem I'm trying to solve. I have noticed that working with HBase tables read in using `newAPIHadoopRDD` can be quite slow with large data sets when one is interested in only a small subset of the keyspace. A prefix scan on the underlying HBase table in this case takes 11 seconds, while a filter applied to the full RDD returned by `newAPIHadoopRDD` takes 33 minutes. I looked around and found no way to specify a prefix scan from the Python interface. So I have written a Scala class that can be passed an argument, which then constructs a scan object, calls `newAPIHadoopRDD` from Scala with the scan object and feeds the resulting RDD back to Python. It took a few twists and turns to get this to work. A final challenge was the fact that `org.apache.spark.api.python.SerDeUtil` is private. This suggests to me that I'm doing something wrong, although I got it to work with sufficient hackery. What do people recommend for a general approach in getting PySpark RDDs from HBase prefix scans? I hope I have not missed something obvious. Eric
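For context, a rough sketch of the kind of Scala-side prefix scan being described (the table name, prefix and stop-row trick are made up for illustration; it assumes the usual HBase TableInputFormat route, and error handling is omitted):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes

val prefix = "user123|"                      // hypothetical key prefix

// Build a scan restricted to the prefix instead of the whole table.
val scan = new Scan()
scan.setStartRow(Bytes.toBytes(prefix))
scan.setStopRow(Bytes.toBytes(prefix + "~")) // crude upper bound, for illustration only

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "my_table")
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

// Only the rows matching the prefix are handed to Spark.
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])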
Re: grouping by a partitioned key
Philip, If all data per key are inside just one partition, then Spark will figure that out. Can you guarantee that’s the case? What is it you try to achieve? There might be another way for it, when you might be 100% sure what’s happening. You can print debugString or explain (for DataFrame) to see what’s happening under the hood. On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com wrote: If I have an RDD that happens to already be partitioned by a key, how efficient can I expect a groupBy operation to be? I would expect that Spark shouldn't have to move data around between nodes, and simply will have a small amount of work just checking the partitions to discover that it doesn't need to move anything around. Now, what if we're talking about a parquet database created by using DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group by a key that I'm already partitioned by? - Philip Eugene Morozov fathers...@list.ru
Re: What is the optimal approach to do Secondary Sort in Spark?
You should create the key as a tuple type. In your case, RDD[((id, timeStamp), value)] is the proper way to do it. Kevin --- Original Message --- Sender : swethaswethakasire...@gmail.com Date : 2015-08-12 09:37 (GMT+09:00) Title : What is the optimal approach to do Secondary Sort in Spark? Hi, What is the optimal approach to do Secondary sort in Spark? I have to first Sort by an Id in the key and further sort it by timeStamp which is present in the value. Thanks, Swetha
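A short sketch of that composite-key idea, sorting each id's records by timestamp without a global sort (the input RDD, field types and partition count are invented for illustration; repartitionAndSortWithinPartitions is available on pair RDDs from Spark 1.2 on):

import org.apache.spark.Partitioner

// Partition only on the id part of the composite key, so all records for an id
// land in the same partition.
class IdPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (id: String, _) => math.abs(id.hashCode) % partitions
  }
}

// events: RDD[(String, Long, String)] = (id, timeStamp, value) -- placeholder input.
val keyed = events.map { case (id, ts, value) => ((id, ts), value) }

// Records come out sorted by (id, timeStamp) within each partition.
val secondarySorted = keyed.repartitionAndSortWithinPartitions(new IdPartitioner(100))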
Re: Exception in spark
Can you share a query or stack trace? More information would make this question easier to answer. On Tue, Aug 11, 2015 at 8:50 PM, Ravisankar Mani rrav...@gmail.com wrote: Hi all, We got an exception like “org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object” when using some where condition queries. I am using 1.4.0 version spark. Is this exception resolved in latest spark? Regards, Ravi
Re: Spark job workflow engine recommendations
I also tend to agree that Azkaban is somewhat easier to get set up. Though I haven't used the new UI for Oozie that is part of CDH, so perhaps that is another good option. It's a pity Azkaban is a little rough in terms of documenting its API, and the scalability is an issue. However it would be possible to have a few different instances running for different use cases / groups within the org perhaps — Sent from Mailbox On Wed, Aug 12, 2015 at 12:14 AM, Vikram Kone vikramk...@gmail.com wrote: Hi Lars, Thanks for the brain dump. All the points you made about target audience, degree of high availability and time based scheduling instead of event based scheduling are all valid and make sense. In our case, most of our devs are .NET based and so XML or web based scheduling is preferred over something written in Java/Scala/Python. Based on my research so far on the available workflow managers today, Azkaban is the easiest to adopt since it doesn't have any hard dependence on Hadoop and is easy to onboard and schedule jobs. I was able to install and execute some Spark workflows in a day. Though the fact that it's being phased out at LinkedIn is troubling, I think it's the best suited for our use case today. Sent from Outlook On Sun, Aug 9, 2015 at 4:51 PM -0700, Lars Albertsson lars.alberts...@gmail.com wrote: I used to maintain Luigi at Spotify, and got some insight in workflow manager characteristics and production behaviour in the process. I am evaluating options for my current employer, and the short list is basically: Luigi, Azkaban, Pinball, Airflow, and rolling our own. The latter is not necessarily more work than adapting an existing tool, since existing managers are typically more or less tied to the technology used by the company that created them. Are your users primarily developers building pipelines that drive data-intensive products, or are they analysts, producing business intelligence? These groups tend to have preferences for different types of tools and interfaces. I have a love/hate relationship with Luigi, but given your requirements, it is probably the best fit: * It has support for Spark, and it seems to be used and maintained. * It has no builtin support for Cassandra, but Cassandra is heavily used at Spotify. IIRC, the code required to support Cassandra targets is more or less trivial. There is no obvious single definition of a dataset in C*, so you'll have to come up with a convention and encode it as a Target subclass. I guess that is why it never made it outside Spotify. * The open source community is active and it is well tested in production at multiple sites. * It is easy to write dependencies, but in a Python DSL. If your users are developers, this is preferable over XML or a web interface. There are always quirks and odd constraints somewhere that require the expressive power of a programming language. It also allows you to create extensions without changing Luigi itself. * It does not have recurring scheduling built in. Luigi needs a motor to get going, typically cron, installed on a few machines for redundancy. In a typical pipeline scenario, you give output datasets a time parameter, which arranges for a dataset to be produced each hour/day/week/month. * It supports failure notifications. Pinball and Airflow have similar architecture to Luigi, with a single central scheduler and workers that submit and execute jobs.
They seem to be more solidly engineered at a glance, but less battle tested outside Pinterest/Airbnb, and they have fewer integrations to the data ecosystem. Azkaban has a different architecture and user interface, and seems more geared towards data scientists than developers; it has a good UI for controlling jobs, but writing extensions and controlling it programmatically seems more difficult than for Luigi. All of the tools above are centralised, and the central component can become a bottleneck and a single point of problem. I am not aware of any decentralised open source workflow managers, but you can run multiple instances and shard manually. Regarding recurring jobs, it is typically undesirable to blindly run jobs at a certain time. If you run jobs, e.g. with cron, and process whatever data is available in your input sources, your jobs become indeterministic and unreliable. If incoming data is late or missing, your jobs will fail or create artificial skews in output data, leading to confusing results. Moreover, if jobs fail or have bugs, it will be difficult to rerun them and get predictable results. This is why I don't think Chronos is a meaningful alternative for scheduling data processing. There are different strategies on this topic, but IMHO, it is easiest create predictable and reliable pipelines by bucketing incoming data into datasets that you seal off, and mark ready for processing, and then use the workflow manager's DAG logic to process data when input
Re: Exception in spark
Hi Josh Please ignore the last mail stack trace. Kindly refer the exception details. {org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'Sheet1.Teams} Regards, Ravi On Wed, Aug 12, 2015 at 1:34 AM, Ravisankar Mani rrav...@gmail.com wrote: Hi Rosan, Thanks for your response. Kindly refer the following query and stack trace. I have checked same query in hive, It works perfectly. In case i have removed in in where class, it works in spark SELECT If(ISNOTNULL(SUM(`Sheet1`.`Runs`)),SUM(`Sheet1`.`Runs`),0) AS `Sheet1_Runs` ,`Sheet1`.`Teams` AS `Sheet1_Teams` FROM default.Dashboard_Sheet1_20150812010131201 AS `Sheet1` INNER JOIN (SELECT `Sheet1`.`Teams` AS `Teams_0`,If(ISNOTNULL(COUNT(`Sheet1`.`Teams`)),COUNT(`Sheet1`.`Teams`),0) AS `Measure_0` FROM default.Dashboard_Sheet1_20150812010131201 AS `Sheet1` WHERE `Sheet1`.`Teams` IN ('Chennai Super Kings')) AND (`Sheet1`.`Teams` '' )) AND (NOT(`Sheet1`.`Teams` IS NULL GROUP BY `Sheet1`.`Teams` ORDER BY `Measure_0` DESC LIMIT 5 ) `T_0` ON ( `Sheet1`.`Teams` =`T_0`.`Teams_0` ) WHERE `Sheet1`.`Teams` IN ('Chennai Super Kings')) AND (`Sheet1`.`Teams` '' )) AND (NOT(`Sheet1`.`Teams` IS NULL GROUP BY `Sheet1`.`Teams` at Syncfusion.ThriftHive.Base.HqlCommand.ExecuteReader() at Syncfusion.Dashboard.Base.Data.HiveQueryBuilder.ExecuteReaderQuery(String query, String connectionString) in f:\Back_To_Svn\source\base\dashboard.base\src\Data\HiveServer.cs:line 409 at Syncfusion.Dashboard.Base.Data.ServerDataProvider.GetTable(String tableName, List`1 schemaInfoCollection, List`1 originalSchemaInfoCollection, List`1 initialFilterList, List`1 viewerFilterList, Boolean isSelectQueryForServerModeFilterPopup, Boolean isNestedFilter) in f:\Back_To_Svn\source\base\dashboard.base\src\Data\Data.cs:line 536 at Syncfusion.Dashboard.Base.Engine.RelationalEngine.GetDataFromMainSource(List`1 schemaInfos, List`1 viewerFilterList) in f:\Back_To_Svn\source\base\dashboard.base\src\Engine\RelationDataEngine.cs:line 902 at DashboardService.DashboardService.ApplyFilterServerMode(RelationalEngine engine, String ReportName, Boolean Drilled, Boolean useDefaultProperties, Boolean isParamsColumn, SchemaInfo paramsSchema) in f:\Back_To_Svn\source\js\dashboardsamples\wcf\Common\DataProcessor.cs:line 1030 at DashboardService.DashboardService.ApplyFilter(RelationalEngine engine, String ReportName, Boolean Drilled, Boolean useDefaultProperties, Boolean isParamsColumn, SchemaInfo paramsSchema) in f:\Back_To_Svn\source\js\dashboardsamples\wcf\Common\DataProcessor.cs:line 1047 at DashboardService.DashboardService.GetProcessedControlData(List`1 elements, RelationalEngine dataEngine, DashboardItem control, RelationalReport currentReport) in f:\Back_To_Svn\source\js\dashboardsamples\wcf\Common\DataProcessor.cs:line 205 at DashboardService.DashboardService.GetGridControlData(DashboardItem controlObj, List`1 columnList, List`1 gridSchemaInfos, Dictionary`2 barData, Dictionary`2 formattedColumns, List`1 colourSaturationColumns) in f:\Back_To_Svn\source\js\dashboardsamples\wcf\Controls\DashboardGrid.cs:line 115 at DashboardService.DashboardService.GetGridData(DashboardItem controlObj, Boolean isLoadGrid) in f:\Back_To_Svn\source\js\dashboardsamples\wcf\Controls\DashboardGrid.cs:line 47 at DashboardService.DashboardService.ControlConfigHelper(DashboardItem control, Boolean IsLoad, DataFetcherGrid fn) in f:\Back_To_Svn\source\js\dashboardsamples\wcf\DashboardService.svc.cs:line 1269 Regards, Ravi On Tue, Aug 11, 2015 at 11:53 PM, Josh Rosen 
rosenvi...@gmail.com wrote: Can you share a query or stack trace? More information would make this question easier to answer. On Tue, Aug 11, 2015 at 8:50 PM, Ravisankar Mani rrav...@gmail.com wrote: Hi all, We got an exception like “org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object” when using some where condition queries. I am using 1.4.0 version spark. Is this exception resolved in latest spark? Regards, Ravi
Re: Boosting spark.yarn.executor.memoryOverhead
Hi Eric, This is likely because you are putting the parameter after the primary resource (latest_msmtdt_by_gridid_and_source.py), which makes it a parameter to your application instead of a parameter to Spark. -Sandy On Wed, Aug 12, 2015 at 4:40 AM, Eric Bless eric.bl...@yahoo.com.invalid wrote: Previously I was getting a failure which included the message Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. So I attempted the following - spark-submit --jars examples.jar latest_msmtdt_by_gridid_and_source.py --conf spark.yarn.executor.memoryOverhead=1024 host table This resulted in - Application application_1438983806434_24070 failed 2 times due to AM Container for appattempt_1438983806434_24070_02 exited with exitCode: -1000 Am I specifying the spark.yarn.executor.memoryOverhead incorrectly?
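In other words, keeping the same file and argument names from the original command, the --conf flag needs to appear before the application script, along the lines of:

spark-submit --jars examples.jar --conf spark.yarn.executor.memoryOverhead=1024 latest_msmtdt_by_gridid_and_source.py host table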
Re: Not seeing Log messages
Forgot to mention. Here is how I run the program:
./bin/spark-submit --conf spark.app.master=local[1] ~/workspace/spark-python/ApacheLogWebServerAnalysis.py

On Wednesday, 12 August 2015 10:28 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: I wrote a small python program:

def parseLogs(self):
    """Read and parse log file"""
    self._logger.debug("Parselogs() start")
    self.parsed_logs = (self._sc
                        .textFile(self._logFile)
                        .map(self._parseApacheLogLine)
                        .cache())
    self.access_logs = (self.parsed_logs
                        .filter(lambda s: s[1] == 1)
                        .map(lambda s: s[0])
                        .cache())
    self.failed_logs = (self.parsed_logs
                        .filter(lambda s: s[1] == 0)
                        .map(lambda s: s[0]))
    failed_logs_count = self.failed_logs.count()
    if failed_logs_count > 0:
        self._logger.debug('Number of invalid logline: %d' % self.failed_logs.count())
        for line in self.failed_logs.take(20):
            self._logger.debug('Invalid logline: %s' % line)
    self._logger.debug('Read %d lines, successfully parsed %d lines, failed to parse %d lines' % \
        (self.parsed_logs.count(), self.access_logs.count(), self.failed_logs.count()))
    return (self.parsed_logs, self.access_logs, self.failed_logs)

def main(argv):
    try:
        logger = createLogger("pyspark", logging.DEBUG, "LogAnalyzer.log", "./")
        logger.debug("Starting LogAnalyzer")
        myLogAnalyzer = ApacheLogAnalyzer(logger)
        (parsed_logs, access_logs, failed_logs) = myLogAnalyzer.parseLogs()
    except Exception as e:
        print "Encountered Exception %s" % str(e)
    logger.debug('Read %d lines, successfully parsed %d lines, failed to parse %d lines' %
                 (parsed_logs.count(), access_logs.count(), failed_logs.count()))
    logger.info("DONE. ALL TESTS PASSED")

I see some log messages: Starting LogAnalyzer / Parselogs() start / DONE. ALL TESTS PASSED. But I do not see some log messages: 'Read %d lines, successfully parsed %d lines, failed to parse %d lines'. For this line: logger.debug('Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())) I get the following error: Encountered Exception Cannot pickle files that are not opened for reading. Do not have a clue as to what's happening. Any help will be appreciated.
Re: Partitioning in spark streaming
partitioning - by itself - is a property of RDD. so essentially it is no different in case of streaming where each batch is one RDD. You can use partitionBy on RDD and pass on your custom partitioner function to it. One thing you should consider is how balanced are your partitions ie your partition scheme should not skew data into one partition too much. Best Ayan On Wed, Aug 12, 2015 at 9:06 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How does partitioning in spark work when it comes to streaming? What's the best way to partition a time series data grouped by a certain tag like categories of product video, music etc. -- Best Regards, Ayan Guha
RE: Spark DataFrames uses too many partition
That's a good question. We don't support reading small files in a single partition yet, but it's definitely an issue we need to optimize; do you mind creating a JIRA issue for this? Hopefully we can merge that in the 1.6 release. 200 is the default partition number for parallel tasks after the data shuffle, and we have to change that value according to the file size, cluster size, etc. Ideally, this number would be set dynamically and automatically; however, Spark SQL doesn't support the complex cost-based model yet, particularly for multi-stage jobs. (https://issues.apache.org/jira/browse/SPARK-4630) Hao -Original Message- From: Al M [mailto:alasdair.mcbr...@gmail.com] Sent: Tuesday, August 11, 2015 11:31 PM To: user@spark.apache.org Subject: Spark DataFrames uses too many partition I am using DataFrames with Spark 1.4.1. I really like DataFrames but the partitioning makes no sense to me. I am loading lots of very small files and joining them together. Every file is loaded by Spark with just one partition. Each time I join two small files the partition count increases to 200. This makes my application take 10x as long as if I coalesce everything to 1 partition after each join. With normal RDDs it would not expand out the partitions to 200 after joining two files with one partition each. It would either keep it at one or expand it to two. Why do DataFrames expand out the partitions so much?
Re: How to minimize shuffling on Spark dataframe Join?
Is the source of your dataframe partitioned on key? As per your mail, it looks like it is not. If that is the case, for partitioning the data, you will have to shuffle the data anyway. Another part of your question is - how to co-group data from two dataframes based on a key? I think for RDD's cogroup in PairRDDFunctions is a way. I am not sure if something similar is available for DataFrames. Hemant On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar abdullah.ibn.an...@gmail.com wrote: I have two dataframes like this student_rdf = (studentid, name, ...) student_result_rdf = (studentid, gpa, ...) we need to join this two dataframes. we are now doing like this, student_rdf.join(student_result_rdf, student_result_rdf[studentid] == student_rdf[studentid]) So it is simple. But it creates lots of data shuffling across worker nodes, but as joining key is similar and if the dataframe could (understand the partitionkey) be partitioned using that key (studentid) then there suppose not to be any shuffling at all. As similar data (based on partition key) would reside in similar node. is it possible, to hint spark to do this? So, I am finding the way to partition data based on a column while I read a dataframe from input. And If it is possible that Spark would understand that two partitionkey of two dataframes are similar, then how? -- Abdullah
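For the RDD route mentioned above, a small sketch of pre-partitioning both sides on the same key so the subsequent join (or cogroup) does not reshuffle either of them (studentRdd, resultRdd, the studentid field and the partition count are placeholders):

import org.apache.spark.HashPartitioner

// Key both RDDs by studentid.
val students = studentRdd.map(r => (r.studentid, r))
val results  = resultRdd.map(r => (r.studentid, r))

// Use the same partitioner on both sides so matching keys are co-located.
val p = new HashPartitioner(200)
val studentsP = students.partitionBy(p).cache()
val resultsP  = results.partitionBy(p).cache()

// This join (and likewise cogroup) now runs without shuffling either side again.
val joined = studentsP.join(resultsP)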
Re: grouping by a partitioned key
Thanks. In my particular case, I am calculating a distinct count on a key that is unique to each partition, so I want to calculate the distinct count within each partition, and then sum those. This approach will avoid moving the sets of that key around between nodes, which would be very expensive. Currently, to accomplish this we are manually reading in the parquet files (not through Spark SQL), using a bitset to calculate the unique count within each partition, and accumulating that sum. Doing this through Spark SQL would be nice, but the naive SELECT distinct(count(...)) approach takes 60 times as long :). The approach I mentioned above might be an acceptable hybrid solution. - Philip On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov fathers...@list.ru wrote: Philip, If all data per key are inside just one partition, then Spark will figure that out. Can you guarantee that’s the case? What is it you try to achieve? There might be another way for it, when you might be 100% sure what’s happening. You can print debugString or explain (for DataFrame) to see what’s happening under the hood. On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com wrote: If I have an RDD that happens to already be partitioned by a key, how efficient can I expect a groupBy operation to be? I would expect that Spark shouldn't have to move data around between nodes, and simply will have a small amount of work just checking the partitions to discover that it doesn't need to move anything around. Now, what if we're talking about a parquet database created by using DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group by a key that I'm already partitioned by? - Philip Eugene Morozov fathers...@list.ru
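A sketch of that hybrid idea expressed on an RDD, counting distinct keys within each partition and summing the results (the RDD shape is a placeholder, and it assumes, as described above, that a given key value never appears in more than one partition):

// rows: RDD[(String, Long)] is a stand-in for the real dataset; the String is
// the key that is unique to a single partition.
val totalDistinct = rows
  .mapPartitions { iter =>
    // Distinct within the partition only; no data leaves the node.
    val seen = scala.collection.mutable.HashSet[String]()
    iter.foreach { case (key, _) => seen += key }
    Iterator(seen.size.toLong)
  }
  .reduce(_ + _)   // summing is safe because each key lives in exactly one partition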
Error when running SparkPi in Intellij
I imported the Spark project into IntelliJ and tried to run SparkPi in IntelliJ, but it failed with a compilation error: Error:scalac: while compiling: /Users/werere/github/spark/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala during phase: jvm library version: version 2.10.4 compiler version: version 2.10.4 reconstructed args: -nobootcp -javabootclasspath : -deprecation -feature -classpath /Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/lib/javafx-doclet.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/lib/sa-j I found a similar problem on Stack Overflow, but the solution in that post doesn't work for me. Has anyone met the same issue and found a solution for it? http://stackoverflow.com/questions/25211071/compilation-errors-in-spark-datatypeconversions-scala-on-intellij-when-using-m
Re: Partitioning in spark streaming
Posting a comment from my previous mail post: When data is received from a stream source, receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval where N = batchInterval/blockInterval. A RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Now if you want to repartition based on a key, a shuffle is needed. On Wed, Aug 12, 2015 at 4:36 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How does partitioning in spark work when it comes to streaming? What's the best way to partition a time series data grouped by a certain tag like categories of product video, music etc.
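As a concrete illustration of the arithmetic above: with a 10 second batch and the default 200ms block interval, each batch RDD gets 10000/200 = 50 partitions. The sketch below (interval values, source and partitioner are only examples) changes the block interval and then partitions by a tag, which triggers the shuffle mentioned:

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 10s batches; with a 500ms block interval each batch RDD gets ~20 partitions.
val conf = new SparkConf()
  .setAppName("streaming-partitioning-example")
  .set("spark.streaming.blockInterval", "500ms")
val ssc = new StreamingContext(conf, Seconds(10))

val lines = ssc.socketTextStream("localhost", 9999)   // placeholder source

// Re-keying by a tag (e.g. product category) and partitioning on it requires a shuffle.
val byCategory = lines
  .map(line => (line.split(",")(0), line))
  .transform(rdd => rdd.partitionBy(new HashPartitioner(8)))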
Re: Partitioning in spark streaming
Thanks for the info. When data is written in hdfs how does spark keeps the filenames written by multiple executors unique On Tue, Aug 11, 2015 at 9:35 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Posting a comment from my previous mail post: When data is received from a stream source, receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval where N = batchInterval/blockInterval. A RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Now if you want to repartition based on a key, a shuffle is needed. On Wed, Aug 12, 2015 at 4:36 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How does partitioning in spark work when it comes to streaming? What's the best way to partition a time series data grouped by a certain tag like categories of product video, music etc.
Spark 1.4.0 Docker Slave GPU Access
Hi Trying to access GPU from a Spark 1.4.0 Docker slave, without much luck. In my Spark program, I make a system call to a script, which performs various calculations using GPU. I am able to run this script as standalone, or via Mesos Marathon; however, calling the script through Spark fails due to inaccessibility of /dev/dri devices. I am wondering if Spark is trying to limit apps access to special devices, in any way? Cheers, [http://www.cisco.com/web/europe/images/email/signature/logo05.jpg] Nastooh Avessta ENGINEER.SOFTWARE ENGINEERING nave...@cisco.com Phone: +1 604 647 1527 Cisco Systems Limited 595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121 VANCOUVER BRITISH COLUMBIA V7X 1J1 CA Cisco.comhttp://www.cisco.com/ [Think before you print.]Think before you print. This email may contain confidential and privileged material for the sole use of the intended recipient. Any review, use, distribution or disclosure by others is strictly prohibited. If you are not the intended recipient (or authorized to receive for the recipient), please contact the sender by reply email and delete all copies of this message. For corporate legal information go to: http://www.cisco.com/web/about/doing_business/legal/cri/index.html Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J 2T3. Phone: 416-306-7000; Fax: 416-306-7099. Preferenceshttp://www.cisco.com/offer/subscribe/?sid=000478326 - Unsubscribehttp://www.cisco.com/offer/unsubscribe/?sid=000478327 - Privacyhttp://www.cisco.com/web/siteassets/legal/privacy.html
Re: Job is Failing automatically
15/08/11 12:59:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71, sdldalplhdw02.suddenlink.cequel3.com): java.lang.NullPointerException
        at com.suddenlink.pnm.process.HBaseStoreHelper.flush(HBaseStoreHelper.java:313)
This is an error in your application code: the NullPointerException is thrown from HBaseStoreHelper.flush. On Wed, Aug 12, 2015 at 5:12 AM, Nikhil Gs gsnikhil1432...@gmail.com wrote: Hello Team, I am facing an error which I have pasted below. My job fails when I copy my data files into the Flume spool directory. Most of the time the job fails, and I don't know why; I am facing this issue repeatedly. For your reference I have also attached the complete YARN log file. Please suggest what the issue might be. Thanks in advance.
15/08/11 12:59:30 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 2.1 KB, free: 1059.7 MB)
15/08/11 12:59:31 INFO storage.BlockManagerInfo: Added rdd_5_0 in memory on sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 1693.6 KB, free: 1058.0 MB)
15/08/11 12:59:32 INFO storage.BlockManagerInfo: Added rdd_7_0 in memory on sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 1697.6 KB, free: 1056.4 MB)
15/08/11 12:59:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71, sdldalplhdw02.suddenlink.cequel3.com): java.lang.NullPointerException
        at com.suddenlink.pnm.process.HBaseStoreHelper.flush(HBaseStoreHelper.java:313)
        at com.suddenlink.pnm.process.StoreNodeInHBase$1.call(StoreNodeInHBase.java:57)
        at com.suddenlink.pnm.process.StoreNodeInHBase$1.call(StoreNodeInHBase.java:31)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:304)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:304)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 3.0 (TID 72, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes)
15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 3.0 (TID 72) on executor sdldalplhdw02.suddenlink.cequel3.com: java.lang.NullPointerException (null) [duplicate 1]
15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 3.0 (TID 73, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes)
15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 3.0 (TID 73) on executor sdldalplhdw02.suddenlink.cequel3.com: java.lang.NullPointerException (null) [duplicate 2]
15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 3.0 (TID 74, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes)
15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 3.0 (TID 74) on executor sdldalplhdw02.suddenlink.cequel3.com: java.lang.NullPointerException (null) [duplicate 3]
15/08/11 12:59:34 ERROR scheduler.TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job
15/08/11 12:59:34 INFO cluster.YarnClusterScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/08/11 12:59:34 INFO cluster.YarnClusterScheduler: Cancelling stage 3
15/08/11 12:59:34 INFO scheduler.DAGScheduler: Job 2 failed: foreachRDD at NodeProcessor.java:101, took 4.750491 s
15/08/11 12:59:34 ERROR scheduler.JobScheduler: Error running job streaming job 143931597 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 74, sdldalplhdw02.suddenlink.cequel3.com): java.lang.NullPointerException
Regards, Nik. -- Best Regards Jeff Zhang
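The NPE itself can only be diagnosed from HBaseStoreHelper.java:313, but a common cause in this pattern is state (for example an HBase connection or table handle) initialized on the driver and then used, uninitialized, on the executors. Below is a minimal defensive sketch, assuming the HBase 1.x client API; the table name, column family, and record shape are made up for illustration and are not the poster's actual code.

  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.rdd.RDD

  // rdd holds (rowKey, value) pairs.
  def writeToHBase(rdd: RDD[(String, String)]): Unit = {
    rdd.foreachPartition { rows =>
      // Create the connection inside the partition, on the executor,
      // instead of relying on a field initialized on the driver.
      val conf = HBaseConfiguration.create()
      val connection = ConnectionFactory.createConnection(conf)
      val table = connection.getTable(TableName.valueOf("my_table"))
      try {
        rows.foreach { case (rowKey, value) =>
          if (rowKey != null && value != null) { // guard against null records
            val put = new Put(Bytes.toBytes(rowKey))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
            table.put(put)
          }
        }
      } finally {
        table.close()
        connection.close()
      }
    }
  }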
Re: Spark Job Hangs on our production cluster
Logs would be helpful to diagnose. Could you attach the logs? On Wed, Aug 12, 2015 at 5:19 AM, java8964 java8...@hotmail.com wrote: The executor memory is overridden by --executor-memory 24G for spark-shell; the value in spark-env.sh is just the default setting. I can confirm from the Spark UI that the executor heap is set to 24G. Thanks, Yong -- From: igor.ber...@gmail.com Date: Tue, 11 Aug 2015 23:31:59 +0300 Subject: Re: Spark Job Hangs on our production cluster To: java8...@hotmail.com CC: user@spark.apache.org How do you want to process 1T of data when you set your executor memory to 2g? Look at the Spark UI and the task metrics, if any, and at the Spark logs on the executor machines under the work dir (unless you configured log4j otherwise). I think your executors are thrashing or spilling to disk; check the memory metrics and swapping. On 11 August 2015 at 23:19, java8964 java8...@hotmail.com wrote: We currently have an IBM BigInsights cluster with 1 namenode + 1 JobTracker + 42 data/task nodes, running BigInsights V3.0.0.2, which corresponds to Hadoop 2.2.0 with MR1. Since IBM BigInsights doesn't come with Spark, we built Spark 1.2.2 against Hadoop 2.2.0 + Hive 0.12 ourselves and deployed it on the same cluster. BigInsights ships with the IBM JDK 1.7, but in our experience on the staging environment Spark works better with the Oracle JVM, so we run Spark under Oracle JDK 1.7.0_79. Now on production we are facing an issue we have never faced before, and cannot reproduce on our staging cluster. We are using a Spark standalone cluster, and here is the basic configuration:

more spark-env.sh
export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
export SPARK_CLASSPATH=/opt/ibm/biginsights/IHC/lib/ibm-compression.jar:/opt/ibm/biginsights/hive/lib/db2jcc4-10.6.jar
export SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
export SPARK_MASTER_WEBUI_PORT=8081
export SPARK_MASTER_IP=host1
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=42"
export SPARK_WORKER_MEMORY=24g
export SPARK_WORKER_CORES=6
export SPARK_WORKER_DIR=/tmp/spark/work
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g

more spark-defaults.conf
spark.master spark://host1:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://host1:9000/spark/eventLog
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

We use the AVRO file format a lot, and we have these two datasets: one is about 96G, and the other is a little over 1T. Since we use AVRO, we also built spark-avro at commit a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8 https://github.com/databricks/spark-avro/tree/a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8, which is the latest version supporting Spark 1.2.x. Here is the problem we are facing on our production cluster: even the following simple spark-shell commands hang there:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import com.databricks.spark.avro._
val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
bigData.registerTempTable("bigData")
bigData.count()

From the console, we see the following, with no update for more than 30 minutes:

[Stage 0: (44 + 42) / 7800]

The 1T dataset should produce about 7800 HDFS blocks, but Spark's HDFS client appears to have trouble reading them.
Since we are running Spark on the data nodes, all the Spark tasks run at the NODE_LOCAL locality level. If I go to the data/task node on which the Spark tasks hang and use jstack to dump the threads, I get the following at the top:

2015-08-11 15:38:38 Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.79-b02 mixed mode):

Attach Listener daemon prio=10 tid=0x7f0660589000 nid=0x1584d waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE

org.apache.hadoop.hdfs.PeerCache@4a88ec00 daemon prio=10 tid=0x7f06508b7800 nid=0x13302 waiting on condition [0x7f060be94000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:252)
        at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39)
        at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135)
        at java.lang.Thread.run(Thread.java:745)

shuffle-client-1 daemon prio=10 tid=0x7f0650687000 nid=0x132fc runnable [0x7f060d198000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at
Exception in spark
Hi all, We get an exception like “org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object” when running certain queries with WHERE conditions. I am using Spark version 1.4.0. Has this exception been resolved in the latest Spark? Regards, Ravi
RE: Very high latency to initialize a DataFrame from partitioned parquet database.
Definitely worth a try. You can also sort the records before writing them out, and then you will get Parquet files without overlapping keys. Let us know if that helps. Hao From: Philip Weaver [mailto:philip.wea...@gmail.com] Sent: Wednesday, August 12, 2015 4:05 AM To: Cheng Lian Cc: user Subject: Re: Very high latency to initialize a DataFrame from partitioned parquet database. Do you think it might be faster to put all the files in one directory but still partitioned the same way? I don't actually need to filter on the values of the partition keys, but I need to rely on there being no overlap in the key values between any two Parquet files. On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver philip.wea...@gmail.com wrote: Thanks, I also confirmed that the partition discovery is slow by writing a non-Spark application that uses the parquet library directly to load the partitions. It's so slow that my colleague's Python application can read the entire contents of all the parquet data files faster than my application can even discover the partitions! On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian lian.cs@gmail.com wrote: However, it's weird that the partition discovery job only spawns 2 tasks. It should use the default parallelism, which is probably 8 according to the logs of the next Parquet reading job. Partition discovery is already done in a distributed manner via a Spark job, but the parallelism is mysteriously low... Cheng On 8/7/15 3:32 PM, Cheng Lian wrote: Hi Philip, Thanks for providing the log file. It seems that most of the time is spent on partition discovery. The code snippet you provided actually issues two jobs. The first one lists the input directories to find all leaf directories (and this actually requires listing all leaf files, because we can only assert that a directory is a leaf when it contains no sub-directories). Partition information is then extracted from the leaf directory paths. This process starts at:

10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in parallel under: file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

The actual task execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+ partitions, so there are a lot of leaf directories and files out there. My guess is that the local file system spent a lot of time listing the FileStatus-es of all these files. I also noticed that Mesos job scheduling takes more time than expected. That is probably because this is the first Spark job executed in the application, and the system is not warmed up yet.
For example, there’s a 6s gap between these two adjacent lines:

10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and this one actually finishes pretty quickly, in only 3s (note that the Mesos job scheduling latency is also included):

10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 8 output partitions
…
10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
…
10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)

Might that be the reason why you observed that the C parquet library you mentioned (is it parquet-cpp?) is an order of magnitude faster? Cheng On 8/7/15 2:02 AM, Philip Weaver wrote: With DEBUG, the log output was over 10MB, so I opted for just INFO output. The (sanitized) log is attached. The driver is essentially this code:

info("A")
val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache
val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the parquet files. My colleague wrote a PySpark application that gets the list of files, parallelizes it, maps across it and reads each file manually using a C
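To make the sort-before-write suggestion at the top of this thread concrete, here is a minimal sketch assuming the Spark 1.4 DataFrame API; the paths and the column name "key" are placeholders, not the poster's actual schema.

  import org.apache.spark.sql.SQLContext

  // Assumes a SparkContext named sc, as in spark-shell.
  val sqlContext = new SQLContext(sc)

  // Read the existing partitioned data.
  val df = sqlContext.read.parquet("/path/to/input")

  // A global sort on the key column range-partitions the data, so each output
  // task (and therefore each output file) covers a non-overlapping key range.
  df.sort("key")
    .write
    .parquet("/path/to/output")

Writing everything under a single output directory like this also sidesteps the per-partition directory listing that dominated the timings above, at the cost of losing partition-pruning on the old directory keys.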
pregel graphx job not finishing
Hi, I'm currently using a Pregel message-passing function for my graph in Spark and GraphX. The problem I have is that the code runs perfectly on Spark 1.0 and finishes in a couple of minutes, but now that we have upgraded, the same code on 1.3 doesn't finish (I left it overnight and it was still going) and I get a lot of messages as follows (this doesn't happen in v1.0). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pregel-graphx-job-not-finishing-tp24221.html
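Without the code or the messages it is hard to say what changed between 1.0 and 1.3, but for reference, a minimal GraphX Pregel example of the kind described (single-source shortest paths on a generated graph; the graph, source vertex, and iteration cap are made up for illustration) looks like the sketch below. Capping maxIterations can at least bound the run while investigating.

  import org.apache.spark.graphx._
  import org.apache.spark.graphx.util.GraphGenerators

  // Assumes a SparkContext named sc, as in spark-shell.
  val graph: Graph[Long, Int] = GraphGenerators.logNormalGraph(sc, numVertices = 100)
  val sourceId: VertexId = 42L

  // Initialize distances: 0 for the source, infinity elsewhere.
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

  val sssp = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 20)(
    (id, dist, newDist) => math.min(dist, newDist),            // vertex program
    triplet =>                                                 // send messages
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else
        Iterator.empty,
    (a, b) => math.min(a, b)                                   // merge messages
  )

  println(sssp.vertices.take(10).mkString("\n"))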